java Nio使用NioSocket客戶端與服務端交互實現方式

NioSocket 客戶端與服務端交互實現

java Nio是jdk1.4新增的io方式—–nio(new IO),這種方式在目前來說算不算new,更合適的解釋應該是non-block IO。

non-block是相對於傳統的io方式來講的。傳統的Io方式是阻塞的,我們拿網絡io來舉例,傳統的io模型如下:

這裡寫圖片描述

服務端主線程負責不斷地server.accept(),如果沒有客戶端請求主線程就會阻塞,當客戶端請求時,主線程會通過線程池創建一個新的線程執行。

簡單解釋就是一個線程負責一個客戶端的socket,當客戶端因網絡等原因傳遞速度慢的時候,服務端對應的客戶端的線程就會等待,很浪費資源。

同時線程過少的話會影響服務的吞吐量,而線程過多的話由於上下文切換等原因會導致效率十分低下,傳統的io方式並不適合如今的網絡流量。

Nio的模型如下:

這裡寫圖片描述

nio相比傳統的io模型,最大的特點是優化瞭線程的使用。

nio通過selector可以使用一個線程去管理多個socket句柄,說是管理也不太合適,nio是采用的事件驅動模型,selector負責的是監控各個連接句柄的狀態,不是去輪詢每個句柄,而是在數據就緒後,將消息通知給selector,而具體的socket句柄管理則是采用多路復用的模型,交由操作系統來完成。

selector充當的是一個消息的監聽者,負責監聽channel在其註冊的事件,這樣就可以通過一個線程完成瞭大量連接的管理,當註冊的事件發生後,再調用相應線程進行處理。

這樣就不需要為每個連接都使用一個線程去維持長連接,減少瞭長連接的開銷,同時減少瞭上下文的切換提高瞭系統的吞吐量。

java Nio的組成

java Nio主要由三個核心部分組成:

- Buffer 
- Channel 
- Selector

所有的io的Nio都是從一個channel開始的,Channel有點類似於流,但是和流不同的是,channel是可以雙向讀寫的。Channel有幾種類型,主要包含文件io操作和網絡io:

- FileChannel (文件io) 
- DatagramChannel (udp數據報) 
- SocketChannel (tcp客戶端) 
- ServerSocketChannel (tcp服務端)

Buffer是一個中間緩存區,數據可以從channel讀取到buffer,也可以從buffer寫到channel中,在java中,傳統方式與io的交互,需要將數據從堆內存讀取到直接內存中,然後交由c語言來調用系統服務完成io的交互。

而使用Buffer可以直接在直接內存中開辟內存區域,減少瞭io復制的操作,從而提高瞭io操作的效率。

#基本數據類型的buffer 
- ByteBuffer 
- CharBuffer 
- DoubleBuffer 
- FloatBuffer 
- IntBuffer 
- LongBuffer 
- ShortBuffer
#文件內存映射buffer 
- MappedByteBuffer
#直接內存區buffer 
- DirectBuffer

Selector允許單個線程處理多個channel,可以將多個channel教給selector管理,並註冊相應的事件,而selector則采用事件驅動的方式,當註冊的事件就緒後,調用相應的相應的線程處理該時間,不用使用線程去維持長連接,減少瞭線程的開銷。

Selector通過靜態工廠的open方法建立,然後通過channel的register註冊到Channel上。

註冊後通過select方法等待請求,select請求有long類型參數,代表等待時間,如果等待時間內接受到操作請求,則返回可以操作請求的數量,否則超時往下走。

傳入參數為零或者無參方法,則會采用阻塞模式知道有相應請求。

收到請求後調用selectedKeys返回SelectionKey的集合。

SelectionKey保存瞭處理當前請求的Channel和Selector,並且提供瞭不同的操作類型。

SelectionKey的操作有四種:

- SelectionKey.OP_CONNECT 
- SelectionKey.OP_ACCEPT 
- SelectionKey.OP_READ 
- SelectionKey.OP_WRITE

下面為一個客戶端與服務端實用NioSocket交互的簡單例子:

//對selectionKey事件的處理
/**
 * description:
 *
 * @author wkGui
 */
interface ServerHandlerBs {
    void handleAccept(SelectionKey selectionKey) throws IOException;
    String handleRead(SelectionKey selectionKey) throws IOException;
}
/**
 * description:
 *
 * @author wkGui
 */
