深入nodejs中流(stream)的理解

流的基本概念及理解

流是一种数据传输手段,是有顺序的,有起点和终点,比如你要把数据从一个地方传到另外一个地方

流非常重要,gulp,webpack,HTTP里的请求和响应,http里的socket都是流,包括后面压缩,加密等

流为什么这么好用还这么重要呢?

  • 因为有时候我们不关心文件的主体内容,只关心能不能取到数据,取到数据之后怎么进行处理
  • 对于小型的文本文件,我们可以把文件内容全部读入内存,然后再写入文件,比如grunt-file-copy
  • 对于体积较大的二进制文件,比如音频、视频文件,动辄几个GB大小,如果使用这种方法,很容易使内存“爆仓”。
  • 理想的方法应该是读一部分,写一部分,不管文件有多大,只要时间允许,总会处理完成,这里就需要用到流的概念

流是一个抽象接口,被Node中很多对象所实现,比如HTTP服务器request和response对象都是流

Node.js 中有四种基本的流类型:

  • Readable – 可读的流 (例如 fs.createReadStream()).
  • Writable – 可写的流 (例如 fs.createWriteStream()).
  • Duplex – 可读写的流 (例如 net.Socket).
  • Transform – 在读写过程中可以修改和变换数据的 Duplex 流 (例如 zlib.createDeflate()).

可以通过 require(‘stream’) 加载 Stream 基类。其中包括了 Readable 流、Writable 流、Duplex 流和 Transform 流的基类

Readable streams可读流

可读流(Readable streams)是对提供数据的 源头(source)的抽象

可读流的例子包括:

  • HTTP responses, on the client :客户端请求
  • HTTP requests, on the server :服务端请求
  • fs read streams :读文件
  • zlib streams :压缩
  • crypto streams :加密
  • TCP sockets :TCP协议
  • child process stdout and stderr :子进程标准输出和错误输出
  • process.stdin :标准输入

所有的 Readable 都实现了 stream.Readable 类定义的接口

通过流读取数据

  • 用Readable创建对象readable后,便得到了一个可读流
  • 如果实现_read方法,就将流连接到一个底层数据源
  • 流通过调用_read向底层请求数据,底层再调用流的push方法将需要的数据传递过来
  • 当readable连接了数据源后,下游便可以调用readable.read(n)向流请求数据,同时监听readable的data事件来接收取到的数据

下面简单举个可读流的例子:

  • 监听可读流的data事件,当你一旦开始监听data事件的时候,流就可以读文件的内容并且发射data,读一点发射一点读一点发射一点
  • 默认情况下,当你监听data事件之后,会不停的读数据,然后触发data事件,触发完data事件后再次读数据
  • 读的时候不是把文件整体内容读出来再发射出来的,而且设置一个缓冲区,大小默认是64K,比如文件是128K,先读64K发射出来,再读64K在发射出来,会发射两次
  • 缓冲区的大小可以通过highWaterMark来设置
let fs = require('fs');
//通过创建一个可读流
let rs = fs.createReadStream('./1.txt',{
    flags:'r',//我们要对文件进行何种操作
    mode:0o666,//权限位
    encoding:'utf8',//不传默认为buffer,显示为字符串
    start:3,//从索引为3的位置开始读
    //这是我的见过唯一一个包括结束索引的
    end:8,//读到索引为8结束
    highWaterMark:3//缓冲区大小
});
rs.on('open',function () {
    console.log('文件打开');
});
rs.setEncoding('utf8');//显示为字符串
//希望流有一个暂停和恢复触发的机制
rs.on('data',function (data) {
    console.log(data);
    rs.pause();//暂停读取和发射data事件
    setTimeout(function(){
        rs.resume();//恢复读取并触发data事件
    },2000);
});
//如果读取文件出错了,会触发error事件
rs.on('error',function () {
    console.log("error");
});
//如果文件的内容读完了,会触发end事件
rs.on('end',function () {
    console.log('读完了');
});
rs.on('close',function () {
    console.log('文件关闭');
});

/**
文件打开
334
455
读完了
文件关闭
**/

可读流的简单实现

let fs = require('fs');
let ReadStream = require('./ReadStream');
let rs = ReadStream('./1.txt', {
    flags: 'r',
    encoding: 'utf8',
    start: 3,
    end: 7,
    highWaterMark: 3
});
rs.on('open', function () {
    console.log("open");
});
rs.on('data', function (data) {
    console.log(data);
});
rs.on('end', function () {
    console.log("end");
});
rs.on('close', function () {
    console.log("close");
});
/**
 open
 456
 789
 end
 close
 **/
