RabbitMQ知识学习

基本介绍

消息队列,跨进程的通讯机制,上下游逻辑解耦+物理解耦的消息通信服务

消息队列能做什么

  • 流量削峰
  • 应用解耦
  • 异步处理

消息队列的分类

  • ActiveMQ:
    • 优点:单机吞吐量万级,时效性ms级,可用性高,基于从主架构实现高可用,消息可靠性较低概率丢失数据
    • 缺点:官方社区不怎么维护,高吞吐量场景较少使用
  • Kafka:为大数据而生的消息中间件
    • 优点:性能卓越,单机写入TPS约在百万条/秒,吞吐量高,分布式,少数机器宕机不会丢失数据
    • 缺点:单机超过64个队列/分区,负载会发生明显飙高现象;消息失败不支持重试;社区更新较慢
  • RocketMQ:
    • 优点:单机吞吐量十万级,分布式架构,消息可以做到0丢失,扩展性好,支持10亿级别的消息堆积
    • 缺点:支持的客户端语言不多,社区活跃度一般
  • RabbitMQ:
    • 优点:由于erlang语言的高并发特性,性能较好,吞吐量万级,支持多种语言,社区活跃度高
    • 缺点:学习成本较高

如何选择

  • kafka适合产生大量数据的互联网服务的数据收集业务,若有日志采集功能,肯定首选kafka
  • RocketMQ天生为金融互联网领域而生,在阿里双十一已经经历过多次考验
  • RabbitMQ适合当数据量没有那么大,或者是中小型公司

RabbitMQ核心概念

  • 生产者
  • 交换机
  • 队列
  • 消费者

基本工作模式

各模式的具体说明可以查看官网文档

工作原理

Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker

Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等

Connection:publisher/consumer 和 broker 之间的 TCP 连接

Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客 户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销

Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发 消息到 queue 中去。常用的类型有:direct (point-to-point),topic (publish-subscribe) and fanout (multicast)

Queue:消息最终被送到这里等待 consumer 取走

Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保 存到 exchange 中的查询表中,用于 message 的分发依据

Hello World

此为简单模式

基本实现示例

添加依赖

1
2
3
4
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>

生产者示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Producer {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置连接参数
connectionFactory.setHost("192.168.0.123");
connectionFactory.setUsername("rabbitmq");
connectionFactory.setPassword("123456");
// 创建连接和信道
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();) {
/**
* 声明队列
* 参数说明:
* 队列名称
* 队列里面的消息是否持久化,默认内存
* 队列是否只允许多个消费者消费,即是否消息共享
* 是否自动删除,最后一个消费者断开连接后删除
* 其他参数
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "hello rabbitmq";
/**
* 发布消息
* 参数说明:
* 交换机名称
* 路由key
* 其他参数
* 消息体
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
} catch (Exception e) {
e.printStackTrace();
}
}
}

消费者示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Consumer {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置连接参数
connectionFactory.setHost("192.168.0.123");
connectionFactory.setUsername("rabbitmq");
connectionFactory.setPassword("123456");
// 创建连接和信道
try (Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();) {
/**
* 消费消息
* 参数说明:
* 消费的队列名称
* 消费成功后是否自动应答
* 消息传递时的回调
* 取消消费者时的回调
*/
channel.basicConsume(QUEUE_NAME, true,
(consumerTag, message) -> {
System.out.println(new String(message.getBody()));
},
consumerTag -> {
System.out.println(consumerTag);
});
} catch (Exception e) {
e.printStackTrace();
}
}
}

整合Spring Boot示例

引入依赖

1
2
3
4
5
6
7
8
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

修改配置文件

1
2
3
4
5
6
7
8
9
spring:
application:
name: rabbitmq-service
rabbitmq:
host: 192.168.0.123
port: 5672
username: rabbitmq
password: 123456

增加配置类

1
2
3
4
5
6
7
8
@Configuration
public class RabbitConfig {
public static final String DEFAULT_QUEUE_NAME = "default";
@Bean
public Queue defaultQueue() {
return new Queue(DEFAULT_QUEUE_NAME);
}
}

生产者

1
2
3
4
5
6
7
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("helloWorld")
public String helloWorld() {
rabbitTemplate.convertAndSend(RabbitConfig.DEFAULT_QUEUE_NAME, "hello rabbitmq");
return "ok";
}

