摘要:接下來(lái),就看怎么用協(xié)程來(lái)實(shí)現(xiàn)異步了。直接拿的原始寫代碼會(huì)死人的。引入?yún)f(xié)程就是為了把上下連續(xù)的業(yè)務(wù)邏輯放在一個(gè)協(xié)程里,把與業(yè)務(wù)關(guān)系不大的的處理部分放到框架的里。第三部分是放棄掉執(zhí)行權(quán)。這樣一個(gè)只能接收打印一行的異步應(yīng)用就寫好了。
前面已經(jīng)準(zhǔn)備好了greenlet對(duì)應(yīng)的Java版本了,一個(gè)刪減后的kilim(http://segmentfault.com/blog/taowen/1190000000697487)。接下來(lái),就看怎么用協(xié)程來(lái)實(shí)現(xiàn)異步io了。首先,拿一段最最簡(jiǎn)單的tcp socket accept的代碼:
Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(9090)); serverSocketChannel.configureBlocking(false); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("listening..."); selector.select(); scheduler.accept(serverSocketChannel); System.out.println("hello");
這里使用的是java 6的NIO1的selector模型。直接拿NIO的原始api寫代碼會(huì)死人的。引入?yún)f(xié)程就是為了把上下連續(xù)的業(yè)務(wù)邏輯放在一個(gè)協(xié)程里,把與業(yè)務(wù)關(guān)系不大的selector的處理部分放到框架的ioloop里。也就是把一段交織的代碼,分成兩個(gè)關(guān)注點(diǎn)不同的組成部分。
改造之后的代碼在這里: https://github.com/taowen/daili/tree/1e319f929678213a8d8f63ee5e8b8cf016637317
這是改造之后的效果:
Scheduler scheduler = new Scheduler(); DailiTask task = new DailiTask(scheduler) { @Override public void execute() throws Pausable, Exception { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(9090)); serverSocketChannel.configureBlocking(false); System.out.println("listening..."); scheduler.accept(serverSocketChannel); System.out.println("hello"); } }; scheduler.callSoon(task); scheduler.loop();
其中最關(guān)鍵的一行是 scheduler.accept(serverSocketChannel); 這個(gè)調(diào)用是阻塞的。但是只阻塞調(diào)用它的Task協(xié)程。如果有多個(gè)Task并行的話,別的Task可以在這個(gè)時(shí)候被運(yùn)行。那么scheduler.accept是如何做到把NIO的selector api轉(zhuǎn)換成這樣的形式的呢?
public SocketChannel accept(ServerSocketChannel serverSocketChannel) throws IOException, Pausable { SocketChannel socketChannel = serverSocketChannel.accept(); if (null != socketChannel) { return socketChannel; } SelectionKey selectionKey = serverSocketChannel.keyFor(selector); if (null == selectionKey) { selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, new WaitingSelectorIO()); } else { selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_ACCEPT); } WaitingSelectorIO waitingSelectorIO = (WaitingSelectorIO) selectionKey.attachment(); waitingSelectorIO.acceptBlockedAt = System.currentTimeMillis(); waitingSelectorIO.acceptTask = (Task) Task.getCurrentTask(); selectionKey.attach(waitingSelectorIO); Task.pause(waitingSelectorIO); return serverSocketChannel.accept(); }
這個(gè)函數(shù)分成四部分:第一部分是嘗試去accept,如果有戲就不用NIO了。第二部分是注冊(cè)selection key,說(shuō)明我希望知道什么時(shí)候可以accept了,并把task作為附件加上去。第三部分是Task.pause放棄掉執(zhí)行權(quán)。第四部分是task被回調(diào)了,說(shuō)明等待的accept已經(jīng)ok了,可以去調(diào)用了。
但是Task.pause了之后,是誰(shuí)在把這個(gè)暫停的task重新拉起來(lái)執(zhí)行的呢?這個(gè)就是scheduler的loop干的活了
public void loop() throws IOException { while (true) { executeReadyTasks(); selector.select(); SetselectionKeys = selector.selectedKeys(); for (SelectionKey selectionKey : selectionKeys) { WaitingSelectorIO waitingSelectorIO = (WaitingSelectorIO) selectionKey.attachment(); if (selectionKey.isAcceptable()) { Task taskToCall = waitingSelectorIO.acceptTask; waitingSelectorIO.acceptBlockedAt = 0; waitingSelectorIO.acceptTask = null; callSoon(taskToCall); } } } }
在循環(huán)中調(diào)用selector.select獲得網(wǎng)絡(luò)事件的通知。如果selection key就緒了,就把附件里的task取出來(lái)回調(diào)。具體的回調(diào)發(fā)生在executeReadyTasks內(nèi)部,其實(shí)就是調(diào)用一下resume而已。
private void executeReadyTasks() { Task task; while((task = readyTasks.poll()) != null) { executeTask(task); } } private void executeTask(Task task) { try { task.resume(); } catch (Exception e) { LOGGER.error("failed to execute task: " + task, e); } }
這樣一個(gè)只能接收telnet 127.0.0.1 9090打印一行hello的異步io應(yīng)用就寫好了。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/64155.html
摘要:基本上所有的網(wǎng)絡(luò)應(yīng)用都會(huì)示范一個(gè)的寫法。除了這些操作的主體是而不是,操作的是,而不是。以為例其過(guò)程是這樣的這段代碼就是創(chuàng)建一個(gè),并注冊(cè)一個(gè),并把附著到上。關(guān)鍵之一顯然是利用了協(xié)程的和,把回調(diào)轉(zhuǎn)換成順序的邏輯執(zhí)行。 基本上所有的網(wǎng)絡(luò)應(yīng)用都會(huì)示范一個(gè)tcp的echo寫法。前面我們已經(jīng)看到了如何使用協(xié)程和異步io來(lái)做tcp服務(wù)器的第一步,accept。下面是一個(gè)完整的echo server的...
摘要:理由是如果到了上,而這個(gè)對(duì)應(yīng)的操作遲遲不能就緒被出來(lái)。但我認(rèn)為這其實(shí)是一個(gè)超時(shí)處理問(wèn)題。問(wèn)題是,原生的是沒(méi)有超時(shí)支持的。如果是回調(diào)性質(zhì)的,一般的做法是正常就緒給一個(gè),超時(shí)給另外一個(gè)。只要時(shí)間合理,作者之前所說(shuō)的會(huì)引發(fā)的問(wèn)題并不會(huì)出現(xiàn)。 grizzly框架的作者曾經(jīng)提到NIO框架不應(yīng)該使用selection key的attach功能(鏈接)。理由是如果attach到了selection ...
摘要:試用了一下,發(fā)現(xiàn)它是基于的主要是。于是拿的代碼改了一個(gè)純協(xié)程的版本出來(lái)。兩個(gè)都是以提供和為主要,把協(xié)程的隱藏在下面。為了搞一個(gè)更簡(jiǎn)單的,純協(xié)程來(lái)玩,把里無(wú)關(guān)的代碼都給刪了。結(jié)果在這里和官方的版本的不同在于,添加了一個(gè)方法相當(dāng)于的。 試用了一下 http://docs.paralleluniverse.co/quasar/,發(fā)現(xiàn)它是基于JDK 1.7的(主要是fork join pool...
摘要:本文先回顧生成器,然后過(guò)渡到協(xié)程編程。其作用主要體現(xiàn)在三個(gè)方面數(shù)據(jù)生成生產(chǎn)者,通過(guò)返回?cái)?shù)據(jù)數(shù)據(jù)消費(fèi)消費(fèi)者,消費(fèi)傳來(lái)的數(shù)據(jù)實(shí)現(xiàn)協(xié)程。解決回調(diào)地獄的方式主要有兩種和協(xié)程。重點(diǎn)應(yīng)當(dāng)關(guān)注控制權(quán)轉(zhuǎn)讓的時(shí)機(jī),以及協(xié)程的運(yùn)作方式。 轉(zhuǎn)載請(qǐng)注明文章出處: https://tlanyan.me/php-review... PHP回顧系列目錄 PHP基礎(chǔ) web請(qǐng)求 cookie web響應(yīng) sess...
閱讀 1681·2021-11-23 09:51
閱讀 2691·2021-11-22 09:34
閱讀 1327·2021-10-14 09:43
閱讀 3668·2021-09-08 09:36
閱讀 3214·2019-08-30 12:57
閱讀 2035·2019-08-30 12:44
閱讀 2524·2019-08-29 17:15
閱讀 3021·2019-08-29 16:08