消息队列:优缺点、高可用性及架构设计实战

消息队列一般在大型系统中使用比较多,特别是流量比较大的系统,小系统引入消息队列反而是一种累赘,可以使用 Redis List 数据结构模拟简单的队列,成本更低。

消息队列使用场景:解耦(例如:系统之间通讯)、异步、削峰

以下是常见消息队列 ActiveMQ、RabbitMQ、RocketMQ、Kafka 之间的对比

特性 ActiveMQ RabbitMQ RocketMQ Kafka
单机吞吐量 万级,比 RocketMQ、Kafka 低一个数量级 同 ActiveMQ 10 万级,支撑高吞吐 10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景
topic 数量对吞吐量的影响 topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源
时效性 ms 级 微秒级,这是 RabbitMQ 的一大特点,延迟最低 ms 级 延迟在 ms 级以内
可用性 高,基于主从架构实现高可用 同 ActiveMQ 非常高,分布式架构 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性 有较低的概率丢失数据 基本不丢 经过参数优化配置,可以做到 0 丢失 同 RocketMQ
功能支持 MQ 领域的功能极其完备 基于 erlang 开发,并发能力很强,性能极好,延时很低 MQ 功能较为完善,还是分布式的,扩展性好 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用

RocketMQ 和 Kafka 用的比较多,对 Java 程序员比较友好,Kafka 支持的吞吐量大,经常在大数据场景下使用。

以下会介绍线上问题解决方案,使用的消息队列都是以 Kafka 为例

保证消息不会被重复消费

或者是重复消费之后如何保证系统幂等,可以从 Kafka 层面和外部系统层面解决这个问题

从 Kafka 层面:Kafka 消费完之后会定期提交 offset(默认),offset 是消费者读取消息的位置信息,新版 Kafka 会将 offset 提交到内部位移主题 __consumer_offsets,但是如果提交之后,消费失败就会存在重复消费的问题,可以通过手动提交 offset 来解决这个问题,使用 commitSync()commitAsync(),优点:控制 Offset 提交时机,避免重复。缺点:增加代码复杂性。

从外部系统层面:如果消息队列重复消费了,我们可以业务层面避免对系统的影响,这时候就要做系统的幂等方案。比如:

  • 插入之前判断数据主键是否已经存在,存在则不插入或者更新,可以使用 SQL 中 ON DUPLICATE KEY UPDATE
  • 利用去重的数据结构,例如:Set
  • 将消费之后的数据存入 Redis,每次消费都检查一边是否消费过
  • 利用数据库主键唯一性(UNIQUE),如果重复则会报错

保证消息不会丢

多一条还能避免重复消费,少一条就找不回来了

我们先要知道 Kafka 提交的偏移量 offset,Kafka 会按照配置的时间间隔(auto.commit.interval.ms,默认 5000ms)自动提交 Offset,无论消息是否成功消费或处理,所以提交 offset 可能在消费前也可以能在消费后,而且无论消费是否成功,都会提交 offset

第一种消息丢失的情况,比如:提交完 offset 后,消费数据的时候宕机了,那么这条消息就丢了,可以通过手动提交 offset 解决这个问题,重复消费的问题可以在外部系统做幂等解决