消费者

1
2
3
4
5
6
7
8
9
@Component
@RabbitListener(queues = RabbitConfig.DEFAULT_QUEUE_NAME)
@Slf4j
public class Consumer {
@RabbitHandler
public void consume(String message) {
log.info(message);
}
}

Work Queues

工作队列模式,又叫任务队列模式,为了解决多个消费者有序执行密集型的资源任务

公平分发

生产者

1
2
3
4
5
6
7
8
9
10
11
12
@GetMapping("work")
public String work() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend(RabbitConfig.WORK_QUEUE_NAME, RandomUtil.randomString(8));
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return "ok";
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
@Component
@Slf4j
public class Worker {
@RabbitListener(queues = RabbitConfig.WORK_QUEUE_NAME)
public void worker1(String msg, Channel channel, Message message) {
log.info("消费者01-接收到消息:" + msg);
}
@RabbitListener(queues = RabbitConfig.WORK_QUEUE_NAME)
public void worker2(String msg, Channel channel, Message message) {
log.info("消费者02-接收到消息:" + msg);
}
}

结果:

从结果上看每个消费者消费消息的数据是相等的

消息应答

消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡

所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用

手动应答
  • Channel.basicAck(long deliveryTag, boolean multiple):肯定确认应答
    • multiple:表示批量应答,可以减少网络拥堵
  • Channel.basicReject(long deliveryTag, boolean requeue):否定确认应答
    • deliveryTag:拒绝对应的消息
    • requeue:是否重新入队,false时丢弃或进入死信队列
  • Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue):表示己拒绝处理该消息,可以将其丢弃
    • multiple:是否多消息
    • requeue:拒绝该消费者先前接收未ACK的所有消息
  • Channel.basicRecover(boolean requeue):恢复消息到队列
    • requeue:表示消息入队后是否投递给其他消费者,true表示投递给其他消费者,false表示重新投递给自己

手动应答示例

配置文件中修改应答模式为手动应答

1
2
3
4
5
6
7
8
9
10
11
spring:
application:
name: rabbitmq-service
rabbitmq:
host: 192.168.0.123
port: 5672
username: rabbitmq
password: 123456
listener:
simple:
acknowledge-mode: manual # 应答模式为手动应答

消费者在上面代码的基础上修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@RabbitListener(queues = RabbitConfig.WORK_QUEUE_NAME)
public void worker1(String msg, Channel channel, Message message) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("消费者01-接收到消息:" + msg);
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}

持久化

队列持久化就是将durable属性设置为true

消息持久化在Spring RabbitMQ中默认就是持久化的

调用convertAndSend发送消息后,在RabbitTemplate中会用convertMessageIfNecessary方法将object转换为MessageProperties对象,而MessageProperties类中消息发送模式默认就是持久化的

1
public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;

不公平分发

让处理速度快的消费者多干活

通过修改消费者的信道的basicQos为1,即为不公平分发

预取值

消费者信道上肯定不止只有一个消息,因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题

这个时候就可以通过使用 channel.basicQos() 方法设置预取值,该值定义通道上允许的未确认消息的最大数量,一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认

Publisher Confirms

发布确认模式

设置队列持久化和消息持久化后还不能保证消息完全不丢失,必须加上发布确认才能保证生产者发布的消息绝对不丢失

spring中需要修改配置文件开启

1
2
3
4
5
6
7
8
9
10
11
12
spring:
application:
name: rabbitmq-service
rabbitmq:
host: 192.168.0.123
port: 5672
username: rabbitmq
password: 123456
listener:
simple:
acknowledge-mode: manual # 应答模式为手动应答
publisher-confirm-type: correlated # 设置发布确认类型
  • SIMPLE:同步确认,单次、批量确认通过invoke调用
  • CORRELATED:发布消息成功到交换器后会触发回调方法
  • NONE:禁用发布确认模式,是默认值

发布确认的策略

单个确认发布

一种同步确认发布的方式,也就是发一个消息以后只有他被确认发布,后续的才能继续发布

最大的缺点是:发布速度特别慢

