消息顺序性

日志事件需要按发生的时间顺序进行处理,以便准确地重现事件顺序

  1. 分区(Partition)机制: 在 Kafka 中,每个 Topic 都可以配置为多个分区,每个分区都是一个有序的、不可变的消息日志。生产者在发送消息时,可以指定消息的键(Key),Kafka 根据这个键来进行哈希运算,将消息写入相应的分区。同一键的消息总会被写入到同一个分区,这样就保证了同一键的消息在同一个分区内是有序的。

  2. 消息键(Message Key)和分区策略: 当生产者发送消息时,可以通过配置分区策略(Partitioner)决定消息去哪个分区。默认的分区策略是基于消息键的哈希值,比如 key.hashCode() % partitionCount。通过这种策略,可以确保相同键的消息被发送到同一个分区,从而保证它们的顺序性。

  3. 消费端的顺序保证: 消费者在消费消息时,同一个消费者线程只能同时消费一个分区的消息,这样可以保证消费端在处理某个分区内的消息时是按顺序的。如果 Kafka 集群中没有足够的消费者线程,某个消费者线程可能需要同时消费多个分区的消息,但这些分区之间的顺序是无法保证的。

  4. 顺序性在高可用环境下的挑战: 当 Kafka 分区的 Leader 发生切换时,可能会有短时间的数据不一致。如果处理不当,可能会影响顺序性保证。Kafka 通过保持分区副本(Replica)的一致性,并在重新选举 Leader 时确保新 Leader 从最新的数据点开始处理,尽量减少顺序性的损失

消息存储机制

Kafka 的消息持久化主要依赖于它的一个核心组件:日志文件 (log files)

Kafka 会将消息分成若干个段 (segment),并将这些段保存在磁盘上。每条消息会被追加到当前的日志文件的末尾,Kafka 默认通过顺序写入的方式来存储数据,这样的方式使得磁盘 I/O 效率非常高。

可靠性

通过配置 acks 参数来提高数据的可靠性

  • acks=0:生产者不会等待任何服务器的确认。消息可能会丢失,但性能最高。

  • acks=1:生产者会在领导者副本(leader)成功接收到数据后收到确认。数据可靠性得到了基本保障,但如果领导者副本崩溃,仍有可能丢失消息。

  • acks=all(或 -1):生产者会等待所有同步副本(ISR)接收到数据后收到确认。数据可靠性最高,但性能会有所下降,因为需要等多个副本都确认接收

消息持久性

Kafka 使用磁盘进行消息存储,确保即使在系统故障的情况下,消息也不会丢失。具体措施包括:

  • 分区:Kafka 将每个主题分成多个分区,每个分区是有序且持久的日志。分区方便了数据的存储和读取。
  • 日志分段和索引:每个分区被分段为多个日志段,分段之后的日志文件会以可配置的方式进行轮转。Kafka 还会为每个消息生成索引,以快速定位消息。
  • 文件系统的强制刷新:Kafka 使用页缓存来提高磁盘 I/O 性能,并定期调用 fsync 系统调用,将数据从页缓存刷新到磁盘,确保数据持久化。

高可用性

Kafka 通过复制机制和分布式架构来实现高可用性,具体包括:

  • 副本(Replica):每个分区有一个主副本(Leader)和若干个从副本(Follower)。主副本处理读写请求并将数据同步到从副本,从副本在主副本失败时能顶上处理。

  • ISR(In-Sync Replica):Kafka 维护一个同步副本集合,只有在 ISR 中的副本才被认为是健康的,从而保证了高可用性。

  • ACK 机制:在生产者发送消息时,可以配置不同的确认级别(acks),例如 acks=all 则需要等待所有 ISR 中的副本确认收到消息,进一步提高可靠性

  • 相关配置: min.insync.replicas(最小同步副本数)和 replica.lag.time.max.ms(允许副本滞后的最大时间)

日志清理 (Log Compaction)

Kafka 支持日志清理机制,通过定期对日志进行压缩,删除那些已经不再需要的日志,来释放磁盘空间,两种日志清理策略:

  • 基于时间的清理:删除超过设定的保留时间的日志段。
  • 基于键值的清理 (Log Compaction):当键有多个版本的消息时,只保留最新的消息,从而减少存储开销

消息事务

消息投递语义

  • At-most-once(最多一次): 可能出现消息丢失的情况
  • At-least-once(最少一次): 可能出现消息重复的情况
  • Exactly-once(正好一次): 理想的语义实现,但实现难度较高

消息重复

参考 kafka消息偏移量机制

消息丢失

1)使用适当的确认机制(Acknowledgments)

  • 生产者确认机制(acks):可以配置acks参数来决定生产者在收到多少个副本的确认后认为消息发送成功。例如,acks=all表示所有参与副本都确认接收到消息才表示成功,能最大限度地保证消息不丢失。
  • 消费者确认机制:消费者也可以通过手动提交偏移量来确保消息已经被正确处理。例如,enable.auto.commit=false时,消费者处理完消息后手动提交偏移量。

2)配置多个副本(Replication)和耐久性(Durability)

  • 副本因子(Replication Factor):Kafka 允许配置每个主题的副本(即每个分区的备份数量)。如果一个 broker 失效,其他副本可以接管工作,从而保证消息不丢失。
  • 最小同步副本(min.insync.replicas):可以设置最小同步副本的数量,确保在确认之前至少有指定数量的副本持久化了这条消息。

