可写流(Writable Stream)
可写流是对数据写入’目的地’的一种抽象。
可写流的原理其实与可读流类似,当数据过来的时候会写入缓存池,当写入的速度很慢或者写入暂停时候,数据流便会进入到队列池缓存起来,当然即使缓存池满了,剩余的数据也是存在内存
可写流的简单用法如下代码
let fs = require('fs');
let path = require('path');
let ws = fs.createWriteStream(path.join(__dirname,'1.txt'),{
highWaterMark:3,
autoClose:true,
flags:'w',
encoding:'utf8',
mode:0o666,
start:0,
});
let i = 9;
function write(){
let flag = true;
while(i>0&&flag){
flag = ws.write(--i+'','utf8',()=>{console.log('ok')});
console.log(flag)
}
}
write();
// drain只有当缓存区充满后 并且被消费后触发
ws.on('drain',function(){
console.log('抽干')
write();
});
实现原理
现在就让我们来实现一个简单的可写流,来研究可写流的内部原理,可写流有很多方法与可读流类似,这里不在重复了首先要有一个构造函数来定义一些基本选项属性,然后调用一个open放法打开文件,并且有一个destroy方法来处理关闭逻辑
let EventEmitter = require('events');
let fs = require('fs');
class WriteStream extends EventEmitter {
constructor(path,options) {
super();
this.path = path;
this.highWaterMark = options.highWaterMark || 16 * 1024;
this.autoClose = options.autoClose || true;
this.mode = options.mode;
this.start = options.start || 0;
this.flags = options.flags || 'w';
this.encoding = options.encoding || 'utf8';
// 可写流 要有一个缓存区,当正在写入文件是,内容要写入到缓存区中
// 在源码中是一个链表 => []
this.buffers = [];
// 标识 是否正在写入
this.writing = false;
// 是否满足触发drain事件
this.needDrain = false;
// 记录写入的位置
this.pos = 0;
// 记录缓存区的大小
this.length = 0;
this.open();
}
destroy() {
if (typeof this.fd !== 'number') {
return this.emit('close');
}
fs.close(this.fd, () => {
this.emit('close')
});
}
open() {
fs.open(this.path, this.flags, this.mode, (err,fd) => {
if (err) {
this.emit('error', err);
if (this.autoClose) {
this.destroy();
}
return;
}
this.fd = fd;
this.emit('open');
})
}
}
module.exports = WriteStream;
接着我们实现write方法来让可写流对象调用,在write方法中我们首先将数据转化为buffer,接着实现一些事件的触发条件的逻辑,如果现在没有正在写入的话我们就要真正的进行写入操作了,这里我们实现一个_write方法来实现写入操作,否则则代表文件正在写入,那我们就将流传来的数据先放在缓存区中,保证写入数据不会同时进行。
write(chunk,encoding=this.encoding,callback=()=>{}){
chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding);
// write 返回一个boolean类型
this.length+=chunk.length;
let ret = this.length<this.highWaterMark; // 比较是否达到了缓存区的大小
this.needDrain = !ret; // 是否需要触发needDrain
// 判断是否正在写入 如果是正在写入 就写入到缓存区中
if(this.writing){
this.buffers.push({
encoding,
chunk,
callback
}); // []
}else{
// 专门用来将内容 写入到文件内
this.writing = true;
this._write(chunk,encoding,()=>{
callback();
this.clearBuffer();
}); // 8
}
return ret;
}
_write(chunk,encoding,callback){
if(typeof this.fd !== 'number'){
return this.once('open',()=>this._write(chunk,encoding,callback));
}
fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{
this.length -= byteWritten;
this.pos += byteWritten;
callback(); // 清空缓存区的内容
});
}
_write写入之后的回调中我们会调用传入回调函数clearBuffer,这个方法会去buffers中继续递归地把数据取出,然后继续调用_write方法去写入,直到全部buffer中的数据取出后,这样就清空了buffers。
clearBuffer(){
let buffer = this.buffers.shift();
if(buffer){
this._write(buffer.chunk,buffer.encoding,()=>{
buffer.callback();
this.clearBuffer()
});
}else{
this.writing = false;
if(this.needDrain){ // 是否需要触发drain 需要就发射drain事件
this.needDrain = false;
this.emit('drain');
}
}
}
最后附上完整的代码
let EventEmitter = require('events');
let fs = require('fs');
class WriteStream extends EventEmitter{
constructor(path,options){
super();
this.path = path;
this.highWaterMark = options.highWaterMark||16*1024;
this.autoClose = options.autoClose||true;
this.mode = options.mode;
this.start = options.start||0;
this.flags = options.flags||'w';
this.encoding = options.encoding || 'utf8';
// 可写流 要有一个缓存区,当正在写入文件是,内容要写入到缓存区中
// 在源码中是一个链表 => []
this.buffers = [];
// 标识 是否正在写入
this.writing = false;
// 是否满足触发drain事件
this.needDrain = false;
// 记录写入的位置
this.pos = 0;
// 记录缓存区的大小
this.length = 0;
this.open();
}
destroy(){
if(typeof this.fd !=='number'){
return this.emit('close');
}
fs.close(this.fd,()=>{
this.emit('close')
})
}
open(){
fs.open(this.path,this.flags,this.mode,(err,fd)=>{
if(err){
this.emit('error',err);
if(this.autoClose){
this.destroy();
}
return
}
this.fd = fd;
this.emit('open');
})
}
write(chunk,encoding=this.encoding,callback=()=>{}){
chunk = Buffer.isBuffer(chunk)?chunk:Buffer.from(chunk,encoding);
// write 返回一个boolean类型
this.length+=chunk.length;
let ret = this.length<this.highWaterMark; // 比较是否达到了缓存区的大小
this.needDrain = !ret; // 是否需要触发needDrain
// 判断是否正在写入 如果是正在写入 就写入到缓存区中
if(this.writing){
this.buffers.push({
encoding,
chunk,
callback
}); // []
}else{
// 专门用来将内容 写入到文件内
this.writing = true;
this._write(chunk,encoding,()=>{
callback();
this.clearBuffer();
}); // 8
}
return ret;
}
clearBuffer(){
let buffer = this.buffers.shift();
if(buffer){
this._write(buffer.chunk,buffer.encoding,()=>{
buffer.callback();
this.clearBuffer()
});
}else{
this.writing = false;
if(this.needDrain){ // 是否需要触发drain 需要就发射drain事件
this.needDrain = false;
this.emit('drain');
}
}
}
_write(chunk,encoding,callback){
if(typeof this.fd !== 'number'){
return this.once('open',()=>this._write(chunk,encoding,callback));
}
fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWritten)=>{
this.length -= byteWritten;
this.pos += byteWritten;
callback(); // 清空缓存区的内容
});
}
}
module.exports = WriteStream;
Pipe管道流
前面我们了解了可读流与可写流,那么怎么让二者结合起来使用呢,node给我们提供好了方法–Pipe管道,流顾名思义,就是在可读流与可写流中间加入一个管道,实现一边读取,一边写入,读一点写一点。
Pipe的使用方法如下
let fs = require('fs');
let path = require('path');
let ReadStream = require('./ReadStream');
let WriteStream = require('./WriteStream');
let rs = new ReadStream(path.join(__dirname, './1.txt'), {
highWaterMark: 4
});
let ws = new WriteStream(path.join(__dirname, './2.txt'), {
highWaterMark: 1
});
// 4 1
rs.pipe(ws);
实现原理
Pipe的原理比较简单,简单说监听可读流的data事件来持续获取文件中的数据,然后我们就会去调用写流的write方法。如果可写流缓存区已满,那么当我们得到调用可读流的pause方法来暂停读取,然后等到写流的缓存区已经全部写入并且触发drain事件时,我们就会调用resume重新开启读取的流程。上代码
pipe(ws) {
this.on('data', (chunk) => {
let flag = ws.write(chunk);
if (!flag) {
this.pause();
}
});
ws.on('drain', () => {
this.resume();
})
}
自定义流
Node允许我们自定义流,读流继承于Readable接口,写流则继承于Writable接口,所以我们其实是可以自定义一个流模块,只要继承stream模块对应的接口即可。
自定义可读流
如果我们要自定义读流的话,那我们就需要继承Readable,Readable里面有一个read()方法,默认调用_read(),所以我们只要复写了_read()方法就可实现读取的逻辑,同时Readable中也提供了一个push方法,调用push方法就会触发data事件,push中的参数就是data事件回调函数的参数,当push传入的参数为null的时候就代表读流停止,上代码
let { Readable } = require('stream');
// 想实现什么流 就继承这个流
// Readable里面有一个read()方法,默认掉_read()
// Readable中提供了一个push方法你调用push方法就会触发data事件
let index = 9;
class MyRead extends Readable {
_read() {
// 可读流什么时候停止呢? 当push null的时候停止
if (index-- > 0) return this.push('123');
this.push(null);
}
}
let mr = new MyRead();
mr.on('data', function(data) {
console.log(data);
});
自定义可写流
与自定义读流类似,自定义写流需要继承Writable接口,并且实现一个_write()方法,这里注意的是_write中可以传入3个参数,chunk, encoding, callback,chunk就是代表写入的数据,通常是一个buffer,encoding是编码类型,通常不会用到,最后的callback要注意,它并不是我们用这个自定义写流调用write时的回调,而是我们上面讲到写流实现时的clearBuffer函数。
let { Writable } = require('stream');
// 可写流实现_write方法
// 源码中默认调用的是Writable中的write方法
class MyWrite extends Writable {
_write(chunk, encoding, callback) {
console.log(chunk.toString());
callback(); // clearBuffer
}
}
let mw = new MyWrite();
mw.write('111', 'utf8', () => {
console.log(1);
})
mw.write('222', 'utf8', () => {
console.log(1);
});
Duplex 双工流
双工流其实就是结合了上面我们说的自定义读流和自定义写流,它既能读也能写,同时可以做到读写之间互不干扰
let { Duplex } = require('stream');
// 双工流 又能读 又能写,而且读取可以没关系(互不干扰)
let d = Duplex({
read() {
this.push('hello');
this.push(null);
},
write(chunk, encoding, callback) {
console.log(chunk);
callback();
}
});
d.on('data', function(data) {
console.log(data);
});
d.write('hello');
Transform 转换流
转换流的本质就是双工流,唯一不同的是它并不需要像上面提到的双工流一样实现read和write,它只需要实现一个transform方法用于转换
let { Transform } = require('stream');
// 它的参数和可写流一样
let tranform1 = Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase()); // 将输入的内容放入到可读流中
callback();
}
});
let tranform2 = Transform({
transform(chunk, encoding, callback){
console.log(chunk.toString());
callback();
}
});
// 等待你的输入
// rs.pipe(ws);
// 希望将输入的内容转化成大写在输出出来
process.stdin.pipe(tranform1).pipe(tranform2);
// 对象流 可读流里只能放buffer或者字符串 对象流里可以放对象
对象流
默认情况下,流处理的数据是Buffer/String类型的值。对象流的特点就是它有一个objectMode标志,我们可以设置它让流可以接受任何JavaScript对象。上代码
const { Transform } = require('stream');
let fs = require('fs');
let rs = fs.createReadStream('./users.json');
rs.setEncoding('utf8');
let toJson = Transform({
readableObjectMode: true,
transform(chunk, encoding, callback) {
this.push(JSON.parse(chunk));
callback();
}
});
let jsonOut = Transform({
writableObjectMode: true,
transform(chunk, encoding, callback) {
console.log(chunk);
callback();
}
});
rs.pipe(toJson).pipe(jsonOut);