Java NIO 中 Selector 解析

一、Selector 簡介

1、Selector 和 Channel 關系

Selector 一般稱為選擇器,可以翻譯為 多路復用。它是 Java NIO 核心組件中的一個,用於檢查一個或者多個 NIO Channel (通道) 的狀態是否處於可讀、可寫。如此可以實現單線程管理多個 Channels , 也就是可以管理多個網絡鏈接。

使用 Selector 的好處在於:使用更少的線程就可以來處理通道瞭,相比使用多個線程,避免瞭線程上下文切換帶來的開銷。

2、可選擇通道(SelectableChannel)

(1)不是所有的 Channel 都是可以被 Selector 復用的。比方說, FileChannel 就不能被選擇器復用。判斷一個 Channel 能被 Selector 復用,有一個前提:判斷他是否繼承瞭一個抽象類 SelectableChannel。如果繼承瞭 SelectableChannel , 則可以被復用,否則不能。

(2)SelectableChannel 提供瞭實現通道選擇性所需要的公共方法。它是所有支持就緒檢查通道類的父類,所有 socket 通道,都繼承 SelectableChannel 類都是可選擇的,包括從管道(Pipe) 對象的中獲取得到的通道。而 FileChannel 類,沒有繼承 SelectableChannel , 因此是不是可選通道。

(3)一個通道可以被註冊到多個選擇器上,但對每個選擇器而言隻能被註冊一次。通道和選擇器之間的關系,使用註冊的方式完成。SelectableChannel 可以被註冊到 Selector 對象上,在註冊時候,需要指定通道的那些操作,是 Selector 感興趣的。

3、Channel 註冊到 Selector

(1)使用Channel.register(Selector sel, int pos) 方法,將一個通道註冊到一個選擇器時。第一個參數,指定通道要註冊的選擇器。第二個參數指定選擇器需要查詢的通道操作。

(2)可供選擇器查詢的通道操作,從類型類分,包括一下四種:

  • 可讀:SelectionKey.OP_READ
  • 可寫:SelectionKey.OP_WRITE
  • 連接:SelectionKey.OP_CONNECT
  • 接收:SelectionKey.OP_ACCEPT

如果 Selector 對通道的多操作類型感興趣,可以用“位或”操作符來實現:

比如int key = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

(3)選擇器查詢的不是通道的操作,而是通道的某個操作的一種就緒狀態。什麼操作的就緒狀態?一旦通道具備完成某個操作的條件,表示該通道的某個操作已經就緒,就可以被 Selector 查詢到,程序可以對通道進行對應的操作。比方說,某個 SocketChannel 通道可以連接到一個服務器,則處於“連接就緒”狀態(OP_CONNECT)。 再比方說,一個ServerSocketChannel 服務器通道準備好接收新進入的連接,則處於“接收就緒”(OP_ACCEPT)狀態。還比方說,一個數據可讀的通道,可以說是“讀就緒”(OP_READ)。一個等待寫數據的通道可以說是“寫就緒”(OP_WRITE)。

4、選擇鍵(SelectionKey)

(1)Channel 註冊之後,並且一旦通道處於某種就緒狀態,就可以被選擇器查詢到。這個工作使用選擇器 Selector 的 select() 方法完成。select 方法的作用,對感興趣的通道操作,進行就緒狀態的查詢。

(2)Selector 可以不斷的查詢 Channel 中發生的操作的就緒狀態。並且選擇甘心去的操作就緒狀態。一旦通道有操作的就緒狀態達成,並且是 Selecor 感興趣的操作,就會被 Selector 選中,放入選擇鍵集合中。

(3)一個選擇鍵,首先包含瞭註冊在 Selector 的通道操作的類型,比方說: SelectionKey.OP_READ . 也包含瞭特定的通道與特定的選擇器之間的註冊關系。

