SpringBoot2.0集成WebSocket實現後臺向前端推送信息

什麼是WebSocket?

這裡寫圖片描述

WebSocket協議是基於TCP的一種新的網絡協議。它實現瞭瀏覽器與服務器全雙工(full-duplex)通信——允許服務器主動發送信息給客戶端。

為什麼需要 WebSocket?

初次接觸 WebSocket 的人,都會問同樣的問題:我們已經有瞭 HTTP 協議,為什麼還需要另一個協議?它能帶來什麼好處?

答案很簡單,因為 HTTP 協議有一個缺陷:通信隻能由客戶端發起,HTTP 協議做不到服務器主動向客戶端推送信息。

這裡寫圖片描述

舉例來說,我們想要查詢當前的排隊情況,隻能是頁面輪詢向服務器發出請求,服務器返回查詢結果。輪詢的效率低,非常浪費資源(因為必須不停連接,或者 HTTP 連接始終打開)。因此WebSocket 就是這樣發明的。 前言

2020-10-20 教程補充:

  • 補充關於@Component@ServerEndpoint關於是否單例模式等的解答,感謝大傢熱心提問和研究。
  • Vue版本的websocket連接方法

2020-01-05 教程補充:

  • 整合瞭IM相關的優化
  • 優化開啟/關閉連接的處理
  • 上傳到開源項目spring-cloud-study-websocket,方便大傢下載代碼。

感謝大傢的支持和留言,14W訪問量是滿滿的動力!接下來還會有websocket+redis集群優化篇針對多ws服務器做簡單優化處理,敬請期待!

話不多說,馬上進入幹貨時刻。

maven依賴

SpringBoot2.0對WebSocket的支持簡直太棒瞭,直接就有包可以引入

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * 開啟WebSocket支持
 * @author zhengkai.blog.csdn.net
 */
@Configuration 
public class WebSocketConfig { 
	
 @Bean 
 public ServerEndpointExporter serverEndpointExporter() { 
 return new ServerEndpointExporter(); 
 } 
 
} 

WebSocketConfig

啟用WebSocket的支持也是很簡單,幾句代碼搞定

import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.web.socket.server.standard.ServerEndpointExporter;/** * 開啟WebSocket支持 * @author zhengkai.blog.csdn.net */@Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } } 

WebSocketServer

這就是重點瞭,核心都在這裡。

  • 因為WebSocket是類似客戶端服務端的形式(采用ws協議),那麼這裡的WebSocketServer其實就相當於一個ws協議的Controller
  • 直接@ServerEndpoint("/imserver/{userId}")@Component啟用即可,然後在裡面實現@OnOpen開啟連接,@onClose關閉連接,@onMessage接收消息等方法。
  • 新建一個ConcurrentHashMap webSocketMap 用於接收當前userId的WebSocket,方便IM之間對userId進行推送消息。單機版實現到這裡就可以。
  • 集群版(多個ws節點)還需要借助mysql或者redis等進行處理,改造對應的sendMessage方法即可。
package com.softdev.system.demo.config;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Component;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;


/**
 * @author zhengkai.blog.csdn.net
 */
@ServerEndpoint("/imserver/{userId}")
@Component
public class WebSocketServer {

 static Log log=LogFactory.get(WebSocketServer.class);
 /**靜態變量,用來記錄當前在線連接數。應該把它設計成線程安全的。*/
 private static int onlineCount = 0;
 /**concurrent包的線程安全Set,用來存放每個客戶端對應的MyWebSocket對象。*/
 private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
 /**與某個客戶端的連接會話,需要通過它來給客戶端發送數據*/
 private Session session;
 /**接收userId*/
 private String userId="";

 /**
 * 連接建立成功調用的方法*/
 @OnOpen
 public void onOpen(Session session,@PathParam("userId") String userId) {
 this.session = session;
 this.userId=userId;
 if(webSocketMap.containsKey(userId)){
 webSocketMap.remove(userId);
 webSocketMap.put(userId,this);
 //加入set中
 }else{
 webSocketMap.put(userId,this);
 //加入set中
 addOnlineCount();
 //在線數加1
 }

 log.info("用戶連接:"+userId+",當前在線人數為:" + getOnlineCount());

 try {
 sendMessage("連接成功");
 } catch (IOException e) {
 log.error("用戶:"+userId+",網絡異常!!!!!!");
 }
 }

