基於Java實現Socket編程入門
認識Socket
socket,又稱套接字,是在不同的進程間進行網絡通訊的一種協議、約定或者說是規范。
對於socket編程,它更多的時候像是基於TCP/UDP等協議做的一層封裝或者說抽象,是一套系統所提供的用於進行網絡通信相關編程的接口。
建立socket的基本流程
我們以linux操作系統提供的基本api為例,瞭解建立一個socket通信的基本流程:
可以看到本質上,socket是對tcp連接(當然也有可能是udp等其他連接)協議,在編程層面上的簡化和抽象。
1.最基本的Socket示范
1.1 單向通信
首先,我們從隻發送和接收一次消息的socket基礎代碼開始:
服務端:
package com.marklux.socket.base; import java.io.IOException; import java.io.InputStream; import java.net.ServerSocket; import java.net.Socket; /** * The very basic socket server that only listen one single message. */ public class BaseSocketServer { private ServerSocket server; private Socket socket; private int port; private InputStream inputStream; private static final int MAX_BUFFER_SIZE = 1024; public int getPort() { return port; } public void setPort(int port) { this.port = port; } public BaseSocketServer(int port) { this.port = port; } public void runServerSingle() throws IOException { this.server = new ServerSocket(this.port); System.out.println("base socket server started."); // the code will block here till the request come. this.socket = server.accept(); this.inputStream = this.socket.getInputStream(); byte[] readBytes = new byte[MAX_BUFFER_SIZE]; int msgLen; StringBuilder stringBuilder = new StringBuilder(); while ((msgLen = inputStream.read(readBytes)) != -1) { stringBuilder.append(new String(readBytes,0,msgLen,"UTF-8")); } System.out.println("get message from client: " + stringBuilder); inputStream.close(); socket.close(); server.close(); } public static void main(String[] args) { BaseSocketServer bs = new BaseSocketServer(9799); try { bs.runServerSingle(); }catch (IOException e) { e.printStackTrace(); } } }
客戶端:
package com.marklux.socket.base; import java.io.IOException; import java.io.OutputStream; import java.io.UnsupportedEncodingException; import java.net.Socket; /** * The very basic socket client that only send one single message. */ public class BaseSocketClient { private String serverHost; private int serverPort; private Socket socket; private OutputStream outputStream; public BaseSocketClient(String host, int port) { this.serverHost = host; this.serverPort = port; } public void connetServer() throws IOException { this.socket = new Socket(this.serverHost, this.serverPort); this.outputStream = socket.getOutputStream(); // why the output stream? } public void sendSingle(String message) throws IOException { try { this.outputStream.write(message.getBytes("UTF-8")); } catch (UnsupportedEncodingException e) { System.out.println(e.getMessage()); } this.outputStream.close(); this.socket.close(); } public static void main(String[] args) { BaseSocketClient bc = new BaseSocketClient("127.0.0.1",9799); try { bc.connetServer(); bc.sendSingle("Hi from mark."); }catch (IOException e) { e.printStackTrace(); } } }
先運行服務端,再運行客戶端,就可以看到效果。
- 註意這裡的IO操作實現,我們使用瞭一個大小為
MAX_BUFFER_SIZE
的byte數組作為緩沖區,然後從輸入流中取出字節放置到緩沖區,再從緩沖區中取出字節構建到字符串中去,這在輸入流文件很大時非常有用,事實上,後面要講到的NIO也是基於這種思路實現的。
1.2 雙向通信
上面的例子隻實現瞭一次單向的通信,這顯然有點浪費通道。socket連接支持全雙工的雙向通信(底層是tcp),下面的例子中,服務端在收到客戶端的消息後,將返回給客戶端一個回執。
並且我們使用瞭一些java.io包裝好的方法,來簡化整個通信的流程(因為消息長度不大,不再使用緩沖區)。
服務端:
public void runServer() throws IOException { this.serverSocket = new ServerSocket(port); this.socket = serverSocket.accept(); this.inputStream = socket.getInputStream(); String message = new String(inputStream.readAllBytes(), "UTF-8"); System.out.println("received message: " + message); this.socket.shutdownInput(); // 告訴客戶端接收已經完畢,之後隻能發送 // write the receipt. this.outputStream = this.socket.getOutputStream(); String receipt = "We received your message: " + message; outputStream.write(receipt.getBytes("UTF-8")); this.outputStream.close(); this.socket.close(); }
客戶端:
public void sendMessage(String message) throws IOException { this.socket = new Socket(host,port); this.outputStream = socket.getOutputStream(); this.outputStream.write(message.getBytes("UTF-8")); this.socket.shutdownOutput(); // 告訴服務器,所有的發送動作已經結束,之後隻能接收 this.inputStream = socket.getInputStream(); String receipt = new String(inputStream.readAllBytes(), "UTF-8"); System.out.println("got receipt: " + receipt); this.inputStream.close(); this.socket.close(); }
-
註意這裡我們在服務端接受到消息以及客戶端發送消息後,分別調用瞭
shutdownInput()
和shutdownOutput()
而不是直接close對應的stream,這是因為在關閉任何一個stream,都會直接導致socket的關閉,也就無法進行後面回執的發送瞭。 -
但是註意,調用
shutdownInput()
和shutdownOutput()
之後,對應的流也會被關閉,不能再次向socket發送/寫入瞭。
2. 發送更多的消息:結束的界定
剛才的兩個例子中,每次打開流,都隻能進行一次寫入/讀取操作,結束後對應流被關閉,就無法再次寫入/讀取瞭。
在這種情況下,如果要發送兩次消息,就不得不建立兩個socket,既耗資源又麻煩。其實我們完全可以不關閉對應的流,隻要分次寫入消息就可以瞭。
但是這樣的話,我們就必須面對另一個問題:如何判斷一次消息發送的結束呢?
2.1 使用特殊符號
最簡單的辦法是使用一些特殊的符號來標記一次發送完成,服務端隻要讀到對應的符號就可以完成一次讀取,然後進行相關的處理操作。
下面的例子中我們使用換行符\n
來標記一次發送的結束,服務端每接收到一個消息,就打印一次,並且使用瞭Scanner來簡化操作:
服務端:
public void runServer() throws IOException { this.server = new ServerSocket(this.port); System.out.println("base socket server started."); this.socket = server.accept(); // the code will block here till the request come. this.inputStream = this.socket.getInputStream(); Scanner sc = new Scanner(this.inputStream); while (sc.hasNextLine()) { System.out.println("get info from client: " + sc.nextLine()); } // 循環接收並輸出消息內容 this.inputStream.close(); socket.close(); }
客戶端:
public void connetServer() throws IOException { this.socket = new Socket(this.serverHost, this.serverPort); this.outputStream = socket.getOutputStream(); } public void send(String message) throws IOException { String sendMsg = message + "\n"; // we mark \n as a end of line. try { this.outputStream.write(sendMsg.getBytes("UTF-8")); } catch (UnsupportedEncodingException e) { System.out.println(e.getMessage()); } // this.outputStream.close(); // this.socket.shutdownOutput(); } public static void main(String[] args) { CycleSocketClient cc = new CycleSocketClient("127.0.0.1", 9799); try { cc.connetServer(); Scanner sc = new Scanner(System.in); while (sc.hasNext()) { String line = sc.nextLine(); cc.send(line); } }catch (IOException e) { e.printStackTrace(); } }
運行後效果是,客戶端每輸入一行文字按下回車後,服務端就會打印出對應的消息讀取記錄。
2.2 根據長度界定
回到原點,我們之所以不好定位消息什麼時候結束,是因為我們不能夠確定每次消息的長度。
那麼其實可以先將消息的長度發送出去,當服務端知道消息的長度後,就能夠完成一次消息的接收瞭。
總的來說,發送一次消息變成瞭兩個步驟
- 發送消息的長度
- 發送消息
最後的問題就是,“發送消息的長度”這一步驟所發送的字節量必須是固定的,否則我們仍然會陷入僵局。
一般來說,我們可以使用固定的字節數來保存消息的長度,比如規定前2個字節就是消息的長度,不過這樣我們能夠傳送的消息最大長度也就被固定死瞭,以2個字節為例,我們發送的消息最大長度不超過2^16個字節即64K。
如果你瞭解一些字符的編碼,就會知道,其實我們可以使用變長的空間來儲存消息的長度,比如:
第一個字節首位為0:即0XXXXXXX,表示長度就一個字節,最大128,表示128B
第一個字節首位為110,那麼附帶後面一個字節表示長度:即110XXXXX 10XXXXXX,最大2048,表示2K
第一個字節首位為1110,那麼附帶後面二個字節表示長度:即110XXXXX 10XXXXXX 10XXXXXX,最大131072,表示128K
依次類推
當然這樣實現起來會麻煩一些,因此下面的例子裡我們仍然使用固定的兩個字節來記錄消息的長度。
服務端:
public void runServer() throws IOException { this.serverSocket = new ServerSocket(this.port); this.socket = serverSocket.accept(); this.inputStream = socket.getInputStream(); byte[] bytes; while (true) { // 先讀第一個字節 int first = inputStream.read(); if (first == -1) { // 如果是-1,說明輸入流已經被關閉瞭,也就不需要繼續監聽瞭 this.socket.close(); break; } // 讀取第二個字節 int second = inputStream.read(); int length = (first << 8) + second; // 用位運算將兩個字節拼起來成為真正的長度 bytes = new byte[length]; // 構建指定長度的字節大小來儲存消息即可 inputStream.read(bytes); System.out.println("receive message: " + new String(bytes,"UTF-8")); } }
客戶端:
public void connetServer() throws IOException { this.socket = new Socket(host,port); this.outputStream = socket.getOutputStream(); } public void sendMessage(String message) throws IOException { // 首先要把message轉換成bytes以便處理 byte[] bytes = message.getBytes("UTF-8"); // 接下來傳輸兩個字節的長度,依然使用移位實現 int length = bytes.length; this.outputStream.write(length >> 8); // write默認一次隻傳輸一個字節 this.outputStream.write(length); // 傳輸完長度後,再正式傳送消息 this.outputStream.write(bytes); } public static void main(String[] args) { LengthSocketClient lc = new LengthSocketClient("127.0.0.1",9799); try { lc.connetServer(); Scanner sc = new Scanner(System.in); while (sc.hasNextLine()) { lc.sendMessage(sc.nextLine()); } } catch (IOException e) { e.printStackTrace(); } }
3. 處理更多的連接:多線程
3.1 同時實現消息的發送與接收
在考慮服務端處理多連接之前,我們先考慮使用多線程改造一下原有的一對一對話實例。
在原有的例子中,消息的接收方並不能主動地向對方發送消息,換句話說我們並沒有實現真正的互相對話,這主要是因為消息的發送和接收這兩個動作並不能同時進行,因此我們需要使用兩個線程,其中一個用於監聽鍵盤輸入並將其寫入socket,另一個則負責監聽socket並將接受到的消息顯示。
出於簡單考慮,我們直接讓主線程負責鍵盤監聽和消息發送,同時另外開啟一個線程用於拉取消息並顯示。
消息拉取線程 ListenThread.java
public class ListenThread implements Runnable { private Socket socket; private InputStream inputStream; public ListenThread(Socket socket) { this.socket = socket; } @Override public void run() throws RuntimeException{ try { this.inputStream = socket.getInputStream(); } catch (IOException e) { e.printStackTrace(); throw new RuntimeException(e.getMessage()); } while (true) { try { int first = this.inputStream.read(); if (first == -1) { // 輸入流已經被關閉,無需繼續讀取 throw new RuntimeException("disconnected."); } int second = this.inputStream.read(); int msgLength = (first<<8) + second; byte[] readBuffer = new byte[msgLength]; this.inputStream.read(readBuffer); System.out.println("message from [" + socket.getInetAddress() + "]: " + new String(readBuffer,"UTF-8")); } catch (IOException e) { e.printStackTrace(); throw new RuntimeException(e.getMessage()); } } } }
主線程,啟動時由用戶選擇是作為server還是client:
public class ChatSocket { private String host; private int port; private Socket socket; private ServerSocket serverSocket; private OutputStream outputStream; // 以服務端形式啟動,創建會話 public void runAsServer(int port) throws IOException { this.serverSocket = new ServerSocket(port); System.out.println("[log] server started at port " + port); // 等待客戶端的加入 this.socket = serverSocket.accept(); System.out.println("[log] successful connected with " + socket.getInetAddress()); // 啟動監聽線程 Thread listenThread = new Thread(new ListenThread(this.socket)); listenThread.start(); waitAndSend(); } // 以客戶端形式啟動,加入會話 public void runAsClient(String host, int port) throws IOException { this.socket = new Socket(host, port); System.out.println("[log] successful connected to server " + socket.getInetAddress()); Thread listenThread = new Thread(new ListenThread(this.socket)); listenThread.start(); waitAndSend(); } public void waitAndSend() throws IOException { this.outputStream = this.socket.getOutputStream(); Scanner sc = new Scanner(System.in); while (sc.hasNextLine()) { this.sendMessage(sc.nextLine()); } } public void sendMessage(String message) throws IOException { byte[] msgBytes = message.getBytes("UTF-8"); int length = msgBytes.length; outputStream.write(length>>8); outputStream.write(length); outputStream.write(msgBytes); } public static void main(String[] args) { Scanner scanner = new Scanner(System.in); ChatSocket chatSocket = new ChatSocket(); System.out.println("select connect type: 1 for server and 2 for client"); int type = Integer.parseInt(scanner.nextLine().toString()); if (type == 1) { System.out.print("input server port: "); int port = scanner.nextInt(); try { chatSocket.runAsServer(port); } catch (IOException e) { e.printStackTrace(); } }else if (type == 2) { System.out.print("input server host: "); String host = scanner.nextLine(); System.out.print("input server port: "); int port = scanner.nextInt(); try { chatSocket.runAsClient(host, port); } catch (IOException e) { e.printStackTrace(); } } } }
3.2 使用線程池優化服務端並發能力
作為服務端,如果一次隻跟一個客戶端建立socket連接,未免顯得太過浪費資源,因此我們完全可以讓服務端和多個客戶端建立多個socket。
那麼既然要處理多個連接,就不得不面對並發問題瞭(當然,你也可以寫循環輪流處理)。我們可以使用多線程來處理並發,不過線程的創建和銷毀都會消耗大量的資源和時間,所以最好一步到位,用一個線程池來實現。
下面給出一個示范性質的服務端代碼:
public class SocketServer { public static void main(String args[]) throws Exception { // 監聽指定的端口 int port = 55533; ServerSocket server = new ServerSocket(port); // server將一直等待連接的到來 System.out.println("server將一直等待連接的到來"); //如果使用多線程,那就需要線程池,防止並發過高時創建過多線程耗盡資源 ExecutorService threadPool = Executors.newFixedThreadPool(100); while (true) { Socket socket = server.accept(); Runnable runnable=()->{ try { // 建立好連接後,從socket中獲取輸入流,並建立緩沖區進行讀取 InputStream inputStream = socket.getInputStream(); byte[] bytes = new byte[1024]; int len; StringBuilder sb = new StringBuilder(); while ((len = inputStream.read(bytes)) != -1) { // 註意指定編碼格式,發送方和接收方一定要統一,建議使用UTF-8 sb.append(new String(bytes, 0, len, "UTF-8")); } System.out.println("get message from client: " + sb); inputStream.close(); socket.close(); } catch (Exception e) { e.printStackTrace(); } }; threadPool.submit(runnable); } } }
4. 連接保活
我想你不難發現一個問題,那就是當socket連接成功建立後,如果中途發生異常導致其中一方斷開連接,此時另一方是無法發現的,隻有在再次嘗試發送/接收消息才會因為拋出異常而退出。
簡單的說,就是我們維持的socket連接,是一個長連接,但我們沒有保證它的時效性,上一秒它可能還是可以用的,但是下一秒就不一定瞭。
4.1 使用心跳包
保證連接隨時可用的最常見方法就是定時發送心跳包,來檢測連接是否正常。這對於實時性要求很高的服務而言,還是非常重要的(比如消息推送)。
大體的方案如下:
- 雙方約定好心跳包的格式,要能夠區別於普通的消息。
- 客戶端每隔一定時間,就向服務端發送一個心跳包
- 服務端每接收到心跳包時,將其拋棄
- 如果客戶端的某個心跳包發送失敗,就可以判斷連接已經斷開
- 如果對實時性要求很高,服務端也可以定時檢查客戶端發送心跳包的頻率,如果超過一定時間沒有發送可以認為連接已經斷開
4.2 斷開時重連
使用心跳包必然會增加帶寬和性能的負擔,對於普通的應用我們其實並沒有必要使用這種方案,如果消息發送時拋出瞭連接異常,直接嘗試重新連接就好瞭。
跟上面的方案對比,其實這個拋出異常的消息就充當瞭心跳包的角色。
總的來說,連接是否要保活,如何保活,需要根據具體的業務場景靈活地思考和定制。
到此這篇關於基於Java實現Socket編程入門的文章就介紹到這瞭,更多相關Java Socket編程入門內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!