3)配置合理的消费偏移(Consumer Offsets)

  • 存储位置:可以将消费偏移保存在 Kafka 中,而不是通过Zookeeper。这种方法能更好地处理消费者重启和再平衡后读取到准确的偏移。
  • 定期保存:确保定期且频繁地保存消费偏移,防止突然宕机导致偏移量丢失。

4)启用幂等生产者(Idempotent Producer)和事务(Transactions)

  • 幂等生产者:Kafka 提供enable.idempotence=true配置,确保生产者在重复写入时不会产生重复数据,保证消息不丢失。

  • 事务:Kafka 的事务功能允许生产者保证一系列消息的原子性,要么全部成功,要么全部失败,避免中间状态导致消息丢失

灾难场景分析

Broker失败

Kafka有自己的备份机制,保证消息写入leader replica成功后会冗余n份,同步到其他replica,所以理论上可以容忍n-1个broker节点宕机

Producer 到 Broker的RPC失败

Kafka的消息可靠性是基于producer接收到broker的ack确认信息为准的,但是Broker在消息已经写入但还未返回ack确认信息之前就可能会发生故障,也可能在消息被写入topic之前就宕机了,因为producer端无法知道失败的原因,只能尝试重发消息,因此某些下游场景可能就会存在消息乱序或者consumer重复消费的情况

客户端失败

客户端也可能存在永久宕机或者暂时性心跳丢失的情况,追求正确的性的话,broker和consumer应该丢弃从zombie producer发送的消息。同时新的客户端实例启动后它需要能从失败实例的任何状态中恢复,并从安全点(safe checkpoint)开始处理,这需要消费的偏移量位置和实际产生的消息输出保持同步

Kafka的Exactly-once语义保证

幂等: partition内部的exactly-once语义保证

幂等是指执行多次同样的操作得到的结果是一致的, Producer的send()操作现在就是幂等的。在任何导致producer重试的情况下,相同的消息如果被producer发送多次,也只会写入一次。需要修改broker的配置: enable.idempotence = true开启此功能

原理,类似于TCP中可靠传输的累积确认机制实现: Producer中每个RecordBatch都有一个单调递增的seq; Broker上每个topic也会维护pid-seq的映射,并且每次 Commit都会更新lastSeq。这样recordBatch到来时,broker会先检查RecordBatch再保存数据:如果batch中 baseSeq(第一条消息的seq)比Broker维护的序号(lastSeq)大1,则保存数据,否则将其丢弃:

  • 如果大1以上,说明中间有数据尚未写入,此时Broker拒绝此消息,Producer抛出InvalidSequenceNumber
  • 如果小于等于Broker维护的序号,说明是重复消息,Broker直接丢弃该消息,Producer抛出DuplicateSequenceNumber

事务性保证: 跨partition分区的原子操作

Kafka提供事务API对消息进行跨partition分区的原子性写操作:

producer.initTransactions();
try {
  producer.beginTransaction();
  producer.send(record1);
  producer.send(record2);
  producer.commitTransaction();
} catch(ProducerFencedException e) {
  producer.close();
} catch(KafkaException e) {
  producer.abortTransaction();
}

引入Transaction IDepoch, Transaction ID与PID可能一一对应,区别在于Transaction ID由用户提供,而PID是Kafka内部实现的,对用户透明, 可以保证:

  • 跨Session的数据幂等发送: 当具有相同Transaction ID的新的Producer实例被创建且工作时,epoch会单调递增,由于旧的Producer的epoch比新Producer的epoch小,旧的且拥有相同Transaction ID的Producer将不再工作
  • 跨Session的事务恢复: 如果某个应用实例宕机,新的实例可以保证任何未完成的旧的事务要么Commit要么Abort,使得新实例从一个正常状态开始工作

从Consumer的角度来看,事务保证会相对弱一些。尤其是不能保证所有被某事务Commit过的所有消息都被一起消费,因为:

  • 对于压缩的Topic而言,同一事务的某些消息可能被其它版本覆盖
  • 事务包含的消息可能分布在多个Segment中(即使在同一个Partition内),当老的Segment被删除时,该事务的部分数据可能会丢失
  • Consumer在一个事务内可能通过seek方法访问任意Offset的消息,从而可能丢失部分消息
  • Consumer可能并不需要消费某一事务内的所有Partition,因此它将永远不会读取组成该事务的所有消息

开发实现思路

自定义offset管理, 需要保证对消息的处理和offset偏移量在同一个事务中,例如在消息处理的数据写入到MySQL的同时更新此时的消息offset消息偏移量

实现:

  1. 设置enable.auto.commit = false, 关闭消息偏移量自动提交
  2. 处理消息的同时将offset消息偏移量保存,比如保存到MySQL
  3. 当partition分区发生变化的时候可能会发生分区再平衡,需要自定义类实现ConsumerRebalanceListener接口捕捉事件变化,对偏移量进行处理
  4. 在重新完成分配分区后,消费者开始读取消息之前 通过调用seek(TopicPartition, long)方法,移动到指定的分区的偏移量位置