消息队列序及kafka分析

消息队列已经成为各个互联网大厂必用的基础设施,也是现在程序员必须掌握除Java, mysql, redis之外的又一核心技能。从我在大厂接触的项目来看,消息队列主要用于解耦,广播,异步等,具体应用包括多个系统处理能力差异异构解决方案,数据库CDC方案,业务系统和业务系统的交互(另一种是RPC)等。总之来说,消息队列已经是整个系统必备的基础建设。

功能

解耦、广播(发布/订阅模式)、平滑削峰

解耦: 解耦是指将系统中原本直接依赖的模块或服务通过消息队列间接关联起来。生产者(发送消息的系统)和消费者(处理消息的系统)之间不再有直接的调用关系,它们都只与消息队列交 。 ”高内聚、低耦合“这句设计理念刚好能被消息队列贯彻。在实践项目的过程中,体现在上游和下游直接解耦,不直接进行沟通交互,出现代码层面的耦合

比如:在一个电商系统中,当用户下单后,“订单服务”(生产者)会发送一个“订单创建成功”的消息到 Kafka 的 order_created Topic。下游的“库存服务”、“通知服务”、“积分服务”(消费者们)各自订阅这个 Topic。而不用在订单服务手动调用这几个服务,当然会涉及到分布式事务问题,后面有机会细聊,本质上是单体系统逐渐向分布式微服务的演化形成的问题。

  • 如果“通知服务”暂时宕机,不影响“订单服务”继续创建订单,也不影响“库存服务”和“积分服务”处理消息。
  • 未来如果新增一个“物流跟踪服务”也需要知道订单创建事件,只需让它订阅 order_created Topic 即可,无需修改“订单服务”。

广播(事件驱动):广播是指一条消息可以被多个不同的消费者(或消费群体)都接收并处理。当一条订单消息进入 order_created Topic,这三个不同的消费者组(库存、通知、分析)都会收到并处理这条订单消息。一个事件(消息)可以触发多个下游系统的不同业务逻辑。

平滑削峰 :在系统面临突发流量(例如秒杀活动、促销日)时,后端处理能力可能无法瞬间匹配峰值请求。消息队列可以作为缓冲区,将短时间内涌入的大量请求暂存起来,然后让后端系统按照自己的处理能力平稳地、逐步地从队列中拉取并处理这些请求,避免后端系统因过载而崩溃。这点几乎是除了限流的另一个保证系统高可用的方式。

基本特性

消息完整性

  • 不丢失(数据可靠replica、投递可靠ack)
  • 消费一次(记录消费位置、至少消费一次、业务幂等)
  • 顺序消费(全局有序(单线程生成+单队列+单线程消费)、局部有序(hash一类消息至一类分区)、大致有序(时间窗口期之间有序))

可靠/性能

延迟消息、事务消息、回溯、死信

容灾(节点容灾、集群容灾、机房容灾、地域容灾)

kafka

image-20250603235402421

Broker

partitoin

  • 数据分片
  • 顺序写入

producer

  • Round Robin
  • Partition key

consumer group

  • 组之间隔离
  • 组内共享

Partition & Replica

可靠不只是磁盘,还能是复制

kafka集群的热扩展怎么做的?

对于kafka来说,热扩展的做法都是手动添加broker,具体要结合业务来看,是否要考虑消息丢失、消息重复、消息失序的问题。根据要求的不同,可以选择不同的方案。

场景一:对消息丢失、重复、失序都基本不敏感 (例如:日志收集、指标监控等允许少量误差的场景)

直接新增broker就行。使用 kafka-reassign-partitions.sh 工具生成分区重分配计划,将现有Topic的分区副本均匀(或按需)地分布到包括新Broker在内的所有Broker上。

  • 失序:在这种场景下不关心。即使增加了现有Topic的分区数(kafka-topics.sh --alter --partitions <new_count>),导致key的路由变化,也是可接受的。
  • 重复:如果源头(生产者)或消费端本身就可能产生重复,那么扩展过程中的短暂波动影响不大。
  • 丢失:配置合理的acks和min.insync.replicas来防止Broker故障导致的数据丢失。

场景二:担心消息失序,但不担心消息重复 (例如:某些事件处理,顺序很重要,但消费端有去重逻辑)

热扩展一定会导致全局失序。所以只能是仅迁移现有分区,不改变Topic分区数。新Broker承载了部分旧分区的负载,(意义: 如果未来需要更高的并发,可以创建新Topic时在新Broker上分配更多分区)

