深切nodejs中流(stream)的明白

流的基础观点及明白

流是一种数据传输手腕,是有递次的,有出发点和尽头,比方你要把数据从一个处所传到别的一个处所

流非常主要,gulp,webpack,HTTP里的请乞降相应,http里的socket都是流,包含背面紧缩,加密等

流为何这么好用还这么主要呢?

  • 由于有时候我们不体贴文件的主体内容,只体贴能不能取到数据,取到数据以后怎样举行处置惩罚
  • 关于小型的文本文件,我们能够把文件内容悉数读入内存,然后再写入文件,比方grunt-file-copy
  • 关于体积较大的二进制文件,比方音频、视频文件,动辄几个GB大小,假如运用这类要领,很轻易使内存“爆仓”。
  • 抱负的要领应该是读一部分,写一部分,不论文件有多大,只需时候许可,总会处置惩罚完成,这里就须要用到流的观点

流是一个笼统接口,被Node中许多对象所完成,比方HTTP服务器request和response对象都是流

Node.js 中有四种基础的流范例:

  • Readable – 可读的流 (比方 fs.createReadStream()).
  • Writable – 可写的流 (比方 fs.createWriteStream()).
  • Duplex – 可读写的流 (比方 net.Socket).
  • Transform – 在读写历程当中能够修正和变更数据的 Duplex 流 (比方 zlib.createDeflate()).

能够经由历程 require(‘stream’) 加载 Stream 基类。个中包含了 Readable 流、Writable 流、Duplex 流和 Transform 流的基类

Readable streams可读流

可读流(Readable streams)是对供应数据的 泉源(source)的笼统

可读流的例子包含:

  • HTTP responses, on the client :客户端请求
  • HTTP requests, on the server :服务端请求
  • fs read streams :读文件
  • zlib streams :紧缩
  • crypto streams :加密
  • TCP sockets :TCP协定
  • child process stdout and stderr :子历程规范输出和毛病输出
  • process.stdin :规范输入

一切的 Readable 都完成了 stream.Readable 类定义的接口

经由历程流读取数据

  • 用Readable建立对象readable后,便获得了一个可读流
  • 假如完成_read要领,就将留衔接到一个底层数据源
  • 流经由历程挪用_read向底层请求数据,底层再挪用流的push要领将须要的数据通报过来
  • 当readable衔接了数据源后,下流便能够挪用readable.read(n)向流请求数据,同时监听readable的data事宜来接收取到的数据

下面简朴举个可读流的例子:

  • 监听可读流的data事宜,当你一旦最先监听data事宜的时候,流就能够读文件的内容而且发射data,读一点发射一点读一点发射一点
  • 默许状况下,当你监听data事宜以后,会不断的读数据,然后触发data事宜,触发完data事宜后再次读数据
  • 读的时候不是把文件团体内容读出来再发射出来的,而且设置一个缓冲区,大小默许是64K,比方文件是128K,先读64K发射出来,再读64K在发射出来,会发射两次
  • 缓冲区的大小能够经由历程highWaterMark来设置
let fs = require('fs');
//经由历程建立一个可读流
let rs = fs.createReadStream('./1.txt',{
    flags:'r',//我们要对文件举行何种操纵
    mode:0o666,//权限位
    encoding:'utf8',//不传默许为buffer,显现为字符串
    start:3,//从索引为3的位置最先读
    //这是我的见过唯一一个包含完毕索引的
    end:8,//读到索引为8完毕
    highWaterMark:3//缓冲区大小
});
rs.on('open',function () {
    console.log('文件翻开');
});
rs.setEncoding('utf8');//显现为字符串
//愿望流有一个停息和恢复触发的机制
rs.on('data',function (data) {
    console.log(data);
    rs.pause();//停息读取和发射data事宜
    setTimeout(function(){
        rs.resume();//恢复读取并触发data事宜
    },2000);
});
//假如读取文件出错了,会触发error事宜
rs.on('error',function () {
    console.log("error");
});
//假如文件的内容读完了,会触发end事宜
rs.on('end',function () {
    console.log('读完了');
});
rs.on('close',function () {
    console.log('文件封闭');
});