1
2
3
4
5
6
7
8
9
10
TimeInterval timer = DateUtil.timer();
// 单个确认发布
messageInfos.forEach(messageInfo -> {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.invoke(operations -> {
rabbitTemplate.convertAndSend(RabbitConfig.PUBLISHER_CONFIRM_QUEUE_NAME, messageInfo, correlationData);
return rabbitTemplate.waitForConfirms(5000);
});
});
log.info("单个确认发布耗时 {} ms", timer.intervalRestart()); // 单个确认发布耗时 4446 ms
批量确认发布

与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息

仍然是同步的,也一样阻塞消息的发布

1
2
3
4
5
6
7
8
9
10
11
// 批量确认发布
for (int i = 0; i < messageInfos.size(); i++) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(RabbitConfig.PUBLISHER_CONFIRM_QUEUE_NAME, messageInfos.get(0), correlationData);
if ((i % messageInfos.size() == 0 || i == messageInfos.size() - 1) && i != 0) {
rabbitTemplate.invoke(operations -> {
return rabbitTemplate.waitForConfirms(5000);
});
}
}
log.info("批量确认发布 {} ms", timer.intervalRestart()); // 批量确认发布 1038 ms
异步确认发布

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功

1
2
3
4
5
6
7
8
9
10
11
12
// 设置异步确认回调函数
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
log.warn("id={} 消息未确认,cause={}", correlationData.getId(), cause);
// todo 投递异常处理
}
});
messageInfos.forEach(messageInfo -> {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(RabbitConfig.PUBLISHER_CONFIRM_QUEUE_NAME, messageInfo, correlationData);
});
log.info("异步确认发布 {} ms", timer.intervalRestart()); // 异步确认发布 710 ms

交换机

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列

相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

总共有以下类型:

  • 直接(direct):处理路由键;需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配
  • 主题(topic):将路由键和某模式进行匹配;此时队列需要绑定要一个模式上
  • 标题(headers):不处理路由键;而是根据发送的消息内容中的headers属性进行匹配
  • 扇出(fanout):不处理路由键;只需要简单的将队列绑定到交换机上

Fanout交换机示例

Publish/Subscribe 发布、订阅模式使用扇出交换机实现

配置类中注册交换机和对应的队列,并进行绑定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static class FanoutExchangeConfig {
@Bean(name = "fanoutExchange")
public Exchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE);
}
@Bean(name = "fanoutQueue1")
public Queue fanoutQueue1() {
return new Queue(FANOUT_QUEUE_1);
}
@Bean(name = "fanoutQueue2")
public Queue fanoutQueue2() {
return new Queue(FANOUT_QUEUE_2);
}
@Bean
public Binding fanoutQueue1Binding(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,
@Qualifier("fanoutQueue1") Queue fanoutQueue1) {
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
@Bean
public Binding fanoutQueue2Binding(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,
@Qualifier("fanoutQueue2") Queue fanoutQueue2) {
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}

通过交换机发送

1
2
3
4
5
6
7
@GetMapping("fanout")
public String fanout() {
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE, null, RandomUtil.randomString(8));
}
return "ok";
}

两个消费者监听两个队列能获取到一样的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@RabbitListener(queues = RabbitConfig.FANOUT_QUEUE_1)
public void worker4(MessageInfo msg, Channel channel, Message message) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("消费者04-接收到消息:{}", msg);
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
@RabbitListener(queues = RabbitConfig.FANOUT_QUEUE_2)
public void worker5(MessageInfo msg, Channel channel, Message message) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("消费者05-接收到消息:{}", msg);
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}

Direct交换机示例

Routing 路由模式使用直接交换机实现

注册交换机和队列,并进行绑定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static class DirectExchangeConfig {
@Bean
public DirectExchange directExchange() {
return new DirectExchange(DIRECT_EXCHANGE);
}
@Bean(name = "directQueue1")
public Queue directQueue1() {
return new Queue(DIRECT_QUEUE_1);
}
@Bean(name = "directQueue2")
public Queue directQueue2() {
return new Queue(DIRECT_QUEUE_2);
}
@Bean
public Binding directQueue1Binding(DirectExchange directExchange,
@Qualifier("directQueue1") Queue directQueue1) {
return BindingBuilder.bind(directQueue1).to(directExchange).with(DIRECT_QUEUE_1);
}
@Bean
public Binding directQueue2Binding(DirectExchange directExchange,
@Qualifier("directQueue2") Queue directQueue2) {
return BindingBuilder.bind(directQueue2).to(directExchange).with(DIRECT_QUEUE_2);
}
}

