RocketMQ5 Pop、Ack源码拆解

数据流通道

2023-11-26

60

0

01 RocketMQ5 架构

RocketMQ已经开启5.X时代,4.X已成为LTS版本。

各大云厂商也推出支持RocketMQ5.X版本的产品,在介绍Pop,ack之前需要先了解RocketMQ5的架构。

1.1 RocketMQ5.X架构

请在此添加图片描述

如上图,在RocketMQ5.X的架构中,新增组件如下:

  • 02 Controller:控制器,帮助Broker做主从切换。
  • 04 Proxy:RocketMQ的代理服务,支持gRPC协议客户端、Remoting客户端收发消息。
  • 05 gRPC client:RocketMQ5的新客户端,使用gRpc协议访问RocketMQ Proxy。

注意:社区很多朋友咨询这个客户端是否可以访问4.X 集群,答案是不支持。

5.X其实有2个客户端,优先推荐使用gRPC客户端:

5.X gRPC客户端源码: https://github.com/apache/rocketmq-clients

5.X remoting客户端源码:

https://github.com/apache/rocketmq/tree/develop/client

5.X gRPC客户端使用gRPC协议访问proxy,5.X remoting客户端可以使用remoting协议访问namesrv和proxy。

其余组件是RocketMQ4.X的原组件。想要了解更多5.X和4.X的差别,请看:RocketMQ 5.0 vs 4.9.X 图解架构对比

1.2 RocketMQ5.X为什么发明Pop

pop主要解决push、pull消费者常见的4种问题:

  • 消费者卡住问题
    下图是Push消费者订阅consumer queue的情况,如果push client 2客户端由于gc等原因执行特别慢,此时broker 1-1中queue 1,broker2-1中的queue1将出现堆积。

请在此添加图片描述

下图是pop消费者订阅consumer queue的情况:

请在此添加图片描述

从上图可以看出来,每个pop client消费全部broker的全部consumer queue。

如果pop client2卡住了,其他的pop client会消费全部的consumer queue,在push消费中queue由于消费卡住或者无人消费而堆积的问题得到解决。

  • 负载均衡慢的问题。
    如果出现push consumer卡住,或者gc导致消费慢,此时我们一般通过重启消费者程序来临时解决。
    消费者重启后reblance,消费者数量越多reblance花费的时间越长,在reblance时消费者无法消费消息。 pop消费时,一个消费者的上线、下线不会触发reblance,故而不存在负载均衡慢的问题。
  • 有状态变无状态。pop是无状态消费者。在云原生大环境下,无状态服务在扩/缩容时更方便、快捷。
  • 消费实例数上限问题。push消费者的消费者个数最大不能超过consumer queue的个数,在pop时不再有这个限制,但是也有自身限制。broker在pop时通过lock consumer queue实现pop消息,多个pop消费者客户端进行锁竞争的时间和pop消费者的个数成此起彼伏的形态,故而pop消费者个数不能无限扩张。如何锁consumer queue,会在接下来讲解。

02 Pop流程

在Broker中,Pop的实现代码从

PopMessageProcessor.processRequest(ChannelHandlerContext, RemotingCommand)方法开始的。

请在此添加图片描述

2.1 Pop流程

请在此添加图片描述

笔者把pop message主要分为5个流程,这里忽略数据校验、pop前参数准备。

这里大家直接看代码PopMessageProcessor.processRequest()中调用了popMsgFromQueue()方法,此方法为pop消息的实现关键,笔者将其分解为5步:

第一步:锁consumer queue

请在此添加图片描述

在pop消息的时候,调用queueLockManager.tryLock(lockKey)方法,实现加锁。 锁key的格式如下:

String lockKey =
            topic + // topic名字
            PopAckConstants.SPLIT +  // 分隔符
            requestHeader.getConsumerGroup() + // 消费者组
            PopAckConstants.SPLIT +  // 分隔符
            queueId; // consumer queue id

可以得知:一个consumer queue同时只会被同一个消费者组中的某一个1个消费者实例锁住。

如果同一个消费者组中同时有2个消费者实例来pop消息,只有一个会锁成功。

这个锁的实现代码如下:

请在此添加图片描述

关于这把锁有2点需要注意:

  1. 这是一把TimedLock。 锁是有超时时间的,超过锁的时间自动释放。通过图中标注1可以看出, 当前锁服务是一个ServiceThread,在RocketMQ中这个代表这个服务是一个后台线程,会自动执行检查。
  2. 高效锁。锁是通过ConcurrentHashMap的线程安全实现的,在一般的java面试中相信大家也经常被问到。