/**
文件翻开
334
455
读完了
文件封闭
**/

可读流的简朴完成

let fs = require('fs');
let ReadStream = require('./ReadStream');
let rs = ReadStream('./1.txt', {
    flags: 'r',
    encoding: 'utf8',
    start: 3,
    end: 7,
    highWaterMark: 3
});
rs.on('open', function () {
    console.log("open");
});
rs.on('data', function (data) {
    console.log(data);
});
rs.on('end', function () {
    console.log("end");
});
rs.on('close', function () {
    console.log("close");
});
/**
 open
 456
 789
 end
 close
 **/
let fs = require('fs');
let EventEmitter = require('events');

class ReadStream extends EventEmitter {
    constructor(path, options) {
        super(path, options);
        this.path = path;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.buffer = Buffer.alloc(this.highWaterMark);
        this.flags = options.flags || 'r';
        this.encoding = options.encoding;
        this.mode = options.mode || 0o666;
        this.start = options.start || 0;
        this.end = options.end;
        this.pos = this.start;
        this.autoClose = options.autoClose || true;
        this.bytesRead = 0;
        this.closed = false;
        this.flowing;
        this.needReadable = false;
        this.length = 0;
        this.buffers = [];
        this.on('end', function () {
            if (this.autoClose) {
                this.destroy();
            }
        });
        this.on('newListener', (type) => {
            if (type == 'data') {
                this.flowing = true;
                this.read();
            }
            if (type == 'readable') {
                this.read(0);
            }
        });
        this.open();
    }

    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) {
                if (this.autoClose) {
                    this.destroy();
                    return this.emit('error', err);
                }
            }
            this.fd = fd;
            this.emit('open');
        });
    }

    read(n) {
        if (typeof this.fd != 'number') {
            return this.once('open', () => this.read());
        }
        n = parseInt(n, 10);
        if (n != n) {
            n = this.length;
        }
        if (this.length == 0)
            this.needReadable = true;
        let ret;
        if (0 < n < this.length) {
            ret = Buffer.alloc(n);
            let b;
            let index = 0;
            while (null != (b = this.buffers.shift())) {
                for (let i = 0; i < b.length; i++) {
                    ret[index++] = b[i];
                    if (index == ret.length) {
                        this.length -= n;
                        b = b.slice(i + 1);
                        this.buffers.unshift(b);
                        break;
                    }
                }
            }
            if (this.encoding) ret = ret.toString(this.encoding);
        }

        let _read = () => {
            let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
            fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
                if (err) {
                    return
                }
                let data;
                if (bytesRead > 0) {
                    data = this.buffer.slice(0, bytesRead);
                    this.pos += bytesRead;
                    this.length += bytesRead;
                    if (this.end && this.pos > this.end) {
                        if (this.needReadable) {
                            this.emit('readable');
                        }

                        this.emit('end');
                    } else {
                        this.buffers.push(data);
                        if (this.needReadable) {
                            this.emit('readable');
                            this.needReadable = false;
                        }

                    }
                } else {
                    if (this.needReadable) {
                        this.emit('readable');
                    }
                    return this.emit('end');
                }
            })
        }
        if (this.length == 0 || (this.length < this.highWaterMark)) {
            _read(0);
        }
        return ret;
    }

    destroy() {
        fs.close(this.fd, (err) => {
            this.emit('close');
        });
    }

    pause() {
        this.flowing = false;
    }

    resume() {
        this.flowing = true;
        this.read();
    }

    pipe(dest) {
        this.on('data', (data) => {
            let flag = dest.write(data);
            if (!flag) this.pause();
        });
        dest.on('drain', () => {
            this.resume();
        });
        this.on('end', () => {
            dest.end();
        });
    }
}
module.exports = ReadStream;

