RabbitMQ 面试
# 环境安装
在 CentOS 7 上安装 RabbitMQ 服务器 (opens new window) 备份地址 (opens new window)
# 代码实践
Hello World (opens new window)
# 面试题
- 削峰填谷
- 死信
- 消息积压问题
# 1、什么是 RabbitMQ
RabbitMQ 是一个开源的消息队列中间件,它实现了高效的消息传递模型。消息队列是一种在应用程序之间进行异步通信的方式,可以将发送者和接收者解耦,提高系统的可靠性和可扩展性。
RabbitMQ 基于 AMQP(Advanced Message Queuing Protocol)协议,即高级消息队列协议,是一种网络协议,用于在分布式系统中传输消息。它的目的是为分布式应用程序提供统一的、标准的消息传输机制,使得不同的应用程序可以通过 AMQP 进行通信,实现系统间的解耦和灵活性。
AMQP 具有以下特点:
- 支持异步通信:发送者和接收者不需要同时在线,可以在任意时间发送和接收消息。
- 支持消息持久化:可以将消息存储在持久化存储器中,以保证即使在发送者或接收者故障的情况下,消息也能够被保留和传递。
- 支持消息的路由和过滤:可以根据消息的属性和内容进行路由和过滤,以实现更加灵活的消息传递。
- 支持多种编程语言和平台:AMQP 可以通过多种编程语言和平台进行实现和调用,包括 Java、Python、Ruby、.NET 等。并提供了丰富的特性,如消息确认、持久化、发布-订阅模式、消息路由等。
在 RabbitMQ 中,消息生产者将消息发送到队列中,然后消息消费者从队列中接收消息进行处理。RabbitMQ 会负责消息的存储、路由和传递,确保消息的可靠投递。使用 RabbitMQ 可以实现系统间的解耦、流量控制(削峰)、异步处理、消息通知等功能,广泛应用于分布式系统、微服务架构、任务队列等场景中。
1.1、消息队列的优点:
(1)解耦:将系统按照不同的业务功能拆分出来,消息生产者只管把消息发布到 MQ 中而不用管谁来取,消息消费者只管从 MQ 中取消息而不管是谁发布的。消息生产者和消费者都不知道对方的存在;
(2)异步:主流程只需要完成业务的核心功能;对于业务非核心功能,将消息放入到消息队列之中进行异步处理,减少请求的等待,提高系统的总体性能;
(3)削峰/限流:将所有请求都写到消息队列中,消费服务器按照自身能够处理的请求数从队列中拿到请求,防止请求并发过高将系统搞崩溃;
1.2、消息队列的缺点:
(1)系统的可用性降低:系统引用的外部依赖越多,越容易挂掉,如果 MQ 服务器挂掉,那么可能会导致整套系统崩溃。这时就要考虑如何保证消息队列的高可用了
(2)系统复杂度提高:加入消息队列之后,需要保证消息没有重复消费、如何处理消息丢失的情况、如何保证消息传递的有序性等问题;
(3)数据一致性问题:A 系统处理完了直接返回成功了,使用者都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,就会导致数据不一致了
1.3、Kafka、ActiveMQ、RabbitMQ、RocketMQ 消息队列的选型:
每种 MQ 没有绝对的好坏,主要依据使用场景,扬长避短,利用其优势,规避其劣势。
(1)中小型软件公司,技术实力较为一般,建议选 RabbitMQ:一方面,erlang 语言天生具备高并发的特性,而且管理界面用起来十分方便。代码是开源的,而且社区十分活跃,可以解决开发过程中遇到的 bug,这点对于中小型公司来说十分重要。
- 不考虑 rocketmq 的原因是,rocketmq 是阿里出品,如果阿里放弃维护 rocketmq,中小型公司一般抽不出人来进行 rocketmq 的定制化开发,因此不推荐。
- 不考虑 kafka 的原因是:中小型软件公司不如互联网公司,数据量没那么大,选消息中间件应首选功能比较完备的,所以 kafka 排除
(2)大型软件公司:根据具体使用场景在 rocketMq 和 kafka 之间二选一。
一方面,大型软件公司,具备足够的资金搭建分布式环境,也具备足够大的数据量。针对 rocketMQ,大型软件公司有能力对 rocketMQ 进行定制化开发。至于 kafka,如果是大数据领域的实时计算、日志采集功能,肯定是首选 kafka 了。
# 2、RabbitMQ 的构造
RabbitMQ 是 AMQP 协议的一个开源实现,所以其内部实际上也是 AMQP 中的基本概念:
- 生产者 Publisher:生产消息,就是投递消息的一方。消息一般包含两个部分:消息体(payload)和标签(Label)
- 消费者 Consumer:消费消息,也就是接收消息的一方。消费者连接到 RabbitMQ 服务器,并订阅到队列上。消费消息时只消费消息体,丢弃标签。
- Broker 服务节点:表示消息队列服务器实体。一般情况下一个 Broker 可以看做一个 RabbitMQ 服务器。
- Queue:消息队列,用来存放消息。一个消息可投入一个或多个队列,多个消费者可以订阅同一队列,这时队列中的消息会被平摊(轮询)给多个消费者进行处理。
- Exchange:交换器,接受生产者发送的消息,根据路由键将消息路由到绑定的队列上。
- Routing Key: 路由关键字,用于指定这个消息的路由规则,需要与交换器类型(Exchange)和绑定键(Binding Key)联合使用才能最终生效。
- Binding:绑定,通过绑定将交换器和队列关联起来,一般会指定一个 BindingKey,通过 BindingKey,交换器就知道将消息路由给哪个队列了。
- Connection:网络连接,比如一个 TCP 连接,用于连接到具体 broker
- Channel: 信道,AMQP 命令都是在信道中进行的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接,一个 TCP 连接可以用多个信道。客户端可以建立多个 channel,每个 channel 表示一个会话任务。由 Exchange、Queue、RoutingKey 三个才能决定一个从 Exchange 到 Queue 的唯一的线路。
- Message:消息,由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括 routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
- Virtual host(VHost):虚拟主机,用于逻辑隔离,表示一批独立的交换器、消息队列和相关对象。一个 Virtual host 可以有若干个 Exchange 和 Queue,同一个 Virtual host 不能有同名的 Exchange 或 Queue。最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段
# 3、Exchange 交换器的类型
Exchange 分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers
(1)direct:消息中的路由键(RoutingKey)如果和 Bingding 中的 bindingKey 完全匹配,交换器就将消息发到对应的队列中。是基于完全匹配、单播的模式。
(2)fanout:把所有发送到 fanout 交换器的消息路由到所有绑定该交换器的队列中,fanout 类型转发消息是最快的。
(3)topic:通过模式匹配的方式对消息进行路由,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。
匹配规则:
- ① RoutingKey 和 BindingKey 为一个 点号 '.' 分隔的字符串。 比如: java.xiaoka.show
- ② BindingKey 可使用 *和 # 用于做模糊匹配:*匹配一个单词,#匹配多个或者 0 个单词
例如:图中 Topic 由三个单词<celerity>.<colour>.<species>
组成,分别代表特征、颜色和物种,单词之间用.间隔。这样 Q1 将接收颜色为 orange 的所有消息,Q2 将接收物种为 rabbit 的消息和特征为 lazy 的消息
(4)headers:不依赖于路由键进行匹配,是根据发送消息内容中的 headers 属性进行匹配,除此之外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了
只有绑定中定义的 Headers 跟消息中的 Header 匹配,才会路由到相应的队列。匹配规则有两种:
ALL:要求两个 Headers 中所有 key 和 value 匹配; ANY:要求两个 Headers 任何一个 key 和 value 匹配。
# 4、生产者发送消息的过程
- Producer 先连接到 Broker,建立连接 Connection,开启一个信道 channel
- Producer 声明一个交换器并设置好相关属性
- Producer 声明一个队列并设置好相关属性
- Producer 通过绑定键将交换器和队列绑定起来
- Producer 发送消息到 Broker,其中包含路由键、交换器等信息
- 交换器根据接收到的路由键查找匹配的队列
- 如果找到,将消息存入对应的队列,如果没有找到,会根据生产者的配置丢弃或者退回给生产者
- 关闭信道
# 5、消费者接收消息过程
- Producer 先连接到 Broker,建立连接 Connection,开启一个信道 channel
- 向 Broker 请求消费相应队列中消息,可能会设置响应的回调函数
- 等待 Broker 回应并投递相应队列中的消息,接收消息
- 消费者确认收到的消息,ack
- RabbitMQ 从队列中删除已经确定的消息
- 关闭信道
# 6、如何保证消息不被重复消费?✨
在消息队列系统中,保证消息不被重复消费(即实现 幂等性)是一个核心问题。
以下是完整的解决方案,结合了设计原则、技术实现和 Python 示例:
# 重复消费的根本原因
消息确认失败:消费者处理成功但 ACK 未送达 Broker(网络故障、进程崩溃)
重试机制:生产者或 Broker 自动重发消息
分区再均衡:Kafka 等 MQ 在消费者组重启时可能重复分配消息
# 解决方案
# 1. 业务层幂等设计(最根本方案)
原理:设计业务逻辑使多次操作结果与一次操作相同
实现方式:
# 订单支付场景示例:基于唯一订单ID去重
def process_payment(order_id, amount):
# 检查订单是否已处理
if redis.get(f"order:{order_id}:processed") == "1":
print(f"订单 {order_id} 已处理,跳过")
return
# 核心业务逻辑(如扣款)
pay_service.charge(order_id, amount)
# 标记为已处理(设置过期时间防数据膨胀)
redis.setex(f"order:{order_id}:processed", 3600*24, "1") # 保留24小时
2
3
4
5
6
7
8
9
10
11
12
关键点:
使用唯一业务 ID(订单 ID、用户 ID+操作类型等)
存储介质选择:
Redis:高性能,适合高频场景(需设置 TTL)
数据库唯一索引:强一致性保证(如 MySQL 唯一键)
布隆过滤器:内存高效,允许极低概率误判(适合海量数据)
# 2. 消息去重表
原理:在消费前校验消息唯一性
基于数据库的的唯一主键进行约束。消费完消息之后,到数据库中做一个 insert 操作,如果出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
数据库表设计:
CREATE TABLE message_dedupe (
msg_id VARCHAR(64) PRIMARY KEY, -- 消息唯一ID
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
2
3
4
Python 消费逻辑:
def consume_message(msg):
with database.transaction(): # 开启事务
# 尝试插入去重记录
try:
db.execute(
"INSERT INTO message_dedupe (msg_id) VALUES (%s)",
(msg.id,)
)
except DuplicateKeyError:
# 已存在记录则跳过
return
# 执行业务逻辑
handle_business(msg)
2
3
4
5
6
7
8
9
10
11
12
13
14
# 3. Broker 端去重(依赖 MQ 特性)
RabbitMQ:
message_deduplication
插件(基于消息 ID)Kafka:
enable.idempotence=true
(生产者幂等)+ 事务 APIRocketMQ:消息 Key+数据库事务
Python+Kafka 示例:
from kafka import KafkaProducer
# 启用幂等生产者
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
enable_idempotence=True, # 关键参数
transaction_id="my-transaction-id"
)
# 事务性发送
producer.begin_transaction()
producer.send('orders', key=order_id, value=json.dumps(data))
producer.commit_transaction()
2
3
4
5
6
7
8
9
10
11
12
13
# 4. 分布式锁控制
场景:对非幂等操作的临时保护
Python+Redis 分布式锁:
from redis import Redis
from redis.lock import Lock
def safe_consume(msg):
lock_key = f"lock:{msg.id}"
lock = Lock(redis, lock_key, timeout=30)
if lock.acquire(blocking_timeout=5):
try:
if not check_processed(msg.id):
process_message(msg)
mark_as_processed(msg.id)
finally:
lock.release()
2
3
4
5
6
7
8
9
10
11
12
13
# 技术选型建议
场景 | 推荐方案 | 注意事项 |
---|---|---|
高频交易 | Redis 幂等键 | 需监控内存,设置合理 TTL |
金融支付 | 数据库唯一索引 | 保证 ACID,注意死锁 |
大数据流 | Kafka 事务 API | 配合 isolation.level=read_committed |
历史敏感操作 | 版本号/状态机 | 如:update table set status=paid where status=unpaid |
# 要点总结
幂等性:业务设计优先于技术手段
分层防御:
生产者:启用 MQ 内置幂等(如 Kafka 的
enable.idempotence
)Broker:利用消息去重插件(RabbitMQ)
消费者:本地去重表+业务校验
数据一致性:在事务中同时完成业务操作和去重标记
极端情况:承认无法 100%避免,需设计对账补偿机制
📌 黄金法则:
“通过业务唯一标识实现幂等,比依赖 MQ 特性更可靠” —— 这是面试中最值得强调的设计思想。
# 7、如何保证消息不丢失,进行可靠性传输?✨
对于消息的可靠性传输,每种 MQ 都要从三个角度来分析:
生产者端:使用确认机制 + 重试策略 + 持久化属性
Broker 端:持久化交换机/队列 + 镜像队列 + 磁盘警报
消费者端:手动确认 + 死信队列 + 合理重试策略
生产者使用确认机制确保消息到达,Broker 通过持久化和镜像保证消息存储,消费者借助手动确认和死信队列完成可靠处理。三者缺一不可。
7.1、生产者丢数据:
RabbitMQ 提供事务机制(transaction)和确认机制(confirm)两种模式来确保生产者不丢消息。注意:事务机制和 Confirm 机制是互斥的,两者不能共存,会导致 RabbitMQ 报错
- 事务机制:
发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())
该方式的缺点是生产者发送消息会同步阻塞等待发送结果是成功还是失败,导致生产者发送消息的吞吐量下降。
channel.tx_select() # 开启事务
try:
channel.basic_publish(...)
channel.tx_commit() # 提交事务
except pika.exceptions.AMQPError:
channel.tx_rollback() # 回滚
# 重试逻辑
2
3
4
5
6
7
- 生产者确认机制:
生产环境常用的是 confirm 模式。生产者将信道 channel 设置成 confirm 模式,一旦 channel 进入 confirm 模式,所有在该信道上发布的消息都将会被指派一个唯一的 ID,一旦消息被投递到所有匹配的队列之后,rabbitMQ 就会发送一个确认给生产者(包含消息的唯一 ID),这样生产者就知道消息已经正确到达目的队列了。如果 rabbitMQ 没能处理该消息,也会发送一个 Nack 消息给你,这时就可以进行重试操作。
Confirm 模式最大的好处在于它是异步的,一旦发布消息,生产者就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者便可以通过回调方法来处理该确认消息。
确认模式
RabbitMQ 的确认模式有三种:普通确认模式(Simple Acknowledgement)、批量确认模式(Multiple Acknowledgements)、异步确认模式(Async Acknowledgements)。
- 普通确认模式(Simple Acknowledgement):也被称为单条确认模式。消费者接收到消息并处理成功后,向 RabbitMQ 发送单条确认消息。RabbitMQ 会将已确认的消息从队列中删除。这种模式的优点是简单直接,但如果处理过程中发生异常,消息可能会丢失。
- 批量确认模式(Multiple Acknowledgements):也被称为批量确认模式。消费者接收到一批消息后,将这批消息以批量的形式发送确认消息给 RabbitMQ。RabbitMQ 只有在收到整个批量消息的确认后,才会将这批消息从队列中删除。相比于普通确认模式,批量确认模式减少了确认消息的数量,提高了性能。
- 异步确认模式(Async Acknowledgements):在消费者处理消息的同时,异步地发送确认消息给 RabbitMQ。这种模式下,消息的确认不会阻塞消费者的处理。消费者只需通过回调函数或事件来处理消息的确认。这种模式通常用于消费者需要异步处理消息且不希望因消息确认而阻塞的情况。
以下是使用 Python 的示例代码来演示 RabbitMQ 的确认模式:
- 普通确认模式(Simple Acknowledgement):
import pika
def callback(ch, method, properties, body):
print("Received message: %r" % body)
# 手动发送确认消息
ch.basic_ack(delivery_tag=method.delivery_tag)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='my_queue')
channel.basic_consume(queue='my_queue', on_message_callback=callback)
print('Waiting for messages...')
channel.start_consuming()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
- 批量确认模式(Multiple Acknowledgements):
import pika
def callback(ch, method, properties, body):
print("Received message: %r" % body)
def on_batch_acknowledgement(frame):
print('Batch acknowledged: %r' % frame)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='my_queue')
channel.confirm_delivery()
channel.add_on_batch_acknowledgement_callback(on_batch_acknowledgement)
channel.basic_consume(queue='my_queue', on_message_callback=callback)
print('Waiting for messages...')
channel.start_consuming()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
- 异步确认模式(Async Acknowledgements):
# 开启确认模式
channel.confirm_delivery()
# 异步确认回调
def handle_confirmation(frame):
if isinstance(frame, pika.frame.Method):
if frame.method.NAME == 'Basic.Ack':
print("Message confirmed")
elif frame.method.NAME == 'Basic.Nack':
print("Message lost, retrying...")
# 重试逻辑
channel.add_on_return_callback(handle_confirmation)
# 发送消息(设置 mandatory=True 确保路由失败通知)
channel.basic_publish(
exchange='exchange',
routing_key='routing_key',
body=message,
properties=pika.BasicProperties(delivery_mode=2), # 持久化消息
mandatory=True
)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
以上示例代码中,首先创建了一个 RabbitMQ 连接,并声明了一个队列。然后使用不同的确认模式来消费队列中的消息,并通过不同的方式发送确认消息。
7.2、消息队列丢数据:
持久化、集群、普通模式、镜像模式
处理消息队列丢数据的情况,一般是开启持久化磁盘。持久化配置可以和生产者的 confirm 机制配合使用,在消息持久化磁盘后,再给生产者发送一个 Ack 信号。这样的话,如果消息持久化磁盘之前,即使 RabbitMQ 挂掉了,生产者也会因为收不到 Ack 信号而再次重发消息。
持久化设置如下(必须同时设置以下 2 个配置):
- (1)创建 queue 的时候,将 queue 的持久化标志 durable 在设置为 true,代表是一个持久的队列,这样就可以保证 rabbitmq 持久化 queue 的元数据,但是不会持久化 queue 里的数据;
- (2)发送消息的时候将 deliveryMode 设置为 2,将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。
这样设置以后,RabbitMQ 就算挂了,重启后也能恢复数据。在消息还没有持久化到硬盘时,可能服务已经死掉,这种情况可以通过引入镜像队列,但也不能保证消息百分百不丢失(整个集群都挂掉)
持久化三要素
# 1. 持久化交换机
channel.exchange_declare(
exchange='persistent_exchange',
exchange_type='direct',
durable=True # 关键参数
)
# 2. 持久化队列
channel.queue_declare(
queue='persistent_queue',
durable=True # 关键参数
)
# 3. 持久化消息
properties = pika.BasicProperties(
delivery_mode=2, # 关键参数 (1=非持久, 2=持久)
content_type='text/plain',
timestamp=int(time.time())
)
channel.basic_publish(
exchange='persistent_exchange',
routing_key='routing_key',
body=message,
properties=properties
)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
镜像队列(Mirrored Queues)
rabbitmqctl set_policy ha-all "^ha\." '{"ha-mode":"all"}'
1集群部署(Cluster)
磁盘警报配置(防止磁盘写满)
disk_free_limit.absolute = 2GB
7.3、消费者丢数据:
消费者丢数据一般是因为采用了自动确认消息模式。该模式下,虽然消息还在处理中,但是消费中者会自动发送一个确认,通知 RabbitMQ 已经收到消息了,这时 RabbitMQ 就会立即将消息删除。这种情况下,如果消费者出现异常而未能处理消息,那就会丢失该消息。
解决方案就是采用手动确认消息,设置 autoAck = False,等到消息被真正消费之后,再手动发送一个确认信号,即使中途消息没处理完,但是服务器宕机了,那 RabbitMQ 就收不到发的 ack,然后 RabbitMQ 就会将这条消息重新分配给其他的消费者去处理。
手动确认机制
def callback(ch, method, properties, body):
try:
# 业务处理逻辑
process_message(body)
# 处理成功后手动确认
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"处理失败: {e}")
# 拒绝消息并重新入队
ch.basic_nack(
delivery_tag=method.delivery_tag,
requeue=True # 重新入队
)
# 关闭自动确认
channel.basic_consume(
queue='persistent_queue',
on_message_callback=callback,
auto_ack=False # 关键参数
)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
但是 RabbitMQ 并没有使用超时机制,RabbitMQ 仅通过与消费者的连接来确认是否需要重新发送消息,也就是说,只要连接不中断,RabbitMQ 会给消费者足够长的时间来处理消息。另外,采用手动确认消息的方式,我们也需要考虑一下几种特殊情况:
- 如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ 会认为消息没有被消费,然后重新分发给下一个订阅的消费者,所以存在消息重复消费的隐患
- 如果消费者接收到消息却没有确认消息,连接也未断开,则 RabbitMQ 认为该消费者繁忙,将不会给该消费者分发更多的消息
需要注意的点:
1、消息可靠性增强了,性能就下降了,因为写磁盘比写 RAM 慢的多,两者的吞吐量可能有 10 倍的差距。所以,是否要对消息进行持久化,需要综合考虑业务场景、性能需要,以及可能遇到的问题。若想达到单 RabbitMQ 服务器 10W 条/秒以上的消息吞吐量,则要么使用其他的方式来确保消息的可靠传输,要么使用非常快速的存储系统以支持全持久化,例如使用 SSD。或者仅对关键消息作持久化处理,且应该保证关键消息的量不会导致性能瓶颈。
2、当设置 autoAck = False 时,如果忘记手动 ack,那么将会导致大量任务都处于 Unacked 状态,造成队列堆积,直至消费者断开才会重新回到队列。解决方法是及时 ack,确保异常时 ack 或者拒绝消息。
3、启用消息拒绝或者发送 nack 后导致死循环的问题:如果在消息处理异常时,直接拒绝消息,消息会重新进入队列。这时候如果消息再次被处理时又被拒绝 。这样就会形成死循环。
注意:以上所有的保障措施都无法防止在 broker 收到消息和确认消息之间的窗口期内发生的消息丢失。为了解决这个问题,可能需要在应用层实现某种形式的重试机制。
消费者重试策略
def callback(ch, method, properties, body):
try:
process_message(body)
ch.basic_ack(method.delivery_tag)
except TemporaryError:
# 临时错误,稍后重试
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
except PermanentError:
# 永久错误,转入死信队列
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
2
3
4
5
6
7
8
9
10
# 8、如何保证消息的有序性?✨
总结:
在 RabbitMQ 中保证消息有序性的核心是:
将需要有序处理的消息通过业务 ID 路由到同一队列
确保每个队列只有一个消费者线程处理
在消费端实现消息序号验证和状态跟踪
这种分组有序模式在保证顺序性的同时,仍能通过多队列实现水平扩展。
针对保证消息有序性的问题,解决方法就是保证生产者入队的顺序是有序的,出队后的顺序消费则交给消费者去保证。
- 单队列单消费者模式(简单但低效) 拆分 queue,使得一个 queue 只对应一个消费者。由于 MQ 一般都能保证内部队列是先进先出的,所以把需要保持先后顺序的一组消息使用某种算法都分配到同一个消息队列中。然后只用一个消费者单线程去消费该队列,这样就能保证消费者是按照顺序进行消费的了。但是消费者的吞吐量会出现瓶颈。如果多个消费者同时消费一个队列,还是可能会出现顺序错乱的情况,这就相当于是多线程消费了
# 生产者:确保所有消息发送到同一队列
channel.basic_publish(
exchange='',
routing_key='ordered_queue', # 固定队列
body=message,
properties=pika.BasicProperties(delivery_mode=2)
)
# 消费者:单线程处理(禁用并发)
channel.basic_qos(prefetch_count=1) # 每次只取一条
channel.basic_consume(
queue='ordered_queue',
on_message_callback=process_message,
auto_ack=False
)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
- 消息分组(Sharding)方案(推荐)
将需要有序处理的消息分组,每组分配到独立队列
# 生产者:确保所有消息发送到同一队列
channel.basic_publish(
exchange='',
routing_key='ordered_queue', # 固定队列
body=message,
properties=pika.BasicProperties(delivery_mode=2)
)
# 消费者:单线程处理(禁用并发)
channel.basic_qos(prefetch_count=1) # 每次只取一条
channel.basic_consume(
queue='ordered_queue',
on_message_callback=process_message,
auto_ack=False
)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 9、如何处理消息堆积情况?✨
场景题:几千万条数据在 MQ 里积压了七八个小时。
监控预警:利用 RabbitMQ 管理 API 或者 Prometheus+Grafana 搭建监控系统,为队列长度、消费者性能等关键指标设置阈值报警。
水平扩展:当出现消息堆积时,迅速增加消费者实例数量。可以借助容器化技术(如 Kubernetes)实现自动扩缩容。
消息优化:
- 对大消息进行拆分,避免单条消息处理时间过长。
- 对消息进行压缩,减少传输和存储的开销。
持久化策略:根据业务的重要程度,合理设置队列和消息的持久化策略,在性能和可靠性之间找到平衡。
死信队列:配置死信队列来处理失败的消息,防止这些消息阻塞正常队列。
负载均衡:通过 HAProxy 或 Nginx 对 RabbitMQ 集群进行负载均衡,确保流量均匀分布。
9.1、出现该问题的原因:
消息堆积往往是生产者的生产速度与消费者的消费速度不匹配导致的。有可能就是消费者消费能力弱,渐渐地消息就积压了,也有可能是因为消息消费失败反复复重试造成的,也有可能是消费端出了问题,导致不消费了或者消费极其慢。比如,消费端每次消费之后要写 mysql,结果 mysql 挂了,消费端 hang 住了不动了,或者消费者本地依赖的一个东西挂了,导致消费者挂了。
所以如果是 bug 则处理 bug;如果是因为本身消费能力较弱,则优化消费逻辑,比如优化前是一条一条消息消费处理的,那么就可以批量处理进行优化。
9.2、临时扩容,快速处理积压的消息:
(1)先修复 consumer 的问题,确保其恢复消费速度,然后将现有的 consumer 都停掉;
(2)临时创建原先 N 倍数量的 queue ,然后写一个临时分发数据的消费者程序,将该程序部署上去消费队列中积压的数据,消费之后不做任何耗时处理,直接均匀轮询写入临时建立好的 N 倍数量的 queue 中;
(3)接着,临时征用 N 倍的机器来部署 consumer,每个 consumer 消费一个临时 queue 的数据
(4)等快速消费完积压数据之后,恢复原先部署架构 ,重新用原先的 consumer 机器消费消息。
这种做法相当于临时将 queue 资源和 consumer 资源扩大 N 倍,以正常 N 倍速度消费。
import pika
import threading
# 消费者回调函数
def consumer_callback(ch, method, properties, body):
try:
# 处理消息的业务逻辑
print(f"处理消息: {body}")
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"处理消息出错: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
# 创建多个消费者线程
def create_consumers(queue_name, num_consumers=5):
consumers = []
for i in range(num_consumers):
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=queue_name, durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=queue_name, on_message_callback=consumer_callback)
thread = threading.Thread(target=channel.start_consuming)
thread.daemon = True
thread.start()
consumers.append((connection, channel, thread))
print(f"消费者 {i+1} 已启动")
return consumers
# 启动5个消费者
consumers = create_consumers('task_queue', 5)
# 保持主线程运行
try:
while True:
pass
except KeyboardInterrupt:
print("程序关闭")
for conn, _, _ in consumers:
conn.close()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
9.3、恢复队列中丢失的数据:
如果使用的是 rabbitMQ,并且设置了过期时间,消息在 queue 里积压超过一定的时间会被 rabbitmq 清理掉,导致数据丢失。这种情况下,实际上队列中没有什么消息挤压,而是丢了大量的消息。所以就不能说增加 consumer 消费积压的数据了,这种情况可以采取 “批量重导” 的方案来进行解决。在流量低峰期,写一个程序,手动去查询丢失的那部分数据,然后将消息重新发送到 mq 里面,把丢失的数据重新补回来。
9.4、MQ 长时间未处理导致 MQ 写满的情况如何处理:
如果消息积压在 MQ 里,并且长时间都没处理掉,导致 MQ 都快写满了,这种情况肯定是临时扩容方案执行太慢,这种时候只好采用 “丢弃+批量重导” 的方式来解决了。首先,临时写个程序,连接到 mq 里面消费数据,消费一个丢弃一个,快速消费掉积压的消息,降低 MQ 的压力,然后在流量低峰期时去手动查询重导丢失的这部分数据。
import pika
import redis
# 连接Redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)
# 消费者回调函数 - 溢出处理
def overflow_callback(ch, method, properties, body):
try:
# 尝试处理消息
process_message(body)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"消息处理失败,存入Redis: {e}")
# 将消息存入Redis队列
redis_client.rpush('overflow_messages', body)
ch.basic_ack(delivery_tag=method.delivery_tag)
# 处理堆积在Redis中的消息
def process_overflow_messages():
while True:
message = redis_client.lpop('overflow_messages')
if not message:
print("Redis中没有溢出消息了")
break
try:
process_message(message)
except Exception as e:
print(f"重新处理失败: {e}")
# 处理失败,重新放回Redis
redis_client.rpush('overflow_messages', message)
# 启动溢出消息处理器
process_overflow_messages()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 10、如何保证消息队列的高可用?✨
RabbitMQ 是基于主从(非分布式)做高可用性的,RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式
10.1、单机模式: 一般没人生产用单机模式
10.2、普通集群模式:
普通集群模式用于提高系统的吞吐量,通过添加节点来线性扩展消息队列的吞吐量。也就是在多台机器上启动多个 RabbitMQ 实例,而队列 queue 的消息只会存放在其中一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。消费的时候,如果连接到了另外的实例,那么该实例就会从数据实际所在的实例上的 queue 拉取消息过来,就是说让集群中多个节点来服务某个 queue 的读写操作
但普通集群模式的缺点在于:无高可用性,queue 所在的节点宕机了,其他实例就无法从那个实例拉取数据;RabbitMQ 内部也会产生大量的数据传输。
10.3、镜像队列集群模式:
镜像队列集群是 RabbitMQ 真正的高可用模式,集群中一般会包含一个主节点 master 和若干个从节点 slave,如果 master 由于某种原因失效,那么按照 slave 加入的时间排序,"资历最老"的 slave 会被提升为新的 master。
镜像队列下,所有的消息只会向 master 发送,再由 master 将命令的执行结果广播给 slave,所以 master 与 slave 节点的状态是相同的。比如,每次写消息到 queue 时,master 会自动将消息同步到各个 slave 实例的 queue;如果消费者与 slave 建立连接并进行订阅消费,其实质上也是从 master 上获取消息,只不过看似是从 slave 上消费而已,比如消费者与 slave 建立了 TCP 连接并执行 Basic.Get 的操作,那么也是由 slave 将 Basic.Get 请求发往 master,再由 master 准备好数据返回给 slave,最后由 slave 投递给消费者。
从上面可以看出,队列的元数据和消息会存在于多个实例上,也就是说每个 RabbitMQ 节点都有这个 queue 的完整镜像,任何一个机器宕机了,其它机器节点还包含了这个 queue 的完整数据,其他消费者都可以到其它节点上去消费数据。
(1)缺点:
① 性能开销大,消息需要同步到所有机器上,导致网络带宽压力和消耗很重
② 非分布式,没有扩展性,如果 queue 的数据量大到这个机器上的容量无法容纳了,此时该方案就会出现问题了
(2)如何开启镜像集群模式呢?
在 RabbitMQ 的管理控制台 Admin 页面下,新增一个镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。
# 11、死信 DL、私信交换器 DLX 与 死信队列 DLQ
在 RabbitMQ 中,死信消息(Dead Letter)必须被路由到专门的死信队列(Dead Letter Queue, DLQ)才能被消费。死信本身并不是一个队列,而是指那些无法被正常处理的消息。
DLX,全称为 Dead Letter Exchange,死信交换器,死信邮箱。也是一个正常的 Exchange,和一般的 Exchange 没有任何区别。能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列出现死信(dead message,就是没有任何消费者消费)的时候,RabbitMQ 就会自动将这条消息重新发布到 Exchange 上去,进而被路由到另一个队列。可以监听这个队列中的消息作相应的处理。
# 死信产生的常见原因
消息被拒绝(basic.reject 或 basic.nack)且 requeue=false(不重回队列)
- 消费者处理消息失败时,可能选择拒绝消息并不重新入队。
消息过期(TTL)
- 通过队列属性或消息属性设置过期时间,过期未消费的消息会成为死信。
队列达到最大长度
- 当队列已满,新消息会被拒绝,无法再添加,之后再收到的消息成为死信。
# 死信处理方案
import pika
# 连接 RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明死信交换器和队列
# 声明死信交换器 (DLX)
channel.exchange_declare(exchange='dlx_exchange', exchange_type='direct')
# 声明死信队列 (DLQ)
channel.queue_declare(queue='dlx_queue')
channel.queue_bind(queue='dlx_queue', exchange='dlx_exchange', routing_key='dlx_key')
# 声明普通队列并配置死信交换器
args = {
'x-dead-letter-exchange': 'dlx_exchange', # 指定死信交换器
'x-dead-letter-routing-key': 'dlx_key', # 指定死信路由键
'x-message-ttl': 60000, # 消息过期时间(毫秒)
'x-max-length': 1000 # 队列最大长度
}
channel.queue_declare(queue='normal_queue', arguments=args)
# 生产者发送消息
channel.basic_publish(
exchange='',
routing_key='normal_queue',
body='Hello, World!',
properties=pika.BasicProperties(delivery_mode=2) # 持久化消息
)
# 消费者拒绝消息(模拟异常情况)
def callback(ch, method, properties, body):
print(f"Received: {body}")
# 拒绝消息并不重新入队,消息将进入死信队列
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
channel.basic_consume(queue='normal_queue', on_message_callback=callback, auto_ack=False)
channel.start_consuming()
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 12、其他
- 交换器无法根据自身类型和路由键找到符合条件队列时,有哪些处理方式:设置 mandatory = true,代表返回消息给生产者;设置 mandatory = false,代表直接丢弃
- 消费者得到消息队列中的数据的方式:push 和 pull
- 消息基于什么传输:由于 TCP 连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。所以 RabbitMQ 使用信道 channel 的方式来传输数据,信道是建立在真实的 TCP 连接内的虚拟连接,且每条 TCP 连接上的信道数量没有限制,每个信道共享同一个 TCP 连接,但是可以并行处理多个信道上的消息,这样可以提高消息处理的效率。
- 延迟队列:存储对应的延迟消息,当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
在 RabbitMQ 中并不存在延迟队列,但我们可以通过设置消息的过期时间和死信队列来实现延迟队列,消费者监听死信交换器绑定的队列,而不要监听消息发送的队列。
延迟队列插件
设置x-delayed-message
,具体见
python - How could I send a delayed message in rabbitmq using the rabbitmq-delayed-message-exchange plugin? - Stack Overflow (opens new window) 和 Delayed Message Exchange - LavinMQ (opens new window)
(6)优先级队列:优先级高的队列会先被消费,可以通过 x-max-priority 参数来实现。但是当消费速度大于生产速度且 Broker 没有堆积的情况下,优先级显得没有意义。
(7)RabbitMQ 要求集群中至少有一个磁盘节点,其他节点可以是内存节点,当节点加入或离开集群时,必须要将该变更通知到至少一个磁盘节点。如果只有一个磁盘节点,刚好又是该节点崩溃了,那么集群可以继续路由消息,但不能创建队列、创建交换器、创建绑定、添加用户、更改权限、添加或删除集群节点。也就是说集群中的唯一磁盘节点崩溃的话,集群仍然可以运行,但直到该节点恢复前,无法更改任何东西。