let fs = require('fs');
let EventEmitter = require('events');

class ReadStream extends EventEmitter {
    constructor(path, options) {
        super(path, options);
        this.path = path;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.buffer = Buffer.alloc(this.highWaterMark);
        this.flags = options.flags || 'r';
        this.encoding = options.encoding;
        this.mode = options.mode || 0o666;
        this.start = options.start || 0;
        this.end = options.end;
        this.pos = this.start;
        this.autoClose = options.autoClose || true;
        this.bytesRead = 0;
        this.closed = false;
        this.flowing;
        this.needReadable = false;
        this.length = 0;
        this.buffers = [];
        this.on('end', function () {
            if (this.autoClose) {
                this.destroy();
            }
        });
        this.on('newListener', (type) => {
            if (type == 'data') {
                this.flowing = true;
                this.read();
            }
            if (type == 'readable') {
                this.read(0);
            }
        });
        this.open();
    }

    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) {
                if (this.autoClose) {
                    this.destroy();
                    return this.emit('error', err);
                }
            }
            this.fd = fd;
            this.emit('open');
        });
    }

    read(n) {
        if (typeof this.fd != 'number') {
            return this.once('open', () => this.read());
        }
        n = parseInt(n, 10);
        if (n != n) {
            n = this.length;
        }
        if (this.length == 0)
            this.needReadable = true;
        let ret;
        if (0 < n < this.length) {
            ret = Buffer.alloc(n);
            let b;
            let index = 0;
            while (null != (b = this.buffers.shift())) {
                for (let i = 0; i < b.length; i++) {
                    ret[index++] = b[i];
                    if (index == ret.length) {
                        this.length -= n;
                        b = b.slice(i + 1);
                        this.buffers.unshift(b);
                        break;
                    }
                }
            }
            if (this.encoding) ret = ret.toString(this.encoding);
        }

        let _read = () => {
            let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
            fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
                if (err) {
                    return
                }
                let data;
                if (bytesRead > 0) {
                    data = this.buffer.slice(0, bytesRead);
                    this.pos += bytesRead;
                    this.length += bytesRead;
                    if (this.end && this.pos > this.end) {
                        if (this.needReadable) {
                            this.emit('readable');
                        }

                        this.emit('end');
                    } else {
                        this.buffers.push(data);
                        if (this.needReadable) {
                            this.emit('readable');
                            this.needReadable = false;
                        }

                    }
                } else {
                    if (this.needReadable) {
                        this.emit('readable');
                    }
                    return this.emit('end');
                }
            })
        }
        if (this.length == 0 || (this.length < this.highWaterMark)) {
            _read(0);
        }
        return ret;
    }

    destroy() {
        fs.close(this.fd, (err) => {
            this.emit('close');
        });
    }

    pause() {
        this.flowing = false;
    }

    resume() {
        this.flowing = true;
        this.read();
    }

    pipe(dest) {
        this.on('data', (data) => {
            let flag = dest.write(data);
            if (!flag) this.pause();
        });
        dest.on('drain', () => {
            this.resume();
        });
        this.on('end', () => {
            dest.end();
        });
    }
}
module.exports = ReadStream;

自定义可读流

为了实现可读流,引用Readable接口并用它构造新对象

  • 我们可以直接把供使用的数据push出去。
  • 当push一个null对象就意味着我们想发出信号——这个流没有更多数据了
var stream = require('stream');
var util = require('util');
util.inherits(Counter, stream.Readable);
function Counter(options) {
    stream.Readable.call(this, options);
    this._index = 0;
}
Counter.prototype._read = function() {
    if(this._index++<3){
        this.push(this._index+'');
    }else{
        this.push(null);
    }
};
var counter = new Counter();

counter.on('data', function(data){
    console.log("读到数据: " + data.toString());//no maybe
});
counter.on('end', function(data){
    console.log("读完了");
});

可读流的两种模式

Readable Stream 存在两种模式(flowing mode 与 paused mode),这两种模式决定了chunk数据流动的方式—自动流动还是手工流动。那如何触发这两种模式呢:

  • flowing mode: 注册事件data、调用resume方法、调用pipe方法
  • paused mode: 调用pause方法(没有pipe方法)、移除data事件 && unpipe所有pipe

