快速上手RabbitMQ
- 安装RabbitMQ
-
- 首先将镜像包上传到虚拟机,使用命令加载镜像
Python
docker load -i mq.tar
-
- 运行MQ容器
Python
docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
- MQ的基本结构
-
-
- publisher:生产者
- consumer:消费者
- exchange:交换机,负责消息路由
- queue:队列,存储消息
- virtualHost:虚拟主机,隔离不同租户的exchange,queue,消息的隔离
-
Python
public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.150.101");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
}
}
Python
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.150.101");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
}
- SpringAMQP
-
- 功能
-
-
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了RabbitTemplate工具,用于发送消息
-
-
- 简化模型 === producer->queue->consumer
-
-
- BasicQueue
-
-
-
-
- 首先在父工程中引入依赖
-
-
Python
org.springframework.boot
spring-boot-starter-amqp
-
-
-
- 配置MQ地址,在publisher服务的application.yml中添加配置
-
-
Python
spring:
rabbitmq:
host: 192.168.137.138 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: itcast # 用户名
password: 123321 # 密码
-
-
-
- 编写队列
-
-
Python
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg){
log.info("接受到的消息:{}",msg);
}
-
-
-
- 发送消息
-
-
Python
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue(){
String queueName = "simple.queue";
String message = "hello,world";
rabbitTemplate.convertAndSend(queueName,message);
}
}
-
-
- WorkQueue === 让多个消费者绑定到一个队列,共同消费队列中的消息
-
Python
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testWorkQueue() throws Exception{
String queueName = "simple.queue";
String message = "hello,world";
for (int i = 1; i exchange(只负责路由,不负责存储)->queue->consumer
Fanout === 广播给所有的queue
结构图 消息发送流程
可以有多个队列每个队列都要绑定到Exchange(交换机)生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定交换机把消息发送给绑定过的所有队列订阅队列的消费者都能拿到消息
在消费者模块中创建一个类,声明队列和交换机@Configuration
public class FanoutConfig {
/*
* 创建一个交换机
* */
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanout.exchange");
}
/*
* 创建队列1
* */
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/*
* 创建队列2
* */
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/*
* 将队列1绑定到交换机
* */
@Bean
public Binding queue1Binding(){
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
/*
* 将队列2绑定到交换机
* */
@Bean
public Binding queue2Binding(){
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
}
发送消息@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testFanoutExchange() {
// 队列名称
String exchangeName = "fanout.exchange";
// 消息
String message = "hello world!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
}
消息接受@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "fanout.queue1")
public void fanoutQueue1(String msg){
log.info("收到了来自fanout.queue1的消息,{}",msg);
}
@RabbitListener(queues = "fanout.queue2")
public void fanoutQueue2(String msg){
log.info("收到了来自fanout.queue2的消息,{}",msg);
}
}
Direct === 路由给exchange绑定的queue
结构图 消息发送流程
queue与exchange绑定的时候需要设置bindingkey可以设置多个bindingkey,key可以重复produce发送的时候需要设置routingkeyexchange判断消息的routingkey与queue中的bindingkey是否完全一致,一致才会接受到消息
基于注解声明队列和交换机@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "direct.exchange"),
key = {"red","blue"}
))
public void directQueue1(String msg){
log.info("收到了来自direct.queue1的消息,{}",msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "direct.exchange"),
key = {"gary","blue"}
))
public void directQueue2(String msg){
log.info("收到了来自direct.queue2的消息,{}",msg);
}
}
消息发送@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testDirectExchange(){
String exchange = "direct.exchange";
String routingKey = "gary";
String message = "hello direct";
rabbitTemplate.convertAndSend(exchange,routingKey,message);
}
}
Direct交换机与Fanout交换机有什么区别?
Fanout交换机将消息路由给每一个与之绑定的队列Direct交换机根据RoutingKey判断路由给哪个队列
Topic
结构图 匹配支持通配符
*:1个单词#:1个或者多个单词
基于注解声明队列和交换机@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "topic.exchange"),
key = "china.#"
))
public void topicQueue1(String msg){
log.info("收到了来自topic.queue1的消息,{}",msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "topic.exchange"),
key = "#.news"
))
public void topicQueue2(String msg){
log.info("收到了来自topic.queue2的消息,{}",msg);
}
}
消息发送@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testTopicExchange(){
String exchange = "topic.exchange";
String routingKey = "china.123";
String message = "so cool";
rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
}
消息转换器
默认发送String,byte[],Serializable可以自定义序列化
在publisher和consumer两个服务中都引入依赖: com.fasterxml.jackson.dataformat
jackson-dataformat-xml
2.9.10
注入MessageConverter的实现类@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
消息发送@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testObjectQueue(){
String queue = "object.queue";
User message = new User("蒋浩楠",80);
rabbitTemplate.convertAndSend(queue,message);
}
}
接收消息@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "object.queue")
public void objectQueue(UserDTO dto){
log.info("收到了来自topic.queue2的消息,{}",dto.toString());
}
}
RabbitMQ集群
普通集群
结构图 特征
会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息。当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回队列所在节点宕机,队列中的消息就会丢失
镜像集群
结构图 特征
交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份。创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点一个队列的主节点可能是另一个队列的镜像节点所有操作都是主节点完成,然后同步给镜像节点主宕机后,镜像节点会替代成新的主
仲裁队列
特征
与镜像队列一样,都是主从模式,支持主从数据同步使用非常简单,没有复杂的配置主从同步基于Raft协议,强一致
java代码中创建仲裁队列
创建队列@Bean
public Queue quorumQueue() {
return QueueBuilder
.durable("quorum.queue") // 持久化
.quorum() // 仲裁队列
.build();
}
SpringAMQP连接MQ集群spring:
rabbitmq:
addresses: 192.168.150.105:8071, 192.168.150.105:8072, 192.168.150.105:8073 #address来代替host、port方式
username: itcast
password: 123321
virtual-host: /
部署集群
计划部署3节点的mq集群 获取cookie,每个集群节点必须具有相同的 cookie。实例之间也需要它来相互通信docker exec -it mq cat /var/lib/rabbitmq/.erlang.cookie
UTQKOGHXAJPQFJREBLEL #cookie
docker rm -f mq #停止并删除当前的mq容器,我们重新搭建集群
准备集群配置#在/tmp目录新建一个配置文件 rabbitmq.conf
cd /tmp
# 创建文件
touch rabbitmq.conf
#配置文件内容如下
loopback_users.guest = false
listeners.tcp.default = 5672
default_user = itcast
default_pass = 123321
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@mq1
cluster_formation.classic_config.nodes.2 = rabbit@mq2
cluster_formation.classic_config.nodes.3 = rabbit@mq3
再创建一个文件,记录cookiecd /tmp
# 创建cookie文件
touch .erlang.cookie
# 写入cookie
echo "UTQKOGHXAJPQFJREBLEL" > .erlang.cookie
# 修改cookie文件的权限
# 修改cookie文件的权限
# 修改cookie文件的权限
chmod 600 .erlang.cookie
准备三个目录,mq1、mq2、mq3,然后拷贝rabbitmq.conf、cookie文件到mq1、mq2、mq3:cd /tmp
# 创建目录
mkdir mq1 mq2 mq3
# 进入/tmp
cd /tmp
# 拷贝
cp rabbitmq.conf mq1
cp rabbitmq.conf mq2
cp rabbitmq.conf mq3
cp .erlang.cookie mq1
cp .erlang.cookie mq2
cp .erlang.cookie mq3
# 或者
echo mq1 mq2 mq3 | xargs -t -n 1 cp rabbitmq.conf
echo mq1 mq2 mq3 | xargs -t -n 1 cp .erlang.cookie
启动集群#创建一个网络
docker network create mq-net
#运行命令
docker run -d --net mq-net \
-v ${PWD}/mq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/mq1/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
--name mq1 \
--hostname mq1 \
-p 8071:5672 \
-p 8081:15672 \
rabbitmq:3-management
docker run -d --net mq-net \
-v ${PWD}/mq2/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/mq2/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
--name mq2 \
--hostname mq2 \
-p 8072:5672 \
-p 8082:15672 \
rabbitmq:3-management
docker run -d --net mq-net \
-v ${PWD}/mq3/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/mq3/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
--name mq3 \
--hostname mq3 \
-p 8073:5672 \
-p 8083:15672 \
rabbitmq:3-management
添加镜像模式docker exec -it mq1 rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
添加仲裁队列
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!