第二步:计算pop offset

pop offset表示当前需要从这个consumer queue的哪个offset开始拉取消息。

在pop消息流程中有2处地方计算了pop offset。

请在此添加图片描述

  • 第一处。在加锁queueXX前计算pop offset。在加锁失败时,会根据pop offset估算在这个consumer queue中还有多少个消息没有被消费。
    也就是返回字段 :restNum。
  • 第二处。在加锁queueXX成功后计算pop offset。
    为什么需要重新计算?从第一次执行计算pop offset到加锁成功后这段时间可能有其他人更新了消费位点,导致第一次计算的pop offset不准确,
    在加锁queueXX成功后,queueXX只会被当前客户端pop消息,此时重新计算pop offset的值是准确值,根据这个再去store中读取消息。

第三步:读取消息

读取消息调用this.brokerController.getMessageStore().getMessageAsync()方法读取,下次有机会再细聊。

第四步:添加check point

check point消息(简称ck消息),其中记录了每次pop的消息信息。

在读取完消息后,将生成一个check point消息(简称ck消息)。ck消息将写入一个buffer中。调用代码如下:

请在此添加图片描述

写入buffer后,这些消息将进入不可见时间,也就是同一个消费者组的其他消费者实例无法再读取到,为什么呢?

在写入buffer后,其他消费者实例计算pop offset时,会把buffer中已经pop的消息计算在内,所以就不会读取到消息。

如果不可见时间到了,用户也没有ack,这些被pop 的消息会被revive服务再次恢复到用户的topic中被用户消费

第五步:释放consumer queue锁

这个没啥说的,直接看加锁的那个图。

2.2 Pop关键数据结构介绍

Pop offset

pop offset是每个消费者实例在pop消息的时候会计算的,被pop 的queue中可以被消费的消息的起始位点。

rocketmq会用这个pop offset去存储中读取消息。

读取消息的流程和4.X差不多,这里不再赘述。

下图展示了pop offset是如何计算的:

请在此添加图片描述

pop offset的值计算有3个来源:

  • 第一:查询已经提交的位点。每次用户消费完成,提交消费位点后,会更新到这里。
  • 第二:检查重置消费位点。目前5.1.4版本的重置消费位点也会单独存储。

这个是5.X中新增的逻辑, 如果broker配置

useServerSideResetOffset=true,则通过admin api可以直接重置位点, 重置的位点会临时保存,提供给pop这个时候使用。

  • 第三:检查ack提交的消费位点。
    一次pop一般会pop一批消息, 而ack可能是一条一条的ack的,所以需要检查当前ack提交到哪条消息,已经被pop而没有被ack的不能再次pop,直到被重试或者被恢复到用户topic。

Check point

数据结构如下:

public class PopCheckPoint implements Comparable<PopCheckPoint> {
    // 本次pop消息d的起始consumer queue offset
    private long startOffset; 
        // 本次pop时的时间戳,单位毫秒
    private long popTime;
    // 本次pop消息d的不可见时间,单位毫秒
        // 一般来自pop客户端请求的request header
    private long invisibleTime;
        // 特别重要
        // 记录本次pop消息的ack情况
    private int bitMap;
    // 本次pop消息d的条数
    private byte num;
    // 本次pop的consumer queue id
    private int queueId;
        // 本次pop 的topic
    private String topic;
        // 本次pop 的消费者组
    private String cid;
    // 特别重要
        // revieve topic的位点,后面详细讲解
    private long reviveOffset;
    // 特别重要
        // 本次拉取消息d的每个消息的queue offset 减去 pop offset
        // 的差值
    private List<Integer> queueOffsetDiff;
    // 本次pop 消息所在d的broker
    private String brokerName;
}

