關於springboot響應式編程整合webFlux的問題
在servlet3.0標準之前,是每一個請求對應一個線程。如果此時一個線程出現瞭高延遲,就會產生阻塞問題,從而導致整個服務出現嚴重的性能情況,因為一旦要調用第三方接口,就有可能出現這樣的操作瞭。早期的處理方式隻能是手工控制線程。
在servlet3.0標準之後,為瞭解決此類問題,所以提供瞭異步響應的支持。在異步響應處理結構中,可以將耗時操作的部分交由一個專屬的異步線程進行響應處理,同時請求的線程資源將被釋放,並將該線程返回到線程池中,以供其他用戶使用,這樣的操作機制將極大的提升程序的並發性能。
對於以上給出的響應式編程支持,僅僅是一些原生的支持模式,而現在既然基於springboot程序開發,那麼就需要考慮一些更簡單的整合。
而在spring中實現響應式編程,那麼則需要使用到spring webFlux,該組件是一個重新構建的且基於Reactive Streams標準實現的異步非阻塞Web開發框架,以Reactor開發框架為基礎,可以更加容易實現高並發訪問下的請求處理模型。在springboot2.x版本中提供瞭webFlux依賴模塊,該模塊有兩種模型實現:一種是基於功能性端點的方式,另一種是基於SpringMVC註解方式。
Maven引入
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency>
整合處理器:
package com.example.oldguy.myWebFlux.handler; import com.example.oldguy.myVo.Message; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; @Component @Slf4j public class MessageHandler { public Mono<Message> echoHandler(Message message){ log.info("【{}】業務層接收處理數據:{}",Thread.currentThread().getName()); message.setTitle("【】"+Thread.currentThread().getName()+"】"+message.getTitle()); message.setContent("【】"+Thread.currentThread().getName()+"】"+message.getContent()); return Mono.create(item->item.success(message)); //實現數據響應 } }
整合控制器:
package com.example.oldguy.myController; import com.example.oldguy.myVo.Message; import com.example.oldguy.myWebFlux.handler.MessageHandler; import com.example.oldguy.mytask.MyThreadTask; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.web.bind.WebDataBinder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.InitBinder; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.RequestContextHolder; import org.springframework.web.context.request.ServletRequestAttributes; import org.springframework.web.context.request.async.DeferredResult; import javax.servlet.http.HttpServletRequest; import java.beans.PropertyEditorSupport; import java.time.Instant; import java.time.LocalDate; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.Date; import java.util.concurrent.TimeUnit; /** * 異步線程的處理機制 */ @RestController @RequestMapping("/message/*") @Slf4j @Api(tags = "異步處理") public class AsyncController { @Autowired private ThreadPoolTaskExecutor threadPoolTaskExecutor; private MyThreadTask task; private MessageHandler messageHandler; /** * 日期轉換 * @param * @return */ private static final DateTimeFormatter LOCAL_DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd"); @InitBinder public void initBinder(WebDataBinder binder){ binder.registerCustomEditor(Date.class,new PropertyEditorSupport(){ @Override public void setAsText(String text) throws IllegalArgumentException { LocalDate localDate = LocalDate.parse(text,LOCAL_DATE_FORMAT); Instant instant = localDate.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant(); super.setValue(Date.from(instant)); } }); } @GetMapping("runnable") @ApiOperation("異常處理Runnable") public Object message(String message) { log.info("外部線程:{}", Thread.currentThread().getName()); HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest(); DeferredResult<String> result = new DeferredResult<>(6000L); //設置異步響應 this.threadPoolTaskExecutor.execute(new Runnable() { //線程核心任務 @SneakyThrows public void run() { log.info("內部線程:{}",Thread.currentThread().getName()); TimeUnit.SECONDS.sleep(7); result.setResult("[echo]"+message); //執行最終的響應 result.onCompletion(new Runnable() { //完成處理線程 log.info("完成線程:{}",Thread.currentThread().getName()); //日志輸出 result.onTimeout(new Runnable() { log.info("超時線程:{}",Thread.currentThread().getName()); result.setResult("【請求超時】"+request.getRequestURI()); //超時路徑 return result; @GetMapping("task") @ApiOperation("task異步任務開啟") public Object messageTask(String message){ log.info("外部線程{}",Thread.currentThread().getName()); this.task.startTaskHander(); return "【echo】"+message; @GetMapping("webflux") @ApiOperation("整合webflux") public Object echo(Message message){ log.info("接收用戶信息,用戶方發送的參數為message={}",message); return this.messageHandler.echoHandler(message); }
頁面響應:
控制臺響應:
2021-11-30 15:04:06.946 INFO 22884 — [nio-1999-exec-1] c.e.oldguy.myController.AsyncController : 接收用戶信息,用戶方發送的參數為message=Message(title=pansd, pubdate=Tue Nov 30 00:00:00 CST 2021, content=come on baby)
2021-11-30 15:04:06.947 INFO 22884 — [nio-1999-exec-1] c.e.o.myWebFlux.handler.MessageHandler : 【http-nio-1999-exec-1】業務層接收處理數據:Message(title=pansd, pubdate=Tue Nov 30 00:00:00 CST 2021, content=come on baby)
webFlux響應map和List
//webFlux響應集合 public Flux<Message> list(Message message){ List<Message> messageList = new ArrayList<>(); for(int i=0;i<10;i++){ Message m = new Message(); m.setTitle(i+"--"+message.getTitle()); m.setContent(i+"--"+message.getContent()); m.setPubdate(message.getPubdate()); messageList.add(m); } return Flux.fromIterable(messageList); } public Flux<Map.Entry<String,Message>> map(Message message){ Map<String,Message> map = new HashMap<>(); for(int i=0;i<10;i++){ Message m = new Message(); m.setTitle(i+"--"+message.getTitle()); m.setContent(i+"--"+message.getContent()); m.setPubdate(message.getPubdate()); map.put("pansd-"+i,m); } // Set<Map.Entry<String, Message>> entries = map.entrySet(); return Flux.fromIterable(map.entrySet()); }
@GetMapping("webfluxList") @ApiOperation("整合webfluxList") public Object echoList(Message message){ log.info("接收用戶信息,用戶方發送的參數為message={}",message); return this.messageHandler.list(message); } @GetMapping("webfluxMap") @ApiOperation("整合webfluxMap") public Object echoMap(Message message){ log.info("接收用戶信息,用戶方發送的參數為message={}",message); return this.messageHandler.map(message); }
到此這篇關於springboot響應式編程整合webFlux的文章就介紹到這瞭,更多相關springboot響應式編程內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- SpringCloud之Hystrix的詳細使用
- Java Spring註解之@Async的基本用法和示例
- 詳解springboot使用異步註解@Async獲取執行結果的坑
- springBoot @Scheduled實現多個任務同時開始執行
- SpringBoot深入分析webmvc和webflux的區別