如果 Readable 切换到 flowing 模式,且没有消费者处理流中的数据,这些数据将会丢失。 比如, 调用了 readable.resume() 方法却没有监听 ‘data’ 事件,或是取消了 ‘data’ 事件监听,就有可能出现这种情况

可读流的三种状态

在任意时刻,任意可读流应确切处于下面三种状态之一:

  • readable._readableState.flowing = null
  • readable._readableState.flowing = false
  • readable._readableState.flowing = true

两种模式取决于可读流flowing状态:

  • 若为true : flowing mode;
  • 若为false : paused mode

flowing mode

通过注册data、pipe、resume可以自动获取所需要的数据,我们来看下源码的实现

// data事件触发flowing mode
 if (ev === 'data') {
    // Start flowing on next tick if stream isn't explicitly paused
    if (this._readableState.flowing !== false)
      this.resume();
  } else if (ev === 'readable') {
    const state = this._readableState;
    if (!state.endEmitted && !state.readableListening) {
      state.readableListening = state.needReadable = true;
      state.emittedReadable = false;
      if (!state.reading) {
        process.nextTick(nReadingNextTick, this);
      } else if (state.length) {
        emitReadable(this);
      }
    }
  }

// resume触发flowing mode
Readable.prototype.resume = function() {
    var state = this._readableState;
    if (!state.flowing) {
        debug('resume');
        state.flowing = true;
    resume(this, state);
  }
  return this;
}

// pipe方法触发flowing模式
Readable.prototype.resume = function() {
    if (!state.flowing) {
        this.resume()
    }
}

flowing mode的三种方法最后均是通过resume方法,将状态变为true:state.flowing = true

paused mode

在paused mode下,需要手动地读取数据,并且可以直接指定读取数据的长度

可以通过监听事件readable,触发时手工读取chunk数据:

  • 当你监听 readable事件的时候,会进入暂停模式
  • 当监听readable事件的时候,可读流会马上去向底层读取文件,然后把读到文件的文件放在缓存区里const state = this._readableState;
  • self.read(0); 只填充缓存,但是并不会发射data事件,但是会发射stream.emit(‘readable’);事件
  • this._read(state.highWaterMark); 每次调用底层的方法读取的时候是读取3个字节
let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
    highWaterMark:3
});
rs.on('readable',function(){
    console.log(rs._readableState.length);
    //read如果不加参数表示读取整个缓存区数据
    //读取一个字段,如果可读流发现你要读的字节小于等于缓存字节大小,则直接返回
    let chunk = rs.read(1);
    console.log(chunk);
    console.log(rs._readableState.length);
    //当你读完指定的字节后,如果可读流发现剩下的字节已经比最高水位线小了。则会立马再次读取填满 最高水位线
    setTimeout(function(){
        console.log(rs._readableState.length);
    },200)
});

注意:一旦注册了readable事件,必须手工读取read数据,否则数据就会流失,我们来看下源码的实现

function emitReadable(stream) {
  var state = stream._readableState;
  state.needReadable = false;
  if (!state.emittedReadable) {
    debug('emitReadable', state.flowing);
    state.emittedReadable = true;
    process.nextTick(emitReadable_, stream);
  }
}

function emitReadable_(stream) {
  var state = stream._readableState;
  debug('emit readable');
  if (!state.destroyed && (state.length || state.ended)) {
    stream.emit('readable');
  }
  state.needReadable = !state.flowing && !state.ended;
  flow(stream);
}

function flow(stream) {
  const state = stream._readableState;
  debug('flow', state.flowing);
  while (state.flowing && stream.read() !== null);
}

function endReadable(stream) {
  var state = stream._readableState;
  debug('endReadable', state.endEmitted);
  if (!state.endEmitted) {
    state.ended = true;
    process.nextTick(endReadableNT, state, stream);
  }
}

