流的基础观点及明白
流是一种数据传输手腕,是有递次的,有出发点和尽头,比方你要把数据从一个处所传到别的一个处所
流非常主要,gulp,webpack,HTTP里的请乞降相应,http里的socket都是流,包含背面紧缩,加密等
流为何这么好用还这么主要呢?
- 由于有时候我们不体贴文件的主体内容,只体贴能不能取到数据,取到数据以后怎样举行处置惩罚
- 关于小型的文本文件,我们能够把文件内容悉数读入内存,然后再写入文件,比方grunt-file-copy
- 关于体积较大的二进制文件,比方音频、视频文件,动辄几个GB大小,假如运用这类要领,很轻易使内存“爆仓”。
- 抱负的要领应该是读一部分,写一部分,不论文件有多大,只需时候许可,总会处置惩罚完成,这里就须要用到流的观点
流是一个笼统接口,被Node中许多对象所完成,比方HTTP服务器request和response对象都是流
Node.js 中有四种基础的流范例:
- Readable – 可读的流 (比方 fs.createReadStream()).
- Writable – 可写的流 (比方 fs.createWriteStream()).
- Duplex – 可读写的流 (比方 net.Socket).
- Transform – 在读写历程当中能够修正和变更数据的 Duplex 流 (比方 zlib.createDeflate()).
能够经由历程 require(‘stream’) 加载 Stream 基类。个中包含了 Readable 流、Writable 流、Duplex 流和 Transform 流的基类
Readable streams可读流
可读流(Readable streams)是对供应数据的 泉源(source)的笼统
可读流的例子包含:
- HTTP responses, on the client :客户端请求
- HTTP requests, on the server :服务端请求
- fs read streams :读文件
- zlib streams :紧缩
- crypto streams :加密
- TCP sockets :TCP协定
- child process stdout and stderr :子历程规范输出和毛病输出
- process.stdin :规范输入
一切的 Readable 都完成了 stream.Readable 类定义的接口
经由历程流读取数据
- 用Readable建立对象readable后,便获得了一个可读流
- 假如完成_read要领,就将留衔接到一个底层数据源
- 流经由历程挪用_read向底层请求数据,底层再挪用流的push要领将须要的数据通报过来
- 当readable衔接了数据源后,下流便能够挪用readable.read(n)向流请求数据,同时监听readable的data事宜来接收取到的数据
下面简朴举个可读流的例子:
- 监听可读流的data事宜,当你一旦最先监听data事宜的时候,流就能够读文件的内容而且发射data,读一点发射一点读一点发射一点
- 默许状况下,当你监听data事宜以后,会不断的读数据,然后触发data事宜,触发完data事宜后再次读数据
- 读的时候不是把文件团体内容读出来再发射出来的,而且设置一个缓冲区,大小默许是64K,比方文件是128K,先读64K发射出来,再读64K在发射出来,会发射两次
- 缓冲区的大小能够经由历程highWaterMark来设置
let fs = require('fs');
//经由历程建立一个可读流
let rs = fs.createReadStream('./1.txt',{
flags:'r',//我们要对文件举行何种操纵
mode:0o666,//权限位
encoding:'utf8',//不传默许为buffer,显现为字符串
start:3,//从索引为3的位置最先读
//这是我的见过唯一一个包含完毕索引的
end:8,//读到索引为8完毕
highWaterMark:3//缓冲区大小
});
rs.on('open',function () {
console.log('文件翻开');
});
rs.setEncoding('utf8');//显现为字符串
//愿望流有一个停息和恢复触发的机制
rs.on('data',function (data) {
console.log(data);
rs.pause();//停息读取和发射data事宜
setTimeout(function(){
rs.resume();//恢复读取并触发data事宜
},2000);
});
//假如读取文件出错了,会触发error事宜
rs.on('error',function () {
console.log("error");
});
//假如文件的内容读完了,会触发end事宜
rs.on('end',function () {
console.log('读完了');
});
rs.on('close',function () {
console.log('文件封闭');
});
/**
文件翻开
334
455
读完了
文件封闭
**/
可读流的简朴完成
let fs = require('fs');
let ReadStream = require('./ReadStream');
let rs = ReadStream('./1.txt', {
flags: 'r',
encoding: 'utf8',
start: 3,
end: 7,
highWaterMark: 3
});
rs.on('open', function () {
console.log("open");
});
rs.on('data', function (data) {
console.log(data);
});
rs.on('end', function () {
console.log("end");
});
rs.on('close', function () {
console.log("close");
});
/**
open
456
789
end
close
**/
let fs = require('fs');
let EventEmitter = require('events');
class ReadStream extends EventEmitter {
constructor(path, options) {
super(path, options);
this.path = path;
this.highWaterMark = options.highWaterMark || 64 * 1024;
this.buffer = Buffer.alloc(this.highWaterMark);
this.flags = options.flags || 'r';
this.encoding = options.encoding;
this.mode = options.mode || 0o666;
this.start = options.start || 0;
this.end = options.end;
this.pos = this.start;
this.autoClose = options.autoClose || true;
this.bytesRead = 0;
this.closed = false;
this.flowing;
this.needReadable = false;
this.length = 0;
this.buffers = [];
this.on('end', function () {
if (this.autoClose) {
this.destroy();
}
});
this.on('newListener', (type) => {
if (type == 'data') {
this.flowing = true;
this.read();
}
if (type == 'readable') {
this.read(0);
}
});
this.open();
}
open() {
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) {
if (this.autoClose) {
this.destroy();
return this.emit('error', err);
}
}
this.fd = fd;
this.emit('open');
});
}
read(n) {
if (typeof this.fd != 'number') {
return this.once('open', () => this.read());
}
n = parseInt(n, 10);
if (n != n) {
n = this.length;
}
if (this.length == 0)
this.needReadable = true;
let ret;
if (0 < n < this.length) {
ret = Buffer.alloc(n);
let b;
let index = 0;
while (null != (b = this.buffers.shift())) {
for (let i = 0; i < b.length; i++) {
ret[index++] = b[i];
if (index == ret.length) {
this.length -= n;
b = b.slice(i + 1);
this.buffers.unshift(b);
break;
}
}
}
if (this.encoding) ret = ret.toString(this.encoding);
}
let _read = () => {
let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
if (err) {
return
}
let data;
if (bytesRead > 0) {
data = this.buffer.slice(0, bytesRead);
this.pos += bytesRead;
this.length += bytesRead;
if (this.end && this.pos > this.end) {
if (this.needReadable) {
this.emit('readable');
}
this.emit('end');
} else {
this.buffers.push(data);
if (this.needReadable) {
this.emit('readable');
this.needReadable = false;
}
}
} else {
if (this.needReadable) {
this.emit('readable');
}
return this.emit('end');
}
})
}
if (this.length == 0 || (this.length < this.highWaterMark)) {
_read(0);
}
return ret;
}
destroy() {
fs.close(this.fd, (err) => {
this.emit('close');
});
}
pause() {
this.flowing = false;
}
resume() {
this.flowing = true;
this.read();
}
pipe(dest) {
this.on('data', (data) => {
let flag = dest.write(data);
if (!flag) this.pause();
});
dest.on('drain', () => {
this.resume();
});
this.on('end', () => {
dest.end();
});
}
}
module.exports = ReadStream;
自定义可读流
为了完成可读流,援用Readable接口并用它组织新对象
- 我们能够直接把供运用的数据push出去。
- 当push一个null对象就意味着我们想发出信号——这个流没有更多数据了
var stream = require('stream');
var util = require('util');
util.inherits(Counter, stream.Readable);
function Counter(options) {
stream.Readable.call(this, options);
this._index = 0;
}
Counter.prototype._read = function() {
if(this._index++<3){
this.push(this._index+'');
}else{
this.push(null);
}
};
var counter = new Counter();
counter.on('data', function(data){
console.log("读到数据: " + data.toString());//no maybe
});
counter.on('end', function(data){
console.log("读完了");
});
可读流的两种形式
Readable Stream 存在两种形式(flowing mode 与 paused mode),这两种形式决议了chunk数据活动的体式格局—自动活动照样手工活动。那怎样触发这两种形式呢:
- flowing mode: 注册事宜data、挪用resume要领、挪用pipe要领
- paused mode: 挪用pause要领(没有pipe要领)、移除data事宜 && unpipe一切pipe
假如 Readable 切换到 flowing 形式,且没有消费者处置惩罚流中的数据,这些数据将会丧失。 比方, 挪用了 readable.resume() 要领却没有监听 ‘data’ 事宜,或是取消了 ‘data’ 事宜监听,就有能够涌现这类状况
可读流的三种状况
在恣意时候,恣意可读流应确实处于下面三种状况之一:
- readable._readableState.flowing = null
- readable._readableState.flowing = false
- readable._readableState.flowing = true
两种形式取决于可读流flowing状况:
- 若为true : flowing mode;
- 若为false : paused mode
flowing mode
经由历程注册data、pipe、resume能够自动猎取所须要的数据,我们来看下源码的完成
// data事宜触发flowing mode
if (ev === 'data') {
// Start flowing on next tick if stream isn't explicitly paused
if (this._readableState.flowing !== false)
this.resume();
} else if (ev === 'readable') {
const state = this._readableState;
if (!state.endEmitted && !state.readableListening) {
state.readableListening = state.needReadable = true;
state.emittedReadable = false;
if (!state.reading) {
process.nextTick(nReadingNextTick, this);
} else if (state.length) {
emitReadable(this);
}
}
}
// resume触发flowing mode
Readable.prototype.resume = function() {
var state = this._readableState;
if (!state.flowing) {
debug('resume');
state.flowing = true;
resume(this, state);
}
return this;
}
// pipe要领触发flowing形式
Readable.prototype.resume = function() {
if (!state.flowing) {
this.resume()
}
}
flowing mode的三种要领末了均是经由历程resume要领,将状况变成true:state.flowing = true
paused mode
在paused mode下,须要手动地读取数据,而且能够直接指定读取数据的长度
能够经由历程监听事宜readable,触发时手工读取chunk数据:
- 当你监听 readable事宜的时候,会进入停息形式
- 当监听readable事宜的时候,可读流会立时去处底层读取文件,然后把读到文件的文件放在缓存区里const state = this._readableState;
- self.read(0); 只添补缓存,然则并不会发射data事宜,然则会发射stream.emit(‘readable’);事宜
- this._read(state.highWaterMark); 每次挪用底层的要领读取的时候是读取3个字节
let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
highWaterMark:3
});
rs.on('readable',function(){
console.log(rs._readableState.length);
//read假如不加参数示意读取全部缓存区数据
//读取一个字段,假如可读流发明你要读的字节小于即是缓存字节大小,则直接返回
let chunk = rs.read(1);
console.log(chunk);
console.log(rs._readableState.length);
//当你读完指定的字节后,假如可读流发明剩下的字节已比最高水位线小了。则会立马再次读取填满 最高水位线
setTimeout(function(){
console.log(rs._readableState.length);
},200)
});
注重:一旦注册了readable事宜,必需手工读取read数据,不然数据就会流失,我们来看下源码的完成
function emitReadable(stream) {
var state = stream._readableState;
state.needReadable = false;
if (!state.emittedReadable) {
debug('emitReadable', state.flowing);
state.emittedReadable = true;
process.nextTick(emitReadable_, stream);
}
}
function emitReadable_(stream) {
var state = stream._readableState;
debug('emit readable');
if (!state.destroyed && (state.length || state.ended)) {
stream.emit('readable');
}
state.needReadable = !state.flowing && !state.ended;
flow(stream);
}
function flow(stream) {
const state = stream._readableState;
debug('flow', state.flowing);
while (state.flowing && stream.read() !== null);
}
function endReadable(stream) {
var state = stream._readableState;
debug('endReadable', state.endEmitted);
if (!state.endEmitted) {
state.ended = true;
process.nextTick(endReadableNT, state, stream);
}
}
Readable.prototype.read = function(n) {
debug('read', n);
n = parseInt(n, 10);
var state = this._readableState;
var nOrig = n;
if (n !== 0)
state.emittedReadable = false;
if (n === 0 &&
state.needReadable &&
(state.length >= state.highWaterMark || state.ended)) {
debug('read: emitReadable', state.length, state.ended);
if (state.length === 0 && state.ended)
endReadable(this);
else
emitReadable(this);
return null;
}
n = howMuchToRead(n, state);
if (n === 0 && state.ended) {
if (state.length === 0)
endReadable(this);
return null;
}
flow要领直接read数据,将获得的数据经由历程事宜data托付出去,但是此处没有注册data事宜监控,因而,获得的chunk数据并没有托付给任何对象,如许数据就白白流失了,所以在触发emit(‘readable’)时,须要提早read数据
Writable streams可写流
可写流是对数据写入’目的地’的一种笼统
Writable:可写流的例子包含了:
- HTTP requests, on the client 客户端请求
- HTTP responses, on the server 服务器相应
- fs write streams 文件
- zlib streams 紧缩
- crypto streams 加密
- TCP sockets TCP服务器
- child process stdin 子历程规范输入
- process.stdout, process.stderr 规范输出,毛病输出
下面举个可写流的简朴例子
- 当你往可写流里写数据的时候,不是会马上写入文件的,而是会很写入缓存区,缓存区的大小就是highWaterMark,默许值是16K。然后等缓存区满了以后再次真正的写入文件里
let fs = require('fs');
let ws = fs.createWriteStream('./2.txt',{
flags:'w',
mode:0o666,
start:3,
highWaterMark:3//默许是16K
});
- 假如缓存区已满 ,返回false,假如缓存区未满,返回true
- 假如能接着写,返回true,假如不能接着写,返回false
- 按理说假如返回了false,就不能再往内里写了,然则假如你真写了,假如也不会丧失,会缓存在内存里。等缓存区清空以后再从内存里读出来
let flag = ws.write('1');
console.log(flag);//true
flag =ws.write('2');
console.log(flag);//true
flag =ws.write('3');
console.log(flag);//false
flag =ws.write('4');
console.log(flag);//false
'drain' 事宜
假如挪用 stream.write(chunk) 要领返回 false,流将在恰当的机遇触发 ‘drain’ 事宜,这时候才能够继承向流中写入数据
当一个流不处在 drain 的状况, 对 write() 的挪用会缓存数据块, 而且返回 false。 一旦一切当前一切缓存的数据块都排空了(被操纵系统接收来举行输出), 那末 ‘drain’ 事宜就会被触发
发起, 一旦 write() 返回 false, 在 ‘drain’ 事宜触发前, 不能写入任何数据块
举个简朴的例子申明一下:
let fs = require('fs');
let ws = fs.createWriteStream('2.txt',{
flags:'w',
mode:0o666,
start:0,
highWaterMark:3
});
let count = 9;
function write(){
let flag = true;//缓存区未满
//写入要领是同步的,然则写入文件的历程是异步的。在真正写入文件后还会实行我们的回调函数
while(flag && count>0){
console.log('before',count);
flag = ws.write((count)+'','utf8',(function (i) {
return ()=>console.log('after',i);
})(count));
count--;
}
}
write();//987
//监听缓存区清空事宜
ws.on('drain',function () {
console.log('drain');
write();//654 321
});
ws.on('error',function (err) {
console.log(err);
});
/**
before 9
before 8
before 7
after 9
after 8
after 7
**/
假如已不再须要写入了,能够挪用end要领封闭写入流,一旦挪用end要领以后则不能再写入
比方在
ws.end();
后写
ws.write('x');
,会报错
write after end
'pipe'事宜
linux精典的管道的观点,前者的输出是后者的输入
pipe是一种最简朴直接的要领衔接两个stream,内部完成了数据通报的全部历程,在开辟的时候不须要关注内部数据的活动
- 这个要领从可读流拉取一切数据, 并将数据写入到供应的目的中
- 自动治理流量,将数据的滞留量限定到一个可接收的程度,以使得差别速率的泉源和目的不会吞没可用内存
- 默许状况下,当源数据流触发 end的时候挪用end(),所以写入数据的目的不可再写。传 { end:false }作为options,能够坚持目的流翻开状况
pipe要领的道理
var fs = require('fs');
var ws = fs.createWriteStream('./2.txt');
var rs = fs.createReadStream('./1.txt');
rs.on('data', function (data) {
var flag = ws.write(data);
if(!flag)
rs.pause();
});
ws.on('drain', function () {
rs.resume();
});
rs.on('end', function () {
ws.end();
});
下面举个简朴的例子申明一下pipe的用法:
let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
highWaterMark:3
});
let ws = fs.createWriteStream('./2.txt',{
highWaterMark:3
});
rs.pipe(ws);
//移除目的可写流
rs.unpipe(ws);
- 当监听可读流data事宜的时候会触发还调函数的实行
- 能够完成数据的生产者和消费者速率的平衡
rs.on('data',function (data) {
console.log(data);
let flag = ws.write(data);
if(!flag){
rs.pause();
}
});
- 监听可写流缓存区清空事宜,当一切要写入的数据写入完成后,接着恢复从可读流里读取并触发data事宜
ws.on('drain',function () {
console.log('drain');
rs.resume();
});
unpipe
readable.unpipe()要领将之前经由历程stream.pipe()要领绑定的流星散
- 假如写入的目的没有传入, 则一切绑定的流都会被星散
- 假如指定了写入的目的,然则没有绑定流,则什么事情都不会发作
简朴间隔申明下unpipe的用法:
let fs = require('fs');
var from = fs.createReadStream('./1.txt');
var to = fs.createWriteStream('./2.txt');
from.pipe(to);
setTimeout(() => {
console.log('封闭向2.txt的写入');
from.unpipe(writable);
console.log('手工封闭文件流');
to.end();
}, 1000);
pipe的简朴完成
let fs = require('fs');
let ReadStream = require('./ReadStream');
let rs = ReadStream('./1.txt', {
flags: 'r',
encoding: 'utf8',
highWaterMark: 3
});
let FileWriteStream = require('./WriteStream');
let ws = FileWriteStream('./2.txt',{
flags:'w',
encoding:'utf8',
highWaterMark:3
});
rs.pipe(ws);
ReadStream.prototype.pipe = function (dest) {
this.on('data', (data)=>{
let flag = dest.write(data);
if(!flag){
this.pause();
}
});
dest.on('drain', ()=>{
this.resume();
});
this.on('end', ()=>{
dest.end();
});
}
ReadStream.prototype.pause = function(){
this.flowing = false;
}
ReadStream.prototype.resume = function(){
this.flowing = true;
this.read();
}
自定义管道流
const stream = require('stream')
var index = 0;
const readable = stream.Readable({
highWaterMark: 2,
read: function () {
process.nextTick(() => {
console.log('push', ++index)
this.push(index+'');
})
}
})
const writable = stream.Writable({
highWaterMark: 2,
write: function (chunk, encoding, next) {
console.log('写入:', chunk.toString())
}
})
readable.pipe(writable);
可写流的简朴完成
let fs = require('fs');
let FileWriteStream = require('./FileWriteStream');
let ws = FileWriteStream('./2.txt',{
flags:'w',
encoding:'utf8',
highWaterMark:3
});
let i = 10;
function write(){
let flag = true;
while(i&&flag){
flag = ws.write("1",'utf8',(function(i){
return function(){
console.log(i);
}
})(i));
i--;
console.log(flag);
}
}
write();
ws.on('drain',()=>{
console.log("drain");
write();
});
/**
10
9
8
drain
7
6
5
drain
4
3
2
drain
1
**/
let EventEmitter = require('events');
let util = require('util');
let fs = require('fs');
util.inherits(WriteStream, EventEmitter);
function WriteStream(path, options) {
EventEmitter.call(this);
if (!(this instanceof WriteStream)) {
return new WriteStream(path, options);
}
this.path = path;
this.fd = options.fd;
this.encoding = options.encoding||'utf8';
this.flags = options.flags || 'w';
this.mode = options.mode || 0o666;
this.autoClose = options.autoClose || true;
this.start = options.start || 0;
this.pos = this.start;//最先写入的索引位置
this.open();//翻开文件举行操纵
this.writing = false;//没有在写入历程 中
this.buffers = [];
this.highWaterMark = options.highWaterMark||16*1024;
//假如监听到end事宜,而且请求自动封闭的话则封闭文件
this.on('end', function () {
if (this.autoClose) {
this.destroy()
}
});
}
WriteStream.prototype.close = function(){
fs.close(this.fd,(err)=>{
if(err)
this.emit('error',err);
});
}
WriteStream.prototype.open = function () {
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err)
return this.emit('error', err);
this.fd = fd;//把文件描述符赋给当前实例的fd属性
//发射open事宜
this.emit('open', fd);
});
}
/**
* 会推断当前是背景是不是在写入历程当中,假如在写入历程当中,则把这个数据放在待处置惩罚的缓存中,假如不在写入历程当中,能够直接写。
*/
WriteStream.prototype.write = function (chunk, encoding, cb) {
chunk= Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,this.encoding);
//先把数据放在缓存里
this.buffers.push({
chunk,
encoding,
cb
});
let isFull = this.buffers.reduce((len, item) => len + item.chunk.length, 0)>=this.highWaterMark;
//只有当缓存区写满了,那末清空缓存区的时候才会发射drain事宜,不然 不发放
this.needDrain = isFull;
//假如说文件还没有翻开,则把写入的要领压入open事宜的监听函数。等文件一旦翻开,马上实行写入操纵
if (typeof this.fd !== 'number') {
this.once('open', () => {
this._write();
});
return !isFull;
}else{
if(!this.writing){
setImmediate(()=>{
this._write();
this.writing = true;
});
}
return !isFull;
}
}
WriteStream.prototype._write = function () {
let part = this.buffers.shift();
if (part) {
fs.write(this.fd,part.chunk,0,part.chunk.length,null,(err,bytesWritten)=>{
if(err)return this.emit('error',err);
part.cb && part.cb();
this._write();
});
}else{
//发射一个缓存区清空的事宜
this.emit('drain');
this.writing = false;
}
}
module.exports = WriteStream;
自定义可写流
为了完成可写流,我们须要运用流模块中的Writable组织函数。 我们只需给Writable组织函数通报一些选项并建立一个对象。唯一须要的选项是write函数,该函数揭破数据块要往那里写
- chunk一般是一个buffer,除非我们设置差别的流。
- encoding是在特定状况下须要的参数,一般我们能够疏忽它。
- callback是在完成处置惩罚数据块后须要挪用的函数。这是写数据胜利与否的标志。若要发出毛病信号,请用毛病对象挪用回调函数
var stream = require('stream');
var util = require('util');
util.inherits(Writer, stream.Writable);
let stock = [];
function Writer(opt) {
stream.Writable.call(this, opt);
}
Writer.prototype._write = function(chunk, encoding, callback) {
setTimeout(()=>{
stock.push(chunk.toString('utf8'));
console.log("增添: " + chunk);
callback();
},500)
};
var w = new Writer();
for (var i=1; i<=5; i++){
w.write("项目:" + i, 'utf8');
}
w.end("完毕写入",function(){
console.log(stock);
});
Duplex streams可读写的流(双工流)
Duplex 流是同时完成了 Readable 和 Writable 接口的流
双工流的可读性和可写性操纵完整独立于相互,这仅仅是将两个特征组合成一个对象
Duplex 流的实例包含了:
- TCP sockets
- zlib streams
- crypto streams
下面简朴完成双工流:
const {Duplex} = require('stream');
const inoutStream = new Duplex({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
},
read(size) {
this.push((++this.index)+'');
if (this.index > 3) {
this.push(null);
}
}
});
inoutStream.index = 0;
process.stdin.pipe(inoutStream).pipe(process.stdout);
Transform streams转换流
变更流(Transform streams) 是一种 Duplex 流。它的输出与输入是经由历程某种体式格局关联的。和一切 Duplex 流一样,变更流同时完成了 Readable 和 Writable 接口
转换流的输出是从输入中计算出来的
关于转换流,我们没必要完成read或write的要领,我们只须要完成一个transform要领,将二者结合起来。它有write要领的意义,我们也能够用它来push数据变更流的实例包含:
- zlib streams
- crypto streams
下面简朴完成转换流:
const {Transform} = require('stream');
const upperCase = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
process.stdin.pipe(upperCase).pipe(process.stdout);
对象流
默许状况下,流处置惩罚的数据是Buffer/String范例的值。有一个objectMode标志,我们能够设置它让流能够接收任何JavaScript对象
const {Transform} = require('stream');
let fs = require('fs');
let rs = fs.createReadStream('./users.json');
rs.setEncoding('utf8');
let toJson = Transform({
readableObjectMode: true,
transform(chunk, encoding, callback) {
this.push(JSON.parse(chunk));
callback();
}
});
let jsonOut = Transform({
writableObjectMode: true,
transform(chunk, encoding, callback) {
console.log(chunk);
callback();
}
});
rs.pipe(toJson).pipe(jsonOut);