開發應用程序是,選擇鍵是編程的關鍵,NIO 編程,就是更具對應的選擇鍵,進行不同的業務邏輯處理。

(4)選擇鍵的概念,和事件的概念比較相似。一個選擇鍵類似監聽器模式裡面的一個事件。由於 Selector 不是事件觸發的模式,而是主動去查詢的模式,所以不叫事件 Event, 而是叫 SelectionKey 選擇鍵。

二、Selector 的使用方法

1、Selector 的創建

通過 Selector.open() 方法創建一個 Selector 對象。如下;

// 獲取 Selector 選擇器
Selector selector = Selector.open();

2、註冊 Channel 到 Selector

要實現 Selector 管理 Channel , 需要將 channel 註冊到相應的 Selector 上

// 1. 獲取 Selector 選擇器
Selector selector = Selector.open();

// 2. 獲取通道
ServerSocketChannel socketChannel = ServerSocketChannel.open();

// 3. 設置為非阻塞
socketChannel.configureBlocking(false);

// 4. 綁定連接
socketChannel.bind(new InetSocketAddress(9999));

// 5. 將通道註冊到選擇器
socketChannel.register(selector, SelectionKey.OP_ACCEPT);

上面通過調用通道的 register() 方法會將它註冊到一個選擇器上。

需要註意的是:

(1)與 Selector 一起使用, channel 必須處於非阻塞模式下,否則將拋出異常 IllegalBlockingModeException 。 這意味著,FileChannel 不能與 Selector 一起使用,因為 FileChannel 不能切換到非阻塞模式,而套接字相關的所有通道都可以。

(2)一個通道,並沒有一定要持有所有的四種操作。比如服務器通道 ServerSocketChannel 支持 Accept 接收操作,而 SocketChannel 客戶端通道則不支持。可以通過通道上的 vildOps() 方法,來獲取特定通道下所支持的操作集合。

3、輪訓查詢就緒操作

(1) 通過 Selector 的 select() 方法, 可以查詢出已經就緒的通道操作,有些就緒的狀態集合,包含在一個元素是 Selectionkey 對象的 Set 集合中

(2) 下面是 Selector 幾個重載的查詢 select() 方法:

  1. select() 阻塞到至少有一個通道在你註冊的事件上就緒。
  2. select(long timeout) select() 一樣,但最長阻塞事件為 timeout 毫秒。
  3. selectNow() 非阻塞,隻要有通道就立即返回。
  4. select() 方法返回的 int 之,表示有多少通道已經就緒,準確的說目前一次 select

方法來到這一次 select 方法之間的時間段上,有多少個通道編程瞭就緒狀態。

例如:首次調用 select() 方法,如果有一個通道編程瞭就緒狀態,返回瞭 1 , 若子啊次調用 select() 方法,如果另外一個通道就緒瞭,它會再次返回 1。 如果第一個就緒的 chnanel 麼有做任何操作,現在就有兩個就緒通道,但是每次 select() 方法調用之間,隻有一個通道就緒瞭。

一旦調用 select() 方法,並且返回值部位 0 時,在 Selector 中有一個 seletedKeys() 方法,用來范圍已選擇鍵集合,迭代集合的每個以元素,根據就緒操作的類型,完成對應的操作

