摘要:為解決這問題,我們發現元兇處在一線程一請求上,如果一個線程能同時處理多個請求,那么在高并發下性能上會大大改善。這樣一個線程可以同時發起多個調用,并且不需要同步等待數據就緒。表示當前就緒的事件類型。
JAVA NIO 一步步構建I/O多路復用的請求模型
摘要:本文屬于原創,歡迎轉載,轉載請保留出處:https://github.com/jasonGeng88/blog
文章一:JAVA 中原生的 socket 通信機制
當前環境jdk == 1.8
代碼地址git 地址:https://github.com/jasonGeng88/java-network-programming
知識點nio 下 I/O 阻塞與非阻塞實現
SocketChannel 介紹
I/O 多路復用的原理
事件選擇器與 SocketChannel 的關系
事件監聽類型
字節緩沖 ByteBuffer 數據結構
場景接著上一篇中的站點訪問問題,如果我們需要并發訪問10個不同的網站,我們該如何處理?
在上一篇中,我們使用了java.net.socket類來實現了這樣的需求,以一線程處理一連接的方式,并配以線程池的控制,貌似得到了當前的最優解。可是這里也存在一個問題,連接處理是同步的,也就是并發數量增大后,大量請求會在隊列中等待,或直接異常拋出。
為解決這問題,我們發現元兇處在“一線程一請求”上,如果一個線程能同時處理多個請求,那么在高并發下性能上會大大改善。這里就借住 JAVA 中的 nio 技術來實現這一模型。
nio 的阻塞實現關于什么是 nio,從字面上理解為 New IO,就是為了彌補原本 I/O 上的不足,而在 JDK 1.4 中引入的一種新的 I/O 實現方式。簡單理解,就是它提供了 I/O 的阻塞與非阻塞的兩種實現方式(當然,默認實現方式是阻塞的。)。
下面,我們先來看下 nio 以阻塞方式是如何處理的。
建立連接有了上一篇 socket 的經驗,我們的第一步一定也是建立 socket 連接。只不過,這里不是采用 new socket() 的方式,而是引入了一個新的概念 SocketChannel。它可以看作是 socket 的一個完善類,除了提供 Socket 的相關功能外,還提供了許多其他特性,如后面要講到的向選擇器注冊的功能。
類圖如下:
建立連接代碼實現:
// 初始化 socket,建立 socket 與 channel 的綁定關系 SocketChannel socketChannel = SocketChannel.open(); // 初始化遠程連接地址 SocketAddress remote = new InetSocketAddress(this.host, port); // I/O 處理設置阻塞,這也是默認的方式,可不設置 socketChannel.configureBlocking(true); // 建立連接 socketChannel.connect(remote);獲取 socket 連接
因為是同樣是 I/O 阻塞的實現,所以后面的關于 socket 輸入輸出流的處理,和上一篇的基本相同。唯一差別是,這里需要通過 channel 來獲取 socket 連接。
獲取 socket 連接
Socket socket = socketChannel.socket();
處理輸入輸出流
PrintWriter pw = getWriter(socketChannel.socket()); BufferedReader br = getReader(socketChannel.socket());完整示例
package com.jason.network.mode.nio; import com.jason.network.constant.HttpConstant; import com.jason.network.util.HttpUtil; import java.io.*; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; import java.nio.channels.SocketChannel; public class NioBlockingHttpClient { private SocketChannel socketChannel; private String host; public static void main(String[] args) throws IOException { for (String host: HttpConstant.HOSTS) { NioBlockingHttpClient client = new NioBlockingHttpClient(host, HttpConstant.PORT); client.request(); } } public NioBlockingHttpClient(String host, int port) throws IOException { this.host = host; socketChannel = SocketChannel.open(); socketChannel.socket().setSoTimeout(5000); SocketAddress remote = new InetSocketAddress(this.host, port); this.socketChannel.connect(remote); } public void request() throws IOException { PrintWriter pw = getWriter(socketChannel.socket()); BufferedReader br = getReader(socketChannel.socket()); pw.write(HttpUtil.compositeRequest(host)); pw.flush(); String msg; while ((msg = br.readLine()) != null){ System.out.println(msg); } } private PrintWriter getWriter(Socket socket) throws IOException { OutputStream out = socket.getOutputStream(); return new PrintWriter(out); } private BufferedReader getReader(Socket socket) throws IOException { InputStream in = socket.getInputStream(); return new BufferedReader(new InputStreamReader(in)); } }nio 的非阻塞實現 原理分析
nio 的阻塞實現,基本與使用原生的 socket 類似,沒有什么特別大的差別。
下面我們來看看它真正強大的地方。到目前為止,我們將的都是阻塞 I/O。何為阻塞 I/O,看下圖:
我們主要觀察圖中的前三種 I/O 模型,關于異步 I/O,一般需要依靠操作系統的支持,這里不討論。
從圖中可以發現,阻塞過程主要發生在兩個階段上:
第一階段:等待數據就緒;
第二階段:將已就緒的數據從內核緩沖區拷貝到用戶空間;
這里產生了一個從內核到用戶空間的拷貝,主要是為了系統的性能優化考慮。假設,從網卡讀到的數據直接返回給用戶空間,那勢必會造成頻繁的系統中斷,因為從網卡讀到的數據不一定是完整的,可能斷斷續續的過來。通過內核緩沖區作為緩沖,等待緩沖區有足夠的數據,或者讀取完結后,進行一次的系統中斷,將數據返回給用戶,這樣就能避免頻繁的中斷產生。
了解了 I/O 阻塞的兩個階段,下面我們進入正題。看看一個線程是如何實現同時處理多個 I/O 調用的。從上圖中的非阻塞 I/O 可以看出,僅僅只有第二階段需要阻塞,第一階段的數據等待過程,我們是不需要關心的。不過該模型是頻繁地去檢查是否就緒,造成了 CPU 無效的處理,反而效果不好。如果有一種類似的好萊塢原則— “不要給我們打電話,我們會打給你” 。這樣一個線程可以同時發起多個 I/O 調用,并且不需要同步等待數據就緒。在數據就緒完成的時候,會以事件的機制,來通知我們。這樣不就實現了單線程同時處理多個 IO 調用的問題了嗎?即所說的“I/O 多路復用模型”。
廢話講了一大堆,下面就來實際操刀一下。
創建選擇器由上面分析可以,我們得有一個選擇器,它能監聽所有的 I/O 操作,并且以事件的方式通知我們哪些 I/O 已經就緒了。
代碼如下:
import java.nio.channels.Selector; ... private static Selector selector; static { try { selector = Selector.open(); } catch (IOException e) { e.printStackTrace(); } }創建非阻塞 I/O
下面,我們來創建一個非阻塞的 SocketChannel,代碼與阻塞實現類型,唯一不同是socketChannel.configureBlocking(false)。
注意:只有在socketChannel.configureBlocking(false)之后的代碼,才是非阻塞的,如果socketChannel.connect()在設置非阻塞模式之前,那么連接操作依舊是阻塞調用的。
SocketChannel socketChannel = SocketChannel.open(); SocketAddress remote = new InetSocketAddress(host, port); // 設置非阻塞模式 socketChannel.configureBlocking(false); socketChannel.connect(remote);建立選擇器與 socket 的關聯
選擇器與 socket 都創建好了,下一步就是將兩者進行關聯,好讓選擇器和監聽到 Socket 的變化。這里采用了以 SocketChannel 主動注冊到選擇器的方式進行關聯綁定,這也就解釋了,為什么不直接new Socket(),而是以SocketChannel的方式來創建 socket。
代碼如下:
socketChannel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE);
上面代碼,我們將 socketChannel 注冊到了選擇器中,并且對它的連接、可讀、可寫事件進行了監聽。
具體的事件監聽類型如下:
操作類型 | 值 | 描述 | 所屬對象 |
---|---|---|---|
OP_READ | 1 << 0 | 讀操作 | SocketChannel |
OP_WRITE | 1 << 2 | 寫操作 | SocketChannel |
OP_CONNECT | 1 << 3 | 連接socket操作 | SocketChannel |
OP_ACCEPT | 1 << 4 | 接受socket操作 | ServerSocketChannel |
現在,選擇器已經與我們關心的 socket 進行了關聯。下面就是感知事件的變化,然后調用相應的處理機制。
這里與 Linux 下的 selector 有點不同,nio 下的 selecotr 不會去遍歷所有關聯的 socket。我們在注冊時設置了我們關心的事件類型,每次從選擇器中獲取的,只會是那些符合事件類型,并且完成就緒操作的 socket,減少了大量無效的遍歷操作。
public void select() throws IOException { // 獲取就緒的 socket 個數 while (selector.select() > 0){ // 獲取符合的 socket 在選擇器中對應的事件句柄 key Set keys = selector.selectedKeys(); // 遍歷所有的key Iterator it = keys.iterator(); while (it.hasNext()){ // 獲取對應的 key,并從已選擇的集合中移除 SelectionKey key = (SelectionKey)it.next(); it.remove(); if (key.isConnectable()){ // 進行連接操作 connect(key); } else if (key.isWritable()){ // 進行寫操作 write(key); } else if (key.isReadable()){ // 進行讀操作 receive(key); } } } }
注意:這里的selector.select()是同步阻塞的,等待有事件發生后,才會被喚醒。這就防止了 CPU 空轉的產生。當然,我們也可以給它設置超時時間,selector.select(long timeout)來結束阻塞過程。
處理連接就緒事件下面,我們分別來看下,一個 socket 是如何來處理連接、寫入數據和讀取數據的(這些操作都是阻塞的過程,只是我們將等待就緒的過程變成了非阻塞的了)。
處理連接代碼:
// SelectionKey 代表 SocketChannel 在選擇器中注冊的事件句柄 private void connect(SelectionKey key) throws IOException { // 獲取事件句柄對應的 SocketChannel SocketChannel channel = (SocketChannel) key.channel(); // 真正的完成 socket 連接 channel.finishConnect(); // 打印連接信息 InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress(); String host = remote.getHostName(); int port = remote.getPort(); System.out.println(String.format("訪問地址: %s:%s 連接成功!", host, port)); }處理寫入就緒事件
// 字符集處理類 private Charset charset = Charset.forName("utf8"); private void write(SelectionKey key) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress(); String host = remote.getHostName(); // 獲取 HTTP 請求,同上一篇 String request = HttpUtil.compositeRequest(host); // 向 SocketChannel 寫入事件 channel.write(charset.encode(request)); // 修改 SocketChannel 所關心的事件 key.interestOps(SelectionKey.OP_READ); }
這里有兩個地方需要注意:
第一個是使用 channel.write(charset.encode(request)); 進行數據寫入。有人會說,為什么不能像上面同步阻塞那樣,通過PrintWriter包裝類進行操作。因為PrintWriter的 write() 方法是阻塞的,也就是說要等數據真正從 socket 發送出去后才返回。
這與我們這里所講的阻塞是不一致的,這里的操作雖然也是阻塞的,但它發生的過程是在數據從用戶空間到內核緩沖區拷貝過程。至于系統將緩沖區的數據通過 socket 發送出去,這不在阻塞范圍內。也解釋了為什么要用 Charset 對寫入內容進行編碼了,因為緩沖區接收的格式是ByteBuffer。
第二,選擇器用來監聽事件變化的兩個參數是 interestOps 與 readyOps。
interestOps:表示 SocketChannel 所關心的事件類型,也就是告訴選擇器,當有這幾種事件發生時,才來通知我。這里通過key.interestOps(SelectionKey.OP_READ);告訴選擇器,之后我只關心“讀就緒”事件,其他的不用通知我了。
readyOps:表示 SocketChannel 當前就緒的事件類型。以key.isReadable()為例,判斷依據就是:return (readyOps() & OP_READ) != 0;
處理讀取就緒事件private void receive(SelectionKey key) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); channel.read(buffer); buffer.flip(); String receiveData = charset.decode(buffer).toString(); // 當再沒有數據可讀時,取消在選擇器中的關聯,并關閉 socket 連接 if ("".equals(receiveData)) { key.cancel(); channel.close(); return; } System.out.println(receiveData); }
這里的處理基本與寫入一致,唯一要注意的是,這里我們需要自行處理去緩沖區讀取數據的操作。首先會分配一個固定大小的緩沖區,然后從內核緩沖區中,拷貝數據至我們剛分配固定緩沖區上。這里存在兩種情況:
我們分配的緩沖區過大,那多余的部分以0補充(初始化時,其實會自動補0)。
我們分配的緩沖去過小,因為選擇器會不停的遍歷。只要 SocketChannel 處理讀就緒狀態,那下一次會繼續讀取。當然,分配過小,會增加遍歷次數。
最后,將一下 ByteBuffer 的結構,它主要有 position, limit,capacity 以及 mark 屬性。以 buffer.flip(); 為例,講下各屬性的作用(mark 主要是用來標記之前 position 的位置,是在當前 postion 無法滿足的情況下使用的,這里不作討論)。
從圖中看出,
容量(capacity):表示緩沖區可以保存的數據容量;
極限(limit):表示緩沖區的當前終點,即寫入、讀取都不可超過該重點;
位置(position):表示緩沖區下一個讀寫單元的位置;
完整代碼package com.jason.network.mode.nio; import com.jason.network.constant.HttpConstant; import com.jason.network.util.HttpUtil; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator; import java.util.Set; public class NioNonBlockingHttpClient { private static Selector selector; private Charset charset = Charset.forName("utf8"); static { try { selector = Selector.open(); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) throws IOException { NioNonBlockingHttpClient client = new NioNonBlockingHttpClient(); for (String host: HttpConstant.HOSTS) { client.request(host, HttpConstant.PORT); } client.select(); } public void request(String host, int port) throws IOException { SocketChannel socketChannel = SocketChannel.open(); socketChannel.socket().setSoTimeout(5000); SocketAddress remote = new InetSocketAddress(host, port); socketChannel.configureBlocking(false); socketChannel.connect(remote); socketChannel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE); } public void select() throws IOException { while (selector.select(500) > 0){ Set keys = selector.selectedKeys(); Iterator it = keys.iterator(); while (it.hasNext()){ SelectionKey key = (SelectionKey)it.next(); it.remove(); if (key.isConnectable()){ connect(key); } else if (key.isWritable()){ write(key); } else if (key.isReadable()){ receive(key); } } } } private void connect(SelectionKey key) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); channel.finishConnect(); InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress(); String host = remote.getHostName(); int port = remote.getPort(); System.out.println(String.format("訪問地址: %s:%s 連接成功!", host, port)); } private void write(SelectionKey key) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress(); String host = remote.getHostName(); String request = HttpUtil.compositeRequest(host); System.out.println(request); channel.write(charset.encode(request)); key.interestOps(SelectionKey.OP_READ); } private void receive(SelectionKey key) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); channel.read(buffer); buffer.flip(); String receiveData = charset.decode(buffer).toString(); if ("".equals(receiveData)) { key.cancel(); channel.close(); return; } System.out.println(receiveData); } }示例效果 總結
本文從 nio 的阻塞方式講起,介紹了阻塞 I/O 與非阻塞 I/O 的區別,以及在 nio 下是如何一步步構建一個 IO 多路復用的模型的客戶端。文中需要理解的內容比較多,如果有理解錯誤的地方,歡迎指正~
后續Netty 下的異步請求實現
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/70210.html
摘要:阻塞請求結果返回之前,當前線程被掛起。也就是說在異步中,不會對用戶線程產生任何阻塞。當前線程在拿到此次請求結果的過程中,可以做其它事情。事實上,可以只用一個線程處理所有的通道。 準備知識 同步、異步、阻塞、非阻塞 同步和異步說的是服務端消息的通知機制,阻塞和非阻塞說的是客戶端線程的狀態。已客戶端一次網絡請求為例做簡單說明: 同步同步是指一次請求沒有得到結果之前就不返回。 異步請求不會...
摘要:阻塞當進行讀寫時,線程是阻塞的狀態。當任何一個收到數據后,中斷程序將喚起進程。接收數據當收到數據后,中斷程序會給的就緒列表添加引用。當接收到數據,中斷程序一方面修改,另一方面喚醒等待隊列中的進程,進程再次進入運行狀態如下圖。 本篇文章目的在于基本概念和原理的解釋,不會貼過多的使用代碼。 什么是NIO Java NIO (New IO)是 Java 的另一個 IO API (來自 jav...
摘要:三同步非阻塞式以塊的方式處理數據面向緩存區的采用多路復用模式基于事件驅動是實現了的一個流行框架,的。阿里云分布式文件系統里用的就是。四異步非阻塞式基于事件驅動,不需要多路復用器對注冊通道進行輪詢,采用設計模式。 一、什么是IO IO 輸入、輸出 (read write accept)IO是面向流的 二、BIO BIO是同步阻塞式IO 服務端與客戶端進行三次握手后一個鏈路建立一個線程面...
摘要:三同步非阻塞式以塊的方式處理數據面向緩存區的采用多路復用模式基于事件驅動是實現了的一個流行框架,的。阿里云分布式文件系統里用的就是。四異步非阻塞式基于事件驅動,不需要多路復用器對注冊通道進行輪詢,采用設計模式。 一、什么是IO IO 輸入、輸出 (read write accept)IO是面向流的 二、BIO BIO是同步阻塞式IO 服務端與客戶端進行三次握手后一個鏈路建立一個線程面...
閱讀 2325·2021-11-23 10:09
閱讀 2894·2021-10-12 10:11
閱讀 2601·2021-09-29 09:35
閱讀 1343·2019-08-30 15:53
閱讀 2269·2019-08-30 11:15
閱讀 2915·2019-08-29 13:01
閱讀 2298·2019-08-28 18:15
閱讀 3369·2019-08-26 12:13