node那點事(二) -- Writable streams(可寫流)、自定義流

可寫流(Writable Stream)

可寫流是對數據寫入’目的地’的一種籠統。

可寫流的道理實在與可讀流相似,當數據過來的時刻會寫入緩存池,當寫入的速率很慢或許寫入停息時刻,數據流便會進入到行列池緩存起來,固然縱然緩存池滿了,盈餘的數據也是存在內存

可寫流的簡樸用法以下代碼

let fs = require('fs');
let path = require('path');
let ws = fs.createWriteStream(path.join(__dirname,'1.txt'),{
    highWaterMark:3,
    autoClose:true,
    flags:'w',
    encoding:'utf8',
    mode:0o666,
    start:0,
}); 
let i = 9;
function write(){
    let flag = true;
    while(i>0&&flag){
        flag = ws.write(--i+'','utf8',()=>{console.log('ok')});
        console.log(flag)
    }
}
write();
// drain只有當緩存區充溢后 而且被消耗后觸發
ws.on('drain',function(){
    console.log('抽干')
    write();
});

完成道理

如今就讓我們來完成一個簡樸的可寫流,來研討可寫流的內部道理,可寫流有許多要領與可讀流相似,這裏不在反覆了起首要有一個組織函數來定義一些基礎選項屬性,然後挪用一個open放法翻開文件,而且有一個destroy要領來處置懲罰封閉邏輯

let EventEmitter = require('events');
let fs = require('fs');

class WriteStream extends EventEmitter {
    constructor(path,options) {
        super();
        this.path = path;
        this.highWaterMark = options.highWaterMark || 16 * 1024;
        this.autoClose = options.autoClose || true;
        this.mode = options.mode;
        this.start = options.start || 0;
        this.flags = options.flags || 'w';
        this.encoding = options.encoding || 'utf8';

        // 可寫流 要有一個緩存區,當正在寫入文件是,內容要寫入到緩存區中
        // 在源碼中是一個鏈表 => []
        this.buffers = [];

        // 標識 是不是正在寫入
        this.writing = false;

        // 是不是滿足觸發drain事宜
        this.needDrain = false;

        // 紀錄寫入的位置
        this.pos = 0;

        // 紀錄緩存區的大小
        this.length = 0;
        this.open();
    }
    
    destroy() {
        if (typeof this.fd !== 'number') {
            return this.emit('close');
        }
        fs.close(this.fd, () => {
            this.emit('close')
        });
    }
    
    open() {
        fs.open(this.path, this.flags, this.mode, (err,fd) => {
            if (err) {
                this.emit('error', err);
                if (this.autoClose) {
                    this.destroy();
                }
                return;
            }
            this.fd = fd;
            this.emit('open');
        })
    }
}

module.exports = WriteStream;

接着我們完成write要領來讓可寫流對象挪用,在write要領中我們起首將數據轉化為buffer,接着完成一些事宜的觸發前提的邏輯,假如如今沒有正在寫入的話我們就要真正的舉行寫入操縱了,這裏我們完成一個_write要領來完成寫入操縱,不然則代表文件正在寫入,那我們就將流傳來的數據先放在緩存區中,保證寫入數據不會同時舉行。

write(chunk,encoding=this.encoding,callback=()=>{}){
    chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding);
    // write 返回一個boolean範例 
    this.length+=chunk.length; 
    let ret = this.length<this.highWaterMark; // 比較是不是達到了緩存區的大小
    this.needDrain = !ret; // 是不是須要觸發needDrain
    // 推斷是不是正在寫入 假如是正在寫入 就寫入到緩存區中
    if(this.writing){
        this.buffers.push({
            encoding,
            chunk,
            callback
        }); // []
    }else{
        // 特地用來將內容 寫入到文件內
        this.writing = true;
        this._write(chunk,encoding,()=>{
            callback();
            this.clearBuffer();
        }); // 8
    }
    return ret;
}

_write(chunk,encoding,callback){
    if(typeof this.fd !== 'number'){
        return this.once('open',()=>this._write(chunk,encoding,callback));
    }
    fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{
        this.length -= byteWritten;
        this.pos += byteWritten;
        
        callback(); // 清空緩存區的內容
    });
}
    

_write寫入以後的回調中我們會挪用傳入回調函數clearBuffer,這個要領會去buffers中繼續遞歸地把數據掏出,然後繼續挪用_write要領去寫入,直到悉數buffer中的數據掏出后,如許就清空了buffers。