 /**
 * 連接關閉調用的方法
 */
 @OnClose
 public void onClose() {
 if(webSocketMap.containsKey(userId)){
 webSocketMap.remove(userId);
 //從set中刪除
 subOnlineCount();
 }
 log.info("用戶退出:"+userId+",當前在線人數為:" + getOnlineCount());
 }

 /**
 * 收到客戶端消息後調用的方法
 *
 * @param message 客戶端發送過來的消息*/
 @OnMessage
 public void onMessage(String message, Session session) {
 log.info("用戶消息:"+userId+",報文:"+message);
 //可以群發消息
 //消息保存到數據庫、redis
 if(StringUtils.isNotBlank(message)){
 try {
 //解析發送的報文
 JSONObject jsonObject = JSON.parseObject(message);
 //追加發送人(防止串改)
 jsonObject.put("fromUserId",this.userId);
 String toUserId=jsonObject.getString("toUserId");
 //傳送給對應toUserId用戶的websocket
 if(StringUtils.isNotBlank(toUserId)&&webSocketMap.containsKey(toUserId)){
 webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());
 }else{
 log.error("請求的userId:"+toUserId+"不在該服務器上");
 //否則不在這個服務器上,發送到mysql或者redis
 }
 }catch (Exception e){
 e.printStackTrace();
 }
 }
 }

 /**
 *
 * @param session
 * @param error
 */
 @OnError
 public void onError(Session session, Throwable error) {
 log.error("用戶錯誤:"+this.userId+",原因:"+error.getMessage());
 error.printStackTrace();
 }
 /**
 * 實現服務器主動推送
 */
 public void sendMessage(String message) throws IOException {
 this.session.getBasicRemote().sendText(message);
 }


 /**
 * 發送自定義消息
 * */
 public static void sendInfo(String message,@PathParam("userId") String userId) throws IOException {
 log.info("發送消息到:"+userId+",報文:"+message);
 if(StringUtils.isNotBlank(userId)&&webSocketMap.containsKey(userId)){
 webSocketMap.get(userId).sendMessage(message);
 }else{
 log.error("用戶"+userId+",不在線!");
 }
 }

 public static synchronized int getOnlineCount() {
 return onlineCount;
 }

 public static synchronized void addOnlineCount() {
 WebSocketServer.onlineCount++;
 }

 public static synchronized void subOnlineCount() {
 WebSocketServer.onlineCount--;
 }
}

消息推送

至於推送新信息,可以再自己的Controller寫個方法調用WebSocketServer.sendInfo();即可

import com.softdev.system.demo.config.WebSocketServer;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.ModelAndView;
import java.io.IOException;

/**
 * WebSocketController
 * @author zhengkai.blog.csdn.net
 */
@RestController
public class DemoController {

 @GetMapping("index")
 public ResponseEntity<String> index(){
 return ResponseEntity.ok("請求成功");
 }

 @GetMapping("page")
 public ModelAndView page(){
 return new ModelAndView("websocket");
 }

 @RequestMapping("/push/{toUserId}")
 public ResponseEntity<String> pushToWeb(String message, @PathVariable String toUserId) throws IOException {
 WebSocketServer.sendInfo(message,toUserId);
 return ResponseEntity.ok("MSG SEND SUCCESS");
 }
}

頁面發起

頁面用js代碼調用websocket,當然,太古老的瀏覽器是不行的,一般新的瀏覽器或者谷歌瀏覽器是沒問題的。還有一點,記得協議是ws的,如果使用瞭一些路徑類,可以replace(“http”,“ws”)來替換協議。

<!DOCTYPE html>
<html>
<head>
 <meta charset="utf-8">
 <title>websocket通訊</title>
</head>
<script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.js"></script>
<script>
 var socket;
 function openSocket() {
 if(typeof(WebSocket) == "undefined") {
 console.log("您的瀏覽器不支持WebSocket");
 }else{
 console.log("您的瀏覽器支持WebSocket");
 //實現化WebSocket對象,指定要連接的服務器地址與端口 建立連接
 //等同於socket = new WebSocket("ws://localhost:8888/xxxx/im/25");
 //var socketUrl="${request.contextPath}/im/"+$("#userId").val();
 var socketUrl="http://localhost:9999/demo/imserver/"+$("#userId").val();
 socketUrl=socketUrl.replace("https","ws").replace("http","ws");
 console.log(socketUrl);
 if(socket!=null){
 socket.close();
 socket=null;
 }
 socket = new WebSocket(socketUrl);
 //打開事件
 socket.onopen = function() {
 console.log("websocket已打開");
 //socket.send("這是來自客戶端的消息" + location.href + new Date());
 };
 //獲得消息事件
 socket.onmessage = function(msg) {
 console.log(msg.data);
 //發現消息進入 開始處理前端觸發邏輯
 };
 //關閉事件
 socket.onclose = function() {
 console.log("websocket已關閉");
 };
 //發生瞭錯誤事件
 socket.onerror = function() {
 console.log("websocket發生瞭錯誤");
 }
 }
 }
 function sendMessage() {
 if(typeof(WebSocket) == "undefined") {
 console.log("您的瀏覽器不支持WebSocket");
 }else {
 console.log("您的瀏覽器支持WebSocket");
 console.log('{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}');
 socket.send('{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}');
 }
 }
