摘要:采用通信模型的服務(wù)端通常由一個(gè)獨(dú)立的線程負(fù)責(zé)監(jiān)聽客戶端的連接它接收到客戶端連接請(qǐng)求之后為每個(gè)客戶端創(chuàng)建一個(gè)新的線程進(jìn)行鏈路處理處理完成之后通過(guò)輸出流返回應(yīng)答給客戶端線程銷毀這就是典型的一請(qǐng)求一應(yīng)答通信模型該模型最大的問(wèn)題就是缺乏彈性伸縮能力
BIO
采用 BIO 通信模型的服務(wù)端, 通常由一個(gè)獨(dú)立的 Acceptor 線程負(fù)責(zé)監(jiān)聽客戶端的連接, 它接收到客戶端連接請(qǐng)求之后為每個(gè)客戶端創(chuàng)建一個(gè)新的線程進(jìn)行鏈路處理, 處理完成之后, 通過(guò)輸出流返回應(yīng)答給客戶端, 線程銷毀. 這就是典型的一請(qǐng)求一應(yīng)答通信模型.
該模型最大的問(wèn)題就是缺乏彈性伸縮能力, 當(dāng)客戶端并發(fā)訪問(wèn)量增加后, 服務(wù)端的線程個(gè)數(shù)和客戶端并發(fā)訪問(wèn)數(shù)是 1:1 的關(guān)系.
當(dāng)線程數(shù)過(guò)多之后, 系統(tǒng)性能就會(huì)下降, 系統(tǒng)也會(huì)發(fā)生線程堆棧溢出、創(chuàng)建新線程失敗等問(wèn)題, 最終導(dǎo)致進(jìn)程宕機(jī)或者僵死, 不能對(duì)外提供服務(wù).
BIO 通信模型圖 偽異步 IO后端通過(guò)維護(hù)一個(gè)消息隊(duì)列和 N 個(gè)活躍線程, 來(lái)處理多個(gè)客戶端的請(qǐng)求接入, 當(dāng)有新的客戶端接入時(shí), 將客戶端的 Socket 封裝成一個(gè) Task (java.lang.Runnable 接口) 放入后端線的線程池進(jìn)行處理.
由于線程池可以設(shè)置消息隊(duì)列的大小和最大線程數(shù), 因此它的資源占用是可控的, 無(wú)論多少個(gè)客戶端并發(fā)訪問(wèn), 都不會(huì)導(dǎo)致資源耗盡和宕機(jī).
客戶端個(gè)數(shù) M, 線程池最大線程數(shù) N 的比例關(guān)系, 其中 M 可以遠(yuǎn)遠(yuǎn)大于 N.
注意: 當(dāng)對(duì) Socket 的輸入流進(jìn)行讀取操作的時(shí)候,它會(huì)一直阻塞轄區(qū), 直到發(fā)生如下三種事件:
有數(shù)據(jù)可讀.
可用數(shù)據(jù)已經(jīng)讀取完畢.
發(fā)生空指針或IO異常.
偽異步 IO 模型圖 弊端當(dāng)對(duì)方發(fā)送請(qǐng)求或應(yīng)答消息比較緩慢, 或者網(wǎng)絡(luò)傳輸比較慢時(shí), 讀取輸入流一方的通信線程將被長(zhǎng)時(shí)間阻塞, 如果對(duì)方要 60s 才能將數(shù)據(jù)發(fā)送完成, 讀取一方的 IO 線程也將會(huì)被同步阻塞 60s, 在此期間, 其它接入消息只能在消息隊(duì)列中排隊(duì).
假如所有的可用線程都被故障服務(wù)器阻塞, 那后續(xù)所有的 IO 消息都將在隊(duì)列中排隊(duì).
由于線程池采用阻塞隊(duì)列實(shí)現(xiàn), 當(dāng)隊(duì)列積滿之后, 后續(xù)入隊(duì)列的操作將被阻塞.
由于前端只有一個(gè) Accptor 線程接收客戶端接入, 它被阻塞在線程池的同步阻塞隊(duì)列之后, 新的客戶端請(qǐng)求消息將被拒絕, 客戶端會(huì)發(fā)生大量的連接超時(shí).
NIO與 Socket 類和 ServerSocket 類相對(duì)應(yīng), NIO 也提供了 SocketChannel 和 ServerSocketChannel 兩種不同的套接字通道實(shí)現(xiàn). 這兩種新增的通道都支持阻塞和非阻塞兩種模式.
一般來(lái)說(shuō), 低負(fù)載、低并發(fā)的應(yīng)用程序可以選擇同步阻塞IO以降低編程復(fù)雜度; 對(duì)于高負(fù)載、高并發(fā)的網(wǎng)絡(luò)模型應(yīng)用, 需要使用NIO的非阻塞模式進(jìn)行開發(fā).
緩沖區(qū) BufferBuffer 是一個(gè)對(duì)象, 它包含一些要寫入或要讀出的數(shù)據(jù). 在NIO庫(kù)中, 所有數(shù)據(jù)都是用緩沖區(qū)處理的. 在讀取數(shù)據(jù)時(shí), 它是直接讀取到緩沖區(qū)中的; 在寫入數(shù)據(jù)時(shí), 寫入到緩沖區(qū)中. 任何時(shí)候訪問(wèn)NIO中的數(shù)據(jù), 都是通過(guò)緩沖區(qū)進(jìn)行操作的.
緩沖區(qū)實(shí)質(zhì)上是一個(gè)數(shù)組. 通常它是一個(gè)字節(jié)數(shù)組, 也可以使用其他種類的數(shù)組. 但是一個(gè)緩沖區(qū)不僅僅是一個(gè)數(shù)組, 緩沖區(qū)提供了對(duì)數(shù)據(jù)的結(jié)構(gòu)化訪問(wèn)以及維護(hù)讀寫位置等信息.
常用緩沖區(qū)是 ByteBuffer, 一個(gè) ByteBuffer 提供了一種功能用于操作 byte 數(shù)組. 除了 ByteBuffer, 還有其他的一些緩沖區(qū).
ByteBuffer: 字節(jié)緩沖區(qū)
CharBuffer: 字符緩沖區(qū)
ShortBuffer: 短整形緩沖區(qū)
IntBuffer: 整形緩沖區(qū)
LongBuffer: 長(zhǎng)整形緩沖區(qū)
FloatBuffer: 浮點(diǎn)型緩沖區(qū)
DoubleBuffer: 雙精度浮點(diǎn)型緩沖區(qū)
每一個(gè) Buffer 類都是 Buffer 接口的一個(gè)子實(shí)例. 除了 ByteBuffer, 每個(gè) Buffer 類都有完全一樣的操作, 只是它們所處理的類型不一樣.
通道 ChannelChannel 是一個(gè)通道, 網(wǎng)絡(luò)數(shù)據(jù)通過(guò) Channel 讀取和寫入. 通道與流的不同之處在于通道是雙向的, 流只是在一個(gè)方向上移動(dòng)(一個(gè)流必須是 InputStream 或者 OutputStream), 而通道可以用于讀、寫或者二者同時(shí)進(jìn)行.
Java NIO中最重要的幾個(gè)Channel的實(shí)現(xiàn):FileChannel: 用于文件的數(shù)據(jù)讀寫
DatagramChannel: 用于UDP的數(shù)據(jù)讀寫
SocketChannel: 用于TCP的數(shù)據(jù)讀寫. 一般是客戶端實(shí)現(xiàn)
ServerSocketChannel: 允許我們監(jiān)聽TCP鏈接請(qǐng)求, 每個(gè)請(qǐng)求會(huì)創(chuàng)建會(huì)一個(gè)SocketChannel. 一般是服務(wù)器實(shí)現(xiàn)
多路復(fù)用器 Selector多路復(fù)用器提供選擇已經(jīng)就緒的任務(wù)的能力. 簡(jiǎn)單來(lái)講, Selector 會(huì)不斷的輪詢注冊(cè)在其上的 Channel, 如果某個(gè) Channel 上面發(fā)生讀或?qū)懯录? 這個(gè) Channel 就處于就緒狀態(tài), 會(huì)被 Selector 輪詢出來(lái), 然后通過(guò) SelectionKey 可以獲取就緒 Channel 的集合, 進(jìn)行后續(xù)的 IO 操作.
一個(gè)多路復(fù)用器 Selector 可以同時(shí)輪詢多個(gè) Channel, 由于 JDK 使用了 epoll() 代替?zhèn)鹘y(tǒng)的 select 實(shí)現(xiàn), 所以它并沒有最大連接句柄 1024/2048 的限制. 這也意味著只需要一個(gè)線程負(fù)責(zé) Selector 的輪詢, 就可以接入成千上萬(wàn)的客戶端.
NIO 服務(wù)端序列圖 NIO創(chuàng)建的 TimeServer 源碼分析public class MultiplexerTimeServer implements Runnable { private Selector selector; private ServerSocketChannel servChannel; private volatile boolean stop; /** * 初始化多路復(fù)用器、綁定監(jiān)聽端口 * * @param port */ public MultiplexerTimeServer(int port) { try { // 創(chuàng)建多路復(fù)用器 selector = Selector.open(); // 打開 ServerSocketChannel 用來(lái)監(jiān)聽客戶端的連接, 它是所有客戶端連接的父管道. servChannel = ServerSocketChannel.open(); // 設(shè)置 ServerSocketChannel 為異步非阻塞模式 servChannel.configureBlocking(false); // 綁定地址和端口 servChannel.socket().bind(new InetSocketAddress(port), 1024); // 將 ServerSocketChannel 注冊(cè)到 Reactor 線程的多路復(fù)用器 Selector 上, 監(jiān)聽 ACCEPT 事件 servChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("The time server is start in port : " + port); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } public void stop() { this.stop = true; } /* * (non-Javadoc) * * @see java.lang.Runnable#run() */ @Override public void run() { while (!stop) { try { selector.select(1000); SetselectedKeys = selector.selectedKeys(); Iterator it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) { key.channel().close(); } } } } } catch (Throwable t) { t.printStackTrace(); } } // 多路復(fù)用器關(guān)閉后,所有注冊(cè)在上面的Channel和Pipe等資源都會(huì)被自動(dòng)去注冊(cè)并關(guān)閉,所以不需要重復(fù)釋放資源 if (selector != null) { try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { // 處理新接入的請(qǐng)求消息 if (key.isAcceptable()) { // Accept the new connection ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); // Add the new connection to the selector sc.register(selector, SelectionKey.OP_READ); } if (key.isReadable()) { // Read the data SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if (readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("The time server receive order : " + body); String currentTime = "QUERY TIME ORDER" .equalsIgnoreCase(body) ? new java.util.Date( System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(sc, currentTime); } else if (readBytes < 0) { // 對(duì)端鏈路關(guān)閉 key.cancel(); sc.close(); } else { ; // 讀到0字節(jié),忽略 } } } } private void doWrite(SocketChannel channel, String response) throws IOException { if (response != null && response.trim().length() > 0) { byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer); } } }
selector.select(1000); 休眠時(shí)間為1S, 無(wú)論是否有讀寫等事件發(fā)生, selector 每隔 1S 都被喚醒一次, selector 也提供了一個(gè)無(wú)參的 select 方法. 當(dāng)有處于就緒狀態(tài)的 Channel 時(shí), selector 將返回就緒狀態(tài)的 Channel 的 SelectionKey 集合, 我們通過(guò)對(duì)就緒狀態(tài)的 Channel 集合進(jìn)行迭代, 就可以進(jìn)行網(wǎng)絡(luò)的異步讀寫操作.
key.isAcceptable() 來(lái)判斷 當(dāng)前 SelectionKey 的通道是否已準(zhǔn)備好接受新的套接字連接(處理新接入的客戶端請(qǐng)求消息). 通過(guò) ServerSocketChannel 的 accept 接收客戶端的連接請(qǐng)求并創(chuàng)建 SocketChannel 實(shí)例, 完成上述操作后, 相當(dāng)于完成了TCP的三次握手, TCP物理鏈路正式建立. 注意,我們需要將新創(chuàng)建的 SocketChannel 設(shè)置為異步非阻塞, 同時(shí)也可以對(duì)其TCP參數(shù)進(jìn)行設(shè)置, 例如TCP接收和發(fā)送緩沖區(qū)的大小等.
根據(jù) SelectionKey 的操作位進(jìn)行判斷即可獲知網(wǎng)絡(luò)事件的類型, 如 isAcceptable() 表示為 OP_ACCEPT
key.isAcceptable() 來(lái)判斷 當(dāng)前 SelectionKey 的通道是否已準(zhǔn)備好進(jìn)行讀取(讀取客戶端的請(qǐng)求消息). 首先創(chuàng)建一個(gè) ByteBuffer, 由于我們事先無(wú)法得知客戶端發(fā)送的碼流大小, 作為例程, 我們開辟一個(gè)1M的緩沖區(qū). 然后調(diào)用 SocketChannel 的 read 方法讀取請(qǐng)求碼流, 注意, 由于我們已經(jīng)將 SocketChannel 設(shè)置為異步非阻塞模式, 因此它的 read 是非阻塞的. 使用返回值進(jìn)行判斷, 看讀取到的字節(jié)數(shù), 返回值有三種可能的結(jié)果:
返回值大于0: 讀到了字節(jié), 對(duì)字節(jié)進(jìn)行編解碼;
返回值等于0: 沒有讀取到字節(jié), 屬于正常場(chǎng)景, 忽略;
返回值為-1: 鏈路已經(jīng)關(guān)閉, 需要關(guān)閉SocketChannel, 釋放資源.
當(dāng)讀取到碼流以后, 我們進(jìn)行解碼, 首先對(duì) readBuffer 進(jìn)行 flip 操作, 它的作用是將緩沖區(qū)當(dāng)前的limit 設(shè)置為 position, position 設(shè)置為0, 用于后續(xù)對(duì)緩沖區(qū)的讀取操作.
然后根據(jù)緩沖區(qū)可讀的字節(jié)個(gè)數(shù)創(chuàng)建字節(jié)數(shù)組, 調(diào)用 ByteBuffer 的 get 操作將緩沖區(qū)可讀的字節(jié)數(shù)組拷貝到新創(chuàng)建的字節(jié)數(shù)組中, 最后調(diào)用字符串的構(gòu)造函數(shù)創(chuàng)建請(qǐng)求消息體并打印. 如果請(qǐng)求指令是 ”QUERY TIME ORDER” 則把服務(wù)器的當(dāng)前時(shí)間編碼后返回給客戶端, 下面我們看看如果異步發(fā)送應(yīng)答消息給客戶端.
doWrite 方法將消息異步發(fā)送給客戶端, 首先將字符串編碼成字節(jié)數(shù)組, 根據(jù)字節(jié)數(shù)組的長(zhǎng)度創(chuàng)建 ByteBuffer, 調(diào)用 ByteBuffer 的 put 操作將字節(jié)數(shù)組拷貝到緩沖區(qū)中, 然后對(duì)緩沖區(qū)進(jìn)行flip操作, 最后調(diào)用 SocketChannel 的 write 方法將緩沖區(qū)中的字節(jié)數(shù)組發(fā)送出去.
需要指出的是, 由于 SocketChannel 是異步非阻塞的, 它并不保證一次能夠把需要發(fā)送的字節(jié)數(shù)組發(fā)送完, 此時(shí)會(huì)出現(xiàn)“寫半包”問(wèn)題, 我們需要注冊(cè)寫操作, 不斷輪詢 Selector 將沒有發(fā)送完的 ByteBuffer 發(fā)送完畢, 可以通過(guò) ByteBuffer 的 hasRemain() 方法判斷消息是否發(fā)送完成.
AIO 編程NIO 2.0 引入了新的異步通道的概念, 并提供了異步文件通道和異步套接字通道的實(shí)現(xiàn). 異步通道提供以下兩種方式獲取操作結(jié)果.
通過(guò) java.util.concurrent.Future 類來(lái)表示異步操作的結(jié)果.
在執(zhí)行異步操作的時(shí)候傳入一個(gè) java.nio.channels
CompletionHandler 接口的實(shí)現(xiàn)類作為操作完成的回調(diào).
NIO 2.0 的異步套接字通道是真正的異步非阻塞 IO , 對(duì)應(yīng)與 UNIX 網(wǎng)絡(luò)編程中的事件驅(qū)動(dòng) IO. 它不需要通過(guò)多路復(fù)用器對(duì)注冊(cè)的通道進(jìn)行輪詢操作即可實(shí)現(xiàn)異步讀寫, 從而簡(jiǎn)化了 NIO 的編程模型.
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/72708.html
摘要:后改良為用線程池的方式代替新增線程,被稱為偽異步。最大的問(wèn)題是阻塞,同步。每次請(qǐng)求都由程序執(zhí)行并返回,這是同步的缺陷。這些都會(huì)被注冊(cè)在多路復(fù)用器上。多路復(fù)用器提供選擇已經(jīng)就緒狀態(tài)任務(wù)的能力。并沒有采用的多路復(fù)用器,而是使用異步通道的概念。 Netty是一個(gè)提供異步事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用框架,用以快速開發(fā)高性能、高可靠的網(wǎng)絡(luò)服務(wù)器和客戶端程序。Netty簡(jiǎn)化了網(wǎng)絡(luò)程序的開發(fā),是很多框架和公司...
摘要:后改良為用線程池的方式代替新增線程,被稱為偽異步。最大的問(wèn)題是阻塞,同步。每次請(qǐng)求都由程序執(zhí)行并返回,這是同步的缺陷。這些都會(huì)被注冊(cè)在多路復(fù)用器上。多路復(fù)用器提供選擇已經(jīng)就緒狀態(tài)任務(wù)的能力。并沒有采用的多路復(fù)用器,而是使用異步通道的概念。 Netty是一個(gè)提供異步事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用框架,用以快速開發(fā)高性能、高可靠的網(wǎng)絡(luò)服務(wù)器和客戶端程序。Netty簡(jiǎn)化了網(wǎng)絡(luò)程序的開發(fā),是很多框架和公司...
摘要:即可以理解為,方法都是異步的,完成后會(huì)主動(dòng)調(diào)用回調(diào)函數(shù)。主要在包下增加了下面四個(gè)異步通道其中的方法,會(huì)返回一個(gè)帶回調(diào)函數(shù)的對(duì)象,當(dāng)執(zhí)行完讀取寫入操作后,直接調(diào)用回調(diào)函數(shù)。 本文原創(chuàng)地址,我的博客:jsbintask.cn/2019/04/16/…(食用效果最佳),轉(zhuǎn)載請(qǐng)注明出處! 在理解什么是BIO,NIO,AIO之前,我們首先需要了解什么是同步,異步,阻塞,非阻塞。假如我們現(xiàn)在要去銀行取...
時(shí)間:2018年04月11日星期三 說(shuō)明:本文部分內(nèi)容均來(lái)自慕課網(wǎng)。@慕課網(wǎng):https://www.imooc.com 教學(xué)源碼:https://github.com/zccodere/s... 學(xué)習(xí)源碼:https://github.com/zccodere/s... 第一章:課程介紹 1-1 課程介紹 什么是Netty 高性能、事件驅(qū)動(dòng)、異步非阻塞的IO Java開源框架 基于NIO的客戶...
摘要:三同步非阻塞式以塊的方式處理數(shù)據(jù)面向緩存區(qū)的采用多路復(fù)用模式基于事件驅(qū)動(dòng)是實(shí)現(xiàn)了的一個(gè)流行框架,的。阿里云分布式文件系統(tǒng)里用的就是。四異步非阻塞式基于事件驅(qū)動(dòng),不需要多路復(fù)用器對(duì)注冊(cè)通道進(jìn)行輪詢,采用設(shè)計(jì)模式。 一、什么是IO IO 輸入、輸出 (read write accept)IO是面向流的 二、BIO BIO是同步阻塞式IO 服務(wù)端與客戶端進(jìn)行三次握手后一個(gè)鏈路建立一個(gè)線程面...
閱讀 2408·2021-11-23 09:51
閱讀 1219·2021-11-22 13:54
閱讀 3428·2021-09-24 10:31
閱讀 1095·2021-08-16 10:46
閱讀 3629·2019-08-30 15:54
閱讀 710·2019-08-30 15:54
閱讀 2892·2019-08-29 17:17
閱讀 3163·2019-08-29 15:08