自定义可读流

为了完成可读流,援用Readable接口并用它组织新对象

  • 我们能够直接把供运用的数据push出去。
  • 当push一个null对象就意味着我们想发出信号——这个流没有更多数据了
var stream = require('stream');
var util = require('util');
util.inherits(Counter, stream.Readable);
function Counter(options) {
    stream.Readable.call(this, options);
    this._index = 0;
}
Counter.prototype._read = function() {
    if(this._index++<3){
        this.push(this._index+'');
    }else{
        this.push(null);
    }
};
var counter = new Counter();

counter.on('data', function(data){
    console.log("读到数据: " + data.toString());//no maybe
});
counter.on('end', function(data){
    console.log("读完了");
});

可读流的两种形式

Readable Stream 存在两种形式(flowing mode 与 paused mode),这两种形式决议了chunk数据活动的体式格局—自动活动照样手工活动。那怎样触发这两种形式呢:

  • flowing mode: 注册事宜data、挪用resume要领、挪用pipe要领
  • paused mode: 挪用pause要领(没有pipe要领)、移除data事宜 && unpipe一切pipe

假如 Readable 切换到 flowing 形式,且没有消费者处置惩罚流中的数据,这些数据将会丧失。 比方, 挪用了 readable.resume() 要领却没有监听 ‘data’ 事宜,或是取消了 ‘data’ 事宜监听,就有能够涌现这类状况

可读流的三种状况

在恣意时候,恣意可读流应确实处于下面三种状况之一:

  • readable._readableState.flowing = null
  • readable._readableState.flowing = false
  • readable._readableState.flowing = true

两种形式取决于可读流flowing状况:

  • 若为true : flowing mode;
  • 若为false : paused mode

flowing mode

经由历程注册data、pipe、resume能够自动猎取所须要的数据,我们来看下源码的完成

// data事宜触发flowing mode
 if (ev === 'data') {
    // Start flowing on next tick if stream isn't explicitly paused
    if (this._readableState.flowing !== false)
      this.resume();
  } else if (ev === 'readable') {
    const state = this._readableState;
    if (!state.endEmitted && !state.readableListening) {
      state.readableListening = state.needReadable = true;
      state.emittedReadable = false;
      if (!state.reading) {
        process.nextTick(nReadingNextTick, this);
      } else if (state.length) {
        emitReadable(this);
      }
    }
  }

// resume触发flowing mode
Readable.prototype.resume = function() {
    var state = this._readableState;
    if (!state.flowing) {
        debug('resume');
        state.flowing = true;
    resume(this, state);
  }
  return this;
}

// pipe要领触发flowing形式
Readable.prototype.resume = function() {
    if (!state.flowing) {
        this.resume()
    }
}

flowing mode的三种要领末了均是经由历程resume要领,将状况变成true:state.flowing = true

paused mode

在paused mode下,须要手动地读取数据,而且能够直接指定读取数据的长度

能够经由历程监听事宜readable,触发时手工读取chunk数据:

  • 当你监听 readable事宜的时候,会进入停息形式
  • 当监听readable事宜的时候,可读流会立时去处底层读取文件,然后把读到文件的文件放在缓存区里const state = this._readableState;
  • self.read(0); 只添补缓存,然则并不会发射data事宜,然则会发射stream.emit(‘readable’);事宜
  • this._read(state.highWaterMark); 每次挪用底层的要领读取的时候是读取3个字节
let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
    highWaterMark:3
});
rs.on('readable',function(){
    console.log(rs._readableState.length);
    //read假如不加参数示意读取全部缓存区数据
    //读取一个字段,假如可读流发明你要读的字节小于即是缓存字节大小,则直接返回
    let chunk = rs.read(1);
    console.log(chunk);
    console.log(rs._readableState.length);
    //当你读完指定的字节后,假如可读流发明剩下的字节已比最高水位线小了。则会立马再次读取填满 最高水位线
    setTimeout(function(){
        console.log(rs._readableState.length);
    },200)
});

