摘要:上游水源通過里中的方法流入水電站中。當在接收數據中出現錯誤時發出。暫停可讀流,不再發出事件恢復可讀流,繼續發出事件把這個可讀流的輸出傳遞給指定的流,兩個流組成一個管道。
題外話
該文章整合了多篇網絡文章(整合之處已設置超鏈接,可點擊直接了解原文),目的僅僅是為了和大伙分享,更加通俗易懂的了解流的各個流程的初始。本人也是node的初學菜鳥,有描述錯誤或誤人子弟的地方多請大神們多多指出。
readable 我們先來安利一些思路,方便理清楚邏輯:)。事件 查看原文讀緩沖區(readable buffer):這里的讀是個形容詞,是指可讀流臨時存放data(只能是字符串或者Buffer,不能是數字)的緩沖區。(讀緩沖區就像一個水電站一樣,感覺這樣描述比較好理解flowing、paused模式)
flowing模式:即流動模式,就像打開水電站的水閘一樣,上游的水和下游完完全全連通直到上游來源的數據耗盡。
paused模式:即暫停模式,就像水電站的水閘在你指定的時候(使用stream.read())才會打開。不過,當你使用read()打開水閘的時候是一個超自然現象---水電站里的水瞬間被抽干,上游的水還沒來得及填充水電站。然后自動關閉水閘,等待你的下一次“惠顧“read()。
_read:上游水源通過_read里中的push、unshift方法流入水電站中。
函數 查看原文readable:在數據塊可以從流中讀取的時候發出。它對應的處理器沒有參數,可以在處理器里調用read([size])方法讀取數據。
data:有數據可讀時發出。它對應的處理器有一個參數,代表數據。如果你只想快快地讀取一個流的數據,給data關聯一個處理器是最方便的辦法。處理器的參數是Buffer對象,如果你調用了Readable的setEncoding(encoding)方法,處理器的參數就是String對象。
end:當數據被讀完時發出。對應的處理器沒有參數。
close:當底層的資源,如文件,已關閉時發出。不是所有的Readable流都會發出這個事件。對應的處理器沒有參數。
error:當在接收數據中出現錯誤時發出。對應的處理器參數是Error的實例,它的message屬性描述了錯誤原因,stack屬性保存了發生錯誤時的堆棧信息。
流動模式和暫停模式切換 查看原文read([size]):如果你給read方法傳遞了一個大小作為參數,那它會返回指定數量的數據,如果數據不足,就會返回null。如果你不給read方法傳參,它會返回內部緩沖區里的所有數據,如果沒有數據,會返回null,此時有可能說明遇到了文件末尾。read返回的數據可能是Buffer對象,也可能是String對象。
setEncoding(encoding):給流設置一個編碼格式,用于解碼讀到的數據。調用此方法后,read([size])方法返回String對象。
pause():暫停可讀流,不再發出data事件
resume():恢復可讀流,繼續發出data事件
pipe(destination,[options]):把這個可讀流的輸出傳遞給destination指定的Writable流,兩個流組成一個管道。options是一個JS對象,這個對象有一個布爾類型的end屬性,默認值為true,當end為true時,Readable結束時自動結束Writable。注意,我們可以把一個Readable與若干Writable連在一起,組成多個管道,每一個Writable都能得到同樣的數據。這個方法返回destination,如果destination本身又是Readable流,就可以級聯調用pipe(比如我們在使用gzip壓縮、解壓縮時就會這樣,馬上會講到)。
unpipe([destination]):端口與指定destination的管道。不傳遞destination時,斷開與這個可讀流連在一起的所有管道。
通過添加 data 事件監聽器來啟動數據監聽
調用 resume() 方法啟動數據流
調用 pipe() 方法將數據轉接到另一個 可寫流
觸發準備數據(_read)的方法在流沒有 pipe() 時,調用 pause() 方法可以將流暫停
pipe() 時,需要移除所有 data 事件的監聽,再調用 unpipe() 方法
工作流程 查看原文data listener
readable listener
read()——如果當前緩沖區為空,或者緩沖區并未超出我們設定的最大值,那么就可以繼續準備數據;如果此時正在準備數據(_read())或者已經結束讀取(push(null)),那么就放棄準備數據。
1.在paused模式下則讀取全部緩沖區的長度;若讀取的字節數(n)大于設置的緩沖區最大值,則適當擴大緩沖區的大小(默認為16k,最大為8m);若讀取的長度大于當前緩沖區的大小,設置needReadable屬性并準備數據等待下一次讀取。
2.如果當前緩沖區為空,或者緩沖區并未超出我們設定的最大值,那么就可以繼續準備數據;如果此時正在準備數據(_read())或者已經結束讀取(push(null)),那么就放棄準備數據。
3.針對這個私有方法_read,文檔上有特殊說明,自定義的Readable實現類需要實現這個方法,在該方法中手動添加數據到Readable對象的讀緩沖區,然后進行Readable的讀取。可以理解為_read函數為讀取數據前的準備工作(準備數據),針對的是流的實現者而言。
1.對于處在flowing模式下的讀取,每次只讀緩沖區中第一個buffer的長度
2.針對這個私有方法_read,文檔上有特殊說明,自定義的Readable實現類需要實現這個方法,在該方法中手動添加數據到Readable對象的讀緩沖區,然后進行Readable的讀取。可以理解為_read函數為讀取數據前的準備工作(準備數據),針對的是流的實現者而言。
實例//這是一個將存放多條json字符串的txt文件讀取成json的例子 const stream = require("stream"); const fs = require("fs"); const util = require("util"); function JSONLineReader(source) { stream.Readable.call(this); this._source = source; this._foundLineEnd = false; this._buffer = ""; source.on("readable", function() {//監聽source什么時候準備好,那么我們就可以用read()或則readable listener去觸發JSONLineReader的_read方法 this.read(); // this.on("readable", function(data) { // console.log("readable"); // }); }.bind(this)) } util.inherits(JSONLineReader, stream.Readable); JSONLineReader.prototype._read = function(size) { var chunk; var line; var lineIndex; var result; if (this._buffer.length === 0) { chunk = this._source.read(); this._buffer += chunk; //一次就拿完 只是看什么時候push null } lineIndex = this._buffer.indexOf(" "); if (lineIndex !== -1) { line = this._buffer.slice(0, lineIndex); if (line) { result = JSON.parse(line); this._buffer = this._buffer.slice(lineIndex + 1); this.emit("object", result);util.inspect(result)) this.push(util.inspect(result)); } else { this._buffer = this._buffer.slice(1); } } } let input = fs.createReadStream(__dirname + "/json-lines.txt", { encoding: "utf8" }); var jsonLineReader = new JSONLineReader(input); jsonLineReader.on("object", function(obj) { console.log("pos:", obj); }) /*json-lines.txt {"success":false,"code":501} {"success":true,"code":202} {"success":false,"code":503} {"success":true,"code":204} {"success":false,"code":505} {"success":true,"code":206} {"success":false,"code":507} {"success":true,"code":208} {"success":false,"code":509} */
let stream = require("stream"); let util = require("util"); util.inherits(flowingReadableDemo, stream.Readable); function flowingReadableDemo(opt) { stream.Readable.call(this, opt); this.quotes = ["yessdasdsa", "noasdasdas", "maybe"]; this._index = 0; } flowingReadableDemo.prototype._read = function() { if (this._index >= this.quotes.length) { this.push(null); } else { this.push(this.quotes[this._index]); this._index += 1; } }; let r = new flowingReadableDemo(); r.on("data", function(data) { console.log("Callback read: " + data.toString()); // flowing狀態下,我們無需執行read,僅需要設置data事件處理函數或者設定導流目標pipe }); r.on("end", function(data) { console.log("No more answers."); });
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/87390.html
摘要:內部架構上圖表示一個實例的組成部分部分緩沖數組內部函數部分緩沖鏈表內部函數實例必須實現的內部函數以及系統提供的回調函數。有三個參數,第一個為待處理的數據,第二個為編碼,第三個為回調函數。 Transform流特性 在開發中直接接觸Transform流的情況不是很多,往往是使用相對成熟的模塊或者封裝的API來完成流的處理,最為特殊的莫過于through2模塊和gulp流操作。那么,Tra...
摘要:回調函數中檢測該次寫入是否被緩沖,若是,觸發事件。若目標可寫流表示該寫入操作需要進行緩沖,則立刻將源可讀流切換至暫停模式。監聽源可讀流的事件,相應地結束目標可寫流。 在Node.js中,流(Stream)是其眾多原生對象的基類,它對處理潛在的大文件提供了支持,也抽象了一些場景下的數據處理和傳遞。在它對外暴露的接口中,最為神奇的,莫過于導流(pipe)方法了。鑒于近期自己正在閱讀Node...
摘要:是消費數據的,從中獲取數據,然后對得到的塊數據進行處理,至于如何處理,就依賴于具體實現也就是的實現。也可以說是建立在的基礎上。 1. 認識Stream Stream的概念最早來源于Unix系統,其可以將一個大型系統拆分成一些小的組件,然后將這些小的組件可以很好地運行 TCP/IP協議中的TCP協議也用到了Stream的思想,進而可以進行流量控制、差錯控制 在unix中通過 |來表示流...
摘要:方法也可以接收一個參數表示數據請求著請求的數據大小,但是可讀流可以根據需要忽略這個參數。讀取數據大部分情況下我們只要簡單的使用方法將可讀流的數據重定向到另外形式的流,但是在某些情況下也許直接從可讀流中讀取數據更有用。 介紹本文介紹了使用 node.js streams 開發程序的基本方法。 We should have some ways of connecting programs ...
摘要:事件的觸發頻次同樣是由實現者決定,譬如在進行文件讀取時,可能每行都會觸發一次而在請求處理時,可能數的數據才會觸發一次。如果有參數傳入,它會讓可讀流停止流向某個特定的目的地,否則,它會移除所有目的地。 showImg(https://segmentfault.com/img/remote/1460000016328758?w=1967&h=821); 本文節選自 Node.js Chea...
閱讀 4637·2021-10-25 09:48
閱讀 3217·2021-09-07 09:59
閱讀 2198·2021-09-06 15:01
閱讀 2702·2021-09-02 15:21
閱讀 2738·2019-08-30 14:14
閱讀 2192·2019-08-29 13:59
閱讀 2523·2019-08-29 11:02
閱讀 2541·2019-08-26 13:33