RocketMQ 分布式消息队列详解
2026/1/14大约 14 分钟
RocketMQ 分布式消息队列详解
RocketMQ 是阿里巴巴开源的分布式消息中间件,2017 年成为 Apache 顶级项目。它具有高吞吐、低延迟、高可用的特点,广泛应用于订单、交易、流计算、消息推送、日志处理等场景。
一、消息队列概述
什么是消息队列?
消息队列(Message Queue,MQ)是一种应用程序间的通信方式。生产者将消息发送到队列,消费者从队列获取消息进行处理,实现了生产者和消费者的解耦。
为什么需要消息队列?
传统同步调用的问题:
- 响应时间长(各服务耗时累加)
- 服务耦合度高
- 任一服务故障导致整体失败
- 无法应对流量峰值
消息队列的核心作用
| 作用 | 说明 | 场景示例 |
|---|---|---|
| 异步解耦 | 服务间不直接调用,通过 MQ 通信 | 订单创建后异步发短信、邮件 |
| 流量削峰 | 高峰期消息堆积,平稳消费 | 秒杀活动、双11大促 |
| 数据分发 | 一份数据分发给多个消费者 | 数据同步到缓存、ES、数仓 |
| 最终一致性 | 通过消息保证分布式事务 | 跨服务的数据一致性 |
二、RocketMQ 核心概念
核心组件
| 组件 | 说明 | 类比 |
|---|---|---|
| NameServer | 注册中心,管理 Broker 路由信息 | 类似 ZooKeeper |
| Broker | 消息存储服务器,负责消息的存储和转发 | 邮局 |
| Producer | 消息生产者,发送消息到 Broker | 寄信人 |
| Consumer | 消息消费者,从 Broker 拉取消息消费 | 收信人 |
消息模型
| 概念 | 说明 |
|---|---|
| Topic | 消息主题,一类消息的集合,逻辑概念 |
| Tag | 消息标签,用于同一 Topic 下的消息过滤 |
| Message Queue | 消息队列,Topic 的物理分区,提高并发 |
| Consumer Group | 消费者组,同组消费者负载均衡消费 |
| Producer Group | 生产者组,用于事务消息的回查 |
三、架构设计
整体架构
各组件详解
NameServer
- 无状态设计:各节点独立,不互相通信
- 功能:Broker 注册、路由信息管理、心跳检测
- 高可用:部署多个节点,任一节点可用即可
Broker
- 消息存储:CommitLog(顺序写)+ ConsumeQueue(索引)
- 主从架构:Master 负责读写,Slave 负责备份
- 刷盘策略:同步刷盘(可靠)/ 异步刷盘(高性能)
消息发送流程
消息消费流程
四、环境搭建
Docker 快速部署(推荐)
# docker-compose.yml
version: '3.8'
services:
namesrv:
image: apache/rocketmq:5.1.0
container_name: rmqnamesrv
ports:
- 9876:9876
command: sh mqnamesrv
networks:
- rocketmq
broker:
image: apache/rocketmq:5.1.0
container_name: rmqbroker
ports:
- 10909:10909
- 10911:10911
environment:
- NAMESRV_ADDR=namesrv:9876
command: sh mqbroker -n namesrv:9876
depends_on:
- namesrv
networks:
- rocketmq
dashboard:
image: apacherocketmq/rocketmq-dashboard:latest
container_name: rmqdashboard
ports:
- 8080:8080
environment:
- JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876
depends_on:
- namesrv
networks:
- rocketmq
networks:
rocketmq:
driver: bridge# 启动
docker-compose up -d
# 查看状态
docker-compose ps
# 访问控制台
# http://localhost:8080Linux 原生部署
# 1. 下载
wget https://archive.apache.org/dist/rocketmq/5.1.0/rocketmq-all-5.1.0-bin-release.zip
unzip rocketmq-all-5.1.0-bin-release.zip
cd rocketmq-all-5.1.0-bin-release
# 2. 启动 NameServer
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
# 3. 启动 Broker
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
# 4. 测试发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
# 5. 测试消费消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer五、Java 客户端开发
Maven 依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.1.0</version>
</dependency>生产者示例
public class ProducerExample {
public static void main(String[] args) throws Exception {
// 1. 创建生产者,指定生产者组
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
// 2. 设置 NameServer 地址
producer.setNamesrvAddr("localhost:9876");
// 3. 设置发送超时时间
producer.setSendMsgTimeout(3000);
// 4. 设置发送失败重试次数
producer.setRetryTimesWhenSendFailed(3);
// 5. 启动生产者
producer.start();
// 6. 发送消息
for (int i = 0; i < 10; i++) {
Message msg = new Message(
"TopicTest", // Topic
"TagA", // Tag
("Hello RocketMQ " + i).getBytes() // Body
);
// 设置消息 Key,用于查询
msg.setKeys("OrderId_" + i);
SendResult result = producer.send(msg);
System.out.printf("发送结果: %s%n", result);
}
// 7. 关闭生产者
producer.shutdown();
}
}消费者示例
public class ConsumerExample {
public static void main(String[] args) throws Exception {
// 1. 创建消费者,指定消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
// 2. 设置 NameServer 地址
consumer.setNamesrvAddr("localhost:9876");
// 3. 设置消费起始位置
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 4. 订阅 Topic,* 表示订阅所有 Tag
consumer.subscribe("TopicTest", "*");
// 5. 设置消费线程数
consumer.setConsumeThreadMin(10);
consumer.setConsumeThreadMax(20);
// 6. 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("收到消息: Topic=%s, Tag=%s, Body=%s%n",
msg.getTopic(), msg.getTags(), new String(msg.getBody()));
}
// 返回消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 7. 启动消费者
consumer.start();
System.out.println("消费者已启动...");
}
}六、消息类型详解
消息发送方式对比
| 发送方式 | 特点 | 适用场景 | 可靠性 |
|---|---|---|---|
| 同步发送 | 等待响应后返回 | 重要通知、交易消息 | 高 |
| 异步发送 | 发送后立即返回,回调处理结果 | 响应时间敏感场景 | 高 |
| 单向发送 | 只发送不等待响应 | 日志收集、监控数据 | 低 |
1. 同步消息
// 同步发送,等待 Broker 响应
SendResult result = producer.send(msg);
System.out.println("消息ID: " + result.getMsgId());
System.out.println("发送状态: " + result.getSendStatus());2. 异步消息
// 异步发送,通过回调处理结果
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult result) {
System.out.println("发送成功: " + result.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.println("发送失败: " + e.getMessage());
// 可以在这里进行重试或告警
}
});3. 单向消息
// 单向发送,不等待响应,适合日志收集
producer.sendOneway(msg);4. 延迟消息
消息发送后不会立即投递,而是等待指定时间后才投递给消费者。
延迟级别对照表:
| Level | 延迟时间 | Level | 延迟时间 |
|---|---|---|---|
| 1 | 1s | 10 | 6m |
| 2 | 5s | 11 | 7m |
| 3 | 10s | 12 | 8m |
| 4 | 30s | 13 | 9m |
| 5 | 1m | 14 | 10m |
| 6 | 2m | 15 | 20m |
| 7 | 3m | 16 | 30m |
| 8 | 4m | 17 | 1h |
| 9 | 5m | 18 | 2h |
Message msg = new Message("TopicTest", "订单超时取消".getBytes());
// 设置延迟级别 4,即 30 秒后投递
msg.setDelayTimeLevel(4);
producer.send(msg);应用场景:
- 订单超时未支付自动取消
- 定时任务触发
- 延迟重试
5. 顺序消息
保证同一业务的消息按顺序消费。
原理:同一业务 ID 的消息发送到同一个 Queue,消费者单线程顺序消费该 Queue。
// 发送顺序消息
String orderId = "ORDER_001";
String[] steps = {"创建订单", "扣减库存", "支付成功", "发货"};
for (String step : steps) {
Message msg = new Message("OrderTopic", step.getBytes());
// 通过 MessageQueueSelector 保证同一订单发到同一 Queue
producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
String orderId = (String) arg;
int index = Math.abs(orderId.hashCode()) % mqs.size();
return mqs.get(index);
}
}, orderId);
}
// 消费顺序消息(使用 MessageListenerOrderly)
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("顺序消费: " + new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});6. 事务消息
解决分布式事务问题,保证本地事务与消息发送的最终一致性。
// 创建事务生产者
TransactionMQProducer producer = new TransactionMQProducer("tx_producer_group");
producer.setNamesrvAddr("localhost:9876");
// 设置事务监听器
producer.setTransactionListener(new TransactionListener() {
// 执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 执行本地业务逻辑,如:扣减库存、更新订单状态
String orderId = msg.getKeys();
orderService.createOrder(orderId);
// 本地事务成功,提交消息
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
// 本地事务失败,回滚消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 事务回查(Broker 未收到确认时调用)
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String orderId = msg.getKeys();
// 查询本地事务状态
Order order = orderService.getOrder(orderId);
if (order != null && order.getStatus() == OrderStatus.CREATED) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (order == null) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
// 状态未知,继续等待回查
return LocalTransactionState.UNKNOW;
}
});
producer.start();
// 发送事务消息
Message msg = new Message("OrderTopic", "创建订单".getBytes());
msg.setKeys("ORDER_001");
TransactionSendResult result = producer.sendMessageInTransaction(msg, null);七、消费模式
集群消费 vs 广播消费
| 模式 | 特点 | 适用场景 |
|---|---|---|
| 集群消费 | 同组消费者负载均衡,消息只消费一次 | 业务处理(默认) |
| 广播消费 | 每个消费者都收到全量消息 | 缓存刷新、配置更新 |
// 集群消费(默认)
consumer.setMessageModel(MessageModel.CLUSTERING);
// 广播消费
consumer.setMessageModel(MessageModel.BROADCASTING);Push vs Pull 模式
| 模式 | 原理 | 特点 |
|---|---|---|
| Push | 长轮询,Broker 有消息时推送 | 实时性高,使用简单 |
| Pull | 消费者主动拉取 | 可控性强,适合批量处理 |
// Push 模式(推荐)
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("group");
// Pull 模式
DefaultLitePullConsumer pullConsumer = new DefaultLitePullConsumer("group");
pullConsumer.subscribe("TopicTest", "*");
pullConsumer.start();
while (true) {
List<MessageExt> msgs = pullConsumer.poll();
for (MessageExt msg : msgs) {
// 处理消息
}
}八、消息重试与死信队列
消费重试机制
重试时间间隔:10s、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
// 业务处理
processMessage(msg);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 返回 RECONSUME_LATER 触发重试
if (msg.getReconsumeTimes() < 3) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} else {
// 超过3次,记录日志,人工处理
log.error("消息消费失败: {}", msg.getMsgId());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});死信队列(DLQ)
超过最大重试次数的消息会进入死信队列,Topic 名称为 %DLQ%ConsumerGroup。
// 订阅死信队列,进行人工处理
DefaultMQPushConsumer dlqConsumer = new DefaultMQPushConsumer("dlq_consumer_group");
dlqConsumer.subscribe("%DLQ%consumer_group", "*");
dlqConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
// 记录到数据库,人工处理
log.warn("死信消息: {}", new String(msg.getBody()));
saveToDatabase(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});九、Spring Boot 集成
依赖配置
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>application.yml
rocketmq:
name-server: localhost:9876
producer:
group: my-producer-group
send-message-timeout: 3000
retry-times-when-send-failed: 3生产者
@Service
public class OrderMessageService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 同步发送
public void sendOrderMessage(Order order) {
rocketMQTemplate.syncSend("order-topic", order);
}
// 异步发送
public void sendOrderMessageAsync(Order order) {
rocketMQTemplate.asyncSend("order-topic", order, new SendCallback() {
@Override
public void onSuccess(SendResult result) {
log.info("发送成功: {}", result.getMsgId());
}
@Override
public void onException(Throwable e) {
log.error("发送失败", e);
}
});
}
// 发送延迟消息
public void sendDelayMessage(Order order, int delayLevel) {
rocketMQTemplate.syncSend("order-topic",
MessageBuilder.withPayload(order).build(),
3000, // 超时时间
delayLevel // 延迟级别
);
}
// 发送顺序消息
public void sendOrderlyMessage(Order order) {
rocketMQTemplate.syncSendOrderly("order-topic", order, order.getOrderId());
}
}消费者
@Component
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer-group",
consumeMode = ConsumeMode.CONCURRENTLY, // 并发消费
messageModel = MessageModel.CLUSTERING // 集群消费
)
public class OrderMessageConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
log.info("收到订单消息: {}", order);
// 业务处理
processOrder(order);
}
}
// 顺序消费
@Component
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-orderly-consumer-group",
consumeMode = ConsumeMode.ORDERLY // 顺序消费
)
public class OrderOrderlyConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
log.info("顺序消费订单: {}", order);
}
}十、高可用架构
主从同步模式
| 模式 | 说明 | 可靠性 | 性能 |
|---|---|---|---|
| 同步双写 | Master 和 Slave 都写成功才返回 | 高 | 低 |
| 异步复制 | Master 写成功即返回,异步同步到 Slave | 中 | 高 |
Broker 配置
# broker.conf
# Broker 名称
brokerName=broker-a
# 0-Master, >0-Slave
brokerId=0
# NameServer 地址
namesrvAddr=192.168.1.100:9876;192.168.1.101:9876
# 存储路径
storePathRootDir=/data/rocketmq/store
# 刷盘方式:ASYNC_FLUSH(异步)/ SYNC_FLUSH(同步)
flushDiskType=ASYNC_FLUSH
# 主从同步方式:ASYNC_MASTER(异步)/ SYNC_MASTER(同步)
brokerRole=SYNC_MASTER多 Master 多 Slave 集群
十一、消息存储原理
存储结构
- CommitLog:所有消息顺序写入,保证写入性能
- ConsumeQueue:消息消费队列,存储消息在 CommitLog 中的偏移量
- IndexFile:消息索引文件,支持按 Key 或时间查询
零拷贝技术
RocketMQ 使用 mmap 内存映射技术,减少数据拷贝次数,提升读写性能。
十二、最佳实践
生产者最佳实践
- 合理设置 Producer Group:同一类消息使用同一个 Group
- 设置发送超时和重试:
producer.setSendMsgTimeout(3000); producer.setRetryTimesWhenSendFailed(3); - 使用 Keys 便于问题排查:
msg.setKeys("OrderId_123"); - 批量发送提升吞吐:
List<Message> msgs = new ArrayList<>(); // 添加消息... producer.send(msgs);
消费者最佳实践
幂等处理:消息可能重复投递,业务需保证幂等
// 使用唯一键去重 if (redis.setnx("msg:" + msg.getMsgId(), "1")) { processMessage(msg); }合理设置消费线程数:
consumer.setConsumeThreadMin(20); consumer.setConsumeThreadMax(64);批量消费提升效率:
consumer.setConsumeMessageBatchMaxSize(10);消费失败合理处理:
if (msg.getReconsumeTimes() > 3) { // 记录日志,人工处理 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } return ConsumeConcurrentlyStatus.RECONSUME_LATER;
Topic 设计原则
| 原则 | 说明 |
|---|---|
| 按业务划分 | 订单、支付、物流使用不同 Topic |
| 使用 Tag 过滤 | 同一 Topic 下用 Tag 区分子类型 |
| 避免 Topic 过多 | 单集群 Topic 数量建议 < 1000 |
| Queue 数量规划 | 根据消费者数量和吞吐量设置 |
十三、常见问题
消息丢失怎么办?
消息重复怎么办?
- 业务幂等:数据库唯一键、Redis 去重
- 消息去重表:记录已消费的 MsgId
消息堆积怎么办?
- 增加消费者数量(不超过 Queue 数量)
- 增加消费线程数
- 批量消费
- 临时扩容 Queue 数量