第二种消息丢失的情况,比如:leader 挂了,数据没来得及同步到被选举为 leader 的 follower,导致数据丢失,可以通过一些配置避免

  • 给 topic 设置 replication.factor 参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本
  • 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower 吧
  • 在 producer 端设置 acks=all :这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了
  • 在 producer 端设置 retries=MAX (很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了

保证消息的顺序性

场景:一个 topic,有多个 partition,生产者写的时候可以将唯一标识作为 key,例如订单id,那么这个订单相关的数据都会分配到同一个 partition ,这个 partition 消息一定是有顺序的,但是消费者从 partition 取数据,如果使用并发,消息顺序就乱了

既然并发会让消息乱了,那么就一个 topic 一个 partition ,消费者同步处理消息,但是效率会变低,不是最优方案

在内存中模拟队列,可以通过 Queue 数据结构实现,同一个 Key 放入同一个 Queue,每个线程消费一个 Queue,从而保证消费的有序

解决消息积压

导致积压可能是消费者出问题,就优先解决问题,然后按照原来的消费速度进行消费,可能过一小时消将历史消息消费掉之后才会恢复正常,不是很急的话,可以用这个办法。

优化之后的方案是进行动态扩容,加快消费速度,消费完之后再变回原来的消费架构。

比如:新建一个 topic ,partition 是原来的10倍,新建一个 临时 consumer 用于分发消息,均匀写入临时的 partition,新增 10 台机器部署 consumer 进行消费,就可以以正常10倍的速度进行消费,处理完积压消息后恢复成原先架构。还可以在业务侧判断消息如果不重要则直接跳过,加快处理速度。

消息过期

Kafka 有两种消息过期策略,1. 基于时间的过期(默认7天)2. 基于磁盘大小的过期

Kafka 有两种消息清理策略,1. 到期直接删除(默认) 2. 压缩,保留最新消息版本

Segment(日志段)是 Kafka 存储数据的基本单元,比如:Segment 内消息都过期了,Kafka 会删除这个 Segment 文件,或者 Segment 大小超过了阈值,则删除 Segment 文件,这个可以提高效率

这里存在一个问题,消息过期之后,但是 同一个 Segment 中其他消息没有过期,这就会导致过期消息残留,导致消费者消费到了过期的消息,有以下几种解决办法

  1. 业务逻辑校验,在消息中存储到期时间,比如:timestamp,消费者判断消息过期则跳过
  2. 通过压缩的过期策略,只保留消息的最新版
  3. 手动提交 Offset,控制每个 Partition 的消费进度

Kafka 配置消息过期有以下几个配置项

  • log.retention.ms:设置消息最大保留时间(毫秒),超过时间后消息被删除
  • log.retention.bytes:设置最大日志文件大小,超过该大小后删除最旧消息
  • log.cleanup.policy:设置日志清理策略,可以选择删除策略或压缩策略
  • log.segment.bytes:设置日志文件的最大大小,超过该大小时会创建新的日志文件

批量方式消费

配置项 max.poll.records (限制单次调用 poll() 方法时,Kafka 消费者能够返回的最大记录数,默认500)和 fetch.max.bytes( 控制单次拉取请求(Fetch Request)允许的最大数据字节数,默认55MB)

设计一个消息队列

需要考虑一下几点

  1. 伸缩性:分布式部署,动态扩容,broker -> topic -> partition,每个 partition 放一个机器,吞吐量不够则添加 partition
  2. 可用性:多副本 -> leader & follower -> broker 挂了重新选举 leader 即可对外服务
  3. 可靠性:参考上文“保证消息不会丢”,通过配置 Kafka 保证消息不会丢失
  4. 持久化:使用磁盘存储数据,保证重启之后数据不会丢,磁盘顺序读写,减少寻址开销

知其然知其所以然

Kafka 源码:https://github.com/apache/kafka/tree/3.9

消息过期策略源码在 kafka.log.LogManager#cleanupLogs -> kafka.log.UnifiedLog#deleteOldSegments

1
2
3
4
5
6
7
8
9
def deleteOldSegments(): Int = {
if (config.delete) {
deleteLogStartOffsetBreachedSegments() +
deleteRetentionSizeBreachedSegments() +
deleteRetentionMsBreachedSegments()
} else {
deleteLogStartOffsetBreachedSegments()
}
}
  • deleteLogStartOffsetBreachedSegments:删除日志起始偏移量之前的任何本地日志段
  • deleteRetentionSizeBreachedSegments:基于磁盘大小的过期,日志大小 > 保留大小
  • deleteRetentionMsBreachedSegments:基于时间的过期