Java 利用DeferredResult實現http輪詢實時返回數據接口

今天這篇文章呢,不難,其實是解答我一直以來心裡的一個疑問。是這樣的,之前看五八技術委員會主席沈劍老師的公眾號架構師之路的一篇文章:http 如何像 tcp 一樣實時的收消息,裡面其中的一個方案是用 http 短連接輪詢的方式實現“偽長連接”。但是對於輪詢,我們的第一反應肯定是有延時,但是標題不是說的是實時嗎?當然我們可以把輪詢的時長縮短一些,先不說這樣大部分時間的輪詢調用,可能都沒消息返回,造成服務器資源浪費,輪詢時間再短也是有延時啊,所以難道是偽實時?反正一般消息延時個三五秒,甚至十秒八秒一分鐘,大傢也不會在意,隻會認為對方返回慢,對不起,這是我們程序員的鍋,但是 http 真的不能實現實時嗎?沈劍老師提出瞭一種方法:首選 webim 和 webserver 之間建立一條 http 連接,專門用作消息通道,這條連接叫 http 消息連接。然後會有如下處理:

1. 沒有消息到達的時候,這個 http 消息連接將被夯住,不返回,由於 http 是短連接,這個 http 消息連接最多被夯住 90 秒,就會被斷開(這是瀏覽器或者 webserver 的行為);

2. 在 1 的情況下,如果 http 消息連接被斷開,立馬再發起一個 http 消息連接;

此時在在 1 和 2 的配合下,瀏覽器與 webserver 之間將永遠有一條消息連接在,然後還有一種情況

3. 每次收到消息時,這個消息連接就能及時將消息帶回瀏覽器頁面,並且在返回後,會立馬再發起一個 http 消息連接

這樣就能做到使用 http 端連接輪詢的方式實現瞭實時收消息。不過需要說明的是,其實還有一種情況:消息到達時,上一個 http 消息連接正在返回,也就是第二種情況的時候突然來瞭一個消息,此時沒有 http 消息連接可用。雖然理論上 http 消息連接的返回是瞬時的,沒有消息連接可用出現的概率極小,但是根據墨菲定律我們知道,這種情況肯定會出現,所以這種情況下我們可以將消息暫存入消息池中,下一個消息連接到達後,無需等待,直接去消息池中取消息,將將消息帶回,然後立刻返回生成新的消息連接即可。

不過以上都不是今天這篇文章的重點,和今天這篇文章的標題也沒有任何關系。重點是當時看瞭沈劍老師的這篇文章後我一直有一個疑問:第一步的時候如何夯住?總不能 sleep 吧,這多不優雅啊,由於一直以為沒有遇到過類似的需求,所以這麼幾年來我也沒深究這個問題,但是心裡確實一直記著,直到前一段時間,聽馬士兵教育的公開課,當時再講類似的問題的時候提到瞭夯住 http 的連接(具體是哪個問題,還真不記得瞭),雖然當時上課的老師沒提怎麼實現,但是評論區我問瞭一下,如何夯住不返回?然後有一個同學回復說,用 DeferredResult,然後下課後搜瞭一下資料,果然可以,如下是實現的筆記,所以這才是重點,希望對有這個疑問的同學也有一點幫助。

1. 消息返回實體類,大傢可以根據實際情況,自己定義即可:

package cn.bridgeli.deferredresulttest.entity;
 
import lombok.Data;
import lombok.Getter;
 
/**
 * @author bridgeli
 */
@Data
public class DeferredResultResponse {
    private Integer code;
    private String msg;
 
    public enum Msg {
        TIMEOUT("超時"),
        FAILED("失敗"),
        SUCCESS("成功");
 
        @Getter
        private String desc;
 
        Msg(String desc) {
            this.desc = desc;
        }
    }
}

2. controller 接口:

package cn.bridgeli.deferredresulttest.controller;
 
import cn.bridgeli.deferredresulttest.entity.DeferredResultResponse;
import cn.bridgeli.deferredresulttest.service.DeferredResultService;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
 
import javax.annotation.Resource;
 
 
/**
 * @author bridgeli
 */
@RestController
@RequestMapping(value = "/deferred-result")
public class DeferredResultController {
 
    @Resource
    private DeferredResultService deferredResultService;
 
