springboot基於Redis發佈訂閱集群下WebSocket的解決方案
一、背景
單機節點下,WebSocket連接成功後,可以直接發送消息。而多節點下,連接時通過nginx會代理到不同節點。
假設一開始用戶連接瞭node1的socket服務。觸發消息發送的條件的時候也通過nginx進行代理,假如代理轉到瞭node2節點上,那麼node2節點的socket服務就發送不瞭消息,因為一開始用戶註冊的是node1節點。這就導致瞭消息發送失敗。
為瞭解決這一方案,消息發送時,就需要一個中間件來記錄,這樣,三個節點都可以獲取消息,然後在根據條件進行消息推送。
二、解決方案(springboot 基於 Redis發佈訂閱)
1、依賴
<!-- redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- websocket --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
2、創建業務處理類 Demo.class,該類可以實現MessageListener接口後重寫onMessage方法,也可以不實現,自己寫方法。
import com.alibaba.fastjson.JSON; import com.dy.service.impl.OrdersServiceImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.stereotype.Component; import java.util.HashMap; /** * @program: * @description: redis消息訂閱-業務處理 * @author: zhang yi * @create: 2021-01-25 16:46 */ @Component public class Demo implements MessageListener { Logger logger = LoggerFactory.getLogger(this.getClass()); @Override public void onMessage(Message message, byte[] pattern) { logger.info("消息訂閱成功---------"); logger.info("內容:"+message.getBody()); logger.info("交換機:"+message.getChannel()); } }
3、創建PubSubConfig配置類
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; /** * @program: * @description: redis發佈訂閱配置 * @author: zhang yi * @create: 2021-01-25 16:49 */ @Configuration @EnableCaching public class PubSubConfig { Logger logger = LoggerFactory.getLogger(this.getClass()); //如果是多個交換機,則參數為(RedisConnectionFactory connectionFactory, // MessageListenerAdapter listenerAdapter, // MessageListenerAdapter listenerAdapter2) @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); // 可以添加多個 messageListener,配置不同的交換機 container.addMessageListener(listenerAdapter, new PatternTopic("channel:demo")); //container.addMessageListener(listenerAdapter2, new PatternTopic("channel:demo2")); return container; } /** * 消息監聽器適配器,綁定消息處理器,利用反射技術調用消息處理器的業務方法 * @param demo 第一步的業務處理類 * @return */ @Bean MessageListenerAdapter listenerAdapter(Demo demo) { logger.info("----------------消息監聽器加載成功----------------"); // onMessage 就是方法名,基於反射調用 return new MessageListenerAdapter(demo, "onMessage"); } /** * 多個交換機就多寫一個 * @param subCheckOrder * @return */ //@Bean //MessageListenerAdapter listenerAdapter2(SubCheckOrder subCheckOrder) { // logger.info("----------------消息監聽器加載成功----------------"); // return new MessageListenerAdapter(subCheckOrder, "onMessage"); //} @Bean StringRedisTemplate template(RedisConnectionFactory connectionFactory) { return new StringRedisTemplate(connectionFactory); } }
4、消息發佈
@Autowired private RedisTemplate<String, Object> redisTemplate; redisTemplate.convertAndSend("channel:demo", "我是內容");
三、具體用法
- socket連接成功。
- socket消息推送時,把信息發佈到redis中。socket服務訂閱redis的消息,訂閱成功後進行推送。集群下的socket都能訂閱到消息,但是隻有之前連接成功的節點能推送成功,其餘的無法推送。
推薦閱讀:
- Redis實現訂單自動過期功能的示例代碼
- 利用Redis實現訂單30分鐘自動取消
- spring整合redis消息監聽通知使用的實現示例
- SpringBoot如何監聽redis Key變化事件案例詳解
- Redis深入瞭解內存淘汰與事務操作