</script>
<body>
<p>【userId】:<div><input id="userId" name="userId" type="text" value="10"></div>
<p>【toUserId】:<div><input id="toUserId" name="toUserId" type="text" value="20"></div>
<p>【toUserId】:<div><input id="contentText" name="contentText" type="text" value="hello websocket"></div>
<p>【操作】:<div><a onclick="openSocket()">開啟socket</a></div>
<p>【操作】:<div><a onclick="sendMessage()">發送消息</a></div>
</body>

</html>

運行效果

  • v20200105,加入開源項目spring-cloud-study-websocket,更新運行效果,更方便理解。
  • v1.1的效果,剛剛修復瞭日志,並且支持指定監聽某個端口,代碼已經全部更新,現在是這樣的效果

打開兩個頁面,按F12調出控控制臺查看測試效果:

頁面 參數
http://localhost:9999/demo/page fromUserId=10,toUserId=20
http://localhost:9999/demo/page fromUserId=20,toUserId=10

分別開啟socket,再發送消息

在這裡插入圖片描述
在這裡插入圖片描述

2. 向前端推送數據:

http://localhost:9999/demo/push/10?message=123123

在這裡插入圖片描述

通過調用push api,可以向指定的userId推送信息,當然報文這裡亂寫,建議規定好格式。

後續

針對簡單IM的業務場景,進行瞭一些優化,可以看後續的文章SpringBoot2+WebSocket之聊天應用實戰(優化版本)(v20201005已整合)

主要變動是CopyOnWriteArraySet改為ConcurrentHashMap,保證多線程安全同時方便利用map.get(userId)進行推送到指定端口。

相比之前的Set,Set遍歷是費事且麻煩的事情,而Map的get是簡單便捷的,當WebSocket數量大的時候,這個小小的消耗就會聚少成多,影響體驗,所以需要優化。在IM的場景下,指定userId進行推送消息更加方便。

Websocker註入Bean問題

關於這個問題,可以看最新發表的這篇文章,在參考和研究瞭網上一些攻略後,項目已經通過該方法註入成功,大傢可以參考。
Springboot 使用 JSR 303 對 Controller 控制層校驗及 Service 服務層 AOP 校驗 使用消息資源文件對消息國際化

netty-websocket-spring-boot-starter

Springboot2構建基於Netty的高性能Websocket服務器(netty-websocket-spring-boot-starter)
隻需要換個starter即可實現高性能websocket,趕緊使用吧

Springboot2+Netty+Websocket

Springboot2+Netty實現Websocket,使用官方的netty-all的包,比原生的websocket更加穩定更加高性能,同等配置情況下可以handle更多的連接。

代碼樣式全部已經更正,也支持websocket連接url帶參數功能,另外也感謝大傢的閱讀和評論,一起進步,謝謝!~~

ServerEndpointExporter錯誤

org.springframework.beans.factory.BeanCreationException: Error creating bean with name ‘serverEndpointExporter‘ defined in class path resource [com/xxx/WebSocketConfig.class]: Invocation of init method failed; nested exception is java.lang.IllegalStateException: javax.websocket.server.ServerContainer not available

感謝@來瞭老弟兒 的反饋:

如果tomcat部署一直報這個錯,請移除 WebSocketConfig@Bean ServerEndpointExporter 的註入 。

ServerEndpointExporter 是由Spring官方提供的標準實現,用於掃描ServerEndpointConfig配置類和@ServerEndpoint註解實例。使用規則也很簡單:

如果使用默認的嵌入式容器 比如Tomcat 則必須手工在上下文提供ServerEndpointExporter。如果使用外部容器部署war包,則不需要提供提供ServerEndpointExporter,因為此時SpringBoot默認將掃描服務端的行為交給外部容器處理,所以線上部署的時候要把WebSocketConfig中這段註入bean的代碼註掉。 正式項目的前端WebSocket框架 GoEasy