public class ServerHandlerImpl implements ServerHandlerBs {
    private int bufferSize = 1024;
    private String localCharset = "UTF-8";
    public ServerHandlerImpl() {
    }
    public ServerHandlerImpl(int bufferSize) {
        this(bufferSize, null);
    }
    public ServerHandlerImpl(String localCharset) {
        this(-1, localCharset);
    }
    public ServerHandlerImpl(int bufferSize, String localCharset) {
        this.bufferSize = bufferSize > 0 ? bufferSize : this.bufferSize;
        this.localCharset = localCharset == null ? this.localCharset : localCharset;
    }
    @Override
    public void handleAccept(SelectionKey selectionKey) throws IOException {
        //獲取channel
        SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept();
        //非阻塞
        socketChannel.configureBlocking(false);
        //註冊selector
        socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufferSize));
        System.out.println("建立請求......");
    }
    @Override
    public String handleRead(SelectionKey selectionKey) throws IOException {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
        String receivedStr = "";
        if (socketChannel.read(buffer) == -1) {
            //沒讀到內容關閉
            socketChannel.shutdownOutput();
            socketChannel.shutdownInput();
            socketChannel.close();
            System.out.println("連接斷開......");
        } else {
            //將channel改為讀取狀態
            buffer.flip();
            //按照編碼讀取數據
            receivedStr = Charset.forName(localCharset).newDecoder().decode(buffer).toString();
            buffer.clear();
            //返回數據給客戶端
            buffer = buffer.put(("received string : " + receivedStr).getBytes(localCharset));
            //讀取模式
            buffer.flip();
            socketChannel.write(buffer);
            //註冊selector 繼續讀取數據
            socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufferSize));
        }
        return receivedStr;
    }
}
//服務端server類
/**
 * description:
 *
 * @author wkGui
 */
