Kafka 核心技术与实战 —— 03 消费者
您目前处于:后端  2019-07-23

一、Consuemr Group

Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。

老版本的 Consumer Group 把位移保存在 ZooKeeper 中。新版本的 Consumer Group 将位移保存在 Broker 端的内部主题中。

Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。

举个简单的例子,假设目前某个 Consumer Group 下有两个 Consumer,比如 A 和 B,当第三个成员 C 加入时,Kafka 会触发 Rebalance,并根据默认的分配策略重新为 A、B 和 C 分配分区,如下图所示:

但在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。

二、Offsets Topic

__consumer_offsets 在 Kafka 源码中有个更为正式的名字,叫位移主题,即 Offsets Topic。

位移主题的 Key 中应该保存 3 部分内容:<Group ID,主题名,分区号 >

当 Kafka 集群中的第一个 Consumer 程序启动时,Kafka 会自动创建位移主题。如果位移主题是 Kafka 自动创建的,那么该主题的分区数是 50,副本数是 3。

Kafka 使用 Compact 策略来删除位移主题中的过期消息,避免该主题无限期膨胀。

Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据。这个后台线程叫 Log Cleaner。

将很多元数据以消息的方式存入 Kafka 内部主题的做法越来越流行。除了 Consumer 位移管理,Kafka 事务也是利用了这个方法,当然那是另外的一个内部主题了。

三、Rebalance

所谓协调者,在 Kafka 中对应的术语是 Coordinator,它专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等。

如果 Consumer Group 下的 Consumer 实例数量发生变化,就一定会引发 Rebalance。这是 Rebalance 发生的最常见的原因。我碰到的 99% 的 Rebalance,都是这个原因导致的。

哪些 Rebalance 是“不必要的”:

  • 未能及时发送心跳,导致 Consumer 被“踢出”Group

  • Consumer 消费时间过长

  • Consumer 端的 GC

四、Committing Offsets

Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据。

提交位移主要是为了表征 Consumer 的消费进度。

五、CommitFailedException

所谓 CommitFailedException,顾名思义就是 Consumer 客户端在提交位移时出现了错误或异常,而且还是那种不可恢复的严重异常。

场景一:当消息处理的总时间超过预设的 max.poll.interval.ms 参数值时,Kafka Consumer 端会抛出 CommitFailedException 异常。

防止这种场景下抛出异常:

  • 1、缩短单条消息处理的时间。

  • 2、增加 Consumer 端允许下游系统消费一批消息的最大时长。这取决于 Consumer 端参数 max.poll.interval.ms 的值。

  • 3、减少下游系统一次性消费的消息总数。这取决于 Consumer 端参数 max.poll.records 的值。

  • 4、下游系统使用多线程来加速消费。

场景二:应用中同时出现了设置相同 group.id 值的消费者组程序和独立消费者程序,那么当独立消费者程序手动提交位移时,Kafka 就会立即抛出 CommitFailedException 异常,因为 Kafka 无法识别这个具有相同 group.id 的消费者实例,于是就向它返回一个错误,表明它不是消费者组内合法的成员。

六、多线程开发消费者

早期 KafkaConsumer 是单线程的设计,从 Kafka 0.10.1.0 版本开始,KafkaConsumer 就变为了双线程的设计,即用户主线程和心跳线程。

引入这个心跳线程还有一个目的,那就是期望它能将心跳频率与主线程调用 KafkaConsumer.poll 方法的频率分开,从而解耦真实的消息处理逻辑与消费者组成员存活性管理。

KafkaConsumer 类不是线程安全的 (thread-safe)。简单来说,就是你不能在多个线程中共享同一个 KafkaConsumer 实例,否则程序会抛出 ConcurrentModificationException 异常。

七、管理 TCP 连接

和生产者不同的是,构建 KafkaConsumer 实例时是不会创建任何 TCP 连接的,TCP 连接是在调用 KafkaConsumer.poll 方法时被创建的。


转载请并标注: “本文转载自 linkedkeeper.com (文/张松然)”  ©著作权归作者所有