初探Node中的stream

Stream流有以下四种范例:

  • Readable – 可读操纵
  • Writable – 可写操纵
  • Duplex – 可读可写操纵
  • Transform – 操纵被写入数据,然后读出效果

可读流(Readable stream)

可读流(Readable stream)接口是对你正在读取的数据的泉源的笼统。换句话说,数据来来自可读流(Readable stream)不会分发数据,直到你表明准备就绪。
可读流(Readable stream) 有2种形式: 活动形式(flowing mode) 和 停息形式(paused mode). 活动形式(flowing mode)时,尽快的从底层体系读取数据并提供应你的顺序。 停息形式(paused mode)时, 你必需明白的挪用 stream.read() 来读取数据。 停息形式(paused mode) 是默许形式。
能够经由历程下面几个要领,将流切换到活动形式(flowing mode)。

let fs = require('fs');
/**
 * 一切初始事情形式为 paused 的 Readable 流,能够经由历程下面三种门路切换到 flowing 形式:
 监听 'data' 事宜
 挪用 stream.resume() 要领
 挪用 stream.pipe() 要领将数据发送到 Writable
 */
let rs = fs.createReadStream('./1.txt',{
    highWaterMark:3
});
/*
269
 stream.emit('data', chunk);
    stream.read(0);
rs.on('data',function (data) {
    console.log(data);
});
rs.on('end',function () {
    console.log('end');
});*/
//当你监听 readable事宜的时刻,会进入停息形式
//当监听readable事宜的时刻,可读流会立时去处底层读取文件,然后把读到文件的文件放在缓存区里const state = this._readableState;
//self.read(0); 只添补缓存,然则并不会发射data事宜,然则会发射stream.emit('readable');事宜
//this._read(state.highWaterMark); 每次挪用底层的要领读取的时刻是读取3个字节
rs.on('readable',function(){
    //length就是指得缓存区数据的大小
    // state.length +=  chunk.length;==3
    console.log(rs._readableState.length);
    //read假如不加参数示意读取全部缓存区数据
    //读取一个字段,假如可读流发明你要读的字节小于即是缓存字节大小,则直接返回
    let ch = rs.read(1);
    console.log(ch);
    console.log(rs._readableState.length);
   /* ch = rs.read(1);
    console.log(ch);
    console.log(rs._readableState.length);*/
    //当你读完指定的字节后,假如可读流发明剩下的字节已比最高水位线小了。则会立马再次读取填满 最高水位线
    setTimeout(function(){
        console.log(rs._readableState.length);
    },200)
});

可写流(Writable stream )

这个要领向底层体系写入数据,并在数据处理完毕后挪用所给的回调。返回值示意你是不是应当继承马上写入。假如数据要缓存在内部,将会返回false。不然返回 true。返回值仅供参考。纵然返回 false,你也能够继承写。然则写会缓存在内存里,所以不要做的太过分。最好的方法是守候drain 事宜后,再写入数据。

let fs = require('fs');
let ws = fs.createWriteStream('2.txt',{
    flags:'w',
    mode:0o666,
    start:0,
    highWaterMark:3
});
let count = 9;
function write(){
 let flag = true;//缓存区未满
    //写入要领是同步的,然则写入文件的历程 异步的。在真正写入文件后还会实行我们的回调函数
 while(flag && count>0){
     console.log('before',count);
     flag = ws.write((count)+'','utf8',(function (i) {
         return ()=>console.log('after',i);
     })(count));
     count--;
 }
}
write();//987
//监听缓存区清空事宜
ws.on('drain',function () {
    console.log('drain');
    write();//654 321
});
ws.on('error',function (err) {
    console.log(err);
});
//假如已不再须要写入了,能够挪用end要领封闭写入流,一旦挪用end要领以后则不能再写入
ws.end();
//write after end
//
ws.write('x');

双工流(Duplex streams)

双工流(Duplex streams)是同时完成了 Readable and Writable 接口。用法详见下文

let {Duplex} = require('stream');
let index = 0;
let s = Duplex({
    read(){
        if(index++<3)
          this.push('a'); 
          else 
       this.push(null);   
    },
    write(chunk,encoding,cb){
       console.log(chunk.toString().toUpperCase());
       cb();
    }
});
//process.stdin 规范输入流
//proces.stdout规范输出流
process.stdin.pipe(s).pipe(process.stdout);

转换流(Transform streams)

它的输出是从输入盘算得来。 它完成了Readable 和 Writable 接口. 用法详见下文.

let {Transform}  = require('stream');
//转换流是完成数据转换的
let t = Transform({
    transform(chunk,encoding,cb){
        this.push(chunk.toString().toUpperCase());
        cb();
    }
});
process.stdin.pipe(t).pipe(process.stdout);
let {Transform} = require('stream');
let fs = require('fs');
let rs = fs.createReadStream('./user.json');
//普通流里的放的是Buffer,对象流里放的对象
let toJSON = Transform({
    readableObjectMode:true,//就能够向可读流里放对象
    transform(chunk,encoding,cb){
        //向可读流里的缓存区里放
      this.push(JSON.parse(chunk.toString()));
    }
});
let outJSON = Transform({
    writableObjectMode:true,//就能够向可读流里放对象
    transform(chunk,encoding,cb){
      console.log(chunk);
      cb();
    }
});
rs.pipe(toJSON).pipe(outJSON);
    原文作者:黄干
    原文地址: https://segmentfault.com/a/1190000013110089
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