摘要:的出現(xiàn)解決了這尷尬的問(wèn)題,非阻塞模式下,通過(guò),我們的線程只為已就緒的通道工作,不用盲目的重試了。注意要將注冊(cè)到,首先需要將設(shè)置為非阻塞模式,否則會(huì)拋異常。
背景知識(shí) 同步、異步、阻塞、非阻塞
首先,這幾個(gè)概念非常容易搞混淆,但NIO中又有涉及,所以總結(jié)一下。
同步:API調(diào)用返回時(shí)調(diào)用者就知道操作的結(jié)果如何了(實(shí)際讀取/寫入了多少字節(jié))。
異步:相對(duì)于同步,API調(diào)用返回時(shí)調(diào)用者不知道操作的結(jié)果,后面才會(huì)回調(diào)通知結(jié)果。
阻塞:當(dāng)無(wú)數(shù)據(jù)可讀,或者不能寫入所有數(shù)據(jù)時(shí),掛起當(dāng)前線程等待。
非阻塞:讀取時(shí),可以讀多少數(shù)據(jù)就讀多少然后返回,寫入時(shí),可以寫入多少數(shù)據(jù)就寫入多少然后返回。
對(duì)于I/O操作,根據(jù)Oracle官網(wǎng)的文檔,同步異步的劃分標(biāo)準(zhǔn)是“調(diào)用者是否需要等待I/O操作完成”,這個(gè)“等待I/O操作完成”的意思不是指一定要讀取到數(shù)據(jù)或者說(shuō)寫入所有數(shù)據(jù),而是指真正進(jìn)行I/O操作時(shí),比如數(shù)據(jù)在TCP/IP協(xié)議棧緩沖區(qū)和JVM緩沖區(qū)之間傳輸?shù)倪@段時(shí)間,調(diào)用者是否要等待。
所以,我們常用的 read() 和 write() 方法都是同步I/O,同步I/O又分為阻塞和非阻塞兩種模式,如果是非阻塞模式,檢測(cè)到無(wú)數(shù)據(jù)可讀時(shí),直接就返回了,并沒(méi)有真正執(zhí)行I/O操作。
總結(jié)就是,Java中實(shí)際上只有 同步阻塞I/O、同步非阻塞I/O 與 異步I/O 三種機(jī)制,我們下文所說(shuō)的是前兩種,JDK 1.7才開始引入異步 I/O,那稱之為NIO.2。
傳統(tǒng)IO我們知道,一個(gè)新技術(shù)的出現(xiàn)總是伴隨著改進(jìn)和提升,Java NIO的出現(xiàn)亦如此。
傳統(tǒng) I/O 是阻塞式I/O,主要問(wèn)題是系統(tǒng)資源的浪費(fèi)。比如我們?yōu)榱俗x取一個(gè)TCP連接的數(shù)據(jù),調(diào)用 InputStream 的 read() 方法,這會(huì)使當(dāng)前線程被掛起,直到有數(shù)據(jù)到達(dá)才被喚醒,那該線程在數(shù)據(jù)到達(dá)這段時(shí)間內(nèi),占用著內(nèi)存資源(存儲(chǔ)線程棧)卻無(wú)所作為,也就是俗話說(shuō)的占著茅坑不拉屎,為了讀取其他連接的數(shù)據(jù),我們不得不啟動(dòng)另外的線程。在并發(fā)連接數(shù)量不多的時(shí)候,這可能沒(méi)什么問(wèn)題,然而當(dāng)連接數(shù)量達(dá)到一定規(guī)模,內(nèi)存資源會(huì)被大量線程消耗殆盡。另一方面,線程切換需要更改處理器的狀態(tài),比如程序計(jì)數(shù)器、寄存器的值,因此非常頻繁的在大量線程之間切換,同樣是一種資源浪費(fèi)。
隨著技術(shù)的發(fā)展,現(xiàn)代操作系統(tǒng)提供了新的I/O機(jī)制,可以避免這種資源浪費(fèi)。基于此,誕生了Java NIO,NIO的代表性特征就是非阻塞I/O。緊接著我們發(fā)現(xiàn),簡(jiǎn)單的使用非阻塞I/O并不能解決問(wèn)題,因?yàn)樵诜亲枞J较?,read()方法在沒(méi)有讀取到數(shù)據(jù)時(shí)就會(huì)立即返回,不知道數(shù)據(jù)何時(shí)到達(dá)的我們,只能不停的調(diào)用read()方法進(jìn)行重試,這顯然太浪費(fèi)CPU資源了,從下文可以知道,Selector組件正是為解決此問(wèn)題而生。
Java NIO 核心組件 1.Channel概念
Java NIO中的所有I/O操作都基于Channel對(duì)象,就像流操作都要基于Stream對(duì)象一樣,因此很有必要先了解Channel是什么。以下內(nèi)容摘自JDK 1.8的文檔
A channel represents an open connection to an entity such as a hardware device, a file, a network socket, or a program component that is capable of performing one or more distinct I/O operations, for example reading or writing.
從上述內(nèi)容可知,一個(gè)Channel(通道)代表和某一實(shí)體的連接,這個(gè)實(shí)體可以是文件、網(wǎng)絡(luò)套接字等。也就是說(shuō),通道是Java NIO提供的一座橋梁,用于我們的程序和操作系統(tǒng)底層I/O服務(wù)進(jìn)行交互。
通道是一種很基本很抽象的描述,和不同的I/O服務(wù)交互,執(zhí)行不同的I/O操作,實(shí)現(xiàn)不一樣,因此具體的有FileChannel、SocketChannel等。加群895244712,免費(fèi)獲取Java架構(gòu)師進(jìn)階學(xué)習(xí)資料
通道使用起來(lái)跟Stream比較像,可以讀取數(shù)據(jù)到Buffer中,也可以把Buffer中的數(shù)據(jù)寫入通道。
當(dāng)然,也有區(qū)別,主要體現(xiàn)在如下兩點(diǎn):
一個(gè)通道,既可以讀又可以寫,而一個(gè)Stream是單向的(所以分 InputStream 和 OutputStream)
通道有非阻塞I/O模式
實(shí)現(xiàn)
Java NIO中最常用的通道實(shí)現(xiàn)是如下幾個(gè),可以看出跟傳統(tǒng)的 I/O 操作類是一一對(duì)應(yīng)的。
FileChannel:讀寫文件
DatagramChannel: UDP協(xié)議網(wǎng)絡(luò)通信
SocketChannel:TCP協(xié)議網(wǎng)絡(luò)通信
ServerSocketChannel:監(jiān)聽TCP連接
2.BufferNIO中所使用的緩沖區(qū)不是一個(gè)簡(jiǎn)單的byte數(shù)組,而是封裝過(guò)的Buffer類,通過(guò)它提供的API,我們可以靈活的操縱數(shù)據(jù),下面細(xì)細(xì)道來(lái)。
與Java基本類型相對(duì)應(yīng),NIO提供了多種 Buffer 類型,如ByteBuffer、CharBuffer、IntBuffer等,區(qū)別就是讀寫緩沖區(qū)時(shí)的單位長(zhǎng)度不一樣(以對(duì)應(yīng)類型的變量為單位進(jìn)行讀寫)。
Buffer中有3個(gè)很重要的變量,它們是理解Buffer工作機(jī)制的關(guān)鍵,分別是
capacity (總?cè)萘浚?/p>
position (指針當(dāng)前位置)
limit (讀/寫邊界位置)
Buffer的工作方式跟C語(yǔ)言里的字符數(shù)組非常的像,類比一下,capacity就是數(shù)組的總長(zhǎng)度,position就是我們讀/寫字符的下標(biāo)變量,limit就是結(jié)束符的位置。Buffer初始時(shí)3個(gè)變量的情況如下圖
在對(duì)Buffer進(jìn)行讀/寫的過(guò)程中,position會(huì)往后移動(dòng),而 limit 就是 position 移動(dòng)的邊界。由此不難想象,在對(duì)Buffer進(jìn)行寫入操作時(shí),limit應(yīng)當(dāng)設(shè)置為capacity的大小,而對(duì)Buffer進(jìn)行讀取操作時(shí),limit應(yīng)當(dāng)設(shè)置為數(shù)據(jù)的實(shí)際結(jié)束位置。(注意:將Buffer數(shù)據(jù)?寫入?通道是Buffer?讀取?操作,從通道?讀取?數(shù)據(jù)到Buffer是Buffer?寫入?操作)
在對(duì)Buffer進(jìn)行讀/寫操作前,我們可以調(diào)用Buffer類提供的一些輔助方法來(lái)正確設(shè)置 position 和 limit 的值,主要有如下幾個(gè)
flip(): 設(shè)置 limit 為 position 的值,然后 position 置為0。對(duì)Buffer進(jìn)行讀取操作前調(diào)用。
rewind(): 僅僅將 position 置0。一般是在重新讀取Buffer數(shù)據(jù)前調(diào)用,比如要讀取同一個(gè)Buffer的數(shù)據(jù)寫入多個(gè)通道時(shí)會(huì)用到。
clear(): 回到初始狀態(tài),即 limit 等于 capacity,position 置0。重新對(duì)Buffer進(jìn)行寫入操作前調(diào)用。
compact(): 將未讀取完的數(shù)據(jù)(position 與 limit 之間的數(shù)據(jù))移動(dòng)到緩沖區(qū)開頭,并將 position 設(shè)置為這段數(shù)據(jù)末尾的下一個(gè)位置。其實(shí)就等價(jià)于重新向緩沖區(qū)中寫入了這么一段數(shù)據(jù)。
然后,看一個(gè)實(shí)例,使用 FileChannel 讀寫文本文件,通過(guò)這個(gè)例子驗(yàn)證通道可讀可寫的特性以及Buffer的基本用法(注意 FileChannel 不能設(shè)置為非阻塞模式)。
FileChannel channel = new RandomAccessFile("test.txt", "rw").getChannel(); channel.position(channel.size()); // 移動(dòng)文件指針到末尾(追加寫入) ByteBuffer byteBuffer = ByteBuffer.allocate(20); // 數(shù)據(jù)寫入Buffer byteBuffer.put("你好,世界! ".getBytes(StandardCharsets.UTF_8)); // Buffer -> Channel byteBuffer.flip(); while (byteBuffer.hasRemaining()) { channel.write(byteBuffer); } channel.position(0); // 移動(dòng)文件指針到開頭(從頭讀?。? CharBuffer charBuffer = CharBuffer.allocate(10); CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder(); // 讀出所有數(shù)據(jù) byteBuffer.clear(); while (channel.read(byteBuffer) != -1 || byteBuffer.position() > 0) { byteBuffer.flip(); // 使用UTF-8解碼器解碼 charBuffer.clear(); decoder.decode(byteBuffer, charBuffer, false); System.out.print(charBuffer.flip().toString()); byteBuffer.compact(); // 數(shù)據(jù)可能有剩余 } 加群895244712,免費(fèi)獲取Java架構(gòu)師進(jìn)階學(xué)習(xí)資料 channel.close();
這個(gè)例子中使用了兩個(gè)Buffer,其中 byteBuffer 作為通道讀寫的數(shù)據(jù)緩沖區(qū),charBuffer 用于存儲(chǔ)解碼后的字符。clear() 和 flip() 的用法正如上文所述,需要注意的是最后那個(gè) compact() 方法,即使 charBuffer 的大小完全足以容納 byteBuffer 解碼后的數(shù)據(jù),這個(gè) compact() 也必不可少,這是因?yàn)槌S弥形淖址腢TF-8編碼占3個(gè)字節(jié),因此有很大概率出現(xiàn)在中間截?cái)嗟那闆r,請(qǐng)看下圖:
當(dāng) Decoder 讀取到緩沖區(qū)末尾的 0xe4 時(shí),無(wú)法將其映射到一個(gè) Unicode,decode()方法第三個(gè)參數(shù) false 的作用就是讓 Decoder 把無(wú)法映射的字節(jié)及其后面的數(shù)據(jù)都視作附加數(shù)據(jù),因此 decode() 方法會(huì)在此處停止,并且 position 會(huì)回退到 0xe4 的位置。如此一來(lái), 緩沖區(qū)中就遺留了“中”字編碼的第一個(gè)字節(jié),必須將其 compact 到前面,以正確的和后序數(shù)據(jù)拼接起來(lái)。
BTW,例子中的 CharsetDecoder 也是 Java NIO 的一個(gè)新特性,所以大家應(yīng)該發(fā)現(xiàn)了一點(diǎn)哈,NIO的操作是面向緩沖區(qū)的(傳統(tǒng)I/O是面向流的)。
至此,我們了解了 Channel 與 Buffer 的基本用法。接下來(lái)要說(shuō)的是讓一個(gè)線程管理多個(gè)Channel的重要組件。
3.SelectorSelector 是什么
Selector(選擇器)是一個(gè)特殊的組件,用于采集各個(gè)通道的狀態(tài)(或者說(shuō)事件)。我們先將通道注冊(cè)到選擇器,并設(shè)置好關(guān)心的事件,然后就可以通過(guò)調(diào)用select()方法,靜靜地等待事件發(fā)生。
通道有如下4個(gè)事件可供我們監(jiān)聽:
Accept:有可以接受的連接
Connect:連接成功
Read:有數(shù)據(jù)可讀
Write:可以寫入數(shù)據(jù)了
為什么要用Selector
前文說(shuō)了,如果用阻塞I/O,需要多線程(浪費(fèi)內(nèi)存),如果用非阻塞I/O,需要不斷重試(耗費(fèi)CPU)。Selector的出現(xiàn)解決了這尷尬的問(wèn)題,非阻塞模式下,通過(guò)Selector,我們的線程只為已就緒的通道工作,不用盲目的重試了。比如,當(dāng)所有通道都沒(méi)有數(shù)據(jù)到達(dá)時(shí),也就沒(méi)有Read事件發(fā)生,我們的線程會(huì)在select()方法處被掛起,從而讓出了CPU資源。
使用方法
如下所示,創(chuàng)建一個(gè)Selector,并注冊(cè)一個(gè)Channel。
注意:要將 Channel 注冊(cè)到 Selector,首先需要將 Channel 設(shè)置為非阻塞模式,否則會(huì)拋異常。
Selector selector = Selector.open(); channel.configureBlocking(false); SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
register()方法的第二個(gè)參數(shù)名叫“interest set”,也就是你所關(guān)心的事件集合。如果你關(guān)心多個(gè)事件,用一個(gè)“按位或運(yùn)算符”分隔,比如
SelectionKey.OP_READ | SelectionKey.OP_WRITE
這種寫法一點(diǎn)都不陌生,支持位運(yùn)算的編程語(yǔ)言里都這么玩,用一個(gè)整型變量可以標(biāo)識(shí)多種狀態(tài),它是怎么做到的呢,其實(shí)很簡(jiǎn)單,舉個(gè)例子,首先預(yù)定義一些常量,它們的值(二進(jìn)制)如下
可以發(fā)現(xiàn),它們值為1的位都是錯(cuò)開的,因此對(duì)它們進(jìn)行按位或運(yùn)算之后得出的值就沒(méi)有二義性,可以反推出是由哪些變量運(yùn)算而來(lái)。怎么判斷呢,沒(méi)錯(cuò),就是“按位與”運(yùn)算。比如,現(xiàn)在有一個(gè)狀態(tài)集合變量值為 0011,我們只需要判斷 "0011 & OP_READ" 的值是 1 還是 0 就能確定集合是否包含 OP_READ 狀態(tài)。
然后,注意 register() 方法返回了一個(gè)SelectionKey的對(duì)象,這個(gè)對(duì)象包含了本次注冊(cè)的信息,我們也可以通過(guò)它修改注冊(cè)信息。從下面完整的例子中可以看到,select()之后,我們也是通過(guò)獲取一個(gè) SelectionKey 的集合來(lái)獲取到那些狀態(tài)就緒了的通道。
一個(gè)完整實(shí)例概念和理論的東西闡述完了(其實(shí)寫到這里,我發(fā)現(xiàn)沒(méi)寫出多少東西,好尷尬(⊙?⊙)),看一個(gè)完整的例子吧。
這個(gè)例子使用Java NIO實(shí)現(xiàn)了一個(gè)單線程的服務(wù)端,功能很簡(jiǎn)單,監(jiān)聽客戶端連接,當(dāng)連接建立后,讀取客戶端的消息,并向客戶端響應(yīng)一條消息。
需要注意的是,我用字符 "0"(一個(gè)值為0的字節(jié)) 來(lái)標(biāo)識(shí)消息結(jié)束。
單線程Serverpublic class NioServer { public static void main(String[] args) throws IOException { // 創(chuàng)建一個(gè)selector Selector selector = Selector.open(); // 初始化TCP連接監(jiān)聽通道 ServerSocketChannel listenChannel = ServerSocketChannel.open(); listenChannel.bind(new InetSocketAddress(9999)); listenChannel.configureBlocking(false); // 注冊(cè)到selector(監(jiān)聽其ACCEPT事件) listenChannel.register(selector, SelectionKey.OP_ACCEPT); // 創(chuàng)建一個(gè)緩沖區(qū) ByteBuffer buffer = ByteBuffer.allocate(100); while (true) { selector.select(); //阻塞,直到有監(jiān)聽的事件發(fā)生 IteratorkeyIter = selector.selectedKeys().iterator(); // 通過(guò)迭代器依次訪問(wèn)select出來(lái)的Channel事件 while (keyIter.hasNext()) { SelectionKey key = keyIter.next(); if (key.isAcceptable()) { // 有連接可以接受 SocketChannel channel = ((ServerSocketChannel) key.channel()).accept(); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ); System.out.println("與【" + channel.getRemoteAddress() + "】建立了連接!"); } else if (key.isReadable()) { // 有數(shù)據(jù)可以讀取 buffer.clear(); // 讀取到流末尾說(shuō)明TCP連接已斷開, // 因此需要關(guān)閉通道或者取消監(jiān)聽READ事件 // 否則會(huì)無(wú)限循環(huán) if (((SocketChannel) key.channel()).read(buffer) == -1) { key.channel().close(); continue; } // 按字節(jié)遍歷數(shù)據(jù) buffer.flip(); while (buffer.hasRemaining()) { byte b = buffer.get(); if (b == 0) { // 客戶端消息末尾的