生产者

1
2
3
4
5
6
7
8
9
10
11
@GetMapping("direct")
public String direct() {
for (int i = 0; i < 10; i++) {
if (i % 2 == 0) {
rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE, RabbitConfig.DIRECT_QUEUE_2, RandomUtil.randomString(8));
} else {
rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE, RabbitConfig.DIRECT_QUEUE_1, RandomUtil.randomString(8));
}
}
return "ok";
}

消费者代码没什么区别就不放了

Topic交换机示例

topic的路由键不能随便写,必须满足一定的规则,它必须是一个单词列表,多个单词用.隔开,单词列表不能超过255个字节,如"app.log.info"、“*.log.*”、“#.log.*”

其中*可以代替任意一个单词

#可以代替零个或多个单词

Topics 主题模式使用主题交换机实现

增加交换机和队列注册,并进行绑定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public static class TopicExchangeConfig {
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE);
}
@Bean
public Queue topicQueue1() {
return new AnonymousQueue();
}
@Bean
public Queue topicQueue2() {
return new AnonymousQueue();
}
@Bean
public Queue topicQueue3() {
return new AnonymousQueue();
}
@Bean
public Binding topicBinding1(TopicExchange topicExchange,
Queue topicQueue1) {
return BindingBuilder.bind(topicQueue1)
.to(topicExchange)
.with("*.orange.*");
}
@Bean
public Binding topicBinding2(TopicExchange topicExchange,
Queue topicQueue2) {
return BindingBuilder.bind(topicQueue2)
.to(topicExchange)
.with("*.*.rabbit");
}
@Bean
public Binding topicBinding3(TopicExchange topicExchange,
Queue topicQueue3) {
return BindingBuilder.bind(topicQueue3)
.to(topicExchange)
.with("lazy.#");
}
}

生产者

1
2
3
4
5
6
7
8
9
@GetMapping("topic")
public String topic() {
final String[] keys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
"lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};
for (String key : keys) {
rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE, key, RandomUtil.randomString(8));
}
return "ok";
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@RabbitListener(queues = "#{topicQueue1.name}")
public void worker8(MessageInfo msg, Channel channel, Message message) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("消费者08-接收到消息:{}", msg);
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
@RabbitListener(queues = "#{topicQueue2.name}")
public void worker9(MessageInfo msg, Channel channel, Message message) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("消费者09-接收到消息:{}", msg);
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
@RabbitListener(queues = "#{topicQueue3.name}")
public void worker10(MessageInfo msg, Channel channel, Message message) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("消费者10-接收到消息:{}", msg);
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}

死信队列

由于某些原因队列中的一些消息无法被消费,这些消息被称为死信,存放死信的队列被称为死信队列

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中

死信的来源

  • 消息TTL过期
  • 队列达到最大长度
  • 消息被拒绝:basic.reject、basic.nack方法中requeue=false

基础架构

示例

注册交换机和队列,并为队列声明死信交换机及绑定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public static class DeadQueueConfig {
@Bean
public DirectExchange normalExchange() {
return new DirectExchange(NORMAL_EXCHANGE);
}
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(DEAD_EXCHANGE);
}
@Bean
public Queue normalQueue() {
Map<String, Object> args = new HashMap<>(4);
// 声明死信交换机
args.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 声明死信路由键
args.put("x-dead-letter-routing-key", DEAD_QUEUE);
// 声明队列长度
args.put("x-max-length", 5);
// 声明队列TTL
args.put("x-message-ttl", TimeUnit.SECONDS.toMillis(10));
return QueueBuilder.durable(NORMAL_QUEUE).withArguments(args).build();
}
@Bean
public Queue deadQueue() {
return new Queue(DEAD_QUEUE);
}
@Bean
public Binding normalBinding(DirectExchange normalExchange,
Queue normalQueue) {
return BindingBuilder.bind(normalQueue)
.to(normalExchange).with(NORMAL_QUEUE);
}
@Bean
public Binding deadBinding(DirectExchange deadExchange,
Queue deadQueue) {
return BindingBuilder.bind(deadQueue)
.to(deadExchange).with(DEAD_QUEUE);
}
}

