Foreword
希望有基本Kafka使用经验的用户对Kafka有进一步认识. 大部分内容是来自2017年度好书之一《Kafka权威指南》的笔记~~
内容包括
- 一些基本姿势
- 重要指标
- 对比RocketMQ
整体架构描述
Topic & Broker & Producer & Consumer / ConsumerGroup
这几个概念是最基本的, 不作太多描述.
- A topic is a category or feed name to which records are published.
- 对于一个指定的ConsumerGroup, 共享一整份数据. 即对于消费组中的每个消费者来说, 只能消费一部分数据.
- Broker : 存储数据, 处理生成消费请求. 当启动了Kafka进程, 即为启动了一个Broker. 多个Broker构成了一个Kafka集群. (Zookeeper: /brokers/ids)
Partition & Replica
Partition
从物理角度讲, 一个Topic的数据会被分为多个Partition, 保存在不同的Broker上.一条重要的规则是, 一个Partition同时最多只能被一个Consumer消费, 即Partition和Consumer之间是多对一/一对一的. 所以注意创建Topic时分区数量会影响消费的吞吐量. 不过分区数也可以改大.
Replica
我们造, 数据保存在一个机器上可用性较低. 所以通常每个Partition会的数据会保存多份在不同的Broker中, 即多个副本. 这些副本中, 只有一个会用于处理Provider,Consumer的请求, 称之为Leader副本; 其他副本只是负责同步数据, 称之为Follower副本. 这样如果Leader跪了, 那么Follower可能被提升上来.
举个栗子
对于一个Topic, 指定分区数为3, 副本数为3. 共有共同属于一组的2个消费者, 整体情况如图.
Controller
Leader是如何选出来的? 如果跪了谁负责重新选举? 整个集群中, 一个Broker会被选举为控制器(Zookeeper: /controller). 他额外负责的作用就是, 负责分区首领的选取.
Controller自身的选举
基于ZK节点/controller
. 并额外通过/controller_epoch
避免脑裂现象.
Controller选择分区的首领副本
控制器会观察Zookeeper中的Broker路径:
- 如果有Leader Broker离开, 则会有一些分区需要一个新的Leader. Controller遍历这些分区, 选择新的Leader, 通知新的Leader/Follwer他们自己的身份.
当控制器发现一个 broker 加入集群时,它会使用 broker ID 来检查新加入的 broker 是否包 含现有分区的副本。如果有,控制器就把变更通知发送给新加入的 broker 和其他 broker, 新 broker 上的副本开始从首领那里复制消息。
再谈Producer & Consumer
获取分区信息
不论生产消费, 客户端都需要对现有的分区情况进行了解. 客户端定时(metadata.max.age.ms
)向任意一个Broker(每一个都包含了需要的所有信息)发送感兴趣Topic的”元数据请求”, Broker会返回对应Topic的信息: 包含的分区、每个分区都有哪些副本, 以及哪个副本是首领。
如果客户端收到“非首领”错误,它会在尝试重发请求之前先刷新元数据,因为这个错误说明了客户端正在使用过期的元数据信息,之前的请求被发到了错误的 broker 上。
写入哪个分区
键为null
, 则默认根据Round Robin来发到各个分区. 如果指定了键, 那么有相同键的消息将被写到同一个分区。但是注意了:
只有在不改变主题分区数量的情况下,键与分区之间的映射才能保持不变。举个例子,在分区数量保持不变的情况下,可以保证用户 045189 的记录总是被写到分区 34。在从分区读取数据时,可以进行各种优化。不过,一旦主题增加了新的分区,这些就无法保证了——旧数据仍然留在分区 34,但新的记录可能被写到其他分区上。
另外也可以自己实现Partitioner
.
写入错误以及重试
生成者可能收到Broker返回的错误分两种: 可以通过重试解决的错误 & 无法通过重试解决的错误. 如果是可重试错误, Producer会根据配置(retries)来决定是否重试. 可以通过重试解决的的比如LeaderNotAvailable
, 不可以的比如InvalidConfig
, 消息大小错误, 认证错误.
Broker返回的错误中可以通过重试解决的, ProducerAPI会自动重试. 而不可以的需要开发者处理. 除了Broker返回的无法重试解决错误之外, 开发还需要处理:
- 消息发送之前的错误: 序列化
- Producer重试次数达到上限, 消息占用内存达到上限.
一些小问题
数据保存多少
默认使用 log.retention.hours 参数来配置时间,默认值为 168 小时,也就是一周。
另一种方式是通过保留的消息字节数来判断消息是否过期。它的值通过参数 log.retention.bytes 来指定,作用在每一个分区上。也就是说,如果有一个包含 8 个分区的主题,并且 log.retention.bytes 被设为 1GB,那么这个主题最多可以保留 8GB 的数据。
KafkaConsumer
, KafkaProducer
线程安全性
KafkaProducer
是线程安全的. 代码注释中有提到:
The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.
而KafkaConsumer
则在kafka 0.10.0.1 API)中说明了它不是线程安全的. 其中的wakeup()
方法是线程安全的, 目的是方便在A线程中终止B线程的消费. 容我直接Quote:
The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Un-synchronized access will result in ConcurrentModificationException.
The only exception to this rule is wakeup(), which can safely be used from an external thread to interrupt an active operation. In this case, a WakeupException will be thrown from the thread blocking on the operation. This can be used to shutdown the consumer from another thread.
如果想并发消费, 有下面两种方式
- One Consumer Per Thread
A simple option is to give each thread its own consumer instance. Here are the pros and cons of this approach:
PRO: It is the easiest to implement
PRO: It is often the fastest as no inter-thread co-ordination is needed
PRO: It makes in-order processing on a per-partition basis very easy to implement (each thread just processes messages in the order it receives them).
CON: More consumers means more TCP connections to the cluster (one per thread). In general Kafka handles connections very efficiently so this is generally a small cost.
CON: Multiple consumers means more requests being sent to the server and slightly less batching of data which can cause some drop in I/O throughput.
CON: The number of total threads across all processes will be limited by the total number of partitions.- Decouple Consumption and Processing
Another alternative is to have one or more consumer threads that do all data consumption and hands off ConsumerRecords instances to a blocking queue consumed by a pool of processor threads that actually handle the record processing. This option likewise has pros and cons:
PRO: This option allows independently scaling the number of consumers and processors. This makes it possible to have a single consumer that feeds many processor threads, avoiding any limitation on partitions.
CON: Guaranteeing order across the processors requires particular care as the threads will execute independently an earlier chunk of data may actually be processed after a later chunk of data just due to the luck of thread execution timing. For processing that has no ordering requirements this is not a problem.
CON: Manually committing the position becomes harder as it requires that all threads co-ordinate to ensure that processing is complete for that partition.
There are many possible variations on this approach. For example each processor thread can have its own queue, and the consumer threads can hash into these queues using the TopicPartition to ensure in-order consumption and simplify commit.
关键问题
实际上对于所有的MQ来说, 主要的评估指标都是下面这几个关键词
- 可用性
- 重复
- 吞吐量
- 顺序性
- 丢失
基本保障
- Kafka 可以保证分区消息的顺序。如果使用同一个生产者往同一个分区写入消息,而且 消息 B 在消息 A 之后写入,那么 Kafka 可以保证消息 B 的偏移量比消息 A 的偏移量大, 而且消费者会先读取消息 A 再读取消息 B。
- 只有当消息被写入分区的所有同步副本时(但不一定要写入磁盘),它才被认为是“已提交”的。生产者可以选择接收不同类型的确认,比如在消息被完全提交时的确认,或 者在消息被写入首领副本时的确认,或者在消息被发送到网络时的确认。
- 只要还有一个副本是活跃的,那么已经提交的消息就不会丢失。
- 消费者只能读取已经提交的消息。
所以如果需要注意顺序性, 则尽量将需要保证顺序的数据, 提供相同的Key. Kafka的配置都是各项指标之间的权衡, 是一种零和博弈.
Producer
发送方式:
Future<RecordMetadata> send(ProducerRecord<K,V> record)
Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback)
虽然异步发送本身也会重试, 但如果想确保发送成功后再处理后续操作, 则可以通过future.get()
变成同步, 或者在callback
中指定后续操作.
acks
吞吐量 还是 丢数据
- 0: 如果能通过网络发送出消息就认为成功. (除非序列化错误, 或者网卡错误, 否则不会报错).
- 1: 只要Leader节点收到, 并写入分区数据文件(不一定是磁盘)即可. (首领崩溃, 暂未选举出来
LeaderNotAvailableException
会重试). 这种依然可能丢数据: 写入leader后, follower还没来得及同步, leader就挂了. - all: 首领在返回确认或错误响应之前,会等待所有同步副本都收到消息。如 果和 min.insync.replicas 参数结合起来,就可以决定在返回确认前至少有多少个副本 能够收到消息。
retries, retry.backoff.ms
丢数据 还是 重复
如果不设置重复, 那么可能会丢数据. 而如果设置了重试, 则有重复的风险: 假如由于网络问题没有收到Broker确认, 实际上却成功了, 那么重试的话就多了一条.
所以说, 如果使用了多次重试, 可以保证”at least once”, 不能保证”exactly once”.
Broker
default.replication.factor, replication.factor
硬件成本 还是 可靠性
设置多个副本可用性更高, 但硬件成本也更高. 一般设置3个就足够安全.
replica.lag.time.max.ms
来自Apache Kafka - Documentation)
If a follower hasn’t sent any fetch requests or hasn’t consumed up to the leaders log end offset for at least this time, the leader will remove the follower from isr
ISR是in-sync replica的缩写.
unclean.leader.election
一致性(可能丢数据) 还是 可用性
当分区首领不可用时,一个同步副本会被选为新首领。如果在选举过程中没有丢失数据,也就是说提交的数据同时存在于所有的同步副本上,那么这个选举就是 “完全选举”.
如果把 unclean.leader.election.enable 设为 true,就是允许不同步的副本成为首领, 即允许“不完全的选举”
两个极端Case, 假设副本数量为3
- 两个follower跪了. 后来leader也跪了, 然后一个follower启动. 那么这个follower是唯一可用的, 但是是不同步的.
- 两个follower因为网络原因不同步. Leader突然跪了, 数据永远无法同步~
选择: 在数据无法一致的情况下, 是否提升一个Follower做leader?
- 否: 只能等之前的leader恢复, 否则系统一直不可用
- 是: 数据不一致.
大部分银行系统宁愿选择在几分钟甚至几个小时内不处理信用卡支 付事务,也不会冒险处理错误的消息。
min.insync.replicas
可用性 还是 一致性
表示至少有多少个同步的副本, 才允许向对应分区写数据. 比如当此项设置为2, 如果3个副本中有两个不同步了. 那么写时会抛出NotEnoughReplicas
.
Consumer
消费者
消费者侧的主要问题就是发生在Rebalance
阶段: 当一个Consumer退出, 需要将本属于他的分区交给另一个Consumer.
如果所有的Consumer都能稳定地存活那是最理想的了, 可是如果其中一个消费者挂掉, 另一个接替他分区的从哪里开始处理? 答案就是, 消费者会向Broker提交自己消费的offset, 表示消费完了哪些数据. 消费者端大部分问题都源于offset的提交
auto.offset.reset
当消费者最开始加入的时候, 是根据配置决定从哪里读的: 最近(latest
), 或者最早earliest
. 设置为最近则可能会丢数据, 设置为最早, 则可能重复消费数据.
enable.auto.commit/auto.commit.interval.ms.
重复 还是 性能
是否自动提交/自动提交间隔. 如果使用自动提交, 频率比较低, 那么重复数据可能性更大, 如果太频繁就会有额外开销.
如果手动提交, 同样要注意提交频率. 也要注意是commitSync
还是commitAsync
:
在成功提交或碰到无法恢复的错误之前,commitSync() 会一直重试,但是 commitAsync() 不会,这也是 commitAsync() 不好的一个地方。它之所以不进行重试,是因为在它收到 服务器响应的时候,可能有一个更大的偏移量已经提交成功。假设我们发出一个请求用 于提交偏移量 2000,这个时候发生了短暂的通信问题,服务器收不到请求,自然也不会 作出任何响应。与此同时,我们处理了另外一批消息,并成功提交了偏移量 3000。如果 commitAsync() 重新尝试提交偏移量 2000,它有可能在偏移量 3000 之后提交成功。这个时 候如果发生再均衡,就会出现重复消息。
我们之所以提到这个问题的复杂性和提交顺序的重要性,是因为 commitAsync() 也支持回 调,在 broker 作出响应时会执行回调。回调经常被用于记录提交错误或生成度量指标,不 过如果你要用它来进行重试,一定要注意提交的顺序。
恰当处理Rebalance
ConsumerRebalanceListener
有两个需要实现的方法.onPartitionsRevoked(Collection<TopicPartition> partitions)
方法会在 再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取了。
幂等消费
如上文所说, 如果因重试导致Kafka无法保证exactly once
, 则消费时要保证操作是幂等的.
处理消费中遇到的错误
假如消费到, 但是部分调用出现故障(比如数据库), 可以放进另一个队列里, 等会消费. @see RocketMQ (%RETRY%, %DLQ%)
RocketMQ对比
这篇Motivation - Apache RocketMQ中说明的比较全面.
- Kafka支持单个Partition内的顺序保证, RocketMQ能保证全局的顺序性. (?好奇如何保证的)
- 消息回溯, Kafka支持基于offset, RocketMQ支持基于offset和时间.
- RocketMQ支持调度消息(延迟)
- RocketMQ支持Broadcast
- Kafka需要依赖Zookeeper.
- RocketMQ支持RETRY, DLQ
但是根据阿里的文档来看, 上面这些并不是当初实现RocketMQ的主要目的. 主要的原因是RocketMQ在更多Queue(对应 Kafka Partition), 更多Topic的场景下性能更好. 从而提升了队列的生产消费并行度. 性能更好的原因 How to Support More Queues in RocketMQ? - Apache RocketMQ和RocketMQ与kafka对比(18项差异) | 阿里中间件团队博客都有提到.
See Also
- Kafka Patterns
- Apache Kafka
- 《Kafka权威指南 - Neha Narkhede, Gwen Shapira, Todd Palino》
- How does Kafka store offsets for each topic? - Stack Overflow
- Kafka 1.0.1 API
- Exactly-once Semantics is Possible: Here’s How Apache Kafka Does it
- Kafka vs RocketMQ—— Topic数量对单机性能的影响 | 阿里中间件团队博客
- RocketMQ与kafka对比(18项差异) | 阿里中间件团队博客
- 十分钟入门RocketMQ | 阿里中间件团队博客
- RocketMQ Documentation - Motivation
- 分布式开放消息系统(RocketMQ)的原理与实践 - 简书