Readable.prototype.read = function(n) {
  debug('read', n);
  n = parseInt(n, 10);
  var state = this._readableState;
  var nOrig = n;
  if (n !== 0)
    state.emittedReadable = false;
  if (n === 0 &&
      state.needReadable &&
      (state.length >= state.highWaterMark || state.ended)) {
    debug('read: emitReadable', state.length, state.ended);
    if (state.length === 0 && state.ended)
      endReadable(this);
    else
      emitReadable(this);
    return null;
  }
  n = howMuchToRead(n, state);
  if (n === 0 && state.ended) {
    if (state.length === 0)
      endReadable(this);
    return null;
  }

flow方法直接read数据,将得到的数据通过事件data交付出去,然而此处没有注册data事件监控,因此,得到的chunk数据并没有交付给任何对象,这样数据就白白流失了,所以在触发emit(‘readable’)时,需要提前read数据

Writable streams可写流

可写流是对数据写入’目的地’的一种抽象

Writable:可写流的例子包括了:

  • HTTP requests, on the client 客户端请求
  • HTTP responses, on the server 服务器响应
  • fs write streams 文件
  • zlib streams 压缩
  • crypto streams 加密
  • TCP sockets TCP服务器
  • child process stdin 子进程标准输入
  • process.stdout, process.stderr 标准输出,错误输出

下面举个可写流的简单例子

  • 当你往可写流里写数据的时候,不是会立刻写入文件的,而是会很写入缓存区,缓存区的大小就是highWaterMark,默认值是16K。然后等缓存区满了之后再次真正的写入文件里
let fs = require('fs');
let ws = fs.createWriteStream('./2.txt',{
   flags:'w',
   mode:0o666,
   start:3,
   highWaterMark:3//默认是16K
});
  • 如果缓存区已满 ,返回false,如果缓存区未满,返回true
  • 如果能接着写,返回true,如果不能接着写,返回false
  • 按理说如果返回了false,就不能再往里面写了,但是如果你真写了,如果也不会丢失,会缓存在内存里。等缓存区清空之后再从内存里读出来
let flag = ws.write('1');
console.log(flag);//true
flag =ws.write('2');
console.log(flag);//true
flag =ws.write('3');
console.log(flag);//false
flag =ws.write('4');
console.log(flag);//false

'drain' 事件

如果调用 stream.write(chunk) 方法返回 false,流将在适当的时机触发 ‘drain’ 事件,这时才可以继续向流中写入数据

当一个流不处在 drain 的状态, 对 write() 的调用会缓存数据块, 并且返回 false。 一旦所有当前所有缓存的数据块都排空了(被操作系统接受来进行输出), 那么 ‘drain’ 事件就会被触发

建议, 一旦 write() 返回 false, 在 ‘drain’ 事件触发前, 不能写入任何数据块

举个简单的例子说明一下:

let fs = require('fs');
let ws = fs.createWriteStream('2.txt',{
    flags:'w',
    mode:0o666,
    start:0,
    highWaterMark:3
});
let count = 9;
function write(){
 let flag = true;//缓存区未满
    //写入方法是同步的,但是写入文件的过程是异步的。在真正写入文件后还会执行我们的回调函数
 while(flag && count>0){
     console.log('before',count);
     flag = ws.write((count)+'','utf8',(function (i) {
         return ()=>console.log('after',i);
     })(count));
     count--;
 }
}
write();//987
//监听缓存区清空事件
ws.on('drain',function () {
    console.log('drain');
    write();//654 321
});
ws.on('error',function (err) {
    console.log(err);
});
/**
before 9
before 8
before 7
after 9
after 8
after 7
**/

如果已经不再需要写入了,可以调用end方法关闭写入流,一旦调用end方法之后则不能再写入

比如在
ws.end();后写
ws.write('x');,会报错
write after end

'pipe'事件

linux精典的管道的概念,前者的输出是后者的输入

pipe是一种最简单直接的方法连接两个stream,内部实现了数据传递的整个过程,在开发的时候不需要关注内部数据的流动

  • 这个方法从可读流拉取所有数据, 并将数据写入到提供的目标中
  • 自动管理流量,将数据的滞留量限制到一个可接受的水平,以使得不同速度的来源和目标不会淹没可用内存
  • 默认情况下,当源数据流触发 end的时候调用end(),所以写入数据的目标不可再写。传 { end:false }作为options,可以保持目标流打开状态

pipe方法的原理

var fs = require('fs');
var ws = fs.createWriteStream('./2.txt');
var rs = fs.createReadStream('./1.txt');
rs.on('data', function (data) {
    var flag = ws.write(data);
    if(!flag)
    rs.pause();
});
ws.on('drain', function () {
    rs.resume();
});
rs.on('end', function () {
    ws.end();
});

下面举个简单的例子说明一下pipe的用法:

let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
  highWaterMark:3
});
let ws = fs.createWriteStream('./2.txt',{
    highWaterMark:3
});
rs.pipe(ws);
//移除目标可写流
rs.unpipe(ws);
  • 当监听可读流data事件的时候会触发回调函数的执行
  • 可以实现数据的生产者和消费者速度的均衡
rs.on('data',function (data) {
    console.log(data);
    let flag = ws.write(data);
   if(!flag){
       rs.pause();
   }
});
  • 监听可写流缓存区清空事件,当所有要写入的数据写入完成后,接着恢复从可读流里读取并触发data事件
ws.on('drain',function () {
    console.log('drain');
    rs.resume();
});

unpipe

readable.unpipe()方法将之前通过stream.pipe()方法绑定的流分离

  • 如果写入的目标没有传入, 则所有绑定的流都会被分离
  • 如果指定了写入的目标,但是没有绑定流,则什么事情都不会发生

简单距离说明下unpipe的用法:

let fs = require('fs');
var from = fs.createReadStream('./1.txt');
var to = fs.createWriteStream('./2.txt');
from.pipe(to);
setTimeout(() => {
console.log('关闭向2.txt的写入');
from.unpipe(writable);
console.log('手工关闭文件流');
to.end();
}, 1000);

pipe的简单实现

let fs = require('fs');
let ReadStream = require('./ReadStream');
let rs = ReadStream('./1.txt', {
    flags: 'r',
    encoding: 'utf8',
    highWaterMark: 3
});
let FileWriteStream = require('./WriteStream');
let ws = FileWriteStream('./2.txt',{
    flags:'w',
    encoding:'utf8',
    highWaterMark:3
});
rs.pipe(ws);
ReadStream.prototype.pipe = function (dest) {
    this.on('data', (data)=>{
        let flag = dest.write(data);
        if(!flag){
            this.pause();
        }
    });
    dest.on('drain', ()=>{
        this.resume();
    });
    this.on('end', ()=>{
        dest.end();
    });
}
ReadStream.prototype.pause = function(){
    this.flowing = false;

}
ReadStream.prototype.resume = function(){
    this.flowing = true;
    this.read();
}

自定义管道流

const stream = require('stream')

var index = 0;
const readable = stream.Readable({
    highWaterMark: 2,
    read: function () {
        process.nextTick(() => {
            console.log('push', ++index)
            this.push(index+'');
        })
    }
})
const writable = stream.Writable({
    highWaterMark: 2,
    write: function (chunk, encoding, next) {
        console.log('写入:', chunk.toString())
    }
})
readable.pipe(writable);

可写流的简单实现

let fs = require('fs');
 let FileWriteStream = require('./FileWriteStream');
 let ws = FileWriteStream('./2.txt',{
     flags:'w',
     encoding:'utf8',
     highWaterMark:3
 });
 let i = 10;
 function write(){
     let  flag = true;
     while(i&&flag){
         flag = ws.write("1",'utf8',(function(i){
             return function(){
                 console.log(i);
             }
         })(i));
         i--;
         console.log(flag);
     }
 }
 write();
 ws.on('drain',()=>{
     console.log("drain");
     write();
 });
 /**
  10
  9
  8
  drain
  7
  6
  5
  drain
  4
  3
  2
  drain
  1
  **/
let EventEmitter = require('events');
let util = require('util');
let fs = require('fs');
util.inherits(WriteStream, EventEmitter);

function WriteStream(path, options) {
    EventEmitter.call(this);
    if (!(this instanceof WriteStream)) {
        return new WriteStream(path, options);
    }
    this.path = path;
    this.fd = options.fd;
    this.encoding = options.encoding||'utf8';
    this.flags = options.flags || 'w';
    this.mode = options.mode || 0o666;
    this.autoClose = options.autoClose || true;
    this.start = options.start || 0;
    this.pos = this.start;//开始写入的索引位置
    this.open();//打开文件进行操作
    this.writing = false;//没有在写入过程 中
    this.buffers = [];
    this.highWaterMark = options.highWaterMark||16*1024;
    //如果监听到end事件,而且要求自动关闭的话则关闭文件
    this.on('end', function () {
        if (this.autoClose) {
            this.destroy()
        }
    });
}
WriteStream.prototype.close = function(){
    fs.close(this.fd,(err)=>{
        if(err)
            this.emit('error',err);
    });
}
WriteStream.prototype.open = function () {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
        if (err)
            return this.emit('error', err);
        this.fd = fd;//把文件描述符赋给当前实例的fd属性
        //发射open事件
        this.emit('open', fd);
    });
}
/**
 * 会判断当前是后台是否在写入过程中,如果在写入过程中,则把这个数据放在待处理的缓存中,如果不在写入过程中,可以直接写。
 */
