消息队列:优缺点、高可用性及架构设计实战
消息队列一般在大型系统中使用比较多,特别是流量比较大的系统,小系统引入消息队列反而是一种累赘,可以使用 Redis List 数据结构模拟简单的队列,成本更低。
消息队列使用场景:解耦(例如:系统之间通讯)、异步、削峰
以下是常见消息队列 ActiveMQ、RabbitMQ、RocketMQ、Kafka 之间的对比
| 特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
|---|---|---|---|---|
| 单机吞吐量 | 万级,比 RocketMQ、Kafka 低一个数量级 | 同 ActiveMQ | 10 万级,支撑高吞吐 | 10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景 |
| topic 数量对吞吐量的影响 | topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic | topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源 | ||
| 时效性 | ms 级 | 微秒级,这是 RabbitMQ 的一大特点,延迟最低 | ms 级 | 延迟在 ms 级以内 |
| 可用性 | 高,基于主从架构实现高可用 | 同 ActiveMQ | 非常高,分布式架构 | 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
| 消息可靠性 | 有较低的概率丢失数据 | 基本不丢 | 经过参数优化配置,可以做到 0 丢失 | 同 RocketMQ |
| 功能支持 | MQ 领域的功能极其完备 | 基于 erlang 开发,并发能力很强,性能极好,延时很低 | MQ 功能较为完善,还是分布式的,扩展性好 | 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用 |
RocketMQ 和 Kafka 用的比较多,对 Java 程序员比较友好,Kafka 支持的吞吐量大,经常在大数据场景下使用。
以下会介绍线上问题解决方案,使用的消息队列都是以 Kafka 为例
保证消息不会被重复消费
或者是重复消费之后如何保证系统幂等,可以从 Kafka 层面和外部系统层面解决这个问题
从 Kafka 层面:Kafka 消费完之后会定期提交 offset(默认),offset 是消费者读取消息的位置信息,新版 Kafka 会将 offset
提交到内部位移主题 __consumer_offsets,但是如果提交之后,消费失败就会存在重复消费的问题,可以通过手动提交 offset
来解决这个问题,使用 commitSync() 或 commitAsync(),优点:控制 Offset 提交时机,避免重复。缺点:增加代码复杂性。
从外部系统层面:如果消息队列重复消费了,我们可以业务层面避免对系统的影响,这时候就要做系统的幂等方案。比如:
- 插入之前判断数据主键是否已经存在,存在则不插入或者更新,可以使用 SQL 中
ON DUPLICATE KEY UPDATE - 利用去重的数据结构,例如:Set
- 将消费之后的数据存入 Redis,每次消费都检查一边是否消费过
- 利用数据库主键唯一性(UNIQUE),如果重复则会报错
保证消息不会丢
多一条还能避免重复消费,少一条就找不回来了
我们先要知道 Kafka 提交的偏移量 offset,Kafka 会按照配置的时间间隔(auto.commit.interval.ms,默认 5000ms)自动提交 Offset,无论消息是否成功消费或处理,所以提交 offset 可能在消费前也可以能在消费后,而且无论消费是否成功,都会提交 offset
第一种消息丢失的情况,比如:提交完 offset 后,消费数据的时候宕机了,那么这条消息就丢了,可以通过手动提交 offset 解决这个问题,重复消费的问题可以在外部系统做幂等解决
第二种消息丢失的情况,比如:leader 挂了,数据没来得及同步到被选举为 leader 的 follower,导致数据丢失,可以通过一些配置避免
- 给 topic 设置
replication.factor参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本 - 在 Kafka 服务端设置
min.insync.replicas参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower 吧 - 在 producer 端设置
acks=all:这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了 - 在 producer 端设置
retries=MAX(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了
保证消息的顺序性
场景:一个 topic,有多个 partition,生产者写的时候可以将唯一标识作为 key,例如订单id,那么这个订单相关的数据都会分配到同一个 partition ,这个 partition 消息一定是有顺序的,但是消费者从 partition 取数据,如果使用并发,消息顺序就乱了
既然并发会让消息乱了,那么就一个 topic 一个 partition ,消费者同步处理消息,但是效率会变低,不是最优方案
在内存中模拟队列,可以通过 Queue 数据结构实现,同一个 Key 放入同一个 Queue,每个线程消费一个 Queue,从而保证消费的有序
解决消息积压
导致积压可能是消费者出问题,就优先解决问题,然后按照原来的消费速度进行消费,可能过一小时消将历史消息消费掉之后才会恢复正常,不是很急的话,可以用这个办法。
优化之后的方案是进行动态扩容,加快消费速度,消费完之后再变回原来的消费架构。
比如:新建一个 topic ,partition 是原来的10倍,新建一个 临时 consumer 用于分发消息,均匀写入临时的 partition,新增 10 台机器部署 consumer 进行消费,就可以以正常10倍的速度进行消费,处理完积压消息后恢复成原先架构。还可以在业务侧判断消息如果不重要则直接跳过,加快处理速度。
消息过期
Kafka 有两种消息过期策略,1. 基于时间的过期(默认7天)2. 基于磁盘大小的过期
Kafka 有两种消息清理策略,1. 到期直接删除(默认) 2. 压缩,保留最新消息版本
Segment(日志段)是 Kafka 存储数据的基本单元,比如:Segment 内消息都过期了,Kafka 会删除这个 Segment 文件,或者 Segment 大小超过了阈值,则删除 Segment 文件,这个可以提高效率
这里存在一个问题,消息过期之后,但是 同一个 Segment 中其他消息没有过期,这就会导致过期消息残留,导致消费者消费到了过期的消息,有以下几种解决办法
- 业务逻辑校验,在消息中存储到期时间,比如:timestamp,消费者判断消息过期则跳过
- 通过压缩的过期策略,只保留消息的最新版
- 手动提交 Offset,控制每个 Partition 的消费进度
Kafka 配置消息过期有以下几个配置项
- log.retention.ms:设置消息最大保留时间(毫秒),超过时间后消息被删除
- log.retention.bytes:设置最大日志文件大小,超过该大小后删除最旧消息
- log.cleanup.policy:设置日志清理策略,可以选择删除策略或压缩策略
- log.segment.bytes:设置日志文件的最大大小,超过该大小时会创建新的日志文件
批量方式消费
配置项 max.poll.records (限制单次调用 poll() 方法时,Kafka 消费者能够返回的最大记录数,默认500)和 fetch.max.bytes(
控制单次拉取请求(Fetch Request)允许的最大数据字节数,默认55MB)
设计一个消息队列
需要考虑一下几点
- 伸缩性:分布式部署,动态扩容,broker -> topic -> partition,每个 partition 放一个机器,吞吐量不够则添加 partition
- 可用性:多副本 -> leader & follower -> broker 挂了重新选举 leader 即可对外服务
- 可靠性:参考上文“保证消息不会丢”,通过配置 Kafka 保证消息不会丢失
- 持久化:使用磁盘存储数据,保证重启之后数据不会丢,磁盘顺序读写,减少寻址开销
知其然知其所以然
Kafka 源码:https://github.com/apache/kafka/tree/3.9
消息过期策略源码在 kafka.log.LogManager#cleanupLogs -> kafka.log.UnifiedLog#deleteOldSegments
1 | def deleteOldSegments(): Int = { |
- deleteLogStartOffsetBreachedSegments:删除日志起始偏移量之前的任何本地日志段
- deleteRetentionSizeBreachedSegments:基于磁盘大小的过期,日志大小 > 保留大小
- deleteRetentionMsBreachedSegments:基于时间的过期