摘要:調度器有了迭代器,還需要一個調度器才能按照預期的流程串行執行需要的函數,同時處理參數傳遞的過程我自己寫的代碼,調度的工作是由一起做的。
本文摘自我的博客,歡迎大家去逛逛。
又是兩周沒寫博客了,圣誕夜來水一發~
今天稍微看了下async的源碼,感覺很簡短精煉,總共也才1000多行代碼,好多值得學習的地方。主要看的是waterfall模塊,由于源碼中有好多不同接口公用的部分,因此看完waterfall這個接口的整個流程,差不多就cover了一半的async源碼了。
在沒有太多使用經驗的情況下,直接看源碼,可能會遇到一些不明所以的細節,看了可能也只能吸收很少的一部分。最好的方式我覺得莫過于自己先造一遍輪子,再看源碼了。
接口需求waterfall這個接口的命名還是很形象的
我要定義一個waterfall函數,滿足以下需求:
可以按照Array給定的順序逐個執行
所有函數執行完畢后,調用指定的回調函數
前一個函數的輸出作為后一個函數的輸入
中途某一個函數執行失敗,直接調用回調函數結束
需求的代碼描述如下:
async.waterfall([ function(callback) { callback(null, "one", "two"); }, function(arg1, arg2, callback) { console.log(arg1); console.log(arg2); // arg1 now equals "one" and arg2 now equals "two" callback(null, "three"); }, function(arg1, callback) { console.log(arg1); // arg1 now equals "three" callback(null, "done"); } ], function (err, result) { // result now equals "done" console.log(result); }); // 期望輸出: // one // two // three // done編碼
代碼組織了好一會兒,又調試了好一會后(中間遇到了一個關于arguments的坑,后面會講),終于成型了。輸出是按照預期的,和async源碼運行的結果相同,分析也都寫在注釋中:
var async = {}; async.waterfall = function (tasks, cb){ // 指向下一個將要執行的函數 var _index = 0; /** * 調用用戶指定的函數 */ function _run(index, args, cb){ var task = tasks[index]; args.push(cb); task.apply(null, args); }; /** * 因為涉及到控制流的轉移,從框架轉移到用戶,再從用戶轉移到框架。 * 需要定義一個傳遞控制流的使者,就是這個_cb函數 * 1.框架轉移到用戶:調用用戶函數的同時,把_cb作為參數 * 2.用戶轉移到框架:用戶調用這個_cb,表明已執行完該函數,把控制交給框架。抑或結束,抑或執行下一個函數 */ function _cb(){ // 如果錯誤了,直接回調最外層的cb // 如果是最后一個,也直接調用最外層的cb if (arguments[0] || _index === tasks.length) { return cb && cb.apply(null, arguments); } /** * 取出回調參數,作為下一個函數的輸入 * 因為回調的第一個參數是錯誤碼,所以要去掉第一個 */ // var rest = arguments.slice(1); //arguments并沒有slice方法,因此這樣會報錯 var rest = [].slice.call(arguments, 1); _run(_index++, rest, _cb); }; // 如果用戶沒有指定要串行執行的函數,則直接調用回調 if (tasks.length === 0) return cb && cb(); _run(_index++, [], _cb); };坑
踩的這個坑是關于arguments的(在ES6語法中其實不推薦使用arguments的方式,因為語法已經支持了rest param)。我一直以為一個函數的arguments屬性是一個Array,因為經常可以看到通過arguments[0]的方式去獲取參數,也從來沒有質疑過。先來看看下面這一個例子:
function a (){ console.log(typeof arguments); console.log(arguments); console.log(arguments[0]); console.log(arguments["0"]); console.log(arguments.length); console.log([].slice.call(arguments, 1)); }; a("one", "two", "three"); /** * 輸出(chrome): * object * ["one", "two", "three"] * one * one * 3 * ["two", "three"] * * 輸出(node.js) * object * { "0": "one", "1": "two", "2": "three" } * one * one * 3 * [ "two", "three" ] */
可以看出,arguments對象并不是一個array對象。在chrome中雖然看上去打印出來的是Array,但它是可以展開的,里面還有好多參數。而且下標取值的時候不光可以用數字,也可以用字符串來取值。這也是為什么我寫的代碼注釋中arguments.slice(1);的方式會執行錯誤(slice是Array才有的方法)。但是[].slice.call(arguments, 1);卻能執行,說明arguments還是有一點slice的特性的,有點不太懂。感覺它同時繼承了dict和array兩種對象的部分特性。
原來的輪子貼上原來的代碼實現:
async.waterfall = function (tasks, callback) { // 這種方式也是很聰明的一種方式,可以代替 callback && callback()的方式 // noop 是一個空函數,什么也不執行 callback = _once(callback || noop); if (!_isArray(tasks)) { var err = new Error("First argument to waterfall must be an array of functions"); return callback(err); } if (!tasks.length) { return callback(); } function wrapIterator(iterator) { return _restParam(function (err, args) { if (err) { callback.apply(null, [err].concat(args)); } else { var next = iterator.next(); if (next) { args.push(wrapIterator(next)); } else { args.push(callback); } ensureAsync(iterator).apply(null, args); } }); } wrapIterator(async.iterator(tasks))(); };
拋開一些異常處理的情況,就總體邏輯流程上還是有些區別的,下面就逐個來分析一下。
迭代器我是自己通過_index的局部變量來記錄當前執行的函數的(得益于閉包的特性,這個局部變量可以一直保留著)。源碼實現了一種迭代器的方式去管理傳入的函數數組,非常優雅,支持next特性,觀摩一下:
async.iterator = function (tasks) { function makeCallback(index) { function fn() { if (tasks.length) { tasks[index].apply(null, arguments); } return fn.next(); } fn.next = function () { return (index < tasks.length - 1) ? makeCallback(index + 1): null; }; return fn; } return makeCallback(0); };
通過async.iterator包裝以后返回的是一個迭代器對象,他同時又是一個函數可以直接執行,包裝了用戶傳入的tasks中的第一個函數。
調度器有了迭代器,還需要一個調度器才能按照預期的流程串行執行需要的函數,同時處理參數傳遞的過程(我自己寫的代碼,調度的工作是由_cb一起做的)。
這個調度器實現的非常棒,由于它返回的也是一個函數,因此和迭代器是屬于同一個維度的(如果是調用者和被調用者的關系則不屬于同一維度,他們的調用層次關系是同一層的)。_restParam函數可以暫時不用管它,因為從它的實現中可以看到,它本身和它參數中的函數是同一個維度的,它只是負責轉換了一下參數的結構。完全可以理解為wrapIterator返回的就是被_restParam包著的那個函數,_restParam只是一個參數結構的轉換器,處理了參數結構不一致的問題。
function _restParam(func, startIndex) { startIndex = startIndex == null ? func.length - 1 : +startIndex; return function() { var length = Math.max(arguments.length - startIndex, 0); var rest = Array(length); for (var index = 0; index < length; index++) { rest[index] = arguments[index + startIndex]; } switch (startIndex) { case 0: return func.call(this, rest); case 1: return func.call(this, arguments[0], rest); } // Currently unused but handle cases outside of the switch statement: // var args = Array(startIndex + 1); // for (index = 0; index < startIndex; index++) { // args[index] = arguments[index]; // } // args[startIndex] = rest; // return func.apply(this, args); }; }
回到調度器的上下文,在參數傳遞的過程中,args是上一個函數的返回結果組成的數組,再把下一個迭代器包裝一下作為該數組的最后一個元素。這樣在調用當前迭代器對應的函數的時候,用戶態上下文中的callback就是下一個用戶態函數對應的迭代器了。整個控制流程完全處在用戶層,框架層所做的事僅僅是參數結構的轉換(畢竟apply函數需要的參數結構是數組,而函數調用的時候則是展開的形式)。
奇淫技巧在閱讀代碼的過程中看到了不少巧妙的用法
在async源碼最后有這樣一段代碼:
// Node.js if (typeof module === "object" && module.exports) { module.exports = async; } // AMD / RequireJS else if (typeof define === "function" && define.amd) { define([], function () { return async; }); } // included directly via