    /**
     * 為瞭方便測試,簡單模擬一個
     * 多個請求用同一個requestId會出問題
     */
    private final String requestId = "test";
 
 
    @GetMapping(value = "/get")
    public DeferredResult<DeferredResultResponse> get(@RequestParam(value = "timeout", required = false, defaultValue = "10000") Long timeout) {
        DeferredResult<DeferredResultResponse> deferredResult = new DeferredResult<>(timeout);
 
        deferredResultService.process(requestId, deferredResult);
 
        return deferredResult;
    }
 
    /**
     * 設置DeferredResult對象的result屬性,模擬異步操作
     *
     * @param desired
     * @return
     */
    @GetMapping(value = "/result")
    public String settingResult(@RequestParam(value = "desired", required = false, defaultValue = "成功") String desired) {
        DeferredResultResponse deferredResultResponse = new DeferredResultResponse();
        if (DeferredResultResponse.Msg.SUCCESS.getDesc().equals(desired)) {
            deferredResultResponse.setCode(HttpStatus.OK.value());
            deferredResultResponse.setMsg(desired);
        } else {
            deferredResultResponse.setCode(HttpStatus.INTERNAL_SERVER_ERROR.value());
            deferredResultResponse.setMsg(DeferredResultResponse.Msg.FAILED.getDesc());
        }
        deferredResultService.settingResult(requestId, deferredResultResponse);
 
        return "Done";
    }
}

其中:/get 接口模擬沈劍老師說的消息連接,/result 接口模擬有一條新消息來瞭,然後 /get 接口會立即返回。主要註意的是 requestId,在實際項目中不能使用同一個,否則會出現問題,這個測一下就知道瞭,也很容易想到原因。

3. service 實現:

package cn.bridgeli.deferredresulttest.service;
 
import cn.bridgeli.deferredresulttest.entity.DeferredResultResponse;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.web.context.request.async.DeferredResult;
 
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
 
/**
 * @author bridgeli
 */
@Service
public class DeferredResultService {
 
    private Map<String, Consumer<DeferredResultResponse>> taskMap;
 
    public DeferredResultService() {
        taskMap = new ConcurrentHashMap<>();
    }
 
    /**
     * 將請求id與setResult映射
     *
     * @param requestId
     * @param deferredResult
     */
    public void process(String requestId, DeferredResult<DeferredResultResponse> deferredResult) {
        // 請求超時的回調函數
        deferredResult.onTimeout(() -> {
            taskMap.remove(requestId);
            DeferredResultResponse deferredResultResponse = new DeferredResultResponse();
            deferredResultResponse.setCode(HttpStatus.REQUEST_TIMEOUT.value());
            deferredResultResponse.setMsg(DeferredResultResponse.Msg.TIMEOUT.getDesc());
            deferredResult.setResult(deferredResultResponse);
        });
 
        Optional.ofNullable(taskMap)
                .filter(t -> !t.containsKey(requestId))
                .orElseThrow(() -> new IllegalArgumentException(String.format("requestId=%s is existing", requestId)));
 
        taskMap.putIfAbsent(requestId, deferredResult::setResult);
    }
 
    /**
     * 這裡相當於異步的操作方法
     * 設置DeferredResult對象的setResult方法
     *
     * @param requestId
     * @param deferredResultResponse
     */
    public void settingResult(String requestId, DeferredResultResponse deferredResultResponse) {
        if (taskMap.containsKey(requestId)) {
            Consumer<DeferredResultResponse> deferredResultResponseConsumer = taskMap.get(requestId);
            // 這裡相當於DeferredResult對象的setResult方法
            deferredResultResponseConsumer.accept(deferredResultResponse);
            taskMap.remove(requestId);
        }
    }
 
}

文章最後,我想在說明另外一個問題,我們利用 DeferredResult 實現瞭 http 輪詢返回,其實換個思路想問題,我們是不是也實現瞭 http 接口延時返回?所以如果你有延時返回的需求,同樣可以借助 DeferredResult 實現。

以上就是Java 利用 DeferredResult 實現 http 輪詢實時返回數據接口的詳細內容,更多關於Java 實現 http 輪詢實時返回數據接口的資料請關註WalkonNet其它相關文章!

推薦閱讀:

    None Found