一、Kafka基本概念

  • Producer: 消息和数据的生产者,它会根据要求将数据以不同的分区策略分布到各个分区里
  • Consumer:消息和数据的消费者,订阅数据(topic)并且处理其发布的消息的进程/代码/服务
  • Consumer Group:逻辑概念,对于同一个topic,会广播给不同的group,一个group中,只有一个consumer可以消费该消息,以保证一个组内的每条消息仅会被消费一次
  • **Broker:**物理概念,是 Kafka 的核心,消息在这里存储和管理。每个 Kafka 集群可以包含一个或多个 Broker,负责接收、存储、以及发送数据
  • Topic:逻辑概念,kafka消息的类别,对数据进行区分、隔离
  • Partition:物理概念,kafka下数据存储的基本单元。一个topic数据,会被分散存储到多个Partition,每一个Partition是有序的
  • Replication: 同一个Partition可能会多个Replication,多个Replication之间数据是一样的。每个分区有一个 leader 副本,处理所有读写请求,其他副本作为 follower,只做数据同步,当 leader 副本故障时,可以通过选举机制选出新的 leader,从而保证数据的可靠性和可用性
  • ISR (In-Sync Replicas):Kafka 中的 ISR 是指所有和 Leader 保持同步的副本集合。只有在 ISR 内的 Followers 才有资格在 Leader 失败时被选举成新的 Leader。从而,ISR 集合的维护是保证系统一致性的重要机制
  • ReplicaManager:负责管理当前broker所有分区和副本的信息,处理KafkaController发起的一些请求,副本状态的切换、添加/读取消息等

二、 Kafka特点

分布式

  • 多分区
  • 多副本
  • 多订阅者
  • 基于ZooKeeper调度

高性能

  • 高吞吐量
  • 低延迟
  • 高并发
  • 时间复杂度为O(1)

持久性和扩展性

  • 数据可持久化
  • 容错性
  • 支持水平在线扩展
  • 消息自动平衡

Kafka快的原因

  • 多partition提升了并发
  • zero-copy零拷贝
  • 顺序写入
  • 消息batch批量操作
  • page cache页缓存

三、Kafka基本命令使用

  • 创建主题(4个分区,2个副本)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test
  • 查询集群描述
bin/kafka-topics.sh --describe --zookeeper hadoop2:2181
  • topic列表查询
bin/kafka-topics.sh --zookeeper hadoop2:2181 --list
  • 新消费者列表查询
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server hadoop2:6667 --list
  • 显示某个消费组的消费详情(仅支持offset存储在zookeeper上的)
bin/kafka-consumer-groups.sh --describe --bootstrap-server hadoop2:6667 --group test
  • 生产者发送消息
bin/kafka-console-producer.sh --broker-list hadoop2:6667 --topic kafka-test1
  • 消费者消费消息
bin/kafka-console-consumer.sh --bootstrap-server hadoop2:6667 --topic kafka-test1 --from-beginning

四、Kafka常用配置

生产者的配置

参考链接: kafka生产者Producer参数设置及参数调优建议

  1. acks 指定需要有多少个分区副本收到消息,生产者才会认为消息写入是成功的,这个参数对消息丢失的可能性有重要影响

    • acks=0 表示produce请求立即返回,不需要等待leader的任何确认。这种方案有最高的吞吐率,但是不保证消息是否真的发送成功
    • acks=1 只要集群的leader节点收到消息,生产者就会收到一个来自服务器的成功 响应。如果消息无法到达leader节点(比如leader节点崩溃,新的leader还没有被选举出来), 生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个 没有收到消息的节点成为新leader,消息还是会丢失。这个时候的吞吐量取决于使用的是Kafka生产者——向Kafka写入数据 同步发送还是异步发送。如果让发送客户端等待服务器的响应(通过调用 Future 对象的 get() 方法),显然会增加延迟(在网络上传输一个来回的延迟)。如果客户端使用回调,延迟问题就可以得到缓解,不过吞吐量还是会受发送中消息数量的限制(比如,生 产者在收到服务器响应之前可以发送多少个消息)。
    • acks=all/-1 分区leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为produce请求成功, 这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的
  2. buffer.memory 设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息

  3. compression.type 设置消息压缩算法, 可以设置为snappy、gzip或lz4

  4. retries 生产者可以重发消息的次数

  5. batch.size 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里一起发送,该参数指定了一个批次可以使用的内存大小, 按照字节数计算(而不是消息个数). 不过批次没满的时候如果达到了 linger.ms 设置的上限也会把批次发送出去

  6. linger.ms 指定生产者在发送批次之前的等待时间, 配合batch.size设置

  7. client.id 消息来源的识别标志

  8. max.in.flight.requests.per.connection 指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试

  9. 超时时间配置

    • request.timeout.ms: 生产者在发送数据时等待服务器返回响应的时间
    • metadata.fetch.timeout.ms: 生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间
    • timeout.ms: broker 等待同步副本返回消息确认的时间
  10. max.block.ms 该参数指定了在调用 send() 方法或使用 partitionsFor() 方法获取元数据时生产者的阻塞时间, 达到上限后会抛出超时异常

  11. max.request.size 用于控制生产者发送的请求大小

  12. recieive.buffer.bytessend.buffer.bytes 这两个参数分别指定了 TCP socket 接收和发送数据包的缓冲区大小

