摘要:在數據緩沖區已超過或寫入隊列當前正忙的任何情況下,將返回。當返回值時,背壓系統啟動,它會暫停傳入的流發送任何數據,并等待消費者再次準備就緒,清空數據緩沖區后,將發出事件并恢復傳入的數據流。
流中的背壓
在數據處理過程中會出現一個叫做背壓的常見問題,它描述了數據傳輸過程中緩沖區后面數據的累積,當傳輸的接收端具有復雜的操作時,或者由于某種原因速度較慢時,來自傳入源的數據就有累積的趨勢,就像阻塞一樣。
要解決這個問題,必須有一個委托系統來確保數據從一個源到另一個源的平滑流動,不同的社區已經針對他們的程序獨特地解決了這個問題,Unix管道和TCP套接字就是很好的例子,并且通常被稱為流量控制,在Node.js中,流是已采用的解決方案。
本指南的目的是進一步詳細說明背壓是什么,以及精確流如何在Node.js的源代碼中解決這個問題,本指南的第二部分將介紹建議的最佳實踐,以確保在實現流時應用程序的代碼是安全的和優化的。
我們假設你對Node.js中背壓、Buffer和EventEmitter的一般定義以及Stream的一些經驗有所了解。如果你還沒有閱讀這些文檔,那么首先查看API文檔并不是一個壞主意,因為它有助于在閱讀本指南時擴展你的理解。
數據處理的問題在計算機系統中,數據通過管道、sockets和信號從一個進程傳輸到另一個進程,在Node.js中,我們找到了一種名為Stream的類似機制。流很好!他們為Node.js做了很多事情,幾乎內部代碼庫的每個部分都使用該模塊,作為開發人員,我們鼓勵你使用它們!
const readline = require("readline"); // process.stdin and process.stdout are both instances of Streams const rl = readline.createInterface({ input: process.stdin, output: process.stdout }); rl.question("Why should you use streams? ", (answer) => { console.log(`Maybe it"s ${answer}, maybe it"s because they are awesome! :)`); rl.close(); });
通過比較Node.js的Stream實現的內部系統工具,可以證明為什么通過流實現背壓機制是一個很好的優化的一個很好的例子。
在一種情況下,我們將使用一個大文件(約?9gb)并使用熟悉的zip(1)工具對其進行壓縮。
$ zip The.Matrix.1080p.mkv
雖然這需要幾分鐘才能完成,但在另一個shell中我們可以運行一個腳本,該腳本采用Node.js的模塊zlib,它包含另一個壓縮工具gzip(1)。
const gzip = require("zlib").createGzip(); const fs = require("fs"); const inp = fs.createReadStream("The.Matrix.1080p.mkv"); const out = fs.createWriteStream("The.Matrix.1080p.mkv.gz"); inp.pipe(gzip).pipe(out);
要測試結果,請嘗試打開每個壓縮文件,zip(1)工具壓縮的文件將通知你文件已損壞,而Stream完成的壓縮將無錯誤地解壓縮。
注意:在此示例中,我們使用.pipe()將數據源從一端獲取到另一端,但是,請注意沒有附加正確的錯誤處理程序。如果無法正確接收數據塊,Readable源或gzip流將不會被銷毀,pump是一個實用工具,如果其中一個流失敗或關閉,它將正確地銷毀管道中的所有流,并且在這種情況下是必須的!
只有Nodejs 8.x或更早版本才需要pump,對于Node 10.x或更高版本,引入pipeline來替換pump。這是一個模塊方法,用于在流傳輸之間轉發錯誤和正確清理,并在管道完成時提供回調。
以下是使用管道的示例:
const { pipeline } = require("stream"); const fs = require("fs"); const zlib = require("zlib"); // Use the pipeline API to easily pipe a series of streams // together and get notified when the pipeline is fully done. // A pipeline to gzip a potentially huge video file efficiently: pipeline( fs.createReadStream("The.Matrix.1080p.mkv"), zlib.createGzip(), fs.createWriteStream("The.Matrix.1080p.mkv.gz"), (err) => { if (err) { console.error("Pipeline failed", err); } else { console.log("Pipeline succeeded"); } } );
你還可以在管道上調用promisify以將其與async/await一起使用:
const stream = require("stream"); const fs = require("fs"); const zlib = require("zlib"); const pipeline = util.promisify(stream.pipeline); async function run() { try { await pipeline( fs.createReadStream("The.Matrix.1080p.mkv"), zlib.createGzip(), fs.createWriteStream("The.Matrix.1080p.mkv.gz"), ); console.log("Pipeline succeeded"); } catch (err) { console.error("Pipeline failed", err); } }太多的數據,太快
有些情況下,Readable流可能會過快地為Writable提供數據 — 遠遠超過消費者可以處理的數據!
當發生這種情況時,消費者將開始排隊所有數據塊以供以后消費,寫入隊列將變得越來越長,因此在整個過程完成之前,必須將更多數據保存在內存中。
寫入磁盤比從磁盤讀取要慢很多,因此,當我們嘗試壓縮文件并將其寫入我們的硬盤時,將發生背壓,因為寫入磁盤將無法跟上讀取的速度。
// Secretly the stream is saying: "whoa, whoa! hang on, this is way too much!" // Data will begin to build up on the read-side of the data buffer as // `write` tries to keep up with the incoming data flow. inp.pipe(gzip).pipe(outputFile);
這就是背壓機制很重要的原因,如果沒有背壓系統,該進程會耗盡系統的內存,有效地減緩了其他進程,并獨占你系統的大部分直到完成。
這導致了一些事情:
減緩所有其他當前進程。
一個非常超負荷的垃圾收集器。
內存耗盡。
在下面的示例中,我們將取出.write()函數的返回值并將其更改為true,這有效地禁用了Node.js核心中的背壓支持,在任何對"modified"二進制文件的引用中,我們正在談論在沒有return ret;行的情況下運行node二進制,而改為return true;。
垃圾收集器上的過度負荷我們來看看快速基準測試,使用上面的相同示例,我們進行幾次試驗,以獲得兩個二進制的中位時間。
trial (#) | `node` binary (ms) | modified `node` binary (ms) ================================================================= 1 | 56924 | 55011 2 | 52686 | 55869 3 | 59479 | 54043 4 | 54473 | 55229 5 | 52933 | 59723 ================================================================= average time: | 55299 | 55975
兩者都需要大約一分鐘來運行,因此根本沒有太大差別,但讓我們仔細看看以確認我們的懷疑是否正確,我們使用Linux工具dtrace來評估V8垃圾收集器發生了什么。
GC(垃圾收集器)測量時間表示垃圾收集器完成單次掃描的完整周期的間隔:
approx. time (ms) | GC (ms) | modified GC (ms) ================================================= 0 | 0 | 0 1 | 0 | 0 40 | 0 | 2 170 | 3 | 1 300 | 3 | 1 * * * * * * * * * 39000 | 6 | 26 42000 | 6 | 21 47000 | 5 | 32 50000 | 8 | 28 54000 | 6 | 35
雖然這兩個過程開始時相同,但似乎以相同的速率運行GC,很明顯,在適當工作的背壓系統幾秒鐘后,它將GC負載分布在4-8毫秒的一致間隔內,直到數據傳輸結束。
但是,當背壓系統不到位時,V8垃圾收集開始拖延,正常二進制文件在一分鐘內調用GC約75次,然而,修改后的二進制文件僅觸發36次。
這是由于內存使用量增加而累積的緩慢而漸進的債務,隨著數據傳輸,在沒有背壓系統的情況下,每個塊傳輸使用更多內存。
分配的內存越多,GC在一次掃描中需要處理的內存就越多,掃描越大,GC就越需要決定可以釋放什么,并且在更大的內存空間中掃描分離的指針將消耗更多的計算能力。
內存耗盡為確定每個二進制的內存消耗,我們使用/usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js多帶帶為每個進程計時。
這是正常二進制的輸出:
Respecting the return value of .write() ============================================= real 58.88 user 56.79 sys 8.79 87810048 maximum resident set size 0 average shared memory size 0 average unshared data size 0 average unshared stack size 19427 page reclaims 3134 page faults 0 swaps 5 block input operations 194 block output operations 0 messages sent 0 messages received 1 signals received 12 voluntary context switches 666037 involuntary context switches
虛擬內存占用的最大字節大小約為87.81mb。
現在更改.write()函數的返回值,我們得到:
Without respecting the return value of .write(): ================================================== real 54.48 user 53.15 sys 7.43 1524965376 maximum resident set size 0 average shared memory size 0 average unshared data size 0 average unshared stack size 373617 page reclaims 3139 page faults 0 swaps 18 block input operations 199 block output operations 0 messages sent 0 messages received 1 signals received 25 voluntary context switches 629566 involuntary context switches
虛擬內存占用的最大字節大小約為1.52gb。
如果沒有流來委托背壓,則分配的內存空間要大一個數量級 — 同一進程之間的巨大差異!
這個實驗展示了Node.js的反壓機制是如何優化和節省成本的,現在,讓我們分析一下它是如何工作的!
背壓如何解決這些問題?將數據從一個進程傳輸到另一個進程有不同的函數,在Node.js中,有一個名為.pipe()的內部內置函數,還有其他包也可以使用!但最終,在這個過程的基本層面,我們有兩個獨立的組件:數據來源和消費者。
當從源調用.pipe()時,它向消費者發出信號,告知有數據要傳輸,管道函數有助于為事件觸發器設置適當的背壓閉合。
在Node.js中,源是Readable流,而消費者是Writable流(這些都可以與Duplex或Transform流互換,但這超出了本指南的范圍)。
觸發背壓的時刻可以精確地縮小到Writable的.write()函數的返回值,當然,該返回值由幾個條件決定。
在數據緩沖區已超過highWaterMark或寫入隊列當前正忙的任何情況下,.write()將返回false。
當返回false值時,背壓系統啟動,它會暫停傳入的Readable流發送任何數據,并等待消費者再次準備就緒,清空數據緩沖區后,將發出.drain()事件并恢復傳入的數據流。
隊列完成后,背壓將允許再次發送數據,正在使用的內存空間將自行釋放并為下一批數據做好準備。
這有效地允許在任何給定時間為.pipe()函數使用固定數量的內存,沒有內存泄漏,沒有無限緩沖,垃圾收集器只需要處理內存中的一個區域!
那么,如果背壓如此重要,為什么你(可能)沒有聽說過它?答案很簡單:Node.js會自動為你完成所有這些工作。
那太好了!但是當我們試圖了解如何實現我們自己的自定義流時,也不是那么好。
注意:在大多數機器中,有一個字節大小可以確定緩沖區何時已滿(在不同的機器上會有所不同),Node.js允許你設置自己的自定義highWaterMark,但通常,默認設置為16kb(16384,或objectMode流為16),在你可能希望提高該值的情況下,可以嘗試,但是要小心!
.pipe()的生命周期為了更好地理解背壓,下面是一個關于Readable流的生命周期的流程圖,該流被管道傳輸到Writable流中:
+===================+ x--> Piping functions +--> src.pipe(dest) | x are set up during |===================| x the .pipe method. | Event callbacks | +===============+ x |-------------------| | Your Data | x They exist outside | .on("close", cb) | +=======+=======+ x the data flow, but | .on("data", cb) | | x importantly attach | .on("drain", cb) | | x events, and their | .on("unpipe", cb) | +---------v---------+ x respective callbacks. | .on("error", cb) | | Readable Stream +----+ | .on("finish", cb) | +-^-------^-------^-+ | | .on("end", cb) | ^ | ^ | +-------------------+ | | | | | ^ | | ^ ^ ^ | +-------------------+ +=================+ ^ | ^ +----> Writable Stream +---------> .write(chunk) | | | | +-------------------+ +=======+=========+ | | | | | ^ | +------------------v---------+ ^ | +-> if (!chunk) | Is this chunk too big? | ^ | | emit .end(); | Is the queue busy? | | | +-> else +-------+----------------+---+ | ^ | emit .write(); | | | ^ ^ +--v---+ +---v---+ | | ^-----------------------------------< No | | Yes | ^ | +------+ +---v---+ ^ | | | ^ emit .pause(); +=================+ | | ^---------------^-----------------------+ return false; <-----+---+ | +=================+ | | | ^ when queue is empty +============+ | ^------------^-----------------------< Buffering | | | |============| | +> emit .drain(); | ^Buffer^ | | +> emit .resume(); +------------+ | | ^Buffer^ | | +------------+ add chunk to queue | | <---^---------------------< +============+
注意:如果要設置管道以將一些流鏈接在一起來操作數據,則很可能會實現Transform流。
在這種情況下,你的Readable流的輸出將輸入到Transform中,并將管道到Writable中。
Readable.pipe(Transformable).pipe(Writable);
背壓將自動應用,但請注意,Transform流的輸入和輸出highWaterMark都可能被操縱并將影響背壓系統。
背壓指南從Node.js v0.10開始,Stream類提供了通過使用這些相應函數的下劃線版本來修改.read()或.write()的行為的功能(._read()和._write())。
對于實現Readable流和Writable流,有文檔化的指南,我們假設你已閱讀過這些內容,下一節將更深入一些。
實現自定義流時要遵守的規則流的黃金法則始終是尊重背壓,最佳實踐的構成是非矛盾的實踐,只要你小心避免與內部背壓支持相沖突的行為,你就可以確定你遵循良好做法。
一般來說:
如果你沒有被要求,永遠不要.push()。
永遠不要在返回false后調用.write(),而是等待"drain"。
流在不同的Node.js版本和你使用的庫之間有變化,小心并測試一下。
注意:關于第3點,構建瀏覽器流的非常有用的包是readable-stream,Rodd Vagg撰寫了一篇很棒的博客文章,描述了這個庫的實用性,簡而言之,它為Readable流提供了一種自動優雅降級,并支持舊版本的瀏覽器和Node.js。
Readable流的特定規則到目前為止,我們已經了解了.write()如何影響背壓,并將重點放在Writable流上,由于Node.js的功能,數據在技術上從Readable流向下游Writable。但是,正如我們可以在數據、物質或能量的任何傳輸中觀察到的那樣,源與目標一樣重要,Readable流對于如何處理背壓至關重要。
這兩個過程都相互依賴,有效地進行通信,如果Readable忽略Writable流要求它停止發送數據的時候,那么.write()的返回值不正確就會有問題。
因此,關于.write()返回,我們還必須尊重._read()方法中使用的.push()的返回值,如果.push()返回false值,則流將停止從源讀取,否則,它將繼續而不會停頓。
以下是使用.push()的不好做法示例:
// This is problematic as it completely ignores return value from push // which may be a signal for backpressure from the destination stream! class MyReadable extends Readable { _read(size) { let chunk; while (null !== (chunk = getNextChunk())) { this.push(chunk); } } }
此外,在自定義流之外,存在忽略背壓的陷阱,在這個良好的實踐的反例中,應用程序的代碼會在數據可用時強制通過(由.data事件發出信號):
// This ignores the backpressure mechanisms Node.js has set in place, // and unconditionally pushes through data, regardless if the // destination stream is ready for it or not. readable.on("data", (data) => writable.write(data) );Writable流的特定規則
回想一下.write()可能會根據某些條件返回true或false,幸運的是,在構建我們自己的Writable流時,流狀態機將處理我們的回調并確定何時處理背壓并為我們優化數據流。
但是,當我們想直接使用Writable時,我們必須尊重.write()返回值并密切注意這些條件:
如果寫隊列忙,.write()將返回false。
如果數據塊太大,.write()將返回false(該值由變量highWaterMark指示)。
// This writable is invalid because of the async nature of JavaScript callbacks. // Without a return statement for each callback prior to the last, // there is a great chance multiple callbacks will be called. class MyWritable extends Writable { _write(chunk, encoding, callback) { if (chunk.toString().indexOf("a") >= 0) callback(); else if (chunk.toString().indexOf("b") >= 0) callback(); callback(); } } // The proper way to write this would be: if (chunk.contains("a")) return callback(); else if (chunk.contains("b")) return callback(); callback();
在實現._writev()時還需要注意一些事項,該函數與.cork()結合使用,但寫入時有一個常見錯誤:
// Using .uncork() twice here makes two calls on the C++ layer, rendering the // cork/uncork technique useless. ws.cork(); ws.write("hello "); ws.write("world "); ws.uncork(); ws.cork(); ws.write("from "); ws.write("Matteo"); ws.uncork(); // The correct way to write this is to utilize process.nextTick(), which fires // on the next event loop. ws.cork(); ws.write("hello "); ws.write("world "); process.nextTick(doUncork, ws); ws.cork(); ws.write("from "); ws.write("Matteo"); process.nextTick(doUncork, ws); // as a global function function doUncork(stream) { stream.uncork(); }
.cork()可以被調用多次,我們只需要小心調用.uncork()相同的次數,使其再次流動。
結論Streams是Node.js中經常使用的模塊,它們對于內部結構非常重要,對于開發人員來說,它們可以跨Node.js模塊生態系統進行擴展和連接。
希望你現在能夠進行故障排除,安全地編寫你自己的Writable和Readable流,并考慮背壓,并與同事和朋友分享你的知識。
在使用Node.js構建應用程序時,請務必閱讀有關其他API函數的Stream的更多信息,以幫助改進和釋放你的流功能。
上一篇:使用不同的文件系統 下一篇:域模塊剖析文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/100377.html
Node.js 指南 Node.js?是基于Chrome的V8 JavaScript引擎構建的JavaScript運行時。 常規 關于Node.js 入門指南 輕松分析Node.js應用程序 Docker化Node.js Web應用程序 遷移到安全的Buffer構造函數 Node.js核心概念 阻塞與非阻塞概述 Node.js事件循環、定時器和process.nextTick() 不要阻塞事...
摘要:避免使用最低公分母方法你可能想讓你的程序像最低公分母文件系統一樣,通過將所有文件名規范化為大寫,將所有文件名規范化為格式,并將所有文件時間戳標準化為秒分辨率,這是最小公分母的方法。 使用不同的文件系統 Node公開了文件系統的許多功能,但并非所有文件系統都相似,以下是建議的最佳實踐,以便在使用不同的文件系統時保持代碼簡單和安全。 文件系統行為 在使用文件系統之前,你需要知道它的行為方式...
摘要:快速檢查可能告訴我們,簡單地從的域處理程序拋出將允許然后捕獲異常并執行其自己的錯誤處理程序,雖然情況并非如此,檢查后,你會看到堆棧只包含。 域模塊剖析 可用性問題 隱式行為 開發人員可以創建新域,然后只需運行domain.enter(),然后,它充當將來拋出者無法觀察到的任何異常的萬能捕捉器,允許模塊作者攔截不同模塊中不相關代碼的異常,防止代碼的發起者知道自己的異常。 以下是一個間接鏈...
摘要:相對于最大的更新就是把對背壓問題的處理邏輯從中抽取出來產生了新的可觀察對象。由于基于發射的數據流,以及對數據加工處理的各操作符都添加了背壓支持,附加了額外的邏輯,其運行效率要比慢得多。 背壓(backpressure)當上下游在不同的線程中,通過Observable發射,處理,響應數據流時,如果上游發射數據的速度快于下游接收處理數據的速度,這樣對于那些沒來得及處理的數據就會造成積壓,這...
閱讀 3067·2021-11-18 10:02
閱讀 3336·2021-11-02 14:48
閱讀 3397·2019-08-30 13:52
閱讀 560·2019-08-29 17:10
閱讀 2088·2019-08-29 12:53
閱讀 1412·2019-08-29 12:53
閱讀 1033·2019-08-29 12:25
閱讀 2168·2019-08-29 12:17