下面将一些特别重要的字段详细说明:

  • bitMap
    这个字段是一个int类型,1个int是由32个bit表示,每个bit其实就是0,1,rocketmq利用bitmap标记本次pop的消息哪些被ack(标记为1),哪些未ack(标记为0)。具体过程详见后面讲解ack的过程。
  • reviveOffset
    revive英文翻译是恢复的含义,那些不可见消息基础信息(非消息body)会保存到revive topic中,到时间后会被revive服务恢复到用户的原始topic中让用户再次消费。
    reviveOffset就是这个revive topic的consumer queue位点。
  • queueOffsetDiff
    queueOffsetDiff是一个数组,保存了本次pop的每条消息的消费位点和pop offset的差值,用来辅助rocketmq实现ack。
    大家感兴趣了解详细的可以翻看源码中是如何使用的这个字段的就知道了。
  • ReceiptHandle
    这个值叫一条消息的句柄,每个消息一条,ack的时候会给到broker,broker通过解析判断ack的哪次pop的哪条消息, 格式如下:

请在此添加图片描述

通过上图我们可以知道,所谓的句柄其实是消息的一堆属性拼接起来的一个字符串。

这个字符串实际长这个样子:

请在此添加图片描述

  • StartOffsetInfo
    这个值是一次pop一个值, 记录了pop的起始位点信息, 实际格式如下:

请在此添加图片描述

这个数据结构主要在proxy中被用到,用来帮助构造pop_ck, 也就是pop消息的句柄。因为数据简单, 样例大家自行debug看看吧。

proxy中使用的代码如下:

请在此添加图片描述

上文不是说句柄broker已经构造了, 为什么proxy还需要再构造一次呢? 大家可以思考下。

  • MsgOffsetInfo

这个值是一次pop一个值, 记录了pop的每个消息的位点信息, 实际格式如下:

请在此添加图片描述

这个数据结构主要在proxy中被用到,用来帮助构造pop_ck, 也就是pop消息的句柄。因为数据简单, 样例大家自行debug看看吧。

proxy中使用的代码如下:

请在此添加图片描述

  • OrderCountInfo
    这个值是一次pop一个值, 记录了pop顺序消息的每个消息reconsume times,格式如下:

请在此添加图片描述

proxy中使用的代码如下:

请在此添加图片描述

通过以上核心数据结构,我们可以看出来:broker针对pop输出了很多数据结构给proxy使用。

这里笔者也有一个疑问:这些数据结构加大了proxy与broker的耦合逻辑,这使得proxy做纯粹的无状态变得困难。

是否可以只做到接口耦合,不用做逻辑耦合?

03 Ack流程

ack是针对pop的, 一次pop可以pop出多条消息, 但是ack的需要解决以下几种情况:

  • 用户每次ack一条消息
  • 用户每次ack一批消息
  • 用户出错了, 永远没有ack。

上面3种情况的结果有4种:

  • 用户ack完成了这次pop的全部消息
  • 用户ack完成了部分消息, 并且ack的位点有空洞
  • 用户ack完成了部分消息, 并且ack的位点无空洞
  • 用户没有ack任何一条消息

基于上面的几种场景, RocketMQ ack是如何实现的呢?

笔者总结了ack的流程:

请在此添加图片描述

在broker中, ack的入口是

AckMessageProcessor.processRequest()方法,其中虚线是异步的流程,实线是同步流程。笔者将其分为以下5步。

第一步:proxy提交ack请求

用户提交ack请求,ack请求被Broker的AckMessageProcessor.processRequest(Channel, RemotingCommand, boolean)方法处理,并解析AckMessageRequestHeader。

AckMessageRequestHeader中包含pop ck信息, 这里逻辑上区分单个消息ack还是批量消息ack:

请在此添加图片描述

标记1: 单个消息ack

标记2: 批量消息ack

标记3: appendAck()方法是ack核心逻辑,后面的全部逻辑都在这个方法中实现。

标记4: 批量执行appendAck()方法。

可以看到标记4处理非原子操作是一种风险,批量提交结果未知,以最终结果一致为准。

第二步:内存标记消费进度

经过第一步后,我们知道核心逻辑在appendAck()中: rocketmq将ack request header解析为AckMsg,并且调用PopBufferMergeService.addAk()将ack msg写入PopBufferMergeService的缓存中。

PopBufferMergeService顾名思义,是一个在内存中提供合并的服务。

合并什么呢, 合并ack和ck消息,也就是用ack 的consumer queue offset去标记ck中的bitmap。

其实就是标记一个ck中的哪些消息被ack了,也就是标记了消费进度。

请在此添加图片描述

下面讲解一些关键变量:

point:是当前ack对应的pop check point对象,里面有一个bitmap用来标记每个消息是否被ack,

