SpringBoot+Redis实现分布式WebSocket
什么是分布式WebSocket?
是指在分布式系统架构中实现WebSocket的通信机制,它允许在不同的服务器节点之间共享和同步WebSocket会话状态,从而实现跨多个服务器的实时消息传递。
在分布式环境中实现WebSocket的挑战主要包括以下几点:
- 会话共享:在分布式系统中,用户的WebSocket连接可能与不同的服务器建立,这就要求系统能够在不同服务器间共享WebSocket会话信息,以便消息能够被正确地传递到所有相关的客户端。
- 负载均衡:使用负载均衡可以提高系统的可用性和伸缩性。但是,当WebSocket请求在服务器之间负载均衡时,需要确保客户端可以与正确的服务器建立连接,并且能够接收到所有的消息。
- 故障转移:在出现服务器故障时,系统需要能够将WebSocket会话无缝迁移到其他健康的服务器上,以保证服务的连续性。
- 一致性:确保所有用户在任何时候看到的都是一致的消息状态,这对于实时通信非常重要。
为了解决这些挑战,可以采取以下几种策略:
- 使用消息代理:通过引入一个中心化的消息代理(如RabbitMQ、Redis Pub/Sub等),可以让所有的服务器都连接到这个消息代理。当一个服务器需要发送消息时,它将消息发送到消息代理,然后由消息代理负责将消息分发到所有连接的客户端。这样可以确保消息的一致性和可靠性。
- 共享会话存储:使用一个共享的会话存储(如数据库或内存数据网格)来保存WebSocket会话的状态。这样,即使客户端最初连接到的服务器发生故障,其他服务器也可以接管会话并继续处理消息。
- 基于路由的负载均衡:使用智能负载均衡器(如Nginx、HAProxy等),它们可以根据特定的路由规则(如会话ID或用户ID)将WebSocket连接定向到特定的服务器。
- 服务发现:在微服务架构中,可以使用服务发现机制来动态地找到负责特定会话的服务器,并将消息路由到那里。
- WebSocket代理:使用专门的WebSocket代理服务器,它可以在多个后端服务器之间代理WebSocket连接,并确保消息的传递和会话的同步。
- 应用层协议:设计应用层协议来处理分布式WebSocket的复杂性,例如通过引入心跳机制来检测连接的健康状况,并通过预定的协议来同步会话状态。
总的来说,在实践中,可能需要结合多种策略来构建一个健壮的分布式WebSocket解决方案,以满足不同场景下的需求。此外,还需要考虑安全性、性能和可扩展性等因素,以确保系统的稳定性和可靠性。
温故而知新:单点WebSocket实现
SpringBoot2.0集成WebSocket,实现后台向前端推送信息_springboot集成websocket-CSDN博客https://zhengkai.blog.csdn.net/article/details/80275084
简单版本:在Java中使用Redis实现WebSocket
要在Java中使用Redis实现WebSocket,你需要使用一个支持WebSocket的Java Web框架,如Spring Boot,以及一个支持Redis的Java库,如Jedis。以下是一个简单的示例:
添加依赖项到你的pom.xml文件
org.springframework.boot spring-boot-starter-websocket org.springframework.boot spring-boot-starter-data-redis
创建一个WebSocket配置类
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; import org.springframework.web.socket.config.annotation.StompEndpointRegistry; import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; //by zhengkai.blog.csdn.net @Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/websocket").withSockJS(); } @Override public void configureMessageBroker(org.springframework.messaging.simp.config.MessageBrokerRegistry registry) { registry.enableSimpleBroker("/topic"); registry.setApplicationDestinationPrefixes("/app"); } @Bean public JedisConnectionFactory jedisConnectionFactory() { return new JedisConnectionFactory(); } @Bean public RedisTemplate redisTemplate() { RedisTemplate template = new RedisTemplate(); template.setConnectionFactory(jedisConnectionFactory()); return template; } @Bean public MessageListenerAdapter messageListenerAdapter() { return new MessageListenerAdapter(new RedisMessageListener()); } @Bean public RedisMessageListenerContainer redisMessageListenerContainer() { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(jedisConnectionFactory()); container.addMessageListener(messageListenerAdapter(), topic()); return container; } @Bean public ChannelTopic topic() { return new ChannelTopic("websocket-topic"); } }
创建一个WebSocket消息监听器
import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.stereotype.Component; @Component public class RedisMessageListener implements MessageListener { @Override public void onMessage(Message message, byte[] pattern) { System.out.println("Received message: " + message); } }
发送消息到WebSocket客户端
在你的控制器中,你可以使用SimpMessagingTemplate来发送消息到WebSocket客户端:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class WebSocketController { @Autowired private SimpMessagingTemplate messagingTemplate; @GetMapping("/send") public String sendMessage() { messagingTemplate.convertAndSend("/topic/websocket-topic", "Hello from Redis!"); return "Message sent!"; } }
正式版本:用SpringBoot+Redis实现分布式WebSocket
- 将消息()统一推送到一个消息队列(Redis、Kafka等)的的topic,然后每个应用节点都订阅这个topic,在接收到WebSocket消息后取出这个消息的“消息接收者的用户ID/用户名”,然后再比对自身是否存在相应用户的连接,如果存在则推送消息,否则丢弃接收到的这个消息(这个消息接收者所在的应用节点会处理)
- 在用户建立WebSocket连接后,使用Redis缓存记录用户的WebSocket建立在哪个应用节点上,然后同样使用消息队列将消息推送到接收者所在的应用节点上面(实现上比方案一要复杂,但是网络流量会更低)
1. 定义一个WebSocket Channel枚举类
public enum WebSocketChannelEnum { //测试使用的简易点对点聊天 CHAT("CHAT", "测试使用的简易点对点聊天", "/topic/reply"); WebSocketChannelEnum(String code, String description, String subscribeUrl) { this.code = code; this.description = description; this.subscribeUrl = subscribeUrl; } /** * 唯一CODE */ private String code; /** * 描述 */ private String description; /** * WebSocket客户端订阅的URL */ private String subscribeUrl; public String getCode() { return code; } public String getDescription() { return description; } public String getSubscribeUrl() { return subscribeUrl; } /** * 通过CODE查找枚举类 */ public static WebSocketChannelEnum fromCode(String code){ if(StringUtils.isNoneBlank(code)){ for(WebSocketChannelEnum channelEnum : values()){ if(channelEnum.code.equals(code)){ return channelEnum; } } } return null; } }
2. 配置基于Redis的消息队列
需要注意的是,在大中型正式项目中并不推荐使用Redis实现的消息队列,因为经过测试它并不是特别可靠,所以应该考虑使用Kafka、rabbitMQ等专业的消息队列中间件
@Configuration @ConditionalOnClass({JedisCluster.class}) public class RedisConfig { @Value("${spring.redis.timeout}") private String timeOut; @Value("${spring.redis.cluster.nodes}") private String nodes; @Value("${spring.redis.cluster.max-redirects}") private int maxRedirects; @Value("${spring.redis.jedis.pool.max-active}") private int maxActive; @Value("${spring.redis.jedis.pool.max-wait}") private int maxWait; @Value("${spring.redis.jedis.pool.max-idle}") private int maxIdle; @Value("${spring.redis.jedis.pool.min-idle}") private int minIdle; @Value("${spring.redis.message.topic-name}") private String topicName; @Bean public JedisPoolConfig jedisPoolConfig(){ JedisPoolConfig config = new JedisPoolConfig(); config.setMaxTotal(maxActive); config.setMaxIdle(maxIdle); config.setMinIdle(minIdle); config.setMaxWaitMillis(maxWait); return config; } @Bean public RedisClusterConfiguration redisClusterConfiguration(){ RedisClusterConfiguration configuration = new RedisClusterConfiguration(Arrays.asList(nodes)); configuration.setMaxRedirects(maxRedirects); return configuration; } /** * JedisConnectionFactory */ @Bean public JedisConnectionFactory jedisConnectionFactory(RedisClusterConfiguration configuration,JedisPoolConfig jedisPoolConfig){ return new JedisConnectionFactory(configuration,jedisPoolConfig); } /** * 使用Jackson序列化对象 */ @Bean public Jackson2JsonRedisSerializer jackson2JsonRedisSerializer(){ Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); serializer.setObjectMapper(objectMapper); return serializer; } /** * RedisTemplate */ @Bean public RedisTemplate redisTemplate(JedisConnectionFactory factory, Jackson2JsonRedisSerializer jackson2JsonRedisSerializer){ RedisTemplate redisTemplate = new RedisTemplate(); redisTemplate.setConnectionFactory(factory); //字符串方式序列化KEY StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); redisTemplate.setKeySerializer(stringRedisSerializer); redisTemplate.setHashKeySerializer(stringRedisSerializer); //JSON方式序列化VALUE redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); redisTemplate.afterPropertiesSet(); return redisTemplate; } /** * 消息监听器 */ @Bean MessageListenerAdapter messageListenerAdapter(MessageReceiver messageReceiver, Jackson2JsonRedisSerializer jackson2JsonRedisSerializer){ //消息接收者以及对应的默认处理方法 MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(messageReceiver, "receiveMessage"); //消息的反序列化方式 messageListenerAdapter.setSerializer(jackson2JsonRedisSerializer); return messageListenerAdapter; } /** * message listener container */ @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory , MessageListenerAdapter messageListenerAdapter){ RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //添加消息监听器 container.addMessageListener(messageListenerAdapter, new PatternTopic(topicName)); return container; } }
这里使用的配置:
spring: ... #redis redis: cluster: nodes: namenode22:6379,datanode23:6379,datanode24:6379 max-redirects: 6 timeout: 300000 jedis: pool: max-active: 8 max-wait: 100000 max-idle: 8 min-idle: 0 #自定义的监听的TOPIC路径 message: topic-name: topic-test
3. 定义一个Redis消息的处理者
@Component public class MessageReceiver { private final Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private SimpMessagingTemplate messagingTemplate; @Autowired private SimpUserRegistry userRegistry; /** * 处理WebSocket消息 */ public void receiveMessage(RedisWebsocketMsg redisWebsocketMsg) { logger.info(MessageFormat.format("Received Message: {0}", redisWebsocketMsg)); //1. 取出用户名并判断是否连接到当前应用节点的WebSocket SimpUser simpUser = userRegistry.getUser(redisWebsocketMsg.getReceiver()); if(simpUser != null && StringUtils.isNoneBlank(simpUser.getName())){ //2. 获取WebSocket客户端的订阅地址 WebSocketChannelEnum channelEnum = WebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode()); if(channelEnum != null){ //3. 给WebSocket客户端发送消息 messagingTemplate.convertAndSendToUser(redisWebsocketMsg.getReceiver(), channelEnum.getSubscribeUrl(), redisWebsocketMsg.getContent()); } } } }
4. 在Controller中发送WebSocket消息
@Controller @RequestMapping(("/wsTemplate")) public class RedisMessageController { private final Logger logger = LoggerFactory.getLogger(getClass()); @Value("${spring.redis.message.topic-name}") private String topicName; @Autowired private SimpMessagingTemplate messagingTemplate; @Autowired private SimpUserRegistry userRegistry; @Resource(name = "redisServiceImpl") private RedisService redisService; /** * 给指定用户发送WebSocket消息 */ @PostMapping("/sendToUser") @ResponseBody public String chat(HttpServletRequest request) { //消息接收者 String receiver = request.getParameter("receiver"); //消息内容 String msg = request.getParameter("msg"); HttpSession session = SpringContextUtils.getSession(); User loginUser = (User) session.getAttribute(Constants.SESSION_USER); HelloMessage resultData = new HelloMessage(MessageFormat.format("{0} say: {1}", loginUser.getUsername(), msg)); this.sendToUser(loginUser.getUsername(), receiver, WebSocketChannelEnum.CHAT.getSubscribeUrl(), JsonUtils.toJson(resultData)); return "ok"; } /** * 给指定用户发送消息,并处理接收者不在线的情况 * @param sender 消息发送者 * @param receiver 消息接收者 * @param destination 目的地 * @param payload 消息正文 */ private void sendToUser(String sender, String receiver, String destination, String payload){ SimpUser simpUser = userRegistry.getUser(receiver); //如果接收者存在,则发送消息 if(simpUser != null && StringUtils.isNoneBlank(simpUser.getName())){ messagingTemplate.convertAndSendToUser(receiver, destination, payload); } //如果接收者在线,则说明接收者连接了集群的其他节点,需要通知接收者连接的那个节点发送消息 else if(redisService.isSetMember(Constants.REDIS_WEBSOCKET_USER_SET, receiver)){ RedisWebsocketMsg redisWebsocketMsg = new RedisWebsocketMsg(receiver, WebSocketChannelEnum.CHAT.getCode(), payload); redisService.convertAndSend(topicName, redisWebsocketMsg); } //否则将消息存储到redis,等用户上线后主动拉取未读消息 else{ //存储消息的Redis列表名 String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + receiver + ":" + destination; logger.info(MessageFormat.format("消息接收者{0}还未建立WebSocket连接,{1}发送的消息【{2}】将被存储到Redis的【{3}】列表中", receiver, sender, payload, listKey)); //存储消息到Redis中 redisService.addToListRight(listKey, ExpireEnum.UNREAD_MSG, payload); } } /** * 拉取指定监听路径的未读的WebSocket消息 * @param destination 指定监听路径 * @return java.util.Map */ @PostMapping("/pullUnreadMessage") @ResponseBody public Map pullUnreadMessage(String destination){ Map result = new HashMap(); try { HttpSession session = SpringContextUtils.getSession(); //当前登录用户 User loginUser = (User) session.getAttribute(Constants.SESSION_USER); //存储消息的Redis列表名 String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + loginUser.getUsername() + ":" + destination; //从Redis中拉取所有未读消息 List messageList = redisService.rangeList(listKey, 0, -1); result.put("code", "200"); if(messageList !=null && messageList.size() > 0){ //删除Redis中的这个未读消息列表 redisService.delete(listKey); //将数据添加到返回集,供前台页面展示 result.put("result", messageList); } }catch (Exception e){ result.put("code", "500"); result.put("msg", e.getMessage()); } return result; } }
5. WebSocket相关配置
@Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer{ @Autowired private AuthHandshakeInterceptor authHandshakeInterceptor; @Autowired private MyHandshakeHandler myHandshakeHandler; @Autowired private MyChannelInterceptor myChannelInterceptor; @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/chat-websocket") .addInterceptors(authHandshakeInterceptor) .setHandshakeHandler(myHandshakeHandler) .withSockJS(); } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { //客户端需要把消息发送到/message/xxx地址 registry.setApplicationDestinationPrefixes("/message"); //服务端广播消息的路径前缀,客户端需要相应订阅/topic/yyy这个地址的消息 registry.enableSimpleBroker("/topic"); //给指定用户发送消息的路径前缀,默认值是/user/ registry.setUserDestinationPrefix("/user/"); } @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.interceptors(myChannelInterceptor); } }
6. 示例页面
Chat With STOMP Message #connect-container { margin: 0 auto; width: 400px; } #connect-container div { padding: 5px; margin: 0 7px 10px 0; } .message input { padding: 5px; margin: 0 7px 10px 0; } .layui-btn { display: inline-block; } var stompClient = null; $(function () { var target = $("#target"); if (window.location.protocol === 'http:') { target.val('http://' + window.location.host + target.val()); } else { target.val('https://' + window.location.host + target.val()); } }); function setConnected(connected) { var connect = $("#connect"); var disconnect = $("#disconnect"); var echo = $("#echo"); if (connected) { connect.addClass("layui-btn-disabled"); disconnect.removeClass("layui-btn-disabled"); echo.removeClass("layui-btn-disabled"); } else { connect.removeClass("layui-btn-disabled"); disconnect.addClass("layui-btn-disabled"); echo.addClass("layui-btn-disabled"); } connect.attr("disabled", connected); disconnect.attr("disabled", !connected); echo.attr("disabled", !connected); } //连接 function connect() { var target = $("#target").val(); var ws = new SockJS(target); stompClient = Stomp.over(ws); stompClient.connect({}, function () { setConnected(true); log('Info: STOMP connection opened.'); //连接成功后,主动拉取未读消息 pullUnreadMessage("/topic/reply"); //订阅服务端的/topic/reply地址 stompClient.subscribe("/user/topic/reply", function (response) { log(JSON.parse(response.body).content); }) },function () { //断开处理 setConnected(false); log('Info: STOMP connection closed.'); }); } //断开连接 function disconnect() { if (stompClient != null) { stompClient.disconnect(); stompClient = null; } setConnected(false); log('Info: STOMP connection closed.'); } //向指定用户发送消息 function sendMessage() { if (stompClient != null) { var receiver = $("#receiver").val(); var msg = $("#message").val(); log('Sent: ' + JSON.stringify({'receiver': receiver, 'msg':msg})); $.ajax({ url: "/wsTemplate/sendToUser", type: "POST", dataType: "json", async: true, data: { "receiver": receiver, "msg": msg }, success: function (data) { } }); } else { layer.msg('STOMP connection not established, please connect.', { offset: 'auto' ,icon: 2 }); } } //从服务器拉取未读消息 function pullUnreadMessage(destination) { $.ajax({ url: "/wsTemplate/pullUnreadMessage", type: "POST", dataType: "json", async: true, data: { "destination": destination }, success: function (data) { if (data.result != null) { $.each(data.result, function (i, item) { log(JSON.parse(item).content); }) } else if (data.code !=null && data.code == "500") { layer.msg(data.msg, { offset: 'auto' ,icon: 2 }); } } }); } //日志输出 function log(message) { console.debug(message); }Seems your browser doesn't support Javascript! Websockets rely on Javascript being enabled. Please enable Javascript and reload this page!
Chat With STOMP Message Connect Disconnect Send Message
文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。