流的基本概念及理解
流是一种数据传输手段,是有顺序的,有起点和终点,比如你要把数据从一个地方传到另外一个地方
流非常重要,gulp,webpack,HTTP里的请求和响应,http里的socket都是流,包括后面压缩,加密等
流为什么这么好用还这么重要呢?
- 因为有时候我们不关心文件的主体内容,只关心能不能取到数据,取到数据之后怎么进行处理
- 对于小型的文本文件,我们可以把文件内容全部读入内存,然后再写入文件,比如grunt-file-copy
- 对于体积较大的二进制文件,比如音频、视频文件,动辄几个GB大小,如果使用这种方法,很容易使内存“爆仓”。
- 理想的方法应该是读一部分,写一部分,不管文件有多大,只要时间允许,总会处理完成,这里就需要用到流的概念
流是一个抽象接口,被Node中很多对象所实现,比如HTTP服务器request和response对象都是流
Node.js 中有四种基本的流类型:
- Readable – 可读的流 (例如 fs.createReadStream()).
- Writable – 可写的流 (例如 fs.createWriteStream()).
- Duplex – 可读写的流 (例如 net.Socket).
- Transform – 在读写过程中可以修改和变换数据的 Duplex 流 (例如 zlib.createDeflate()).
可以通过 require(‘stream’) 加载 Stream 基类。其中包括了 Readable 流、Writable 流、Duplex 流和 Transform 流的基类
Readable streams可读流
可读流(Readable streams)是对提供数据的 源头(source)的抽象
可读流的例子包括:
- HTTP responses, on the client :客户端请求
- HTTP requests, on the server :服务端请求
- fs read streams :读文件
- zlib streams :压缩
- crypto streams :加密
- TCP sockets :TCP协议
- child process stdout and stderr :子进程标准输出和错误输出
- process.stdin :标准输入
所有的 Readable 都实现了 stream.Readable 类定义的接口
通过流读取数据
- 用Readable创建对象readable后,便得到了一个可读流
- 如果实现_read方法,就将流连接到一个底层数据源
- 流通过调用_read向底层请求数据,底层再调用流的push方法将需要的数据传递过来
- 当readable连接了数据源后,下游便可以调用readable.read(n)向流请求数据,同时监听readable的data事件来接收取到的数据
下面简单举个可读流的例子:
- 监听可读流的data事件,当你一旦开始监听data事件的时候,流就可以读文件的内容并且发射data,读一点发射一点读一点发射一点
- 默认情况下,当你监听data事件之后,会不停的读数据,然后触发data事件,触发完data事件后再次读数据
- 读的时候不是把文件整体内容读出来再发射出来的,而且设置一个缓冲区,大小默认是64K,比如文件是128K,先读64K发射出来,再读64K在发射出来,会发射两次
- 缓冲区的大小可以通过highWaterMark来设置
let fs = require('fs');
//通过创建一个可读流
let rs = fs.createReadStream('./1.txt',{
flags:'r',//我们要对文件进行何种操作
mode:0o666,//权限位
encoding:'utf8',//不传默认为buffer,显示为字符串
start:3,//从索引为3的位置开始读
//这是我的见过唯一一个包括结束索引的
end:8,//读到索引为8结束
highWaterMark:3//缓冲区大小
});
rs.on('open',function () {
console.log('文件打开');
});
rs.setEncoding('utf8');//显示为字符串
//希望流有一个暂停和恢复触发的机制
rs.on('data',function (data) {
console.log(data);
rs.pause();//暂停读取和发射data事件
setTimeout(function(){
rs.resume();//恢复读取并触发data事件
},2000);
});
//如果读取文件出错了,会触发error事件
rs.on('error',function () {
console.log("error");
});
//如果文件的内容读完了,会触发end事件
rs.on('end',function () {
console.log('读完了');
});
rs.on('close',function () {
console.log('文件关闭');
});
/**
文件打开
334
455
读完了
文件关闭
**/
可读流的简单实现
let fs = require('fs');
let ReadStream = require('./ReadStream');
let rs = ReadStream('./1.txt', {
flags: 'r',
encoding: 'utf8',
start: 3,
end: 7,
highWaterMark: 3
});
rs.on('open', function () {
console.log("open");
});
rs.on('data', function (data) {
console.log(data);
});
rs.on('end', function () {
console.log("end");
});
rs.on('close', function () {
console.log("close");
});
/**
open
456
789
end
close
**/
let fs = require('fs');
let EventEmitter = require('events');
class ReadStream extends EventEmitter {
constructor(path, options) {
super(path, options);
this.path = path;
this.highWaterMark = options.highWaterMark || 64 * 1024;
this.buffer = Buffer.alloc(this.highWaterMark);
this.flags = options.flags || 'r';
this.encoding = options.encoding;
this.mode = options.mode || 0o666;
this.start = options.start || 0;
this.end = options.end;
this.pos = this.start;
this.autoClose = options.autoClose || true;
this.bytesRead = 0;
this.closed = false;
this.flowing;
this.needReadable = false;
this.length = 0;
this.buffers = [];
this.on('end', function () {
if (this.autoClose) {
this.destroy();
}
});
this.on('newListener', (type) => {
if (type == 'data') {
this.flowing = true;
this.read();
}
if (type == 'readable') {
this.read(0);
}
});
this.open();
}
open() {
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) {
if (this.autoClose) {
this.destroy();
return this.emit('error', err);
}
}
this.fd = fd;
this.emit('open');
});
}
read(n) {
if (typeof this.fd != 'number') {
return this.once('open', () => this.read());
}
n = parseInt(n, 10);
if (n != n) {
n = this.length;
}
if (this.length == 0)
this.needReadable = true;
let ret;
if (0 < n < this.length) {
ret = Buffer.alloc(n);
let b;
let index = 0;
while (null != (b = this.buffers.shift())) {
for (let i = 0; i < b.length; i++) {
ret[index++] = b[i];
if (index == ret.length) {
this.length -= n;
b = b.slice(i + 1);
this.buffers.unshift(b);
break;
}
}
}
if (this.encoding) ret = ret.toString(this.encoding);
}
let _read = () => {
let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
if (err) {
return
}
let data;
if (bytesRead > 0) {
data = this.buffer.slice(0, bytesRead);
this.pos += bytesRead;
this.length += bytesRead;
if (this.end && this.pos > this.end) {
if (this.needReadable) {
this.emit('readable');
}
this.emit('end');
} else {
this.buffers.push(data);
if (this.needReadable) {
this.emit('readable');
this.needReadable = false;
}
}
} else {
if (this.needReadable) {
this.emit('readable');
}
return this.emit('end');
}
})
}
if (this.length == 0 || (this.length < this.highWaterMark)) {
_read(0);
}
return ret;
}
destroy() {
fs.close(this.fd, (err) => {
this.emit('close');
});
}
pause() {
this.flowing = false;
}
resume() {
this.flowing = true;
this.read();
}
pipe(dest) {
this.on('data', (data) => {
let flag = dest.write(data);
if (!flag) this.pause();
});
dest.on('drain', () => {
this.resume();
});
this.on('end', () => {
dest.end();
});
}
}
module.exports = ReadStream;
自定义可读流
为了实现可读流,引用Readable接口并用它构造新对象
- 我们可以直接把供使用的数据push出去。
- 当push一个null对象就意味着我们想发出信号——这个流没有更多数据了
var stream = require('stream');
var util = require('util');
util.inherits(Counter, stream.Readable);
function Counter(options) {
stream.Readable.call(this, options);
this._index = 0;
}
Counter.prototype._read = function() {
if(this._index++<3){
this.push(this._index+'');
}else{
this.push(null);
}
};
var counter = new Counter();
counter.on('data', function(data){
console.log("读到数据: " + data.toString());//no maybe
});
counter.on('end', function(data){
console.log("读完了");
});
可读流的两种模式
Readable Stream 存在两种模式(flowing mode 与 paused mode),这两种模式决定了chunk数据流动的方式—自动流动还是手工流动。那如何触发这两种模式呢:
- flowing mode: 注册事件data、调用resume方法、调用pipe方法
- paused mode: 调用pause方法(没有pipe方法)、移除data事件 && unpipe所有pipe
如果 Readable 切换到 flowing 模式,且没有消费者处理流中的数据,这些数据将会丢失。 比如, 调用了 readable.resume() 方法却没有监听 ‘data’ 事件,或是取消了 ‘data’ 事件监听,就有可能出现这种情况
可读流的三种状态
在任意时刻,任意可读流应确切处于下面三种状态之一:
- readable._readableState.flowing = null
- readable._readableState.flowing = false
- readable._readableState.flowing = true
两种模式取决于可读流flowing状态:
- 若为true : flowing mode;
- 若为false : paused mode
flowing mode
通过注册data、pipe、resume可以自动获取所需要的数据,我们来看下源码的实现
// data事件触发flowing mode
if (ev === 'data') {
// Start flowing on next tick if stream isn't explicitly paused
if (this._readableState.flowing !== false)
this.resume();
} else if (ev === 'readable') {
const state = this._readableState;
if (!state.endEmitted && !state.readableListening) {
state.readableListening = state.needReadable = true;
state.emittedReadable = false;
if (!state.reading) {
process.nextTick(nReadingNextTick, this);
} else if (state.length) {
emitReadable(this);
}
}
}
// resume触发flowing mode
Readable.prototype.resume = function() {
var state = this._readableState;
if (!state.flowing) {
debug('resume');
state.flowing = true;
resume(this, state);
}
return this;
}
// pipe方法触发flowing模式
Readable.prototype.resume = function() {
if (!state.flowing) {
this.resume()
}
}
flowing mode的三种方法最后均是通过resume方法,将状态变为true:state.flowing = true
paused mode
在paused mode下,需要手动地读取数据,并且可以直接指定读取数据的长度
可以通过监听事件readable,触发时手工读取chunk数据:
- 当你监听 readable事件的时候,会进入暂停模式
- 当监听readable事件的时候,可读流会马上去向底层读取文件,然后把读到文件的文件放在缓存区里const state = this._readableState;
- self.read(0); 只填充缓存,但是并不会发射data事件,但是会发射stream.emit(‘readable’);事件
- this._read(state.highWaterMark); 每次调用底层的方法读取的时候是读取3个字节
let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
highWaterMark:3
});
rs.on('readable',function(){
console.log(rs._readableState.length);
//read如果不加参数表示读取整个缓存区数据
//读取一个字段,如果可读流发现你要读的字节小于等于缓存字节大小,则直接返回
let chunk = rs.read(1);
console.log(chunk);
console.log(rs._readableState.length);
//当你读完指定的字节后,如果可读流发现剩下的字节已经比最高水位线小了。则会立马再次读取填满 最高水位线
setTimeout(function(){
console.log(rs._readableState.length);
},200)
});
注意:一旦注册了readable事件,必须手工读取read数据,否则数据就会流失,我们来看下源码的实现
function emitReadable(stream) {
var state = stream._readableState;
state.needReadable = false;
if (!state.emittedReadable) {
debug('emitReadable', state.flowing);
state.emittedReadable = true;
process.nextTick(emitReadable_, stream);
}
}
function emitReadable_(stream) {
var state = stream._readableState;
debug('emit readable');
if (!state.destroyed && (state.length || state.ended)) {
stream.emit('readable');
}
state.needReadable = !state.flowing && !state.ended;
flow(stream);
}
function flow(stream) {
const state = stream._readableState;
debug('flow', state.flowing);
while (state.flowing && stream.read() !== null);
}
function endReadable(stream) {
var state = stream._readableState;
debug('endReadable', state.endEmitted);
if (!state.endEmitted) {
state.ended = true;
process.nextTick(endReadableNT, state, stream);
}
}
Readable.prototype.read = function(n) {
debug('read', n);
n = parseInt(n, 10);
var state = this._readableState;
var nOrig = n;
if (n !== 0)
state.emittedReadable = false;
if (n === 0 &&
state.needReadable &&
(state.length >= state.highWaterMark || state.ended)) {
debug('read: emitReadable', state.length, state.ended);
if (state.length === 0 && state.ended)
endReadable(this);
else
emitReadable(this);
return null;
}
n = howMuchToRead(n, state);
if (n === 0 && state.ended) {
if (state.length === 0)
endReadable(this);
return null;
}
flow方法直接read数据,将得到的数据通过事件data交付出去,然而此处没有注册data事件监控,因此,得到的chunk数据并没有交付给任何对象,这样数据就白白流失了,所以在触发emit(‘readable’)时,需要提前read数据
Writable streams可写流
可写流是对数据写入’目的地’的一种抽象
Writable:可写流的例子包括了:
- HTTP requests, on the client 客户端请求
- HTTP responses, on the server 服务器响应
- fs write streams 文件
- zlib streams 压缩
- crypto streams 加密
- TCP sockets TCP服务器
- child process stdin 子进程标准输入
- process.stdout, process.stderr 标准输出,错误输出
下面举个可写流的简单例子
- 当你往可写流里写数据的时候,不是会立刻写入文件的,而是会很写入缓存区,缓存区的大小就是highWaterMark,默认值是16K。然后等缓存区满了之后再次真正的写入文件里
let fs = require('fs');
let ws = fs.createWriteStream('./2.txt',{
flags:'w',
mode:0o666,
start:3,
highWaterMark:3//默认是16K
});
- 如果缓存区已满 ,返回false,如果缓存区未满,返回true
- 如果能接着写,返回true,如果不能接着写,返回false
- 按理说如果返回了false,就不能再往里面写了,但是如果你真写了,如果也不会丢失,会缓存在内存里。等缓存区清空之后再从内存里读出来
let flag = ws.write('1');
console.log(flag);//true
flag =ws.write('2');
console.log(flag);//true
flag =ws.write('3');
console.log(flag);//false
flag =ws.write('4');
console.log(flag);//false
'drain' 事件
如果调用 stream.write(chunk) 方法返回 false,流将在适当的时机触发 ‘drain’ 事件,这时才可以继续向流中写入数据
当一个流不处在 drain 的状态, 对 write() 的调用会缓存数据块, 并且返回 false。 一旦所有当前所有缓存的数据块都排空了(被操作系统接受来进行输出), 那么 ‘drain’ 事件就会被触发
建议, 一旦 write() 返回 false, 在 ‘drain’ 事件触发前, 不能写入任何数据块
举个简单的例子说明一下:
let fs = require('fs');
let ws = fs.createWriteStream('2.txt',{
flags:'w',
mode:0o666,
start:0,
highWaterMark:3
});
let count = 9;
function write(){
let flag = true;//缓存区未满
//写入方法是同步的,但是写入文件的过程是异步的。在真正写入文件后还会执行我们的回调函数
while(flag && count>0){
console.log('before',count);
flag = ws.write((count)+'','utf8',(function (i) {
return ()=>console.log('after',i);
})(count));
count--;
}
}
write();//987
//监听缓存区清空事件
ws.on('drain',function () {
console.log('drain');
write();//654 321
});
ws.on('error',function (err) {
console.log(err);
});
/**
before 9
before 8
before 7
after 9
after 8
after 7
**/
如果已经不再需要写入了,可以调用end方法关闭写入流,一旦调用end方法之后则不能再写入
比如在
ws.end();
后写
ws.write('x');
,会报错
write after end
'pipe'事件
linux精典的管道的概念,前者的输出是后者的输入
pipe是一种最简单直接的方法连接两个stream,内部实现了数据传递的整个过程,在开发的时候不需要关注内部数据的流动
- 这个方法从可读流拉取所有数据, 并将数据写入到提供的目标中
- 自动管理流量,将数据的滞留量限制到一个可接受的水平,以使得不同速度的来源和目标不会淹没可用内存
- 默认情况下,当源数据流触发 end的时候调用end(),所以写入数据的目标不可再写。传 { end:false }作为options,可以保持目标流打开状态
pipe方法的原理
var fs = require('fs');
var ws = fs.createWriteStream('./2.txt');
var rs = fs.createReadStream('./1.txt');
rs.on('data', function (data) {
var flag = ws.write(data);
if(!flag)
rs.pause();
});
ws.on('drain', function () {
rs.resume();
});
rs.on('end', function () {
ws.end();
});
下面举个简单的例子说明一下pipe的用法:
let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
highWaterMark:3
});
let ws = fs.createWriteStream('./2.txt',{
highWaterMark:3
});
rs.pipe(ws);
//移除目标可写流
rs.unpipe(ws);
- 当监听可读流data事件的时候会触发回调函数的执行
- 可以实现数据的生产者和消费者速度的均衡
rs.on('data',function (data) {
console.log(data);
let flag = ws.write(data);
if(!flag){
rs.pause();
}
});
- 监听可写流缓存区清空事件,当所有要写入的数据写入完成后,接着恢复从可读流里读取并触发data事件
ws.on('drain',function () {
console.log('drain');
rs.resume();
});
unpipe
readable.unpipe()方法将之前通过stream.pipe()方法绑定的流分离
- 如果写入的目标没有传入, 则所有绑定的流都会被分离
- 如果指定了写入的目标,但是没有绑定流,则什么事情都不会发生
简单距离说明下unpipe的用法:
let fs = require('fs');
var from = fs.createReadStream('./1.txt');
var to = fs.createWriteStream('./2.txt');
from.pipe(to);
setTimeout(() => {
console.log('关闭向2.txt的写入');
from.unpipe(writable);
console.log('手工关闭文件流');
to.end();
}, 1000);
pipe的简单实现
let fs = require('fs');
let ReadStream = require('./ReadStream');
let rs = ReadStream('./1.txt', {
flags: 'r',
encoding: 'utf8',
highWaterMark: 3
});
let FileWriteStream = require('./WriteStream');
let ws = FileWriteStream('./2.txt',{
flags:'w',
encoding:'utf8',
highWaterMark:3
});
rs.pipe(ws);
ReadStream.prototype.pipe = function (dest) {
this.on('data', (data)=>{
let flag = dest.write(data);
if(!flag){
this.pause();
}
});
dest.on('drain', ()=>{
this.resume();
});
this.on('end', ()=>{
dest.end();
});
}
ReadStream.prototype.pause = function(){
this.flowing = false;
}
ReadStream.prototype.resume = function(){
this.flowing = true;
this.read();
}
自定义管道流
const stream = require('stream')
var index = 0;
const readable = stream.Readable({
highWaterMark: 2,
read: function () {
process.nextTick(() => {
console.log('push', ++index)
this.push(index+'');
})
}
})
const writable = stream.Writable({
highWaterMark: 2,
write: function (chunk, encoding, next) {
console.log('写入:', chunk.toString())
}
})
readable.pipe(writable);
可写流的简单实现
let fs = require('fs');
let FileWriteStream = require('./FileWriteStream');
let ws = FileWriteStream('./2.txt',{
flags:'w',
encoding:'utf8',
highWaterMark:3
});
let i = 10;
function write(){
let flag = true;
while(i&&flag){
flag = ws.write("1",'utf8',(function(i){
return function(){
console.log(i);
}
})(i));
i--;
console.log(flag);
}
}
write();
ws.on('drain',()=>{
console.log("drain");
write();
});
/**
10
9
8
drain
7
6
5
drain
4
3
2
drain
1
**/
let EventEmitter = require('events');
let util = require('util');
let fs = require('fs');
util.inherits(WriteStream, EventEmitter);
function WriteStream(path, options) {
EventEmitter.call(this);
if (!(this instanceof WriteStream)) {
return new WriteStream(path, options);
}
this.path = path;
this.fd = options.fd;
this.encoding = options.encoding||'utf8';
this.flags = options.flags || 'w';
this.mode = options.mode || 0o666;
this.autoClose = options.autoClose || true;
this.start = options.start || 0;
this.pos = this.start;//开始写入的索引位置
this.open();//打开文件进行操作
this.writing = false;//没有在写入过程 中
this.buffers = [];
this.highWaterMark = options.highWaterMark||16*1024;
//如果监听到end事件,而且要求自动关闭的话则关闭文件
this.on('end', function () {
if (this.autoClose) {
this.destroy()
}
});
}
WriteStream.prototype.close = function(){
fs.close(this.fd,(err)=>{
if(err)
this.emit('error',err);
});
}
WriteStream.prototype.open = function () {
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err)
return this.emit('error', err);
this.fd = fd;//把文件描述符赋给当前实例的fd属性
//发射open事件
this.emit('open', fd);
});
}
/**
* 会判断当前是后台是否在写入过程中,如果在写入过程中,则把这个数据放在待处理的缓存中,如果不在写入过程中,可以直接写。
*/
WriteStream.prototype.write = function (chunk, encoding, cb) {
chunk= Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,this.encoding);
//先把数据放在缓存里
this.buffers.push({
chunk,
encoding,
cb
});
let isFull = this.buffers.reduce((len, item) => len + item.chunk.length, 0)>=this.highWaterMark;
//只有当缓存区写满了,那么清空缓存区的时候才会发射drain事件,否则 不发放
this.needDrain = isFull;
//如果说文件还没有打开,则把写入的方法压入open事件的监听函数。等文件一旦打开,立刻执行写入操作
if (typeof this.fd !== 'number') {
this.once('open', () => {
this._write();
});
return !isFull;
}else{
if(!this.writing){
setImmediate(()=>{
this._write();
this.writing = true;
});
}
return !isFull;
}
}
WriteStream.prototype._write = function () {
let part = this.buffers.shift();
if (part) {
fs.write(this.fd,part.chunk,0,part.chunk.length,null,(err,bytesWritten)=>{
if(err)return this.emit('error',err);
part.cb && part.cb();
this._write();
});
}else{
//发射一个缓存区清空的事件
this.emit('drain');
this.writing = false;
}
}
module.exports = WriteStream;
自定义可写流
为了实现可写流,我们需要使用流模块中的Writable构造函数。 我们只需给Writable构造函数传递一些选项并创建一个对象。唯一需要的选项是write函数,该函数揭露数据块要往哪里写
- chunk通常是一个buffer,除非我们配置不同的流。
- encoding是在特定情况下需要的参数,通常我们可以忽略它。
- callback是在完成处理数据块后需要调用的函数。这是写数据成功与否的标志。若要发出故障信号,请用错误对象调用回调函数
var stream = require('stream');
var util = require('util');
util.inherits(Writer, stream.Writable);
let stock = [];
function Writer(opt) {
stream.Writable.call(this, opt);
}
Writer.prototype._write = function(chunk, encoding, callback) {
setTimeout(()=>{
stock.push(chunk.toString('utf8'));
console.log("增加: " + chunk);
callback();
},500)
};
var w = new Writer();
for (var i=1; i<=5; i++){
w.write("项目:" + i, 'utf8');
}
w.end("结束写入",function(){
console.log(stock);
});
Duplex streams可读写的流(双工流)
Duplex 流是同时实现了 Readable 和 Writable 接口的流
双工流的可读性和可写性操作完全独立于彼此,这仅仅是将两个特性组合成一个对象
Duplex 流的实例包括了:
- TCP sockets
- zlib streams
- crypto streams
下面简单实现双工流:
const {Duplex} = require('stream');
const inoutStream = new Duplex({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
},
read(size) {
this.push((++this.index)+'');
if (this.index > 3) {
this.push(null);
}
}
});
inoutStream.index = 0;
process.stdin.pipe(inoutStream).pipe(process.stdout);
Transform streams转换流
变换流(Transform streams) 是一种 Duplex 流。它的输出与输入是通过某种方式关联的。和所有 Duplex 流一样,变换流同时实现了 Readable 和 Writable 接口
转换流的输出是从输入中计算出来的
对于转换流,我们不必实现read或write的方法,我们只需要实现一个transform方法,将两者结合起来。它有write方法的意思,我们也可以用它来push数据变换流的实例包括:
- zlib streams
- crypto streams
下面简单实现转换流:
const {Transform} = require('stream');
const upperCase = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
process.stdin.pipe(upperCase).pipe(process.stdout);
对象流
默认情况下,流处理的数据是Buffer/String类型的值。有一个objectMode标志,我们可以设置它让流可以接受任何JavaScript对象
const {Transform} = require('stream');
let fs = require('fs');
let rs = fs.createReadStream('./users.json');
rs.setEncoding('utf8');
let toJson = Transform({
readableObjectMode: true,
transform(chunk, encoding, callback) {
this.push(JSON.parse(chunk));
callback();
}
});
let jsonOut = Transform({
writableObjectMode: true,
transform(chunk, encoding, callback) {
console.log(chunk);
callback();
}
});
rs.pipe(toJson).pipe(jsonOut);