具体如何标记呢:

  • 假设拉取了4个消息,组成一个数组,每个“消息的下标”分别为:0,1,2,3
  • 4个消息是否消费的标记由4个“二进制标记”组成一个数组
  • 二进制标记数组,可以转化为“1个10进制数int”保存ck对象中

图示如下:

请在此添加图片描述

我们从pop check point对象初始化的时候可以知道, bitmap是一个int,并且初始化的值为0。将0转化为二进制,可以知道每一个bit都是0。

我们用这个bitmap的前4个bit来举例说明是如何标记每条消息是否ack的。

将int转化为bitmap,是一个bit数组,每个数组元素的下标表示pop的消息的下标。

比如pop了4条消息,按照consumer queue offset从小到大排序就会有4个consumer queue offset的下标。

假如在时间t1pop了4条消息,consumer queue offset为100, 101, 102, 103。

如果第一次ack了100,则bitmap中下标=0的bit设置为1。

bit数组的结果就是上图第一列。

请在此添加图片描述

如果第一次ack了101,则bitmap中下标=1的bit设置为1。

bit数组的结果就是上图第列二列。

请在此添加图片描述

如果分别ack了第一个、第三个消息,则bitmap的结果如上图最右一列。

请在此添加图片描述

每次ack后,bitmap都可以转化为int,并且将这个int保存到pop check中。

这里会有3个问题

  • 全部的消息都ack
  • 用户在允许的时间内没有ack完成全部消息
  • 用户ack的时候, check point消息已经不存在了

这些问题在下一步会被处理。

第三步:持久化ack到revive topic

在上一步中, 如果消息全部被ack了, 这个是正常情况, 将最终的消费位点提交到consumer offset manager中,consumer offset manager会定时自动持久化消费位点。

如果用户在允许的时间内,没有ack完成全部的消息, 此时pop check point会被删除,这些消息用户可以继续pop。

请在此添加图片描述

下面介绍了这个超时时间是如何计算的:来自pop时间和不可见时间。这里可以解释不可见时间超过后, 为什么可以再次pop到消息了。

请在此添加图片描述

如果用户在ack的时候, pop check point消息不存在了怎么办?

首先是为什么pop check point会不存在?

  • 内存不能保存全部的ck。pop check point信息会保存到内存中, 这里不可能保存全部的pop check point, broker提供配置popCkMaxBufferSize内存最大可以保存的pop check point数,默认20w。

超过后, pop check point消息会直接持久化到revieve topic。

  • 允许时间内没有ack的的ck需要丢弃,这个ck对应的全部消息全部对用户再次可见。

请在此添加图片描述

如果check point不存在了, 则将ack消息保存到revieve topic中,方便与持久化的pop check point再次匹配标记哪些消息被ack了。

请在此添加图片描述

第四步:异步标记消费进度

经过上一步,我们知道有一些check point信息和ack信息会被持久化到revieve topic。

PopBufferMergeService服务是一个后台服务, 会消费revieve topic中的ack、ck信息,然后做异步匹配, 来标记ck信息中的用户消息哪些被ack了。

请在此添加图片描述

这里细节特别多, 建议大家debug查看,这里如果需要细讲大家留言我们再出一期。

经过scan后,可以知道哪些ck中的用户消息被全部ack了, 就会提交消费位点到 consumer queue offset manager:

请在此添加图片描述

如果经过这一步,还是有ck没有完全被ack呢?请看下一步。

第五步:可见时间过后,消息恢复消费

如果经过上一步还有ck没有被ack完全匹配,此时这些ck对应的用户消息将被重新可见,用户可以重新pop。

请在此添加图片描述

这个过程是在 PopReviveService服务中实现的, 这也是一个后台服务, 会定时检查哪些ck没有被完全ack, 然后根据ck将这个ck包含的全部消息重新恢复到重试topic中。

04 结语

PopBufferMergeService还有大量的细节, 建议大家通过在每个关键点打日志,然后生产消费模拟ack的几种情况再查看日志输出,再结合代码很快就会了解更多的细节。

结尾也留2个问题,欢迎大家讨论

  1. 同一个pop ck,多次重复ack会出现什么情况, Broker是如何处理的?
  2. 如果pop没有读取到消息需要写ck信息吗, 为什么?
  3. 下期准备讲proxy或者基于时间轮的任意定时消息,想看什么请留言。

欢迎添加微信,互相学习↑↑↑ -_-

发表评论

全部评论:0条

白老虎

programming is not only to solve problems, ways to think