WriteStream.prototype.write = function (chunk, encoding, cb) {
    chunk= Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,this.encoding);

    //先把数据放在缓存里
    this.buffers.push({
        chunk,
        encoding,
        cb
    });

    let isFull = this.buffers.reduce((len, item) => len + item.chunk.length, 0)>=this.highWaterMark;
    //只有当缓存区写满了,那么清空缓存区的时候才会发射drain事件,否则 不发放
    this.needDrain = isFull;
    //如果说文件还没有打开,则把写入的方法压入open事件的监听函数。等文件一旦打开,立刻执行写入操作
    if (typeof this.fd !== 'number') {
         this.once('open', () => {
            this._write();
        });
        return !isFull;
    }else{
        if(!this.writing){
            setImmediate(()=>{
                this._write();
                this.writing = true;
            });
        }

        return !isFull;
    }
}
WriteStream.prototype._write = function () {
    let part = this.buffers.shift();
    if (part) {
        fs.write(this.fd,part.chunk,0,part.chunk.length,null,(err,bytesWritten)=>{
            if(err)return this.emit('error',err);
            part.cb && part.cb();
            this._write();
        });
    }else{
        //发射一个缓存区清空的事件
        this.emit('drain');
        this.writing = false;
    }
}
module.exports = WriteStream;

自定义可写流

为了实现可写流,我们需要使用流模块中的Writable构造函数。 我们只需给Writable构造函数传递一些选项并创建一个对象。唯一需要的选项是write函数,该函数揭露数据块要往哪里写

  • chunk通常是一个buffer,除非我们配置不同的流。
  • encoding是在特定情况下需要的参数,通常我们可以忽略它。
  • callback是在完成处理数据块后需要调用的函数。这是写数据成功与否的标志。若要发出故障信号,请用错误对象调用回调函数
var stream = require('stream');
var util = require('util');
util.inherits(Writer, stream.Writable);
let stock = [];
function Writer(opt) {
    stream.Writable.call(this, opt);
}
Writer.prototype._write = function(chunk, encoding, callback) {
    setTimeout(()=>{
        stock.push(chunk.toString('utf8'));
        console.log("增加: " + chunk);
        callback();
    },500)
};
var w = new Writer();
for (var i=1; i<=5; i++){
    w.write("项目:" + i, 'utf8');
}
w.end("结束写入",function(){
    console.log(stock);
});

Duplex streams可读写的流(双工流)

Duplex 流是同时实现了 Readable 和 Writable 接口的流

双工流的可读性和可写性操作完全独立于彼此,这仅仅是将两个特性组合成一个对象

Duplex 流的实例包括了:

  • TCP sockets
  • zlib streams
  • crypto streams

下面简单实现双工流:

const {Duplex} = require('stream');
const inoutStream = new Duplex({
    write(chunk, encoding, callback) {
        console.log(chunk.toString());
        callback();
    },
    read(size) {
        this.push((++this.index)+'');
        if (this.index > 3) {
            this.push(null);
        }
    }
});

inoutStream.index = 0;
process.stdin.pipe(inoutStream).pipe(process.stdout);

Transform streams转换流

变换流(Transform streams) 是一种 Duplex 流。它的输出与输入是通过某种方式关联的。和所有 Duplex 流一样,变换流同时实现了 Readable 和 Writable 接口

转换流的输出是从输入中计算出来的
对于转换流,我们不必实现read或write的方法,我们只需要实现一个transform方法,将两者结合起来。它有write方法的意思,我们也可以用它来push数据

变换流的实例包括:

  • zlib streams
  • crypto streams

下面简单实现转换流:

const {Transform} = require('stream');
const upperCase = new Transform({
    transform(chunk, encoding, callback) {
        this.push(chunk.toString().toUpperCase());
        callback();
    }
});
process.stdin.pipe(upperCase).pipe(process.stdout);

对象流

默认情况下,流处理的数据是Buffer/String类型的值。有一个objectMode标志,我们可以设置它让流可以接受任何JavaScript对象

const {Transform} = require('stream');
let fs = require('fs');
let rs = fs.createReadStream('./users.json');
rs.setEncoding('utf8');
let toJson = Transform({
    readableObjectMode: true,
    transform(chunk, encoding, callback) {
        this.push(JSON.parse(chunk));
        callback();
    }
});
let jsonOut = Transform({
    writableObjectMode: true,
    transform(chunk, encoding, callback) {
        console.log(chunk);
        callback();
    }
});
rs.pipe(toJson).pipe(jsonOut);
    原文作者:October
    原文地址: https://segmentfault.com/a/1190000013122145
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