摘要:可寫流可寫流是對數(shù)據(jù)寫入目的地的一種抽象。對象流的特點就是它有一個標(biāo)志,我們可以設(shè)置它讓流可以接受任何對象。
可寫流(Writable Stream)
可寫流是對數(shù)據(jù)寫入"目的地"的一種抽象。
可寫流的原理其實與可讀流類似,當(dāng)數(shù)據(jù)過來的時候會寫入緩存池,當(dāng)寫入的速度很慢或者寫入暫停時候,數(shù)據(jù)流便會進(jìn)入到隊列池緩存起來,當(dāng)然即使緩存池滿了,剩余的數(shù)據(jù)也是存在內(nèi)存
可寫流的簡單用法如下代碼
let fs = require("fs"); let path = require("path"); let ws = fs.createWriteStream(path.join(__dirname,"1.txt"),{ highWaterMark:3, autoClose:true, flags:"w", encoding:"utf8", mode:0o666, start:0, }); let i = 9; function write(){ let flag = true; while(i>0&&flag){ flag = ws.write(--i+"","utf8",()=>{console.log("ok")}); console.log(flag) } } write(); // drain只有當(dāng)緩存區(qū)充滿后 并且被消費后觸發(fā) ws.on("drain",function(){ console.log("抽干") write(); });實現(xiàn)原理
現(xiàn)在就讓我們來實現(xiàn)一個簡單的可寫流,來研究可寫流的內(nèi)部原理,可寫流有很多方法與可讀流類似,這里不在重復(fù)了首先要有一個構(gòu)造函數(shù)來定義一些基本選項屬性,然后調(diào)用一個open放法打開文件,并且有一個destroy方法來處理關(guān)閉邏輯
let EventEmitter = require("events"); let fs = require("fs"); class WriteStream extends EventEmitter { constructor(path,options) { super(); this.path = path; this.highWaterMark = options.highWaterMark || 16 * 1024; this.autoClose = options.autoClose || true; this.mode = options.mode; this.start = options.start || 0; this.flags = options.flags || "w"; this.encoding = options.encoding || "utf8"; // 可寫流 要有一個緩存區(qū),當(dāng)正在寫入文件是,內(nèi)容要寫入到緩存區(qū)中 // 在源碼中是一個鏈表 => [] this.buffers = []; // 標(biāo)識 是否正在寫入 this.writing = false; // 是否滿足觸發(fā)drain事件 this.needDrain = false; // 記錄寫入的位置 this.pos = 0; // 記錄緩存區(qū)的大小 this.length = 0; this.open(); } destroy() { if (typeof this.fd !== "number") { return this.emit("close"); } fs.close(this.fd, () => { this.emit("close") }); } open() { fs.open(this.path, this.flags, this.mode, (err,fd) => { if (err) { this.emit("error", err); if (this.autoClose) { this.destroy(); } return; } this.fd = fd; this.emit("open"); }) } } module.exports = WriteStream;
接著我們實現(xiàn)write方法來讓可寫流對象調(diào)用,在write方法中我們首先將數(shù)據(jù)轉(zhuǎn)化為buffer,接著實現(xiàn)一些事件的觸發(fā)條件的邏輯,如果現(xiàn)在沒有正在寫入的話我們就要真正的進(jìn)行寫入操作了,這里我們實現(xiàn)一個_write方法來實現(xiàn)寫入操作,否則則代表文件正在寫入,那我們就將流傳來的數(shù)據(jù)先放在緩存區(qū)中,保證寫入數(shù)據(jù)不會同時進(jìn)行。
write(chunk,encoding=this.encoding,callback=()=>{}){ chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding); // write 返回一個boolean類型 this.length+=chunk.length; let ret = this.length{ callback(); this.clearBuffer(); }); // 8 } return ret; } _write(chunk,encoding,callback){ if(typeof this.fd !== "number"){ return this.once("open",()=>this._write(chunk,encoding,callback)); } fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{ this.length -= byteWritten; this.pos += byteWritten; callback(); // 清空緩存區(qū)的內(nèi)容 }); }
_write寫入之后的回調(diào)中我們會調(diào)用傳入回調(diào)函數(shù)clearBuffer,這個方法會去buffers中繼續(xù)遞歸地把數(shù)據(jù)取出,然后繼續(xù)調(diào)用_write方法去寫入,直到全部buffer中的數(shù)據(jù)取出后,這樣就清空了buffers。
clearBuffer(){ let buffer = this.buffers.shift(); if(buffer){ this._write(buffer.chunk,buffer.encoding,()=>{ buffer.callback(); this.clearBuffer() }); }else{ this.writing = false; if(this.needDrain){ // 是否需要觸發(fā)drain 需要就發(fā)射drain事件 this.needDrain = false; this.emit("drain"); } } }
最后附上完整的代碼
let EventEmitter = require("events"); let fs = require("fs"); class WriteStream extends EventEmitter{ constructor(path,options){ super(); this.path = path; this.highWaterMark = options.highWaterMark||16*1024; this.autoClose = options.autoClose||true; this.mode = options.mode; this.start = options.start||0; this.flags = options.flags||"w"; this.encoding = options.encoding || "utf8"; // 可寫流 要有一個緩存區(qū),當(dāng)正在寫入文件是,內(nèi)容要寫入到緩存區(qū)中 // 在源碼中是一個鏈表 => [] this.buffers = []; // 標(biāo)識 是否正在寫入 this.writing = false; // 是否滿足觸發(fā)drain事件 this.needDrain = false; // 記錄寫入的位置 this.pos = 0; // 記錄緩存區(qū)的大小 this.length = 0; this.open(); } destroy(){ if(typeof this.fd !=="number"){ return this.emit("close"); } fs.close(this.fd,()=>{ this.emit("close") }) } open(){ fs.open(this.path,this.flags,this.mode,(err,fd)=>{ if(err){ this.emit("error",err); if(this.autoClose){ this.destroy(); } return } this.fd = fd; this.emit("open"); }) } write(chunk,encoding=this.encoding,callback=()=>{}){ chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding); // write 返回一個boolean類型 this.length+=chunk.length; let ret = this.lengthPipe管道流{ callback(); this.clearBuffer(); }); // 8 } return ret; } clearBuffer(){ let buffer = this.buffers.shift(); if(buffer){ this._write(buffer.chunk,buffer.encoding,()=>{ buffer.callback(); this.clearBuffer() }); }else{ this.writing = false; if(this.needDrain){ // 是否需要觸發(fā)drain 需要就發(fā)射drain事件 this.needDrain = false; this.emit("drain"); } } } _write(chunk,encoding,callback){ if(typeof this.fd !== "number"){ return this.once("open",()=>this._write(chunk,encoding,callback)); } fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{ this.length -= byteWritten; this.pos += byteWritten; callback(); // 清空緩存區(qū)的內(nèi)容 }); } } module.exports = WriteStream;
前面我們了解了可讀流與可寫流,那么怎么讓二者結(jié)合起來使用呢,node給我們提供好了方法--Pipe管道,流顧名思義,就是在可讀流與可寫流中間加入一個管道,實現(xiàn)一邊讀取,一邊寫入,讀一點寫一點。
Pipe的使用方法如下
let fs = require("fs"); let path = require("path"); let ReadStream = require("./ReadStream"); let WriteStream = require("./WriteStream"); let rs = new ReadStream(path.join(__dirname, "./1.txt"), { highWaterMark: 4 }); let ws = new WriteStream(path.join(__dirname, "./2.txt"), { highWaterMark: 1 }); // 4 1 rs.pipe(ws);實現(xiàn)原理
Pipe的原理比較簡單,簡單說監(jiān)聽可讀流的data事件來持續(xù)獲取文件中的數(shù)據(jù),然后我們就會去調(diào)用寫流的write方法。如果可寫流緩存區(qū)已滿,那么當(dāng)我們得到調(diào)用可讀流的pause方法來暫停讀取,然后等到寫流的緩存區(qū)已經(jīng)全部寫入并且觸發(fā)drain事件時,我們就會調(diào)用resume重新開啟讀取的流程。上代碼
pipe(ws) { this.on("data", (chunk) => { let flag = ws.write(chunk); if (!flag) { this.pause(); } }); ws.on("drain", () => { this.resume(); }) }自定義流
Node允許我們自定義流,讀流繼承于Readable接口,寫流則繼承于Writable接口,所以我們其實是可以自定義一個流模塊,只要繼承stream模塊對應(yīng)的接口即可。
自定義可讀流如果我們要自定義讀流的話,那我們就需要繼承Readable,Readable里面有一個read()方法,默認(rèn)調(diào)用_read(),所以我們只要復(fù)寫了_read()方法就可實現(xiàn)讀取的邏輯,同時Readable中也提供了一個push方法,調(diào)用push方法就會觸發(fā)data事件,push中的參數(shù)就是data事件回調(diào)函數(shù)的參數(shù),當(dāng)push傳入的參數(shù)為null的時候就代表讀流停止,上代碼
let { Readable } = require("stream"); // 想實現(xiàn)什么流 就繼承這個流 // Readable里面有一個read()方法,默認(rèn)掉_read() // Readable中提供了一個push方法你調(diào)用push方法就會觸發(fā)data事件 let index = 9; class MyRead extends Readable { _read() { // 可讀流什么時候停止呢? 當(dāng)push null的時候停止 if (index-- > 0) return this.push("123"); this.push(null); } } let mr = new MyRead(); mr.on("data", function(data) { console.log(data); });自定義可寫流
與自定義讀流類似,自定義寫流需要繼承Writable接口,并且實現(xiàn)一個_write()方法,這里注意的是_write中可以傳入3個參數(shù),chunk, encoding, callback,chunk就是代表寫入的數(shù)據(jù),通常是一個buffer,encoding是編碼類型,通常不會用到,最后的callback要注意,它并不是我們用這個自定義寫流調(diào)用write時的回調(diào),而是我們上面講到寫流實現(xiàn)時的clearBuffer函數(shù)。
let { Writable } = require("stream"); // 可寫流實現(xiàn)_write方法 // 源碼中默認(rèn)調(diào)用的是Writable中的write方法 class MyWrite extends Writable { _write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); // clearBuffer } } let mw = new MyWrite(); mw.write("111", "utf8", () => { console.log(1); }) mw.write("222", "utf8", () => { console.log(1); });Duplex 雙工流
雙工流其實就是結(jié)合了上面我們說的自定義讀流和自定義寫流,它既能讀也能寫,同時可以做到讀寫之間互不干擾
let { Duplex } = require("stream"); // 雙工流 又能讀 又能寫,而且讀取可以沒關(guān)系(互不干擾) let d = Duplex({ read() { this.push("hello"); this.push(null); }, write(chunk, encoding, callback) { console.log(chunk); callback(); } }); d.on("data", function(data) { console.log(data); }); d.write("hello");Transform 轉(zhuǎn)換流
轉(zhuǎn)換流的本質(zhì)就是雙工流,唯一不同的是它并不需要像上面提到的雙工流一樣實現(xiàn)read和write,它只需要實現(xiàn)一個transform方法用于轉(zhuǎn)換
let { Transform } = require("stream"); // 它的參數(shù)和可寫流一樣 let tranform1 = Transform({ transform(chunk, encoding, callback) { this.push(chunk.toString().toUpperCase()); // 將輸入的內(nèi)容放入到可讀流中 callback(); } }); let tranform2 = Transform({ transform(chunk, encoding, callback){ console.log(chunk.toString()); callback(); } }); // 等待你的輸入 // rs.pipe(ws); // 希望將輸入的內(nèi)容轉(zhuǎn)化成大寫在輸出出來 process.stdin.pipe(tranform1).pipe(tranform2); // 對象流 可讀流里只能放buffer或者字符串 對象流里可以放對象對象流
默認(rèn)情況下,流處理的數(shù)據(jù)是Buffer/String類型的值。對象流的特點就是它有一個objectMode標(biāo)志,我們可以設(shè)置它讓流可以接受任何JavaScript對象。上代碼
const { Transform } = require("stream"); let fs = require("fs"); let rs = fs.createReadStream("./users.json"); rs.setEncoding("utf8"); let toJson = Transform({ readableObjectMode: true, transform(chunk, encoding, callback) { this.push(JSON.parse(chunk)); callback(); } }); let jsonOut = Transform({ writableObjectMode: true, transform(chunk, encoding, callback) { console.log(chunk); callback(); } }); rs.pipe(toJson).pipe(jsonOut);
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/94289.html
摘要:在可讀流事件里我們就必須調(diào)用方法。當(dāng)一個對象就意味著我們想發(fā)出信號這個流沒有更多數(shù)據(jù)了自定義可寫流為了實現(xiàn)可寫流,我們需要使用流模塊中的構(gòu)造函數(shù)。我們只需給構(gòu)造函數(shù)傳遞一些選項并創(chuàng)建一個對象。 前言 什么是流呢?看字面意思,我們可能會想起生活中的水流,電流。但是流不是水也不是電,它只是描述水和電的流動;所以說流是抽象的。在node.js中流是一個抽象接口,它不關(guān)心文件內(nèi)容,只關(guān)注是否從...
摘要:回調(diào)函數(shù)中檢測該次寫入是否被緩沖,若是,觸發(fā)事件。若目標(biāo)可寫流表示該寫入操作需要進(jìn)行緩沖,則立刻將源可讀流切換至?xí)和DJ健1O(jiān)聽源可讀流的事件,相應(yīng)地結(jié)束目標(biāo)可寫流。 在Node.js中,流(Stream)是其眾多原生對象的基類,它對處理潛在的大文件提供了支持,也抽象了一些場景下的數(shù)據(jù)處理和傳遞。在它對外暴露的接口中,最為神奇的,莫過于導(dǎo)流(pipe)方法了。鑒于近期自己正在閱讀Node...
摘要:當(dāng)一個客戶端的響應(yīng)對象是一個可讀流,那么在服務(wù)器端這就是一個可寫流。的模塊給我們提供了一個可以操作任何文件的可讀流通過方法創(chuàng)建。創(chuàng)建一個可讀流創(chuàng)建可讀流,我們需要類創(chuàng)建一個可讀流非常簡單。可以通過修改可讀流配置里面的方法實現(xiàn)。 Node.js的stream模塊是有名的應(yīng)用困難,更別說理解了。那現(xiàn)在可以告訴你,這些都不是問題了。 多年來,開發(fā)人員在那里創(chuàng)建了大量的軟件包,其唯一目的就是使...
摘要:事件的觸發(fā)頻次同樣是由實現(xiàn)者決定,譬如在進(jìn)行文件讀取時,可能每行都會觸發(fā)一次而在請求處理時,可能數(shù)的數(shù)據(jù)才會觸發(fā)一次。如果有參數(shù)傳入,它會讓可讀流停止流向某個特定的目的地,否則,它會移除所有目的地。 showImg(https://segmentfault.com/img/remote/1460000016328758?w=1967&h=821); 本文節(jié)選自 Node.js Chea...
摘要:流的類型中有四種基本的流類型可讀的流例如可寫的流例如可讀寫的流例如在讀寫過程中可以修改和變換數(shù)據(jù)的流例如可讀流可讀流有兩種模式流動模式可讀流自動讀取數(shù)據(jù),通過接口的事件盡快將數(shù)據(jù)提供給應(yīng)用。 流的簡介 流(stream)在 Node.js 中是處理流數(shù)據(jù)的抽象接口(abstract interface)。 stream 模塊提供了基礎(chǔ)的 API 。使用這些 API 可以很容易地來構(gòu)建實...
閱讀 604·2021-11-18 13:12
閱讀 1321·2021-11-15 11:39
閱讀 2480·2021-09-23 11:22
閱讀 6212·2021-09-22 15:15
閱讀 3665·2021-09-02 09:54
閱讀 2318·2019-08-30 11:10
閱讀 3250·2019-08-29 14:13
閱讀 2918·2019-08-29 12:49