Spring Boot 使用 SSE 方式向前端推送數據詳解

前言

SSE簡單的來說就是服務器主動向前端推送數據的一種技術,它是單向的,也就是說前端是不能向服務器發送數據的。SSE適用於消息推送,監控等隻需要服務器推送數據的場景中,下面是使用Spring Boot 來實現一個簡單的模擬向前端推動進度數據,前端頁面接受後展示進度條。

服務端

在Spring Boot中使用時需要註意,最好使用Spring Web 提供的SseEmitter這個類來進行操作,我在剛開始時使用網上說的將Content-Type設置為text-stream這種方式發現每次前端每次都會重新創建接。最後參考該文實現瞭最終想要的效果:

SSE工具類

SSEServer.java

package vip.huhailong.catchat.sse;

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/**
 * @author Huhailong
 */
@Slf4j
public class SSEServer {

    /**
     * 當前連接數
     */
    private static AtomicInteger count = new AtomicInteger(0);

    private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    public static SseEmitter connect(String userId){
        //設置超時時間,0表示不過期,默認是30秒,超過時間未完成會拋出異常
        SseEmitter sseEmitter = new SseEmitter(0L);
        //註冊回調
        sseEmitter.onCompletion(completionCallBack(userId));
        sseEmitter.onError(errorCallBack(userId));
        sseEmitter.onTimeout(timeOutCallBack(userId));
        sseEmitterMap.put(userId,sseEmitter);
        //數量+1
        count.getAndIncrement();
        log.info("create new sse connect ,current user:{}",userId);
        return sseEmitter;
    }
    /**
     * 給指定用戶發消息
     */
    public static void sendMessage(String userId, String message){
        if(sseEmitterMap.containsKey(userId)){
            try{
                sseEmitterMap.get(userId).send(message);
            }catch (IOException e){
                log.error("user id:{}, send message error:{}",userId,e.getMessage());
                e.printStackTrace();
            }
        }
    }

    /**
     * 想多人發送消息,組播
     */
    public static void groupSendMessage(String groupId, String message){
        if(sseEmitterMap!=null&&!sseEmitterMap.isEmpty()){
            sseEmitterMap.forEach((k,v) -> {
                try{
                    if(k.startsWith(groupId)){
                        v.send(message, MediaType.APPLICATION_JSON);
                    }
                }catch (IOException e){
                    log.error("user id:{}, send message error:{}",groupId,message);
                    removeUser(k);
                }
            });
        }
    }
    public static void batchSendMessage(String message) {
        sseEmitterMap.forEach((k,v)->{
            try{
                v.send(message,MediaType.APPLICATION_JSON);
            }catch (IOException e){
                log.error("user id:{}, send message error:{}",k,e.getMessage());
                removeUser(k);
            }
        });
    }
    /**
     * 群發消息
     */
    public static void batchSendMessage(String message, Set<String> userIds){
        userIds.forEach(userId->sendMessage(userId,message));
    }
    public static void removeUser(String userId){
        sseEmitterMap.remove(userId);
        //數量-1
        count.getAndDecrement();
        log.info("remove user id:{}",userId);
    }
    public static List<String> getIds(){
        return new ArrayList<>(sseEmitterMap.keySet());
    }
    public static int getUserCount(){
        return count.intValue();
    }
    private static Runnable completionCallBack(String userId) {
        return () -> {
            log.info("結束連接,{}",userId);
            removeUser(userId);
        };
    }
    private static Runnable timeOutCallBack(String userId){
        return ()->{
            log.info("連接超時,{}",userId);
            removeUser(userId);
        };
    }
    private static Consumer<Throwable> errorCallBack(String userId){
        return throwable -> {
            log.error("連接異常,{}",userId);
            removeUser(userId);
        };
    }
}

上面這個類可以把它當作一個SSE的工具類,下面我們使用一下它

在Controller層創建 SSEController.java

package vip.huhailong.catchat.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import vip.huhailong.catchat.sse.SSEServer;

/**
 * @author Huhailong
 */
@Slf4j
@RestController
@CrossOrigin
@RequestMapping("/sse")
public class SSEController {

    @GetMapping("/connect/{userId}")
    public SseEmitter connect(@PathVariable String userId){
        return SSEServer.connect(userId);
    }

    @GetMapping("/process")
    public void sendMessage() throws InterruptedException {
        for(int i=0; i<=100; i++){
            if(i>50&&i<70){
                Thread.sleep(500L);
            }else{
                Thread.sleep(100L);
            }
            SSEServer.batchSendMessage(String.valueOf(i));
        }
    }
}

上面的connect是用來連接sse的,它返回一個SseEmitter實例,這時候連接就已經創建瞭,然後下面的process接口是用來推送數據的,我這裡是準備讓前端實現一個進度條的效果,所以推送的是數字,為瞭效果明顯,我在推送到50到70的時候速度放慢,其餘都是100ms

前端代碼

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Home</title>
    <script>
        let data = new EventSource("/cat-chat/sse/connect/huhailong")
        data.onmessage = function(event){
            console.log("test=>",event)
            document.getElementById("result").innerText = event.data+'%';
            document.getElementById("my-progress").value = event.data;
        }
    </script>
</head>
<body>
    <div id="result"></div>
    <progress style="width: 300px" id="my-progress" value="0" max="100"></progress>
</body>
</html>

最終效果:

到此這篇關於Spring Boot 使用 SSE 方式向前端推送數據詳解的文章就介紹到這瞭,更多相關Spring Boot SSE內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: