初探Node中的stream

Stream流有以下四种类型:

  • Readable – 可读操作
  • Writable – 可写操作
  • Duplex – 可读可写操作
  • Transform – 操作被写入数据,然后读出结果

可读流(Readable stream)

可读流(Readable stream)接口是对你正在读取的数据的来源的抽象。换句话说,数据来来自可读流(Readable stream)不会分发数据,直到你表明准备就绪。
可读流(Readable stream) 有2种模式: 流动模式(flowing mode) 和 暂停模式(paused mode). 流动模式(flowing mode)时,尽快的从底层系统读取数据并提供给你的程序。 暂停模式(paused mode)时, 你必须明确的调用 stream.read() 来读取数据。 暂停模式(paused mode) 是默认模式。
可以通过下面几个方法,将流切换到流动模式(flowing mode)。

let fs = require('fs');
/**
 * 所有初始工作模式为 paused 的 Readable 流,可以通过下面三种途径切换到 flowing 模式:
 监听 'data' 事件
 调用 stream.resume() 方法
 调用 stream.pipe() 方法将数据发送到 Writable
 */
let rs = fs.createReadStream('./1.txt',{
    highWaterMark:3
});
/*
269
 stream.emit('data', chunk);
    stream.read(0);
rs.on('data',function (data) {
    console.log(data);
});
rs.on('end',function () {
    console.log('end');
});*/
//当你监听 readable事件的时候,会进入暂停模式
//当监听readable事件的时候,可读流会马上去向底层读取文件,然后把读到文件的文件放在缓存区里const state = this._readableState;
//self.read(0); 只填充缓存,但是并不会发射data事件,但是会发射stream.emit('readable');事件
//this._read(state.highWaterMark); 每次调用底层的方法读取的时候是读取3个字节
rs.on('readable',function(){
    //length就是指得缓存区数据的大小
    // state.length +=  chunk.length;==3
    console.log(rs._readableState.length);
    //read如果不加参数表示读取整个缓存区数据
    //读取一个字段,如果可读流发现你要读的字节小于等于缓存字节大小,则直接返回
    let ch = rs.read(1);
    console.log(ch);
    console.log(rs._readableState.length);
   /* ch = rs.read(1);
    console.log(ch);
    console.log(rs._readableState.length);*/
    //当你读完指定的字节后,如果可读流发现剩下的字节已经比最高水位线小了。则会立马再次读取填满 最高水位线
    setTimeout(function(){
        console.log(rs._readableState.length);
    },200)
});

可写流(Writable stream )

这个方法向底层系统写入数据,并在数据处理完毕后调用所给的回调。返回值表示你是否应该继续立即写入。如果数据要缓存在内部,将会返回false。否则返回 true。返回值仅供参考。即使返回 false,你也可能继续写。但是写会缓存在内存里,所以不要做的太过分。最好的办法是等待drain 事件后,再写入数据。

let fs = require('fs');
let ws = fs.createWriteStream('2.txt',{
    flags:'w',
    mode:0o666,
    start:0,
    highWaterMark:3
});
let count = 9;
function write(){
 let flag = true;//缓存区未满
    //写入方法是同步的,但是写入文件的过程 异步的。在真正写入文件后还会执行我们的回调函数
 while(flag && count>0){
     console.log('before',count);
     flag = ws.write((count)+'','utf8',(function (i) {
         return ()=>console.log('after',i);
     })(count));
     count--;
 }
}
write();//987
//监听缓存区清空事件
ws.on('drain',function () {
    console.log('drain');
    write();//654 321
});
ws.on('error',function (err) {
    console.log(err);
});
//如果已经不再需要写入了,可以调用end方法关闭写入流,一旦调用end方法之后则不能再写入
ws.end();
//write after end
//
ws.write('x');

双工流(Duplex streams)

双工流(Duplex streams)是同时实现了 Readable and Writable 接口。用法详见下文

let {Duplex} = require('stream');
let index = 0;
let s = Duplex({
    read(){
        if(index++<3)
          this.push('a'); 
          else 
       this.push(null);   
    },
    write(chunk,encoding,cb){
       console.log(chunk.toString().toUpperCase());
       cb();
    }
});
//process.stdin 标准输入流
//proces.stdout标准输出流
process.stdin.pipe(s).pipe(process.stdout);

转换流(Transform streams)

它的输出是从输入计算得来。 它实现了Readable 和 Writable 接口. 用法详见下文.

let {Transform}  = require('stream');
//转换流是实现数据转换的
let t = Transform({
    transform(chunk,encoding,cb){
        this.push(chunk.toString().toUpperCase());
        cb();
    }
});
process.stdin.pipe(t).pipe(process.stdout);
let {Transform} = require('stream');
let fs = require('fs');
let rs = fs.createReadStream('./user.json');
//普通流里的放的是Buffer,对象流里放的对象
let toJSON = Transform({
    readableObjectMode:true,//就可以向可读流里放对象
    transform(chunk,encoding,cb){
        //向可读流里的缓存区里放
      this.push(JSON.parse(chunk.toString()));
    }
});
let outJSON = Transform({
    writableObjectMode:true,//就可以向可读流里放对象
    transform(chunk,encoding,cb){
      console.log(chunk);
      cb();
    }
});
rs.pipe(toJSON).pipe(outJSON);
    原文作者:黄乾
    原文地址: https://segmentfault.com/a/1190000013110089
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