消费者配置

  1. fetch.min.bytes 该属性指定了消费者从服务器获取记录的最小字节数, 类比生产者buffer.memory的配置。broker 在收到消费者的数据请求时,如果可用的数据量小于 fetch.min.bytes 指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者

  2. fetch.max.wait.ms 我们通过 fetch.min.bytes 告诉 Kafka,等到有足够的数据时才把它返回给消费者。而 fetch.max.wait.ms 则用于指定 broker 的等待时间,默认是 500ms, 类比生产者linger.ms配置

  3. max.partition.fetch.bytes 指定服务器从每个分区里返回给消费者的最大字节数

  4. session.timeout.msheartbeat.interval.ms heartbeat.interval.ms指定了 poll() 方法向协调器发送心跳的频率, session.timeout.ms 则指定了消费者可以多久不发送心跳。heartbeat.interval.ms 必须比 session.timeout.ms 小,一般是 session.timeout.ms 的三分之一。

  5. auto.offset.reset 指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长 时间失效,包含偏移量的记录已经过时并被删除)该作何处理(latest/earliest/none/anything else)

  6. enable.auto.commit 指定了消费者是否自动提交偏移量,默认值是 true, 手动维护offset 时需要设置为 fasle

  7. partition.assignment.strategy 分区分配策略

  8. client.id broker用来标识从客户端发送过来的消息

  9. max.poll.records 该属性用于控制单次调用 poll() 方法能够返回的记录数量,可以帮你控制在轮询里需要处理的数据量

  10. receive.buffer.bytessend.buffer.bytes

五、Producer 生产消息和 Consumer 消费消息

Producer 发送消息的过程可以分为以下几个步骤:

  1. 序列化消息:首先,Producer 会将消息对象(比如说一个 Java 对象)序列化成字节数组。Kafka 的序列化机制是可插拔的,你可以使用默认的,也可以自定义。
  2. 分区选择:Producer 根据配置的分区策略(默认是轮询,当然也可以自定义,比如按消息的 key 进行散列)选择要将消息发往的分区。
  3. 发送到缓冲区:Producer 将消息存入一个缓冲区(RecordAccumulator)。这个缓冲区是一个庞大的阻塞队列,它会把消息批量存储起来。
  4. 批量发送:设置 linger.msbatch.size 参数,当等待时间超过设定的阈值,或者缓冲区内的消息达到一定的大小时,Producer 会将消息批量发送到 Kafka Brokers

Consumer 订阅 Topic 分为两种方式:

  • 自动订阅:消费者使用 subscribe 方法,传入一个 Topic 列表。如果 Topic 列表发生变化,消费者会自动调整。
  • 手动订阅:消费者使用 assign 方法,传入一个 Topic 和分区的列表。消费者只接收这些分区的数据,不会自动感知 Topic 列表的变化。

Kafka 的消费模式主要有两种:

  • 拉取模式(Pull):消费者显式地从 Kafka 中拉取(poll)消息。这种模式下,消费者可以控制消费的速率。
  • 推送模式(Push):虽然 Kafka 本身不直接支持推模式,但我们可以在消费者的基础上实现一个简易的推模式,即生产者或中间层负责将消息主动推送给消费者

批量消费

  1. 消费者从 Kafka broker 中拉取消息时,可以指定每次拉取的最大消息数(max.poll.records
  2. 消费者会等待直到足够多的消息到达,或者达到指定的超时时间(fetch.max.wait.ms),然后一起处理这些消息
  3. 一旦消费者拉取到一批消息,应用程序就可以对这批消息进行处理。通过批量处理,可以减少每次处理的开销(例如数据库插入、文件写入等操作的开销),从而提高整体的处理效率

故障处理: 在批量消费模式下处理失败的消息时,需要特别注意。如果批量中的某条消息处理失败,通常会影响整个批次的处理结果。可以采取以下措施来处理这种情况:

  • 对于无法处理的消息,可以进行重试或记录日志,必要时可以将这些消息发送到特定的错误队列(Dead Letter Queue, DLQ)。

  • 采用部分成功策略,只处理那些成功的消息,失败的消息可以单独处理

反压机制

Kafka 的反压机制主要通过调节发送速率和分区的流量控制来实现。具体来说,它提供了多个控制点,如批量发送、消息积压检测、消费者消费速率调节等。为了避免生产者压垮消费者,Kafka 可以针对不同的情况采取如下几种措施:

  • 配置适当的 linger.ms(生产者在发送一个消息批次之前等待的时间)batch.size(每个批次的消息大小) 参数,控制消息发送的频率和每次发送的消息大小,这样可以减缓生产者的压力。
  • 通过设置 acks 参数确保消息在被写入多个副本之前,生产者会等待响应。
  • 设置消费者每次请求最小拉取的字节数 fetch.min.bytes等待可用数据的最大时间 fetch.max.wait.ms,控制拉取请求频率,从而控制流量
  • Kafka消费者可以选择手动提交偏移量,通过控制提交的频率来调节消息处理进度,从而实现流量控制