注重:一旦注册了readable事宜,必需手工读取read数据,不然数据就会流失,我们来看下源码的完成

function emitReadable(stream) {
  var state = stream._readableState;
  state.needReadable = false;
  if (!state.emittedReadable) {
    debug('emitReadable', state.flowing);
    state.emittedReadable = true;
    process.nextTick(emitReadable_, stream);
  }
}

function emitReadable_(stream) {
  var state = stream._readableState;
  debug('emit readable');
  if (!state.destroyed && (state.length || state.ended)) {
    stream.emit('readable');
  }
  state.needReadable = !state.flowing && !state.ended;
  flow(stream);
}

function flow(stream) {
  const state = stream._readableState;
  debug('flow', state.flowing);
  while (state.flowing && stream.read() !== null);
}

function endReadable(stream) {
  var state = stream._readableState;
  debug('endReadable', state.endEmitted);
  if (!state.endEmitted) {
    state.ended = true;
    process.nextTick(endReadableNT, state, stream);
  }
}

Readable.prototype.read = function(n) {
  debug('read', n);
  n = parseInt(n, 10);
  var state = this._readableState;
  var nOrig = n;
  if (n !== 0)
    state.emittedReadable = false;
  if (n === 0 &&
      state.needReadable &&
      (state.length >= state.highWaterMark || state.ended)) {
    debug('read: emitReadable', state.length, state.ended);
    if (state.length === 0 && state.ended)
      endReadable(this);
    else
      emitReadable(this);
    return null;
  }
  n = howMuchToRead(n, state);
  if (n === 0 && state.ended) {
    if (state.length === 0)
      endReadable(this);
    return null;
  }

flow要领直接read数据,将获得的数据经由历程事宜data托付出去,但是此处没有注册data事宜监控,因而,获得的chunk数据并没有托付给任何对象,如许数据就白白流失了,所以在触发emit(‘readable’)时,须要提早read数据

Writable streams可写流

可写流是对数据写入’目的地’的一种笼统

Writable:可写流的例子包含了:

  • HTTP requests, on the client 客户端请求
  • HTTP responses, on the server 服务器相应
  • fs write streams 文件
  • zlib streams 紧缩
  • crypto streams 加密
  • TCP sockets TCP服务器
  • child process stdin 子历程规范输入
  • process.stdout, process.stderr 规范输出,毛病输出

下面举个可写流的简朴例子

  • 当你往可写流里写数据的时候,不是会马上写入文件的,而是会很写入缓存区,缓存区的大小就是highWaterMark,默许值是16K。然后等缓存区满了以后再次真正的写入文件里
let fs = require('fs');
let ws = fs.createWriteStream('./2.txt',{
   flags:'w',
   mode:0o666,
   start:3,
   highWaterMark:3//默许是16K
});
  • 假如缓存区已满 ,返回false,假如缓存区未满,返回true
  • 假如能接着写,返回true,假如不能接着写,返回false
  • 按理说假如返回了false,就不能再往内里写了,然则假如你真写了,假如也不会丧失,会缓存在内存里。等缓存区清空以后再从内存里读出来
let flag = ws.write('1');
console.log(flag);//true
flag =ws.write('2');
console.log(flag);//true
flag =ws.write('3');
console.log(flag);//false
flag =ws.write('4');
console.log(flag);//false

'drain' 事宜

假如挪用 stream.write(chunk) 要领返回 false,流将在恰当的机遇触发 ‘drain’ 事宜,这时候才能够继承向流中写入数据

当一个流不处在 drain 的状况, 对 write() 的挪用会缓存数据块, 而且返回 false。 一旦一切当前一切缓存的数据块都排空了(被操纵系统接收来举行输出), 那末 ‘drain’ 事宜就会被触发

发起, 一旦 write() 返回 false, 在 ‘drain’ 事宜触发前, 不能写入任何数据块

举个简朴的例子申明一下:

let fs = require('fs');
let ws = fs.createWriteStream('2.txt',{
    flags:'w',
    mode:0o666,
    start:0,
    highWaterMark:3
});
let count = 9;
function write(){
 let flag = true;//缓存区未满
    //写入要领是同步的,然则写入文件的历程是异步的。在真正写入文件后还会实行我们的回调函数
 while(flag && count>0){
     console.log('before',count);
     flag = ws.write((count)+'','utf8',(function (i) {
         return ()=>console.log('after',i);
     })(count));
     count--;
 }
}
write();//987
//监听缓存区清空事宜
ws.on('drain',function () {
    console.log('drain');
    write();//654 321
});
ws.on('error',function (err) {
    console.log(err);
});
/**
before 9
before 8
before 7
after 9
after 8
after 7
**/

假如已不再须要写入了,能够挪用end要领封闭写入流,一旦挪用end要领以后则不能再写入

比方在
ws.end();后写
ws.write('x');,会报错
write after end

'pipe'事宜

linux精典的管道的观点,前者的输出是后者的输入

pipe是一种最简朴直接的要领衔接两个stream,内部完成了数据通报的全部历程,在开辟的时候不须要关注内部数据的活动

  • 这个要领从可读流拉取一切数据, 并将数据写入到供应的目的中
  • 自动治理流量,将数据的滞留量限定到一个可接收的程度,以使得差别速率的泉源和目的不会吞没可用内存
  • 默许状况下,当源数据流触发 end的时候挪用end(),所以写入数据的目的不可再写。传 { end:false }作为options,能够坚持目的流翻开状况

pipe要领的道理

var fs = require('fs');
var ws = fs.createWriteStream('./2.txt');
var rs = fs.createReadStream('./1.txt');
rs.on('data', function (data) {
    var flag = ws.write(data);
    if(!flag)
    rs.pause();
});
ws.on('drain', function () {
    rs.resume();
});
rs.on('end', function () {
    ws.end();
});

下面举个简朴的例子申明一下pipe的用法:

let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
  highWaterMark:3
});
let ws = fs.createWriteStream('./2.txt',{
    highWaterMark:3
});
rs.pipe(ws);
//移除目的可写流
rs.unpipe(ws);
  • 当监听可读流data事宜的时候会触发还调函数的实行
  • 能够完成数据的生产者和消费者速率的平衡