clearBuffer(){
    let buffer = this.buffers.shift();
    if(buffer){
        this._write(buffer.chunk,buffer.encoding,()=>{
            buffer.callback();
            this.clearBuffer()
        });
    }else{
        this.writing = false;
        if(this.needDrain){ // 是不是須要觸發drain 須要就發射drain事宜
            this.needDrain = false;
            this.emit('drain');
        }
    }
}

末了附上完全的代碼

let EventEmitter = require('events');
let fs = require('fs');
class WriteStream extends EventEmitter{
    constructor(path,options){
        super();
        this.path = path;
        this.highWaterMark = options.highWaterMark||16*1024;
        this.autoClose = options.autoClose||true;
        this.mode = options.mode;
        this.start = options.start||0;
        this.flags = options.flags||'w';
        this.encoding = options.encoding || 'utf8';

        // 可寫流 要有一個緩存區,當正在寫入文件是,內容要寫入到緩存區中
        // 在源碼中是一個鏈表 => []

        this.buffers = [];

        // 標識 是不是正在寫入
        this.writing = false;

        // 是不是滿足觸發drain事宜
        this.needDrain = false;

        // 紀錄寫入的位置
        this.pos = 0;

        // 紀錄緩存區的大小
        this.length = 0;
        this.open();
    }
    destroy(){
        if(typeof this.fd !=='number'){
            return this.emit('close');
        }
        fs.close(this.fd,()=>{
            this.emit('close')
        })
    }
    open(){
        fs.open(this.path,this.flags,this.mode,(err,fd)=>{
            if(err){
                this.emit('error',err);
                if(this.autoClose){
                    this.destroy();
                }
                return
            }
            this.fd = fd;
            this.emit('open');
        })
    }
    write(chunk,encoding=this.encoding,callback=()=>{}){
        chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding);
        // write 返回一個boolean範例 
        this.length+=chunk.length; 
        let ret = this.length<this.highWaterMark; // 比較是不是達到了緩存區的大小
        this.needDrain = !ret; // 是不是須要觸發needDrain
        // 推斷是不是正在寫入 假如是正在寫入 就寫入到緩存區中
        if(this.writing){
            this.buffers.push({
                encoding,
                chunk,
                callback
            }); // []
        }else{
            // 特地用來將內容 寫入到文件內
            this.writing = true;
            this._write(chunk,encoding,()=>{
                callback();
                this.clearBuffer();
            }); // 8
        }
        return ret;
    }
    clearBuffer(){
        let buffer = this.buffers.shift();
        if(buffer){
            this._write(buffer.chunk,buffer.encoding,()=>{
                buffer.callback();
                this.clearBuffer()
            });
        }else{
            this.writing = false;
            if(this.needDrain){ // 是不是須要觸發drain 須要就發射drain事宜
                this.needDrain = false;
                this.emit('drain');
            }
        }
    }
    _write(chunk,encoding,callback){
        if(typeof this.fd !== 'number'){
            return this.once('open',()=>this._write(chunk,encoding,callback));
        }
        fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{
            this.length -= byteWritten;
            this.pos += byteWritten;
            
            callback(); // 清空緩存區的內容
        });
    }
}

module.exports = WriteStream;

Pipe管道流

前面我們了解了可讀流與可寫流,那末怎樣讓兩者連繫起來運用呢,node給我們供應好了要領–Pipe管道,流望文生義,就是在可讀流與可寫流中心到場一個管道,完成一邊讀取,一邊寫入,讀一點寫一點。

Pipe的運用要領以下

let fs = require('fs');
let path = require('path');
let ReadStream = require('./ReadStream');
let WriteStream = require('./WriteStream');

let rs = new ReadStream(path.join(__dirname, './1.txt'), {
    highWaterMark: 4
});
let ws = new WriteStream(path.join(__dirname, './2.txt'), {
    highWaterMark: 1
});
// 4 1
rs.pipe(ws); 

完成道理

Pipe的道理比較簡樸,簡樸說監聽可讀流的data事宜來延續獵取文件中的數據,然後我們就會去挪用寫流的write要領。假如可寫流緩存區已滿,那末當我們獲得挪用可讀流的pause要領來停息讀取,然後比及寫流的緩存區已悉數寫入而且觸發drain事宜時,我們就會挪用resume從新開啟讀取的流程。上代碼

