RabbitMQ实践——搭建单人聊天服务
大纲
- 创建Core交换器
- 用户登录
- 发起聊天邀请
- 接受邀请
- 聊天
- 实验过程
- 总结
- 代码工程
经过之前若干节的学习,我们基本掌握了RabbitMQ的各个组件和功能。本文我们将使用之前的知识搭建一个简单的单人聊天服务。
基本结构如下。为了避免Server有太多连线导致杂乱,下图将Server画成两个模块,实则是一个服务。
该服务由两个核心交换器构成。
Core交换器是服务启动时创建的,它主要是为了向不同用户传递“系统通知型”消息。比如Jerry向Tom发起聊天邀请,则是通过上面黑色字体6-10的流程发给了Core交换器。然后Core交换器将该条消息告知Tom。
Fanout交换器是用来传递聊天消息的。Jerry和Tom都向其发送消息,然后消息会被路由到两个队列。他们俩各自订阅一个队列,就可以看到彼此的聊天内容了。
创建Core交换器
package com.rabbitmq.chat.service;

import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import jakarta.annotation.PostConstruct;
import reactor.core.publisher.Flux;

/**
 * Service that owns the "Core" exchange.
 *
 * <p>The exchange is declared once, right after dependency injection, and the
 * connection factory of the injected {@link RabbitTemplate} is kept for later use.
 */
@Service
public class Core {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private ConnectionFactory connectionFactory;

    final String exchangeName = "Core";

    /**
     * Captures the template's connection factory and declares the Core exchange.
     */
    @PostConstruct
    public void init() {
        this.connectionFactory = this.rabbitTemplate.getConnectionFactory();
        createExchange(this.exchangeName);
    }

    /**
     * Declares a direct exchange that is non-durable and auto-deleted.
     *
     * @param name name of the exchange to declare
     */
    private void createExchange(String name) {
        rabbitTemplate.execute(ch -> {
            // Arguments: exchange, type, durable, autoDelete, extra arguments.
            ch.exchangeDeclare(name, "direct", false, true, null);
            return null;
        });
    }
用户登录
用户登录后,我们会创建一个“系统通知”队列。然后用户就会通过长连接形式,持续等待系统发出通知。
private final ReentrantLock lock = new ReentrantLock();

// Listener containers keyed by queue name; every access happens while holding `lock`.
private final Map<String, SimpleMessageListenerContainer> listeners = new java.util.HashMap<>();

/**
 * Logs a user in and returns the stream of messages delivered to that user's queue.
 *
 * <p>The queue is named after the user and bound to the Core exchange with the
 * username as routing key. The listener container is started when the returned
 * {@code Flux} is subscribed and is stopped and unregistered when the subscription
 * ends, so a later login of the same user gets a working stream again.
 *
 * @param username user name; also used as queue name and routing key
 * @return a stream emitting the body of every received message as text; it fails
 *         with {@link IllegalStateException} if the user already has an active stream
 */
public Flux<String> Login(String username) {
    createExclusiveQueue(username);
    createBanding(exchangeName, username, username);
    return Flux.create(emitter -> {
        MessageListener onMessage = (Message message) -> {
            // Decode explicitly as UTF-8 instead of relying on the platform charset.
            String msg = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("Received message: " + msg);
            emitter.next(msg);
        };
        SimpleMessageListenerContainer container = getListener(username, onMessage);
        if (container.getMessageListener() != onMessage) {
            // getListener handed back a container created for an earlier subscriber;
            // it would never feed this emitter, so fail loudly instead of hanging.
            emitter.error(new IllegalStateException(
                    "User " + username + " already has an active notification stream"));
            return;
        }
        // Register cleanup before starting so a failing start() is cleaned up as well.
        emitter.onDispose(() -> releaseListener(username, container));
        container.start();
    });
}

// NOTE: createExchange(String) is already defined earlier in this class; the
// duplicate definition that used to follow here has been dropped.

/**
 * Binds a queue to an exchange.
 *
 * @param exchangeName exchange to bind to
 * @param queueName    queue to bind
 * @param routingKey   routing key of the binding
 */
private void createBanding(String exchangeName, String queueName, String routingKey) {
    rabbitTemplate.execute(channel -> {
        channel.queueBind(queueName, exchangeName, routingKey);
        return null;
    });
}

/**
 * Returns the listener container registered for a queue, creating and registering
 * one when none exists and a message listener is supplied.
 *
 * @param queueName       queue the container consumes from
 * @param messageListener listener for a newly created container; when {@code null}
 *                        this method only looks up an existing container
 * @return the registered container, or {@code null} if none exists and no listener was given
 */
private SimpleMessageListenerContainer getListener(String queueName, MessageListener messageListener) {
    lock.lock();
    try {
        SimpleMessageListenerContainer listener = listeners.get(queueName);
        if (listener == null && messageListener != null) {
            listener = new SimpleMessageListenerContainer();
            listener.setConnectionFactory(connectionFactory);
            listener.setQueueNames(queueName);
            listener.setMessageListener(messageListener);
            listeners.put(queueName, listener);
        }
        return listener;
    } finally {
        lock.unlock();
    }
}

/**
 * Unregisters a container (only if it is still the one registered for the queue)
 * and stops it.
 *
 * @param queueName queue the container was registered under
 * @param container container to stop
 */
private void releaseListener(String queueName, SimpleMessageListenerContainer container) {
    lock.lock();
    try {
        listeners.remove(queueName, container);
    } finally {
        lock.unlock();
    }
    // Stop outside the lock so other logins are not blocked while the consumer shuts down.
    container.stop();
}
Controller如下
package com.rabbitmq.chat.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.rabbitmq.chat.service.Core;

import reactor.core.publisher.Flux;

/**
 * REST endpoints under {@code /user}.
 */
@RestController
@RequestMapping("/user")
public class UserController {

