弥补延时消息的不足,RocketMQ基于时间轮算法实现了定时
11月23日 菩提门投稿 在RocketMQ4。x版本,使用延时消息来实现消息的定时消费。延时消息可以一定程度上实现定时发送,但是有一些局限。
RocketMQ新版本基于时间轮算法引入了定时消息,目前,精确到秒级的定时消息实现的pr已经提交到社区,今天来介绍一下。1延时消息1。1简介
RocketMQ的延时消息是指Producer发送消息后,Consumer不会立即消费,而是需要等待固定的时间才能消费。在一些场景下,延时消息是很有用的,比如电商场景下关闭30分钟内未支付的订单。
使用延时消息非常简单,只需要给消息的delayTimeLevel属性赋值就可以。参考下面代码:MessagemessagenewMessage(TestTopic,(Helloscheduledmessagei)。getBytes());第3个级别,10smessage。setDelayTimeLevel(3);producer。send(message);
延时消息有18个级别,如下:privateStringmessageDelayLevel1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h;1。2实现原理
延时消息的实现原理如下图:
Producer把消息发送到Broker后,Broker判断到是延时消息,首先会把消息投递到延时队列(TopicSCHEDULETOPICXXXX,queueIddelayTimeLevel1)。定时任务线程池会有18个线程来对延时队列进行调度,每个线程调度一个延时级别,调度任务把延时消息再投递到原始队列,这样Consumer就可以拉取到了。1。3存在不足
延时消息存在着一些不足:
1。延时级别只有18个,并不能满足所有场景;
2。如果通过修改messageDelayLevel配置来自定义延时级别,并不灵活,比如一个在大规模的平台上,延时级别成百上千,而且随时可能增加新的延时时间;
3。延时时间不准确,后台的定时线程可能会因为处理消息量大导致延时误差大。2定时消息
为了弥补延时消息的不足,RocketMQ5。0引入了定时消息。2。1时间轮算法
为了解决定时任务队列遍历任务导致的性能开销,RocketMQ定时消息引入了秒级的时间轮算法。如下图:
图中是一个60s的时间轮,时间轮上会有一个指向当前时间的指针定时地移动到下一个时间(秒级)。
时间轮算法的优势是不用去遍历所有的任务,每一个时间节点上的任务用链表串起来,当时间轮上的指针移动到当前的时间时,这个时间节点上的全部任务都执行。
虽然上面只是一个60s的时间轮,但是对于所有的时间延时,都是支持的。可以在每个时间节点增加一个round字段,记录时间轮转动的圈数,比如对于延时130s的任务,round就是2,放在第10个时间刻度的链表中。这样当时间轮转到一个节点,执行节点上的任务时,首先判断round是否等于0,如果等于0,则把这个任务从任务链表中移出交给异步线程执行,否则将round减1继续检查后面的任务。2。2使用方式
基于时间轮算法的思想,RocketMQ实现了精准的定时消息。使用RocketMQ定时消息时,客户端定义消息的示例代码如下:org。apache。rocketmq。common。message。MessagemessageExtthis。sendMessageActivity。buildMessage(null,Lists。newArrayList(Message。newBuilder()。setTopic(Resource。newBuilder()。setName(TOPIC)。build())。setSystemProperties(SystemProperties。newBuilder()。setMessageId(msgId)。setQueueId(0)。setMessageType(MessageType。DELAY)。setDeliveryTimestamp(Timestamps。fromMillis(deliveryTime))定义消息投递时间。setBornTimestamp(Timestamps。fromMillis(System。currentTimeMillis()))。setBornHost(StringUtils。defaultString(RemotingUtil。getLocalAddress(),127。0。0。1:1234))。build())。setBody(ByteString。copyFromUtf8(123))。build()),Resource。newBuilder()。setName(TOPIC)。build())。get(0);2。3实现原理2。3。1消息投递
上面的代码构中,Producer创建消息时给消息传了一个系统属性deliveryTimestamp,这个属性指定了消息投递的时间,并且封装到消息的TIMERDELIVERMS属性,代码如下:protectedvoidfillDelayMessageProperty(apache。rocketmq。v2。Messagemessage,org。apache。rocketmq。common。message。MessagemessageWithHeader){if(message。getSystemProperties()。hasDeliveryTimestamp()){TimestampdeliveryTimestampmessage。getSystemProperties()。getDeliveryTimestamp();delayTime这个延时时间默认不能超过1天,可以配置longdeliveryTimestampMsTimestamps。toMillis(deliveryTimestamp);validateDelayTime(deliveryTimestampMs);。。。StringtimestampStringString。valueOf(deliveryTimestampMs);MessageConst。PROPERTYTIMERDELIVERMSTIMERDELIVERMSMessageAccessor。putProperty(messageWithHeader,MessageConst。PROPERTYTIMERDELIVERMS,timestampString);}}
Broker收到这个消息后,如果判断到TIMERDELIVERMS这个属性有值,就会把这个消息投递到Topic是rmqsyswheeltimer的队列中,queueId是0,同时会保存原始消息的Topic、queueId、投递时间(TIMEROUTMS)。
TimerMessageStore中有个定时任务TimerEnqueueGetService会从rmqsyswheeltimer这个Topic中读取消息,然后封装TimerRequest请求并放到队列enqueuePutQueue。2。3。2绑定时间轮
RocketMQ使用TimerLog来保存消息的原始数据绑定到时间轮上。首先看一下TimerLog保存的数据结构,如下图:
参考下面代码:TimerMessageStore类ByteBuffertmpBuffertimerLogBtmpBuffer。clear();tmpBuffer。putInt(TimerLog。UNITSIZE);sizetmpBuffer。putLong(slot。lastPos);prevpostmpBuffer。putInt(magic);magictmpBuffer。putLong(tmpWriteTimeMs);currWriteTimetmpBuffer。putInt((int)(delayedTimetmpWriteTimeMs));delayTimetmpBuffer。putLong(offsetPy);offsettmpBuffer。putInt(sizePy);sizetmpBuffer。putInt(hashTopicForMetrics(realTopic));hashcodeofrealtopictmpBuffer。putLong(0);reservedvalue,justsetto0nowlongrettimerLog。append(tmpBuffer。array(),0,TimerLog。UNITSIZE);if(1!ret){Ifitsadeletemessage,thenslotstotalnum1TODO:checkifthedeletemsgisinthesameslotwiththemsgtobedeleted。timerWheel。putSlot(delayedTime,slot。firstPos1?ret:slot。firstPos,ret,isDelete?slot。num1:slot。num1,slot。magic);}
TimerEnqueuePutService这个定时任务从上面的enqueuePutQueue(2。3。1节)取出TimerRequest然后封装成TimerLog。
那时间轮是怎么跟TimerLog关联起来的呢?RocketMQ使用TimerWheel来描述时间轮,TimerWheel中每一个时间节点是一个Slot,Slot保存了这个延时时间的TimerLog信息。数据结构如下图:
参考下面代码:类TimerWheelpublicvoidputSlot(longtimeMs,longfirstPos,longlastPos,intnum,intmagic){localBuffer。get()。position(getSlotIndex(timeMs)Slot。SIZE);localBuffer。get()。putLong(timeMsprecisionMs);localBuffer。get()。putLong(firstPos);localBuffer。get()。putLong(lastPos);localBuffer。get()。putInt(num);localBuffer。get()。putInt(magic);}
这样时间轮跟TimerLog就关联起来了,见下图:
如果时间轮的一个时间节点(Slot)上有一条新的消息到来,那只要新建一个TimerLog,然后把它的指针指向该时间节点的最后一个TimerLog,然后把Slot的lastPos属性指向新建的这个TimerLog,如下图:
从源码上看,RocketMQ定义了一个7天的以秒为单位的时间轮。2。3。3时间轮转动
转动时间轮时,TimerDequeueGetService这个定时任务从当前时间节点(Slot)对应的TimerLog中取出数据,封装成TimerRequest放入dequeueGetQueue队列。2。3。4CommitLog中读取消息
定时任务TimerDequeueGetMessageService从队列dequeueGetQueue中拉取TimerRequest请求,然后根据TimerRequest中的参数去CommitLog(MessageExt)中查找消息,查出后把消息封装到TimerRequest中,然后把TimerRequest写入dequeuePutQueue这个队列。2。3。5写入原队列
定时任务TimerDequeuePutMessageService从dequeuePutQueue队列中获取消息,把消息转换成原始消息,投入到原始队列中,这样消费者就可以拉取到了。3总结
RocketMQ4。x版本只支持延时消息,有一些局限性。而RocketMQ新版本引入了定时消息,弥补了延时消息的不足。定时消息的处理流程如下图:
可以看到,RocketMQ的定时消息的实现还是有一定复杂度的,这里用到5个定时任务和3个队列来实现。
最后,对于定时时间的定义,客户端、Broker和时间轮的默认最大延时时间定义是不同的,使用的时候需要注意。来源:https:mp。weixin。qq。comsI91QRel7CraP7zCRh0ISw
作者:朱晋君