// 查詢已經就緒的通道操作
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectedKeys.iterator();
while (iterator.hasNext()) {
    SelectionKey key = iterator.next();

    // 判斷 key 就緒狀態操作
    if (key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
    } else if (key.isConnectable()) {
        // a connection was established with a remote server.
    } else if (key.isReadable()) {
        // a channel is ready for reading
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
}
iterator.remove();

4、停止選擇的方法

選擇器執行選擇的過程匯總,系統底層會依次詢問每個通道是否已經就緒,這個過程可能會造成調用線程進入阻塞狀態,那麼我們有一下三種方式可以喚醒在 select()方法中阻塞的線程。

wakeup() 方法:通過調用 Selector 對象的 wakeup() 方法讓處於阻塞狀態的 select() 方法立刻返回

該方法使得選擇器上的第一個哈沒有返回的選擇操作立即返回。如果當前沒有進行中的選擇操作,那麼下一次會對 select() 方法的一次調用立即返回。

close() 方法: 通過 close() 方法關閉 selector

該方法使得任何一個在選擇操作中阻塞的線程都被喚醒(類似 wakeup()) , 同時使的註冊到該 Selector 的所有 Channel 被註銷,所有的鍵都被取消,但是 Channel 本身不會關閉。

三、示例代碼

1、服務端代碼

@Test
public void server() throws IOException {
    //1. 獲取服務端通道
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

    //2. 切換非阻塞模式
    serverSocketChannel.configureBlocking(false);

    //3. 創建 buffer
    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
    writeBuffer.put("收到瞭。。。。".getBytes(StandardCharsets.UTF_8));

    //4. 綁定端口號
    serverSocketChannel.bind(new InetSocketAddress(20000));

    //5. 獲取 selector 選擇器
    Selector selector = Selector.open();

    //6. 通道註冊到選擇器,進行監聽
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

    //7. 選擇器進行輪訓,進行後續操作
    while (selector.select() > 0) {
        Set<SelectionKey> selectionKeys = selector.selectedKeys();
        Iterator<SelectionKey> selectionKeyIterator = selectionKeys.iterator();
        // 循環
        while (selectionKeyIterator.hasNext()) {
            // 獲取就緒狀態
            SelectionKey k = selectionKeyIterator.next();

            // 操作判斷
            if (k.isAcceptable()) {
                // 獲取連接
                SocketChannel accept = serverSocketChannel.accept();

                // 切換非阻塞模式
                accept.configureBlocking(false);

                // 註冊
                accept.register(selector, SelectionKey.OP_READ);
            } else if (k.isReadable()) {
                SocketChannel socketChannel = (SocketChannel) k.channel();
                readBuffer.clear();
                socketChannel.read(readBuffer);

                readBuffer.flip();
                System.out.println("received:" + new String(readBuffer.array(), StandardCharsets.UTF_8));
                k.interestOps(SelectionKey.OP_WRITE);
            } else if (k.isWritable()) {
                writeBuffer.rewind();

                SocketChannel socketChannel = (SocketChannel) k.channel();
                socketChannel.write(writeBuffer);
                k.interestOps(SelectionKey.OP_READ);
            }
        }
    }
}

2、客戶端代碼

@Test
public void client() throws IOException {

    //1. 獲取通道,綁定主機和端口號
    SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(20000));

    //2. 切換到非阻塞模式
    socketChannel.configureBlocking(false);

    //3. 創建 buffer
    ByteBuffer buffer = ByteBuffer.allocate(1024);

    //4. 寫入 buffer 數據
    buffer.put(new Date().toString().getBytes(StandardCharsets.UTF_8));

    //5. 模式切換
    buffer.flip();

    //6. 寫入通道
    socketChannel.write(buffer);

    //7. 關閉
    buffer.clear();
    socketChannel.close();
}

3、NIO 編程步驟總結

  • 1、創建一個 ServerSocketChannel 通道
  • 2、設置為非阻塞模式
  • 3、創建一個 Selector 選擇器
  • 4、Channel 註冊到選擇器中,監聽連接事件
  • 5、調用 Selector 中的 select 方法(循環調用),監聽通道是否是就緒狀態
  • 6、調用 SelectKeys() 方法就能獲取 就緒 channel 集合
  • 7、遍歷就緒的 channel 集合,判斷就緒事件類型,實現具體的業務操作。
  • 8、根據業務流程,判斷是否需要再次註冊事件監聽事件,重復執行。

到此這篇關於Java NIO 中 Selector 解析的文章就介紹到這瞭,更多相關Java NIO 中的Selector內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: