摘要:這使我們的知道什么時候原始模塊被初始化,在初始化后執(zhí)行預(yù)初始化隊列的操作,之后清空預(yù)初始化隊列,再調(diào)用作為參數(shù)的回調(diào)函數(shù),以下為具體步驟把賦值給,表示預(yù)初始化已經(jīng)完成了。
本系列文章為《Node.js Design Patterns Second Edition》的原文翻譯和讀書筆記,在GitHub連載更新,同步翻譯版鏈接。
歡迎關(guān)注我的專欄,之后的博文將在專欄同步:
Encounter的掘金專欄
知乎專欄 Encounter的編程思考
segmentfault專欄 前端小站
Advanced Asynchronous Recipes幾乎所有我們迄今為止看到的設(shè)計模式都可以被認為是通用的,并且適用于應(yīng)用程序的許多不同的領(lǐng)域。但是,有一套更具體的模式,專注于解決明確的問題。我們可以調(diào)用這些模式。就像現(xiàn)實生活中的烹飪一樣,我們有一套明確的步驟來實現(xiàn)預(yù)期的結(jié)果。當然,這并不意味著我們不能用一些創(chuàng)意來定制設(shè)計模式,以配合我們的客人的口味,對于書寫Node.js程序來說是必要的。在本章中,我們將提供一些常見的解決方案來解決我們在日常Node.js開發(fā)中遇到的一些具體問題。這些模式包括以下內(nèi)容:
異步引入模塊并初始化
在高并發(fā)的應(yīng)用程序中使用批處理和緩存異步操作的性能優(yōu)化
運行與Node.js處理并發(fā)請求的能力相悖的阻塞事件循環(huán)的同步CPU綁定操作
異步引入模塊并初始化在Chapter2-Node.js Essential Patterns中,當我們討論Node.js模塊系統(tǒng)的基本屬性時,我們提到了require()是同步的,并且module.exports也不能異步設(shè)置。
這是在核心模塊和許多npm包中存在同步API的主要原因之一,是否同步加載會被作為一個option參數(shù)被提供,主要用于初始化任務(wù),而不是替代異步API。
不幸的是,這并不總是可能的。同步API可能并不總是可用的,特別是對于在初始化階段使用網(wǎng)絡(luò)的組件,例如執(zhí)行三次握手協(xié)議或在網(wǎng)絡(luò)中檢索配置參數(shù)。 許多數(shù)據(jù)庫驅(qū)動程序和消息隊列等中間件系統(tǒng)的客戶端都是如此。
廣泛適用的解決方案我們舉一個例子:一個名為db的模塊,它將會連接到遠程數(shù)據(jù)庫。 只有在連接和與服務(wù)器的握手完成之后,db模塊才能夠接受請求。在這種情況下,我們通常有兩種選擇:
在開始使用之前確保模塊已經(jīng)初始化,否則則等待其初始化。每當我們想要在異步模塊上調(diào)用一個操作時,都必須完成這個過程:
const db = require("aDb"); //The async module module.exports = function findAll(type, callback) { if (db.connected) { //is it initialized? runFind(); } else { db.once("connected", runFind); } function runFind() { db.findAll(type, callback); }; };
使用依賴注入(Dependency Injection)而不是直接引入異步模塊。通過這樣做,我們可以延遲一些模塊的初始化,直到它們的異步依賴被完全初始化。 這種技術(shù)將管理模塊初始化的復(fù)雜性轉(zhuǎn)移到另一個組件,通常是它的父模塊。 在下面的例子中,這個組件是app.js:
// 模塊app.js const db = require("aDb"); // aDb是一個異步模塊 const findAllFactory = require("./findAll"); db.on("connected", function() { const findAll = findAllFactory(db); // 之后再執(zhí)行異步操作 }); // 模塊findAll.js module.exports = db => { //db 在這里被初始化 return function findAll(type, callback) { db.findAll(type, callback); } }
我們可以看出,如果所涉及的異步依賴的數(shù)量過多,第一種方案便不太適用了。
另外,使用DI有時也是不理想的,正如我們在Chapter7-Wiring Modules中看到的那樣。在大型項目中,它可能很快變得過于復(fù)雜,尤其對于手動完成并使用異步初始化模塊的情況下。如果我們使用一個設(shè)計用于支持異步初始化模塊的DI容器,這些問題將會得到緩解。
但是,我們將會看到,還有第三種方案可以讓我們輕松地將模塊從其依賴關(guān)系的初始化狀態(tài)中分離出來。
預(yù)初始化隊列將模塊與依賴項的初始化狀態(tài)分離的簡單模式涉及到使用隊列和命令模式。這個想法是保存一個模塊在尚未初始化的時候接收到的所有操作,然后在所有初始化步驟完成后立即執(zhí)行這些操作。
實現(xiàn)一個異步初始化的模塊為了演示這個簡單而有效的技術(shù),我們來構(gòu)建一個應(yīng)用程序。首先創(chuàng)建一個名為asyncModule.js的異步初始化模塊:
const asyncModule = module.exports; asyncModule.initialized = false; asyncModule.initialize = callback => { setTimeout(() => { asyncModule.initialized = true; callback(); }, 10000); }; asyncModule.tellMeSomething = callback => { process.nextTick(() => { if(!asyncModule.initialized) { return callback( new Error("I don"t have anything to say right now") ); } callback(null, "Current time is: " + new Date()); }); };
在上面的代碼中,asyncModule展現(xiàn)了一個異步初始化模塊的設(shè)計模式。 它有一個initialize()方法,在10秒的延遲后,將初始化的flag變量設(shè)置為true,并通知它的回調(diào)調(diào)用(10秒對于真實應(yīng)用程序來說是很長的一段時間了,但是對于具有互斥條件的應(yīng)用來說可能會顯得力不從心)。
另一個方法tellMeSomething()返回當前的時間,但是如果模塊還沒有初始化,它拋出產(chǎn)生一個異常。
下一步是根據(jù)我們剛剛創(chuàng)建的服務(wù)創(chuàng)建另一個模塊。 我們設(shè)計一個簡單的HTTP請求處理程序,在一個名為routes.js的文件中實現(xiàn):
const asyncModule = require("./asyncModule"); module.exports.say = (req, res) => { asyncModule.tellMeSomething((err, something) => { if(err) { res.writeHead(500); return res.end("Error:" + err.message); } res.writeHead(200); res.end("I say: " + something); }); };
在handler中調(diào)用asyncModule的tellMeSomething()方法,然后將其結(jié)果寫入HTTP響應(yīng)中。 正如我們所看到的那樣,我們沒有對asyncModule的初始化狀態(tài)進行任何檢查,這可能會導(dǎo)致問題。
現(xiàn)在,創(chuàng)建app.js模塊,使用核心http模塊創(chuàng)建一個非?;镜?b>HTTP服務(wù)器:
const http = require("http"); const routes = require("./routes"); const asyncModule = require("./asyncModule"); asyncModule.initialize(() => { console.log("Async module initialized"); }); http.createServer((req, res) => { if (req.method === "GET" && req.url === "/say") { return routes.say(req, res); } res.writeHead(404); res.end("Not found"); }).listen(8000, () => console.log("Started"));
上述模塊是我們應(yīng)用程序的入口點,它所做的只是觸發(fā)asyncModule的初始化并創(chuàng)建一個HTTP服務(wù)器,它使用我們以前創(chuàng)建的handler(routes.say())來對網(wǎng)絡(luò)請求作出相應(yīng)。
我們現(xiàn)在可以像往常一樣通過執(zhí)行app.js模塊來嘗試啟動我們的服務(wù)器。
在服務(wù)器啟動后,我們可以嘗試使用瀏覽器訪問URL:http://localhost:8000/并查看從asyncModule返回的內(nèi)容。
和預(yù)期的一樣,如果我們在服務(wù)器啟動后立即發(fā)送請求,結(jié)果將是一個錯誤,如下所示:
Error:I don"t have anything to say right now
顯然,在異步模塊加載好了之后:
這意味著asyncModule尚未初始化,但我們?nèi)試L試使用它,則會拋出一個錯誤。
根據(jù)異步初始化模塊的實現(xiàn)細節(jié),幸運的情況是我們可能會收到一個錯誤,乃至丟失重要的信息,崩潰整個應(yīng)用程序。 總的來說,我們剛剛描述的情況總是必須要避免的。
大多數(shù)時候,可能并不會出現(xiàn)上述問題,畢竟初始化一般來說很快,以至于在實踐中,它永遠不會發(fā)生。 然而,對于設(shè)計用于自動調(diào)節(jié)的高負載應(yīng)用和云服務(wù)器,情況就完全不同了。
用預(yù)初始化隊列包裝模塊為了維護服務(wù)器的健壯性,我們現(xiàn)在要通過使用我們在本節(jié)開頭描述的模式來進行異步模塊加載。我們將在asyncModule尚未初始化的這段時間內(nèi)對所有調(diào)用的操作推入一個預(yù)初始化隊列,然后在異步模塊加載好后處理它們時立即刷新隊列。這就是狀態(tài)模式的一個很好的應(yīng)用!我們將需要兩個狀態(tài),一個在模塊尚未初始化的時候?qū)⑺胁僮髋抨?,另一個在初始化完成時將每個方法簡單地委托給原始的asyncModule模塊。
通常,我們沒有機會修改異步模塊的代碼;所以,為了添加我們的排隊層,我們需要圍繞原始的asyncModule模塊創(chuàng)建一個代理。
接下來創(chuàng)建一個名為asyncModuleWrapper.js的新文件,讓我們依照每個步驟逐個構(gòu)建它。我們需要做的第一件事是創(chuàng)建一個代理,并將原始異步模塊的操作委托給這個代理:
const asyncModule = require("./asyncModule"); const asyncModuleWrapper = module.exports; asyncModuleWrapper.initialized = false; asyncModuleWrapper.initialize = () => { activeState.initialize.apply(activeState, arguments); }; asyncModuleWrapper.tellMeSomething = () => { activeState.tellMeSomething.apply(activeState, arguments); };
在前面的代碼中,asyncModuleWrapper將其每個方法簡單地委托給activeState。 讓我們來看看這兩個狀態(tài)是什么樣子
從notInitializedState開始,notInitializedState是指還沒初始化的狀態(tài):
// 當模塊沒有被初始化時的狀態(tài) let pending = []; let notInitializedState = { initialize: function(callback) { asyncModule.initialize(function() { asyncModuleWrapper.initalized = true; activeState = initializedState; pending.forEach(function(req) { asyncModule[req.method].apply(null, req.args); }); pending = []; callback(); }); }, tellMeSomething: function(callback) { return pending.push({ method: "tellMeSomething", args: arguments }); } };
當initialize()方法被調(diào)用時,我們觸發(fā)初始化asyncModule模塊,提供一個回調(diào)函數(shù)作為參數(shù)。 這使我們的asyncModuleWrapper知道什么時候原始模塊被初始化,在初始化后執(zhí)行預(yù)初始化隊列的操作,之后清空預(yù)初始化隊列,再調(diào)用作為參數(shù)的回調(diào)函數(shù),以下為具體步驟:
把initializedState賦值給activeState,表示預(yù)初始化已經(jīng)完成了。
執(zhí)行先前存儲在待處理隊列中的所有命令。
調(diào)用原始回調(diào)。
由于此時的模塊尚未初始化,此狀態(tài)的tellMeSomething()方法僅創(chuàng)建一個新的Command對象,并將其添加到預(yù)初始化隊列中。
此時,當原始的asyncModule模塊尚未初始化時,代理應(yīng)該已經(jīng)清楚,我們的代理將簡單地把所有接收到的請求防到預(yù)初始化隊列中。 然后,當我們被通知初始化完成時,我們執(zhí)行所有預(yù)初始化隊列的操作,然后將內(nèi)部狀態(tài)切換到initializedState。來看這個代理模塊最后的定義:
let initializedState = asyncModule;
不出意外,initializedState對象只是對原始的asyncModule的引用!事實上,初始化完成后,我們可以安全地將任何請求直接發(fā)送到原始模塊。
最后,設(shè)定異步模塊還沒加載好的的狀態(tài),即notInitializedState
let activeState = notInitializedState;
我們現(xiàn)在可以嘗試再次啟動我們的測試服務(wù)器,但首先,我們不要忘記用我們新的asyncModuleWrapper對象替換原始的asyncModule模塊的引用; 這必須在app.js和routes.js模塊中完成。
這樣做之后,如果我們試圖再次向服務(wù)器發(fā)送一個請求,我們會看到在asyncModule模塊尚未初始化的時候,請求不會失敗; 相反,他們會掛起,直到初始化完成,然后才會被實際執(zhí)行。我們當然可以肯定,比起之前,容錯率變得更高了。
可以看到,在剛剛初始化異步模塊的時候,服務(wù)器會等待請求的響應(yīng):
在異步模塊加載完成后,服務(wù)器才會返回響應(yīng)的信息:
模式:如果模塊是需要異步初始化的,則對每個操作進行排隊,直到模塊完全初始化釋放隊列。
現(xiàn)在,我們的服務(wù)器可以在啟動后立即開始接受請求,并保證這些請求都不會由于其模塊的初始化狀態(tài)而失敗。我們能夠在不使用DI的情況下獲得這個結(jié)果,也不需要冗長且容易出錯的檢查來驗證異步模塊的狀態(tài)。
其它場景的應(yīng)用我們剛剛介紹的模式被許多數(shù)據(jù)庫驅(qū)動程序和ORM庫所使用。 最值得注意的是Mongoose,它是MongoDB的ORM。使用Mongoose,不必等待數(shù)據(jù)庫連接打開,以便能夠發(fā)送查詢,因為每個操作都排隊,稍后與數(shù)據(jù)庫的連接完全建立時執(zhí)行。 這顯然提高了其API的可用性。
看一下Mongoose的源碼,它的每個方法是如何通過代理添加預(yù)初始化隊列。 可以看看實現(xiàn)這中模式的代碼片段:https://github.com/Automattic...
for (var i in Collection.prototype) { (function(i){ NativeCollection.prototype[i] = function () { if (this.buffer) { // mongoose中,在緩沖區(qū)不為空時,只是簡單地把這個操作加入緩沖區(qū)內(nèi) this.addQueue(i, arguments); return; } var collection = this.collection , args = arguments , self = this , debug = self.conn.base.options.debug; if (debug) { if ("function" === typeof debug) { debug.apply(debug , [self.name, i].concat(utils.args(args, 0, args.length-1))); } else { console.error("x1B[0;36mMongoose:x1B[0m %s.%s(%s) %s %s %s" , self.name , i , print(args[0]) , print(args[1]) , print(args[2]) , print(args[3])) } } return collection[i].apply(collection, args); }; })(i); }異步批處理和緩存
在高負載的應(yīng)用程序中,緩存起著至關(guān)重要的作用,幾乎在網(wǎng)絡(luò)中的任何地方,從網(wǎng)頁,圖像和樣式表等靜態(tài)資源到純數(shù)據(jù)(如數(shù)據(jù)庫查詢的結(jié)果)都會使用緩存。 在本節(jié)中,我們將學(xué)習(xí)如何將緩存應(yīng)用于異步操作,以及如何充分利用緩存解決高請求吞吐量的問題。
實現(xiàn)沒有緩存或批處理的服務(wù)器在這之前,我們來實現(xiàn)一個小型的服務(wù)器,以便用它來衡量緩存和批處理等技術(shù)在解決高負載應(yīng)用程序的優(yōu)勢。
讓我們考慮一個管理電子商務(wù)公司銷售的web服務(wù)器,特別是對于查詢我們的服務(wù)器所有特定類型的商品交易的總和的情況。 為此,考慮到LevelUP的簡單性和靈活性,我們將再次使用LevelUP。我們要使用的數(shù)據(jù)模型是存儲在sales這一個sublevel中的簡單事務(wù)列表,它是以下的形式:
transactionId {amount, item}
key由transactionId表示,value則是一個JSON對象,它包含amount,表示銷售金額和item,表示項目類型。
要處理的數(shù)據(jù)是非?;镜模宰屛覀兞⒓丛诿麨榈?b>totalSales.js文件中實現(xiàn)API,將如下所示:
const level = require("level"); const sublevel = require("level-sublevel"); const db = sublevel(level("example-db", {valueEncoding: "json"})); const salesDb = db.sublevel("sales"); module.exports = function totalSales(item, callback) { console.log("totalSales() invoked"); let sum = 0; salesDb.createValueStream() // [1] .on("data", data => { if(!item || data.item === item) { // [2] sum += data.amount; } }) .on("end", () => { callback(null, sum); // [3] }); };
該模塊的核心是totalSales函數(shù),它也是唯一exports的API;它進行如下工作:
我們從包含交易信息的salesDb的sublevel創(chuàng)建一個Stream。Stream將從數(shù)據(jù)庫中提取所有條目。
監(jiān)聽data事件,這個事件觸發(fā)時,將從數(shù)據(jù)庫Stream中提取出每一項,如果這一項的item參數(shù)正是我們需要的item,就去累加它的amount到總的sum里面。
最后,end事件觸發(fā)時,我們最終調(diào)用callback()方法。
上述查詢方式可能在性能方面并不好。理想情況下,在實際的應(yīng)用程序中,我們可以使用索引,甚至使用增量映射來縮短實時計算的時間;但是,由于我們需要體現(xiàn)緩存的優(yōu)勢,對于上述例子來說,慢速的查詢實際上更好,因為它會突出顯示我們要分析的模式的優(yōu)點。
為了完成總銷售應(yīng)用程序,我們只需要從HTTP服務(wù)器公開totalSales的API;所以,下一步是構(gòu)建一個(app.js文件):
const http = require("http"); const url = require("url"); const totalSales = require("./totalSales"); http.createServer((req, res) => { const query = url.parse(req.url, true).query; totalSales(query.item, (err, sum) => { res.writeHead(200); res.end(`Total sales for item ${query.item} is ${sum}`); }); }).listen(8000, () => console.log("Started"));
我們創(chuàng)建的服務(wù)器是非常簡單的;我們只需要它暴露totalSales API。
在我們第一次啟動服務(wù)器之前,我們需要用一些示例數(shù)據(jù)填充數(shù)據(jù)庫;我們可以使用專用于本節(jié)的代碼示例中的populate_db.js腳本來執(zhí)行此操作。該腳本將在數(shù)據(jù)庫中創(chuàng)建100K個隨機銷售交易。
好的! 現(xiàn)在,一切都準備好了。 像往常一樣,啟動服務(wù)器,我們執(zhí)行以下命令:
node app
請求這個HTTP接口,訪問至以下URL:
http://localhost:8000/?item=book
但是,為了更好地了解服務(wù)器的性能,我們需要連續(xù)發(fā)送多個請求;所以,我們創(chuàng)建一個名為loadTest.js的腳本,它以200 ms的間隔發(fā)送請求。它已經(jīng)被配置為連接到服務(wù)器的URL,因此,要運行它,執(zhí)行以下命令:
node loadTest
我們會看到這20個請求需要一段時間才能完成。注意測試的總執(zhí)行時間,因為我們現(xiàn)在開始我們的服務(wù),并測量我們可以節(jié)省多少時間。
批量異步請求在處理異步操作時,最基本的緩存級別可以通過將一組調(diào)用集中到同一個API來實現(xiàn)。這非常簡單:如果我們在調(diào)用異步函數(shù)的同時在隊列中還有另一個尚未處理的回調(diào),我們可以將回調(diào)附加到已經(jīng)運行的操作上,而不是創(chuàng)建一個全新的請求。看下圖的情況:
前面的圖像顯示了兩個客戶端(它們可以是兩臺不同的機器,或兩個不同的Web請求),使用完全相同的輸入調(diào)用相同的異步操作。 當然,描述這種情況的自然方式是由兩個客戶開始兩個多帶帶的操作,這兩個操作將在兩個不同的時刻完成,如前圖所示?,F(xiàn)在考慮下一個場景,如下圖所示:
上圖向我們展示了如何對API的兩個請求進行批處理,或者換句話說,對兩個請求執(zhí)行到相同的操作。通過這樣做,當操作完成時,兩個客戶端將同時被通知。這代表了一種簡單而又非常強大的方式來降低應(yīng)用程序的負載,而不必處理更復(fù)雜的緩存機制,這通常需要適當?shù)膬?nèi)存管理和緩存失效策略。
在電子商務(wù)銷售的Web服務(wù)器中使用批處理現(xiàn)在讓我們在totalSales API上添加一個批處理層。我們要使用的模式非常簡單:如果在API被調(diào)用時已經(jīng)有另一個相同的請求掛起,我們將把這個回調(diào)添加到一個隊列中。當異步操作完成時,其隊列中的所有回調(diào)立即被調(diào)用。
現(xiàn)在,讓我們來改變之前的代碼:創(chuàng)建一個名為totalSalesBatch.js的新模塊。在這里,我們將在原始的totalSales API之上實現(xiàn)一個批處理層:
const totalSales = require("./totalSales"); const queues = {}; module.exports = function totalSalesBatch(item, callback) { if(queues[item]) { // [1] console.log("Batching operation"); return queues[item].push(callback); } queues[item] = [callback]; // [2] totalSales(item, (err, res) => { const queue = queues[item]; // [3] queues[item] = null; queue.forEach(cb => cb(err, res)); }); };
totalSalesBatch()函數(shù)是原始的totalSales() API的代理,它的工作原理如下:
如果請求的item已經(jīng)存在隊列中,則意味著該特定item的請求已經(jīng)在服務(wù)器任務(wù)隊列中。在這種情況下,我們所要做的只是將回調(diào)push到現(xiàn)有隊列,并立即從調(diào)用中返回。不進行后續(xù)操作。
如果請求的item沒有在隊列中,這意味著我們必須創(chuàng)建一個新的請求。為此,我們?yōu)樵撎囟?b>item的請求創(chuàng)建一個新隊列,并使用當前回調(diào)函數(shù)對其進行初始化。 接下來,我們調(diào)用原始的totalSales() API。
當原始的totalSales()請求完成時,則執(zhí)行我們的回調(diào)函數(shù),我們遍歷隊列中為該特定請求的item添加的所有回調(diào),并分別調(diào)用這些回調(diào)函數(shù)。
totalSalesBatch()函數(shù)的行為與原始的totalSales() API的行為相同,不同之處在于,現(xiàn)在對于相同內(nèi)容的請求API進行批處理,從而節(jié)省時間和資源。
想知道相比于totalSales() API原始的非批處理版本,在性能方面的優(yōu)勢是什么?然后,讓我們將HTTP服務(wù)器使用的totalSales模塊替換為我們剛剛創(chuàng)建的模塊,修改app.js文件如下:
//const totalSales = require("./totalSales"); const totalSales = require("./totalSalesBatch"); http.createServer(function(req, res) { // ... });
如果我們現(xiàn)在嘗試再次啟動服務(wù)器并進行負載測試,我們首先看到的是請求被批量返回。
除此之外,我們觀察到請求的總時間大大減少;它應(yīng)該至少比對原始totalSales() API執(zhí)行的原始測試快四倍!
這是一個驚人的結(jié)果,證明了只需應(yīng)用一個簡單的批處理層即可獲得巨大的性能提升,比起緩存機制,也沒有顯得太復(fù)雜,因為,無需考慮緩存淘汰策略。
批處理模式在高負載應(yīng)用程序和執(zhí)行較為緩慢的API中發(fā)揮巨大作用,正是由于這種模式的運用,可以批量處理大量的請求。異步請求緩存策略
異步批處理模式的問題之一是對于API的答復(fù)越快,我們對于批處理來說,其意義就越小。有人可能會爭辯說,如果一個API已經(jīng)很快了,那么試圖優(yōu)化它就沒有意義了。然而,它仍然是一個占用應(yīng)用程序的資源負載的因素,總結(jié)起來,仍然可以有解決方案。另外,如果API調(diào)用的結(jié)果不會經(jīng)常改變;因此,這時候批處理將并不會有較好的性能提升。在這種情況下,減少應(yīng)用程序負載并提高響應(yīng)速度的最佳方案肯定是更好的緩存模式。
緩存模式很簡單:一旦請求完成,我們將其結(jié)果存儲在緩存中,該緩存可以是變量,數(shù)據(jù)庫中的條目,也可以是專門的緩存服務(wù)器。因此,下一次調(diào)用API時,可以立即從緩存中檢索結(jié)果,而不是產(chǎn)生另一個請求。
對于一個有經(jīng)驗的開發(fā)人員來說,緩存不應(yīng)該是多么新的技術(shù),但是異步編程中這種模式的不同之處在于它應(yīng)該與批處理結(jié)合在一起,以達到最佳效果。原因是因為多個請求可能并發(fā)運行,而沒有設(shè)置緩存,并且當這些請求完成時,緩存將會被設(shè)置多次,這樣做則會造成緩存資源的浪費。
基于這些假設(shè),異步請求緩存模式的最終結(jié)構(gòu)如下圖所示:
上圖給出了異步緩存算法的兩個步驟:
與批處理模式完全相同,與在未設(shè)置高速緩存時接收到的任何請求將一起批處理。這些請求完成時,緩存將會被設(shè)置一次。
當緩存最終被設(shè)置時,任何后續(xù)的請求都將直接從緩存中提供。
另外我們需要考慮Zalgo的反作用(我們已經(jīng)在Chapter 2-Node.js Essential Patterns中看到了它的實際應(yīng)用)。在處理異步API時,我們必須確保始終以異步方式返回緩存的值,即使訪問緩存只涉及同步操作。
在電子商務(wù)銷售的Web服務(wù)器中使用異步緩存請求實踐異步緩存模式的優(yōu)點,現(xiàn)在讓我們將我們學(xué)到的東西應(yīng)用到totalSales() API。
與異步批處理示例程序一樣,我們創(chuàng)建一個代理,其作用是添加緩存層。
然后創(chuàng)建一個名為totalSalesCache.js的新模塊,代碼如下:
const totalSales = require("./totalSales"); const queues = {}; const cache = {}; module.exports = function totalSalesBatch(item, callback) { const cached = cache[item]; if (cached) { console.log("Cache hit"); return process.nextTick(callback.bind(null, null, cached)); } if (queues[item]) { console.log("Batching operation"); return queues[item].push(callback); } queues[item] = [callback]; totalSales(item, (err, res) => { if (!err) { cache[item] = res; setTimeout(() => { delete cache[item]; }, 30 * 1000); //30 seconds expiry } const queue = queues[item]; queues[item] = null; queue.forEach(cb => cb(err, res)); }); };
我們可以看到前面的代碼與我們異步批處理的很多地方基本相同。 其實唯一的區(qū)別是以下幾點:
我們需要做的第一件事就是檢查緩存是否被設(shè)置,如果是這種情況,我們將立即使用callback()返回緩存的值,這里必須要使用process.nextTick(),因為緩存可能是異步設(shè)定的,需要等到下一次事件輪詢時才能夠保證緩存已經(jīng)被設(shè)定。
繼續(xù)異步批處理模式,但是這次,當原始API成功完成時,我們將結(jié)果保存到緩存中。此外,我們還設(shè)置了一個緩存淘汰機制,在30秒后使緩存失效。 一個簡單而有效的技術(shù)!
現(xiàn)在,我們準備嘗試我們剛創(chuàng)建的totalSales模塊。 先更改app.js模塊,如下所示:
// const totalSales = require("./totalSales"); // const totalSales = require("./totalSalesBatch"); const totalSales = require("./totalSalesCache"); http.createServer(function(req, res) { // ... });
現(xiàn)在,重新啟動服務(wù)器,并使用loadTest.js腳本進行配置,就像我們在前面的例子中所做的那樣。使用默認的測試參數(shù),與簡單的異步批處理模式相比,很明顯地有了更好的性能提升。 當然,這很大程度上取決于很多因素;例如收到的請求數(shù)量,以及一個請求和另一個請求之間的延遲等。當請求數(shù)量較高且跨越較長時間時,使用高速緩存批處理的優(yōu)勢將更為顯著。
Memoization被稱做緩存函數(shù)調(diào)用的結(jié)果的算法。 在npm中,你可以找到許多包來實現(xiàn)異步的memoization,其中最著名的之一之一是memoizee。有關(guān)實現(xiàn)緩存機制的說明
我們必須記住,在實際應(yīng)用中,我們可能想要使用更先進的失效技術(shù)和存儲機制。 這可能是必要的,原因如下:
大量的緩存值可能會消耗大量內(nèi)存。 在這種情況下,可以應(yīng)用最近最少使用(LRU)算法來保持恒定的存儲器利用率。
當應(yīng)用程序分布在多個進程中時,對緩存使用簡單變量可能會導(dǎo)致每個服務(wù)器實例返回不同的結(jié)果。如果這對于我們正在實現(xiàn)的特定應(yīng)用程序來說是不希望的,那么解決方案就是使用共享存儲來存儲緩存。 常用的解決方案是Redis和Memcached。
與定時淘汰緩存相比,手動淘汰高速緩存可使得高速緩存使用壽命更長,同時提供更新的數(shù)據(jù),但當然,管理起緩存來要復(fù)雜得多。
使用Promise進行批處理和緩存在Chapter4-Asynchronous Control Flow Patterns with ES2015 and Beyond中,我們看到了Promise如何極大地簡化我們的異步代碼,但是在處理批處理和緩存時,它則可以提供更大的幫助。
利用Promise進行異步批處理和緩存策略,有如下兩個優(yōu)點:
多個then()監(jiān)聽器可以附加到相同的Promise實例。
then()監(jiān)聽器最多保證被調(diào)用一次,即使在Promise已經(jīng)被resolve了之后,then()也能正常工作。 此外,then()總是會被保證其是異步調(diào)用的。
簡而言之,第一個優(yōu)點正是批處理請求所需要的,而第二個優(yōu)點則在Promise已經(jīng)是解析值的緩存時,也會提供同樣的的異步返回緩存值的機制。
下面開始看代碼,我們可以嘗試使用Promises為totalSales()創(chuàng)建一個模塊,在其中添加批處理和緩存功能。創(chuàng)建一個名為totalSalesPromises.js的新模塊:
const pify = require("pify"); // [1] const totalSales = pify(require("./totalSales")); const cache = {}; module.exports = function totalSalesPromises(item) { if (cache[item]) { // [2] return cache[item]; } cache[item] = totalSales(item) // [3] .then(res => { // [4] setTimeout(() => {delete cache[item]}, 30 * 1000); //30 seconds expiry return res; }) .catch(err => { // [5] delete cache[item]; throw err; }); return cache[item]; // [6] };
Promise確實很好,下面是上述函數(shù)的功能描述:
首先,我們需要一個名為pify的模塊,它允許我們對totalSales()模塊進行promisification。這樣做之后,totalSales()將返回一個符合ES2015標準的Promise實例,而不是接受一個回調(diào)函數(shù)作為參數(shù)。
當調(diào)用totalSalesPromises()時,我們檢查給定的項目類型是否已經(jīng)在緩存中有相應(yīng)的Promise。如果我們已經(jīng)有了這樣的Promise,我們直接返回這個Promise實例。
如果我們在緩存中沒有針對給定項目類型的Promise,我們繼續(xù)通過調(diào)用原始(promisified)的totalSales()來創(chuàng)建一個Promise實例。
當Promise正常resolve了,我們設(shè)置了一個清除緩存的時間(假設(shè)為30秒),我們返回res將操作的結(jié)果返回給應(yīng)用程序。
如果Promise被異常reject了,我們立即重置緩存,并再次拋出錯誤,將其傳播到Promise chain中,所以任何附加到相同Promise的其他應(yīng)用程序也將收到這一異常。
最后,我們返回我們剛才創(chuàng)建或者緩存的Promise實例。
非常簡單直觀,更重要的是,我們使用Promise也能夠?qū)崿F(xiàn)批處理和緩存。
如果我們現(xiàn)在要嘗試使用totalSalesPromise()函數(shù),稍微調(diào)整app.js模塊,因為現(xiàn)在使用Promise而不是回調(diào)函數(shù)。 讓我們通過創(chuàng)建一個名為appPromises.js的app模塊來實現(xiàn):
const http = require("http"); const url = require("url"); const totalSales = require("./totalSalesPromises"); http.createServer(function(req, res) { const query = url.parse(req.url, true).query; totalSales(query.item).then(function(sum) { res.writeHead(200); res.end(`Total sales for item ${query.item} is ${sum}`); }); }).listen(8000, function() {console.log("Started")});
它的實現(xiàn)與原始應(yīng)用程序模塊幾乎完全相同,不同的是現(xiàn)在我們使用的是基于Promise的批處理/緩存封裝版本; 因此,我們調(diào)用它的方式也略有不同。
運行以下命令開啟這個新版本的服務(wù)器:
node appPromises運行與CPU-bound的任務(wù)
雖然上面的totalSales()在系統(tǒng)資源上面消耗較大,但是其也不會影響服務(wù)器處理并發(fā)的能力。 我們在Chapter1-Welcome to the Node.js Platform中了解到有關(guān)事件循環(huán)的內(nèi)容,應(yīng)該為此行為提供解釋:調(diào)用異步操作會導(dǎo)致堆棧退回到事件循環(huán),從而使其免于處理其他請求。
但是,當我們運行一個長時間的同步任務(wù)時,會發(fā)生什么情況,從不會將控制權(quán)交還給事件循環(huán)?
這種任務(wù)也被稱為CPU-bound,因為它的主要特點是CPU利用率較高,而不是I/O操作繁重。
讓我們立即舉一個例子上看看這些類型的任務(wù)在Node.js中的具體行為。
現(xiàn)在讓我們做一個CPU占用比較高的高計算量的實驗。下面來看的是子集總和問題,我們計算一個數(shù)組中是否具有一個子數(shù)組,其總和為0。例如,如果我們有數(shù)組[1, 2, -4, 5, -3]作為輸入,則滿足問題的子數(shù)組是[1, 2, -3]和[2, -4, 5, -3]。
最簡單的算法是把每一個數(shù)組元素做遍歷然后依次計算,時間復(fù)雜度為O(2^n),或者換句話說,它隨著輸入的數(shù)組長度成指數(shù)增長。這意味著一組20個整數(shù)則會有多達1, 048, 576中情況,顯然不能夠通過窮舉來做到。當然,這個問題的解決方案可能并不算復(fù)雜。為了使事情變得更加困難,我們將考慮數(shù)組和問題的以下變化:給定一組整數(shù),我們要計算所有可能的組合,其總和等于給定的任意整數(shù)。
const EventEmitter = require("events").EventEmitter; class SubsetSum extends EventEmitter { constructor(sum, set) { super(); this.sum = sum; this.set = set; this.totalSubsets = 0; } //... }
SubsetSum類是EventEmitter類的子類;這使得我們每次找到一個匹配收到的總和作為輸入的新子集時都會發(fā)出一個事件。 我們將會看到,這會給我們很大的靈活性。
接下來,讓我們看看我們?nèi)绾文軌蛏伤锌赡艿淖蛹M合:
開始構(gòu)建一個這樣的算法。創(chuàng)建一個名為subsetSum.js的新模塊。在其中聲明一個SubsetSum類:
_combine(set, subset) { for(let i = 0; i < set.length; i++) { let newSubset = subset.concat(set[i]); this._combine(set.slice(i + 1), newSubset); this._processSubset(newSubset); } }
不管算法其中到底是什么內(nèi)容,但有兩點要注意:
_combine()方法是完全同步的;它遞歸地生成每一個可能的子集,而不把CPU控制權(quán)交還給事件循環(huán)。如果我們考慮一下,這對于不需要任何I/O的算法來說是非常正常的。
每當生成一個新的組合時,我們都會將這個組合提供給_processSubset()方法以供進一步處理。
_processSubset()方法負責(zé)驗證給定子集的元素總和是否等于我們要查找的數(shù)字:
_processSubset(subset) { console.log("Subset", ++this.totalSubsets, subset); const res = subset.reduce((prev, item) => (prev + item), 0); if (res == this.sum) { this.emit("match", subset); } }
簡單地說,_processSubset()方法將reduce操作應(yīng)用于子集,以便計算其元素的總和。然后,當結(jié)果總和等于給定的sum參數(shù)時,會發(fā)出一個match事件。
最后,調(diào)用start()方法開始執(zhí)行算法:
start() { this._combine(this.set, []); this.emit("end"); }
通過調(diào)用_combine()觸發(fā)算法,最后觸發(fā)一個end事件,表明所有的組合都被檢查過,并且任何可能的匹配都已經(jīng)被計算出來。 這是可能的,因為_combine()是同步的; 因此,只要前面的函數(shù)返回,end事件就會觸發(fā),這意味著所有的組合都被計算出來了。
接下來,我們在網(wǎng)絡(luò)上公開剛剛創(chuàng)建的算法。可以使用一個簡單的HTTP服務(wù)器對響應(yīng)的任務(wù)作出響應(yīng)。 特別是,我們希望以/subsetSum?data=
在一個名為app.js的模塊中實現(xiàn)這個簡單的服務(wù)器:
const http = require("http"); const SubsetSum = require("./subsetSum"); http.createServer((req, res) => { const url = require("url").parse(req.url, true); if(url.pathname === "/subsetSum") { const data = JSON.parse(url.query.data); res.writeHead(200); const subsetSum = new SubsetSum(url.query.sum, data); subsetSum.on("match", match => { res.write("Match: " + JSON.stringify(match) + " "); }); subsetSum.on("end", () => res.end()); subsetSum.start(); } else { res.writeHead(200); res.end("Im alive! "); } }).listen(8000, () => console.log("Started"));
由于SubsetSum實例使用事件返回結(jié)果,所以我們可以在算法生成后立即對匹配的結(jié)果使用Stream進行處理。另一個需要注意的細節(jié)是,每次我們的服務(wù)器都會返回I"m alive!,這樣我們每次發(fā)送一個不同于/subsetSum的請求的時候??梢杂脕頇z查我們服務(wù)器是否掛掉了,這在稍后將會看到。
開始運行:
node app
一旦服務(wù)器啟動,我們準備發(fā)送我們的第一個請求;讓我們嘗試發(fā)送一組17個隨機數(shù),這將導(dǎo)致產(chǎn)生131,071個組合,那么服務(wù)器將會處理一段時間:
curl -G http://localhost:8000/subsetSum --data-urlencode "data=[116,119,101,101,-116,109,101,-105,-102,117,-115,-97,119,-116,-104,-105,115]"--data-urlencode "sum=0"
這是如果我們在第一個請求仍在運行的時候在另一個終端中嘗試輸入以下命令,我們將發(fā)現(xiàn)一個巨大的問題:
curl -G http://localhost:8000
我們會看到直到第一個請求結(jié)束之前,最后一個請求一直處于掛起的狀態(tài)。服務(wù)器沒有返回響應(yīng)!這正如我們所想的那樣。Node.js事件循環(huán)運行在一個多帶帶的線程中,如果這個線程被一個長的同步計算阻塞,它將不能再執(zhí)行一個循環(huán)來響應(yīng)I"m alive!,
我們必須知道,這種代碼顯然不能夠用于同時接收到多個請求的應(yīng)用程序。
但是不要對Node.js中絕望,我們可以通過幾種方式來解決這種情況。我們來分析一下最常見的兩種方案:
使用setImmediate通常,CPU-bound算法是建立在一定規(guī)則之上的。它可以是一組遞歸調(diào)用,一個循環(huán),或者基于這些的任何變化/組合。 所以,對于我們的問題,一個簡單的解決方案就是在這些步驟完成后(或者在一定數(shù)量的步驟之后),將控制權(quán)交還給事件循環(huán)。這樣,任何待處理的I / O仍然可以在事件循環(huán)在長時間運行的算法產(chǎn)生CPU的時間間隔中處理。對于這個問題而言,解決這一問題的方式是把算法的下一步在任何可能導(dǎo)致掛起的I/O請求之后運行。這聽起來像是setImmediate()方法的完美用例(我們已經(jīng)在Chapter2-Node.js Essential Patterns中介紹過這一API)。
模式:使用setImmediate()交錯執(zhí)行長時間運行的同步任務(wù)。使用setImmediate進行子集求和算法的步驟
現(xiàn)在我們來看看這個模式如何應(yīng)用于子集求和算法。 我們所要做的只是稍微修改一下subsetSum.js模塊。 為方便起見,我們將創(chuàng)建一個名為subsetSumDefer.js的新模塊,將原始的subsetSum類的代碼作為起點。
我們要做的第一個改變是添加一個名為_combineInterleaved()的新方法,它是我們正在實現(xiàn)的模式的核心:
_combineInterleaved(set, subset) { this.runningCombine++; setImmediate(() => { this._combine(set, subset); if(--this.runningCombine === 0) { this.emit("end"); } }); }
正如我們所看到的,我們所要做的只是使用setImmediate()調(diào)用原始的同步的_combine()方法。然而,現(xiàn)在的問題是因為該算法不再是同步的,我們更難以知道何時已經(jīng)完成了所有的組合的計算。
為了解決這個問題,我們必須使用非常類似于我們在Chapter3-Asynchronous Control Flow Patterns with Callbacks看到的異步并行執(zhí)行的模式來追溯_combine()方法的所有正在運行的實例。 當_combine()方法的所有實例都已經(jīng)完成運行時,觸發(fā)end事件,通知任何監(jiān)聽器,進程需要做的所有動作都已經(jīng)完成。
對于最終子集求和算法的重構(gòu)版本。首先,我們需要將_combine()方法中的遞歸步驟替換為異步:
_combine(set, subset) { for(let i = 0; i < set.length; i++) { let newSubset = subset.concat(set[i]); this._combineInterleaved(set.slice(i + 1), newSubset); this._processSubset(newSubset); } }
通過上面的更改,我們確保算法的每個步驟都將使用setImmediate()在事件循環(huán)中排隊,在事件循環(huán)隊列中I / O請求之后執(zhí)行,而不是同步運行造成阻塞。
另一個小調(diào)整是對于start()方法:
start() { this.runningCombine = 0; this._combineInterleaved(this.set, []); }
在前面的代碼中,我們將_combine()方法的運行實例的數(shù)量初始化為0.我們還通過調(diào)用_combineInterleaved()來將調(diào)用替換為_combine(),并移除了end的觸發(fā),因為現(xiàn)在_combineInterleaved()是異步處理的。
通過這個最后的改變,我們的子集求和算法現(xiàn)在應(yīng)該能夠通過事件循環(huán)可以運行的時間間隔交替地運行其可能大量占用CPU的代碼,并且不會再造成阻塞。
最后更新app.js模塊,以便它可以使用新版本的SubsetSum:
const http = require("http"); // const SubsetSum = require("./subsetSum"); const SubsetSum = require("./subsetSumDefer"); http.createServer(function(req, res) { // ... })
和之前一樣的方式開始運行,結(jié)果如下:
此時,使用異步的方式運行,不再會阻塞CPU了。
interleaving模式正如我們所看到的,在保持應(yīng)用程序的響應(yīng)性的同時運行一個CPU-bound的任務(wù)并不復(fù)雜,只需要使用setImmediate()把同步執(zhí)行的代碼變?yōu)楫惒綀?zhí)行即可。但是,這不是效率最好的模式;實際上,延遲執(zhí)行一個任務(wù)會額外帶來一個小的開銷,在這樣的算法中,積少成多,則會產(chǎn)生重大的影響。這通常是我們在運行CPU限制任務(wù)時所需要的最后一件事情,特別是如果我們必須將結(jié)果直接返回給用戶,這應(yīng)該在合理的時間內(nèi)進行響應(yīng)。 緩解這個問題的一個可能的解決方案是只有在一定數(shù)量的步驟之后使用setImmediate(),而不是在每一步中使用它。但是這仍然不能解決問題的根源。
記住,這并不是說一旦我們想要通過異步的模式來執(zhí)行CPU-bound的任務(wù),我們就應(yīng)該不惜一切代價來避免這樣的額外開銷,事實上,從更廣闊的角度來看,同步任務(wù)并不一定非常漫長和復(fù)雜,以至于造成麻煩。在繁忙的服務(wù)器中,即使是阻塞事件循環(huán)200毫秒的任務(wù)也會產(chǎn)生不希望的延遲。 在那些并發(fā)量并不高的服務(wù)器來說,即使產(chǎn)生一定短時的阻塞,也不會影響性能,使用交錯執(zhí)行setImmediate()可能是避免阻塞事件循環(huán)的最簡單也是最有效的方法。
process.nextTick()不能用于交錯長時間運行的任務(wù)。正如我們在Chapter1-Welcome to the Node.js Platform中看到的,nextTick()會在任何未返回的I / O之前調(diào)度,并且在重復(fù)調(diào)用process.nextTick()最終會導(dǎo)致I / O饑餓。 你可以通過在前面的例子中用process.nextTick()替換setImmediate()來驗證。使用多個進程
使用interleaving模式并不是我們用來運行CPU-bound任務(wù)的唯一方法;防止事件循環(huán)阻塞的另一種模式是使用子進程。我們已經(jīng)知道Node.js在運行I / O密集型應(yīng)用程序(如Web服務(wù)器)的時候是最好的,因為Node.js可以使得我們可以通過異步來優(yōu)化資源利用率。
所以,我們必須保持應(yīng)用程序響應(yīng)的最好方法是不要在主應(yīng)用程序的上下文中運行昂貴的CPU-bound任務(wù),而是使用多帶帶的進程。這有三個主要的優(yōu)點:
同步任務(wù)可以全速運行,而不需要交錯執(zhí)行的步驟
在Node.js中處理進程很簡單,可能比修改一個使用setImmediate()的算法更容易,并且多進程允許我們輕松使用多個處理器,而無需擴展主應(yīng)用程序本身。
如果我們真的需要超高的性能,可以使用低級語言,如性能良好的C。
Node.js有一個充足的API庫帶來與外部進程交互。 我們可以在child_process模塊中找到我們需要的所有東西。 而且,當外部進程只是另一個Node.js程序時,將它連接到主應(yīng)用程序是非常容易的,我們甚至不覺得我們在本地應(yīng)用程序外部運行任何東西。這得益于child_process.fork()函數(shù),該函數(shù)創(chuàng)建一個新的子Node.js進程,并自動創(chuàng)建一個通信管道,使我們能夠使用與EventEmitter非常相似的接口交換信息。來看如何用這個特性來重構(gòu)我們的子集求和算法。
將子集求和任務(wù)委托給其他進程重構(gòu)SubsetSum任務(wù)的目標是創(chuàng)建一個多帶帶的子進程,負責(zé)處理CPU-bound的任務(wù),使服務(wù)器的事件循環(huán)專注于處理來自網(wǎng)絡(luò)的請求:
我們將創(chuàng)建一個名為processPool.js的新模塊,它將允許我們創(chuàng)建一個正在運行的進程池。創(chuàng)建一個新的進程代價昂貴,需要時間,因此我們需要保持它們不斷運行,盡量不要產(chǎn)生中斷,時刻準備好處理請求,使我們可以節(jié)省時間和CPU。此外,進程池需要幫助我們限制同時運行的進程數(shù)量,以避免將使我們的應(yīng)用程序受到拒絕服務(wù)(DoS)攻擊。
接下來,我們將創(chuàng)建一個名為subsetSumFork.js的模塊,負責(zé)抽象子進程中運行的SubsetSum任務(wù)。 它的角色將與子進程進行通信,并將任務(wù)的結(jié)果展示為來自當前應(yīng)用程序。
最后,我們需要一個worker(我們的子進程),一個新的Node.js程序,運行子集求和算法并將其結(jié)果轉(zhuǎn)發(fā)給父進程。
DoS攻擊是企圖使其計劃用戶無法使用機器或網(wǎng)絡(luò)資源,例如臨時或無限中斷或暫停連接到Internet的主機的服務(wù)。
先從構(gòu)建processPool.js模塊開始:
const fork = require("child_process").fork; class ProcessPool { constructor(file, poolMax) { this.file = file; this.poolMax = poolMax; this.pool = []; this.active = []; this.waiting = []; } //... }
在模塊的第一部分,引入我們將用來創(chuàng)建新進程的child_process.fork()函數(shù)。 然后,我們定義ProcessPool的構(gòu)造函數(shù),該構(gòu)造函數(shù)接受表示要運行的Node.js程序的文件參數(shù)以及池中運行的最大實例數(shù)poolMax作為參數(shù)。然后我們定義三個實例變量:
pool表示的是準備運行的進程
active表示的是當前正在運行的進程列表
waiting包含所有這些請求的任務(wù)隊列,保存由于缺少可用的資源而無法立即實現(xiàn)的任務(wù)
看ProcessPool類的acquire()方法,它負責(zé)取出一個準備好被使用的進程:
acquire(callback) { let worker; if(this.pool.length > 0) { // [1] worker = this.pool.pop(); this.active.push(worker); return process.nextTick(callback.bind(null, null, worker)); } if(this.active.length >= this.poolMax) { // [2] return this.waiting.push(callback); } worker = fork(this.file); // [3] this.active.push(worker); process.nextTick(callback.bind(null, null, worker)); }
函數(shù)邏輯如下:
如果在進程池中有一個準備好被使用的進程,我們只需將其移動到active數(shù)組中,然后通過異步的方式調(diào)用其回調(diào)函數(shù)。
如果池中沒有可用的進程,或者已經(jīng)達到運行進程的最大數(shù)量,必須等待。通過把當前回調(diào)放入waiting數(shù)組。
如果我們還沒有達到運行進程的最大數(shù)量,我們將使用child_process.fork()創(chuàng)建一個新的進程,將其添加到active列表中,然后調(diào)用其回調(diào)。
ProcessPool類的最后一個方法是release(),其目的是將一個進程放回進程池中:
release(worker) { if(this.waiting.length > 0) { // [1] const waitingCallback = this.waiting.shift(); waitingCallback(null, worker); } this.active = this.active.filter(w => worker !== w); // [2] this.pool.push(worker); }
前面的代碼也很簡單,其解釋如下:
如果在waiting任務(wù)隊列里面有任務(wù)需要被執(zhí)行,我們只需為這個任務(wù)分配一個進程worker執(zhí)行。
否則,如果在waiting任務(wù)隊列中都沒有需要被執(zhí)行的任務(wù),我們則把active的進程列表中的進程放回進程池中。
正如我們所看到的,進程從來沒有中斷,只在為其不斷地重新分配任務(wù),使我們可以通過在每個請求不重新啟動一個進程達到節(jié)省時間和空間的目的。然而,重要的是要注意,這可能并不總是最好的選擇,這很大程度上取決于我們的應(yīng)用程序的要求。為減少進程池長期占用內(nèi)存,可能的調(diào)整如下:
在一個進程空閑一段時間后,終止進程,釋放內(nèi)存空間。
添加一個機制來終止或重啟沒有響應(yīng)的或者崩潰了的進程。
現(xiàn)在我們的ProcessPool類已經(jīng)準備就緒,我們可以使用它來實現(xiàn)SubsetSumFork模塊,SubsetSumFork的作用是與子進程進行通信得到子集求和的結(jié)果。前面曾說到,用child_process.fork()啟動一個進程也給了我們創(chuàng)建了一個簡單的基于消息的管道,通過實現(xiàn)subsetSumFork.js模塊來看看它是如何工作的:
const EventEmitter = require("events").EventEmitter; const ProcessPool = require("./processPool"); const workers = new ProcessPool(__dirname + "/subsetSumWorker.js", 2); class SubsetSumFork extends EventEmitter { constructor(sum, set) { super(); this.sum = sum; this.set = set; } start() { workers.acquire((err, worker) => { // [1] worker.send({sum: this.sum, set: this.set}); const onMessage = msg => { if (msg.event === "end") { // [3] worker.removeListener("message", onMessage); workers.release(worker); } this.emit(msg.event, msg.data); // [4] }; worker.on("message", onMessage); // [2] }); } } module.exports = SubsetSumFork;
首先注意,我們在subsetSumWorker.js調(diào)用ProcessPool的構(gòu)造函數(shù)創(chuàng)建ProcessPool實例。 我們還將進程池的最大容量設(shè)置為2。
另外,我們試圖維持原來的SubsetSum類相同的公共API。實際上,SubsetSumFork是EventEmitter的子類,它的構(gòu)造函數(shù)接受sum和set,而start()方法則觸發(fā)算法的執(zhí)行,而這個SubsetSumFork實例運行在一個多帶帶的進程上。調(diào)用start()方法時會發(fā)生的情況:
我們試圖從進程池中獲得一個新的子進程。在創(chuàng)建進程成功之后,我們嘗試向子進程發(fā)送一條消息,包含sum和set。 send()方法是Node.js自動提供給child_process.fork()創(chuàng)建的所有進程,這實際上與父子進程之間的通信管道有關(guān)。
然后我們開始監(jiān)聽子進程返回的任何消息,我們使用on()方法附加一個新的事件監(jiān)聽器(這也是所有以child_process.fork()創(chuàng)建的進程提供的通信通道的一部分)。
在事件監(jiān)聽器中,我們首先檢查是否收到一個end事件,這意味著SubsetSum所有任務(wù)已經(jīng)完成,在這種情況下,我們刪除onMessage監(jiān)聽器并釋放worker,并將其放回進程池中,不再讓其占用內(nèi)存資源和CPU資源。
worker以{event,data}格式生成消息,使得任何時候一旦子進程處理完畢任務(wù),我們在外部都能接收到這一消息。
這就是SubsetSumFork模塊現(xiàn)在我們來實現(xiàn)這個worker應(yīng)用程序。
現(xiàn)在我們來創(chuàng)建subsetSumWorker.js模塊,我們的應(yīng)用程序,這個模塊的全部內(nèi)容將在一個多帶帶的進程中運行:
const SubsetSum = require("./subsetSum"); process.on("message", msg => { // [1] const subsetSum = new SubsetSum(msg.sum, msg.set); subsetSum.on("match", data => { // [2] process.send({event: "match", data: data}); }); subsetSum.on("end", data => { process.send({event: "end", data: data}); }); subsetSum.start(); });
由于我們的handler處于一個多帶帶的進程中,我們不必擔(dān)心這類CPU-bound任務(wù)阻塞事件循環(huán),所有的HTTP請求將繼續(xù)由主應(yīng)用程序的事件循環(huán)處理,而不會中斷。
當子進程開始啟動時,父進程:
子進程立即開始監(jiān)聽來自父進程的消息。這可以通過process.on()函數(shù)輕松實現(xiàn)。我們期望從父進程中唯一的消息是為新的SubsetSum任務(wù)提供輸入的消息。只要收到這樣的消息,我們創(chuàng)建一個SubsetSum類的新實例,并注冊match和end事件監(jiān)聽器。最后,我們用subsetSum.start()開始計算。
每次子集求和算法收到事件時,把結(jié)果它封裝在格式為{event,data}的對象中,并將其發(fā)送給父進程。這些消息然后在subsetSumFork.js模塊中處理,就像我們在前面的章節(jié)中看到的那樣。
注意:當子進程不是Node.js進程時,則上述的通信管道就不可用了。在這種情況下,我們?nèi)匀豢梢酝ㄟ^在暴露于父進程的標準輸入流和標準輸出流之上實現(xiàn)我們自己的協(xié)議來建立父子進程通信的接口。多進程模式
嘗試新版本的子集求和算法,我們只需要替換HTTP服務(wù)器使用的模塊(文件app.js):
運行結(jié)果如下:
更有趣的是,我們也可以嘗試同時啟動兩個subsetSum任務(wù),我們可以充分看到多核CPU的作用。 相反,如果我們嘗試同時運行三個subsetSum任務(wù),結(jié)果應(yīng)該是最后一個啟動將掛起。這不是因為主進程的事件循環(huán)被阻塞,而是因為我們?yōu)?b>subsetSum任務(wù)設(shè)置了兩個進程的并發(fā)限制。
正如我們所看到的,多進程模式比interleaving模式更加強大和靈活;然而,由于單個機器提供的CPU和內(nèi)存資源量仍然是一個硬性限制,所以它仍然不可擴展。在這種情況下,將負載分配到多臺機器上,則是更優(yōu)秀的解決辦法。
值得一提的是,在運行CPU-bound任務(wù)時,多線程可以成為多進程的替代方案。目前,有幾個npm包公開了一個用于處理用戶級模塊的線程的API;其中最流行的是webworker-threads。但是,即使線程更輕量級,完整的進程也可以提供更大的靈活性,并具備更高更可靠的容錯處理。總結(jié)
本章講述以下三點:
異步初始化模塊
批處理和緩存在Node.js異步中的運用
使用異步或者多進程來處理CPU-bound的任務(wù)
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/90718.html
摘要:原文鏈接在中貫徹單元測試在團隊合作中,你寫好了一個函數(shù),供隊友使用,跑去跟你的隊友說,你傳個值進去,他就會返回結(jié)果了。如果你也為社區(qū)貢獻過,想更多人使用的話,加上單元測試吧,讓你的值得別人信賴。 原文鏈接:BlueSun | 在Nodejs中貫徹單元測試 在團隊合作中,你寫好了一個函數(shù),供隊友使用,跑去跟你的隊友說,你傳個A值進去,他就會返回B結(jié)果了。過了一會,你隊友跑過來說,我傳個A...
摘要:編寫異步代碼可能是一種不同的體驗,尤其是對異步控制流而言?;卣{(diào)函數(shù)的準則在編寫異步代碼時,要記住的第一個規(guī)則是在定義回調(diào)時不要濫用閉包。為回調(diào)創(chuàng)建命名函數(shù),避免使用閉包,并將中間結(jié)果作為參數(shù)傳遞。 本系列文章為《Node.js Design Patterns Second Edition》的原文翻譯和讀書筆記,在GitHub連載更新,同步翻譯版鏈接。 歡迎關(guān)注我的專欄,之后的博文將在專...
摘要:回調(diào)函數(shù)是在異步操作完成后傳播其操作結(jié)果的函數(shù),總是用來替代同步操作的返回指令。下面的圖片顯示了中事件循環(huán)過程當異步操作完成時,執(zhí)行權(quán)就會交給這個異步操作開始的地方,即回調(diào)函數(shù)。 本系列文章為《Node.js Design Patterns Second Edition》的原文翻譯和讀書筆記,在GitHub連載更新,同步翻譯版鏈接。 歡迎關(guān)注我的專欄,之后的博文將在專欄同步: Enc...
摘要:如果不能快速返回,就應(yīng)當將其遷移到另一個進程中模塊讓開發(fā)人員可以為事件設(shè)置偵聽器和處理器。我們需要給每個想要響應(yīng)的事件創(chuàng)建偵聽器 Node.js的http服務(wù)器 通過使用HTTP模塊的低級API,Node.js允許我們創(chuàng)建服務(wù)器和客戶端。剛開始學(xué)node的時候,我們都會遇到如下代碼: var http = require(http); http.createServer(funct...
摘要:回調(diào)函數(shù)提供兩個參數(shù)和,表示有沒有錯誤發(fā)生,是文件內(nèi)容。文件關(guān)閉第一個參數(shù)文件時傳遞的文件描述符第二個參數(shù)回調(diào)函數(shù)回調(diào)函數(shù)有一個參數(shù)錯誤,關(guān)閉文件后執(zhí)行。 showImg(//img.mukewang.com/5d3f890d0001836113660768.jpg); 人所缺乏的不是才干而是志向,不是成功的能力而是勤勞的意志。 —— 部爾衛(wèi) 文章同步到github博客:https:/...
閱讀 3686·2021-09-22 15:34
閱讀 1197·2019-08-29 17:25
閱讀 3407·2019-08-29 11:18
閱讀 1381·2019-08-26 17:15
閱讀 1751·2019-08-23 17:19
閱讀 1239·2019-08-23 16:15
閱讀 726·2019-08-23 16:02
閱讀 1345·2019-08-23 15:19