其余队列声明参数:

参数名 作用
x-message-ttl 发送到队列的消息在丢弃之前可以存活多长时间(毫秒)
x-max-length 队列最大长度
x-expires 队列在被自动删除(毫秒)之前可以使用多长时间
x-max-length-bytes 队列在开始从头部删除之前可以包含的就绪消息的总体大小
x-dead-letter-exchange 设置队列溢出行为。这决定了在达到队列的最大长度时消息会发生什么。有效值为drop-head或reject-publish。交换的可选名称,如果消息被拒绝或过期,将重新发布这些名称
x-dead-letter-routing-key 可选的替换路由密钥,用于在消息以字母为单位时使用。如果未设置,将使用消息的原始路由密钥
x-max-priority 队列支持的最大优先级数;如果未设置,队列将不支持消息优先级
x-queue-mode 将队列设置为延迟模式,在磁盘上保留尽可能多的消息以减少内存使用;如果未设置,队列将保留内存缓存以尽快传递消息
x-queue-master-locator 将队列设置为主位置模式,确定在节点集群上声明时队列主机所在的规则
x-overflow 队列达到最大长度时,可选模式包括: drop-head, reject-publish 和 reject-publish-dlx.

生产者

1
2
3
4
5
6
7
@GetMapping("deadQueue")
public String deadQueue() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend(RabbitConfig.NORMAL_EXCHANGE, RabbitConfig.NORMAL_QUEUE, RandomUtil.randomString(8));
}
return "ok";
}

延迟队列

延迟队列内部是有序的,最重要的特性就是延时,延时队列达到的目的是在指定时间之后进行消费操作

其实就是死信队列中消息TTL过期那种情况

使用场景

  • 订单在十分钟之内未支付则自动取消
  • 新创建的店铺,若在十天内都没有上传过商品,则自动发送消息提醒
  • 用户注册成功后,若三天内没有登录则进行短信提醒
  • 用户发起退款,若三天内没有处理则通知相关运营人员
  • 预定会议后,在预定的时间点前十分钟提醒各参会人员

示例

注册交换机和队列,并绑定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public static class DelayQueueConfig {
@Bean
public DirectExchange delayNormalExchange() {
return new DirectExchange(DELAY_NORMAL_EXCHANGE);
}
@Bean
public DirectExchange delayExchange() {
return new DirectExchange(DELAY_EXCHANGE);
}
@Bean
public Queue delayNormalQueue() {
Map<String, Object> args = new HashMap<>(4);
// 声明死信交换机
args.put("x-dead-letter-exchange", DELAY_EXCHANGE);
// 声明死信路由键
args.put("x-dead-letter-routing-key", DELAY_QUEUE);
// 声明队列TTL 延迟40s
args.put("x-message-ttl", TimeUnit.SECONDS.toMillis(40));
return QueueBuilder.durable(DELAY_NORMAL_QUEUE).withArguments(args).build();
}
@Bean
public Queue delayQueue() {
return new Queue(DELAY_QUEUE);
}
@Bean
public Binding delayNormalBinding(DirectExchange delayNormalExchange,
Queue delayNormalQueue) {
return BindingBuilder.bind(delayNormalQueue)
.to(delayNormalExchange).with(DELAY_NORMAL_QUEUE);
}
@Bean
public Binding delayBinding(DirectExchange delayExchange,
Queue delayQueue) {
return BindingBuilder.bind(delayQueue)
.to(delayExchange).with(DELAY_QUEUE);
}
}

生产者

1
2
3
4
5
6
7
@GetMapping("delayQueue")
public String delayQueue() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend(RabbitConfig.DELAY_NORMAL_EXCHANGE, RabbitConfig.DELAY_NORMAL_QUEUE, RandomUtil.randomString(8));
}
return "ok";
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@RabbitListener(queues = RabbitConfig.DELAY_QUEUE)
public void worker11(MessageInfo msg, Channel channel, Message message) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("消费者11-接收到消息:{}", msg);
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}

延迟队列优化

上面代码实现的延迟队列的延迟时间是固定的,当需要另一个延迟时间时就又需要再声明一个队列,很麻烦

