[RocketMQ]消息数堆积跳过消息 有更新!
发生消息堆积时,如果消费速度一直追不上发送速度,可以选择丢弃不重要的消息
如何判断消费发生了堆积?
public ConsumeConcurrentlyStatus consumeMessage(//
List<MessageExt> msgs, //
ConsumeConcurrentlyContext context) {
long offset = msgs.get(0).getQueueOffset();
String maxOffset = //
msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET);
long diff = Long.parseLong(maxOffset) - offset;
if (diff > 100000) {
// TODO 消息堆积情况的特殊处理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
// TODO 正常消费过程
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
如以上代码所示,当某个队列的消息数堆积到100000条以上,则尝试丢弃部分或全部消息,这样就可以快速追上发送消息的速度。
评论
发表评论
|
|