rs.on('data',function (data) {
    console.log(data);
    let flag = ws.write(data);
   if(!flag){
       rs.pause();
   }
});
  • 监听可写流缓存区清空事宜,当一切要写入的数据写入完成后,接着恢复从可读流里读取并触发data事宜
ws.on('drain',function () {
    console.log('drain');
    rs.resume();
});

unpipe

readable.unpipe()要领将之前经由历程stream.pipe()要领绑定的流星散

  • 假如写入的目的没有传入, 则一切绑定的流都会被星散
  • 假如指定了写入的目的,然则没有绑定流,则什么事情都不会发作

简朴间隔申明下unpipe的用法:

let fs = require('fs');
var from = fs.createReadStream('./1.txt');
var to = fs.createWriteStream('./2.txt');
from.pipe(to);
setTimeout(() => {
console.log('封闭向2.txt的写入');
from.unpipe(writable);
console.log('手工封闭文件流');
to.end();
}, 1000);

pipe的简朴完成

let fs = require('fs');
let ReadStream = require('./ReadStream');
let rs = ReadStream('./1.txt', {
    flags: 'r',
    encoding: 'utf8',
    highWaterMark: 3
});
let FileWriteStream = require('./WriteStream');
let ws = FileWriteStream('./2.txt',{
    flags:'w',
    encoding:'utf8',
    highWaterMark:3
});
rs.pipe(ws);
ReadStream.prototype.pipe = function (dest) {
    this.on('data', (data)=>{
        let flag = dest.write(data);
        if(!flag){
            this.pause();
        }
    });
    dest.on('drain', ()=>{
        this.resume();
    });
    this.on('end', ()=>{
        dest.end();
    });
}
ReadStream.prototype.pause = function(){
    this.flowing = false;

}
ReadStream.prototype.resume = function(){
    this.flowing = true;
    this.read();
}

