摘要:今天是小明女朋友的生日,小明想給她一個驚喜,于是想到了訂一個蛋糕給她,所以小明打電話給蛋糕店預定,店員回復他說好的,我們知道了,制作好了會通知你的。于是小明就開開心心的打游戲去了。值檢查,整個設計中均沒有對對象做的檢查,容易引起。
Netty中的異步調用
如果大家觀察仔細,會發現我們之前所寫的代碼都是串行執行的,這是什么意思?就是我們看到代碼是什么順序,最后程序就是按什么順序執行的。
但是Netty作為一個高性能網絡框架,他的調用很多都是異步的,這樣,就可以不等上一步做完,繼續行進下一步,達到多任務并行的作用。
實現概述Netty是怎么實現他的異步調用呢,大致總結了下由以下幾個核心部分
組成:
異步執行(executor)
異步結果(future and promise)
Listener
同步接口
首先,既然是異步調用,肯定要有異步執行,同學們這里肯定想到的是使用線程,沒錯,他的底層確實也是線程,只不過netty自身封裝成了executor,增強了線程的調度。
其次,是要能獲取到這次執行的結果,有的同學可能會說使用callable,沒錯這確實是一種解決方案,但是netty并沒有使用這種,而是使用了一種更為巧妙的設計(也就是通過promise對象來傳遞執行的結果)來完成這種操作,下面我們會詳細說明。
最后就是promise對象提供的各種接口,比如Listener:可以監聽執行的完成。或者是同步接口:保證異步執行的方法順序也是同步的。這篇文章中,我們主要就講這兩,三個,其他的各位童鞋可以自己去看源碼。
Executor實現Netty中每個Channel都有一個eventloop對象,實現還蠻復雜的,在這里不是重點,所以我們先實現一個,具有異步調用功能的exector。
自定義executor很簡單,只要實現Executor接口就行
public class MyNettyExecutor implements Executor { private ThreadFactory factory; public MyNettyExecutor(ThreadFactory factory) { this.factory = factory; } public void execute(Runnable command) { factory.newThread(command).start(); } }
然后在需要使用的時候,實例化這個類,這里為了增強使用,我們在類內部提供一個靜態初始化方法,并提供最簡單factory實現。
public static Executor newExecutor(){ return new MyNettyExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r); } }); }Promise詳解 對furture/promise的理解
我對future的認識最開始源于Java的FutureTask框架,簡單來說,FutureTask是Future接口的一種實現,Future則是異步執行的結果。
而promise,從接口注釋上來看,是一種可修改的Future
/** * Special {@link Future} which is writable. */
那么現在來看,一個異步結果的程序主要有下面幾步
生成promise對象
具體調用的地方傳入promise參數
異步調用完成后,設置promise為完成
返回future對象
其中,第三步是發生在異步調用里的,所以我們看到的順序其實就是1->2>4,讓我們來畫一張圖。
這其實可以用一個現實中的例子來講述。
今天是小明女朋友的生日,小明想給她一個驚喜,于是想到了訂一個蛋糕給她,所以小明打電話給蛋糕店預定,店員回復他說:好的,我們知道了,制作好了會通知你的。于是小明就開開心心的打游戲去了。
在上面的例子中,預定蛋糕就是一個異步過程,我只要通知需要做這件事的人(execute),并拿到回復(Future),然后就可以做其他事情了。然后過一段時間打電話詢問蛋糕做好沒(isDone),如果沒做好,那就請他做好的時候通知我(listener)
所以現在我們有了異步執行,還需要什么呢?
Future和Promise的定義接口
Promise實現
然后,我們理一下需要哪些接口
isDone 判斷任務是否完成
addListener
trySuccess 設置任務完成并通知所有listener
sync 同步方法,等待任務完成
定義首先定義接口
/*listener接口,提供complete方法**/ public interface MyFutureListenerisDone> extends EventListener { void operationComplete(F future); } /*Future接口**/ public interface MyFuture { boolean isDone(); MyFuture sync() throws InterruptedException ; MyFuture addListener(MyFutureListener extends MyFuture super V>> listener); } /*promise接口**/ public interface MyPromise extends MyFuture { boolean trySuccess(); @Override MyPromise addListener(MyFutureListener extends MyFuture super V>> listener); }
我們假設只有完成和未完成兩個狀態,Promise內維護著這個狀態值(初始為null),那么判斷是否完成只需要判斷這個值不為空就行了。
private volatile Object result = null; @Override public boolean isDone() { return result != null; }trySucess
那么最簡單的success實現就是給這個對象賦值
@Override public boolean trySuccess() { result = new Object(); return true; }
當然,這里很不嚴謹,我們后面再說。
Listener接口實現上面我們定義了listener接口,這里要實現addListener方法
private List>> listeners; @Override public MyPromise addListener(MyFutureListener extends MyFuture super Void>> listener) { synchronized (this) { if(listeners == null){ listeners = new ArrayList >>(); listeners.add(listener); }else { listeners.add(listener); } } if (isDone()){ for (MyFutureListener f: listeners ) { f.operationComplete(this); } } return this; }
然后完善下success方法,成功的時候調用每一個listener的complete方法。
@Override public boolean trySuccess() { result = new Object(); for (MyFutureListener f: listeners ) { f.operationComplete(this); } return true; }同步接口實現
同步也很簡單,就是先判斷任務是否完成,沒有完成就wait一下。注意,wait之前我們要保持同步,引入synchronized原語。
@Override public MyFuturesync() throws InterruptedException { if (isDone()){ return this; } synchronized (this){ while (!isDone()) { waiters++; try { wait(); }finally { waiters--; } } } return this; }
同理,需要有地方去喚醒它,我們繼續完善success方法,最終我們的trySuccess方法如下
private synchronized void checkNotify(){ if (waiters > 0){ notifyAll(); } } @Override public boolean trySuccess() { result = new Object(); checkNotify(); for (MyFutureListener f: listeners ) { f.operationComplete(this); } return true; }Demo
輪子造好了,是時候寫個demo測試一下
public class MyExecutorDemo { public static void main(String[] args) { MyFuture future = asyncHello().addListener((MyFutureListener警告>) future1 -> System.out.println("監聽到完成")); if (future.isDone()){ System.out.println("異步執行完成"); }else{ try { future.sync(); } catch (InterruptedException e) { e.printStackTrace(); } } } static MyFuture asyncHello(){ Executor executor = MyNettyExecutor.newExecutor(); final DefaultPromise promise = new DefaultPromise(); executor.execute(() -> { System.out.println("Hello Async"); try { //模擬一些操作 Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } promise.trySuccess(); }); return promise; } }
不可用于生產,這個Future/promise的設計僅僅為了說明異步執行和結果,距離netty中的異步框架還缺少很多。
NULL值檢查,整個設計中均沒有對對象做NULL的檢查,容易引起NullPointException。
異常處理缺失,對可能失敗的地方做異常處理(這也是是否能用于生產的合格檢驗)
非完全異步,listener的通知沒有使用異步
待補充(以我現在的水平,暫時想不到)
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/68619.html
摘要:我想這很好的解釋了中,僅僅一個都這么復雜,在單線程或者說串行的程序中,編程往往是很簡單的,說白了就是調用,調用,調用然后返回。 Netty源碼分析(三) 前提概要 這次停更很久了,原因是中途迷茫了一段時間,不過最近調整過來了。不過有點要說下,前幾天和業內某個大佬聊天,收獲很多,所以這篇博文和之前也會不太一樣,我們會先從如果是我自己去實現這個功能需要怎么做開始,然后去看netty源碼,與...
摘要:下面無恥的貼點源碼。啟動類我們也學,把啟動類抽象成兩層,方便以后寫客戶端。別著急,我們慢慢來,下一篇我們會了解以及他的成員,然后,完善我們的程序,增加其接收數據的能力。文章的源碼我會同步更新到我的上,歡迎大家,哈哈。 廢話兩句 這次更新拖了很長時間,第一是自己生病了,第二是因為最開始這篇想寫的很大,然后構思了很久,發現不太合適把很多東西寫在一起,所以做了點拆分,準備國慶前完成這篇博客。...
摘要:一些想法這個系列想開很久了,自己使用也有一段時間了,利用也編寫了一個簡單的框架,并運用到工作中了,感覺還不錯,趁著這段時間工作不是很忙,來分析一波源碼,提升下技術硬實力。 一些想法 這個系列想開很久了,自己使用netty也有一段時間了,利用netty也編寫了一個簡單的框架,并運用到工作中了,感覺還不錯,趁著這段時間工作不是很忙,來分析一波源碼,提升下技術硬實力。 結構 這里先看下net...
摘要:在上一篇源碼實戰系列三全剖析中,我們詳細分析了的初始化過程,并得出了如下結論在中,每一個都有一個對象,并且其內部本質上就是一個雙向鏈表本篇我們將深入源碼內部,對其一探究竟,給大家一個全方位解析。 在上一篇《Netty4.x 源碼實戰系列(三):NioServerSocketChannel全剖析》中,我們詳細分析了NioServerSocketChannel的初始化過程,并得出了如下結論...
摘要:對于,目前大家只知道是個線程組,其內部到底如何實現的,它的作用到底是什么,大家也都不太清楚,由于篇幅原因,這里不作詳細介紹,后面會有文章作專門詳解。 在上一篇《ServerBootstrap 與 Bootstrap 初探》中,我們已經初步的了解了ServerBootstrap是netty進行服務端開發的引導類。 且在上一篇的服務端示例中,我們也看到了,在使用netty進行網絡編程時,我...
閱讀 2566·2021-11-23 09:51
閱讀 3363·2021-11-22 15:22
閱讀 1876·2021-11-18 13:22
閱讀 2266·2021-09-24 09:48
閱讀 1314·2019-08-29 13:58
閱讀 1307·2019-08-26 13:39
閱讀 2450·2019-08-26 10:48
閱讀 3037·2019-08-26 10:21