    @Autowired
    private Core core;

    /**
     * Logs a user in and streams that user's messages as server-sent events.
     *
     * @param username name of the user logging in
     * @return the message stream produced by {@link Core#Login(String)}
     */
    @PostMapping(value = "/login", produces = "text/event-stream")
    public Flux<String> login(@RequestParam String username) {
        return core.Login(username);
    }
}
发起聊天邀请
发起聊天邀请时,系统会预先创建一个聊天室(ChatRoomInfo)。它包含上图中的Fanout交换器,以及聊天双方需要订阅的消息队列。
这些创建完后,发起方就会等待对方发送的消息,也可以自己和自己聊天。因为消息队列已经创建好了,只是对方还没使用。
package com.rabbitmq.chat.service;

import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import jakarta.annotation.PostConstruct;
import lombok.Data;
import reactor.core.publisher.Flux;

/**
 * Service that creates chat rooms and keeps track of them.
 *
 * <p>A chat room is one fanout exchange plus one queue per participant; both queues
 * are bound to the exchange, so every message sent to the room reaches both users.
 */
@Service
public class ChatRoom {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private ConnectionFactory connectionFactory;

    /** Bookkeeping for one chat room: its exchange and each participant's queue. */
    @Data
    private static class ChatRoomInfo {
        private String exchange;
        private Map<String, String> usernameToQueuename;
    }

    // Concurrent map: rooms are looked up without holding `lock`, so a plain
    // HashMap would be unsafe once requests are handled in parallel.
    private final Map<String, ChatRoomInfo> chatRooms = new java.util.concurrent.ConcurrentHashMap<>();
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Captures the connection factory of the injected template.
     */
    @PostConstruct
    public void init() {
        connectionFactory = rabbitTemplate.getConnectionFactory();
    }

    /**
     * Creates the chat room for the two users if it is not registered yet and
     * subscribes the inviting user to it.
     *
     * @param fromUsername user sending the invitation
     * @param toUsername   user being invited
     * @return stream of chat messages delivered to {@code fromUsername}
     */
    public Flux<String> invite(String fromUsername, String toUsername) {
        String chatRoomName = getChatRoomName(fromUsername, toUsername);
        ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);
        if (chatRoomInfo == null) {
            createChatRoom(fromUsername, toUsername);
        }
        return talk(chatRoomName, fromUsername);
    }