感謝kkatrina的補充,正式的項目中,一般是用第三方websocket框架來做,穩定性、實時性有保證的多,也會包括一些心跳、重連機制。

GoEasy專註於服務器與瀏覽器,瀏覽器與瀏覽器之間消息推送,完美兼容世界上的絕大多數瀏覽器,包括IE6, IE7之類的非常古老的瀏覽器。支持Uniapp,各種小程序,react,vue等所有主流Web前端技術。
GoEasy采用 發佈/訂閱 的消息模式,幫助您非常輕松的實現一對一,一對多的通信。
https://www.goeasy.io/cn/doc/

@Component@ServerEndpoint關於是否單例模式,能否使用static Map等一些問題的解答

看到大傢都在熱心的討論關於是否單例模式這個問題,請大傢相信自己的直接,如果websocket是單例模式,還怎麼服務這麼多session呢。

  • websocket是原型模式@ServerEndpoint每次建立雙向通信的時候都會創建一個實例,區別於spring的單例模式。Spring的@Component默認是單例模式,請註意,默認 而已,是可以被改變的。
  • 這裡的@Component僅僅為瞭支持@Autowired依賴註入使用,如果不加則不能註入任何東西,為瞭方便。
  • 什麼是prototype 原型模式? 基本就是你需要從A的實例得到一份與A內容相同,但是又互不幹擾的實例B的話,就需要使用原型模式。關於在原型模式下使用static 的webSocketMap,請註意這是ConcurrentHashMap ,也就是線程安全/線程同步的,而且已經是靜態變量作為全局調用,這種情況下是ok的,或者大傢如果有顧慮或者更好的想法的化,可以進行改進。
  • 例如使用一個中間類來接收和存放session。為什麼每次都@OnOpen都要檢查webSocketMap.containsKey(userId) ,首先瞭為瞭代碼強壯性考慮,假設代碼以及機制沒有問題,那麼肯定這個邏輯是廢的對吧。
  • 但是實際使用的時候發現偶爾會出現重連失敗或者其他原因導致之前的session還存在,這裡就做瞭一個清除舊session,迎接新session的功能。

Vue版本的websocket連接

感謝**@GzrStudy**的貢獻,供大傢參考。

<script>
export default {
 data() {
 return {
 socket:null,
 userId:localStorage.getItem("ms_uuid"),
 toUserId:'2',
 content:'3'
 }
 },
 methods: {
 openSocket() {
 if (typeof WebSocket == "undefined") {
 console.log("您的瀏覽器不支持WebSocket");
 } else {
 console.log("您的瀏覽器支持WebSocket");
 //實現化WebSocket對象,指定要連接的服務器地址與端口 建立連接
 //等同於socket = new WebSocket("ws://localhost:8888/xxxx/im/25");
 //var socketUrl="${request.contextPath}/im/"+$("#userId").val();
 var socketUrl =
 "http://localhost:8081/imserver/" + this.userId;
 socketUrl = socketUrl.replace("https", "ws").replace("http", "ws");
 console.log(socketUrl);
 if (this.socket != null) {
 this.socket.close();
 this.socket = null;
 }
 this.socket = new WebSocket(socketUrl);
 //打開事件
 this.socket = new WebSocket(socketUrl);
 //打開事件
 this.socket.onopen = function() {
 console.log("websocket已打開");
 //socket.send("這是來自客戶端的消息" + location.href + new Date());
 };
 //獲得消息事件
 this.socket.onmessage = function(msg) {
 console.log(msg.data);
 //發現消息進入 開始處理前端觸發邏輯
 };
 //關閉事件
 this.socket.onclose = function() {
 console.log("websocket已關閉");
 };
 //發生瞭錯誤事件
 this.socket.onerror = function() {
 console.log("websocket發生瞭錯誤");
 };
 }
 },
 sendMessage() {
 if (typeof WebSocket == "undefined") {
 console.log("您的瀏覽器不支持WebSocket");
 } else {
 console.log("您的瀏覽器支持WebSocket");
 console.log(
 '{"toUserId":"' +
 this.toUserId +
 '","contentText":"' +
 this.content +
 '"}'
 );
 this.socket.send(
 '{"toUserId":"' +
 this.toUserId +
 '","contentText":"' +
 this.content +
 '"}'
 );
 
 }
}

到此這篇關於SpringBoot2.0集成WebSocket實現後臺向前端推送信息的文章就介紹到這瞭,更多相關SpringBoot2.0集成WebSocket內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: