RocketMQ 顺序消费模式下 消费端是如何保持顺序的

前置: 拉取消息的流程,并发和顺序都是一样的

1、将拉取到的消息(默认32条,单次拉取的条数)放入到 MessageQueue 的对应的ProcessQueue中。

2、放入消息时并修改当前ProcessQueue队列是否正在消费,刚启动时为false。注意如果当前队列放置消息时不存在消费,此时dispatchToConsume为true

3、当dispatchToConsume为true时,创建一个消费消息的任务 ConsumeRequest,因为此时32条消息已经均放入该MessageQueue的Procesueue中,只需要起一个线程一直消费就行。

4、当前任务一直循环处理ProcessQueue中的消息(for循环)

5、在上报消息处理结果

  • 如果消息成功处理,则continueConsume仍为true,此任务继续循环。
  • 如果消息处理失败并且当前消息没有到最大循环次数,则continueConsume设置为false跳出步骤4的循环并重新提交一个ConsumeRequest任务,延迟一秒触发。待当前任务到达时间后,继续步骤4循环

扩展:

  • 并发消费模式下为什么是并发的?是因为一个消息生成一个ConsumeRequest任务并提交到线程池

    • 单次消息拉取会生成32个任务进行并发消费

总结:

  • 并发消费和顺序模式下的消息拉取机制是一样的

  • 并发模式下 同一个队列的一条消息生成一个任务

    • 默认一次拉取32条,会有32个任务
  • 顺序模式下 同一个队列始终最多一个任务在运行

    • synchronized (messageQueue) 对MessageQueue加锁当前队列保证同一队列串行。
    • 一旦ProcessQueue中存在消息,会修改ProcessQueue的正在消费字段consuming为true,同时创建一个消息处理任务ConsumeRequest。
    • 如果ProcessQueue中的消息处理完,则会修改consuming为false。
    • 如果第二次拉取消息时,ProcessQueue队列中还有消息,此时不会再创建ConsumeRequest任务,但消息会继续放入ProcessQueue队列中。
  • 问题

    • 由于加锁,并发度下降。
    • 如果某一个消息处理失败,在未达到最大处理次数前,会一直处理该消息,并且每次延迟1秒,也会阻塞当前MessageQueue中的后续消息消费。