MQ面试题目hot

MQ面试题目hot
mengnankkzhou消息队列
1.什么是消息队列?
你可以把消息队列理解为一个使用队列来通信的组件。它的本质,就是个转发器,包含发消息、存消息、消费消息的过程。最简单的消息队列模型如下:
我们通常说的消息队列,简称MQ(Message Queue),它其实就指消息中间件,当前业界比较流行的开源消息中间件包括:RabbitMQ、RocketMQ、Kafka。
2.消息队列怎么选型?
Kafka、ActiveMQ、RabbitMQ、RocketMQ来进行不同维度对比。
| 特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
|---|---|---|---|---|
| 单机吞吐量 | 万级 | 万级 | 10 万级 | 10 万级 |
| 时效性 | 毫秒级 | 微秒级 | 毫秒级 | 毫秒级 |
| 可用性 | 高(主从) | 高(主从) | 非常高(分布式) | 非常高(分布式) |
| 消息重复 | 至少一次 | 至少一次 | 至少一次 最多一次 | 至少一次最多一次 |
| 消息顺序性 | 有序 | 有序 | 有序 | 分区有序 |
| 支持主题数 | 千级 | 百万级 | 千级 | 百级,多了性能严重下滑 |
| 消息回溯 | 不支持 | 不支持 | 支持(按时间回溯) | 支持(按offset回溯) |
| 管理界面 | 普通 | 普通 | 完善 | 普通 |
kafka:
极高吞吐量和并发处理能力,适合海量数据流
消息无状态,不支持复杂路由(需应用层实现)
消息持久化和多副本机制保证数据不丢失
延迟性在高并发下表现不如 RabbitMQ
适用于流式数据处理,天生支持大数据生态
消费端需要自己管理 Offset,复杂度较高
强大的消费者组和分区机制,易于水平扩展
不支持 JMS 协议
提供了生产者幂等性、事务性(Exactly-Once)
维护和配置相对复杂
日志收集与聚合:作为日志数据生产者和消费者之间的桥梁,高效收集来自各种服务的海量日志数据,并传输到大数据分析平台(如 ELK Stack, Hadoop HDFS)。
流式数据处理:与 Flink、Spark Streaming 等流处理框架结合,构建实时数据管道和实时计算平台,用于实时报表、风控、推荐等。
用户行为追踪:追踪网站/APP 上的用户点击、浏览、搜索等行为数据,用于用户画像、精准营销和数据分析。
RabbitMq:
支持 AMQP 协议,功能丰富,如四种交换器
吞吐量相较 Kafka 和 RocketMQ 较低
路由灵活,满足多种消息分发需求
持久化性能一般,对硬盘依赖较重
易于上手和管理,有友好的管理界面
遇到大量消息堆积时,性能会急剧下降
可靠的消息确认机制(ACK/NACK)
高可用集群部署相对复杂且对网络要求高
支持延迟队列、死信队列、优先级队列等高级特性
客户端库多语言支持不如 Kafka 广泛
复杂路由与消息分发:电商订单系统,订单支付成功后,需要同时通知库存系统、物流系统、积分系统、短信通知系统等多个模块,且可能根据订单类型进行不同路由。
短任务异步处理:用户注册后发送激活邮件、生成缩略图、处理小文件等,将这些耗时短但不影响主流程的任务异步化。
服务间解耦与消息驱动:微服务架构中,服务之间通过消息进行通信,实现松耦合和事件驱动架构。例如,商品价格更新事件通知给缓存服务、搜索服务和推荐服务。
RocketMQ:
高吞吐量、低延迟,专为互联网电商场景优化
社区生态相比 Kafka 较小,国际化程度不够
丰富的功能特性,如顺序消息、分布式事务、回溯
部署和运维相对复杂
消息可靠性高,支持同步/异步刷盘
依赖 Java 生态,客户端语言支持相对局限
集群扩展性好,支持多 Master/Slave 模式
文档虽然有中文,但不如 RabbitMQ 和 Kafka 详尽
针对消息中间件的高级需求(如消息轨迹)支持好
电商交易系统:处理海量的交易消息,支持分布式事务(如订单创建与支付扣减的事务一致性),保证消息的可靠性和顺序性。
金融支付系统:对消息的可靠性、事务一致性、顺序性要求极高,RocketMQ 在这些方面表现优异。
双十一等高并发场景下的削峰填谷:在瞬时流量高峰到来时,将大量请求暂存到消息队列,然后后端服务根据自身能力匀速消费,确保系统稳定不崩溃。
ActiveMQ:
完全支持 JMS 1.1 和 2.0 规范,易于集成
性能较差,吞吐量低,不适合高并发场景
支持多种传输协议(如 OpenWire, Stomp, MQTT)
消息积压时性能急剧下降,可能导致 OutOfMemory
易于上手,配置简单,开箱即用
长期运行稳定性有待提高
社区活跃度不如前三者,但功能稳定成熟
持久化方式多但都不突出,可靠性一般
提供了 Web 控制台
缺乏对大数据和流处理的天然支持
传统企业应用集成(JMS):在基于 JMS 标准的老旧或传统企业内部系统之间进行集成,作为消息传递的桥梁。
小型或中型项目的轻量级消息通信:对性能要求不高,但需要基本消息队列功能的独立应用,快速启动和部署。
嵌入式消息队列:在某些 Java 应用程序中,可能需要将消息队列功能直接嵌入到应用程序内部,ActiveMQ 提供了这样的能力。
3.消息队列使用场景有哪些?
- 解耦:可以在多个系统之间进行解耦,将原本通过网络之间的调用的方式改为使用MQ进行消息的异步通讯,只要该操作不是需要同步的,就可以改为使用MQ进行不同系统之间的联系,这样项目之间不会存在耦合,系统之间不会产生太大的影响,就算一个系统挂了,也只是消息挤压在MQ里面没人进行消费而已,不会对其他的系统产生影响。
- 异步:加入一个操作设计到好几个步骤,这些步骤之间不需要同步完成,比如客户去创建了一个订单,还要去客户轨迹系统添加一条轨迹、去库存系统更新库存、去客户系统修改客户的状态等等。这样如果这个系统都直接进行调用,那么将会产生大量的时间,这样对于客户是无法接收的;并且像添加客户轨迹这种操作是不需要去同步操作的,如果使用MQ将客户创建订单时,将后面的轨迹、库存、状态等信息的更新全都放到MQ里面然后去异步操作,这样就可加快系统的访问速度,提供更好的客户体验。
- 削峰:一个系统访问流量有高峰时期,也有低峰时期,比如说,中午整点有一个抢购活动等等。比如系统平时流量并不高,一秒钟只有100多个并发请求,系统处理没有任何压力,一切风平浪静,到了某个抢购活动时间,系统并发访问了剧增,比如达到了每秒5000个并发请求,而我们的系统每秒只能处理2000个请求,那么由于流量太大,我们的系统、数据库可能就会崩溃。这时如果使用MQ进行流量削峰,将用户的大量消息直接放到MQ里面,然后我们的系统去按自己的最大消费能力去消费这些消息,就可以保证系统的稳定,只是可能要跟进业务逻辑,给用户返回特定页面或者稍后通过其他方式通知其结果
4.消息重复消费怎么解决?
生产端为了保证消息发送成功,可能会重复推送(直到收到成功ACK),会产生重复消息。但是一个成熟的MQ Server框架一般会想办法解决,避免存储重复消息(比如:空间换时间,存储已处理过的message_id),给生产端提供一个幂等性的发送消息接口。
但是消费端却无法根本解决这个问题,在高并发标准要求下,拉取消息+业务处理+提交消费位移需要做事务处理,另外消费端服务可能宕机,很可能会拉取到重复消息。
所以,只能业务端自己做控制,对于已经消费成功的消息,本地数据库表或Redis缓存业务标识,每次处理前先进行校验,保证幂等。
5.消息丢失怎么解决的?
使用一个消息队列,其实就分为三大块:生产者、中间件、消费者,所以要保证消息就是保证三个环节都不能丢失数据。
- 消息生产阶段:生产者会不会丢消息,取决于生产者对于异常情况的处理是否合理。从消息被生产出来,然后提交给 MQ 的过程中,只要能正常收到 ( MQ 中间件) 的 ack 确认响应,就表示发送成功,所以只要处理好返回值和异常,如果返回异常则进行消息重发,那么这个阶段是不会出现消息丢失的。
- 消息存储阶段:Kafka 在使用时是部署一个集群,生产者在发布消息时,队列中间件通常会写「多个节点」,也就是有多个副本,这样一来,即便其中一个节点挂了,也能保证集群的数据不丢失。
- 消息消费阶段:消费者接收消息+消息处理之后,才回复 ack 的话,那么消息阶段的消息不会丢失。不能收到消息就回 ack,否则可能消息处理中途挂掉了,消息就丢失了。
6.消息队列的可靠性、顺序性怎么保证?
消息可靠性可以通过下面这些方式来保证
- 消息持久化:确保消息队列能够持久化消息是非常关键的。在系统崩溃、重启或者网络故障等情况下,未处理的消息不应丢失。例如,像 RabbitMQ 可以通过配置将消息持久化到磁盘,通过将队列和消息都设置为持久化的方式(设置
durable = true),这样在服务器重启后,消息依然可以被重新读取和处理。 - 消息确认机制:消费者在成功处理消息后,应该向消息队列发送确认(acknowledgment)。消息队列只有收到确认后,才会将消息从队列中移除。如果没有收到确认,消息队列可能会在一定时间后重新发送消息给其他消费者或者再次发送给同一个消费者。以 Kafka 为例,消费者通过
commitSync或者commitAsync方法来提交偏移量(offset),从而确认消息的消费。 - 消息重试策略:当消费者处理消息失败时,需要有合理的重试策略。可以设置重试次数和重试间隔时间。例如,在第一次处理失败后,等待一段时间(如 5 秒)后进行第二次重试,如果重试多次(如 3 次)后仍然失败,可以将消息发送到死信队列,以便后续人工排查或者采取其他特殊处理。
消息顺序性保证的方式如下:
- 有序消息处理场景识别:首先需要明确业务场景中哪些消息是需要保证顺序的。例如,在金融交易系统中,对于同用户的转账操作顺序是不能打乱的。对于需要顺序处理的消息,要确保消息队列和消费者能够按照特定的顺序进行处理。
- 消息队列对顺序性的支持:部分消息队列本身提供了顺序性保证的功能。比如 Kafka 可以通过将消息划分到同一个分区(Partition)来保证消息在分区内是有序的,消费者按照分区顺序读取消息就可以保证消息顺序。但这也可能会限制消息的并行处理程度,需要在顺序性和吞吐量之间进行权衡。
- 消费者顺序处理策略:消费者在处理顺序消息时,应该避免并发处理可能导致顺序打乱的情况。例如,可以通过单线程或者使用线程池并对顺序消息进行串行化处理等方式,确保消息按照正确的顺序被消费。
7.如何保证幂等写?
幂等性是指 同一操作的多次执行对系统状态的影响与一次执行结果一致。例如,支付接口若因网络重试被多次调用,最终应确保仅扣款一次。实现幂等写的核心方案:
- 唯一标识(幂等键):客户端为每个请求生成全局唯一ID(如 UUID、业务主键),服务端校验该ID是否已处理,适用场景接口调用、消息消费等。
- 数据库事务 + 乐观锁:通过版本号或状态字段控制并发更新,确保多次更新等同于单次操作,适用场景数据库记录更新(如余额扣减、订单状态变更)。
- 数据库唯一约束:利用数据库唯一索引防止重复数据写入,适用场景数据插入场景(如订单创建)。
- 分布式锁:通过锁机制保证同一时刻仅有一个请求执行关键操作,适用场景高并发下的资源抢夺(如秒杀)。
- 消息去重:消息队列生产者为每条消息生成唯一的消息 ID,消费者在处理消息前,先检查该消息 ID 是否已经处理过,如果已经处理过则丢弃该消息。
8.如何处理消息队列的消息积压问题?
消息积压是因为生产者的生产速度,大于消费者的消费速度。遇到消息积压问题时,我们需要先排查,是不是有bug产生了。
如果不是bug,我们可以优化一下消费的逻辑,比如之前是一条一条消息消费处理的话,我们可以确认是不是可以优为批量处理消息。如果还是慢,我们可以考虑水平扩容,增加Topic的队列数,和消费组机器的数量,提升整体消费能力。
如果是bug导致几百万消息持续积压几小时。有如何处理呢?需要解决bug,临时紧急扩容,大概思路如下:
- 先修复consumer消费者的问题,以确保其恢复消费速度,然后将现有consumer 都停掉。
- 新建一个 topic,partition 是原来的 10 倍,临时建立好原先10倍的queue 数量。
- 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
- 接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
- 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
9.如何保证数据一致性,事务消息如何实现?
一条普通的MQ消息,从产生到被消费,大概流程如下:
- 生产者产生消息,发送带MQ服务器
- MQ收到消息后,将消息持久化到存储系统。
- MQ服务器返回ACk到生产者。
- MQ服务器把消息push给消费者
- 消费者消费完消息,响应ACK
- MQ服务器收到ACK,认为消息消费成功,即在存储中删除消息。
我们举个下订单的例子吧。订单系统创建完订单后,再发送消息给下游系统。如果订单创建成功,然后消息没有成功发送出去,下游系统就无法感知这个事情,出导致数据不一致。
如何保证数据一致性呢?可以使用事务消息。一起来看下事务消息是如何实现的吧。
- 生产者产生消息,发送一条半事务消息到MQ服务器
- MQ收到消息后,将消息持久化到存储系统,这条消息的状态是待发送状态。
- MQ服务器返回ACK确认到生产者,此时MQ不会触发消息推送事件
- 生产者执行本地事务
- 如果本地事务执行成功,即commit执行结果到MQ服务器;如果执行失败,发送rollback。
- 如果是正常的commit,MQ服务器更新消息状态为可发送;如果是rollback,即删除消息。
- 如果消息状态更新为可发送,则MQ服务器会push消息给消费者。消费者消费完就回ACK。
- 如果MQ服务器长时间没有收到生产者的commit或者rollback,它会反查生产者,然后根据查询到的结果执行最终状态。
10.消息队列是参考哪种设计模式?
是参考了观察者模式和发布订阅模式,两种设计模式思路是一样的,举个生活例子:
- 观察者模式:某公司给自己员工发月饼发粽子,是由公司的行政部门发送的,这件事不适合交给第三方,原因是“公司”和“员工”是一个整体
- 发布-订阅模式:某公司要给其他人发各种快递,因为“公司”和“其他人”是独立的,其唯一的桥梁是“快递”,所以这件事适合交给第三方快递公司解决
上述过程中,如果公司自己去管理快递的配送,那公司就会变成一个快递公司,业务繁杂难以管理,影响公司自身的主营业务,因此使用何种模式需要考虑什么情况两者是需要耦合的
观察者模式
观察者模式实际上就是一个一对多的关系,在观察者模式中存在一个主题和多个观察者,主题也是被观察者,当我们主题发布消息时,会通知各个观察者,观察者将会收到最新消息,图解如下:每个观察者首先订阅主题,订阅成功后当主题发送消息时会循环整个观察者列表,逐一发送消息通知。
发布订阅模式
发布订阅模式和观察者模式的区别就是发布者和订阅者完全解耦,通过中间的发布订阅中心进行消息通知,发布者并不知道自己发布的消息会通知给谁,因此发布订阅模式有三个重要角色,发布者->发布订阅中心->订阅者。
图解如下:当发布者发布消息到发布订阅中心后,发布订阅中心会将消息通知给所有订阅该发布者的订阅者
11.让你写一个消息队列,该如何进行架构设计?
- 首先是消息队列的整体流程,producer发送消息给broker,broker存储好,broker再发送给consumer消费,consumer回复消费确认等。
- producer发送消息给broker,broker发消息给consumer消费,那就需要两次RPC了,RPC如何设计呢?可以参考开源框架Dubbo,你可以说说服务发现、序列化协议等等
- broker考虑如何持久化呢,是放文件系统还是数据库呢,会不会消息堆积呢,消息堆积如何处理呢。
- 消费关系如何保存呢?点对点还是广播方式呢?广播关系又是如何维护呢?zk还是config server
- 消息可靠性如何保证呢?如果消息重复了,如何幂等处理呢?
- 消息队列的高可用如何设计呢?可以参考Kafka的高可用保障机制。多副本 -> leader & follower -> broker 挂了重新选举 leader 即可对外服务。
- 消息事务特性,与本地业务同个事务,本地消息落库;消息投递到服务端,本地才删除;定时任务扫描本地消息库,补偿发送。
- MQ得伸缩性和可扩展性,如果消息积压或者资源不够时,如何支持快速扩容,提高吞吐?可以参照一下 Kafka 的设计理念,broker -> topic -> partition,每个 partition 放一个机器,就存一部分数据。如果现在资源不够了,简单啊,给 topic 增加 partition,然后做数据迁移,增加机器,不就可以存放更多数据,提供更高的吞吐量了。
RocketMQ
1.消息队列为什么选择RocketMQ的?
项目用的是 RocketMQ 消息队列。选择RocketMQ的原因是:
- 开发语言优势。RocketMQ 使用 Java 语言开发,比起使用 Erlang 开发的 RabbitMQ 来说,有着更容易上手的阅读体验和受众。在遇到 RocketMQ 较为底层的问题时,大部分熟悉 Java 的同学都可以深入阅读其源码,分析、排查问题。
- 社区氛围活跃。RocketMQ 是阿里巴巴开源且内部在大量使用的消息队列,说明 RocketMQ 是的确经得起残酷的生产环境考验的,并且能够针对线上环境复杂的需求场景提供相应的解决方案。
- 特性丰富。根据 RocketMQ 官方文档的列举,其高级特性达到了
12 种,例如顺序消息、事务消息、消息过滤、定时消息等。顺序消息、事务消息、消息过滤、定时消息。RocketMQ 丰富的特性,能够为我们在复杂的业务场景下尽可能多地提供思路及解决方案。
2.RocketMQ和Kafka的区别是什么?如何做技术选型?
Kafka的优缺点:
- 优点:首先,Kafka的最大优势就在于它的高吞吐量,在普通机器4CPU8G的配置下,一台机器可以抗住十几万的QPS,这一点还是相当优越的。Kafka支持集群部署,如果部分机器宕机不可用,则不影响Kafka的正常使用。
- 缺点:Kafka有可能会造成数据丢失,因为它在收到消息的时候,并不是直接写到物理磁盘的,而是先写入到磁盘缓冲区里面的。Kafka功能比较的单一 主要的就是支持收发消息,高级功能基本没有,就会造成适用场景受限。
RocketMQ是阿里巴巴开源的消息中间件,优缺点
- 优点:支持功能比较多,比如延迟队列、消息事务等等,吞吐量也高,单机吞吐量达到 10 万级,支持大规模集群部署,线性扩展方便,Java语言开发,满足了国内绝大部分公司技术栈
- 缺点:性能相比 kafka 是弱一点,因为 kafka 用到了 sendfile 的零拷贝技术,而 RocketMQ 主要是用 mmap+write 来实现零拷贝。
该怎么选择呢?
- 如果我们业务只是收发消息这种单一类型的需求,而且可以允许小部分数据丢失的可能性,但是又要求极高的吞吐量和高性能的话,就直接选Kafka就行了,就好比我们公司想要收集和传输用户行为日志以及其他相关日志的处理,就选用的Kafka中间件。
- 如果公司的需要通过 mq 来实现一些业务需求,比如延迟队列、消息事务等,公司技术栈主要是Java语言的话,就直接一步到位选择RocketMQ,这样会省很多事情。
3.RocketMQ延时消息的底层原理
总体的原理示意图,如下所示:
broker 在接收到延时消息的时候,会将延时消息存入到延时Topic的队列中,然后ScheduleMessageService中,每个 queue 对应的定时任务会不停地被执行,检查 queue 中哪些消息已到设定时间,然后转发到消息的原始Topic,这些消息就会被各自的 producer 消费了。
也可以使用这个思路用redis的stream流来实现延时消息
4.RocektMQ怎么处理分布式事务?
RocketMQ是一种最终一致性的分布式事务,就是说它保证的是消息最终一致性,而不是像2PC、3PC、TCC那样强一致分布式事务
假设 A 给 B 转 100块钱,同时它们不是同一个服务上,现在目标是就是 A 减100块钱,B 加100块钱。
实际情况可能有四种:
- 1)就是A账户减100 (成功),B账户加100 (成功)
- 2)就是A账户减100(失败),B账户加100 (失败)
- 3)就是A账户减100(成功),B账户加100 (失败)
- 4)就是A账户减100 (失败),B账户加100 (成功)
这里 第1和第2 种情况是能够保证事务的一致性的,但是 第3和第4 是无法保证事务的一致性的。
那我们来看下RocketMQ是如何来保证事务的一致性的。
分布式事务的流程如上图:
- 1、A服务先发送个Half Message(是指暂不能被Consumer消费的消息。Producer 已经把消息成功发送到了Broker 端,但此消息被标记为暂不能投递状态,处于该种状态下的消息称为半消息。需要 Producer对消息的二次确认后,Consumer才能去消费它)给Brock端,消息中携带 B服务 即将要+100元的信息。
- 2、当A服务知道Half Message发送成功后,那么开始第3步执行本地事务。
- 3、执行本地事务(会有三种情况1、执行成功。2、执行失败。3、网络等原因导致没有响应)
- 4.1)、如果本地事务成功,那么Product像Brock服务器发送Commit,这样B服务就可以消费该message。
- 4.2)、如果本地事务失败,那么Product像Brock服务器发送Rollback,那么就会直接删除上面这条半消息。
- 4.3)、如果因为网络等原因迟迟没有返回失败还是成功,那么会执行RocketMQ的回调接口,来进行事务的回查。
从上面流程可以得知 只有A服务本地事务执行成功 ,B服务才能消费该message。
那么 A账户减100 (成功),B账户加100 (失败),这时候B服务失败怎么办?
如果B最终执行失败,几乎可以断定就是代码有问题所以才引起的异常,因为消费端RocketMQ有重试机制,如果不是代码问题一般重试几次就能成功。
如果是代码的原因引起多次重试失败后,也没有关系,将该异常记录下来,由人工处理,人工兜底处理后,就可以让事务达到最终的一致性。
5.RocketMQ消息顺序怎么保证?
消息的有序性是指消息的消费顺序能够严格保存与消息的发送顺序一致。例如,一个订单产生了3条消息,分别是订单创建、订单付款和订单完成。在消息消费时,同一条订单要严格按照这个顺序进行消费,否则业务会发生混乱。同时,不同订单之间的消息又是可以并发消费的,比如可以先执行第三个订单的付款,再执行第二个订单的创建。
RocketMQ采用了局部顺序一致性的机制,实现了单个队列中的消息严格有序。也就是说,如果想要保证顺序消费,必须将一组消息发送到同一个队列中,然后再由消费者进行注意消费。
RocketMQ推荐的顺序消费解决方案是:安装业务划分不同的队列,然后将需要顺序消费的消息发往同一队列中即可,不同业务之间的消息仍采用并发消费。这种方式在满足顺序消费的同时提高了消息的处理速度,在一定程度上避免了消息堆积问题
RocketMQ 顺序消息的原理是:
- 在 Producer(生产者) 把一批需要保证顺序的消息发送到同一个 MessageQueue
- Consumer(消费者) 则通过加锁的机制来保证消息消费的顺序性,Broker 端通过对 MessageQueue 进行加锁,保证同一个 MessageQueue 只能被同一个 Consumer 进行消费。
6.RocketMQ怎么保证消息不被重复消费
在业务逻辑中实现幂等性,确保即使消息被重复消费,也不会影响业务状态。例如,对于支付或转账类操作,可以使用唯一订单号或事务ID作为幂等性的标识符,确保同样的操作只会被执行一次。
消息投递时,网络中断或消费失败重试可能会导致 重复消费
消息投递给消费者后,消费者处理异常 或返回失败,会被 RocketMQ 重新投递
- 数据库表加“唯一约束 + 去重表”【最常用】
- 使用 Redis 实现幂等控制
- 利用 RocketMQ 提供的
msg.getKeys()做幂等键
7.RocketMQ消息积压了,怎么办?
导致消息积压突然增加,最粗粒度的原因,只有两种:要么是发送变快了,要么是消费变慢了。
要解决积压的问题,可以通过扩容消费端的实例数来提升总体的消费能力。
如果短时间内没有足够的服务器资源进行扩容,没办法的办法是,将系统降级,通过关闭一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,服务一些重要业务。
8.什么是零拷贝
传统的数据传输流程中,用户数据通常会经过如下多次拷贝:
1 | 硬盘 → 内核缓冲区 → 用户态 → Socket 缓冲区 → 网卡 |
一般来说文件拷贝是要拷贝四次的,
当用户进程调用read(),用户态无法调用内核态的设备,只能触发系统调用(IO)。这时计算机需要从用户态切换为内核态。
到达内核态之后,计算机通过DMA控制器将数据从磁盘读取出来,放到内核的缓冲区。完成第一次拷贝。
CPU需要将缓冲区的数据拷贝到用户态的缓冲区,完成第二次拷贝,也是read()函数的返回。这时计算器需要从内核态切换为用户态。
因为最终的数据需要通过网卡输出,所以用户进程就需要调用write()函数,CPU将用户缓冲区的数据拷贝到Socket缓冲区,完成第三次拷贝。同时需要再次触发系统调用。这时计算机又需要从用户态切换为内核态。
DMA控制器把数据从Socket缓冲区,拷贝到网卡设备输出,至此完成第四不拷贝。同时需要将内核态切换为用户态,write()函数返回。
而“零拷贝”技术通过内核优化和 API 支持,能避免数据在用户态与内核态间的多次拷贝,从而提升性能。常用技术:
| 技术 | 说明 |
|---|---|
mmap |
将文件映射到内存地址空间,避免文件拷贝 |
sendfile |
直接将文件从磁盘发送到 Socket,避免数据进入用户态 |
writev |
批量写入多个内存区域,减少系统调用 |
DirectByteBuffer(Java NIO) |
Java 堆外内存,提高 I/O 性能 |
RocketMQ使用零拷贝的场景:
MMAP:
RocketMQ 使用 顺序写入磁盘 + MappedByteBuffer(mmap)机制】
mmap将用户空间的虚拟地址和内核空间的文件缓冲区映射到同一块物理内存区域。 这样, 用户进程可以直接访问内核空间的文件缓冲区, 避免了 CPU 拷贝。
- CommitLog 文件通过
mmap映射为内存地址空间,写消息时直接写入这段地址 - 消息写完之后由 刷盘线程 flush 到磁盘(异步或同步)
优点:避免了传统写文件的 内核缓冲区 → 用户缓冲区 → 文件系统缓存 的多次复制。
消息消费(拉取时)
RocketMQ 使用 零拷贝 + SendFile 技术 实现高效消息下发:
- 消费者从 Broker 拉取消息时,Broker 会读取 CommitLog 中的内容
- 若消息在 OS PageCache 中,可直接使用
FileChannel.transferTo(即 sendfile)将消息直接写入 socket 输出流
相比传统读入用户空间再写出,sendfile 直接 在内核中完成数据搬运,性能极高。
ConsumeQueue 与 IndexFile
RocketMQ 的 ConsumeQueue(消费队列)和 IndexFile(索引文件)同样是基于 mmap 方式读写,提升顺序读性能,避免 GC 干扰。
- ConsumeQueue 中记录了消息的偏移量、大小和 tag hash
- 查询或消费时不需要实际读取 CommitLog 内容,而是通过偏移快速定位
sendfile()
sendfile() 系统调用允许将数据从一个文件描述符 (例如, 文件) 直接传输到另一个文件描述符 (例如, Socket)。 避免了数据在用户空间和内核空间之间的拷贝。
- 用户进程调用
sendfile()系统调用, 指定输入和输出文件描述符。 - 数据通过 DMA 从磁盘读取到内核缓冲区。
- 数据直接从内核缓冲区拷贝到 Socket 缓冲区,或者更优的方式是:只有描述符信息从内核缓冲区拷贝到socket缓冲区。
- 数据通过 DMA 从 Socket 缓冲区传输到网卡。
静态文件服务器(例如 Nginx)通常使用 sendfile() 来将静态文件发送给客户端。只能适用于数据从文件传输到Socket的场景,范围有限
splice() (管道):
splice() 系统调用允许在两个文件描述符之间移动数据,而不需要在用户空间和内核空间之间进行复制。
- 创建两个管道(pipe)对象
- 调用 splice() 系统调用,将数据从输入文件描述符读取到第一个管道.
- 调用 splice() 系统调用,将数据从管道数据写到socket 。
适用于需要数据传输与转换(类似于Linux的管道操作)的场景
Direct I/O:
Direct I/O 允许用户进程绕过内核缓冲区 (Page Cache), 直接访问磁盘。
- 用户进程发起 Direct I/O 请求。
- 数据通过 DMA 直接从磁盘传输到用户进程的缓冲区。
- 需要用户进程自己管理缓存,增加了开发的复杂性。
- 可能影响系统的整体性能, 因为绕过了 Page Cache。 (Page Cache 可以缓存热点数据,提高访问速度)。
大型数据库(例如 Oracle)通常使用 Direct I/O 来进行数据读写, 因为数据库有自己的缓存管理机制。
| 特性 | 优势 |
|---|---|
| mmap | 减少内存复制、提高 I/O 吞吐 |
| sendfile | 内核空间直接完成数据搬运 |
| writev(部分使用) | 多个 buffer 一次写出 |
| 堆外内存使用(DirectByteBuffer) | 降低 GC 压力,提升 I/O 性能 |
9.RocketMQ的Consumer两种消费模式
推模式
实际上,RocketMQ 的推模式底层仍然是基于长轮询(Long Polling)的拉模式来实现的,只是由 RocketMQ SDK 内部管理了拉取消息、维护消费进度(Offset)等复杂逻辑,然后通过回调函数将消息“推”给用户应用。
消费者启动后,向 Broker 注册自己,并订阅感兴趣的 Topic。
RocketMQ SDK 内部会启动一个长轮询线程。它会定期(或在消息到达时)向 Broker 发送拉取消息的请求。
如果 Broker 有消息,就立即返回给消费者;如果没有,Broker 会保持连接一段时间(长轮询),直到有新消息到达或超时。
当 SDK 拉取到消息后,会将其存入本地的消费队列缓存。
然后,SDK 会根据配置的并发度,将缓存中的消息分发给用户注册的消息监听器(MessageListener)进行处理。
用户在消息监听器中完成业务逻辑后,返回消费结果(成功或失败)。
RocketMQ SDK 会根据消费结果自动提交消费进度(Offset)给 Broker,并处理消息重试、死信队列等。
自动重试与死信:内置消息失败重试机制,以及将达到最大重试次数的消息发送到死信队列的功能。
简单易用:用户只需关注业务逻辑,实现一个 MessageListener 接口即可,无需处理消息拉取、偏移量管理、流控等底层细节。
实时性好:由于长轮询机制,消息到达后能被较快地消费。
自动负载均衡:在消费者组模式下,RocketMQ SDK 会自动进行队列的负载均衡,将 Topic 的消息队列分配给组内不同的消费者实例,实现水平扩展。
拉模式
拉模式是一种更原始、更底层的消费模式。它将消息拉取的主动权完全交给用户。消费者需要主动向 Broker 发送请求拉取消息,并手动管理消息的消费进度(Offset)。
消费者启动后,需要自己获取 Topic 下所有消息队列(MessageQueue)的信息。
消费者选择一个或多个消息队列进行拉取。
消费者需要维护每个消息队列的当前消费偏移量 (Offset)。
消费者主动调用 pull() 方法向 Broker 发送拉取请求,指定要拉取的队列、当前偏移量和最大拉取数量。
Broker 返回拉取结果 PullResult,其中包含消息列表、下一个拉取偏移量等。
消费者处理完消息后,需要手动更新并提交新的消费偏移量。
消费者需要自己处理消息拉取的频率(轮询间隔)、批量处理、消息重试等逻辑。
RocketMQ 4.x 引入的 DefaultLitePullConsumer 简化了传统的 DefaultMQPullConsumer 的使用,使其在部分场景下更接近推模式的体验,但本质上仍是拉模式,需要用户主动 poll。
10.RocketMQ的Consumer两种监听方式
一般在push模式下会经常使用到监听。
并发消费是 RocketMQ 默认的也是最常用的消费模式。在这种模式下,消费者可以并发地处理来自同一个队列(MessageQueue)甚至同一个 Topic 的多条消息。RocketMQ 会为每个消息队列分配一个或多个消费线程,或者从线程池中获取线程来处理消息。
RocketMQ 消费者从 Broker 拉取到一批消息。
这些消息会被分发到消费者内部的多个线程中并行处理。
对于同一个消息队列,RocketMQ 可能会同时将多条消息提交给不同的线程进行消费。
不保证严格顺序:对于同一个消息队列内的消息,无法保证其被消费的顺序与发送顺序一致。因为消息被分发到不同的线程并行处理,处理完成的顺序是不确定的。
需要考虑并发问题:如果业务逻辑涉及到共享资源或状态,需要开发者自行处理并发安全问题(例如加锁、使用原子操作等)。
顺序消费模式保证了同一个消息队列(MessageQueue)中的消息,被消费者严格按照发送的顺序进行消费。这意味着在任何时刻,对于一个特定的消息队列,只会有一个线程在处理其中的消息。
RocketMQ 消费者从 Broker 拉取到一批消息。
对于每个消息队列,RocketMQ SDK 会确保只有一个消费线程来处理该队列中的消息。
如果当前消息队列中的某条消息正在被处理,或者处理失败需要重试,那么该队列的后续消息会被阻塞,直到当前消息处理完成并成功提交偏移量。
- 严格保证顺序性:确保了同一消息队列内的消息按照生产顺序被消费,这对于某些业务场景至关重要。
简化并发处理:由于同一队列的消息是单线程处理,开发者无需过多考虑并发安全问题。
吞吐量受限:由于是单线程处理一个消息队列,其消费速度受限于单个线程的处理能力,整体吞吐量会低于并发消费模式。
- 可能出现消息堆积:如果某个消息处理失败并持续重试,或者处理时间过长,会导致该队列的后续消息被阻塞,造成消息堆积。
- 死锁风险:如果消息处理逻辑中存在外部依赖的死锁,可能会导致整个队列的消费停滞。
11.如何顺序的发送消息
使用分区顺序(Partial Order):这是最常用的方式。它保证同一个 ShardingKey (例如订单ID) 关联的所有消息在生产者端按照发送顺序发送到同一个消息队列,并在消费者端也按照这个顺序消费。不同 ShardingKey 的消息则可以并行处理,不保证顺序。这适用于大部分业务场景,例如一个订单的创建、支付、发货等一系列操作。
创建 MessageQueueSelector
arg 就是你在发送消息时传入的业务ShardingKey,例如订单ID,使用 ShardingKey 的哈希值或者模运算来选择队列
需要使用 DefaultMQProducer 的 send(Message msg, MessageQueueSelector selector, Object arg) 方法。
msg: 你要发送的消息对象。selector: 你前面实现的MessageQueueSelector实例。arg: 你的业务ShardingKey,例如订单ID。RocketMQ 会把这个arg传递给MessageQueueSelector的select方法。
1 | import org.apache.rocketmq.client.producer.DefaultMQProducer; |
单一生产者,单一线程:RocketMQ 服务端判定消息顺序性是参照单一生产者、单一线程并发下消息发送的时序。如果多个生产者或多个线程并发发送消息,RocketMQ 只能以到达服务端的时序作为顺序依据,这可能与你业务侧的发送顺序不一致。因此,对于需要严格顺序的场景,最好保证同一个 ShardingKey 的消息由同一个生产者实例的同一个线程发送。
确保 ShardingKey 的稳定性:用于决定消息路由的 ShardingKey(例如订单ID)在整个业务流程中必须保持一致,这样相关的所有消息才能被路由到同一个队列。
消费者端需要确保同一个队列的消息被顺序消费。RocketMQ 的顺序消息消费模式是推模式(Push Consumer),并且默认就提供了顺序消费的保证。
1 | import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; |
MessageListenerOrderly:这是 RocketMQ 专门为顺序消息设计的监听器。它确保了同一个消息队列中的消息会被一个线程串行地拉取和处理,从而保证了消费顺序。
ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT:如果在消息处理过程中发生异常,返回此状态可以让 RocketMQ 暂停当前队列的消费,并在稍后重试。这可以避免因为一条消息处理失败而导致后续消息无法按序处理的问题。
幂等性:即使 RocketMQ 保证了顺序,但由于网络等原因,消息仍可能被重复投递。因此,你的消费者逻辑必须具备幂等性,即多次处理同一条消息也能得到一致的结果。
Topic 配置
通过 mqadmin 工具更新 Topic:
1 | sh bin/mqadmin updateTopic -c DefaultCluster -t YourOrderTopic -n 127.0.0.1:9876 --order true |
-c: 集群名称-t: Topic 名称-n: NameServer 地址--order true: 标记此 Topic 为有序 Topic
12.RocketMQ的批量消息
使用批量消息的时候,要注意
- 同一批次消息的 Topic 必须相同:这是强制要求,一个批量消息中不能包含不同 Topic 的消息。
- 批量消息的总大小不能超过 1MB:这是 RocketMQ 默认的硬性限制。如果你的批量消息总大小超过 1MB,你需要自行将它们拆分成多个批次进行发送。
- 不支持延迟消息和事务消息:批量消息目前不支持发送延迟消息或事务消息。如果需要这些功能,请使用普通消息或其他消息类型。
- 相同的
waitStoreMsgOK:同一批次消息的waitStoreMsgOK(表示是否等待消息存储成功再返回)属性必须相同。通常情况下,这都是默认值,所以一般不需要特别关注。 - 不保证严格顺序:批量消息通常不保证消息在 Broker 上的存储顺序和消费顺序。如果你需要顺序消息,应该使用上一问中提到的顺序消息特性,并确保同一
ShardingKey的消息发送到同一个队列。即使批量发送,只要你通过MessageQueueSelector确保了同一ShardingKey的消息发送到同一队列,它们在该队列内仍能保持相对顺序。
生成者:
发送批量消息非常简单,只需要将一个 Message 列表作为参数传递给 DefaultMQProducer 的 send 方法即可。
1 | import org.apache.rocketmq.client.producer.DefaultMQProducer; |
消息大小:
如果你的批量消息总大小可能超过 1MB,你需要手动对消息列表进行分割。
实际中可以直接使用 RocketMQ 客户端库中提供的 ListSplitter。
消费者端处理:
消费者端通常不需要为批量消息做特殊处理。无论是单条消息还是批量消息,消费者都会以相同的 MessageExt 列表形式接收到。你只需要像处理普通消息一样遍历 msgs 列表即可。
1 | import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; |
setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize): 这是消费者端实现批量消费的关键配置。通过设置这个参数,你可以控制消费者每次从 Broker 拉取消息并提交给业务逻辑处理的最大消息数量。默认值是 1,即每次只消费一条消息。增大此值可以提高消费的并行度。
消费幂等性:即使是批量消费,也需要考虑消息的重复投递问题。确保你的业务逻辑在处理批量消息时具备幂等性。
异常处理:如果在批量处理中某条消息处理失败,你可能需要决定是重试整个批次,还是只重试失败的消息并继续处理批次中的其他消息。返回 RECONSUME_LATER 会导致整个批次的消息都被重试。如果业务允许,可以记录失败消息,并返回 SUCCESS 以避免阻塞整个队列。
13.RocketMQ的延时消息
允许你指定消息在发送到 Broker 后,不会立即被消费者消费,而是会延迟一段时间后才投递给消费者。这个功能在许多业务场景中非常有用,比如:
- 订单超时未支付自动取消:用户下单后,如果30分钟内未支付,就发送一个延时消息,30分钟后这个消息被消费,触发订单取消操作。
- 新用户注册奖励延迟发放:用户注册成功后,延时1天发放新人奖励,确保用户体验。
- 任务定时执行:例如,每天凌晨统计前一天的销售数据,可以发送一个延时24小时的消息来触发。
- 消息在指定时间后发送:例如,促销短信在某个特定时间点发送。
RocketMQ 不支持任意精度的时间延时,它预设了18个固定的延时等级(delayLevel)。这些等级是硬编码在 Broker 端的配置中的。
默认的延时等级字符串如下:
1 | 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h |
原理:
发送到内部 Topic:当生产者发送一个延时消息时,Broker 不会将它直接存储到目标 Topic 的队列中。相反,它会将消息存储到一个内部的、名为 SCHEDULE_TOPIC_XXXX 的 Topic 中。
根据 delayTimeLevel 分发:这个内部 Topic 实际上有多个队列,每个队列对应一个延时等级。消息会被路由到对应延时等级的队列中。
定时扫描:Broker 端有一个后台线程(或者多个线程)会定时扫描这些内部延时队列。
达到延时时间后重新投递:当扫描发现某个消息的投递时间已到,它就会被重新存储到原始目标 Topic 的队列中,此时消息才对消费者可见,可以被正常消费。
生产者:
生产者发送延时消息非常简单,只需要在发送消息前设置 delayTimeLevel 属性。
1 | import org.apache.rocketmq.client.producer.DefaultMQProducer; |
不能直接设置延迟等级,需要具体的时间的话,
选择最近的延时等级:例如,对于 15 秒的延时,你可以选择 10 秒等级。在消费端,获取消息的生产时间戳和当前时间戳,如果还没到 15 秒,可以进行二次延时投递(重新发送一个延时消息,直到满足条件)。
业务层二次确认/轮询:发送一个较短延时的消息,在消费者端收到消息后,检查业务条件是否满足。如果不满足,可以重新投递或者通过其他方式(如数据库轮询)进行补偿。
自定义延时消息存储:如果对精度有极高要求,并且预设等级无法满足,可能需要考虑自己实现一个延时消息存储方案(例如,基于 Redis ZSET 或数据库)。
消费者端是正常处理的
额外:
消息积压与延时准确性:如果 Broker 消息积压严重,或者 Broker 负载过高,延时消息的投递时间可能会有一定偏差。RocketMQ 会尽力在指定延时时间后投递,但不能保证毫秒级的精确。
修改延时等级配置:如果你需要自定义延时等级,可以在 Broker 的 broker.conf 配置文件中修改 messageDelayLevel 参数,并重启 Broker。但请注意,修改后会影响所有使用延时消息的 Topic。 messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 3h 4h
不适用于定时任务调度:尽管延时消息可以实现类似定时任务的功能,但对于复杂的、需要精确控制和管理(如 Quartz)的定时任务,不推荐完全依赖延时消息。延时消息更适合于与业务流相关的“延迟执行”场景。
14.RocketMQ的过滤消息
RocketMQ 提供了两种主要的过滤消息方式:
- Tag 过滤(Broker 端过滤):这是最常用也是推荐的方式。消息生产者在发送消息时为其设置一个或多个 Tag(标签)。消费者在订阅 Topic 时,可以指定只消费某个或某些 Tag 的消息。这种过滤是在 Broker 端完成的,即 Broker 只会将符合消费者订阅 Tag 的消息推送给消费者,大大减少了网络传输量。
- SQL 92 过滤(Broker 端过滤):这是一种更高级的过滤方式,允许消费者使用 SQL 92 标准的表达式来过滤消息。消息生产者在发送消息时可以设置用户属性(User Property)。消费者可以编写类似 SQL WHERE 子句的表达式,根据这些属性的值进行过滤。这种过滤也是在 Broker 端完成的。
Tag:
生产者在发送消息时,通过 setTags() 方法给消息设置标签。
1 | import org.apache.rocketmq.client.producer.DefaultMQProducer; |
消费者在订阅 Topic 时,可以在 subscribe() 方法中指定 Tag。
1 | import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; |
SQL 92 过滤
在 Broker 的配置文件 broker.conf 中,需要将 enablePropertyFilter 参数设置为 true:
支持92过滤开启
生产者在发送消息时,通过 putUserProperty() 方法给消息设置自定义属性。
1 | import org.apache.rocketmq.client.producer.DefaultMQProducer; |
消费者端订阅使用 SQL 92 表达式过滤消息
消费者在订阅 Topic 时,使用 MessageSelector.bySql() 方法传入 SQL 92 表达式。
支持的 SQL 92 语法:
- 数值比较:
>,<,>=,<=,BETWEEN,= - 字符比较:
=,<>,IN(支持NOT IN但需要 Broker 版本支持) - 逻辑运算:
AND,OR,NOT IS NULL或者IS NOT NULL: 检查属性是否存在- 字符串常量:
'abc','123' - 数值常量:
123,3.14159 - 布尔常量:
TRUE,FALSE
1 | import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; |
- 单场景(按类型区分):优先使用 Tag 过滤。它性能更高,配置简单,能满足绝大部分按消息类型过滤的需求。
- 复杂场景(按属性值过滤):当 Tag 无法满足你的过滤需求,需要根据消息的多个属性值进行复杂的逻辑判断时,可以考虑使用 SQL 92 过滤。但需要注意:
- Broker 端支持:确保你的 RocketMQ Broker 版本支持 SQL 92 过滤,并且已经开启了
enablePropertyFilter。 - 性能考量:SQL 92 过滤虽然灵活,但其解析和执行会比 Tag 过滤消耗更多的 Broker 资源。在对性能要求极高的场景下,应谨慎使用或进行压测。
- Broker 端支持:确保你的 RocketMQ Broker 版本支持 SQL 92 过滤,并且已经开启了
15.RocketMQ的事务消息,事务消息的机制了解吗?讲一讲回查机制?
RocketMQ 的事务消息实现了一个两阶段提交(Two-Phase Commit)的简化版本,但它巧妙地规避了传统 XA 事务的性能开销和复杂性,通过引入“半事务消息”和“消息回查”机制来实现最终一致性。
流程:
发送半事务消息(Half Message)
- 生产者向 RocketMQ Broker 发送一条消息,这条消息被标记为“半事务消息”。
- Broker 收到半事务消息后,将其持久化,但并不会立即将它投递给消费者。它会返回一个
ACK给生产者,表示消息已收到。此时,消费者是看不到这条消息的。
执行本地事务
- 生产者收到 Broker 的
ACK后,开始执行自己的本地事务(例如,更新数据库、调用其他内部服务等)。 - 这一步是事务消息的核心:你的业务逻辑会在这里完成,决定事务的最终状态。
提交或回滚半事务消息
- 根据本地事务的执行结果,生产者向 Broker 发送二次确认(Second Confirmation):
- 如果本地事务执行成功,生产者发送
Commit命令。Broker 收到Commit后,会将之前存储的半事务消息标记为“可投递”,并将其真正投递给消费者。 - 如果本地事务执行失败(或需要回滚),生产者发送
Rollback命令。Broker 收到Rollback后,会删除或丢弃之前存储的半事务消息,消费者永远不会收到这条消息。
- 如果本地事务执行成功,生产者发送
消息回查(Transaction Message Check)
- 这是 RocketMQ 事务消息的“杀手锏”,用于处理网络异常、生产者宕机等极端情况。
- 如果在步骤3中,生产者发送的二次确认(
Commit或Rollback)因为网络问题丢失,或者生产者在执行本地事务后宕机,导致 Broker 长时间没有收到二次确认,那么 Broker 会主动向生产者发起消息回查请求。 - Broker 会询问生产者:
嘿,这条半事务消息的本地事务状态到底是什么?是成功了还是失败了? - 生产者需要实现一个事务回查监听器 (
TransactionListener),当收到回查请求时,它会查询本地事务的最终状态(例如,查询数据库中相关订单的状态),并根据查询结果再次向 Broker 返回Commit或Rollback。 - Broker 收到回查结果后,再次执行步骤3的逻辑(标记为可投递或删除)。
消息回查流程:
- Broker 监控半事务消息:Broker 内部有一个定时任务,会不断扫描那些长时间(可配置,例如默认1分钟)处于“半事务”状态的消息。
- 发起回查:当发现有“超时”的半事务消息时,Broker 会向原始生产者组(Producer Group)中的任意一个存活的生产者实例发送回查请求。
- 生产者执行
checkLocalTransaction:生产者接收到回查请求后,会调用其实现的TransactionListener接口中的checkLocalTransaction()方法。- 在这个方法里,生产者需要根据消息的唯一标识(通常是
MsgId或者业务Key)去查询本地事务的真实状态。 - 例如,如果你的业务是“下单后发消息”,那么在这个回查方法里,你就需要根据消息中的订单ID去查询订单表,看看订单状态是否是“已支付”。
- 根据查询结果,
checkLocalTransaction()方法会返回三种状态:LocalTransactionState.COMMIT_MESSAGE:表示本地事务已经成功,Broker 可以将半事务消息投递给消费者。LocalTransactionState.ROLLBACK_MESSAGE:表示本地事务已经失败或需要回滚,Broker 会删除半事务消息。LocalTransactionState.UNKNOW:表示当前无法确定本地事务状态(例如,查询超时、数据库暂时不可用)。此时,RocketMQ 会过一段时间再次发起回查,直到获取明确的状态。
- 在这个方法里,生产者需要根据消息的唯一标识(通常是
- Broker 处理回查结果:Broker 根据生产者返回的状态,决定将半事务消息
Commit或Rollback。
回查机制的优点:
- 保证最终一致性:即使在极端情况下(网络闪断、生产者宕机),也能通过回查机制最终确定消息的去向,保证业务数据与消息状态的一致性。
- 避免资源浪费:如果本地事务失败,消息就不会被发送出去,避免了消费者收到无效消息,从而减少了不必要的消费处理。
- 低耦合:生产者和消费者之间不需要直接依赖本地事务状态,而是通过 Broker 进行协调。
16.Rocketmq的高可用
主要依赖于其 Broker 集群架构以及 Dledger 组件(在 RocketMQ 4.5.0 之后引入,并逐步推广)。RocketMQ 高可用的核心目标在于,即使部分 Broker 节点发生故障,消息仍然能够可靠地被持久化、生产和消费,保障业务的连续性。
架构:
- NameServer: NameServer 是一个轻量级的服务发现和路由中心,用于维护 Broker 的路由信息。NameServer 通常部署为一个集群,保证高可用性。即使部分 NameServer 宕机,客户端仍然可以从其他 NameServer 获取路由信息。
- Broker: Broker 是 RocketMQ 的消息存储和转发节点。为了实现高可用,Broker 通常部署为一个集群。Broker 集群中的每个 Broker 节点都有一个角色,即 Master 和 Slave。
- Master: Master Broker 负责接收客户端的消息写入请求,并将消息存储到本地磁盘。
- Slave: Slave Broker 负责从 Master Broker 复制消息数据,以备 Master 宕机时接管服务。
会出现的问题:
- 数据一致性问题: 在异步复制模式下,如果 Master 宕机,可能会丢失部分尚未同步到 Slave 的消息。
- 切换延迟: 主备切换需要一定的时间,在切换期间服务不可用。
- 脑裂问题: 在极端情况下,Master 和 Slave 之间可能会出现网络隔离,导致脑裂,数据不一致。
Dledger 方案:
它基于 Raft 一致性算法,提供更强的一致性和更高的可用性,替代了之前的普通主从复制模式。
- Raft 协议: Dledger 使用 Raft 协议在多个 Broker 节点之间选举出一个 Leader,只有 Leader 节点才能处理客户端的写入请求。 所有的数据变更都必须经过 Leader 的批准,并且复制到 Followers 节点上,确保数据一致性。
- 高可用架构: Dledger 使得 RocketMQ 可以构建一个高可用的 Broker 集群,其中 Leader 节点负责处理读写请求,Follower 节点负责备份数据。 如果 Leader 节点发生故障,Raft 协议会自动选举出一个新的 Leader,保证服务的连续性。
- 自动故障转移: 当 Leader 节点宕机时,Raft 协议会自动选举出一个新的 Leader,无需人工干预。 客户端会自动重连到新的 Leader 节点,继续进行消息的生产和消费。
- 数据一致性保证: Raft 协议保证了所有节点上的数据一致性。 即使在发生故障转移的情况下,也不会出现数据丢失或数据不一致的问题。
配置要点:
- NameServer 集群: 部署多个 NameServer 实例,组成 NameServer 集群,并确保客户端配置了所有 NameServer 的地址。
- Broker 集群: 部署多个 Broker 节点,并配置 Master 和 Slave 关系(在 Dledger 模式下配置 Dledger 集群)。
- 同步刷盘(SyncFlush): 启用同步刷盘机制,确保消息被可靠地写入磁盘。这会牺牲一定的性能,但可以提高数据的可靠性。
- 同步复制(SyncMaster): 对于传统 Master-Slave 架构,启用同步复制模式,确保消息被复制到 Slave 节点。 (Dledger模式下,数据同步由Dledger组件保证)
- 监控和告警: 实施全面的监控和告警机制,以便及时发现和处理故障。
17.RocketMQ的消息可靠性
生产者:
使用Rocketmq自带的事务消息,
事务消息原理:首先生产者会发送一个half 消息(对原始消息的封装),该消息对消费者不可见,MQ 通过 ACK 机制返回消息接受状态, 生产者执行本地事务并且返回给 MQ 一个状态(Commit、RollBack 等),如果是 Commit 的话 MQ 就会把消息给到下游, RollBack 的话就会丢弃该消息,状态如果为 UnKnow 的话会过一段时间回查本地事务状态,默认回查 15 次,一直是 UnKnow 状态的话就会丢弃此消息。
为什么先发一个 half 消息,作用就是先判断下 MQ 有没有问题,服务正不正常。
持久化:MQ 收到消息后写入硬盘如何保证不丢失?
数据存盘绕过缓存,改为同步刷盘,这一步需要修改 Broker 的配置文件,将 flushDiskType 改为 SYNC_FLUSH 同步刷盘策略,默认的是 ASYNC_FLUSH 异步刷盘,一旦同步刷盘返回成功,那么就一定保证消息已经持久化到磁盘中了。
消息写入硬盘后,硬盘坏了如何保证不丢失?
为了保证磁盘损坏导致丢失数据,RocketMQ 采用主从机构,集群部署,Leader 中的数据在多个 Follower 中都存有备份,防止单点故障导致数据丢失。
Master 节点挂了怎么办?Master 节点挂了之后 DLedger 登场
- 接管 MQ 的 commitLog
- 选举从节点
- 文件复制 uncommited 状态 多半从节点收到之后改为 commited
消费者消费 MQ 如何保证不丢失?
- 如果是网络问题导致的消费失败可以进行重试机制,默认每条消息重试 16 次
- 多线程异步消费失败,MQ 认为已经消费成功但是实际上对于业务逻辑来说消息是没有落地的,解决方案就是按照 mq 官方推荐的先执行本地事务再返回成功状态。
整个 MQ 节点挂了如何保证不丢失?
这种极端情况可以消息发送失败之后先存入本地,例如放到缓存中,另外启动一个线程扫描缓存的消息去重试发送。
18.RocketMQ为什么这么快,可以借鉴哪些地方
1.顺序写入磁盘
RocketMQ 的速度很大程度上得益于它采用了顺序写磁盘的策略。 磁盘的顺序写性能比随机写高几个数量级。 消息会被顺序地追加到 CommitLog 文件中,这使得写入速度非常快。
磁盘寻道时间是随机写的主要瓶颈,顺序写避免大量的磁头寻道操作。
2.PageCache 高效利用
RocketMQ 充分利用了操作系统的 PageCache。 Broker 会优先从 PageCache 中读取数据,减少了对磁盘的直接访问,提高了读取性能。
Broker 从 CommitLog 中读取消息并发送给 Consumer 时,如果 CommitLog 中的数据已经在 PageCache 中,则可以直接从内存中读取,无需进行物理磁盘 IO。 即使数据不在 PageCache 中,由于顺序读取的特性,也可以通过预读机制将数据加载到 PageCache 中。
3.零拷贝技术
RocketMQ通过零拷贝技术,例如sendfile, 减少了数据在内核空间和用户空间之间的复制,从而提升了消息的传输效率。消息可以直接从磁盘发送到网络接口, 减少了 CPU 的开销。
传统的 IO 操作需要多次数据拷贝。 例如, 通常需要经过 磁盘 -> 内核缓冲区 -> 用户缓冲区 -> Socket缓冲区 -> 网络 这样的路径。 零拷贝技术可以减少甚至避免这些拷贝。
避免在用户态和内核态的多次拷贝
4.避免随机读
RocketMQ 的消费模型设计避免了大量的随机读。 消息是按照 Offset 顺序消费的, 这样可以充分利用磁盘的顺序读性能。
5.轻量的消息结构
RocketMQ 的消息结构设计得非常轻量级, 只包含必要的元数据和消息体。 这减少了序列化和反序列化的开销, 提高了消息处理的速度。
轻量级的消息结构如何保证消息的可靠性? 通过 Broker 端的持久化机制和 Consumer 的 ACK 机制来保证。
6.高效的网络通信
RocketMQ 基于 Netty 框架构建, 采用了 Reactor 模式, 使用异步非阻塞 IO。 这使得 Broker 可以处理大量的并发连接, 提高了系统的吞吐量
7.尽可能无锁化操作
RocketMQ 在设计上尽量避免使用锁, 使用 CAS (Compare and Swap) 操作等无锁技术, 减少了线程上下文切换的开销, 提高了并发性能
19.RocketMQ 的存储机制
RocketMQ 的存储采用一种混合型的存储结构,既有类似日志结构的顺序写 CommitLog,又有用于快速索引的 ConsumeQueue。 这种设计使得 RocketMQ 既能保证写入的高吞吐量,又能兼顾消费的效率。
CommitLog (消息存储)
顺序写(Sequential Write) 是 CommitLog 最重要的特性。 消息按照到达 Broker 的顺序,依次追加到 CommitLog 文件末尾。 这使得写入速度非常快, 能够应对高并发的写入场景。
- CommitLog 由多个 CommitLog 文件组成, 每个文件大小固定 (默认1GB)。
- 当一个文件写满后,会自动创建新的文件进行写入。
- 文件名以偏移量命名,方便查找。
CommitLog 是 RocketMQ 存储消息的核心文件。 所有的消息都以追加写的方式写入 CommitLog, 保证了写入的高吞吐量。 CommitLog 文件是顺序写的, 这也是 RocketMQ 能够应对高并发写入的关键原因。
顺序写如何保证? Broker 接收到 Producer 发送的消息后,直接将消息追加到 CommitLog 文件的末尾, 没有随机 IO 操作。
高并发的处理策略:
- 批量写入: Broker 可以将多个消息批量写入 CommitLog, 减少 IO 次数。
- PageCache: 利用操作系统的 PageCache, 将数据缓存在内存中, 减少直接的磁盘 IO。
- 异步刷盘: 可以配置成异步刷盘, 不必每次写入都进行磁盘同步,进一步提升写入性能。 但是, 也需要注意数据可靠性的权衡。
ConsumeQueue (消息索引)
存储的是消息在 CommitLog 中的 offset (物理偏移量)、消息长度、Message Tag 的 hashcode。
相对于 CommitLog 来说, ConsumeQueue 更多的是随机读。 (虽然也会顺序追加新的索引,但消费时会根据指定的 queueId 和 offset 查找对应的索引项)
- ConsumeQueue 也是由多个文件组成,每个文件大小固定。
- 每个 ConsumeQueue 文件对应一个 Topic 下的某个 QueueId。
- 文件名以偏移量命名,方便查找。
ConsumeQueue 相当于是 CommitLog 的索引文件。 它存储了消息在 CommitLog 中的位置信息, 使得 Consumer 可以快速地定位到消息, 提高了消费效率。 没有ConsumeQueue的话, 消费消息就需要扫描整个 CommitLog。
- 加速消费: Consumer 可以根据 Topic 和 QueueId, 从 ConsumeQueue 中找到消息在 CommitLog 中的位置 (offset), 然后直接从 CommitLog 中读取消息。 避免了扫描整个 CommitLog 文件。
- 过滤消息: Consumer 可以根据 Message Tag 进行消息过滤。 ConsumeQueue 中存储了 Tag 的 hashcode, Consumer 可以先在 ConsumeQueue 中进行 Tag 过滤, 减少不必要的消息读取。
联系:
ConsumeQueue 是根据 CommitLog 异步生成的。 Broker 会启动一个后台线程, 定期扫描 CommitLog, 并将消息的索引信息提取出来, 写入 ConsumeQueue
对应关系: 一个 CommitLog 文件对应多个 ConsumeQueue 文件 (每个 Topic 的每个 QueueId 对应一个 ConsumeQueue 文件)。
- CommitLog 恢复: 扫描 CommitLog 文件, 找到最后一个有效的消息 Offset。
- ConsumeQueue 恢复: 根据 CommitLog 中最后一个有效的消息 Offset, 重新构建 ConsumeQueue。 如果 ConsumeQueue 已经存在, 则需要进行校验和修复。
20.RocketMQ中Broker的刷盘策略有哪些
提供了两种刷盘策略:
- 同步刷盘 (SYNC_FLUSH)
- 异步刷盘 (ASYNC_FLUSH)
同步刷盘:
同步刷盘指的是消息写入 CommitLog 后, 必须等待刷盘完成后,才返回 Producer 写入成功。 这种方式数据可靠性最高, 但是性能较低。 适用于对数据可靠性要求极高的场景。
通过 FileChannel.force() 方法强制将 PageCache 中的数据刷到磁盘。
异步刷盘:
异步刷盘指的是消息写入 CommitLog 后,立即返回 Producer 写入成功, 不需要等待刷盘完成。 这种方式性能较高,但是数据可靠性相对较低。 适用于对性能要求较高, 可以容忍少量消息丢失的场景。
- 定时刷盘: 定时将 PageCache 中的数据刷到磁盘。
- 积累一定消息后刷盘: 当 PageCache 中积累的消息达到一定数量时, 将数据刷到磁盘。
- OS 调度刷盘: 完全由操作系统来决定何时将 PageCache 中的数据刷到磁盘。
21.RocketMO中的Broker部署方式
1.单节点
2.多节点同步双写
多 Broker 同步双写部署方式,至少需要两个 Broker 节点。 消息同步写入到两个 Broker 节点, 只有两个 Broker 都写入成功,才返回 Producer 写入成功。 这种方式数据可靠性高,但是写入性能较低。
3.异步复制:
多 Broker 异步复制部署方式, 至少需要两个 Broker 节点。 消息先写入到 Master Broker, 然后异步复制到 Slave Broker。这种方式写入性能较高,但是数据可靠性相对较低。
4.Dledger 模式
“Dledger 模式是 RocketMQ 提供的基于 Raft 协议的 CommitLog 复制解决方案。 它能够提供更高的可用性和数据一致性。 在 Dledger 模式下,多个 Broker 组成一个 Raft 组, 自动选举 Leader, 实现故障转移。
22.RocketMQ怎么实现路由注册&路由发现
路由注册
在 RocketMQ 中,路由注册主要涉及以下几个核心组件及其交互:
- Producer(生产者)
- Broker(消息服务器)
- NameServer(命名服务)
- Topic(主题)
- Consumer(消费者)
通过 Broker 定期向 NameServer 注册自身信息与 Topic 信息, NameServer 收集这些信息, Producer/Consumer 从 NameServer 获取 Broker 列表, 并根据配置的负载均衡策略选择 Broker。
RocketMQ 的路由注册机制通过 Broker 主动注册、心跳保活,NameServer 维护路由表,Producer/Consumer 动态发现 Broker,实现了高度的灵活性和可扩展性。 这种设计使得 RocketMQ 能够很好地适应 Broker 的动态变化, 保证消息的可靠传输。
详细:
当 Broker 启动时,它会主动向所有的 NameServer 节点注册自己。 注册的信息包括:
- Broker 的 IP 地址和端口。
- Broker ID(区分 Master 和 Slave)。
Broker 所管理的 Topic 列表。 每个 Broker 会存储多个 Topic 的数据。
定期注册: Broker 会定时(默认 30 秒)向 NameServer 发送心跳包,更新自己的状态和 Topic 信息。 这确保了 NameServer 能够及时感知 Broker 的存活状态和 Topic 信息变化。
注册成功后: NameServer 会保存 Broker 的信息,并更新 Topic 路由表。
Producer 和 Consumer 启动时,也会连接到 NameServer,获取 Topic 的路由信息。
Producer 根据要发送消息的 Topic 从 NameServer 获取路由信息。
Consumer 根据要订阅的 Topic 从 NameServer 获取路由信息。
本地缓存: Producer 和 Consumer 会将获取到的路由信息缓存在本地, 避免频繁访问 NameServer。
定期更新: Producer 和 Consumer 会定期(默认 30 秒)从 NameServer 更新路由信息, 以便及时感知 Broker 的变更。
异常处理: 如果 Producer/Consumer 无法连接到 NameServer, 则会尝试连接其他 NameServer 节点。 如果从 NameServer 获取路由失败,则会重试。
NameServer 维护着整个 RocketMQ 集群的路由信息
包括:
- Key: Topic 名称
- Value: Broker 列表,包含了所有提供该 Topic 服务的 Broker 实例信息,包括IP地址、端口、BrokerId(Master/Slave)等。
功能:
Broker 变更感知: NameServer 通过心跳机制感知 Broker 的状态变化(例如 Broker 宕机)。 如果 Broker 长时间没有发送心跳包, NameServer 会将其从路由表中移除。
Topic 变更感知: 当 Topic 的配置发生变化时, Broker 会主动通知 NameServer, NameServer 会更新路由表。
数据同步: NameServer 集群之间不进行数据同步, 每个 NameServer 节点都保存着完整的路由信息。 Producer/Consumer 会从多个 NameServer 节点获取路由信息, 从而实现高可用。
参数:
namesrvAddr: Producer 和 Consumer 配置的 NameServer 地址列表,多个地址用分号分隔。brokerClusterName: Broker 所属的集群名称。brokerName: Broker 的名称。brokerId: Broker 的 ID, 0 表示 Master, 非 0 表示 Slave。topicName: Topic 的名称。
路由发现
RocketMQ 的路由发现是构建在其路由注册机制之上的,它允许 Producer 和 Consumer 动态地找到合适的 Broker 来发送和接收消息。 路由发现过程的核心是 NameServer。
Producer 和 Consumer 启动时, 会配置 NameServer 地址列表 (多个NameServer地址用分号分隔: namesrvAddr)。 它们会尝试连接列表中的 NameServer 节点。
Producer 需要发送消息到指定的 Topic, Consumer 需要订阅特定的 Topic, 因此它们会向 NameServer 查询该 Topic 的路由信息。
NameServer 查找其存储的路由表,找到包含该 Topic 的所有 Broker 信息。
defaultTopicQueueNums: 指定 Topic 默认的 Queue 数量。topicFilterFlag: Topic 过滤标志。order: 是否是顺序消息;顺序消息只允许单线程消费, 所以只有一个 MessageQueue, 对应的Broker 只有一个 .
NameServer 将 Broker 列表(包括 Master 和 Slave 的信息)返回给 Producer/Consumer。 这个Broker 列表包含了每个Broker的IP地址、端口号以及Broker ID。
本地缓存:
- 缓存路由信息: Producer 和 Consumer 收到 NameServer 返回的路由信息后, 会将这些信息缓存到本地内存中。
- 目的: 这样做可以避免每次发送/接收消息都向 NameServer 发起请求, 提高效率。
路由定期更新:
- 定期更新: 为了能够感知 Broker 的变化, Producer 和 Consumer 会定期 (默认30s) 向 NameServer 发起路由信息更新请求。
- Broker 变更感知: 如果 Broker 发生故障、新增或者下线, NameServer 会更新路由表。 Producer/Consumer 在下一次路由信息更新时,就可以感知到这些变化。
- 异常处理: 如果在一段时间内, Producer/Consumer 无法从 NameServer 获取到 Topic 的路由信息, 程序会进行重试。
Kafak
1.对Kafka有什么了解吗?
Kafka特点如下:
- 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
- 可扩展性:kafka集群支持热扩展
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
- 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
- 高并发:支持数千个客户端同时读写
2.Kafka 为什么这么快?
- 顺序写入优化:Kafka将消息顺序写入磁盘,减少了磁盘的寻道时间。这种方式比随机写入更高效,因为磁盘读写头在顺序写入时只需移动一次。
- 批量处理技术:Kafka支持批量发送消息,这意味着生产者在发送消息时可以等待直到有足够的数据积累到一定量,然后再发送。这种方法减少了网络开销和磁盘I/O操作的次数,从而提高了吞吐量。
- 零拷贝技术:Kafka使用零拷贝技术,可以直接将数据从磁盘发送到网络套接字,避免了在用户空间和内核空间之间的多次数据拷贝。这大幅降低了CPU和内存的负载,提高了数据传输效率。
- 压缩技术:Kafka支持对消息进行压缩,这不仅减少了网络传输的数据量,还提高了整体的吞吐量。
3.kafka的模型介绍一下,kafka是推送还是拉取?
消费者通常有两种与 Broker 交互的模型:推送模型(Push Model) 和 拉取模型(Pull Model)。Kafka 选择了后者,
推送模型(Push Model)原理
在推送模型中,消息代理(Broker)主动将消息推送给消费者。当有新消息到达 Broker 时,Broker 会根据一定的策略(如轮询、最少连接等)将消息发送给订阅的消费者。
优点
- 实时性高:消息一旦到达 Broker 就能立即被消费者获取并处理,延迟较低。
- 开发简单:消费者只需等待接收消息即可,无需主动请求。
缺点
- 消费者负载控制困难:Broker 无法得知每个消费者的处理能力。如果推送速度过快,而消费者处理能力跟不上,容易导致消费者过载、崩溃,甚至数据丢失。这就像水龙头一直全开,而水桶可能接不过来。
- 流量控制复杂:需要复杂的流量控制机制来避免消费者过载,例如 Broker 维护每个消费者的处理速率,动态调整推送速度,这增加了 Broker 的复杂性。
- 不灵活:消费者被动接收消息,无法根据自身处理能力或特定需求(如批量消费)来调整消息获取节奏。
拉取模型(Pull Model)
在拉取模型中,消费者主动向消息代理(Broker)请求消息。消费者定期或按需向 Broker 发送拉取请求,Broker 接收到请求后,将可用的消息返回给消费者。
优点
- 消费者自我控制:消费者可以根据自身的处理能力、网络状况或业务需求来决定何时、以何种速率拉取多少消息。这就像水桶根据自身容量和需要,主动去水龙头接水。
- 避免过载:消费者不会因为 Broker 推送过快而导致过载,因为它只在准备好时才去拉取消息。
- 批量消费效率高:消费者可以一次性拉取一批消息进行批量处理,减少网络往返次数,提高吞吐量,这对于磁盘 I/O 友好的消息系统尤为重要。
缺点
- 实时性可能略低:如果消费者拉取间隔设置过长,可能会引入额外的消息延迟。
- 空轮询问题:如果 Broker 没有新消息,消费者仍然会发送拉取请求,这会导致“空轮询”,浪费网络资源和 CPU 周期。Kafka 对此有优化措施。
Kafka 选择拉取模型的原因(重点)
Kafka 选择拉取模型是基于其高吞吐量、持久化存储和分布式特性的考量。以下是主要原因:
适应消费者异构处理能力:Kafka 的设计目标之一是支持大量异构的消费者,它们可能拥有不同的处理能力和速度。拉取模型允许每个消费者根据自己的节奏消费,避免了“快生产者-慢消费者”导致的问题。消费者可以在消息量大时快速拉取,在消息量小时或处理繁忙时放缓拉取速度。
优化批量消息处理:Kafka 的设计理念是基于日志(Log)的,它将消息追加写入磁盘。批量地从磁盘读取消息远比单条读取效率高。拉取模型允许消费者一次性拉取一批(或一个批次)消息进行处理,从而最大限度地利用磁盘 I/O,提高整体吞吐量。
简化 Broker 设计:将流控和背压(backpressure)的复杂性从 Broker 转移到消费者端。Broker 只需关注消息的持久化和按需提供,无需跟踪每个消费者的消费状态和处理能力,这使得 Broker 的设计更加简单、健壮,更易于扩展。
更好的容错性和伸缩性:消费者故障或新增时,不会对 Broker 造成冲击。新的消费者加入或旧的消费者退出时,只需重新分配分区和调整拉取逻辑即可。
消费者主动控制偏移量 (Offset):这是拉取模型最重要的优势之一,也是 Kafka 独特且强大的特性。
消费者如何通过控制偏移量实现灵活的消息消费
Kafka 的每个分区(Partition)都是一个有序的、不可变的消息序列,每条消息都有一个唯一的、递增的偏移量(Offset)。消费者在拉取消息时,会记录自己消费到的当前偏移量。Kafka Broker 不负责跟踪消费者的消费状态,而是由消费者自己负责管理其消费的偏移量。
- 提交偏移量:消费者成功处理一批消息后,会向 Kafka 提交(commit)它已处理的最新消息的偏移量。这个偏移量通常存储在 Kafka 内部的一个特殊 Topic (
__consumer_offsets) 中。 - 从指定偏移量开始消费:当消费者启动或重新平衡(rebalance)时,它会从已提交的偏移量处开始消费。这种机制赋予了消费者极大的灵活性:
- 重置到旧偏移量(Time Travel):如果因为业务逻辑错误或需要重新处理历史数据,消费者可以手动将偏移量重置到更早的时间点或更小的偏移量。例如,通过
seek()方法将消费指针移到指定偏移量,甚至可以通过时间戳 (seek(TopicPartition, long timestamp)) 寻找到某个时间点的偏移量。这使得 Kafka 成为一个“时间机器”,可以重复消费数据。 - 跳到最新位置(Consume from Latest):如果消费者只想处理新生成的消息,或者跳过历史积压,它可以将偏移量直接设置为分区中的最新偏移量。这意味着它会从当前写入位置开始消费,忽略所有之前的历史消息。这对于快速启动消费者,只关注实时数据很有用。
- 重置到旧偏移量(Time Travel):如果因为业务逻辑错误或需要重新处理历史数据,消费者可以手动将偏移量重置到更早的时间点或更小的偏移量。例如,通过
这种基于偏移量自主控制的消费模式,使得 Kafka 的消费者非常灵活,能够适应各种复杂的业务场景,包括数据回溯、灾难恢复、实时处理与历史批处理的结合等。
- 提交偏移量:消费者成功处理一批消息后,会向 Kafka 提交(commit)它已处理的最新消息的偏移量。这个偏移量通常存储在 Kafka 内部的一个特殊 Topic (
消费者组的概念
Kafka 引入了消费者组(Consumer Group)的概念来实现高伸缩性和高可用性。
- 实现水平扩展:
- 一个 Topic 的一个分区在同一时刻只能被一个消费者组中的一个消费者实例消费。
- 当一个消费者组内有多个消费者实例时,Kafka 会将 Topic 的所有分区均匀地分配给组内的消费者。例如,如果一个 Topic 有 10 个分区,一个消费者组有 5 个消费者实例,那么每个消费者可能负责消费 2 个分区。
- 通过增加消费者组内的消费者实例数量,可以提高整个消费者组的并发处理能力,实现水平扩展。当消费者数量等于分区数量时,每个分区由一个消费者处理,达到最大并行度。如果消费者数量超过分区数量,多余的消费者将处于空闲状态。
- 实现故障转移(高可用性):
- 当消费者组中的某个消费者实例发生故障(如崩溃、下线)时,Kafka 会触发再平衡(Rebalance)机制。
- Kafka 会将该消费者原来负责消费的分区自动重新分配给组内其他活跃的消费者实例。
- 这样,即使有消费者实例故障,整个消费者组的消费任务也不会中断,保证了高可用性。新的消费者实例上线也会触发再平衡,将部分分区分配给它。
消费者如何通过拉取模式从 Broker 读取数据
消费者组中的每个消费者实例,都会对它被分配到的每个分区执行拉取操作:
- 初始化:消费者启动并加入消费者组。通过心跳机制与 Broker 保持连接,并参与分区分配(再平衡)。
- 获取偏移量:消费者从
__consumer_offsetsTopic 中获取其负责的每个分区的已提交偏移量,作为下一次拉取消息的起始位置。 - 发送拉取请求:消费者向其分配到的分区的 Leader Broker 发送
FetchRequest。请求中包含它想从哪个分区、从哪个偏移量开始、拉取多少字节的消息等信息。 - Broker 响应:Leader Broker 收到请求后,从其日志文件中读取指定偏移量之后的消息,并以
FetchResponse返回给消费者。 - 消费者处理:消费者收到消息后,进行业务逻辑处理。
- 提交偏移量:处理成功后,消费者将新的偏移量提交到
__consumer_offsetsTopic,更新自己的消费进度。 - 循环拉取:消费者会持续循环执行 3-6 步,不断地拉取并处理消息。
解决无数据时的循环问题(长轮询)
为了解决拉取模型可能出现的“空轮询”问题和提高效率,Kafka 的拉取请求通常采用长轮询(Long Polling)机制:
- 当消费者发送拉取请求时,如果 Broker 上没有立即可用的新消息,Broker 不会立即返回空结果。
- 相反,Broker 会持有(hold)住这个请求一段时间(由消费者请求中的
max.wait.ms参数控制,默认 500ms)。 - 在这段时间内,如果新的消息到达了,或者达到了等待时间,Broker 才会将这些新消息返回给消费者。
- 如果等待时间内没有新消息,Broker 才会返回一个空的结果。
- 这样,就避免了消费者频繁地发送空请求,减少了网络和 CPU 资源的浪费,同时又保证了相对较好的实时性。
4.Kafka 如何保证顺序读取消息?
Kafka 可以保证在同一个分区内消息是有序的,生产者写入到同一分区的消息会按照写入顺序追加到分区日志文件中,消费者从分区中读取消息时也会按照这个顺序。这是 Kafka 天然具备的特性。
要在 Kafka 中保证顺序读取消息,需要结合生产者、消费者的配置以及合适的业务处理逻辑来实现。以下具体说明如何实现顺序读取消息:
- 生产者端确保消息顺序:为了保证消息写入同一分区从而确保顺序性,生产者需要将消息发送到指定分区。可以通过自定义分区器来实现,通过为消息指定相同的Key,保证相同Key的消息发送到同一分区。
- 消费者端保证顺序消费:消费者在消费消息时,需要单线程消费同一分区的消息,这样才能保证按顺序处理消息。如果使用多线程消费同一分区,就无法保证消息处理的顺序性。
Kafka 本身不能保证跨分区的消息顺序性,如果需要全局的消息顺序性,通常有以下两种方法:
- 只使用一个分区:将所有消息都写入到同一个分区,消费者也只从这个分区消费消息。但这种方式会导致 Kafka 的并行处理能力下降,因为 Kafka 的性能优势在于多分区并行处理。
- 业务层面保证:在业务代码中对消息进行编号或添加时间戳等标识,消费者在消费消息后,根据这些标识对消息进行排序处理。但这种方式会增加业务代码的复杂度。
5.kafka 消息积压怎么办?
Kafka 消息积压是一个常见的问题,它可能会导致数据处理延迟,甚至影响业务的正常运行,下面是一些解决 Kafka 消息积压问题的常用方法:
- 增加消费者实例可以提高消息的消费速度,从而缓解积压问题。你需要确保消费者组中的消费者数量不超过分区数量,因为一个分区同一时间只能被一个消费者消费。
- 增加 Kafka 主题的分区数量可以提高消息的并行处理能力。在创建新分区后,你需要重新平衡消费者组,让更多的消费者可以同时消费消息。
6.Kafka为什么一个分区只能由消费者组的一个消费者消费?这样设计的意义是什么?
同一时刻,一条消息只能被组中的一个消费者实例消费
如果两个消费者负责同一个分区,那么就意味着两个消费者同时读取分区的消息,由于消费者自己可以控制读取消息的offset,就有可能C1才读到2,而C1读到1,C1还没处理完,C2已经读到3了,则会造成很多浪费,因为这就相当于多线程读取同一个消息,会造成消息处理的重复,且不能保证消息的顺序。
7.如果有一个消费主题topic,有一个消费组group,topic有10个分区,消费线程数和分区数的关系是怎么样的?
topic下的一个分区只能被同一个consumer group下的一个consumer线程来消费,但反之并不成立,即一个consumer线程可以消费多个分区的数据,比如Kafka提供的ConsoleConsumer,默认就只是一个线程来消费所有分区的数据。
所以,分区数决定了同组消费者个数的上限。
如果你的分区数是N,那么最好线程数也保持为N,这样通常能够达到最大的吞吐量。超过N的配置只是浪费系统资源,因为多出的线程不会被分配到任何分区。
8.消息中间件如何做到高可用?
消息中间件如何保证高可用呢?单机是没有高可用可言的,高可用都是对集群来说的,一起看下kafka的高可用吧。
Kafka 的基础集群架构,由多个broker组成,每个broker都是一个节点。当你创建一个topic时,它可以划分为多个partition,而每个partition放一部分数据,分别存在于不同的 broker 上。也就是说,一个 topic 的数据,是分散放在多个机器上的,每个机器就放一部分数据。
有些伙伴可能有疑问,每个partition放一部分数据,如果对应的broker挂了,那这部分数据是不是就丢失了?那还谈什么高可用呢?
Kafka 0.8 之后,提供了HA机制,复制品副本机制来保证高可用,即每个 partition 的数据都会同步到其它机器上,形成多个副本。然后所有的副本会选举一个 leader 出来,让leader去跟生产和消费者打交道,其他副本都是follower。写数据时,leader 负责把数据同步给所有的follower,读消息时, 直接读 leader 上的数据即可。如何保证高可用的?就是假设某个 broker 宕机,这个broker上的partition 在其他机器上都有副本的。如果挂的是leader的broker呢?其他follower会重新选一个leader出来。
写数据的时候,生产者就写 leader,然后 leader 将数据落地写本地磁盘,接着其他 follower 自己主动从 leader 来 pull 数据。一旦所有 follower 同步好数据了,就会发送 ack 给 leader,leader 收到所有 follower 的 ack 之后,就会返回写成功的消息给生产者。
消费的时候,只会从 leader 去读,但是只有当一个消息已经被所有 follower 都同步成功返回 ack 的时候,这个消息才会被消费者读到。
9.Kafka 和 RocketMQ 消息确认机制有什么不同?
Kafka的消息确认机制有三种:0,1,-1:
- ACK=0:这是最不可靠的模式。生产者在发送消息后不会等待来自服务器的确认。这意味着消息可能会在发送之后丢失,而生产者将无法知道它是否成功到达服务器。
- ACK=1:这是默认模式,也是一种折衷方式。在这种模式下,生产者会在消息发送后等待来自分区领导者(leader)的确认,但不会等待所有副本(replicas)的确认。这意味着只要消息被写入分区领导者,生产者就会收到确认。如果分区领导者成功写入消息,但在同步到所有副本之前宕机,消息可能会丢失。
- ACK=-1:这是最可靠的模式。在这种模式下,生产者会在消息发送后等待所有副本的确认。只有在所有副本都成功写入消息后,生产者才会收到确认。这确保了消息的可靠性,但会导致更长的延迟。
RocketMQ 提供了三种消息发送方式:同步发送、异步发送和单向发送:
- 同步发送:是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式。应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
- 异步发送:是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式,但是需要实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。适用于链路耗时较长,对响应时间较为敏感的业务场景,例如,视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
- 单向发送:发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
10.Kafka 和 RocketMQ 的 broker 架构有什么区别
- Kafka 的 broker 架构:Kafka 的 broker 架构采用了分布式的设计,每个 Kafka broker 是一个独立的服务实例,负责存储和处理一部分消息数据。Kafka 的 topic 被分区存储在不同的 broker 上,实现了水平扩展和高可用性。
- RocketMQ 的 broker 架构:RocketMQ 的 broker 架构也是分布式的,但是每个 RocketMQ broker 有主从之分,一个主节点和多个从节点组成一个 broker 集群。主节点负责消息的写入和消费者的拉取,从节点负责消息的复制和消费者的负载均衡,提高了消息的可靠性和可用性。
11.kafka是怎么解决消息幂等的
幂等性 (Idempotence) 指的是,对于同一个操作,无论执行多少次,其结果都是相同的,不会对系统状态造成额外的副作用。在分布式系统中,由于网络抖动、超时重试等原因,消息生产者可能会重复发送同一条消息。如果不对这些重复消息进行处理,就可能导致数据不一致(例如,重复扣款、重复插入数据)。
Kafka 在 0.11.0 版本 引入了生产者幂等性,以确保消息在生产者到 Broker 的传输过程中,即使生产者重试,消息也只会被写入 Kafka 一次且仅一次。
原理:
Kafka 实现生产者幂等性的核心机制是为每个生产者会话分配一个唯一的 Producer ID (PID),并为每条消息分配一个序列号 (Sequence Number)。
当生产者首次连接到 Kafka 集群并启用幂等性时,Broker 会为这个生产者会话分配一个唯一的 PID。
这个 PID 在生产者会话的生命周期内保持不变。
每个 PID 都会维护一个针对每个分区递增的序列号。
生产者发送的每条消息都会带上其 PID 和对应的 Sequence Number。
broker:
当 Broker 收到消息时,它会检查消息的 (PID, Partition, Sequence Number) 元组。
对于每个 (PID, Partition),Broker 会维护一个已接收到的最大序列号。
如果收到的消息的 Sequence Number 等于 Broker 记录的 max_sequence_number + 1,则表示这是一条新消息,Broker 会将其写入日志,并更新 最大序列号。
如果收到的消息的 Sequence Number 小于或等于 max_sequence_number,则表示这是一条重复消息(因为生产者重试发送了),Broker 会直接丢弃这条消息,但仍向生产者发送成功确认。
如果收到的消息的 Sequence Number 大于 max_sequence_number + 1,则表示消息乱序,这通常是不可恢复的错误,Broker 会抛出异常。
跟MVCC的read view的活跃事务id差不多
配置:
在 Kafka 生产者配置中,只需设置 enable.idempotence=true。
1 | Properties props = new Properties(); |
确保数据不重复:解决了生产者侧由于重试导致的重复消息问题。
简化生产者逻辑:开发者无需在应用层手动处理消息去重。
实现“精确一次语义”(Exactly-Once Semantics) 的基础:生产者幂等性是实现端到端事务性消息(包括跨多个 Topic/Partition 的事务)的关键组成部分。
单会话、单分区:幂等性保证只在一个生产者会话内,且针对单个分区有效。如果生产者重启(PID 会变),或者消息发送到不同的分区,则无法保证幂等性。
不处理消费者端的重复消费:生产者幂等性只解决了消息写入到 Kafka 的去重问题。消费者仍然可能因为重试消费等原因,从 Kafka 中读取到同一条消息多次。消费者端的去重(或保证“精确一次”处理)需要消费者自身结合业务逻辑实现,或者使用 Kafka Streams/Flink 等流处理框架的事务性功能。
12.kafka持久化的机制
Kafka 的持久性指的是它能够可靠地存储消息,即使在 Broker 宕机、网络故障等情况下也不会丢失数据。Kafka 通过以下几个核心机制来保证消息的持久性:
1.持久化到硬盘
追加写入 (Append-Only Log):Kafka 的所有数据都以日志(log)的形式存储在 Broker 的文件系统上。消息被追加写入到分区对应的日志文件中,是顺序写入,这使得磁盘 I/O 效率极高。
不可变性 (Immutability):一旦消息被写入分区,就是不可变的。它们不会被修改或删除,只能通过日志清理策略(基于时间或大小)来过期。
文件系统缓存 (Page Cache):Kafka 充分利用操作系统的文件系统缓存(Page Cache)。当消息写入磁盘时,它们首先进入操作系统的内存缓存,然后再异步地刷写到物理磁盘。这既提供了高性能写入,又在一定程度上保证了数据在内存中的持久性。
2.消息复制
分区副本 (Partition Replicas):Kafka 的每个 Topic 都被划分为多个分区 (Partitions),每个分区都可以配置一个复制因子 (Replication Factor)。例如,如果 replication.factor=3,则每个分区会有 3 个副本。
Leader-Follower 模型:每个分区都有一个Leader 副本和若干个Follower 副本。
- Leader:负责处理该分区所有的生产(写入)和消费(读取)请求。
- Follower:被动地从 Leader 复制消息日志,保持与 Leader 的数据同步。
In-Sync Replicas (ISR):同步副本集合。这是一个动态维护的集合,包含 Leader 副本和所有与 Leader 保持同步的 Follower 副本。判断同步的标准通常是 Follower 副本的日志与 Leader 副本的日志的差距在一个可配置的阈值之内。
故障转移 (Failover):如果 Leader 副本所在的 Broker 宕机,Kafka 控制器(Controller)会从 ISR 中选举一个新的 Leader 副本,从而确保该分区持续可用且数据不丢失。只要 ISR 中至少有一个副本存活,数据就不会丢失。
3.生产者确认机制:
生产者在发送消息时,可以通过 acks 参数来配置不同级别的确认机制,从而控制消息的持久性保证:
acks=0(Lowest Durability):- 生产者发送消息后,不等待 Broker 的任何确认就认为发送成功。
- 优点:吞吐量最高,延迟最低。
- 缺点:可靠性最差, Broker 宕机或消息未成功写入,都可能导致消息丢失。
acks=1(Default / Moderate Durability):- 生产者等待分区 Leader 副本确认消息已写入其本地日志(并进入文件系统缓存)。
- 优点:相对高的吞吐量和较低的延迟,同时提供了基本的可靠性保证。
- 缺点:如果 Leader 副本写入成功后,但在 Follower 副本同步之前 Leader 宕机,可能会导致消息丢失。
acks=all(or-1) (Highest Durability):- 生产者等待所有 ISR 中的副本都确认消息已写入其本地日志。
- 优点:最高级别的可靠性保证,只要至少
min.insync.replicas个副本存活,消息就不会丢失。 - 缺点:吞吐量最低,延迟最高。因为需要等待所有同步副本的确认,如果某个 Follower 同步速度慢,就会增加延迟。
- 推荐配置:为了最高的数据持久性,通常会配置
replication.factor >= 3,min.insync.replicas >= 2,并且生产者设置acks=all。
4.日志保留策略:
Kafka 不会像传统消息队列那样在消息被消费后立即删除。它会根据配置的保留策略来持久化消息:
- 基于时间:消息保留多长时间(
log.retention.ms,默认 7 天)。 - 基于大小:日志文件达到多大时开始清理(
log.retention.bytes)。 - 日志压缩 (Log Compaction):对于某些特殊的 Topic(如用于存储状态的 Topic),可以配置日志压缩。它会保留每个消息键的最新消息,清除旧的相同键的消息,从而实现按键的持久化。
13.kafka处理消息丢失
消费者:关闭offset的自动提交
就是说,你消费到了这个消息,然后消费者那边自动提交了 offset,让 Kafka 以为你已经消费好了这个消息,但其实你才刚准备处理这个消息,你还没处理,你自己就挂了,此时这条消息就丢咯。
这不是跟 RabbitMQ 差不多吗,大家都知道 Kafka 会自动提交 offset,那么只要关闭自动提交 offset,在处理完之后自己手动提交 offset,就可以保证数据不会丢。但是此时确实还是可能会有重复消费,比如你刚处理完,还没提交 offset,结果自己挂了,此时肯定会重复消费一次,自己保证幂等性就好了。
kafka丢数据:
就是 Kafka 某个 broker 宕机,然后重新选举 partition 的 leader。大家想想,要是此时其他的 follower 刚好还有些数据没有同步,结果此时 leader 挂了,然后选举某个 follower 成 leader 之后,不就少了一些数据?这就丢了一些数据啊。
配置参数来解决:
- 给 topic 设置
replication.factor参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。 - 在 Kafka 服务端设置
min.insync.replicas参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower 吧。 - 在 producer 端设置
acks=all:这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了。 - 在 producer 端设置
retries=MAX(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。
至少在 Kafka broker 端就可以保证在 leader 所在 broker 发生故障,进行 leader 切换时,数据不会丢失。
生产者:
acks=all ,一定不会丢,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。
RabbitMQ
1.RabbitMQ的特性你知道哪些?
abbitMQ 以 可靠性、灵活性 和 易扩展性 为核心优势,适合需要稳定消息传递的复杂系统。其丰富的插件和协议支持使其在微服务、IoT、金融等领域广泛应用,比较核心的特性有如下:
- 持久化机制:RabbitMQ 支持消息、队列和交换器的持久化。当启用持久化时,消息会被写入磁盘,即使 RabbitMQ 服务器重启,消息也不会丢失。例如,在声明队列时可以设置
durable参数为true来实现队列的持久化:
1 | import pika |
- 消息确认机制:提供了生产者确认和消费者确认机制。生产者可以设置
confirm模式,当消息成功到达 RabbitMQ 服务器时,会收到确认消息;消费者在处理完消息后,可以向 RabbitMQ 发送确认信号,告知服务器该消息已被成功处理,服务器才会将消息从队列中删除。 - 镜像队列:支持创建镜像队列,将队列的内容复制到多个节点上,提高消息的可用性和可靠性。当一个节点出现故障时,其他节点仍然可以提供服务,确保消息不会丢失。
- 多种交换器类型:RabbitMQ 提供了多种类型的交换器,如直连交换器(Direct Exchange)、扇形交换器(Fanout Exchange)、主题交换器(Topic Exchange)和头部交换器(Headers Exchange)。不同类型的交换器根据不同的规则将消息路由到队列中。例如,扇形交换器会将接收到的消息广播到所有绑定的队列中;主题交换器则根据消息的路由键和绑定键的匹配规则进行路由。
2.RabbitMQ的底层架构是什么?
以下是 RabbitMQ 的一些核心架构组件和特性:
- 核心组件:生产者负责发送消息到 RabbitMQ、消费者负责从 RabbitMQ 接收并处理消息、RabbitMQ 本身负责存储和转发消息。
- 交换机:交换机接收来自生产者的消息,并根据 routing key 和绑定规则将消息路由到一个或多个队列。
- 持久化:RabbitMQ 支持消息的持久化,可以将消息保存在磁盘上,以确保在 RabbitMQ 重启后消息不丢失,队列也可以设置为持久化,以保证其结构在重启后不会丢失。
- 确认机制:为了确保消息可靠送达,RabbitMQ 使用确认机制,费者在处理完消息后发送确认给 RabbitMQ,未确认的消息会重新入队。
- 高可用性:RabbitMQ 提供了集群模式,可以将多个 RabbitMQ 实例组成一个集群,以提高可用性和负载均衡。通过镜像队列,可以在多个节点上复制同一队列的内容,以防止单点故障。
3.RabbitMQ交换器有哪些
Direct Exchange(直连交换器)
路由规则:Direct Exchange 会将消息路由到那些绑定键(Binding Key)与消息的路由键(Routing Key)完全匹配的队列。
特点:点对点或一对一的精确路由。
比如:日志级别分发,私人消息等等。
Fanout Exchange(扇形交换器)
路由规则:Fanout Exchange 会将接收到的所有消息广播到所有绑定到它的队列,无视消息的路由键和队列的绑定键。
特点:发布/订阅模型,多播。
比如:系统广播通知,
Topic Exchange(主题交换器)
路由规则:Topic Exchange 会根据消息的路由键和队列的绑定键的模式匹配(模糊匹配)将消息路由到队列。
- 绑定键使用
.分隔单词。 *(星号) 匹配一个单词。#(井号) 匹配零个或多个单词。
特点:最强大的路由方式,实现复杂的发布/订阅模式。
比如:复杂日志订阅,一个大型分布式系统的日志收集与分析。不同的服务(如认证服务 auth、支付服务 payment、数据库服务 db)产生不同级别的日志。消费者可以根据自己感兴趣的日志类型(如所有严重错误、某个服务的所有日志、所有警告)进行订阅。
商品库存事件通知,电商平台中,针对不同品类商品的库存变动、价格更新、新品上架等事件进行通知。例如,库存部门只关心低库存预警,运营部门关心所有商品更新,而某个特定部门可能只关心电子产品的所有相关事件。
Headers Exchange(头部交换器)
路由规则:Headers Exchange 不依赖于路由键,而是根据消息的头部属性(Headers)进行路由。队列与交换器绑定时,除了指定头部键值对,还需要指定一个 x-match 参数:
x-match=all:表示消息的所有头部属性必须与绑定时指定的头部属性完全匹配。x-match=any:表示消息的任意一个头部属性与绑定时指定的头部属性匹配即可。
特点:灵活性最高,但通常性能不如 Topic Exchange,且使用频率较低。
比如:多条件任务分发,任务调度系统,根据任务的优先级、类型、设备平台等多个维度进行复杂路由。例如,高优先级且关键的报警任务需要进入专门的处理队列,而来自移动设备的任务可以进入移动任务处理队列。
4.说一说RabbitMQ中的AMQP
AMQP 是一种开放的、通用的消息协议,它定义了客户端应用程序和消息中间件之间进行消息传递的方式。你可以把它想象成消息通信领域的“HTTP 协议”或“SQL 协议”。就像 HTTP 定义了浏览器和服务器如何通信,SQL 定义了应用程序和数据库如何通信一样,AMQP 定义了如何发送、存储和接收消息。
生产者 (Producer):发送消息的应用程序。
消费者 (Consumer):接收并处理消息的应用程序。
消息 (Message):在生产者和消费者之间传递的数据单元。消息包含:
- 有效载荷 (Payload):实际的数据,例如 JSON、XML 或二进制数据。
- 属性 (Properties):关于消息的元数据,例如内容类型、编码、优先级、过期时间等。
连接 (Connection):TCP/IP 连接,生产者/消费者通过它与 RabbitMQ 建立连接。
信道 (Channel):在连接内部建立的轻量级逻辑连接。在同一个连接中可以有多个信道,这样可以复用 TCP 连接,减少开销。大部分 AMQP 操作都是在信道上进行的。
交换器 (Exchange):消息的接收者。生产者将消息发送到交换器,而不是直接发送到队列。交换器根据路由规则将消息路由到一个或多个队列。这是 AMQP 灵活路由的核心。
- RabbitMQ 支持多种交换器类型:
Direct、Fanout、Topic、Headers。
绑定 (Binding):交换器和队列之间的规则,它告诉交换器如何根据消息的路由键 (Routing Key) 将消息路由到哪个队列。
队列 (Queue):存储消息的地方。消息在被消费者消费之前会暂时存储在队列中。
路由键 (Routing Key):生产者发送消息时携带的一个字符串,交换器根据它和绑定键进行匹配,决定消息的路由去向。
绑定键 (Binding Key):队列在绑定到交换器时设置的一个字符串,用于与消息的路由键进行匹配。
确认机制 (Acknowledgements):AMQP 支持消费者对消息进行确认(ACK),告知 RabbitMQ 消息已成功处理。如果消费者没有确认,RabbitMQ 会认为消息未被正确处理,可能会重新投递。这保证了消息的可靠投递。
流程:
生产者连接到 RabbitMQ 服务器,建立一个连接 (Connection)。
在连接上创建一个或多个信道 (Channel)。
声明一个交换器 (Exchange)(如果不存在)。
声明一个队列 (Queue)(如果不存在)。
通过一个绑定 (Binding) 将队列绑定到交换器,并指定绑定键 (Binding Key)。
生产者通过信道向交换器发布消息,消息包含路由键 (Routing Key) 和有效载荷。
交换器根据其类型和绑定规则,匹配消息的路由键和队列的绑定键,将消息路由到一个或多个队列。
消费者连接到 RabbitMQ 服务器,建立连接和信道。
消费者从队列中拉取或订阅消息。
消费者处理消息后,向 RabbitMQ 发送确认 (ACK)。
AMQP 的优点:
- 开放标准:不限于特定厂商,提供了互操作性。你可以用 Java 客户端向 RabbitMQ 发送消息,用 Python 客户端接收。
- 灵活性:通过交换器和绑定机制,实现了非常灵活的消息路由。
- 可靠性:支持消息持久化、消息确认、发布者确认等机制,确保消息不丢失。
- 跨平台/语言:由于是协议标准,有多种语言的客户端库支持。
5.RabbitMQ是怎么解决消息幂等问题的
实现 RabbitMQ 消息幂等性的核心思想是:为每条消息生成一个全局唯一标识符(Message ID),并在消费者端维护一个已处理消息 ID 的记录。
跟kafka感觉差不多??
唯一ID:
生产者是消息的源头,它有责任为每条消息生成一个全局唯一的 ID,并将其作为消息的元数据(通常是消息头)发送出去。
UUID (Universally Unique Identifier):最简单和常用的方法。生成一个随机的 128 位数字,冲突概率极低。
- 示例:
UUID.randomUUID().toString()
时间戳 + 机器/服务 ID + 计数器:这种方式可以保证 ID 的单调性(在一定程度上),方便排查问题,但实现略复杂。
- 示例:
System.currentTimeMillis() + "-" + serviceId + "-" + AtomicInteger.incrementAndGet()
业务唯一 ID:如果业务本身就存在一个唯一 ID(例如订单号、交易流水号),可以直接使用它作为消息 ID。这是最理想的情况,因为它天然与业务关联,易于追溯。
- 示例: 订单创建消息,直接使用
orderId作为消息 ID。
1 | import org.springframework.amqp.core.MessagePostProcessor; |
消费者处理
消费者是实现幂等性的核心环节。它需要维护一个已处理消息的 ID 存储,并在每次接收消息时进行判断。
从消息头中取出唯一 ID,并查询本地存储。
1 | // 伪代码:消费者处理逻辑 |
存储:
Redis:
- 优点:高性能、低延迟、支持过期时间(TTL)。非常适合作为缓存层,快速判断 ID 是否存在。
- 缺点:内存存储,如果 Redis 宕机或重启,未持久化的数据会丢失(但 Redis 支持 AOF/RDB 持久化)。容量受限于内存。
- 适用场景:绝大多数需要高性能幂等性的场景。
可以使用 Set (集合) 或 String (字符串) 类型。
- Set:
SADD processed_message_ids messageId,SISMEMBER processed_message_ids messageId。 - String:
SET processed_message_id_messageId1 EX timeout NX(key是messageId,value是任意占位符,EX timeout设置过期时间,NX保证原子性)。
过期时间 (TTL):非常重要!消息 ID 不应永久存储,因为磁盘空间有限。设置一个合理的过期时间(例如 7 天、30 天,根据消息的生命周期和重复发送的可能性来定),让 Redis 自动清除过期 ID。
容量:评估每天的消息量,确定存储这些 ID 需要的内存。如果 ID 数量巨大,可以考虑按天或按月创建不同的 Set/Key,或者使用 Redis Cluster 分片。
关系型数据库 (MySQL, PostgreSQL):
- 优点:数据持久化能力强,可靠性高,支持事务。
- 缺点:性能相对 Redis 差,存在 I/O 瓶颈,并发能力有限。
- 适用场景:对数据可靠性要求极高,且并发量不是特别巨大的场景,或者业务本身就强依赖数据库事务的场景。
查询:SELECT COUNT(*) FROM processed_messages WHERE message_id = ? AND consumer_group = ?;
插入:INSERT INTO processed_messages (message_id, consumer_group, process_time) VALUES (?, ?, NOW());
索引:必须在 message_id 或 (message_id, consumer_group) 上建立唯一索引,以保证 ID 的唯一性。
清理:需要定期清理过期数据(例如通过定时任务删除 process_time 过早的记录)。
分布式文件存储 (如 HDFS, S3):
- 优点:容量巨大,成本低。
- 缺点:查询延迟高,不适合实时判断。
- 适用场景:极少数离线批处理或审计场景,不适合在线消息处理的幂等性判断。
异常处理:
在“检查消息 ID 不存在 -> 处理业务 -> 记录消息 ID”这个流程中,如果业务处理失败,但消息 ID 已经存入存储,那么下次重试时就会被判断为重复消息而丢弃,导致消息丢失。因此,消息处理和消息 ID 存储必须是原子性的。
数据库事务:
- 如果业务处理和幂等性存储都在同一个数据库中,可以直接使用数据库事务。
- 将查询 ID、处理业务、插入 ID 放在同一个数据库事务中。
- 如果业务处理失败,事务回滚,消息 ID 也不会被记录。
Redis + 业务事务 (TCC/最终一致性):
如果幂等性存储在 Redis,而业务处理涉及数据库或其他服务,则不能简单地用一个本地事务。
方案一:先处理业务,后记录 ID (不推荐,可能出现业务成功但ID未记录)
try-catch捕获异常:如果业务处理失败,不记录 ID,不 ACK 消息,让 RabbitMQ 重试。- 问题:如果在业务处理成功后、记录 ID 之前服务崩溃,下次消息过来仍会被处理。
方案二:先记录 ID,再处理业务,异常时回滚 ID (复杂,但更可靠)
- 两阶段提交 (Two-Phase Commit) 或 TCC (Try-Confirm-Cancel) 思想的简化版。
- 在 Redis 中先用
SETNX尝试“预占”消息 ID,设置一个短的过期时间。 - 然后进行业务处理。
- 如果业务成功,则将 Redis 中的消息 ID 的过期时间设置为长期,并 ACK 消息。
- 如果业务失败,则删除 Redis 中的预占 ID,并 NACK 消息。
- 问题:如果在业务成功后、设置过期时间为长期之前崩溃,这个 ID 可能会在短时间内过期,导致重复消费。
推荐方案 (结合消息确认机制):
- 在消费消息后立即将消息 ID 存入 Redis,并设置一个相对短的过期时间(例如几分钟)。
- 异步执行实际的业务逻辑。
- 如果业务逻辑成功,再将 Redis 中的消息 ID 的过期时间延长至长期(或直接更新)。
- 最终成功后才 ACK 消息。
- 关键:如果业务处理失败或超时,消息 ID 在 Redis 中会过期,下次 RabbitMQ 重投消息时,该 ID 不存在,从而再次进入处理流程。
- 死信队列:如果多次重试仍失败,可以将消息路由到死信队列,人工介入。
更强的原子性:
更强的原子性(两阶段确认):
- 第一阶段:消费者收到消息后,在本地事务(如果业务和幂等存储在同一个数据库)或分布式事务(如 TCC)中,先进行幂等性判断并记录 ID,然后执行业务逻辑。
- 第二阶段:只有当整个事务提交成功后,才向 RabbitMQ 发送
basicAck。如果事务失败,则不发送basicAck,让 RabbitMQ 重新投递消息。 这种方式确保了消息处理和幂等性记录的最终一致性,因为只要消息 ID 未成功记录,或者业务未成功执行,RabbitMQ 就会重试。
6.RabbitMQ上的一个queue中存放 message是否有数量限制
是的有限制,是多种因素构成的
物理因素:内存和磁盘
内存
RabbitMQ 默认会将队列中的一部分消息保存在内存中,以提高消费性能。
当内存使用达到高水位阈值 (high water mark) 时(默认是 RabbitMQ 节点可用 RAM 的 40% 或 0.4 倍,可以通过 vm_memory_high_watermark.relative 或 vm_memory_high_watermark.absolute 配置),RabbitMQ 会触发内存警报,并阻塞生产者,阻止其继续发送消息,直到内存使用率下降。
内存中存储的主要是消息的元数据和最近发送/未消费的消息体。
硬盘
当队列中的消息数量过多,或者内存压力过大时,RabbitMQ 会将内存中的消息分页(page out)到磁盘上,以释放内存。这就是所谓的惰性队列 (Lazy Queues) 的工作原理,或对于经典队列在内存压力下进行的分页。
磁盘空间是另一个限制因素。如果磁盘空间不足,RabbitMQ 也会触发磁盘警报(默认剩余空间低于 5GB 或总容量的 80% 会触发警报),同样会阻塞生产者。
消息持久化(durable message)会直接写入磁盘,而非持久化消息也会在内存不足时被写到磁盘。
配置
多种参数来主动限制队列中消息的数量或总大小,可以防止队列溢出,防止单个队列无限增长,导致整个 Broker 资源耗尽
x-max-length (最大消息数量):限制队列中可以存储的最大消息数量。当达到此限制时,队列会根据其溢出策略丢弃最老的消息(默认行为)。
x-max-length-bytes (最大消息总大小):限制队列中可以存储的消息总字节数。当达到此限制时,同样会根据溢出策略丢弃最老的消息。
如果同时设置了 x-max-length 和 x-max-length-bytes,则两者都适用,哪个限制先达到就先强制执行。
x-overflow (溢出策略):当队列达到 x-max-length 或 x-max-length-bytes 限制时,如何处理新消息:
drop-head(默认):丢弃队列头部(最老)的消息。reject-publish:拒绝生产者发布的新消息。生产者会收到basic.nack或通道阻塞。reject-publish-dlx:拒绝生产者发布的新消息,并将这些被拒绝的消息路由到死信交换器 (DLX)。
message-ttl (消息 TTL / 过期时间):可以为队列中的所有消息设置默认的过期时间。超过此时间,消息将自动从队列中删除。这可以间接限制消息的数量,尤其对于时效性强的消息。
性能
消费延迟增加:大量消息堆积在队列中,消费者需要更长时间才能处理到最新的消息。
内存交换到磁盘 (Paging out) 增加 I/O 负担:当消息从内存分页到磁盘时,会增加磁盘 I/O,降低整体吞吐量。
集群同步开销:在镜像队列(Mirrored Queues)或仲裁队列(Quorum Queues)中,大量消息的同步会增加网络和 CPU 开销。
管理界面响应变慢:当队列中有数百万甚至上亿条消息时,管理界面查询队列状态会非常缓慢。
Broker 重启慢:如果 RabbitMQ Broker 异常重启,需要从磁盘加载和恢复大量的消息索引,导致启动时间变长。
7.Rabbitmq的高可用
RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。
普通集群模式,意思就是在多台机器上启动多个 RabbitMQ 实例,每台机器启动一个。你创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。这个模式是没有高可用的
镜像集群模式(高可用性)
这种模式,才是所谓的 RabbitMQ 的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论是元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。
这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。
好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!第二,这么玩儿,不是分布式的,就没有扩展性可言了,如果某个 queue 负载很重,你加机器,新增的机器也包含了这个 queue 的所有数据,并没有办法线性扩展你的 queue。你想,如果这个 queue 的数据量很大,大到这个机器上的容量无法容纳了,此时该怎么办呢?
解决办法:
1.选择合适的同步策略,RabbitMQ允许你配置消息同步的节点数量。你可以选择将消息同步到所有节点(ha-mode: all)以获得最高的可用性,或者同步到指定的节点,
1 | ha-mode: exactly`, `ha-sync-batch-size |
根据性能来选择。
2.限制单个队列的大小,避免在高可用队列中存储过多的消息。可以设置队列的最大长度或 TTL (time-to-live),以防止队列无限制增长。 可以考虑将消息归档到外部存储系统。
3.使用队列分片,对于单个队列数据量过大的情况,可以将队列拆分成多个分片,每个分片分布在不同的节点上。 RabbitMQ 本身并不原生支持队列分片,但你可以通过客户端的逻辑来实现。 例如,可以根据消息的某个属性(如用户ID)进行哈希,然后将消息发送到对应的分片队列。
实现队列分片:
- 生产者根据某种算法(通常是哈希)将消息路由到不同的队列。 例如,使用
messageKey.hashCode() % numOfShards来决定消息应该发往哪个分片。 - 消费者端合并: 消费者需要同时订阅多个分片队列,并将接收到的消息按照某种规则进行合并和处理。
- 可以结合批量发送来解决,但是消息的顺序是不能保证的
4.Federation/Shovel 插件: 这两个插件允许你将消息从一个 RabbitMQ 集群桥接到另一个集群。 这可以用来实现跨数据中心的消息复制和负载均衡。 Federation 适用于更松耦合的场景,而 Shovel 则适用于更紧密耦合的场景。
5.Quorum Queues: RabbitMQ 3.8 引入Quorum Queues,它基于Raft一致性算法,提供了更强的一致性和更高的可靠性,同时也具有比镜像队列更好的性能。 但是它并非完全替代镜像队列,选择哪种方案需要考量具体的应用场景和需求。
8.rabbit解决消息丢失问题
生产者:
RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务 channel.txSelect() ,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务 channel.txRollback() ,然后重试发送消息;如果收到了消息,那么可以提交事务 channel.txCommit() 。
但是因为rabbitmq是同步的,然后集群备份下来。性能比较低。
我们可以开启confirm模式:
在生产者那里设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。
已经在 transaction 事务模式的 channel 是不能再设置成 confirm 模式的,即这两种模式是不能共存的。
普通confirm:每发送一条消息后,调用 waitForConfirms() 方法,等待服务器端 confirm,如果服务端返回 false 或者在一段时间内都没返回,客户端可以进行消息重发。
1 | channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); |
批量 confirm 模式:每发送一批消息后,调用 waitForConfirms() 方法,等待服务端 confirm。
1 | channel.confirmSelect(); |
异步 confirm 模式:提供一个回调方法,服务端 confirm 了一条或者多条消息后客户端会回调这个方法。
1 | SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); |
rabbit自己丢了数据:
这个你必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。
- 创建 queue 的时候将其设置为持久化。这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。
- 第二个是发送消息的时候将消息的
deliveryMode设置为 2。就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。
需要同时设置这两个,才能成功开启
消费者弄丢了数据:
主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。
这个时候得用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack ,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。
为了保证消息从队列中可靠地到达消费者,RabbitMQ 提供了消息确认机制。消费者在声明队列时,可以指定 noAck 参数,当 noAck=false,RabbitMQ 会等待消费者显式发回 ack 信号后,才从内存(和磁盘,如果是持久化消息)中移去消息。否则,一旦消息被消费者消费,RabbitMQ 会在队列中立即删除它。
9.RabbitMQ保证消息的顺序
拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确实是麻烦点,这样也会造成吞吐量下降,可以在消费者内部采用多线程的方式取消费。
或者就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。
注意,这里消费者不直接消费消息,而是将消息根据关键值(比如:订单 id)进行哈希,哈希值相同的消息保存到相同的内存队列里。也就是说,需要保证顺序的消息存到了相同的内存队列,然后由一个唯一的 worker 去处理。






















