摘要:編寫異步代碼可能是一種不同的體驗,尤其是對異步控制流而言。回調函數的準則在編寫異步代碼時,要記住的第一個規則是在定義回調時不要濫用閉包。為回調創建命名函數,避免使用閉包,并將中間結果作為參數傳遞。
本系列文章為《Node.js Design Patterns Second Edition》的原文翻譯和讀書筆記,在GitHub連載更新,同步翻譯版鏈接。
歡迎關注我的專欄,之后的博文將在專欄同步:
Encounter的掘金專欄
知乎專欄 Encounter的編程思考
segmentfault專欄 前端小站
Asynchronous Control Flow Patterns with CallbacksNode.js這類語言習慣于同步的編程風格,其CPS風格和異步特性的API是其標準,對于新手來說可能難以理解。編寫異步代碼可能是一種不同的體驗,尤其是對異步控制流而言。異步代碼可能讓我們難以預測在Node.js中執行語句的順序。例如讀取一組文件,執行一串任務,或者等待一組操作完成,都需要開發人員采用新的方法和技術,以避免最終編寫出效率低下和不可維護的代碼。一個常見的錯誤是回調地獄,代碼量急劇上升又不可讀,使得簡單的程序也難以閱讀和維護。在本章中,我們將看到如何通過使用一些規則和一些模式來避免回調,并編寫干凈、可管理的異步代碼。我們將看到控制流庫,如async,可以極大地簡化我們的問題,提升我們的代碼可讀性,更易于維護。
異步編程的困難JavaScript中異步代碼的順序錯亂無疑是很容易的。閉包和對匿名函數的定義可以使開發人員有更好的編程體驗,而并不需要開發人員手動對異步操作進行管理和跳轉。這是符合KISS原則的。簡單且能保持異步代碼控制流,讓它在更短的時間內工作。但不幸的是,回調嵌套是以犧牲諸如模塊性、可重用性和可維護性,增大整個函數的大小,導致糟糕的代碼結構為代價的。大多數情況下,創建閉包在功能上是不需要的,但這更多是一種約束,而不是與異步編程相關的問題。認識到回調嵌套會使得我們的代碼變得笨拙,然后根據最適合的解決方案采取相應的方法解決回調地獄,這是新手與專家的區別。
創建一個簡單的Web爬蟲為了解釋上述問題,我們創建了一個簡單的Web爬蟲,一個命令行應用,其接受一個URL為輸入,然后可以把其內容下載到一個文件中。在下列代碼中,我們會依賴以下兩個npm庫。
此外,我們還將引用一個叫做./utilities的本地模塊。
我們的應用程序的核心功能包含在一個名為spider.js的模塊中。如下所示,首先加載我們所需要的依賴包:
const request = require("request"); const fs = require("fs"); const mkdirp = require("mkdirp"); const path = require("path"); const utilities = require("./utilities");
接下來,我們將創建一個名為spider()的新函數,該函數接受URL為參數,并在下載過程完成時調用一個回調函數。
function spider(url, callback) { const filename = utilities.urlToFilename(url); fs.exists(filename, exists => { if (!exists) { console.log(`Downloading ${url}`); request(url, (err, response, body) => { if (err) { callback(err); } else { mkdirp(path.dirname(filename), err => { if (err) { callback(err); } else { fs.writeFile(filename, body, err => { if (err) { callback(err); } else { callback(null, filename, true); } }); } }); } }); } else { callback(null, filename, false); } }); }
上述函數執行以下任務:
檢查該URL的文件是否已經下載過,即驗證相應文件是否已經被創建:
fs.exists(filename, exists => ...
如果文件還沒有被下載,則執行下列代碼進行下載操作:
request(url, (err, response, body) => ...
然后,我們需要確定目錄下是否已經包含了該文件:
mkdirp(path.dirname(filename), err => ...
最后,我們把HTTP請求返回的報文主體寫入文件系統:
mkdirp(path.dirname(filename), err => ...
要完成我們的Web爬蟲應用程序,只需提供一個URL作為輸入(在我們的例子中,我們從命令行參數中讀取它),我們只需調用spider()函數即可。
spider(process.argv[2], (err, filename, downloaded) => { if (err) { console.log(err); } else if (downloaded) { console.log(`Completed the download of "${filename}"`); } else { console.log(`"${filename}" was already downloaded`); } });
現在,我們開始嘗試運行Web爬蟲應用程序,但是首先,確保已有utilities.js模塊和package.json中的所有依賴包已經安裝到你的項目中:
npm install
之后,我們執行我們這個爬蟲模塊來下載一個網頁,使用以下命令:
node spider http://www.example.com
我們的Web爬蟲應用程序要求在我們提供的URL中總是包含協議類型(例如,http://)。另外,不要期望HTML鏈接被重新編寫,也不要期望下載像圖片這樣的資源,因為這只是一個簡單的例子來演示異步編程是如何工作的。
回調地獄看看我們的spider()函數,我們可以發現,盡管我們實現的算法非常簡單,但是生成的代碼有幾個級別的縮進,而且很難讀懂。使用阻塞式的同步API實現類似的功能是很簡單的,而且很少有機會讓它看起來如此錯誤。然而,使用異步CPS是另一回事,使用閉包可能會導致出現難以閱讀的代碼。
大量閉包和回調將代碼轉換成不可讀的、難以管理的情況稱為回調地獄。它是Node.js中最受認可和最嚴重的反模式之一。一般來說,對于JavaScript而言。受此問題影響的代碼的典型結構如下:
asyncFoo(err => { asyncBar(err => { asyncFooBar(err => { //... }); }); });
我們可以看到,用這種方式編寫的代碼是如何形成金字塔形狀的,由于深嵌的原因導致的難以閱讀,稱為“末日金字塔”。
像前面的代碼片段這樣的代碼最明顯的問題是可讀性差。由于嵌套太深,幾乎不可能跟蹤回調函數的結束位置和另一個回調函數開始的位置。
另一個問題是由每個作用域中使用的變量名的重疊引起的。通常,我們必須使用類似甚至相同的名稱來描述變量的內容。最好的例子是每個回調接收到的錯誤參數。有些人經常嘗試使用相同名稱的變體來區分每個范圍內的對象,例如,error、err、err1、err2等等。另一些人則傾向于隱藏在范圍中定義的變量,總是使用相同的名稱。例如,err。這兩種選擇都遠非完美,而且會造成混淆,并增加導致bug的可能性。
此外,我們必須記住,雖然閉包在性能和內存消耗方面的代價很小。此外,它們還可以創建不易識別的內存泄漏,因為我們不應該忘記,由閉包引用的任何上下文變量都不會被垃圾收集所保留。
關于對于V8的閉包工作原理,可以參考Vyacheslav Egorov的博客文章。
如果我們看一下我們的spider()函數,我們會清楚地注意到它便是一個典型的回調地獄的場景,并且在這個函數中有我們剛才描述的所有問題。這正是我們將在本章中學習的模式和技巧所要解決的問題。
使用簡單的JavaScript既然我們已經遇到了第一個回調地獄的例子,我們知道我們應該避免什么。然而,在編寫異步代碼時,這并不是惟一的關注點。事實上,有幾種情況下,控制一組異步任務的流需要使用特定的模式和技術,特別是如果我們只使用普通的JavaScript而沒有任何外部庫的幫助的情況下。例如,通過按順序應用異步操作來遍歷集合并不像在數組中調用forEach()那樣簡單,但實際上它需要一種類似于遞歸的技術。
在本節中,我們將學習如何避免回調地獄,以及如何使用簡單的JavaScript實現一些最常見的控制流模式。
回調函數的準則在編寫異步代碼時,要記住的第一個規則是在定義回調時不要濫用閉包。濫用閉包一時很爽,因為它不需要對諸如模塊化和可重用性這樣的問題進行額外的思考。但是,我們已經看到,這種做法弊大于利。大多數情況下,修復回調地獄問題并不需要任何庫、花哨的技術或范式的改變,只是一些常識。
以下是一些基本原則,可以幫助我們更少的嵌套,并改進我們的代碼的組織:
盡可能退出外層函數。根據上下文,使用return、continue或break,以便立即退出當前代碼塊,而不是使用if...else代碼塊。其他語句。這將有助于優化我們的代碼結構。
為回調創建命名函數,避免使用閉包,并將中間結果作為參數傳遞。命名函數也會使它們在堆棧跟蹤中更優雅。
代碼盡可能模塊化。并盡可能將代碼分成更小的、可重用的函數。
回調調用的準則為了展示上述原則,我們通過重構Web爬蟲應用程序來說明。
對于第一步,我們可以通過刪除else語句來重構我們的錯誤檢查方式。這是在我們收到錯誤后立即從函數中返回。因此,看以下代碼:
if (err) { callback(err); } else { // 如果沒有錯誤,執行該代碼塊 }
我們可以通過編寫下面的代碼來改進我們的代碼結構:
if (err) { return callback(err); } // 如果沒有錯誤,執行該代碼塊
有了這個簡單的技巧,我們立即減少了函數的嵌套級別,它很簡單,不需要任何復雜的重構。
在執行我們剛才描述的優化時,一個常見的錯誤是在調用回調函數之后忘記終止函數,即return。對于錯誤處理場景,以下代碼是bug的典型來源:
if (err) { callback(err); } // 如果沒有錯誤,執行該代碼塊
在這個例子中,即使在調用回調之后,函數的執行也會繼續。那么避免這種情況的出現,return語句是十分必要的。還要注意,函數返回的輸出是什么并不重要,實際結果(或錯誤)是異步生成的,并傳遞給回調。異步函數的返回值通常被忽略。該屬性允許我們編寫如下的代碼:
return callback(...);
否則我們必須拆成兩條語句來寫:
callback(...); return;
接下來我們繼續重構我們的spider()函數,我們可以嘗試識別可復用的代碼片段。例如,將給定字符串寫入文件的功能可以很容易地分解為一個多帶帶的函數:
function saveFile(filename, contents, callback) { mkdirp(path.dirname(filename), err => { if (err) { return callback(err); } fs.writeFile(filename, contents, callback); }); }
遵循同樣的原則,我們可以創建一個名為download()的通用函數,它將URL和文件名作為輸入,并將URL的內容下載到給定的文件中。在內部,我們可以使用前面創建的saveFile()函數。
function download(url, filename, callback) { console.log(`Downloading ${url}`); request(url, (err, response, body) => { if (err) { return callback(err); } saveFile(filename, body, err => { if (err) { return callback(err); } console.log(`Downloaded and saved: ${url}`); callback(null, body); }); }); }
最后,修改我們的spider()函數:
function spider(url, callback) { const filename = utilities.urlToFilename(url); fs.exists(filename, exists => { if (exists) { return callback(null, filename, false); } download(url, filename, err => { if (err) { return callback(err); } callback(null, filename, true); }) }); }
spider()函數的功能和接口仍然是完全相同的,改變的僅僅是代碼的組織方式。通過應用上述基本原則,我們能夠極大地減少代碼的嵌套,同時增加了它的可重用性和可測試性。實際上,我們可以考慮導出saveFile()和download(),這樣我們就可以在其他模塊中重用它們。這也使我們能夠更容易地測試他們的功能。
我們在這一節中進行的重構清楚地表明,大多數時候,我們所需要的只是一些規則,并確保我們不濫用閉包和匿名函數。它的工作非常出色,只需最少的工作量,并且只使用原始的JavaScript。
順序執行現在開始探尋異步控制流的執行順序,我們會通過開始分析一串異步代碼來探尋其控制流。
按順序執行一組任務意味著一次一個接一個地運行它們。執行順序很重要,必須保證其正確性,因為列表中一個任務的結果可能會影響下一個任務的執行。下圖說明了這個概念:
上述異步控制流有一些不同的變化:
按順序執行一組已知任務,無需鏈接或傳遞執行結果
使用任務的輸出作為下一個輸入(也稱為chain,pipeline,或者waterfall)
在每個元素上運行異步任務時迭代一個集合,一個元素接一個元素
對于順序執行而言,盡管在使用直接樣式阻塞API實現很簡單,但通常情況下使用異步CPS時會導致回調地獄問題。
按順序執行一組已知的任務在上一節中實現spider()函數時,我們已經遇到了順序執行的問題。通過研究如下方式,我們可以更好地控制異步代碼。以該代碼為準則,我們可以用以下模式來解決上述問題:
function task1(callback) { asyncOperation(() => { task2(callback); }); } function task2(callback) { asyncOperation(result() => { task3(callback); }); } function task3(callback) { asyncOperation(() => { callback(); //finally executes the callback }); } task1(() => { //executed when task1, task2 and task3 are completed console.log("tasks 1, 2 and 3 executed"); });
上述模式顯示了在完成一個異步操作后,再調用下一個異步操作。該模式強調任務的模塊化,并且避免在處理異步代碼使用閉包。
順序迭代我們前面描述的模式如果我們預先知道要執行什么和有多少個任務,這些模式是完美的。這使我們能夠對序列中下一個任務的調用進行硬編碼,但是如果要對集合中的每個項目執行異步操作,會發生什么?在這種情況下,我們不能對任務序列進行硬編碼。相反的是,我們必須動態構建它。
為了顯示順序迭代的例子,讓我們為Web爬蟲應用程序引入一個新功能。我們現在想要遞歸地下載網頁中的所有鏈接。要做到這一點,我們將從頁面中提取所有鏈接,然后按順序逐個地觸發我們的Web爬蟲應用程序。
第一步是修改我們的spider()函數,以便通過調用一個名為spiderLinks()的函數觸發頁面所有鏈接的遞歸下載。
此外,我們現在嘗試讀取文件,而不是檢查文件是否已經存在,并開始爬取其鏈接。這樣,我們就可以恢復中斷的下載。最后還有一個變化是,我們確保我們傳遞的參數是最新的,還要限制遞歸深度。結果代碼如下:
function spider(url, nesting, callback) { const filename = utilities.urlToFilename(url); fs.readFile(filename, "utf8", (err, body) => { if (err) { if (err.code! == "ENOENT") { return callback(err); } return download(url, filename, (err, body) => { if (err) { return callback(err); } spiderLinks(url, body, nesting, callback); }); } spiderLinks(url, body, nesting, callback); }); }
現在我們可以創建這個新版本的Web爬蟲應用程序的核心,即spiderLinks()函數,它使用順序異步迭代算法下載HTML頁面的所有鏈接。注意我們在下面的代碼塊中定義的方式:
function spiderLinks(currentUrl, body, nesting, callback) { if(nesting === 0) { return process.nextTick(callback); } let links = utilities.getPageLinks(currentUrl, body); //[1] function iterate(index) { //[2] if(index === links.length) { return callback(); } spider(links[index], nesting - 1, function(err) { //[3] if(err) { return callback(err); } iterate(index + 1); }); } iterate(0); //[4] }
從這個新功能中的重要步驟如下:
我們使用utilities.getPageLinks()函數獲取頁面中包含的所有鏈接的列表。此函數僅返回指向相同主機名的鏈接。
我們使用一個稱為iterate()的本地函數來遍歷鏈接,該函數需要下一個鏈接的索引進行分析。在這個函數中,我們首先要檢查索引是否等于鏈接數組的長度,如果等于則是迭代完成,在這種情況下我們立即調用callback()函數,因為這意味著我們處理了所有的項目。
這時,處理鏈接已準備就緒。我們通過遞歸調用spider()函數。
作為spiderLinks()函數的最后一步也是最重要的一步,我們通過調用iterate(0)來開始迭代。
我們剛剛提出的算法允許我們通過順序執行異步操作來迭代數組,在我們的例子中是spider()函數。
我們現在可以嘗試這個新版本的Web爬蟲應用程序,并觀看它一個接一個地遞歸地下載網頁的所有鏈接。要中斷這個過程,如果有很多鏈接可能需要一段時間,請記住我們可以隨時使用Ctrl + C。如果我們決定恢復它,我們可以通過啟動Web爬蟲應用程序并提供與上次結束時相同的URL來恢復執行。
現在我們的網絡Web爬蟲應用程序可能會觸發整個網站的下載,請仔細考慮使用它。例如,不要設置高嵌套級別或離開爬蟲運行超過幾秒鐘。用數千個請求重載服務器是不道德的。在某些情況下,這也被認為是非法的。需要考慮后果!
我們之前展示的spiderLinks()函數的代碼是一個清楚的例子,說明了如何在應用異步操作時迭代集合。我們還可以注意到,這是一種可以適應任何其他情況的模式,我們需要在集合的元素或通常的任務列表上按順序異步迭代。該模式可以推廣如下:
function iterate(index) { if (index === tasks.length) { return finish(); } const task = tasks[index]; task(function() { iterate(index + 1); }); } function finish() { // 迭代完成的操作 } iterate(0);
注意到,如果task()是同步操作,這些類型的算法變得真正遞歸。在這種情況下,可能造成調用棧的溢出。
我們剛剛提出的模式是非常強大的,因為它可以適應幾種情況。例如,我們可以映射數組的值,或者我們可以將迭代的結果傳遞給迭代中的下一個,以實現一個reduce算法,如果滿足特定的條件,我們可以提前退出循環,或者甚至可以迭代無限數量的元素。
我們還可以選擇將解決方案進一步推廣:
iterateSeries(collection, iteratorCallback, finalCallback);
通過創建一個名為iterator的函數來執行任務列表,該函數調用集合中的下一個可執行的任務,并確保在當前任務完成時調用迭代器結束的回調函數。
并行在某些情況下,一組異步任務的執行順序并不重要,我們只需要在所有這些運行的任務完成時通知我們。使用并行執行流更好地處理這種情況,如下圖所示:
如果我們認為Node.js是單線程的話,這可能聽起來很奇怪,但是如果我們記住我們在第一章中討論過的內容,我們意識到即使我們只有一個線程,我們仍然可以實現并發,由于Node.js的非阻塞性質。實際上,在這種情況下,并行字不正確地使用,因為這并不意味著任務同時運行,而是它們的執行由底層的非阻塞API執行,并由事件循環進行交織。
我們知道,當一個任務允許事件循環執行另一個任務時,或者是說一個任務允許控制回到事件循環。這種工作流的名稱為并發,但為了簡單起見,我們仍然會使用并行。
下圖顯示了兩個異步任務可以在Node.js程序中并行運行:
通過上圖,我們有一個Main函數執行兩個異步任務:
Main函數觸發Task 1和Task 2的執行。由于這些觸發異步操作,這兩個函數會立即返回,并將控制權返還給主函數,之后等到事件循環完成再通知主線程。
當Task 1的異步操作完成時,事件循環給與其線程控制權。當Task 1同步操作完成時,它通知Main函數。
當Task 2的異步操作完成時,事件循環給與其線程控制權。當Task 2同步操作完成時,它再次通知Main函數。在這一點上,Main函數知曉Task 1和Task 2都已經執行完畢,所以它可以繼續執行其后操作或將操作的結果返回給另一個回調函數。
簡而言之,這意味著在Node.js中,我們只能執行并行異步操作,因為它們的并發性由非阻塞API在內部處理。在Node.js中,同步阻塞操作不能同時運行,除非它們的執行與異步操作交錯,或者通過setTimeout()或setImmediate()延遲。我們將在第九章中更詳細地看到這一點。
Web爬蟲版本3上邊的Web爬蟲在并行異步操作上似乎也算表現得很完美。到目前為止,應用程序正在遞歸地執行鏈接頁面的下載。但性能不是最佳的,想要提升這個應用的性能很容易。
要做到這一點,我們只需要修改spiderLinks()函數,確保spider()任務只執行一次,當所有任務都執行完畢后,調用最后的回調,所以我們對spiderLinks()做如下修改:
function spiderLinks(currentUrl, body, nesting, callback) { if (nesting === 0) { return process.nextTick(callback); } const links = utilities.getPageLinks(currentUrl, body); if (links.length === 0) { return process.nextTick(callback); } let completed = 0, hasErrors = false; function done(err) { if (err) { hasErrors = true; return callback(err); } if (++completed === links.length && !hasErrors) { return callback(); } } links.forEach(link => { spider(link, nesting - 1, done); }); }
上述代碼有何變化?,現在spider()函數的任務全部同步啟動。可以通過簡單地遍歷鏈接數組和啟動每個任務,我們不必等待前一個任務完成再進行下一個任務:
links.forEach(link => { spider(link, nesting - 1, done); });
然后,使我們的應用程序知曉所有任務完成的方法是為spider()函數提供一個特殊的回調函數,我們稱之為done()。當爬蟲任務完成時,done()函數設定一個計數器。當完成的下載次數達到鏈接數組的大小時,調用最終回調:
function done(err) { if (err) { hasErrors = true; return callback(err); } if (++completed === links.length && !hasErrors) { callback(); } }
通過上述變化,如果我們現在試圖對網頁運行我們的爬蟲,我們將注意到整個過程的速度有很大的改進,因為每次下載都是并行執行的,而不必等待之前的鏈接被處理。
模式此外,對于并行執行流程,我們可以提取我們方案,以便適應于不同的情況提高代碼的可復用性。我們可以使用以下代碼來表示模式的通用版本:
const tasks = [ /* ... */ ]; let completed = 0; tasks.forEach(task => { task(() => { if (++completed === tasks.length) { finish(); } }); }); function finish() { // 所有任務執行完成后調用 }
通過小的修改,我們可以調整模式,將每個任務的結果累積到一個list中,以便過濾或映射數組的元素,或者一旦完成了一個或一定數量的任務即可調用finish()回調。
用并發任務修復競爭條件注意:如果是沒有限制的情況下,并行執行的一組異步任務,然后等待所有異步任務完成后執行回調這種方式,其方法是計算它們的執行完成的數目。
當使用阻塞I/O與多線程組合的方式時,并行運行一組任務可能會導致一些問題。但是,我們剛剛看到,在Node.js中卻不一樣,并行運行多個異步任務實際上在資源方面消耗較低。這是Node.js最重要的優點之一,因此在Node.js中并行化成為一種常見的做法,而且這并是多么復雜的技術。
Node.js的并發模型的另一個重要特征是我們處理任務同步和競爭條件的方式。在多線程編程中,這通常使用諸如鎖,互斥條件,信號量和觀察器之類的構造來實現,這些是多線程語言并行化的最復雜的方面之一,對性能也有很大的影響。在Node.js中,我們通常不需要一個花哨的同步機制,因為所有運行在單個線程上!但是,這并不意味著我們沒有競爭條件。相反,他們可以相當普遍。問題的根源在于異步操作的調用與其結果通知之間的延遲。舉一個具體的例子,我們可以再次參考我們的Web爬蟲應用程序,特別是我們創建的最后一個版本,其實際上包含一個競爭條件。
問題在于在開始下載相應的URL的文檔之前,檢查文件是否已經存在的spider()函數:
function spider(url, nesting, callback) { if(spidering.has(url)) { return process.nextTick(callback); } spidering.set(url, true); const filename = utilities.urlToFilename(url); fs.readFile(filename, "utf8", function(err, body) { if(err) { if(err.code !== "ENOENT") { return callback(err); } return download(url, filename, function(err, body) { if(err) { return callback(err); } spiderLinks(url, body, nesting, callback); }); } spiderLinks(url, body, nesting, callback); }); }
現在的問題是,在同一個URL上操作的兩個爬蟲任務可能會在兩個任務之一完成下載并創建一個文件,導致第二個任務開始下載之前,在同一個文件上調用fs.readFile()的結果不對,致使下載兩次。這種情況如下圖所示:
上圖顯示了Task 1和Task 2如何在Node.js的單個線程中交錯執行,以及異步操作如何實際引入競爭條件。在我們的情況下,兩個爬蟲任務最終會下載相同的文件。
我們如何解決這個問題?答案比我們想象的要簡單得多。實際上,我們所需要的只是一個變量(互斥變量),可以相互排除運行在同一個URL上的多個spider()任務。這可以通過以下代碼來實現:
const spidering = new Map(); function spider(url, nesting, callback) { if (spidering.has(url)) { return process.nextTick(callback); } spidering.set(url, true); // ... }并行執行頻率限制
通常,如果不控制并行任務頻率,并行任務就會導致過載。想象一下,有數千個文件要讀取,訪問的URL或數據庫查詢并行運行。在這種情況下,常見的問題是系統資源不足,例如,當嘗試一次打開太多文件時,利用可用于應用程序的所有文件描述符。在Web應用程序中,它還可能會創建一個利用拒絕服務(DoS)攻擊的漏洞。在所有這種情況下,最好限制同時運行的任務數量。這樣,我們可以為服務器的負載增加一些可預測性,并確保我們的應用程序不會耗盡資源。下圖描述了一個情況,我們將五個任務并行運行并發限制為兩段:
從上圖可以清楚我們的算法如何工作:
我們可以執行盡可能多的任務,而不超過并發限制。
每當任務完成時,我們再執行一個或多個任務,同時確保任務數量達不到限制。
并發限制我們現在提出一種模式,以有限的并發性并行執行一組給定的任務:
const tasks = ... let concurrency = 2, running = 0, completed = 0, index = 0; function next() { while (running < concurrency && index < tasks.length) { task = tasks[index++]; task(() => { if (completed === tasks.length) { return finish(); } completed++, running--; next(); }); running++; } } next(); function finish() { // 所有任務執行完成 }
該算法可以被認為是順序執行和并行執行之間的混合。事實上,我們可能會注意到我們之前介紹的兩種模式的相似之處:
我們有一個迭代器函數,我們稱之為next(),有一個內部循環,并行執行盡可能多的任務,同時保持并發限制。
我們傳遞給每個任務的回調檢查是否完成了列表中的所有任務。如果還有任務要運行,它會調用next()來執行下一個任務。
全局并發限制我們的Web爬蟲應用程序非常適合應用我們所學到的限制一組任務的并發性。事實上,為了避免同時爬上數千個鏈接的情況,我們可以通過在并發下載數量上增加一些措施來限制并發量。
0.11之前的Node.js版本已經將每個主機的并發HTTP連接數限制為5.然而,這可以改變以適應我們的需要。請查看官方文檔http://nodejs.org/docs/v0.10.... axsockets中的更多內容。從Node.js 0.11開始,并發連接數沒有默認限制。
我們可以將我們剛剛學到的模式應用到我們的spiderLinks()函數,但是我們將獲得的只是限制一個頁面中的一組鏈接的并發性。如果我們選擇了并發量為2,我們最多可以為每個頁面并行下載兩個鏈接。然而,由于我們可以一次下載多個鏈接,因此每個頁面都會產生另外兩個下載,這樣遞歸下去,其實也沒有完全做到并發量的限制。
使用隊列我們真正想要的是限制我們可以并行運行的全局下載操作數量。我們可以略微修改之前展示的模式,但是我們寧愿把它作為一個練習,因為我們想借此機會引入另一個機制,它利用隊列來限制多個任務的并發性。讓我們看看這是如何工作的。
我們現在要實現一個名為TaskQueue類,它將隊列與我們之前提到的算法相結合。我們創建一個名為taskQueue.js的新模塊:
class TaskQueue { constructor(concurrency) { this.concurrency = concurrency; this.running = 0; this.queue = []; } pushTask(task) { this.queue.push(task); this.next(); } next() { while (this.running < this.concurrency && this.queue.length) { const task = this.queue.shift(); task(() => { this.running--; this.next(); }); this.running++; } } };
上述類的構造函數只作為輸入的并發限制,但除此之外,它初始化運行和隊列的變量。前一個變量是用于跟蹤所有正在運行的任務的計數器,而后者是將用作隊列以存儲待處理任務的數組。
pushTask()方法簡單地將新任務添加到隊列中,然后通過調用this.next()來引導任務的執行。
next()方法從隊列中生成一組任務,確保它不超過并發限制。
我們可能會注意到,這種方法與限制我們前面提到的并發性的模式有一些相似之處。它基本上從隊列開始盡可能多的任務,而不超過并發限制。當每個任務完成時,它會更新運行任務的計數,然后再次調用next()來啟動另一輪任務。 TaskQueue類的有趣屬性是它允許我們動態地將新的項目添加到隊列中。另一個優點是,現在我們有一個中央實體負責限制我們任務的并發性,這可以在函數執行的所有實例中共享。在我們的例子中,它是spider()函數,我們將在稍后看到。
Web爬蟲版本4現在我們有一個通用的隊列來執行有限的并行流程中的任務,我們可以在我們的Web爬蟲應用程序中直接使用它。我們首先加載新的依賴關系并通過將并發限制設置為2來創建TaskQueue類的新實例:
const TaskQueue = require("./taskQueue"); const downloadQueue = new TaskQueue(2);
接下來,我們使用新創建的downloadQueue更新spiderLinks()函數:
function spiderLinks(currentUrl, body, nesting, callback) { if (nesting === 0) { return process.nextTick(callback); } const links = utilities.getPageLinks(currentUrl, body); if (links.length === 0) { return process.nextTick(callback); } let completed = 0, hasErrors = false; links.forEach(link => { downloadQueue.pushTask(done => { spider(link, nesting - 1, err => { if (err) { hasErrors = true; return callback(err); } if (++completed === links.length && !hasErrors) { callback(); } done(); }); }); }); }
這個函數的這種新的實現是非常容易的,它與這本章前面提到的無限并行執行的算法非常相似。這是因為我們將并發控制委托給TaskQueue對象,我們唯一要做的就是檢查所有任務是否完成。看上述代碼中如何定義我們的任務:
我們通過提供自定義回調來運行spider()函數。
在回調中,我們檢查與spiderLinks()函數執行相關的所有任務是否完成。當這個條件為真時,我們調用spiderLinks()函數的最后回調。
在我們的任務結束時,我們調用了done()回調,以便隊列可以繼續執行。
在我們進行這些小的變化之后,我們現在可以嘗試再次運行Web爬蟲應用程序。這一次,我們應該注意到,同時不會有兩個以上的下載。
async庫如果我們到目前為止我們分析的每一個控制流程模式看一下,我們可以看到它們可以用作構建可重用和更通用的解決方案的基礎。例如,我們可以將無限制的并行執行算法包裝到一個接受任務列表的函數中,并行運行它們,并且當它們都完成時調用給定的回調函數。將控制流算法轉化為可重用功能的這種方式可以導致更具聲明性和表達性的方式來定義異步控制流,這正是async所做的。async庫是一個非常流行的解決方案,在Node.js和JavaScript中來說,用于處理異步代碼。它提供了一組功能,可以大大簡化不同配置中一組任務的執行,并為異步處理集合提供了有用的幫助。即使有其他幾個具有相似目標的庫,由于它的受歡迎程度,因此async是Node.js中的一個事實上的標準。
順序執行async庫可以在實現復雜的異步控制流程時大大幫助我們,但是一個難題就是選擇正確的庫來解決問題。例如,對于順序執行,有大約20個不同的函數可供選擇,包括eachSeries(), mapSeries(), filterSeries(), rejectSeries(), reduce(), reduceRight(), detectSeries(), concatSeries(), series(), whilst(), doWhilst(), until(), doUntil(), forever(), waterfall(), compose(), seq(), applyEachSeries(), iterator(), 和timesSeries()
。
選擇正確的函數是編寫更穩固和可讀的代碼的重要一步,但這也需要一些經驗和實踐。在我們的例子中,我們將僅介紹其中的一些情況,但它們仍將為理解和有效地使用庫的其余部分提供堅實的基礎。
下面,通過例子說明async庫如何工作,我們將用于我們的Web爬蟲應用程序。我們直接從版本2開始,按順序遞歸地下載所有的鏈接。
但是,首先我們確保將async庫安裝到我們當前的項目中:
npm install async
然后我們需要從spider.js模塊加載新的依賴項:
const async = require("async");已知一組任務的順序執行
我們先修改download()函數。如下所示,它依次做了以下三件事:
下載URL的內容。
創建一個新目錄(如果尚不存在)。
將URL的內容保存到文件中。
async.series()可以實現順序執行一組任務:
async.series(tasks, [callback])
async.series()接受一個任務列表和一個在所有任務完成后調用的回調函數作為參數。每個任務只是一個接受回調函數的函數,當任務完成執行時,這個回調函數被調用:
function task(callback) {}
async的優勢是它使用與Node.js相同的回調約定,它會自動處理錯誤傳播。所以,如果任何一個任務調用它的回調并且產生了一個錯誤,async將跳過列表中剩余的任務,直接跳轉到最后的回調。
考慮到這一點,讓我們看看如何通過使用async來修改上述的download()函數:
function download(url, filename, callback) { console.log(`Downloading ${url}`); let body; async.series([ callback => { request(url, (err, response, resBody) => { if (err) { return callback(err); } body = resBody; callback(); }); }, mkdirp.bind(null, path.dirname(filename)), callback => { fs.writeFile(filename, body, callback); } ], err => { if (err) { return callback(err); } console.log(`Downloaded and saved: ${url}`); callback(null, body); }); }
對比起這段代碼的回調地獄版本,使用async方式使我們能夠更好地組織我們的異步任務。并且不會嵌套回調,因為我們只需要提供一個的任務列表,通常對于用于每個異步操作,然后異步任務將依次執行:
首先是下載URL的內容。我們將響應體保存到一個閉包變量(body)中,以便它可以與其他任務共享。
創建并保存下載的頁面的目錄。我們通過執行mkdirp()函數實現,并和創建的目錄路徑綁定。這樣,我們可以節省幾行代碼并增加其可讀性。
最后,我們將下載的URL的內容寫入文件。在這種情況下,我們無法執行部分應用程序(就像我們在第二個任務中所做的那樣),因為變量body只在系列中的下載任務完成后才可用。但是,通過將任務的回調直接傳遞到fs.writeFile()函數,我們仍然可以通過利用異步的自動錯誤管理來保存一些代碼行。
4.完成所有任務后,將調用async.series()的最后回調。在我們的例子中,我們只是做一些錯誤管理,然后返回body變量來回調download()函數。
對于上述情況,async.series()的一個可替代的方法是async.waterfall(),它仍然按順序執行任務,但另外還提供每個任務的輸出作為下一個輸入。在我們的情況下,我們可以使用這個特征來傳播body變量直到序列結束。
順序迭代在前面講了如何按順序執行一組任務。上面的例子async.series()來做到這一點。可以使用相同的功能來實現Web爬蟲版本2的spiderLinks()函數。然而,async為特定的情況提供了一個更合適的API,遍歷一個集合,這個API是async.eachSeries()。我們來使用它來重新實現我們的spiderLinks()函數(版本2,串行下載),如下所示:
function spiderLinks(currentUrl, body, nesting, callback) { if (nesting === 0) { return process.nextTick(callback); } const links = utilities.getPageLinks(currentUrl, body); if (links.length === 0) { return process.nextTick(callback); } async.eachSeries(links, (link, callback) => { spider(link, nesting - 1, callback); }, callback); }
如果我們將使用async的上述代碼與使用純JavaScript模式實現的相同功能的代碼進行比較,我們將注意到async在代碼組織和可讀性方面給我們帶來的巨大優勢。
并行執行async不具有處理并行流的功能,其中可以找到each(),map(),filter(),reject(),detect(),some(),every(),concat(),parallel(),applyEach()和times()。它們遵循與我們已經看到的用于順序執行的功能相同的邏輯,區別在于所提供的任務是并行執行的。
為了證明這一點,我們可以嘗試應用上述功能之一來實現我們的Web爬蟲應用程序的第三版,即使用無限制的并行流程來執行下載。
如果我們記住我們之前使用的代碼來實現spiderLinks()函數的順序版本,那么調整它使其并行工作就比較簡單:
function spiderLinks(currentUrl, body, nesting, callback) { // ... async.each(links, (link, callback) => { spider(link, nesting - 1, callback); }, callback); }
這個函數與我們用于順序下載的功能完全相同,但是使用的是async.each()而非async.eachSeries()。這清楚地表明了使用庫(例如async)抽象異步流的功能。代碼不再綁定到特定的執行流程了,沒有專門為此寫的代碼。大多數只是應用邏輯。
限制并行執行如果你想知道async還可以用來限制并行任務的并發性,答案是肯定的。我們有一些我們可以使用的函數,即eachLimit(),mapLimit(),parallelLimit(),queue()和cargo()。
我們試圖利用其中的一個來實現Web爬蟲應用程序的第4版,以有限的并發性并行執行鏈接的下載。幸運的是,async有async.queue(),它的工作方式與本章前面創建的TaskQueue類似。 async.queue()函數創建一個新的隊列,它使用一個worker()函數來執行一組具有指定并發限制的任務:
const q = async.queue(worker, concurrency);
worker()函數作為輸入接收要運行的任務和一個回調函數作為參數,當任務完成時執行回調:
function worker(task, callback);
我們應該注意到在這個例子中 task 可以是任何類型,而不僅僅只能是函數。實際上, worker有責任以最適當的方式處理任務。新建任務,可以通過q.push(task, callback)將任務添加到隊列中。一個任務處理完后,關聯一個任務的回調函數必須被worker調用。
現在,我們再次修改我們的代碼實現一個全面并行的有并發限制的執行流,利用async.queue(),首先,我們需要創建一個隊列:
const downloadQueue = async.queue((taskData, callback) => { spider(taskData.link, taskData.nesting - 1, callback); }, 2);
代碼很簡單。我們正在創建一個并發限制為2的新隊列,讓一個工作人員只需使用與任務關聯的數據調用我們的spider()函數。接下來,我們實現spiderLinks()函數:
function spiderLinks(currentUrl, body, nesting, callback) { if (nesting === 0) { return process.nextTick(callback); } const links = utilities.getPageLinks(currentUrl, body); if (links.length === 0) { return process.nextTick(callback); } const completed = 0, hasErrors = false; links.forEach(function(link) { const taskData = { link: link, nesting: nesting }; downloadQueue.push(taskData, err => { if (err) { hasErrors = true; return callback(err); } if (++completed === links.length && !hasErrors) { callback(); } }); }); }
前面的代碼應該看起來非常熟悉,因為它幾乎和使用TaskQueue對象來實現相同流程的代碼相同。此外,在這種情況下,要分析的重要部分是將新任務推入隊列的位置。在這一點上,我們確保我們傳遞一個回調,使我們能夠檢查當前頁面的所有下載任務是否完成,并最終調用最終回調。
辛虧有async.queue(),我們可以輕松地復制我們的TaskQueue對象的功能,再次證明了通過async,我們可以避免從頭開始編寫異步控制流模式,減少我們的工作量,代碼量更加簡潔。
總結在本章開始的時候,我們說Node.js的編程可能很難因為它的異步性,特別是對于以前在其他平臺上開發的人而言。然而,在本章中,我們展示了異步API如何可以從簡單原生JavaScript開始,從而為我們分析更復雜的技術奠定了基礎。然后我們看到,除了為每一種口味提供編程風格,我們所掌握的工具確實是多樣化的,并為我們大部分的問題提供了很好的解決方案。例如,我們可以選擇async庫來簡化最常見的流程。
還有更為先進的技術,如Promise和Generator函數,這將是下一章的重點。當了解所有這些技術時,能夠根據需求選擇最佳解決方案,或者在同一個項目中使用多種技術。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/89979.html
摘要:以下展示它是如何工作的函數使用構造函數創建一個新的對象,并立即將其返回給調用者。在傳遞給構造函數的函數中,我們確保傳遞給,這是一個特殊的回調函數。 本系列文章為《Node.js Design Patterns Second Edition》的原文翻譯和讀書筆記,在GitHub連載更新,同步翻譯版鏈接。 歡迎關注我的專欄,之后的博文將在專欄同步: Encounter的掘金專欄 知乎專欄...
摘要:中的流十分強大,它對處理潛在的大文件提供了支持,也抽象了一些場景下的數據處理和傳遞。本文將會提供兩個在編寫基于流的工具時,私以為有些用的兩個。 Node.js中的流十分強大,它對處理潛在的大文件提供了支持,也抽象了一些場景下的數據處理和傳遞。正因為它如此好用,所以在實戰中我們常常基于它來編寫一些工具 函數/庫 ,但往往又由于自己對流的某些特性的疏忽,導致寫出的 函數/庫 在一些情況會達...
摘要:階段是事件循環的第一階段習慣上往往都會設置數將回調函數添加到事件循環的階段的隊列中等待執行。 后端知識點總結——NODE.JS(高級) 1.Node入門: 什么是: 針對網絡應用開發的平臺主要特征: 基于Google的JavaScript運行時引擎V8 擴展了Node標準類庫: TCP,同步或異步文件管理,HTTP 為什么使用Node: 可以在服務器端運行js: 現有前端團隊可直...
摘要:在這樣的程序中,異步編程通常是有幫助的。最初是為了使異步編程簡單方便而設計的。在年設計時,人們已經在瀏覽器中進行基于回調的編程,所以該語言的社區用于異步編程風格。 來源:ApacheCN『JavaScript 編程精解 中文第三版』翻譯項目原文:Node.js 譯者:飛龍 協議:CC BY-NC-SA 4.0 自豪地采用谷歌翻譯 部分參考了《JavaScript 編程精解(第 2 版)...
摘要:基礎的端到端的基準測試顯示大約比快八倍。所謂單線程,就是指一次只能完成一件任務。在服務器端,異步模式甚至是唯一的模式,因為執行環境是單線程的,如果允許同步執行所有請求,服務器性能會急劇下降,很快就會失去響應。 模塊 Node.js 提供了exports 和 require 兩個對象,其中 exports 是模塊公開的接口,require 用于從外部獲取一個模塊的接口,即所獲取模塊的 e...
閱讀 2953·2023-04-26 01:32
閱讀 1548·2021-09-13 10:37
閱讀 2286·2019-08-30 15:56
閱讀 1677·2019-08-30 14:00
閱讀 3052·2019-08-30 12:44
閱讀 1967·2019-08-26 12:20
閱讀 1066·2019-08-23 16:29
閱讀 3233·2019-08-23 14:44