消息重复:需要自定义消费者去处理。

场景三:严格要求消息不丢失、不重复(或由Kafka层面保证幂等性)、且Key有序 (例如:金融交易、订单处理等)

对于这个常见来说,其实是不推荐进行热扩展。因为一个consumer对应一个partition,直接热扩展一定会打乱消息的顺序,做这个操作除非是在生产者、消费者中有兜底逻辑,否则会造成严重的错误。

kafka为什么这么快?

  1. 顺序写入优化:kafka将消息顺序写入磁盘,减少磁盘的寻道时间
  2. 批量处理技术:生产者在发送消息时等积累到一定量再发送,减少网络开销/磁盘io操作的次数
  3. 零拷贝技术:直接将数据从磁盘发送到网络套接字
  4. 压缩技术:对消息进行压缩

low watermark(低水位:log StartOffSet):控制日志清理起点,早于低水位的数据已经被删除

  • 日志保留策略:日志保留时间超时/日志总大小超过阈值
  • 压缩任务触发:低水位标记已经完成压缩的位置

如何将topic里面的消息合理的分发到不同的partition中?

和负载均衡类似,将一批数据进行分片,如果为了做均匀数据分片最好的方式就是轮询。

但是如果消息是需要区分先后顺序的,则需要将用一类消息划分到一起,采用指定key进行分片。

除了这两种方式,还可以自定义去实现分区策略,通过实现org.apache.kafka.clients.producer.Partitioner

消费者如何获取不同的partition中里面的消息?

第一种就是Range策略,根据公式 分区/消费者数量 来分配partition

第二种是**RoundRobin,**可能按照Range分配的partition不均衡,我们对所有partition进行hashcode之后再进行分配,可以保证partition的分配均匀

kafka的高可用

消息备份机制

消息备份主要是允许同一个Partition存在多个消息副本Replica,每一个Partition的副本由一个Leader和0个以上的Follower组成,Follwer会周期向Leader发送同步消息。同一个Partiton分布在不同的broker上,确保一个broker宕机之后,其他能够正常访问。

主要依赖于ISR机制

kafka保证数据一致性的核心机制

  • kafka的水位(water mark)机制
    1. 水位的作用
      高水位(High Watermark)是 Kafka 用来控制消费者可见消息范围的边界线。它标记了「已被成功备份到所有副本」的最新消息位置。
    2. 消息可见性规则
      生产者新写入的消息必须被同步到指定数量的副本(例如 ISR 副本集合),才会提升高水位。只有水位线之前的消息才对消费者可见,未达到备份数量的新消息会被"压在水位线下",消费者无法读取。
    3. 实际场景示例
      假设副本数要求为3,当生产者写入消息A时:
      • Leader 副本保存A → 不可见(尚未同步)
      • 同步到1个Follower → 不可见(未满足副本数)
      • 同步到第2个Follower → 高水位提升 → 消息A对消费者可见
    1. 背后的设计意义
      这种机制通过牺牲少量延迟(等待副本同步)来保证:消费者永远不会读到可能丢失的数据(未完成备份的消息可能在Leader宕机时丢失),从而实现了 Exactly-Once 语义的数据可靠性保障。
  • LEO(log end offset) ,即日志末端偏移,指向了副本日志中下一条消息的位移值(即下一条消息的写入位置)

Leader的HW值由ISR中的所有备份的LEO最小值决定(Follower在发送FetchRequest时会在PartitionFetchInfo中会携带Follower的LEO)

image-20250603235441701

  1. 数据可靠replica具体是怎么做的?

kafka为每一个partition提供了replication机制,意思就是每一个partition里面的消息都有多个副本。消息会先写到leader partition里面,然后再通过网络异步写入其他broker的follow partition中。所以根据选择消息写入的确认时机可以来设置消息的replication机制,具体根据request.required.acks参数来设置可靠性:

  • 1:这意味着producer在replication中的leader已成功收到的数据并得到确认后发送下一条message,如果leader宕机了,则会丢失数据。
  • 0:这意味着producer无需等待来自broker的确认而继续发送下一批消息,这种情况下数据传输效率最高,但是数据可靠性确是最低的。
  • -1:producer需要等待replication中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高,但是这样也不能保证数据不丢失,比如当replication中只有leader时,这样就变成了acks=1的情况。

故障恢复