    /**
     * Declares the exchange and both queues of a chat room, binds them, and
     * registers the room.
     *
     * @param fromUsername user sending the invitation
     * @param toUsername   user being invited
     */
    private void createChatRoom(String fromUsername, String toUsername) {
        String chatRoomName = getChatRoomName(fromUsername, toUsername);
        String exchangeName = chatRoomName;
        String fromQueueName = "queue-" + fromUsername + "-" + toUsername;
        String toQueueName = "queue-" + toUsername + "-" + fromUsername;

        rabbitTemplate.execute(action -> {
            // Exchange arguments: name, type, durable, autoDelete, extra arguments.
            action.exchangeDeclare(exchangeName, "fanout", false, true, null);
            // Queue arguments: name, durable, exclusive, autoDelete, extra arguments.
            action.queueDeclare(fromQueueName, false, true, false, null);
            action.queueDeclare(toQueueName, false, true, false, null);
            action.queueBind(fromQueueName, exchangeName, "");
            action.queueBind(toQueueName, exchangeName, "");
            return null;
        });

        // Built with put() rather than Map.of(): Map.of rejects duplicate keys and
        // would throw when a user invites themselves (fromUsername equals toUsername).
        Map<String, String> queuesByUser = new java.util.HashMap<>();
        queuesByUser.put(fromUsername, fromQueueName);
        queuesByUser.put(toUsername, toQueueName);

        lock.lock();
        try {
            ChatRoomInfo chatRoomInfo = new ChatRoomInfo();
            chatRoomInfo.setExchange(exchangeName);
            chatRoomInfo.setUsernameToQueuename(Map.copyOf(queuesByUser));
            chatRooms.put(chatRoomName, chatRoomInfo);
        } finally {
            lock.unlock();
        }
    }
接受邀请
被邀请方通过Core交换器得知有人要和它聊天。
然后接受邀请的请求会寻找聊天室信息,然后订阅聊天记录队列。
/**
 * Accepts an invitation: subscribes the invited user to the existing chat room.
 *
 * @param fromUsername user who sent the invitation
 * @param toUsername   user accepting it
 * @return stream of chat messages delivered to {@code toUsername}
 * @throws IllegalArgumentException if no chat room is registered for the pair
 */
public Flux<String> accept(String fromUsername, String toUsername) {
    String chatRoomName = getChatRoomName(fromUsername, toUsername);
    return talk(chatRoomName, toUsername);
}

/**
 * Streams the messages arriving on a user's queue in the given chat room.
 *
 * <p>A listener container is created and started when the returned {@code Flux}
 * is subscribed, and stopped when the subscription ends (cancel, error or
 * completion) so the consumer does not outlive its subscriber.
 *
 * @param chatRoomName name of a registered chat room
 * @param username     participant whose queue is consumed
 * @return stream emitting each received message body as text
 * @throws IllegalArgumentException if the chat room is not registered
 */
private Flux<String> talk(String chatRoomName, String username) {
    ChatRoomInfo chatRoomInfo = chatRooms.get(chatRoomName);
    if (chatRoomInfo == null) {
        throw new IllegalArgumentException("Chat room not found");
    }
    String queueName = chatRoomInfo.getUsernameToQueuename().get(username);
    return Flux.create(emitter -> {
        SimpleMessageListenerContainer listener = new SimpleMessageListenerContainer();
        listener.setConnectionFactory(connectionFactory);
        listener.setQueueNames(queueName);
        listener.setMessageListener((Message message) -> {
            // Decode explicitly as UTF-8 instead of relying on the platform charset.
            String msg = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println(username + " received message: " + msg);
            emitter.next(msg);
        });
        // Register cleanup before starting so a failing start() is cleaned up as well.
        emitter.onDispose(listener::stop);
        listener.start();
    });
}
聊天
聊天的逻辑就是找到聊天室信息,然后向交换器发送消息。
/**
 * Publishes a chat message to the room shared by the two users.
 *
 * <p>The room is looked up under the name built from (from, to) first and, if that
 * is not registered, under the name built from (to, from).
 *
 * @param fromUsername sender; prefixed to the published text
 * @param toUsername   the other participant
 * @param message      text to send
 * @throws IllegalArgumentException if neither room name is registered
 */
public void chat(String fromUsername, String toUsername, String message) {
    ChatRoomInfo room = chatRooms.get(getChatRoomName(fromUsername, toUsername));
    if (room == null) {
        // Fall back to the room name with the two users swapped.
        room = chatRooms.get(getChatRoomName(toUsername, fromUsername));
        if (room == null) {
            throw new IllegalArgumentException("Chat room not found");
        }
    }
    String payload = fromUsername + ": " + message;
    rabbitTemplate.convertAndSend(room.getExchange(), "", payload);
}

/**
 * Builds the chat room name for an ordered pair of users.
 *
 * @param fromUsername first user in the name
 * @param toUsername   second user in the name
 * @return {@code <from>-<to>-chat-room}
 */
private String getChatRoomName(String fromUsername, String toUsername) {
    return String.join("-", fromUsername, toUsername, "chat-room");
}
Controller侧代码
package com.rabbitmq.chat.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PutMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import com.rabbitmq.chat.service.ChatRoom; import com.rabbitmq.chat.service.Core; import reactor.core.publisher.Flux; @RestController @RequestMapping("/chat") public class ChatController { @Autowired private Core core; @Autowired private ChatRoom chatRoom; @PutMapping(value = "/invite", produces = "text/event-stream") public Flux invite(@RequestParam String fromUsername, @RequestParam String toUsername) { core.invite(fromUsername, toUsername); return chatRoom.invite(fromUsername, toUsername); } @PutMapping(value = "/accept", produces = "text/event-stream") public Flux accept(@RequestParam String fromUsername, @RequestParam String toUsername) { core.accept(fromUsername, toUsername); return chatRoom.accept(fromUsername, toUsername); } @PostMapping("/send") public void send(@RequestParam String fromUsername, @RequestParam String toUsername, @RequestParam String message) { chatRoom.chat(fromUsername, toUsername, message); } }
实验过程
在Postman中,我们先让tom登录,然后jerry登录。
在后台,我们看到创建两个队列
以及Core交换器的绑定关系也被更新
Jerry向Tom发起聊天邀请
可以看到Tom收到了邀请
同时新增了两个队列
以及一个交换器
Tom通过下面请求接受邀请
Jerry收到Tom接受了邀请的通知
后面它们就可以聊天了
它们的聊天窗口都收到了消息
总结
本文主要使用的知识点:
- direct交换器以及其绑定规则
- fanout交换器
- 自动删除的交换器
- 自动删除的队列
- 只有一个消费者的队列
- WebFlux响应式编程
代码工程
https://github.com/f304646673/RabbitMQDemo