Springboot之整合Socket連接案例
Socket連接與硬件通信
一、如何讓socket隨著springboot項目一起啟動
SpringBoot中CommandLineRunner的作用:平常開發中有可能需要實現在項目啟動後執行的功能,SpringBoot提供的一種簡單的實現方案就是添加一個組件(Bean)並實現CommandLineRunner接口,實現功能的代碼放在實現的run方法中。
具體實現
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * @author 易水●墨龍吟 * @Description * @create 2019-04-14 23:40 */ @Component public class TestRunner implements CommandLineRunner { @Autowired private SocketProperties properties; @Override public void run(String... args) throws Exception { ServerSocket server = null; Socket socket = null; server = new ServerSocket(properties.getPort()); System.out.println("設備服務器已經開啟, 監聽端口:" + properties.getPort()); ThreadPoolExecutor pool = new ThreadPoolExecutor( properties.getPoolCore(), properties.getPoolMax(), properties.getPoolKeep(), TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(properties.getPoolQueueInit()), new ThreadPoolExecutor.DiscardOldestPolicy() ); while (true) { socket = server.accept(); pool.execute(new ServerConfig(socket)); } } }
此處使用了自定義的線程池,提高對socket客戶端的處理能力。
二、自定義配置並使用
此處將socket的端口和線程池的一些配置放到 application.yml中使用,方便使用和修改
# Socket settings
socket:
  # Listening port
  port: 2323
  # Thread pool - keep-alive value; TestRunner passes it to ThreadPoolExecutor with TimeUnit.SECONDS
  pool-keep: 20
  # Thread pool - core pool size
  pool-core: 10
  # Thread pool - maximum pool size
  pool-max: 30
  # Thread pool - capacity of the ArrayBlockingQueue holding waiting tasks
  pool-queue-init: 10
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;

/**
 * Typed view of the {@code socket.*} settings from {@code application.yml}.
 *
 * <p>Spring Boot loads {@code application.yml} on its own, and {@code @PropertySource}
 * does not support YAML files, so no explicit property source is declared here.
 * The class is a plain data holder, so it is registered with {@code @Component} only.
 *
 * @author 易水●墨龍吟
 * @create 2019-04-18 22:35
 */
@Setter
@Getter
@ToString
@Component
@ConfigurationProperties(prefix = "socket")
public class SocketProperties {

    /** Port the server socket listens on ({@code socket.port}). */
    private Integer port;

    /** Keep-alive value for the worker pool ({@code socket.pool-keep}). */
    private Integer poolKeep;

    /** Core pool size of the worker pool ({@code socket.pool-core}). */
    private Integer poolCore;

    /** Maximum pool size of the worker pool ({@code socket.pool-max}). */
    private Integer poolMax;

    /** Capacity of the worker pool's task queue ({@code socket.pool-queue-init}). */
    private Integer poolQueueInit;
}
三、Socket對於客戶端發來的信息的處理和重發機制
客戶端連接之後需要發送信息;如果超時仍未發送,連接將會被關閉。如果收到的數據有異常,服務端會返回給客戶端一個error,讓客戶端再發送一次數據。
import com.farm.config.socket.resolve.MessageChain; import com.farm.service.EnvironmentService; import com.farm.service.impl.EnvironmentServiceImpl; import java.io.*; import java.net.Socket; import java.net.SocketException; import java.net.SocketTimeoutException; import java.util.Map; /** * @author 易水●墨龍吟 * @Description * @create 2019-04-14 23:21 */ public class ServerConfig extends Thread { private Socket socket; public ServerConfig(Socket socket) { this.socket = socket; } // 獲取spring容器管理的類,可以獲取到sevrice的類 private EnvironmentService service = SpringUtil.getBean(EnvironmentServiceImpl.class); private String handle(InputStream inputStream) throws IOException, DataFormException { byte[] bytes = new byte[1024]; int len = inputStream.read(bytes); if (len != -1) { StringBuffer request = new StringBuffer(); request.append(new String(bytes, 0, len, "UTF-8")); System.out.println("接受的數據: " + request); System.out.println("from client ... " + request + "當前線程" + Thread.currentThread().getName()); Map<String, String> map = MessageChain.out(request.toString()); System.out.println("處理的數據" + map); Integer res = service.addEnvironment(map); if (res == 1) { return "ok"; } else { throw new DataFormException("數據處理異常"); } } else { throw new DataFormException("數據處理異常"); } } @Override public void run() { BufferedWriter writer = null; try { // 設置連接超時9秒 socket.setSoTimeout(9000); System.out.println("客戶 - " + socket.getRemoteSocketAddress() + " -> 機連接成功"); InputStream inputStream = socket.getInputStream(); writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); String result = null; try { result = handle(inputStream); writer.write(result); writer.newLine(); writer.flush(); } catch (IOException | DataFormException | IllegalArgumentException e) { writer.write("error"); writer.newLine(); writer.flush(); System.out.println("發生異常"); try { System.out.println("再次接受!"); result = handle(inputStream); writer.write(result); writer.newLine(); writer.flush(); } catch 
(DataFormException | SocketTimeoutException ex) { System.out.println("再次接受, 發生異常,連接關閉"); } } } catch (SocketException socketException) { socketException.printStackTrace(); try { writer.close(); } catch (IOException ioException) { ioException.printStackTrace(); } } catch (IOException e) { e.printStackTrace(); } finally { try { writer.close(); } catch (IOException e) { e.printStackTrace(); } } } }
在此處有一個坑,如果客戶端是用C/C++編寫的,必須使用如下方法:
// Fixed-size buffer that receives the raw bytes sent by the client.
byte[] bytes = new byte[1024];
// A single read(): returns the number of bytes stored in the buffer, or -1 at end of stream.
// No line terminator or length prefix is expected from the sender.
int len = inputStream.read(bytes);
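如果使用readLine或者 DataInputStream dataInputStream = new DataInputStream(socket.getInputStream()) 這樣的方式讀取,在使用TCP連接助手測試時,會出現客戶端已發送數據、服務端卻收不到的情況。原因是readLine會一直阻塞,直到讀到換行符或連接關閉為止,而C/C++客戶端或調試工具發送的數據通常不帶換行符;DataInputStream的部分讀取方法(例如readUTF)則要求數據帶有特定的長度前綴。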
四、如何在普通類中使用Spring注入的類
這裡需要使用一個工具類。
import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; /** * @author 易水●墨龍吟 * @Description * @create 2019-04-15 0:01 */ @Component public class SpringUtil implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { if (SpringUtil.applicationContext == null) { SpringUtil.applicationContext = applicationContext; } } /** * 獲取applicationContext * @return */ public static ApplicationContext getApplicationContext() { return applicationContext; } /** * 通過name獲取 Bean. * @param name * @return */ public static Object getBean(String name){ return getApplicationContext().getBean(name); } /** * 通過class獲取Bean. * @param clazz * @param <T> * @return */ public static <T> T getBean(Class<T> clazz){ return getApplicationContext().getBean(clazz); } /** * 通過name,以及Clazz返回指定的Bean * @param name * @param clazz * @param <T> * @return */ public static <T> T getBean(String name,Class<T> clazz){ return getApplicationContext().getBean(name, clazz); } }
補充:springboot下websocket前臺後端數據長連接
首先導入依賴
<!-- Spring Boot starter for WebSocket support -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- Needed by the configuration class that extends AbstractSecurityWebSocketMessageBrokerConfigurer -->
<dependency>
    <groupId>org.springframework.security</groupId>
    <artifactId>spring-security-messaging</artifactId>
</dependency>
spring-security-messaging 是後面繼承 AbstractSecurityWebSocketMessageBrokerConfigurer需要用到的依賴
WebSocketConfig
@Configuration @EnableWebSocketMessageBroker //此註解表示使用STOMP協議來傳輸基於消息代理的消息,此時可以在@Controller類中使用@MessageMapping public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { @Override public void registerStompEndpoints(StompEndpointRegistry registry) { /** * 註冊 Stomp的端點 * addEndpoint:添加STOMP協議的端點。這個HTTP URL是供WebSocket或SockJS客戶端訪問的地址 * withSockJS:指定端點使用SockJS協議 */ registry.addEndpoint("/websocket/tracker") //物流消息通道, .setAllowedOrigins("*") //允許跨域,裡面路徑可以設定 .withSockJS() //指定協議 .setInterceptors(httpSessionHandshakeInterceptor()) ; //設置攔截器() } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { /** * 配置消息代理 * 啟動簡單Broker,消息的發送的地址符合配置的前綴來的消息才發送到這個broker */ registry.enableSimpleBroker("/topic","/user"); } //攔截器 @Bean public HandshakeInterceptor httpSessionHandshakeInterceptor() { return new HandshakeInterceptor() { @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { //可以在這裡先判斷登錄是否合法 return true; } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { //握手成功後, } }; } }
WebsocketSecurityConfiguration
@Configuration public class WebsocketSecurityConfiguration extends AbstractSecurityWebSocketMessageBrokerConfigurer { @Override protected void configureInbound(MessageSecurityMetadataSourceRegistry messages) { messages .nullDestMatcher().authenticated() .simpDestMatchers("/topic/**").authenticated() .simpDestMatchers("/user/**").authenticated() .simpTypeMatchers(SimpMessageType.MESSAGE, SimpMessageType.SUBSCRIBE).denyAll() // catch all .anyMessage().denyAll(); } /** * Disables CSRF for Websockets. */ @Override protected boolean sameOriginDisabled() { return true; } }
WebSocketResource
package com.gleam.shopmall.web.rest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.messaging.simp.SimpMessageHeaderAccessor; import org.springframework.messaging.simp.SimpMessageMappingInfo; import org.springframework.messaging.simp.SimpMessageSendingOperations; import org.springframework.stereotype.Controller; import org.springframework.web.socket.messaging.SessionDisconnectEvent; @Controller public class WebSocketResource { private static final Logger log = LoggerFactory.getLogger(WebSocketResource.class); @Autowired SimpMessageSendingOperations messagingTemplate; //此方法適用於網頁聊天室,從前端接收數據,返回訂閱者(前端) @MessageMapping("/welcome") //指定要接收消息的地址,類似@RequestMapping @SendTo("/topic/getResponse") //默認消息將被發送到與傳入消息相同的目的地,但是目的地前面附加前綴(默認情況下為“/topic”} public String say(String message) throws Exception { return message; } //發送指定用戶(直接從後端發送數據到前端) public void sendToUser(String login,String channel, String info) { log.debug("[ToUser]WEBSOCKET發送消息, username={}, info={}", login, info); this.messagingTemplate.convertAndSendToUser(login, channel, info); log.debug("[ToUser]WEBSOCKET發送消息:完成"); } //發送所有訂閱的(直接從後端發送數據到前端) public void send(String channel, String info) { log.debug("[ToAll]WEBSOCKET發送消息, info={}", info); // this.messagingTemplate.convertAndSend(channel, info); this.messagingTemplate.convertAndSend("/topic/getResponse", "接收到瞭嗎?"); log.debug("[ToAll]WEBSOCKET發送消息:完成"); } }
前端html
<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8" />
    <script src="http://cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js"></script>
    <script src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.js"></script>
    <script src="http://code.jquery.com/jquery-1.7.2.min.js"></script>
    <script src="http://pv.sohu.com/cityjson?ie=utf-8"></script>
    <title>Spring Boot+WebSocket+廣播式</title>
    <script type="text/javascript">
        var stompClient = null;

        // Toggle the buttons and the conversation area, and clear the message list.
        function setConnected(connected) {
            document.getElementById('connect').disabled = connected;
            document.getElementById('disconnect').disabled = !connected;
            document.getElementById('conversationDiv').style.visibility = connected ? 'visible' : 'hidden';
            // html('') empties the element; html() without an argument only reads it.
            $('#response').html('');
        }

        function connect() {
            // WebSocket endpoint; must equal the address registered in WebSocketConfig via
            // registry.addEndpoint("/websocket/tracker").withSockJS().
            // Use the full URL when the backend is a microservice or a remote host.
            var socket = new SockJS('/websocket/tracker');
            stompClient = Stomp.over(socket);
            stompClient.connect({}, function(frame) {
                setConnected(true);
                console.log('開始進行連接Connected: ' + frame);
                // Subscribed destination; must equal the value of @SendTo("/topic/getResponse")
                // in WebSocketResource.
                stompClient.subscribe('/topic/getResponse', function(respnose) {
                    showResponse(respnose.body);
                });
            });
        }

        function disconnect() {
            if (stompClient != null) {
                stompClient.disconnect();
            }
            setConnected(false);
            console.log("Disconnected");
        }

        // Send "<client ip>:<input text>" to the /welcome handler.
        function sendName() {
            var name = $('#name').val();
            stompClient.send("/welcome", {}, returnCitySN['cip'] + ":" + name);
        }

        // Prepend a received message to the message list.
        function showResponse(message) {
            var response = $("#response");
            // The text comes from other clients: escape it so it cannot inject markup or scripts.
            var escaped = $('<div/>').text(message).html();
            response.html(escaped + "<br>" + response.html());
        }
    </script>
</head>
<body onload="disconnect()">
<noscript><h2 style="color: red">貌似你的瀏覽器不支持websocket</h2></noscript>
<div>
    <div>
        <button id="connect" onclick="connect();" style="color: red">連接</button>
        <button id="disconnect" disabled="disabled" onclick="disconnect();">斷開連接</button>
    </div>
    <div id="conversationDiv">
        <label>輸入內容</label><input type="text" id="name" />
        <button id="sendName" onclick="sendName();">發送</button>
        <p id="response"></p>
    </div>
</div>
</body>
</html>
以上為個人經驗,希望能給大家一個參考,也希望大家多多支持WalkonNet。如有錯誤或未考慮完全的地方,望不吝賜教。
推薦閱讀:
- springboot整合websocket最基礎入門使用教程詳解
- java 實現web項目啟動加載properties屬性文件
- springboot簡單接入websocket的操作方法
- 解決Java中socket使用getInputStream()阻塞問題
- springboot實現在工具類(util)中調用註入service層方法