可以优化为队列不声明延迟时间,延迟时间由生产者确定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Bean
public Queue delayDynamicNormalQueue() {
Map<String, Object> args = new HashMap<>(4);
// 声明死信交换机
args.put("x-dead-letter-exchange", DELAY_EXCHANGE);
// 声明死信路由键
args.put("x-dead-letter-routing-key", DELAY_QUEUE);
return QueueBuilder.durable(DELAY_DYNAMIC_NORMAL_QUEUE).withArguments(args).build();
}

@Bean
public Binding delayDynamicNormalBinding(DirectExchange delayNormalExchange,
Queue delayDynamicNormalQueue) {
return BindingBuilder.bind(delayDynamicNormalQueue)
.to(delayNormalExchange).with(DELAY_DYNAMIC_NORMAL_QUEUE);
}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
@GetMapping("delayDynamicQueue")
public String delayDynamicQueue() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend(RabbitConfig.DELAY_NORMAL_EXCHANGE, RabbitConfig.DELAY_DYNAMIC_NORMAL_QUEUE,
RandomUtil.randomString(8), message -> {
// 设置延迟时间
message.getMessageProperties().setExpiration("10000");
return message;
});
}
return "ok";
}

存在问题

若生产者发送的消息设置不同的延迟时间,依旧是按顺序消费

如第一个消息的延时时间很长,第二个消息的延时时间很短,第二个消息并不会优先得到执行

因为RabbitMQ执行检查第一个消息是否过期

使用插件实现延迟队列

基于上面的问题,可以通过安装插件来实现,延迟队列插件是rabbitmq_delayed_message_exchange

rabbitmq/rabbitmq-delayed-message-exchange:下载插件的ez包

使用以下命令启用插件

1
2
3
4
5
6
# 查看插件目录
rabbitmq-plugins directories -s
# 将下载的插件复制到插件目录下
cp rabbitmq_delayed_message_exchange-3.8.0.ez /opt/rabbitmq/plugins
# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

插件安装后多一个x-delayed-message交换机类型

示例代码

注册交换机和队列,并绑定

1
2
3
4
5
6
7
8
9
10
11
@Bean
public CustomExchange xDelayedMessageExchange() {
Map<String, Object> args = new HashMap<>();
// 设置延迟类型
args.put("x-delayed-type", "direct");
return new CustomExchange(X_DELAYED_MESSAGE_EXCHANGE, "x-delayed-message", true, false, args);
}
@Bean
public Queue xDelayedMessageQueue() {
return new Queue(X_DELAYED_MESSAGE_QUEUE);
}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
@GetMapping("xDelayedMessageExchange")
public String xDelayedMessageExchange() {
for (int i = 0; i < 10; i++) {
Integer expiration = i % 2 == 0 ? 10000 : 5000;
rabbitTemplate.convertAndSend(RabbitConfig.X_DELAYED_MESSAGE_EXCHANGE, RabbitConfig.X_DELAYED_MESSAGE_QUEUE,
expiration.toString(), message -> {
// 设置延迟时间
message.getMessageProperties().setDelay(expiration);
return message;
});
}
return "ok";
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@RabbitListener(queues = RabbitConfig.X_DELAYED_MESSAGE_QUEUE)
public void worker12(MessageInfo msg, Channel channel, Message message) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("消费者12-接收到消息:{}", msg);
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}

发布确认高级

生成者发消息给Broker时,若RabbitMQ服务不可用(交换机不可用)或者队列不可用,需要将发送的消息缓存下来,避免消息丢失

发布确认

首先在配置文件中开启发布确认

1
2
3
4
5
6
7
8
9
10
11
12
spring:
application:
name: rabbitmq-service
rabbitmq:
host: 192.168.0.123
port: 5672
username: rabbitmq
password: 123456
listener:
simple:
acknowledge-mode: manual # 应答模式为手动应答
publisher-confirm-type: correlated

发布确认 确认的是交换机能否收到