pipe(ws) {
    this.on('data', (chunk) => {
        let flag = ws.write(chunk);
        if (!flag) {
            this.pause();
        }
    });
    ws.on('drain', () => {
        this.resume();
    })
}

自定義流

Node許可我們自定義流,讀流繼續於Readable接口,寫流則繼續於Writable接口,所以我們實際上是能夠自定義一個流模塊,只需繼續stream模塊對應的接口即可。

自定義可讀流

假如我們要自定義讀流的話,那我們就須要繼續Readable,Readable內里有一個read()要領,默許挪用_read(),所以我們只需複寫了_read()要領便可完成讀取的邏輯,同時Readable中也供應了一個push要領,挪用push要領就會觸發data事宜,push中的參數就是data事宜回調函數的參數,當push傳入的參數為null的時刻就代表讀流住手,上代碼

let { Readable } = require('stream');

// 想完成什麼流 就繼續這個流
// Readable內里有一個read()要領,默許掉_read()
// Readable中供應了一個push要領你挪用push要領就會觸發data事宜
let index = 9;
class MyRead extends Readable {
    _read() {
        // 可讀流什麼時刻住手呢? 當push null的時刻住手
        if (index-- > 0) return this.push('123');
        this.push(null);
    }
}

let mr = new MyRead();
mr.on('data', function(data) {
    console.log(data);
});

自定義可寫流

與自定義讀流相似,自定義寫流須要繼續Writable接口,而且完成一個_write()要領,這裏注重的是_write中能夠傳入3個參數,chunk, encoding, callback,chunk就是代表寫入的數據,一般是一個buffer,encoding是編碼範例,一般不會用到,末了的callback要注重,它並非我們用這個自定義寫流挪用write時的回調,而是我們上面講到寫流完成時的clearBuffer函數。

let { Writable } = require('stream');

// 可寫流完成_write要領
// 源碼中默許挪用的是Writable中的write要領
class MyWrite extends Writable {
    _write(chunk, encoding, callback) {
        console.log(chunk.toString());
        callback(); // clearBuffer
    }
}

let mw = new MyWrite();
mw.write('111', 'utf8', () => {
    console.log(1);
})
mw.write('222', 'utf8', () => {
    console.log(1);
});

Duplex 雙工流

雙工流實在就是連繫了上面我們說的自定義讀流和自定義寫流,它既能讀也能寫,同時能夠做到讀寫之間互不滋擾

let { Duplex } =  require('stream');

// 雙工流 又能讀 又能寫,而且讀取能夠沒緊要(互不滋擾)
let d = Duplex({
    read() {
        this.push('hello');
        this.push(null);
    },
    write(chunk, encoding, callback) {
        console.log(chunk);
        callback();
    }
});

d.on('data', function(data) {
    console.log(data);
});
d.write('hello');

Transform 轉換流

轉換流的實質就是雙工流,唯一差別的是它並不須要像上面提到的雙工流一樣完成read和write,它只須要完成一個transform要領用於轉換

let { Transform } =  require('stream');

// 它的參數和可寫流一樣
let tranform1 = Transform({
    transform(chunk, encoding, callback) {
        this.push(chunk.toString().toUpperCase()); // 將輸入的內容放入到可讀流中
        callback();
    }
});
let tranform2 = Transform({
    transform(chunk, encoding, callback){
        console.log(chunk.toString());
        callback();
    }
});

// 守候你的輸入
// rs.pipe(ws);
// 願望將輸入的內容轉化成大寫在輸出出來
process.stdin.pipe(tranform1).pipe(tranform2);
// 對象流 可讀流里只能放buffer或許字符串 對象流里能夠放對象

對象流

默許情況下,流處置懲罰的數據是Buffer/String範例的值。對象流的特性就是它有一個objectMode標誌,我們能夠設置它讓流能夠接收任何JavaScript對象。上代碼

const { Transform } = require('stream');

let fs = require('fs');
let rs = fs.createReadStream('./users.json');

rs.setEncoding('utf8');

let toJson = Transform({
    readableObjectMode: true,
    transform(chunk, encoding, callback) {
        this.push(JSON.parse(chunk));
        callback();
    }
});

let jsonOut = Transform({
    writableObjectMode: true,
    transform(chunk, encoding, callback) {
        console.log(chunk);
        callback();
    }
});
rs.pipe(toJson).pipe(jsonOut);
    原文作者:標記層疊樣式球
    原文地址: https://segmentfault.com/a/1190000014402530
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