public class NioSocketServer {
    private volatile byte flag = 1;
    public void setFlag(byte flag) {
        this.flag = flag;
    }
    public void start() {
        //創建serverSocketChannel,監聽8888端口
        try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
            serverSocketChannel.socket().bind(new InetSocketAddress(8888));
            //設置為非阻塞模式
            serverSocketChannel.configureBlocking(false);
            //為serverChannel註冊selector
            Selector selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("服務端開始工作:");
            //創建消息處理器
            ServerHandlerBs handler = new ServerHandlerImpl(1024);
            while (flag == 1) {
                selector.select();
                System.out.println("開始處理請求 : ");
                //獲取selectionKeys並處理
                Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    try {
                        //連接請求
                        if (key.isAcceptable()) {
                            handler.handleAccept(key);
                        }
                        //讀請求
                        if (key.isReadable()) {
                            System.out.println(handler.handleRead(key));
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    //處理完後移除當前使用的key
                    keyIterator.remove();
                }
                System.out.println("完成請求處理。");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
//server端啟動類
/**
 * description:
 *
 * @author wkGui
 */
public class ServerMain {
    public static void main(String[] args) {
        NioSocketServer server = new NioSocketServer();
        new Thread(() -> {
            try {
                Thread.sleep(10*60*1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                server.setFlag((byte) 0);
            }
        }).start();
        server.start();
    }
}
//客戶端client類
/**
 * description:
 *
 * @author wkGui
 */
public class NioSocketClient {
    public void start() {
        try (SocketChannel socketChannel = SocketChannel.open()) {
            //連接服務端socket
            SocketAddress socketAddress = new InetSocketAddress("localhost", 8888);
            socketChannel.connect(socketAddress);
            int sendCount = 0;
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            //這裡最好使用selector處理   這裡隻是為瞭寫的簡單
            while (sendCount < 10) {
                buffer.clear();
                //向服務端發送消息
                buffer.put(("current time : " + System.currentTimeMillis()).getBytes());
                //讀取模式
                buffer.flip();
                socketChannel.write(buffer);
                buffer.clear();
                //從服務端讀取消息
                int readLenth = socketChannel.read(buffer);
                //讀取模式
                buffer.flip();
                byte[] bytes = new byte[readLenth];
                buffer.get(bytes);
                System.out.println(new String(bytes, "UTF-8"));
                buffer.clear();
                sendCount++;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
//client啟動類
/**
 * description:
 *
 * @author wkGui
 */
public class ClientMain {
    public static void main(String[] args) {
        new NioSocketClient().start();
    }
}

Java NIO 實現 WebSocket 協議

WebSocket協議

WebSocket是一種在單個TCP連接上進行全雙工通信的協議。 WebSocket使得客戶端和服務器之間的數據交換變得更加簡單,允許服務端主動向客戶端推送數據。在WebSocket API中,瀏覽器和服務器隻需要完成一次握手,兩者之間就直接可以創建持久性的連接,並進行雙向數據傳輸。

WebSocket協議相比於Http協議來說,最大的特點就是可以實現服務端主動向客戶端發送消息。在WebSocket出現之前,如果客戶端想實時獲取服務端的消息,就需要使用AJAX輪詢,查詢是否有消息,這樣就很消耗服務器資源和帶寬。但是用WebSocket就可以實現服務端主動向客戶端發送數據,並且隻需要占用一個TCP連接,節省瞭資源和帶寬。

WebSocket連接建立過程

為瞭建立一個WebSocket連接,客戶端瀏覽器首先要向服務器發起一個HTTP請求,這個請求和通常的HTTP請求不同,包含瞭一些附加的頭信息,其中附加頭信息“Upgrade: WebSocket” 表明這是一個申請協議升級的HTTP請求。服務器端解析這些附加的信息頭,然後生成應答消息返回給客戶端,客戶端和服務端的WebSocket連接就建立瞭。之後就可以使用WebSocket協議的格式來雙向發送消息。

建立連接時發送的HTTP請求頭:

返回的HTTP響應頭:

在響應頭中的 Sec-WebSocket-Accept 時通過Sec-WebSocket-Key構造出來的。首先在Sec-WebSocket-Key後接上一個258EAFA5-E914-47DA-95CA-C5AB0DC85B11,然後再進行SHA1摘要得到160位數據在,在使用BASE64進行編碼,最後得到的就是Sec-WebSocket-Accept。

WebSocket數據發送過程

WebSocket數據發送的幀格式如下所示:

FIN – 1bit

在數據發送的過程中,可能會分片發送,FIN表示是否為最後一個分片。如果發生瞭分片,則1表示時最後一個分片;不能再分片的情況下,這個標志總是為1。

RSV1 RSV2 RSV3 – 1bit each

用於擴展,不使用擴展時需要為全0;非零時通信雙方必須協商好擴展。這裡我們用不上。

OPCODE – 4bits

用於表示所傳送數據的類型,也就是payload中的數據。

數值 含義
0x0 附加數據幀
0x1 文本數據幀
0x2 二進制數據幀
0x3-0x7 保留
0x8 關閉連接幀
0x9 ping幀
0xA pong幀
0xB-0xF 保留

MASK – 1bit

用於表示payload是否被進行瞭掩碼運算,1表示使用掩碼,0表示不使用掩碼。從客戶端發送向服務端的數據幀必須使用掩碼。

Payload length 7 bits,7+16 bits or 7+64 bits

用於表示payload的長度,有以下三種情況:

Payload length 表示的大小 payload的長度
0 – 125 Payload length 大小
126 之後的2個字節表示的無符號整數
127 之後的8個字節表示的無符號整數

Masking-key – 0 or 4 bytes

32 bit長的掩碼,如果MASK為1,則幀中就存在這一個字段,在解析payload時,需要進行使用32長掩碼進行異或操作,之後才能得到正確結果。

Java NIO 實現

利用Java NIO 來實現一個聊天室。部分代碼如下。

NIO的常規代碼:

selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
while (it.hasNext()) {
    SelectionKey key = it.next();
    it.remove();
    if (key.isAcceptable()) {
        handleAccept(key);
    }
    if (key.isReadable()) {
        handleRead(key);
    }
}

接受連接:

public void handleAccept(SelectionKey key) {
    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
    SocketChannel sc;
    try {
        sc = ssc.accept();
        sc.configureBlocking(false);
        sc.register(selector, SelectionKey.OP_READ);
        System.out.println(String.format("[server] -- client %s connected.", sc.getRemoteAddress().toString()));
    } catch (IOException e) {
        System.out.println(String.format("[server] -- error occur when accept: %s.", e.getMessage()));
        key.cancel();
    }
}

讀取通道中的數據:

public void handleRead(SelectionKey key) {
    SocketChannel sc = (SocketChannel) key.channel();
    Client client = (Client) key.attachment();
    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    // 如果是第一次連接進來,就需要創建一個客戶端對象,存儲起來
    if (client == null) {
        client = new Client(sc);
        clients.add(client);
        key.attach(client);
        byteBuffer.clear();
        // 如果連接還沒有建立,就是要HTTP建立連接
        try {
            sc.read(byteBuffer);
            byteBuffer.flip();
            String response = WebSocketHandler.getResponse(new String(byteBuffer.array()));
            byteBuffer.clear();
            byteBuffer.put(response.getBytes());
            byteBuffer.flip();
            while (byteBuffer.hasRemaining()) {
                sc.write(byteBuffer);
            }
        } catch (IOException e) {
            System.out.println(String.format("[server] -- error occur when read: %s.", e.getMessage()));
        }
        String message = "[系統消息] " + client.toString() + " 加入瞭群聊";
        broadcast(message.getBytes(), client);
    }
    byteBuffer.clear();
    int read = 0;
    try {
        read = sc.read(byteBuffer);
        if (read > 0) {
            byteBuffer.flip();
            int opcode = byteBuffer.get() & 0x0f;
            // 8表示客戶端關閉瞭連接
            if (opcode == 8) {
                System.out.println(String.format("[server] -- client %s connection close.", sc.getRemoteAddress()));
                clients.remove(client);
                String message = "[系統消息] " + client.toString() + " 退出瞭群聊";
                broadcast(message.getBytes(), client);
                sc.close();
                key.cancel();
                return;
            }
   // 隻考慮瞭最簡單的payload長度情況。
            int len = byteBuffer.get();
            len &= 0x7f;
            byte[] mask = new byte[4];
            byteBuffer.get(mask);
            byte[] payload = new byte[len];
            byteBuffer.get(payload);
            for (int i = 0; i < payload.length; i++) {
                payload[i] ^= mask[i % 4];
            }
            System.out.println(String
                    .format("[server] -- client: [%s], send: [%s].", client.toString(), new String(payload)));
            String message = String.format("[%s]: %s", client.toString(), new String(payload));
            broadcast(message.getBytes(), client);
        } else if (read == -1) {
            System.out.println(String.format("[server] -- client %s connection close.", sc.getRemoteAddress()));
            clients.remove(client);
            String message = "[系統消息] " + client.toString() + " 退出瞭群聊";
            broadcast(message.getBytes(), client);
            sc.close();
            key.cancel();
        }
    } catch (IOException e) {
        System.out.println(String.format("[server] -- error occur when read: %s.", e.getMessage()));
    }
}

使用HTTP建立WebSocket連接。

public class WebSocketHandler {
    private static String APPEND_STRING = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
    static class Header {
        private Map<String, String> properties = new HashMap<>();
        public String get(String key) {
            return properties.get(key);
        }
    }
    private WebSocketHandler() {}
    private static Header phrase(String request) {
        Header header = new Header();
        String[] pros = request.split("\r\n");
        for (String pro : pros) {
            if (pro.contains(":")) {
                int index = pro.indexOf(":");
                String key = pro.substring(0, index).trim();
                String value = pro.substring(index + 1).trim();
                header.properties.put(key, value);
            }
        }
        return header;
    }
    public static String getResponse(String request) {
        Header header = phrase(request);
        String acceptKey = header.get("Sec-WebSocket-Key") + APPEND_STRING;
        MessageDigest sha1;
        try {
            sha1 = MessageDigest.getInstance("sha1");
            sha1.update(acceptKey.getBytes());
            acceptKey = new String(Base64.getEncoder().encode(sha1.digest()));
        } catch (NoSuchAlgorithmException e) {
            System.out.println("fail to encode " + e.getMessage());
            return null;
        }
        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append("HTTP/1.1 101 Switching Protocols\r\n").append("Upgrade: websocket\r\n")
                     .append("Connection: Upgrade\r\n").append("Sec-WebSocket-Accept: " + acceptKey + "\r\n")
                     .append("\r\n");
        return stringBuilder.toString();
    }
}

客戶端對象

/**
 * @author XinHui Chen
 * @date 2020/2/8 19:20
 */
public class Client {
    private SocketChannel socketChannel = null;
    private String id = null;
    public SocketChannel getSocketChannel() {
        return socketChannel;
    }
    public String getId() {
        return id;
    }
    Client(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
        this.id = UUID.randomUUID().toString();
    }
    @Override
    public String toString() {
        try {
            return id + " " + socketChannel.getRemoteAddress().toString();
        } catch (IOException e) {
            System.out.println(e.getMessage());
            return null;
        }
    }
}

結果

使用網頁和控制臺與服務端建立WebSocket連接,發送數據。兩個都能成功顯示。

以上為個人經驗,希望能給大傢一個參考,也希望大傢多多支持WalkonNet。

推薦閱讀: