springboot基於Redis發佈訂閱集群下WebSocket的解決方案

一、背景

單機節點下,WebSocket連接成功後,可以直接發送消息。而多節點下,連接時通過nginx會代理到不同節點。

假設一開始用戶連接瞭node1的socket服務。觸發消息發送的條件的時候也通過nginx進行代理,假如代理轉到瞭node2節點上,那麼node2節點的socket服務就發送不瞭消息,因為一開始用戶註冊的是node1節點。這就導致瞭消息發送失敗。

為瞭解決這一方案,消息發送時,就需要一個中間件來記錄,這樣,三個節點都可以獲取消息,然後在根據條件進行消息推送。

二、解決方案(springboot 基於 Redis發佈訂閱)

1、依賴

<!-- redis -->    
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- websocket --> 
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

2、創建業務處理類 Demo.class,該類可以實現MessageListener接口後重寫onMessage方法,也可以不實現,自己寫方法。

import com.alibaba.fastjson.JSON;
import com.dy.service.impl.OrdersServiceImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
 
import java.util.HashMap;
 
/**
 * @program: 
 * @description: redis消息訂閱-業務處理
 * @author: zhang yi
 * @create: 2021-01-25 16:46
 */
@Component
public class Demo implements MessageListener {
  Logger logger = LoggerFactory.getLogger(this.getClass());
 
  @Override
  public void onMessage(Message message, byte[] pattern) {
    logger.info("消息訂閱成功---------");
    logger.info("內容:"+message.getBody());
    logger.info("交換機:"+message.getChannel());
  }
}

3、創建PubSubConfig配置類

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
 
/**
 * @program: 
 * @description: redis發佈訂閱配置
 * @author: zhang yi
 * @create: 2021-01-25 16:49
 */
@Configuration
@EnableCaching
public class PubSubConfig {
  Logger logger = LoggerFactory.getLogger(this.getClass());
 
  //如果是多個交換機,則參數為(RedisConnectionFactory connectionFactory,
  //              MessageListenerAdapter listenerAdapter,
  //              MessageListenerAdapter listenerAdapter2)
  @Bean
  RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                      MessageListenerAdapter listenerAdapter) {
 
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    // 可以添加多個 messageListener,配置不同的交換機
    container.addMessageListener(listenerAdapter, new PatternTopic("channel:demo"));
    //container.addMessageListener(listenerAdapter2, new PatternTopic("channel:demo2"));
    return container;
  }
 
  /**
   * 消息監聽器適配器,綁定消息處理器,利用反射技術調用消息處理器的業務方法
   * @param demo 第一步的業務處理類
   * @return
   */
  @Bean
  MessageListenerAdapter listenerAdapter(Demo demo) {
    logger.info("----------------消息監聽器加載成功----------------");
    // onMessage 就是方法名,基於反射調用
    return new MessageListenerAdapter(demo, "onMessage");
  }
 
  /**
   * 多個交換機就多寫一個
   * @param subCheckOrder
   * @return
   */
  //@Bean
  //MessageListenerAdapter listenerAdapter2(SubCheckOrder subCheckOrder) {
  //  logger.info("----------------消息監聽器加載成功----------------");
  //  return new MessageListenerAdapter(subCheckOrder, "onMessage");
  //}
 
  @Bean
  StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
    return new StringRedisTemplate(connectionFactory);
  }
}

4、消息發佈

@Autowired
private RedisTemplate<String, Object> redisTemplate;
 
redisTemplate.convertAndSend("channel:demo", "我是內容");

三、具體用法

  • socket連接成功。
  • socket消息推送時,把信息發佈到redis中。socket服務訂閱redis的消息,訂閱成功後進行推送。集群下的socket都能訂閱到消息,但是隻有之前連接成功的節點能推送成功,其餘的無法推送。

推薦閱讀: