Stream流有以下四种范例:
- Readable – 可读操纵
- Writable – 可写操纵
- Duplex – 可读可写操纵
- Transform – 操纵被写入数据,然后读出效果
可读流(Readable stream)
可读流(Readable stream)接口是对你正在读取的数据的泉源的笼统。换句话说,数据来来自可读流(Readable stream)不会分发数据,直到你表明准备就绪。
可读流(Readable stream) 有2种形式: 活动形式(flowing mode) 和 停息形式(paused mode). 活动形式(flowing mode)时,尽快的从底层体系读取数据并提供应你的顺序。 停息形式(paused mode)时, 你必需明白的挪用 stream.read() 来读取数据。 停息形式(paused mode) 是默许形式。
能够经由历程下面几个要领,将流切换到活动形式(flowing mode)。
let fs = require('fs');
/**
* 一切初始事情形式为 paused 的 Readable 流,能够经由历程下面三种门路切换到 flowing 形式:
监听 'data' 事宜
挪用 stream.resume() 要领
挪用 stream.pipe() 要领将数据发送到 Writable
*/
let rs = fs.createReadStream('./1.txt',{
highWaterMark:3
});
/*
269
stream.emit('data', chunk);
stream.read(0);
rs.on('data',function (data) {
console.log(data);
});
rs.on('end',function () {
console.log('end');
});*/
//当你监听 readable事宜的时刻,会进入停息形式
//当监听readable事宜的时刻,可读流会立时去处底层读取文件,然后把读到文件的文件放在缓存区里const state = this._readableState;
//self.read(0); 只添补缓存,然则并不会发射data事宜,然则会发射stream.emit('readable');事宜
//this._read(state.highWaterMark); 每次挪用底层的要领读取的时刻是读取3个字节
rs.on('readable',function(){
//length就是指得缓存区数据的大小
// state.length += chunk.length;==3
console.log(rs._readableState.length);
//read假如不加参数示意读取全部缓存区数据
//读取一个字段,假如可读流发明你要读的字节小于即是缓存字节大小,则直接返回
let ch = rs.read(1);
console.log(ch);
console.log(rs._readableState.length);
/* ch = rs.read(1);
console.log(ch);
console.log(rs._readableState.length);*/
//当你读完指定的字节后,假如可读流发明剩下的字节已比最高水位线小了。则会立马再次读取填满 最高水位线
setTimeout(function(){
console.log(rs._readableState.length);
},200)
});
可写流(Writable stream )
这个要领向底层体系写入数据,并在数据处理完毕后挪用所给的回调。返回值示意你是不是应当继承马上写入。假如数据要缓存在内部,将会返回false。不然返回 true。返回值仅供参考。纵然返回 false,你也能够继承写。然则写会缓存在内存里,所以不要做的太过分。最好的方法是守候drain 事宜后,再写入数据。
let fs = require('fs');
let ws = fs.createWriteStream('2.txt',{
flags:'w',
mode:0o666,
start:0,
highWaterMark:3
});
let count = 9;
function write(){
let flag = true;//缓存区未满
//写入要领是同步的,然则写入文件的历程 异步的。在真正写入文件后还会实行我们的回调函数
while(flag && count>0){
console.log('before',count);
flag = ws.write((count)+'','utf8',(function (i) {
return ()=>console.log('after',i);
})(count));
count--;
}
}
write();//987
//监听缓存区清空事宜
ws.on('drain',function () {
console.log('drain');
write();//654 321
});
ws.on('error',function (err) {
console.log(err);
});
//假如已不再须要写入了,能够挪用end要领封闭写入流,一旦挪用end要领以后则不能再写入
ws.end();
//write after end
//
ws.write('x');
双工流(Duplex streams)
双工流(Duplex streams)是同时完成了 Readable and Writable 接口。用法详见下文
let {Duplex} = require('stream');
let index = 0;
let s = Duplex({
read(){
if(index++<3)
this.push('a');
else
this.push(null);
},
write(chunk,encoding,cb){
console.log(chunk.toString().toUpperCase());
cb();
}
});
//process.stdin 规范输入流
//proces.stdout规范输出流
process.stdin.pipe(s).pipe(process.stdout);
转换流(Transform streams)
它的输出是从输入盘算得来。 它完成了Readable 和 Writable 接口. 用法详见下文.
let {Transform} = require('stream');
//转换流是完成数据转换的
let t = Transform({
transform(chunk,encoding,cb){
this.push(chunk.toString().toUpperCase());
cb();
}
});
process.stdin.pipe(t).pipe(process.stdout);
let {Transform} = require('stream');
let fs = require('fs');
let rs = fs.createReadStream('./user.json');
//普通流里的放的是Buffer,对象流里放的对象
let toJSON = Transform({
readableObjectMode:true,//就能够向可读流里放对象
transform(chunk,encoding,cb){
//向可读流里的缓存区里放
this.push(JSON.parse(chunk.toString()));
}
});
let outJSON = Transform({
writableObjectMode:true,//就能够向可读流里放对象
transform(chunk,encoding,cb){
console.log(chunk);
cb();
}
});
rs.pipe(toJSON).pipe(outJSON);