自定义管道流

const stream = require('stream')

var index = 0;
const readable = stream.Readable({
    highWaterMark: 2,
    read: function () {
        process.nextTick(() => {
            console.log('push', ++index)
            this.push(index+'');
        })
    }
})
const writable = stream.Writable({
    highWaterMark: 2,
    write: function (chunk, encoding, next) {
        console.log('写入:', chunk.toString())
    }
})
readable.pipe(writable);

可写流的简朴完成

let fs = require('fs');
 let FileWriteStream = require('./FileWriteStream');
 let ws = FileWriteStream('./2.txt',{
     flags:'w',
     encoding:'utf8',
     highWaterMark:3
 });
 let i = 10;
 function write(){
     let  flag = true;
     while(i&&flag){
         flag = ws.write("1",'utf8',(function(i){
             return function(){
                 console.log(i);
             }
         })(i));
         i--;
         console.log(flag);
     }
 }
 write();
 ws.on('drain',()=>{
     console.log("drain");
     write();
 });
 /**
  10
  9
  8
  drain
  7
  6
  5
  drain
  4
  3
  2
  drain
  1
  **/
let EventEmitter = require('events');
let util = require('util');
let fs = require('fs');
util.inherits(WriteStream, EventEmitter);

function WriteStream(path, options) {
    EventEmitter.call(this);
    if (!(this instanceof WriteStream)) {
        return new WriteStream(path, options);
    }
    this.path = path;
    this.fd = options.fd;
    this.encoding = options.encoding||'utf8';
    this.flags = options.flags || 'w';
    this.mode = options.mode || 0o666;
    this.autoClose = options.autoClose || true;
    this.start = options.start || 0;
    this.pos = this.start;//最先写入的索引位置
    this.open();//翻开文件举行操纵
    this.writing = false;//没有在写入历程 中
    this.buffers = [];
    this.highWaterMark = options.highWaterMark||16*1024;
    //假如监听到end事宜,而且请求自动封闭的话则封闭文件
    this.on('end', function () {
        if (this.autoClose) {
            this.destroy()
        }
    });
}
WriteStream.prototype.close = function(){
    fs.close(this.fd,(err)=>{
        if(err)
            this.emit('error',err);
    });
}
WriteStream.prototype.open = function () {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
        if (err)
            return this.emit('error', err);
        this.fd = fd;//把文件描述符赋给当前实例的fd属性
        //发射open事宜
        this.emit('open', fd);
    });
}
/**
 * 会推断当前是背景是不是在写入历程当中,假如在写入历程当中,则把这个数据放在待处置惩罚的缓存中,假如不在写入历程当中,能够直接写。
 */
WriteStream.prototype.write = function (chunk, encoding, cb) {
    chunk= Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,this.encoding);

    //先把数据放在缓存里
    this.buffers.push({
        chunk,
        encoding,
        cb
    });

    let isFull = this.buffers.reduce((len, item) => len + item.chunk.length, 0)>=this.highWaterMark;
    //只有当缓存区写满了,那末清空缓存区的时候才会发射drain事宜,不然 不发放
    this.needDrain = isFull;
    //假如说文件还没有翻开,则把写入的要领压入open事宜的监听函数。等文件一旦翻开,马上实行写入操纵
    if (typeof this.fd !== 'number') {
         this.once('open', () => {
            this._write();
        });
        return !isFull;
    }else{
        if(!this.writing){
            setImmediate(()=>{
                this._write();
                this.writing = true;
            });
        }

        return !isFull;
    }
}
WriteStream.prototype._write = function () {
    let part = this.buffers.shift();
    if (part) {
        fs.write(this.fd,part.chunk,0,part.chunk.length,null,(err,bytesWritten)=>{
            if(err)return this.emit('error',err);
            part.cb && part.cb();
            this._write();
        });
    }else{
        //发射一个缓存区清空的事宜
        this.emit('drain');
        this.writing = false;
    }
}
module.exports = WriteStream;

自定义可写流

为了完成可写流,我们须要运用流模块中的Writable组织函数。 我们只需给Writable组织函数通报一些选项并建立一个对象。唯一须要的选项是write函数,该函数揭破数据块要往那里写

  • chunk一般是一个buffer,除非我们设置差别的流。
  • encoding是在特定状况下须要的参数,一般我们能够疏忽它。
  • callback是在完成处置惩罚数据块后须要挪用的函数。这是写数据胜利与否的标志。若要发出毛病信号,请用毛病对象挪用回调函数
var stream = require('stream');
var util = require('util');
util.inherits(Writer, stream.Writable);
let stock = [];
function Writer(opt) {
    stream.Writable.call(this, opt);
}
Writer.prototype._write = function(chunk, encoding, callback) {
    setTimeout(()=>{
        stock.push(chunk.toString('utf8'));
        console.log("增添: " + chunk);
        callback();
    },500)
};
var w = new Writer();
for (var i=1; i<=5; i++){
    w.write("项目:" + i, 'utf8');
}
w.end("完毕写入",function(){
    console.log(stock);
});

Duplex streams可读写的流(双工流)

Duplex 流是同时完成了 Readable 和 Writable 接口的流

双工流的可读性和可写性操纵完整独立于相互,这仅仅是将两个特征组合成一个对象

Duplex 流的实例包含了:

  • TCP sockets
  • zlib streams
  • crypto streams

下面简朴完成双工流:

const {Duplex} = require('stream');
const inoutStream = new Duplex({
    write(chunk, encoding, callback) {
        console.log(chunk.toString());
        callback();
    },
    read(size) {
        this.push((++this.index)+'');
        if (this.index > 3) {
            this.push(null);
        }
    }
});

inoutStream.index = 0;
process.stdin.pipe(inoutStream).pipe(process.stdout);

Transform streams转换流

变更流(Transform streams) 是一种 Duplex 流。它的输出与输入是经由历程某种体式格局关联的。和一切 Duplex 流一样,变更流同时完成了 Readable 和 Writable 接口

转换流的输出是从输入中计算出来的
关于转换流,我们没必要完成read或write的要领,我们只须要完成一个transform要领,将二者结合起来。它有write要领的意义,我们也能够用它来push数据

变更流的实例包含:

  • zlib streams
  • crypto streams

下面简朴完成转换流:

const {Transform} = require('stream');
const upperCase = new Transform({
    transform(chunk, encoding, callback) {
        this.push(chunk.toString().toUpperCase());
        callback();
    }
});
process.stdin.pipe(upperCase).pipe(process.stdout);

对象流

默许状况下,流处置惩罚的数据是Buffer/String范例的值。有一个objectMode标志,我们能够设置它让流能够接收任何JavaScript对象

const {Transform} = require('stream');
let fs = require('fs');
let rs = fs.createReadStream('./users.json');
rs.setEncoding('utf8');
let toJson = Transform({
    readableObjectMode: true,
    transform(chunk, encoding, callback) {
        this.push(JSON.parse(chunk));
        callback();
    }
});
let jsonOut = Transform({
    writableObjectMode: true,
    transform(chunk, encoding, callback) {
        console.log(chunk);
        callback();
    }
});
rs.pipe(toJson).pipe(jsonOut);
    原文作者:October
    原文地址: https://segmentfault.com/a/1190000013122145
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