那假如真的发生了问题,怎么进行恢复呢?发生故障主要是两个方面,Broker和Controller(管理:负责各Partition的Leader选举以及Replica的重新分配)

当一个Broker与其他Broker断开连接,由于ZooKeeper还能接收到Broker0的心跳,因此ZooKeeper认为Broker0依然存活,则对于

  • Partition 0 (Broker0 为Leader): Broker0超时没有收到Broker1、Broker2的请求,会将自身ISR收缩到Broker0本身,并且将ISR变更同步到ZK;并且Broker0根据最小的replicas配置来决定是否继续接收数据
  • Partition 1(Broker0 为Follow):Broker1将Broker0从ISR中移除,后续恢复并且赶上Broker1后重新添加到Partition 1的ISR

当Broker与ZK断开连接,ZK会删除该节点,并且认为Broker0已经宕机。

  • Partition 0 (Broker0 为Leader): ZK删除节点之后,节点上注册的Watcher会通知Controller,Controller从存活的ISR中选择新的Broker作为Leader,并且将变更通知发布到其他节点。生产者有个定时任务会获取到最新的节点数据,会改向新的Leader发送Partition数据。另外Broker0收不到ZK的通知,会继续认为自己是Leader,此时如果生产者还会改变,Broker0会根据ack确定是否写入消息。如果是ack=-1,则broker0再其它超时的时候,会主动发ISR变更消息到ZK但是失败就不再接收消息了。如果Broker恢复,会发现自己不是leader,为了保证此时和新leader一致,会进行消息截断(ack=0、1),此时会造成消息丢失
  • Partition 1(Broker0 为Follow): Broker0中的副本只是作为Partition1的Follower节点,而Broker0与Broker1依然保持连接,因此Broker0依然会向Broker1发送FetchRequest。只要Broker0能继续保持同步,Broker1也不会向ZooKeeper变更ISR

Controller与ZooKeeper断开连接

此时ZooKeeper会将Controller临时节点删除,并按照下节的故障恢复过程重新竞选出新Controller。而原本的Controller由于无法连上ZooKeeper,它什么也执行不了;当它与ZooKeeper恢复连接后发现自己不再是Controller,会在Kafka集群中充当一个普通的Broker

Controller与某个Broker断开连接

因为Controller无法通知到Broker0,所以Broker0不晓得Partition0的Leader已经更换了,所以会出现短暂服务不可用。Controller不会重新选举。

kafka的高性能

生产方:批量发送

Broker:持久化消息(顺序写+零拷贝)

批量发送:kafka将一个topic分成多个partition,根据不同的路由策略可以将消息发到不同的partition对应的本地队列中,在批量发送到partition对应的leader节点。类似于MAC协议的分组突发。

发送时机主要有两个,一个是消息大小达到阈值,另一个是消息等待时间达到阈值

  • Kafka在批量发送失败时不会重发整个批次,而是只重试失败的消息
  • 失败的消息会被添加到新的批次中进行重试

持久化设计:

producer在生产一批消息的时候会给每个消息设置一个批内顺序值(0,1,2,3,,,)。而在broker中partition存储的时候,需要存储保证所有消息的顺序,所以broker端会维护一个绝对偏移值。Kafka通过nextOffset(下一个偏移量)来记录存储在日志中最近一条消息的偏移量,也就是下一条消息的写入位置。

Broker将每个Partition的消息追加到日志中,是以日志分段(Segment)为单位的。当Segment的大小达到阈值(默认是1G)时,会新创建一个Segment保存新的消息,每个Segment都有一个基准偏移量(baseOffset,每个Segment保存的第一个消息的绝对偏移量),通过这个基准偏移量,就可以计算出每条消息在Partition中的绝对偏移量。 每个日志分段由数据文件和索引文件组,数据文件(文件名以log结尾)保存了消息集的具体内容,索引文件(文件名以index结尾)保存了消息偏移量到物理位置的索引。

关于写入磁盘问题?

  • Kafka消息写入是即时的(写入页缓存PageCache)
  • 实际刷盘(fsync到物理磁盘)由多种条件触发,而非仅在Segment写满时
  • Segment写满是触发新建Segment的条件,但不是唯一的刷盘时机
  • Kafka默认优先考虑性能,依赖操作系统页缓存和复制机制保证数据安全

零拷贝:解决网络数据持久化到磁盘(Producer到Broker),磁盘文件通过网络发送(Broker到Consumer)的过程