实现确认回调,并注入到rabbitTemplate中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Slf4j
@Component
public class MyCallback implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
}
/**
* 交换机确认回调方法
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String dataId = correlationData != null ? correlationData.getId() : null;
if (!ack) {
log.warn("{} 消息确认失败, cause={}", dataId, cause);
}
}
}

注册交换机和队列,并绑定

生产者

1
2
3
4
5
6
7
@GetMapping("publisherConfirmPlus")
public String publisherConfirmPlus() {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(RabbitConfig.PUBLISHER_CONFIRM_PLUS_EXCHANGE + "_1",
RabbitConfig.PUBLISHER_CONFIRM_PLUS_QUEUE, RandomUtil.randomString(8), correlationData);
return "ok";
}

当交换机ack为false时表示交换机异常,如使用错误的交换机名会报下面的异常

1
o.s.a.r.c.CachingConnectionFactory       : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'publisher_confirm_plus_exchange_1' in vhost '/', class-id=60, method-id=40)

回退消息

退回消息确认的是队列能否收到

修改配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
spring:
application:
name: rabbitmq-service
rabbitmq:
host: 192.168.0.123
port: 5672
username: rabbitmq
password: 123456
listener:
simple:
acknowledge-mode: manual # 应答模式为手动应答
publisher-confirm-type: correlated
publisher-returns: true

MyCallback类再实现RabbitTemplate.ReturnsCallback接口,并注入到rabbitTemplate中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Slf4j
@Component
public class MyCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallbac
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
/**
* 交换机确认回调方法
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String dataId = correlationData != null ? correlationData.getId() : null;
if (!ack) {
log.warn("{} 消息确认失败, cause={}", dataId, cause);
}
}
/**
* 队列确认回调方法
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
log.warn("消息 {} 被交换机 {} 退回,退回原因:{},路由键:{}", new String(returned.getMessage().getBody()),
returned.getExchange(), returned.getReplyText(), returned.getRoutingKey());
}
}

生产者

1
2
3
4
5
6
7
@GetMapping("publisherConfirmPlus")
public String publisherConfirmPlus() {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(RabbitConfig.PUBLISHER_CONFIRM_PLUS_EXCHANGE,
RabbitConfig.PUBLISHER_CONFIRM_PLUS_QUEUE + "_1", RandomUtil.randomString(8), correlationData);
return "ok";
}

上面的例子中路由键不存在,消息发送后会有异常输入,如下:

1
c.z.j.rabbitmq.boot.callback.MyCallback  : 消息 fjutwwqj 被交换机 publisher_confirm_plus_exchange 退回,退回原因:NO_ROUTE,路由键:publisher_confirm_plus_queue_1

备份交换机

当交换机异常时使用备份交换机来发送消息,同时还可以用备份交换机来记录本次异常日志

注册交换机并设置备份关系

1
2
3
4
5
6
7
8
9
10
11
@Bean
public DirectExchange publisherConfirmPlusExchange() {
return ExchangeBuilder.directExchange(PUBLISHER_CONFIRM_PLUS_EXCHANGE)
// 设置备份交换机
.withArgument("alternate-exchange", PUBLISHER_CONFIRM_PLUS_BACKUP_EXCHANGE)
.build();
}
@Bean
public FanoutExchange publisherConfirmPlusBackupExchange() {
return new FanoutExchange(PUBLISHER_CONFIRM_PLUS_BACKUP_EXCHANGE);
}

其他就是注册队列,并与交换机绑定

其他知识

幂等性

幂等性:用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用

消费端幂等性保障,主流有两种方式:

  • 唯一ID+指纹码机制,利用数据库去重
  • 利用redis的原子性去实现(nx类命令)

优先级队列

消费时先将队列按照优先级排列,即先消费优先级高的消息

优先级队列中优先级范围为0~255,越大优先级越高

优先级队列使用注意事项
  • 创建的队列需要通过x-max-priority设置最大优先级,不要设置太大,太大影响性能
  • 发送的消息需要设置优先级(不设置的情况下,优先级是最低的),不能大于队列设置的最大优先级

惰性队列

惰性队列:惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储

默认情况下,生成发送到队列的消息是保存在内存中的,这样可以更快的发送消费者

使用方式

创建队列时设置x-queue-mode参数为lazy即可将队列设置为惰性队列

普通队列与惰性队列内存消耗对比


RabbitMQ知识学习
https://blog.kedr.cc/posts/1746580309/
作者
zhuweitung
发布于
2023年1月13日
许可协议