摘要:單線程模式流程服務器端的是一個線程對象,該線程會啟動事件循環,并使用選擇器來實現的多路復用。線程池分配一個線程給這個,即,將關注的事件以及對應的事件處理器注冊到線程中。多線程模式將接受客戶端的連接請求和與該客戶端的通信分在了兩個線程來完成。
Reactor模式
反應堆模式:“反應”器名字中”反應“的由來:
“反應”即“倒置”,“控制逆轉”,具體事件處理程序不調用反應器,而向反應器注冊一個事件處理器,表示自己對某些事件感興趣,有時間來了,具體事件處理程序通過事件處理器對某個指定的事件發生做出反應。
單線程Reactor模式流程:①服務器端的Reactor是一個線程對象,該線程會啟動事件循環,并使用Selector(選擇器)來實現IO的多路復用。channel注冊一個Acceptor事件處理器到Reactor中,Acceptor事件處理器所關注的事件是ACCEPT事件,這樣Reactor會監聽客戶端向服務器端發起的連接請求事件(ACCEPT事件)。
②客戶端向服務器端發起一個連接請求,Reactor監聽到了該ACCEPT事件的發生并將該ACCEPT事件派發給相應的Acceptor處理器來進行處理。Acceptor處理器通過accept()方法得到與這個客戶端對應的連接(SocketChannel),然后將該連接所關注的READ事件以及對應的READ事件處理器注冊到Reactor中,這樣一來Reactor就會監聽該連接的READ事件了。
③當Reactor監聽到有讀或者寫事件發生時,將相關的事件派發給對應的處理器進行處理。比如,讀處理器會通過SocketChannel的read()方法讀取數據,此時read()操作可以直接讀取到數據,而不會堵塞與等待可讀的數據到來。
④每當處理完所有就緒的感興趣的I/O事件后,Reactor線程會再次執行select()阻塞等待新的事件就緒并將其分派給對應處理器進行處理。
注意,Reactor的單線程模式的單線程主要是針對于I/O操作而言,也就是所有的I/O的accept()、read()、write()以及connect()操作都在一個線程上完成的。
基于單線程反應器模式手寫一個NIO通信先簡單介紹NIO中幾個重要對象:
Selector
Selector的英文含義是“選擇器”,也可以稱為為“輪詢代理器”、“事件訂閱器”、“channel容器管理機”都行。
事件訂閱和Channel管理: 應用程序將向Selector對象注冊需要它關注的Channel,以及具體的某一個Channel會對哪些IO事件感興趣。Selector中也會維護一個“已經注冊的Channel”的容器。
Channels
通道,被建立的一個應用程序和操作系統交互事件、傳遞內容的渠道(注意是連接到操作系統)。那么既然是和操作系統進行內容的傳遞,那么說明應用程序可以通過通道讀取數據,也可以通過通道向操作系統寫數據。
所有被Selector(選擇器)注冊的通道,只能是繼承了SelectableChannel類的子類。
ServerSocketChannel:應用服務器程序的監聽通道。只有通過這個通道,應用程序才能向操作系統注冊支持“多路復用IO”的端口監聽。同時支持UDP協議和TCP協議。
ScoketChannel:TCP Socket套接字的監聽通道,一個Socket套接字對應了一個客戶端IP:端口 到
服務器IP:端口的通信連接。
DatagramChannel:UDP 數據報文的監聽通道。
通道中的數據總是要先讀到一個Buffer,或者總是要從一個Buffer中寫入。
服務端處理器:
/** * 類說明:nio通信服務端處理器 */ public class NioServerHandle implements Runnable{ private Selector selector; private ServerSocketChannel serverChannel; private volatile boolean started; /** * 構造方法 * @param port 指定要監聽的端口號 */ public NioServerHandle(int port) { try { selector = Selector.open(); serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); serverChannel.socket().bind(new InetSocketAddress(port)); serverChannel.register(selector,SelectionKey.OP_ACCEPT); started = true; System.out.println("服務器已啟動,端口號:"+port); } catch (IOException e) { e.printStackTrace(); } } public void stop(){ started = false; } @Override public void run() { //循環遍歷selector while(started){ try{ //阻塞,只有當至少一個注冊的事件發生的時候才會繼續. selector.select(); Setkeys = selector.selectedKeys(); Iterator it = keys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove(); try{ handleInput(key); }catch(Exception e){ if(key != null){ key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } }catch(Throwable t){ t.printStackTrace(); } } //selector關閉后會自動釋放里面管理的資源 if(selector != null) try{ selector.close(); }catch (Exception e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException{ if(key.isValid()){ //處理新接入的請求消息 if(key.isAcceptable()){ //獲得關心當前事件的channel ServerSocketChannel ssc = (ServerSocketChannel)key.channel(); //通過ServerSocketChannel的accept創建SocketChannel實例 //完成該操作意味著完成TCP三次握手,TCP物理鏈路正式建立 SocketChannel sc = ssc.accept(); System.out.println("======socket channel 建立連接" ); //設置為非阻塞的 sc.configureBlocking(false); //連接已經完成了,可以開始關心讀事件了 sc.register(selector,SelectionKey.OP_READ); } //讀消息 if(key.isReadable()){ System.out.println("======socket channel 數據準備完成," + "可以去讀==讀取======="); SocketChannel sc = (SocketChannel) key.channel(); //創建ByteBuffer,并開辟一個1M的緩沖區 ByteBuffer buffer = ByteBuffer.allocate(1024); //讀取請求碼流,返回讀取到的字節數 int readBytes = sc.read(buffer); //讀取到字節,對字節進行編解碼 if(readBytes>0){ //將緩沖區當前的limit設置為position=0, // 用于后續對緩沖區的讀取操作 buffer.flip(); //根據緩沖區可讀字節數創建字節數組 byte[] bytes = new byte[buffer.remaining()]; //將緩沖區可讀字節數組復制到新建的數組中 buffer.get(bytes); String message = new String(bytes,"UTF-8"); System.out.println("服務器收到消息:" + message); //處理數據 String result = response(message) ; //發送應答消息 doWrite(sc,result); } //鏈路已經關閉,釋放資源 else if(readBytes<0){ key.cancel(); sc.close(); } } } } //發送應答消息 private void doWrite(SocketChannel channel,String response) throws IOException { //將消息編碼為字節數組 byte[] bytes = response.getBytes(); //根據數組容量創建ByteBuffer ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); //將字節數組復制到緩沖區 writeBuffer.put(bytes); //flip操作 writeBuffer.flip(); //發送緩沖區的字節數組 channel.write(writeBuffer); } } public class NioServer { private static NioServerHandle nioServerHandle; public static void start(){ if(nioServerHandle !=null) nioServerHandle.stop(); nioServerHandle = new NioServerHandle(DEFAULT_PORT); new Thread(nioServerHandle,"Server").start(); } public static void main(String[] args){ start(); } }
客戶端處理器:
public class NioClientHandle implements Runnable{ private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean started; public NioClientHandle(String ip, int port) { this.host = ip; this.port = port; try { //創建選擇器 selector = Selector.open(); //打開通道 socketChannel = SocketChannel.open(); //如果為 true,則此通道將被置于阻塞模式; // 如果為 false,則此通道將被置于非阻塞模式 socketChannel.configureBlocking(false); started = true; } catch (IOException e) { e.printStackTrace(); } } public void stop(){ started = false; } @Override public void run() { try { doConnect(); } catch (IOException e) { e.printStackTrace(); System.exit(1); } //循環遍歷selector while(started){ try { //阻塞,只有當至少一個注冊的事件發生的時候才會繼續 selector.select(); //獲取當前有哪些事件可以使用 Setkeys = selector.selectedKeys(); //轉換為迭代器 Iterator it = keys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove(); try { handleInput(key); } catch (IOException e) { e.printStackTrace(); if(key!=null){ key.cancel(); if(key.channel()!=null){ key.channel().close(); } } } } } catch (IOException e) { e.printStackTrace(); } } //selector關閉后會自動釋放里面管理的資源 if(selector!=null){ try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } //具體的事件處理方法 private void handleInput(SelectionKey key) throws IOException{ if(key.isValid()){ //獲得關心當前事件的channel SocketChannel sc = (SocketChannel)key.channel(); if(key.isConnectable()){//連接事件 if(sc.finishConnect()){} else{System.exit(1);} } //有數據可讀事件 if(key.isReadable()){ //創建ByteBuffer,并開辟一個1M的緩沖區 ByteBuffer buffer = ByteBuffer.allocate(1024); //讀取請求碼流,返回讀取到的字節數 int readBytes = sc.read(buffer); //讀取到字節,對字節進行編解碼 if(readBytes>0){ //將緩沖區當前的limit設置為position,position=0, // 用于后續對緩沖區的讀取操作 buffer.flip(); //根據緩沖區可讀字節數創建字節數組 byte[] bytes = new byte[buffer.remaining()]; //將緩沖區可讀字節數組復制到新建的數組中 buffer.get(bytes); String result = new String(bytes,"UTF-8"); System.out.println("accept message:"+result); }else if(readBytes<0){ key.cancel(); sc.close(); } } } } //發送消息 private void doWrite(SocketChannel channel,String request) throws IOException { //將消息編碼為字節數組 byte[] bytes = request.getBytes(); //根據數組容量創建ByteBuffer ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); //將字節數組復制到緩沖區 writeBuffer.put(bytes); //flip操作 writeBuffer.flip(); //發送緩沖區的字節數組 channel.write(writeBuffer); } private void doConnect() throws IOException { /*如果此通道處于非阻塞模式, 則調用此方法將啟動非阻塞連接操作。 如果立即建立連接,就像本地連接可能發生的那樣,則此方法返回true。 否則,此方法返回false, 稍后必須通過調用finishConnect方法完成連接操作。*/ if(socketChannel.connect(new InetSocketAddress(host,port))){} else{ //連接還未完成,所以注冊連接就緒事件,向selector表示關注這個事件 socketChannel.register(selector,SelectionKey.OP_CONNECT); } } //寫數據對外暴露的API public void sendMsg(String msg) throws Exception{ socketChannel.register(selector,SelectionKey.OP_READ); doWrite(socketChannel,msg); } } public class NioClient { private static NioClientHandle nioClientHandle; public static void start(){ if(nioClientHandle !=null) nioClientHandle.stop(); nioClientHandle = new NioClientHandle(DEFAULT_SERVER_IP,DEFAULT_PORT); new Thread(nioClientHandle,"Client").start(); } //向服務器發送消息 public static boolean sendMsg(String msg) throws Exception{ nioClientHandle.sendMsg(msg); return true; } public static void main(String[] args) throws Exception { start(); System.out.println("請輸入請求信息:"); Scanner scanner = new Scanner(System.in); while(NioClient.sendMsg(scanner.next())); } }
服務端過程:
啟動服務端,完成一些初始化工作,ServerSocketChannel綁定端口并且注冊接受連接事件.
循環里selector.select()阻塞,只有當至少一個注冊的事件發生的時候才會繼續,循環里面處理發生的注冊事件
注冊事件發生時交給處理器,若為接受連接則accept取出socketChannel并完成連接,然后就是關注read讀取事件即注冊,有數據讀取了則處理器讀取請求數據并返回.
客戶端過程:
啟動客戶端,完成一些初始化工作.
根據服務端ip及端口發起連接.
往服務端發送數據,并注冊read讀取事件
循環里selector.select()阻塞,只有當至少一個注冊的事件發生的時候才會繼續,循環里面處理發生的注冊事件.
注冊事件發生時交給處理器,若為連接事件并且連接成功則跳過即不予處理等待讀取事件發送.
初始化工作如打開selector,channel,設置通道模式是否阻塞.
單線程Reactor,工作者線程池但在單線程Reactor模式中,不僅I/O操作在該Reactor線程上,連非I/O的業務操作也在該線程上進行處理了,這可能會大大延遲I/O請求的響應。所以我們應該將非I/O的業務邏輯操作從Reactor線程上卸載,以此來加速Reactor線程對I/O請求的響應.
添加了一個工作者線程池,并將非I/O操作從Reactor線程中移出轉交給工作者線程池來執行。這樣能夠提高Reactor線程的I/O響應,不至于因為一些耗時的業務邏輯而延遲對后面I/O請求的處理。
改進的版本中,所以的I/O操作依舊由一個Reactor來完成,包括I/O的accept()、read()、write()以及connect()操作。
對于一些小容量應用場景,可以使用單線程模型。但是對于高負載、大并發或大數據量的應用場景卻不合適,主要原因如下:
① 一個NIO線程同時處理成百上千的鏈路,性能上無法支撐,即便NIO線程的CPU負荷達到100%,也無法滿足海量消息的讀取和發送;
②當NIO線程負載過重之后,處理速度將變慢,這會導致大量客戶端連接超時,超時之后往往會進行重發,這更加重了NIO線程的負載,最終會導致大量消息積壓和處理超時,成為系統的性能瓶頸;
多Reactor線程模式Reactor線程池中的每一Reactor線程都會有自己的Selector、線程和分發的事件循環邏輯。
mainReactor可以只有一個,但subReactor一般會有多個。mainReactor線程主要負責接收客戶端的連接請求,然后將接收到的SocketChannel傳遞給subReactor,由subReactor來完成和客戶端的通信。
流程:
①注冊一個Acceptor事件處理器到mainReactor中,Acceptor事件處理器所關注的事件是ACCEPT事件,這樣mainReactor會監聽客戶端向服務器端發起的連接請求事件(ACCEPT事件)。啟動mainReactor的事件循環。
②客戶端向服務器端發起一個連接請求,mainReactor監聽到了該ACCEPT事件并將該ACCEPT事件派發給Acceptor處理器來進行處理。Acceptor處理器通過accept()方法得到與這個客戶端對應的連接(SocketChannel),然后將這個SocketChannel傳遞給subReactor線程池。
③subReactor線程池分配一個subReactor線程給這個SocketChannel,即,將SocketChannel關注的READ事件以及對應的READ事件處理器注冊到subReactor線程中。當然你也注冊WRITE事件以及WRITE事件處理器到subReactor線程中以完成I/O寫操作。Reactor線程池中的每一Reactor線程都會有自己的Selector、線程和分發的循環邏輯。
④當有I/O事件就緒時,相關的subReactor就將事件派發給響應的處理器處理。注意,這里subReactor線程只負責完成I/O的read()操作,在讀取到數據后將業務邏輯的處理放入到線程池中完成,若完成業務邏輯后需要返回數據給客戶端,則相關的I/O的write操作還是會被提交回subReactor線程來完成。
注意,所以的I/O操作(包括,I/O的accept()、read()、write()以及connect()操作)依舊還是在Reactor線程(mainReactor線程 或 subReactor線程)中完成的。Thread Pool(線程池)僅用來處理非I/O操作的邏輯。
多Reactor線程模式將“接受客戶端的連接請求”和“與該客戶端的通信”分在了兩個Reactor線程來完成。mainReactor完成接收客戶端連接請求的操作,它不負責與客戶端的通信,而是將建立好的連接轉交給subReactor線程來完成與客戶端的通信,這樣一來就不會因為read()數據量太大而導致后面的客戶端連接請求得不到即時處理的情況。并且多Reactor線程模式在海量的客戶端并發請求的情況下,還可以通過實現subReactor線程池來將海量的連接分發給多個subReactor線程,在多核的操作系統中這能大大提升應用的負載和吞吐量。
Netty服務端使用了“多Reactor線程模式”
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/77728.html
摘要:目錄源碼之下無秘密做最好的源碼分析教程源碼分析之番外篇的前生今世的前生今世之一簡介的前生今世之二小結的前生今世之三詳解的前生今世之四詳解源碼分析之零磨刀不誤砍柴工源碼分析環境搭建源碼分析之一揭開神秘的紅蓋頭源碼分析之一揭開神秘的紅蓋頭客戶端 目錄 源碼之下無秘密 ── 做最好的 Netty 源碼分析教程 Netty 源碼分析之 番外篇 Java NIO 的前生今世 Java NI...
摘要:是什么是一個異步的,事件驅動的網絡編程框架。責任鏈模式通過將組裝起來,通過向里添加來監聽處理發生的事件。相比于的的不僅易用,而且還支持自動擴容。入站入站事件一般是由外部觸發的,如收到數據。 netty是什么? netty是一個異步的,事件驅動的網絡編程框架。 netty的技術基礎 netty是對Java NIO和Java線程池技術的封裝 netty解決了什么問題 使用Java IO進行...
摘要:如果什么事都沒得做,它也不會死循環,它會將線程休眠起來,直到下一個事件來了再繼續干活,這樣的一個線程稱之為線程。而請求處理邏輯既可以使用單獨的線程池進行處理,也可以跟放在讀寫線程一塊處理。 Netty到底是什么 從HTTP說起 有了Netty,你可以實現自己的HTTP服務器,FTP服務器,UDP服務器,RPC服務器,WebSocket服務器,Redis的Proxy服務器,MySQL的P...
摘要:后改良為用線程池的方式代替新增線程,被稱為偽異步。最大的問題是阻塞,同步。每次請求都由程序執行并返回,這是同步的缺陷。這些都會被注冊在多路復用器上。多路復用器提供選擇已經就緒狀態任務的能力。并沒有采用的多路復用器,而是使用異步通道的概念。 Netty是一個提供異步事件驅動的網絡應用框架,用以快速開發高性能、高可靠的網絡服務器和客戶端程序。Netty簡化了網絡程序的開發,是很多框架和公司...
摘要:后改良為用線程池的方式代替新增線程,被稱為偽異步。最大的問題是阻塞,同步。每次請求都由程序執行并返回,這是同步的缺陷。這些都會被注冊在多路復用器上。多路復用器提供選擇已經就緒狀態任務的能力。并沒有采用的多路復用器,而是使用異步通道的概念。 Netty是一個提供異步事件驅動的網絡應用框架,用以快速開發高性能、高可靠的網絡服務器和客戶端程序。Netty簡化了網絡程序的開發,是很多框架和公司...
閱讀 2220·2019-08-30 15:54
閱讀 1957·2019-08-30 13:49
閱讀 677·2019-08-29 18:44
閱讀 832·2019-08-29 18:39
閱讀 1114·2019-08-29 15:40
閱讀 1536·2019-08-29 12:56
閱讀 3148·2019-08-26 11:39
閱讀 3102·2019-08-26 11:37