RabbitMQ
1.MQ引言
模块之间的耦合度多高,导致一个模块宕机后,全部功能都不能用了,并且同步通讯的成本过高,用户体验差。
1.1什么是MQ
1.2MQ有哪些
市面上比较火爆的几款MQ:
ActiveMQ,RocketMQ,Kafka,RabbitMQ。
语言的支持:ActiveMQ,RocketMQ只支持Java语言,Kafka可以支持多们语言,RabbitMQ支持多种语言。
效率方面:ActiveMQ,RocketMQ,Kafka效率都是毫秒级别,RabbitMQ是微秒级别的。
消息丢失,消息重复问题: RabbitMQ针对消息的持久化,和重复问题都有比较成熟的解决方案。
学习成本:RabbitMQ非常简单。
RabbitMQ是由Rabbit公司去研发和维护的,最终是在Pivotal公司维护。
RabbitMQ严格的遵循AMQP协议,一种高级消息队列协议,帮助我们在进程之间传递异步消息。
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等
1.3不同MQ的特点
1.4RabbitMQ介绍
为什么最受欢迎,应用最广泛?
1.使用AMQP协议 支持很多业务场景 比如 点对点 交换机路由 发布订阅模式 能适用很多业务场景
- 使用 erlang 语言 这个语言的特点 叫做面向并发编程 自身并发能力强 对socket 编程 支持友好
- 和spring 无缝整合
- 对数据一致性 数据丢失 错误处理 非常友好 可以不丢失任何数据 对错误数据恢复
![image-20200902195250260](https://gllspictures.oss-cn-beijing.aliyuncs.com/img/202304132247568.png)
rabbit 是部署最广泛的消息中间件
2.RabbitMQ安装
2.1下载
官网下载地址:https://www.rabbitmq.com/download.html
![image-20200902201442656](https://gllspictures.oss-cn-beijing.aliyuncs.com/img/202304132247358.png)
使用 docker-compose 安装 启动服务 进入服务内部 启动web 访问
cd /mydata/rabbitmq
vim docker-compose.yml
docker-compose up
version: "3.1"
services:
rabbitmq:
image: daocloud.io/library/rabbitmq:management
restart: always
container_name: rabbitmq
ports:
- 5672:5672 #rabbitmq 服务的端口号
- 15672:15672 # rabbitmq 图形化界面的端口号
volumes:
- ./data:/var/lib/rabbitmq
开启web界面访问:
启动容器 -进入容器 docker exec -it rabbitmq /bin/bash
开启:rabbitmq-plugins enable rabbitmq_management
打开浏览器:http://192.168.5.201:15672 用户名 guest 密码 guest
2.2RabbitMQ架构【重点
】
2.2.1 官方的简单架构图
- Publisher - 生产者:发布消息到RabbitMQ中的Exchange
- Consumer - 消费者:监听RabbitMQ中的Queue中的消息
- Exchange - 交换机:和生产者建立连接并接收生产者的消息
- Queue - 队列:Exchange会将消息分发到指定的Queue,Queue和消费者进行交互
- Routes - 路由:交换机以什么样的策略将消息发布到Queue
简单的架构图 |
---|
![]() |
2.2.2 RabbitMQ的完整架构图
完整架构图 |
---|
![]() |
查看图形化界面并创建一个Virtual Host
创建一个全新的用户和全新的Virtual Host,并且将test用户设置上可以操作/test的权限
![image-20200905171744241](https://gllspictures.oss-cn-beijing.aliyuncs.com/img/202304132248929.png)
3.RabbitMQ的使用【重点
】
3.1 RabbitMQ的通讯方式
通讯方式 |
---|
![]() |
![]() |
![]() |
![]() |
3.2java连接rabbitmq
3.2.1创建maven 项目
3.2.2导入依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
3.2.3创建连接工具类
public class RabbitMQUtil {
public static Connection getConnection(){
// 创建连接mq 的连接工厂对象
ConnectionFactory factory = new ConnectionFactory();
// 设置 rabbitmq 的主机
factory.setHost("192.168.5.201");
// 设置端口号
factory.setPort(5672);
//设置用户名
factory.setUsername("test");
// 设置密码
factory.setPassword("test");
// 设置连接哪个虚拟机
factory.setVirtualHost("/test");
// 创建Connection
Connection conn = null;
try {
conn = factory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
// 返回
return conn;
}
public static void main(String[] args) {
System.out.println(getConnection());
}
}
3.3 Hello-World
一个生产者,一个默认的交换机,一个队列,一个消费者
创建生产者,创建一个channel,发布消息到默认exchange,指定路由规则。
创建消费者,创建一个channel,创建一个队列,并且去消费当前队列
P:生产者 也就是要发送消息的程序
C:消费者 消息的接受者 会一直等待消息的到来
queue:消息队列 图中红色部分 类似一个邮箱 可以缓存消息 生产者向其中投递消息 消费者从其中取消息
package com.glls._1helloworld;
import com.glls.utils.RabbitMQUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
public class HelloWorldTest {
@Test
public void publish() throws Exception {
//1. 获取Connection
Connection connection = RabbitMQUtil.getConnection();
//2. 创建Channel 通道
Channel channel = connection.createChannel();
//3. 发布消息到exchange,同时指定路由的规则
String msg = "Hello-World!";
// 通道绑定对应的消息队列 可以省略 消费者指定queue 如果不省略 则 生产者消费者队列需要一致
/*
* 参数1 队列名称 如果队列不存在 自动创建
* 参数2 定义队列是否要持久化 true 持久化 (当mq 关闭时,将队列存入磁盘) false 不持久化 mq 重启 队列被删除
* 参数3 是否独占队列 当前队列 只允许当前连接可用 其他连接不能使用这个队列 true 独占队列 false 不独占
* 参数4 是否在消费完成后 自动删除队列 true自动删除 false 不自动删除
* 参数5 额外附加参数
* */
//channel.queueDeclare("HelloWorld",true,false,false,null);
// 发布消息
// 参数1:指定exchange 交换机 ,当前模式 是 生产者 ---队列----消费者 模式
// 参数2:指定路由的规则,使用具体的队列名称 队列名称。
// 参数3:指定传递的消息所携带的properties,使用null。 比如:MessageProperties.PERSISTENT_BASIC 表示持久化消息
// 参数4:指定发布的具体消息,byte[]类型
//第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,
//因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话
//我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键
channel.basicPublish("","HelloWorld",null,msg.getBytes());
// Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
System.out.println("生产者发布消息成功!");
//4. 释放资源
channel.close();
connection.close();
}
@Test
public void consume() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQUtil.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
//3. 声明队列-HelloWorld
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
channel.queueDeclare("HelloWorld",true,false,false,null);
/**
* DefaultConsumer 是 Consumer 接口的实现类 接口中的定义的方法如下
这个不作为重点 了解即可
* handleCancel:除了调用basicCancel的其他原因导致消息被取消时调用。
* handleCancelOk:basicCancel调用导致的订阅取消时被调用。
* handleConsumeOk:任意basicComsume调用导致消费者被注册时调用。
* handleDelivery:消息接收时被调用。
* handleRecoverOk:basic.recover-ok被接收时调用
* handleShutdownSignal:当Channel与Conenction关闭的时候会调用。
*/
//4. 开启监听Queue
//简易版自定义 Consumer
//只需要重写DefaultConsumer 的handleDelivery方法即可取出消息,额外属性新增属性等操作
DefaultConsumer consume = new DefaultConsumer(channel){
// 重写DefaultConsumer 的handleDelivery 的方法 方法中获取消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:" + new String(body,"UTF-8"));
}
};
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK 开启自动确认机制 (true,接收到消息后,会立即告诉RabbitMQ)
//参数3:consumer - 消费回调接口
channel.basicConsume("HelloWorld",true,consume);
System.out.println("消费者开始监听队列!");
// System.in.read();
System.in.read(); // 不要让消费者线程 结束 否则看不到监听效果了 如果没有这行代码 下面不要关闭通道和连接
//5. 释放资源
channel.close(); // 不建议关闭 通道和连接
connection.close();
}
}
点对点的模型 应用场景 比如 在 注册的时候 发送短信验证 就可以以消息队列的形式 调用 短信服务接口
再比如 签到送积分的功能 可以用此模型 向积分服务 发送请求
3.3.1参数细节说明-durable
使用consume 这行代码来说明
# 参数1 队列名称
# 参数2 队列是否持久化 注意 是队列持久化 不是 队列中的数据持久化 当前设置为 false 启动消费者,在web页面会看到这个队列
channel.queueDeclare("HelloWorld",false,false,false,null);
启动消费者,web页面看到这个HelloWorld 队列
此时 停止消费者,这个记录依然存在, 停止docker中的rabbit ,再重启,这条记录就没有了,
# 参数2 是否持久化 设置为 true
channel.queueDeclare("HelloWorld",true,false,false,null);
此时 再停止 rabbit服务 重启rabbit 服务,这条记录依然存在,这就是 队列持久化,但是队列持久化 ,却不是队列中的数据持久化
要想让队列中的数据持久化,需要在生产者发布消息时 设置 队列中消息的属性
# 参数3 设置队列中消息的持久化
# 设置队列持久化 队列中消息持久化 ,rabbit 服务重启,队列 和 队列中的消息依然存在
channel.basicPublish("","HelloWorld",MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
3.3.2参数细节说明-exclusive
参数3 是否独占队列 当前队列 只允许当前连接可用 其他连接不能使用这个队列 true 独占队列 false 不独占 一般不独占
或者说 参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费) 一般不希望独占
#参数3 是否独占队列 一般不独占 设置为false ,也就是除了当前连接,其他消费者也可以 连接这个队列
channel.queueDeclare("HelloWorld",true,false,false,null);
#参数3 是否独占队列 如果设置为 true ,则当前队列 被当前连接独占 运行状态 如下
channel.queueDeclare("HelloWorld",true,true,false,null);
此时 如果停掉 consumer 这个队列 就会消失, 另外需要注意一点 ,生产者 此时 就不能声明队列了,因为 生产者再声明队列 就和消费者中的队列 出现排他性问题 报异常 ,只有把 生产者中 声明 队列 //channel.queueDeclare("HelloWorld",true,true,false,null); 这行代码 注释掉, 才能正常发送 消息。
3.3.3参数细节说明-autoDelete
如果这个队列没有消费者在消费,队列自动删除
参数4
# 消费者 这行代码 设置 第四个 参数 没有消费者的时候 是否自动删除 true 自动删除 false 不自动删除
channel.queueDeclare("HelloWorld",true,false,true,null);
此时 停止消费者线程 或者 关闭连接 ,这条记录会消失 ,即 删除了这个 队列
HelloWorld模型总结:生产者发送消息 到 消息队列 ,消费者监听消息队列 消息的变化,一旦有消息 ,消费者就去消费,这种模型比较简单,一个生产者 对应一个消费者,也是一些简单的业务用的比较多的场景,消息队列在中间 就类似一个邮箱 一个缓存,生产者把消息发送到消息队列,被消费者监听到 就去处理消息 依次来完成对应的业务操作。这种模型是最简单的模型,可能应对不了某些特殊的场景。比如 消费者在处理某些消息时 因为业务逻辑的复杂 或者 消费者处理过慢,会造成消息队列中的消息 不断造成消息的堆积,所以 我们希望 生产者生产的消息 可以给更多的消费者消费 ,这样就提高了 消费者处理消息 的 效率 ,当然 我们要保证 不同的消费者处理的消息是不同的 不然会造成业务的重复处理, 这样 就可以 处理消息快一些 不堵塞消息队列。基于这种需求 就是下面的Work模型
3.4Work
一个生产者,一个默认的交换机,一个队列,两个消费者
work queues 也称为task queues ,任务模型 。
HelloWorld 模型的不足 :当消息处理比较耗时时,可能生产消息的速度会远远大于消息的消费速度,长此以往 消息堆积的越来越多,无法及时处理 此时 就可以考虑work 模型,让多个消费者绑定到同一个队列,共同消费队列中的消息。队列中的消息 一旦被消费 就会消失 因此 任务是不会被重复执行的。
只需要在消费者端,添加Qos能力以及更改为手动ack即可让消费者,根据自己的能力去消费指定的消息,而不是默认情况下由RabbitMQ平均分配了,生产者不变,正常发布消息到默认的exchange,并指定routing
P:生产者 消息的发送者
C1:消费者 领取任务 并完成任务
C2: 消费者2 领取任务并完成任务
queue: 红色部分 队列
消费者指定Qoa和手动ack
package com.glls._2work;
import com.glls.utils.RabbitMQUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
/**
* @author glls
* @create 2020-09-03 12:24
*
* Work 模型: 按劳分配 能者多劳
*
*
*
* 现在有两个 消费者 消费者1 处理消息 处理的慢 2秒一个 消费者2 处理消息处理的快 1秒 一个 ,但是 在自动确认模式下
*
* channel.basicConsume("Work",true,consumer); Auto:ack true ,生产者发送了100条消息 会一下子全被 接受 在web 页面看不到被消费的过程
* 而且 消费者轮流依次消费 没有出现谁消费的快 就多消费一点的情况
*
* 如果改成 手动确认模式 生产者发送的100 条消息 会逐渐被消费 在web 页面能看到被消费的过程 而且 哪个消费者消费的快 就会多消费
*
* 实际场景 我们希望 能者多劳 处理消息快的消费者 多处理一些
*
* 所以 不建议使用 消息的自动确认 应该改为 手动确认
*
*/
public class WorkTest {
@Test
public void publish() throws Exception {
//1. 获取Connection
Connection connection = RabbitMQUtil.getConnection();
//2. 创建Channel
Channel channel = connection.createChannel();
//3. 发布消息到exchange,同时指定路由的规则
String msg = "Hello-Work!";
// 参数1:指定exchange,使用""。
// 参数2:指定路由的规则,使用具体的队列名称。
// 参数3:指定传递的消息所携带的properties,使用null。
// 参数4:指定发布的具体消息,byte[]类型
channel.queueDeclare("Work",true,false,false,null);
for(int i=0;i<100;i++){
channel.basicPublish("","Work",null,(i+msg).getBytes());
}
// Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
System.out.println("生产者发布消息成功!");
//4. 释放资源
channel.close();
connection.close();
}
@Test
public void consume() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQUtil.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
//3. 声明队列-HelloWorld
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
channel.queueDeclare("Work",true,false,false,null);
//1 指定当前消费者,一次消费多少个消息 没有过来的消息 还在队列中保存 这样设置 不会造成消息丢失
// 因为 假如不指定 一次消费一条消息 就有可能有多条消息 到达消费者 此时消费者一旦宕机 到达消费者的消息也就丢了
// 所以 消息 从队列 到 消费者 一次来一条 免得 过来多条 消息在半路丢了
channel.basicQos(1); // 不要 一次性的把消息都给消费者 容易丢失 一次给一条 安全
//4. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者1号接收到消息:" + new String(body,"UTF-8"));
//2. 手动ack
// 参数1 long类型 标识队列中哪个具体的消息 参数2:boolean 类型 是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(),false);
// 处理完了消息 手动确认一下 队列再删除这个消息 这种机制保证消息永不丢失
// 队列给消费者 一条消息 消费者收到消息 处理完了之后 手动确认, 确认了之后 队列才把消息删除 保证消息永不丢失
// 而且 消费者 确认一个消息 队列发送一个消息 消费者确认的快 队列发送的快 能者多劳
}
};
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
//参数3:consumer - 指定消费回调
//3. 指定手动ack
channel.basicConsume("Work",false,consumer);
System.out.println("消费者开始监听队列!");
// System.in.read();
System.in.read();
//5. 释放资源
channel.close();
connection.close();
}
@Test
public void consume2() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQUtil.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
//3. 声明队列-HelloWorld
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
channel.queueDeclare("Work",true,false,false,null);
//1 指定当前消费者,一次消费多少个消息
channel.basicQos(1);
//4. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者2号接收到消息:" + new String(body,"UTF-8"));
//2. 手动ack
// 参数1 long类型 标识队列中哪个具体的消息 参数2:boolean 类型 是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
//参数3:consumer - 指定消费回调
//3. 指定手动ack
channel.basicConsume("Work",false,consumer);
System.out.println("消费者开始监听队列!");
// System.in.read();
System.in.read();
//5. 释放资源
channel.close();
connection.close();
}
}
3.5 Publish/Subscribe fanout 扇出 也称为广播
一个生产者,一个交换机,两个队列,两个消费者
可以有多个消费者 每个消费者都有自己的 queue
每个队列都要绑定到Exchange 交换机
生产者发送消息 只能发送到交换机 交换机决定要发给哪个队列 生产者无法决定
交换机把消息发送给绑定过得所有队列
队列的消费者都能拿到消息 实现一条消息被多个消费者消费
声明一个Fanout类型的exchange,并且将exchange和queue绑定在一起,绑定的方式就是直接绑定。
让生产者创建一个exchange并且指定类型,和一个或多个队列绑定到一起。
消费者还是正常的监听某一个队列即可。
使用场景:比如 注册成功 发送一个消息 短信服务 邮件服务 积分服务 这些服务 作为消费者 来消费接受这个消息 生产实践用的也较多
package com.glls._3pubsub;
import com.glls.utils.RabbitMQUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
/**
* 发布订阅模型 广播模型 要使用交换机了
*
* fanout : 扇出 广播
*
*
*
*
* */
public class PubSubTest {
@Test
public void publish() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQUtil.getConnection();
//2. 创建通道对象
Channel channel = connection.createChannel();
//3. 发布消息到exchange,同时指定路由的规则
String msg = "Hello-PubSub!";
// Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
//3. 创建交换机 - 绑定某一个队列
//参数1: exchange 交换机的名称
//参数2: 指定exchange 交换机的类型 FANOUT - pubsub 广播类型 , DIRECT - Routing , TOPIC - Topics
channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);
//绑定多个队列 绑定队列的代码 可以写在 生产者中 也可以写在 消费者中 建议 写在消费者 因为
// 写在生产者的话 就得提前指定 队列的名字 而广播这种模式 一般用临时队列 所以 在消费者中 指定队列 较好
// channel.queueBind("pubsub-queue1","pubsub-exchange","");
// channel.queueBind("pubsub-queue2","pubsub-exchange","");
/**
*
* 广播模式下 路由key 是没有用的 routingKey 没有意义 所以空着不写
* */
//第一次 发布消息 到 交换机
channel.basicPublish("pubsub-exchange","",null,msg.getBytes());
//第二次 发布消息 到 交换机
channel.basicPublish("pubsub-exchange","",null,msg.getBytes());
System.out.println("生产者发布消息成功!");
//4. 释放资源
channel.close();
connection.close();
}
@Test
public void consume() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQUtil.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
//3. 声明队列-HelloWorld
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
// 广播这种形式 一般不需要创建 持久队列 一般 创建 临时队列
//channel.queueDeclare("pubsub-queue1",true,false,false,null);
// 声明交换机
channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);
// 创建临时队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queueName,"pubsub-exchange","");
//1 指定当前消费者,一次消费多少个消息
channel.basicQos(1);
//4. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者1号接收到消息:" + new String(body,"UTF-8"));
//2. 手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
//参数3:consumer - 指定消费回调
//3. 指定手动ack
channel.basicConsume(queueName,false,consumer);
System.out.println("消费者开始监听队列!");
// System.in.read();
System.in.read();
//5. 释放资源
channel.close();
connection.close();
}
@Test
public void consume2() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQUtil.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
//3. 声明队列-HelloWorld
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
//channel.queueDeclare("pubsub-queue2",true,false,false,null);
// 声明 交换机
channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);
// 创建临时队列
String queueName = channel.queueDeclare().getQueue();
// 绑定 交换机 和 队列
channel.queueBind(queueName,"pubsub-exchange","");
//1 指定当前消费者,一次消费多少个消息
channel.basicQos(1);
//4. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者2号接收到消息:" + new String(body,"UTF-8"));
//2. 手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
//参数3:consumer - 指定消费回调
//3. 指定手动ack
channel.basicConsume(queueName,false,consumer);
System.out.println("消费者开始监听队列!");
// System.in.read();
System.in.read();
//5. 释放资源
channel.close();
connection.close();
}
}
3.6 Routing
Routing之订阅模型-Direct
在Fanout模式中,一条消息 会被所有订阅的队列都消费。但是在某些场景下 我们希望不同的消息被不同的队列消费。这时 就要用到Direct 类型的Exchange
在Direct模型下:
1.队列和交换机的绑定 不能是任意绑定了 ,而是要指定一个RoutingKey (路由key)
2.消息的发送方在向Exchange 发送消息时 ,也必须指定消息的RoutingKey
3.Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key 进行判断,只有队列的RoutingKey 与消息的RoutingKey 完全一致 ,才会接收到消息
P:生产者 向Exchange 发送消息 发送消息时 会指定一个 RoutingKey
X:Exchange (交换机) 接受生产者的消息 然后把消息传递给 与RoutingKey 完全匹配的队列
C1 消费者 其所在队列指定了需要RoutingKey 为error 的消息
C2消费者 其所在队列 指定了需要RoutingKey 为 info error warning 的消息
生产者在创建DIRECT类型的exchange后,根据RoutingKey去绑定相应的队列,并且在发送消息时,指定消息的具体RoutingKey即可。
消费者没有变化
package com.glls._4routing;
import com.glls.utils.RabbitMQUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
/**
* Exchange --- direct
*
*
* */
public class RoutingTest {
@Test
public void publish() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQUtil.getConnection();
//2. 创建连接通道对象
Channel channel = connection.createChannel();
// Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
//3. 创建exchange - 绑定某一个队列
//参数1: 交换机的名称
//参数2: 路由模式 指定exchange的类型 FANOUT - pubsub , DIRECT - Routing , TOPIC - Topics
channel.exchangeDeclare("routing-exchange", BuiltinExchangeType.DIRECT);
//绑定多个队列 可以在这里 绑定队列 但是咱们是在消费者 创建临时队列 然后绑定
//channel.queueBind("routing-queue-error","routing-exchange","ERROR");
//channel.queueBind("routing-queue-info","routing-exchange","INFO");
//4. 发布消息到exchange,同时指定路由的规则
channel.basicPublish("routing-exchange","ERROR",null,"ERROR".getBytes());
channel.basicPublish("routing-exchange","INFO",null,"INFO1".getBytes());
channel.basicPublish("routing-exchange","INFO",null,"INFO2".getBytes());
channel.basicPublish("routing-exchange","INFO",null,"INFO3".getBytes());
System.out.println("生产者发布消息成功!");
//5. 释放资源
channel.close();
connection.close();
}
@Test
public void consume() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQUtil.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
//3. 声明 交换机
channel.exchangeDeclare("routing-exchange",BuiltinExchangeType.DIRECT);
// 这种方式 是 创建持久队列
//channel.queueDeclare("routing-queue-error",true,false,false,null);
//这种方式 是创建临时 队列
String queueName = channel.queueDeclare().getQueue();
//基于 路由key 绑定 队列 和 交换机
channel.queueBind(queueName,"routing-exchange","ERROR");
//1 指定当前消费者,一次消费多少个消息
channel.basicQos(1);
//4. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者1号接收到消息:" + new String(body,"UTF-8"));
//2. 手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
//参数3:consumer - 指定消费回调
//3. 指定手动ack
channel.basicConsume(queueName,false,consumer);
System.out.println("消费者开始监听队列!");
// System.in.read();
System.in.read();
//5. 释放资源
channel.close();
connection.close();
}
@Test
public void consume2() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQUtil.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
// 这种方式 是 创建持久队列
//channel.queueDeclare("routing-queue-info",true,false,false,null);
//3.声明交换机
channel.exchangeDeclare("routing-exchange",BuiltinExchangeType.DIRECT);
//这种方式 是创建临时 队列
String queueName = channel.queueDeclare().getQueue();
//基于 路由key 绑定 队列 和 交换机
channel.queueBind(queueName,"routing-exchange","ERROR"); //绑定路由Key 为ERROR 的
channel.queueBind(queueName,"routing-exchange","INFO"); // 绑定路由Key 为INFO 的
//1 指定当前消费者,一次消费多少个消息
channel.basicQos(1);
//4. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者2号接收到消息:" + new String(body,"UTF-8"));
//2. 手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
//参数3:consumer - 指定消费回调
//3. 指定手动ack
channel.basicConsume(queueName,false,consumer);
System.out.println("消费者开始监听队列!");
// System.in.read();
System.in.read();
//5. 释放资源
channel.close();
connection.close();
}
}
3.7 Topic
Routing之订阅模型-Topic
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey 把消息路由到不同的队列,只不过Topic 类型的Exchange可以让队列在绑定RoutingKey 的时候使用通配符!这种模型RoutingKey 一般都是由一个或多个单词组成,多个单词之间以 “.” 分割, 例如 item.insert
shell#通配符 * (star) can substitute for exactly one word . 匹配不多不少恰好一个词 # (hash) can substitute for zero or more words . 匹配0个或多个词 # 如: audit.# 匹配 audit.irs.corporate 或者 audit.irs 等 audit.* 只能匹配 audit.irs
生产者创建Topic的exchange并且绑定到队列中,这次绑定可以通过*和#关键字,对指定RoutingKey内容,编写时注意格式 xxx.xxx.xxx去编写, * -> 一个xxx,而# -> 代表多个xxx.xxx,在发送消息时,指定具体的RoutingKey到底是什么。
消费者只是监听队列,没变化。
package com.glls._5topic;
import com.glls.utils.RabbitMQUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
public class TopicTest {
@Test
public void publish() throws Exception {
//1. 获取Connection
Connection connection = RabbitMQUtil.getConnection();
//2. 创建Channel
Channel channel = connection.createChannel();
// Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
//3. 创建exchange - 绑定某一个队列
//参数1: exchange的名称
//参数2: 指定exchange的类型 FANOUT - pubsub , DIRECT - Routing , TOPIC - Topics
channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);
//绑定多个队列 在消费者中 指定 临时队列
//channel.queueBind("topic-queue-1","topic-exchange","*.red.*");
//channel.queueBind("topic-queue-2","topic-exchange","fast.#");
//channel.queueBind("topic-queue-2","topic-exchange","*.*.rabbit");
//3. 发布消息到exchange,同时指定路由的规则
channel.basicPublish("topic-exchange","fast.red.monkey",null,"红快猴子".getBytes());
channel.basicPublish("topic-exchange","slow.black.dog",null,"黑漫狗".getBytes());
channel.basicPublish("topic-exchange","fast.white.cat",null,"快白猫".getBytes());
System.out.println("生产者发布消息成功!");
//4. 释放资源
channel.close();
connection.close();
}
@Test
public void consume() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQUtil.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
//3.声明交换机
channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);
//4.创建临时队列
String queueName = channel.queueDeclare().getQueue();
//5.绑定队列和交换机
channel.queueBind(queueName,"topic-exchange","*.red.*");
//6 指定当前消费者,一次消费多少个消息
channel.basicQos(1);
//7. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者1号接收到消息:" + new String(body,"UTF-8"));
//2. 手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
//参数3:consumer - 指定消费回调
//3. 指定手动ack
channel.basicConsume(queueName,false,consumer);
System.out.println("消费者开始监听队列!");
// System.in.read();
System.in.read();
//5. 释放资源
channel.close();
connection.close();
}
@Test
public void consume2() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQUtil.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
//3. 声明交换机
channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);
//4.创建临时队列
String queueName = channel.queueDeclare().getQueue();
//5.绑定队列和交换机
channel.queueBind(queueName,"topic-exchange","*.*.rabbit");
channel.queueBind(queueName,"topic-exchange","fast.#");
//6 指定当前消费者,一次消费多少个消息
channel.basicQos(1);
//7. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者2号接收到消息:" + new String(body,"UTF-8"));
//2. 手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
//参数3:consumer - 指定消费回调
//3. 指定手动ack
channel.basicConsume(queueName,false,consumer);
System.out.println("消费者开始监听队列!");
// System.in.read();
System.in.read();
//5. 释放资源
channel.close();
connection.close();
}
}
4、RabbitMQ整合SpringBoot【重点
】
4.1 SpringBoot整合RabbitMQ
4.1.1 创建SpringBoot工程
4.1.2 导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
4.1.3 编写配置文件
spring:
rabbitmq:
host: 192.168.5.201
port: 5672
username: test
password: test
virtual-host: /test
4.1.4 Hello-World
//生产者 注入 rabbitTemplate
@SpringBootTest
class BootRabbitmqDemo1ApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void test1() throws IOException {
//Hello-World 模式
// 方式1 在配置类中 创建 queue
// rabbitTemplate.convertAndSend("simpleQueue", "Hello world222222!! "+ System.currentTimeMillis());
// 方式2 在注解上 创建 queue
rabbitTemplate.convertAndSend("helloQueue", "Hello world111111!! "+ System.currentTimeMillis());
System.out.println("消息已发送");
// 便于观察效果, 加入下面的代码
new Scanner(System.in).nextLine(); // 输入任何字符或回车结束程序
}
}
-----------------------------------------------------------------------------------------------------
//配置类
@Configuration
public class Hello_RabbitMQConfig {
// Queue的包: org.springframework.amqp.core.Queue
@Bean
public Queue simple_queue(){
/*
* 可用以下形式:
* new Queue("helloworld")
* 参数1: 队列名, 参数2: 持久, 参数3: 非排他, 参数4: 非自动删除
* new Queue("helloworld",false,false,false,null)
*/
return new Queue("simpleQueue");
}
}
-----------------------------------------------------------------------
//消费者
@Component
//@RabbitListener(queues = "simpleQueue") 在类上 使用 @RabbitListener 在 方法上 使用@RabbitHandler 这样只能监听一个队列
public class HelloConsumer {
// @RabbitHandler
// public void receive(String msg) {
// System.out.println("收到: "+msg);
// }
// 方式1 在配置类中 创建 queue 在这里引用即可
@RabbitListener(queues = "simpleQueue")
public void receive1(String msg) {
System.out.println("收到: "+msg);
}
//另外,@RabbitListener 注解中也可以直接定义队列: 如下
// @RabbitListener(queuesToDeclare = @Queue(name = "helloworld",durable = "false"))
// 方式2 在注解上 创建queue
//@RabbitListener(queuesToDeclare = @Queue(name = "helloQueue",durable = "false",autoDelete = "true")) 自定义属性
@RabbitListener(queuesToDeclare = @Queue(name = "helloQueue")) // 默认属性 durable = "true" 持久化 exclusive="false" 非独占 autoDelete = "false"不自动删除
public void receive2(String msg) {
System.out.println("收到: "+msg);
}
}
4.1.5Work
#生产者
@Test
public void testWork(){
// Work 模型
for(int i=0;i<20;i++){
rabbitTemplate.convertAndSend("work","work模型: "+i);
}
}
-----------------------------------------------------------------------------
#消费者
package com.glls.bootrabbitmqdemo1._2work;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class WorkConsumer {
@RabbitListener(queuesToDeclare = @Queue(name = "work",durable = "false"))
public void getMessage(Object message){
System.out.println("接收到消息1:" + message);
}
@RabbitListener(queuesToDeclare = @Queue(name = "work",durable = "false"))
public void getMessage2(Object message){
System.out.println("接收到消息2:" + message);
}
}
4.1.6Pub/Sub
#生产者
@Test
public void testFanout(){
// 生产发布模型 广播模型
rabbitTemplate.convertAndSend("boot-pubsub-exchange","","广播模式");
}
-----------------------------------------------------------
#消费者
@Component
public class PubSubConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 创建临时队列
exchange = @Exchange(value = "boot-pubsub-exchange",type ="fanout") // 绑定的交换机
)
})
public void getMessage(Object message){
System.out.println("消费者1:"+message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 创建临时队列
exchange = @Exchange(value = "boot-pubsub-exchange",type ="fanout") // 绑定的交换机
)
})
public void getMessage2(Object message){
System.out.println("消费者2:"+message);
}
}
4.1.7route
#生产者
@Test
public void testRouting(){
// 路由模式
rabbitTemplate.convertAndSend("boot-route-exchange","info","发送的是info的key的路由信息");
}
-------------------------------------------------------
#消费者
@Component
public class RouteConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 创建临时队列
exchange = @Exchange(value = "boot-route-exchange",type = "direct"),
key = {"info","error"}
)
}
)
public void getMessage1(Object message){
System.out.println("消费者1:"+message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 创建临时队列
exchange = @Exchange(value = "boot-route-exchange",type = "direct"),
key = {"info"}
)
}
)
public void getMessage2(Object message){
System.out.println("消费者2:"+message);
}
}
4.1.8topic
#生产者
@Test
void testTopic2()
{
// Topic 模式 基于注解
//rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!");
rabbitTemplate.convertAndSend("boot-topic-exchange","black.dog.and.cat","黑色狗和猫!!");
}
#消费者
package com.glls.bootrabbitmqdemo1._5topic;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author glls
* @email 524840158@qq.com
* @company xxx
* @create 2020-09-04 15:31
*/
@Component
public class TopicConsumer {
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "boot-topic-exchange",type = "topic"),
key = {"*.red.*","black.*.#"}
)
}
)
public void getMessage2(Object message){
System.out.println("接收到消息2:" + message);
}
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "boot-topic-exchange",type = "topic"),
key = {"black.*.#"}
)
}
)
public void getMessage3(Object message){
System.out.println("接收到消息3:" + message);
}
}
4.2 手动ack
要在 消息消费完之后 才告诉 rabbitmq 这个消息消费了,而不是还没消费 就确认。 避免 消息 消费失败 了 但是消息已经被自动确认了 那么这个消息就相当于丢了 即丢消息
实现步骤:
1。 在yml 配置文件 指定 手动 配置
spring:
rabbitmq:
host: 192.168.5.201
port: 5672
username: test
password: test
virtual-host: /test
listener:
simple:
acknowledge-mode: manual # 手动指定 ack
- 在 消费者的方法参数中 指定参数
@RabbitListener( bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(value = "boot-topic-exchange",type = "topic"), key = {"black.*.#"} ) } ) public void getMessage4(String msg, Channel channel, Message message) throws IOException { System.out.println("接收到消息3:" + msg); // 手动 ack channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }
5.RabbitMQ的其他操作
RabbitMQ的消息确认机制 保证了 消息的可靠性传递
RabbitMQ的消息确认有两种。
一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。 确认是否到达交换机 使用 confirm 机制 ,确认是否到达 queue 使用 return 机制
第二种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。 即消费者的 ack
消息确认的作用是什么?
为了防止消息丢失。消息丢失分为发送丢失和消费者处理丢失,相应的也有两种确认机制。
5.1消息的可靠性
RabbitMQ的事务:事务可以保证消息100%传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息。事务的操作,效率太低,加了事务操作后,比平时的操作效率至少要慢100倍。
RabbitMQ除了事务,还提供了Confirm的确认机制,这个效率比事务高很多。
5.1.1普通Confirm方式
package com.glls._6confirm;
import com.glls.utils.RabbitMQUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
*
* 普通 confirm
* 普通Confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端Confirm。
* 实际上是一种串行Confirm了,每publish一条消息之后就等待服务端Confirm,如果服务端返回false或者超时时间内未返回,
* 客户端进行消息重传;
* */
public class HelloWorldTest {
private final static String QUEUE_NAME = "confirm";
@Test
public void publish() throws Exception {
Connection connection = RabbitMQUtil.getConnection();
System.out.println(connection);
Channel channel = connection.createChannel();
// 开启 confirm
channel.confirmSelect();
final long start = System.currentTimeMillis();
String msg = "This is a confirm message!";
channel.queueDeclare(QUEUE_NAME,true,false,true,null);
for (int i = 0; i < 500; i++) {
//第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,
//因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话
//我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键
channel.basicPublish("",QUEUE_NAME,null,(" 普通Confirm模式, 第" + (i + 1) + "条消息").getBytes());
// 判断消息是否发送成功
if(channel.waitForConfirms()){
// System.out.println("消息发送成功");
}else{
System.out.println("消息发送失败");
// 可以在这里进行重发操作
}
}
System.out.println("执行waitForConfirms耗费时间: " + (System.currentTimeMillis() - start) + "ms");
//4. 关闭频道和连接
channel.close();
connection.close();
}
@Test
public void consume() throws Exception {
Connection connection = RabbitMQUtil.getConnection();
System.out.println(connection);
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,true,null);
DefaultConsumer consume = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSSS");
System.out.println("接收到消息:" + new String(body,"UTF-8"));
System.out.println("consume Done at:"+ time.format(new Date()));
}
};
channel.basicConsume(QUEUE_NAME,true,consume);
System.out.println("消费者开始监听队列!");
System.in.read();
channel.close();
connection.close();
}
}
5.1.2 批量Confirm方式。
package com.glls._6confirm;
import com.glls.utils.RabbitMQUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
/**
*
* 批量Confirm方式
* 批量Confirm模式:批量Confirm模式,每发送一批消息之后,调用waitForConfirms()方法,等待服务端Confirm,
* 这种批量确认的模式极大的提高了Confirm效率,但是如果一旦出现Confirm返回false或者超时的情况,
* 客户端需要将这一批次的消息全部重发,这会带来明显的重复消息,如果这种情况频繁发生的话,效率也会不升反降;
* */
public class HelloWorldTest2 {
private final static String QUEUE_NAME = "confirm many";
@Test
public void publish() throws Exception {
Connection connection = RabbitMQUtil.getConnection();
System.out.println(connection);
Channel channel = connection.createChannel();
// 开启 confirm
channel.confirmSelect();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//这里主要更改代码为发送批量消息后 再进行等待服务器确认 把确认消息 放在 for 循环 外面
for (int i = 0; i < 500; i++) {
String msg = "批量确认!" + i;
channel.basicPublish("",QUEUE_NAME,null,(" 批量Confirm模式, 第" + (i + 1) + "条消息").getBytes());
}
//3.3 确定批量操作是否成功
//if(channel.waitForConfirms()){
// System.out.println("发送成功");
//}else{
//失败 进行消息重发
//}
// 或者 采用下面的方式 该方法会等到最后一条消息得到确认或者得到nack才会结束,也就是说在waitForConfirmsOrDie处会造成当前程序的阻塞。
final long start = System.currentTimeMillis();
channel.waitForConfirmsOrDie(); // 当你发送的全部消息,有一个失败的时候,就直接全部失败 抛出异常IOException
System.out.println("执行waitForConfirmsOrDie耗费时间: " + (System.currentTimeMillis() - start) + "ms");
//4. 释放资源
channel.close();
connection.close();
}
@Test
public void consume() throws Exception {
Connection connection = RabbitMQUtil.getConnection();
System.out.println(connection);
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
DefaultConsumer consume = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:" + new String(body,"UTF-8"));
}
};
channel.basicConsume(QUEUE_NAME,true,consume);
System.out.println("消费者开始监听队列!");
System.in.read();
channel.close();
connection.close();
}
}
5.1.3 异步Confirm方式。
package com.glls._6confirm;
import com.glls.utils.RabbitMQUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
/**
*
* 异步Confirm方式。
* 异步Confirm模式:提供一个回调方法,服务端Confirm了一条或者多条消息后Client端会回调这个方法。
*
*
* */
public class HelloWorldTest3s {
private final static String QUEUE_NAME = "unsynch confirm ";
@Test
public void publish() throws Exception {
Connection connection = RabbitMQUtil.getConnection();
System.out.println(connection);
Channel channel = connection.createChannel();
// 开启 confirm
channel.confirmSelect();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
long start = System.currentTimeMillis();
//3.3 开启异步回调
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// deliveryTag:是该消息的index
System.out.println("消息发送成功,标识:" + deliveryTag + ",是否是批量" + multiple);
// multiple的值为true,true确认所有将比第一个参数指定的 delivery-tag 小的消息都得到确认。
}
//handleNack 3s 10s xxx.. 重试
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息发送失败,标识:" + deliveryTag + ",是否是批量" + multiple);
}
});
System.out.println("执行异步确认耗费时间: "+(System.currentTimeMillis()-start)+"ms");
//批量发送消息
for (int i = 0; i < 500; i++) {
channel.basicPublish("",QUEUE_NAME,null,(" 异步Confirm模式, 第" + (i + 1) + "条消息").getBytes());
}
/**
* 可以看到,虽然我们还是发送了500条消息,同样我们并没有收到500个ack消息 ,只收到较少的ack消息,
* 并且这些个ack消息的multiple域都为true,你多次运行程序会发现每次发送回来的ack消息中的deliveryTag域的值并不是一样的,
* 说明broker端批量回传给发送者的ack消息并不是以固定的批量大小回传的;
*
* */
System.in.read(); // 卡住程序 不结束 就可以看到 异步监听的效果了
//4. 释放资源
channel.close();
connection.close();
}
@Test
public void consume() throws Exception {
Connection connection = RabbitMQUtil.getConnection();
System.out.println(connection);
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
DefaultConsumer consume = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("接收到消息:" + msg);
}
};
channel.basicConsume(QUEUE_NAME,true,consume);
System.out.println("消费者开始监听队列!");
System.in.read();
channel.close();
connection.close();
}
}
5.1.4 Return机制
Confirm只能感知消息是否到达exchange,无法保证消息可以被exchange分发到指定queue。
而且exchange是不能持久化消息的,queue是可以持久化消息。
采用Return机制来监听消息是否从exchange送到了指定的queue中
开启Return机制,并在发送消息时,指定mandatory为true
package com.glls._7return;
import com.glls.utils.RabbitMQUtil;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
/**
*
* return 机制
* confirm 机制 只能确认 消息 发送到了 exchange 不能保证到 消息队列了 ,所以 开启 return 机制
* 监听 是否消息 到了 queue 中
*
* */
public class HelloWorldTest4 {
private final static String QUEUE_NAME = "unsynch confirm and return";
@Test
public void publish() throws Exception {
Connection connection = RabbitMQUtil.getConnection();
System.out.println(connection);
Channel channel = connection.createChannel();
// 开启 confirm
channel.confirmSelect();
channel.queueDeclare(QUEUE_NAME,true,false,true,null);
//3.3 开启异步回调
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息发送成功,标识:" + deliveryTag + ",是否是批量" + multiple);
// multiple的值为true,true确认所有将比第一个参数指定的 delivery-tag 小的消息都得到确认。
}
//handleNack 3s 10s xxx.. 重试
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息发送失败,标识:" + deliveryTag + ",是否是批量" + multiple);
}
});
// 1. 开启return机制 2. 发送消息时 指定 mandatory 参数
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 当消息没有送达到queue时,才会执行。
System.out.println(new String(body,"UTF-8") + "没有送达到Queue中!!");
}
});
//批量发送消息
for (int i = 0; i < 500; i++) {
// 2. 发送消息时 指定 mandatory 参数
channel.basicPublish("",QUEUE_NAME,true,null,(" 异步Confirm和return, 第" + (i + 1) + "条消息").getBytes());
}
System.in.read(); // 卡住程序 不结束 就可以看到 异步监听的效果了
//4. 释放资源
channel.close();
connection.close();
}
@Test
public void consume() throws Exception {
Connection connection = RabbitMQUtil.getConnection();
System.out.println(connection);
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,true,null);
DefaultConsumer consume = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("接收到消息:" + msg);
}
};
channel.basicConsume(QUEUE_NAME,true,consume);
System.out.println("消费者开始监听队列!");
System.in.read();
channel.close();
connection.close();
}
}
5.2 springboot 实现rabbitmq消息的可靠性
5.2.1配置文件
编写配置文件 开启Confirm 和 Return 机制
spring:
rabbitmq:
host: 192.168.5.201
port: 5672
username: test
password: test
virtual-host: /test
listener:
simple:
acknowledge-mode: manual # 手动指定 ack
publisher-confirm-type: simple # 确认
publisher-returns: true # 消息可靠性
指定RabbitTemplate 对象 开启 Confirm 和 Return 并编写回调方法
生产者 消费者 没有什么变化
package com.glls.bootrabbitmqdemo1.config;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @author glls
* @email 524840158@qq.com
* @company xxx
* @create 2020-10-03 15:01
*/
@Component
public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
//
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct // init-method 构建当前对象时 会执行这个方法
public void initMethod(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("消息唯一标识"+correlationData);
System.out.println("确认结果"+ack);
System.out.println("失败原因"+cause);
if(ack){
System.out.println("消息已经送到了exchange");
}else{
System.out.println("消息没有送到exchange");
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
// 如果执行了这个方法 说明消息没有送到 queue 中
System.out.println("消息没有送到 queue ");
}
}
5.3避免消息的重复消费
重复消费消息,会对非幂等行操作造成问题
重复消费消息的原因是,消费者没有给RabbitMQ一个ack
为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,现将消息的id放到Redis中,
id-0(正在执行业务)
id-1(执行业务成功)
如果ack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。
极端情况:第一个消费者在执行业务时,出现了死锁,在setnx的基础上,再给key设置一个生存时间。
生产者,发送消息时,指定messageId
private final static String QUEUE_NAME = "refuse repeat queue ";
@Test
public void publish() throws Exception {
//1. 获取Connection
Connection connection = RabbitMQUtil.getConnection();
//2. 创建Channel
Channel channel = connection.createChannel();
//3. 发布消息到exchange,同时指定路由的规则
String msg = "防止重复提交的消息";
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
for(int i=0;i<100;i++){
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(1) //指定消息是否需要持久化 1 - 需要持久化 2 - 不需要持久化
.messageId(UUID.randomUUID().toString())
.build();
channel.basicPublish("",QUEUE_NAME,properties,(i+msg).getBytes());
}
// Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
System.out.println("生产者发布消息成功!");
//4. 释放资源
channel.close();
connection.close();
}
消费者,在消费消息时,根据具体业务逻辑去操作redis
@Test
public void consume() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQUtil.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
channel.basicQos(1); // 不要 一次性的把消息都给消费者 容易丢失 一次给一条 安全
//4. 开启监听Queue
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
Jedis jedis = new Jedis("127.0.0.1",6379);
String messageId = properties.getMessageId();
//1. setnx到Redis中,默认指定value-0
String result = jedis.set(messageId, "0", "NX", "EX", 10);
if(result != null && result.equalsIgnoreCase("OK")) {
System.out.println("1接收到消息:" + new String(body, "UTF-8"));
//2. 消费成功,set messageId 1
jedis.setex(messageId,10,"1");
//2. 手动ack
// 参数1 long类型 取出来当前消息在队列中的的索引 参数2:boolean 类型 是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(),false);
}else {
//3. 如果1中的setnx失败,获取key对应的value,如果是0,return,如果是1
String s = jedis.get(messageId);
if("1".equalsIgnoreCase(s)){
// 消息被别人消费完了
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
}
};
//3. 指定手动ack
channel.basicConsume(QUEUE_NAME,false,consumer);
System.out.println("消费者1开始监听队列!");
// System.in.read();
System.in.read();
//5. 释放资源
channel.close();
connection.close();
}
5.4避免消息的重复消费-springboot 实现
5.4.1 导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
5.4.2 编写配置文件
spring:
redis:
host: 192.168.199.109
port: 6379
5.4.3 修改生产者
@Test
void testRepeat(){
CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("boot-topic-exchange-repeat","black.dog.and.cat","黑色狗和猫!!",messageId);
//System.in.read();
}
5.4.4 修改消费者
package com.glls.bootrabbitmqdemo1._8repeat;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
@Component
public class TopicRepeatConsumer {
@Autowired
private StringRedisTemplate redisTemplate;
@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "boot-topic-exchange-repeat",type = "topic"),
key = {"black.*.#"}
)
}
)
public void getMessage4(String msg, Channel channel, Message message) throws IOException {
//0. 获取MessageId
String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
//1. 设置key到Redis
if(redisTemplate.opsForValue().setIfAbsent(messageId,"0",10, TimeUnit.SECONDS)) {
//2. 消费消息
System.out.println("接收到消息:" + msg);
//3. 设置key的value为1
redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
//4. 手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else {
//5. 获取Redis中的value即可 如果是1,手动ack
if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
}
}
6.MQ的应用场景
6.1异步处理
6.2应用解耦
6.3流量削峰
7.高级特性
7.1TTL
Time to live 过期时间 设置消息的过期时间 有两种方式
- 指定一条消息的过期时间
- 给队列设置消息过期时间,队列中所有的消息都有同样的过期时间
应用场景:比如 下单未支付 则订单自动删除的实现
7.1.1 设置消息的过期时间
发送一条设置了过期时间的消息
细节: 过期时间 指的是 消息在 队列中的存活时间,所以 此时 为了看到效果 不用设置消费者监听队列 一直消费消息,如果 一直监听队列 消费消息的话 就看不到消息过期之后 消息从队列中消失的的效果了
所以 创建 交换机 创建 队列 别创建消费者
package com.ww.rabbitmq._7ttl;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @ClassName : TTLConfig
* @Author : glls
* @Date: 2021/6/5 20:54
* @Description :
*/
@Configuration
public class TTLConfig {
/**
* ttl_exchange 交换机名称
*/
public static final String TTL_DIRECT_EXCHANGE = "ttldirectExchange";
/**
* ttl_direct队列名称
*/
public static final String TTL_DIRECT_QUEUE="ttldirectQueue";
/**
* ttl_direct路由Key
*/
public static final String TTL_DIRECT_ROUTINGKEY="ttl_directRoutingKey";
/**
* 定义一个TTL direct交换机
* @return
*/
@Bean
public DirectExchange ttlDirectExchange(){
return new DirectExchange(TTL_DIRECT_EXCHANGE);
}
/**
* 定义一个TTL direct队列
* @return
*/
@Bean
public Queue ttlDirectQueue(){
//Map<String,Object> map=new HashMap<>();
//map.put("x-message-ttl",5000);
return new Queue(TTL_DIRECT_QUEUE,true,false,false);
}
/**
* TTL定义一个队列和交换机的绑定
* @return
*/
@Bean
public Binding ttlDirectBinding(){
return BindingBuilder.bind(ttlDirectQueue()).to(ttlDirectExchange()).with(TTL_DIRECT_ROUTINGKEY);
}
}
生产者发送消息
@Test
public void sendTTLMessage() {
String message = "测试设置了过期时间的消息";
MessageProperties messageProperties=new MessageProperties(); // 消息属性对象
messageProperties.setExpiration("10000"); // 设置过期时间 10秒
Message msg=new Message(message.getBytes(),messageProperties); //封装消息对象
rabbitTemplate.send(TTL_DIRECT_EXCHANGE,TTL_DIRECT_ROUTINGKEY,msg);
}
测试 发现 消息在队列 10 秒后 消失
7.1.2 设置队列的过期时间
直接 在 队列上 设置消息的过期时间 这样 队列中的消息过期时间也都跟 队列设置的过期时间相同, 如果 消息也设置了过期时间 谁小谁优先级高
@Bean
public Queue ttlDirectQueue(){
// 在队列上 设置 此队列中 消息的过期时间
Map<String,Object> map=new HashMap<>();
// 队列中 所有的消息的过期时间 为 20秒
//map.put("x-message-ttl",20000);
// 队列中 所有的消息的过期时间 为 5秒
map.put("x-message-ttl",5000);
//return new Queue(TTL_DIRECT_QUEUE,true,false,false);
return new Queue(TTL_DIRECT_QUEUE,true,false,false,map);
}
说明: 如果同时指定了Message的TTL 和 Queue 的 TTL ,则优先较小的那一个
所以 最佳实践 是 在 队列上设置过期时间
注意点:
TTL的延时队列存在一个问题,就是同一个队列里的消息延时时间最好一致,比如说队列里的延时时间都是1小时,千万不能队列里的消息延时时间乱七八糟多久的都有,这样的话先入队的消息如果延时时间过长会堵着后入队延时时间小的消息,导致后面的消息到时也无法变成死信转发出去,很坑!!!
举个栗子:延时队列里先后进入A,B,C三条消息,存活时间是3h,2h,1h,结果到了1小时C不会死,到了2hB不会死,到了3小时A死了,同时B,C也死了,意味着3h后A,B,C才能消费,很坑!!!
7.2 死信队列
队列中的消息可能会变成死信消息(dead-lettered),进而当以下几个事件任意一个发生时,消息将会被重新发送到一个交换机:,
1,消息被消费者使用basic.reject或basic.nack方法并且requeue参数值设置为false的方式进行消息确认(negatively acknowledged)
2,消息由于消息有效期(per-message TTL)过期
3,消息由于队列超过其长度限制而被丢弃
注意,队列的有效期并不会导致其中的消息过期
x-dead-letter-exchange 指定死信交换机
x-dead-letter-routing-key 指定死信路由key
7.3 延迟队列
什么是延迟队列?
延时队列,首先,它是一种队列,队列意味着内部的元素是有序的,元素出队和入队是有方向性的,元素从一端进入,从另一端取出。
其次,延时队列,最重要的特性就体现在它的延时属性上,跟普通的队列不一样的是,普通队列中的元素总是等着希望被早点取出处理,而延时队列中的元素则是希望被在指定时间得到取出和处理,所以延时队列中的元素是都是带时间属性的,通常来说是需要被处理的消息或者任务。
简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
延迟队列使用场景:
那么什么时候需要用延时队列呢?考虑一下以下场景:
订单在十分钟之内未支付则自动取消。
新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
账单在一周内未支付,则自动结算。
用户注册成功后,如果三天内没有登陆则进行短信提醒。
用户发起退款,如果三天内没有得到处理则通知相关运营人员。
预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。
延迟队列的实现方式:
1,利用TTL+死信队列
生产者生产一条延时消息,根据需要延时时间的不同,利用不同的routingkey将消息路由到不同的延时队列,每个队列都设置了不同的TTL属性,并绑定在同一个死信交换机中,消息过期后,根据routingkey的不同,又会被路由到不同的死信队列中,消费者只需要监听对应的死信队列进行处理即可。
这种方式的弊端,无法做到通用性,每搞一个新的延迟任务,都要去实现一个实现的TTL+死信队列,比较麻烦;
2,利用RabbitMQ插件实现
安装一个插件即可:https:/https://www.rabbitmq.com/community-plugins.html ,下载rabbitmq_delayed_message_exchange插件,然后解压放置到RabbitMQ的插件目录。
把下载的插件 放到 容器内的 /plugins 目录内
rabbitmq-plugins enable rabbitmq_delayed_message_exchange 安装插件
重启 rabbitmq 容器
package com.ww.rabbitmq._9delay;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
*/
@Configuration
public class DelayConfig {
/**
* delayedDirect交换机名称
*/
public static final String DELAYED_DIRECT_EXCHANGE="delayedDirectExchange";
/**
* delayed direct队列名称
*/
public static final String DELAYED_DIRECT_QUEUE="delayedDirectQueue";
/**
* delayed_direct路由Key
*/
public static final String DELAYED_DIRECT_ROUTINGKEY="delayed_directRoutingKey";
/**
* 配置一个DELAYED Direct类型的交换机
自定义类型 交换机
* @return
*/
@Bean
public CustomExchange delayedCustomExChange(){
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_DIRECT_EXCHANGE,"x-delayed-message", true, false, args);
}
/**
* 定义一个DELAYED direct队列
* @return
*/
@Bean
public Queue delayedDirectQueue(){
return new Queue(DELAYED_DIRECT_QUEUE);
}
/**
* 配置一个delayed交换机和队列的绑定
* @return
*/
@Bean
public Binding delayedDirectBinding(){
return BindingBuilder.bind(delayedDirectQueue()).to(delayedCustomExChange()).with(DELAYED_DIRECT_ROUTINGKEY).noargs();
}
}
package com.ww.rabbitmq._9delay;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
/**
*/
@Component
public class DelayConsumer {
/**
* delayedDirect交换机名称
*/
public static final String DELAYED_DIRECT_EXCHANGE="delayedDirectExchange";
/**
* delayed direct队列名称
*/
public static final String DELAYED_DIRECT_QUEUE="delayedDirectQueue";
/**
* delayed_direct路由Key
*/
public static final String DELAYED_DIRECT_ROUTINGKEY="delayed_directRoutingKey";
// 方式2 利用 插件 接受延迟消息
@RabbitListener(queues = {DELAYED_DIRECT_QUEUE})
public void receiveMessage2(Channel channel, Message message) throws IOException {
System.out.println("接收延迟消息:"+new String(message.getBody())+":"+new Date().toLocaleString());
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
/**
* delayedDirect交换机名称
*/
public static final String DELAYED_DIRECT_EXCHANGE="delayedDirectExchange";
/**
* delayed_direct路由Key
*/
public static final String DELAYED_DIRECT_ROUTINGKEY="delayed_directRoutingKey";
// 利用插件 发送延迟消息
@Test
public void sendDelayMessage(){
String message="测试延时消息插件" + new Date().toLocaleString();
Integer delayTime = 10000; // 毫秒
rabbitTemplate.convertAndSend(DELAYED_DIRECT_EXCHANGE,DELAYED_DIRECT_ROUTINGKEY,message,a->{
a.getMessageProperties().setDelay(delayTime);
return a;
});
}
8.RabbitMQ的集群
单个mq 也存在单点故障问题,所以 也有集群架构保证高可用性。
由多个MQ 服务节点共同对消息传递提供服务,如果其中一个节点出现宕机,不影响我们整个系统的使用。
7.1普通集群(副本集群)
7.2镜像集群
9.练习
创建消息服务 发送阿里云手机短信验证码
打开阿里云平台 登陆控制台
开启子用户
新建用户组
把上一步创建的用户添加到这个用户组
给这用户组 添加服务的权限
得到accessKey 和 accessKeySecret
添加模板
添加签名
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>4.5.16</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
public static void main(String[] args) {
DefaultProfile profile = DefaultProfile.getProfile("cn-qingdao", "LTAI5t9VxKTe2ZRooXUJ5Nqp", "Khq1jiz2djmZO0KIHMbKyp3PjHRAEn");
IAcsClient client = new DefaultAcsClient(profile);
CommonRequest request = new CommonRequest();
request.setSysMethod(MethodType.POST);
request.setSysDomain("dysmsapi.aliyuncs.com");
request.setSysVersion("2017-05-25");
request.setSysAction("SendSms");
request.putQueryParameter("PhoneNumbers", "13663718227");
request.putQueryParameter("SignName", "ABC商城");
request.putQueryParameter("TemplateCode", "SMS_200702765");
HashMap<String, String> map = new HashMap<>();
map.put("code","123456");
request.putQueryParameter("TemplateParam", JSON.toJSONString(map));
try {
CommonResponse response = client.getCommonResponse(request);
System.out.println(response.getData());
} catch (ClientException e) {
e.printStackTrace();
}
}
10.个人回顾复习
官网:
Messaging that just works — RabbitMQ