node那点事(一) -- Readable streams(可读流)

流的简介

流(stream)在 Node.js 中是处置惩罚流数据的笼统接口(abstract interface)。 stream 模块供应了基础的 API 。运用这些 API 能够很容易地来构建完成流接口的对象。

Node.js 供应了多种流对象。 比方, HTTP 要求 和 process.stdout 就都是流的实例。

流能够是可读的、可写的,或是可读写的。一切的流都是 EventEmitter 的实例。

为何要用流

这里我们举一个简朴的例子:

我们盘算读取一个文件,运用 fs.readFileSync 同步读取一个文件,顺序会被壅塞,一切的数据都会被读取到内存中。

换用 fs.readFile 读取文件,顺序不会被壅塞,然则一切的数据照旧会被一次性悉数被读取到内存中。

当处置惩罚大文件紧缩、归档、媒体文件和庞大的日记文件的时刻,内存运用就成了题目,如今人人平常家用机内存大多数都是8G、16G,软件包还在日趋增大,在这类状况下,流的上风就体现出来了。

流被设想为异步的体式格局,在内存中只开启一个牢固的空间,将文件化整为零,以活动的体式格局举行传输操纵,处理了以上题目。

《node那点事(一) -- Readable streams(可读流)》

流的范例

Node.js 中有四种基础的流范例:

Readable – 可读的流 (比方 fs.createReadStream()).

Writable – 可写的流 (比方 fs.createWriteStream()).

Duplex – 可读写的流 (比方 net.Socket).

Transform – 在读写过程当中能够修正和变更数据的 Duplex 流 (比方 zlib.createDeflate()).

可读流(Readable Stream)

可读流有两种形式:

1、活动形式(flowing):可读流自动读取数据,经由过程EventEmitter接口的事宜尽快将数据供应给运用。

2、停息形式(paused):必需显式挪用stream.read()要领来从流中读取数据片断。

能够经由过程三种门路切换到活动形式:

  • 监听 ‘data’ 事宜
  • 挪用 stream.resume() 要领
  • 挪用 stream.pipe() 要领将数据发送到 Writable

活动形式切换到停息形式的api有:

  • 假如不存在管道目的,挪用stream.pause()要领
  • 假如存在管道目的,挪用 stream.unpipe()并作废’data’事宜监听

可读流事宜:’data’,’readable’,’error’,’close’,’end’

我们能够设想下家用热水器的模子,热水器的水箱(buffer缓存区)内里存着热水(数据),在我们用热水的时刻,开启水龙头,自来水会不停的进入水箱,再从水箱由水龙头流出来供我们运用。这就是进入了“flowing”形式。当我们封闭水龙头时刻,水箱则会停息进水,水龙头也会停息出水,这是就进入了“paused”形式。

flowing形式

const fs = require('fs')
const path = require('path')
const rs = fs.createReadStream(path.join(__dirname, './1.txt'))

rs.setEncoding('utf8')

rs.on('data', (data) => {
    console.log(data)
})

paused形式

const fs = require('fs')
const path = require('path')
const rs = fs.createReadStream(path.join(__dirname, './1.txt'))

rs.setEncoding('utf8')

rs.on('readable', () => {
    let d = rs.read(1)
    console.log(d)
})

完成道理

活动形式道理

我们来完成一个简朴的活动形式下的可读流引见其道理,由NODEJS官方文档可知,流继承自EventEmitter模块,然后我们定义一些默许参数、缓存区、形式:

let EventEmitter = require('events');
let fs = require('fs');

class ReadStream extends EventEmitter {
    constructor(path,options) {
        super();
        this.path = path;
        this.flags = options.flags || 'r';
        this.autoClose = options.autoClose || true;
        this.highWaterMark = options.highWaterMark|| 64*1024;
        this.start = options.start||0;
        this.end = options.end;
        this.encoding = options.encoding || null
        
        this.buffer = Buffer.alloc(this.highWaterMark);//定义缓存区大小
        
        this.pos = this.start; // pos 读取的位置 可变 start稳定的
        
        this.flowing = null; // null就是停息形式
    }
}

module.exports = ReadStream;

接着在我们须要定义一个翻开文件的要领用于翻开文件。另有一个一个destroy要领,用于在文件操纵失足或许读完以后封闭文件。

open(){
    fs.open(this.path,this.flags,(err,fd)=>{
        if(err){
            this.emit('error',err);
            if(this.autoClose){ // 是不是自动封闭
                this.destroy();
            }
            return;
        }
        this.fd = fd; // 保留文件描述符
        this.emit('open'); // 文件翻开了
    });
}
 destroy(){
    // 先推断有无fd 有封闭文件 触发close事宜
    if(typeof this.fd ==='number'){
        fs.close(this.fd,()=>{
            this.emit('close');
        });
        return;
    }
    this.emit('close'); // 烧毁
}

接着要在组织函数中挪用open要领,当用户绑定data监听时,修正可读流的形式:

constructor(path,options){
    super();
    this.path = path;
    this.flags = options.flags || 'r';
    this.autoClose = options.autoClose || true;
    this.highWaterMark = options.highWaterMark|| 64*1024;
    this.start = options.start||0;
    this.end = options.end;
    this.encoding = options.encoding || null
    this.flowing = null; 
    this.buffer = Buffer.alloc(this.highWaterMark);
    this.pos = this.start;
    
    this.open();//翻开文件 fd
    this.on('newListener',(eventName,callback)=>{
        if(eventName === 'data'){
            // 相当于用户监听了data事宜
            this.flowing  = true;
            // 监听了 就去读
            this.read(); // 去读内容了
        }
    })
}

接下来我们完成最总要的read要领,首先要保证文件已翻开,接着镀组文件进入缓存,触发data事宜传入数据,假如处于活动形式,继承读取直到读完文件。

read(){
    // 此时文件还没翻开呢
    if(typeof this.fd !== 'number'){
        // 当文件真正翻开的时刻 会触发open事宜,触发事宜后再实行read,此时fd一定有了
        return this.once('open',()=>this.read())
    }
    // 此时有fd了
    // 应该填highWaterMark?
    // 想读4个 写的是3  每次读3个
    // 123 4
    let howMuchToRead = this.end?Math.min(this.highWaterMark,this.end-this.pos+1):this.highWaterMark;
    fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(err,bytesRead)=>{
        // 读到了多少个 累加
        if(bytesRead>0){
            this.pos+= bytesRead;
            let data = this.encoding?this.buffer.slice(0,bytesRead).toString(this.encoding):this.buffer.slice(0,bytesRead);
            this.emit('data',data);
            // 当读取的位置 大于了末端 就是读取终了了
            if(this.pos > this.end){
                this.emit('end');
                this.destroy();
            }
            if(this.flowing) { // 活动形式继承触发
                this.read(); 
            }
        }else{
            this.emit('end');
            this.destroy();
        }
    });
}

剩下的pause和resume要领,很简朴

resume() {
    this.flowing = true;
    this.read();
}
pause() {
    this.flowing = false;
}

简朴的流完成完成了,看一下完全代码

let EventEmitter = require('events');
let fs = require('fs');

class ReadStream extends EventEmitter {
    constructor(path, options) {
        super();
        this.path = path;
        this.flags = options.flags || 'r';
        this.autoClose = options.autoClose || true;
        this.highWaterMark = options.highWaterMark|| 64*1024;
        this.start = options.start||0;
        this.end = options.end;
        this.encoding = options.encoding || null

        this.open();

        this.flowing = null; // null就是停息形式

        this.buffer = Buffer.alloc(this.highWaterMark);

        this.pos = this.start; 
        this.on('newListener', (eventName,callback) => {
            if (eventName === 'data') {
                this.flowing  = true;
                this.read(); 
            }
        })
    }
    
    read(){
        if (typeof this.fd !== 'number') {
            return this.once('open', () => this.read())
        }
        let howMuchToRead = this.end ? Math.min(this.highWaterMark, this.end - this.pos+1) : this.highWaterMark;
        fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err,bytesRead) => {
            if (bytesRead > 0) {
                this.pos += bytesRead;
                let data = this.encoding ? this.buffer.slice(0, bytesRead).toString(this.encoding) : this.buffer.slice(0, bytesRead);
                this.emit('data', data);
                if(this.pos > this.end){
                    this.emit('end');
                    this.destroy();
                }
                if(this.flowing) { 
                    this.read(); 
                }
            }else{
                this.emit('end');
                this.destroy();
            }
        });
    }
    
    resume() {
        this.flowing = true;
        this.read();
    }
    
    pause() {
        this.flowing = false;
    }
    
    destroy() {
        if(typeof this.fd === 'number') {
            fs.close(this.fd, () => {
                this.emit('close');
            });
            return;
        }
        this.emit('close'); 
    };
    
    open() {
        fs.open(this.path, this.flags, (err,fd) => {
            if (err) {
                this.emit('error', err);
                if (this.autoClose) { 
                    this.destroy();
                }
                return;
            }
            this.fd = fd; 
            this.emit('open'); 
        });
    }
}
module.exports = ReadStream;

停息形式道理

以上是活动形式的可读流完成道理,停息形式的可读流道理与活动形式的重要区分在于监听readable事宜的绑定与read要领,先完成监听绑定readable事宜回调函数时,挪用read要领读取数据到缓存区,定义一个读取要领_read

constructor(path, options) {
    super();
    this.path = path;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.autoClose = options.autoClose || true;
    this.start = 0;
    this.end = options.end;
    this.flags = options.flags || 'r';

    this.buffers = []; // 缓存区 
    this.pos = this.start;
    this.length = 0; // 缓存区大小
    this.emittedReadable = false;
    this.reading = false; // 不是正在读取的
    this.open();
    this.on('newListener', (eventName) => {
        if (eventName === 'readable') {
            this.read();
        }
    })
}

read(n) {
    if (this.length == 0) {
        this.emittedReadable = true;
    }
    if (this.length < this.highWaterMark) {
        if(!this.reading) {
            this.reading = true;
            this._read(); 
        }
    }
}

_read() {
    if (typeof this.fd !== 'number') {
        return this.once('open', () => this._read());
    }
    let buffer = Buffer.alloc(this.highWaterMark);
    fs.read(this.fd, buffer, 0, buffer.length, this.pos, (err, bytesRead) => {
        if (bytesRead > 0) {
            this.buffers.push(buffer.slice(0, bytesRead));
            this.pos += bytesRead;
            this.length += bytesRead;
            this.reading = false;
            if (this.emittedReadable) {
                this.emittedReadable = false; 
                this.emit('readable');
            }
        } else {
            this.emit('end');
            this.destroy();
        }
    })
}

由api可知,停息形式下的可读流手动挪用read要领参数能够大于highWaterMark,为了处置惩罚这类状况,我们先写一个函数computeNewHighWaterMark,取到大于即是n的最小2的n次方的整数

function computeNewHighWaterMark(n) {
      n--;
      n |= n >>> 1;
      n |= n >>> 2;
      n |= n >>> 4;
      n |= n >>> 8;
      n |= n >>> 16;
      n++;
     return n;
  }

然后写read要领,要斟酌全n的各种状况,上代码

read(n) { 

    if(n>this.length){
        // 变动缓存区大小  读取五个就找 2的频频放近来的
        this.highWaterMark = computeNewHighWaterMark(n)
        this.emittedReadable = true;
        this._read();
    }


    // 假如n>0 去缓存区中取吧
    let buffer=null;
    let index = 0; // 保护buffer的索引的
    let flag = true;
    if (n > 0 && n <= this.length) { // 读的内容 缓存区中有这么多
        // 在缓存区中取 [[2,3],[4,5,6]]
        buffer = Buffer.alloc(n); // 这是要返回的buffer
        let buf;
        while (flag&&(buf = this.buffers.shift())) {
            for (let i = 0; i < buf.length; i++) {
                buffer[index++] = buf[i];
                if(index === n){ // 拷贝够了 不须要拷贝了
                    flag = false;
                    this.length -= n;
                    let bufferArr = buf.slice(i+1); // 掏出留下的部份
                    // 假如有剩下的内容 在放入到缓存中
                    if(bufferArr.length > 0){
                        this.buffers.unshift(bufferArr);
                    }
                    break;
                }
            }
        }
    }
    // 当前缓存区 小于highWaterMark时在去读取
    if (this.length == 0) {
        this.emittedReadable = true;
    }
    if (this.length < this.highWaterMark) {
        if(!this.reading){
            this.reading = true;
            this._read(); // 异步的
        }
    }
    return buffer
}

附上可读流停息形式的完全完成道理代码

let fs = require('fs');
let EventEmitter = require('events');
function computeNewHighWaterMark(n) {
      n--;
      n |= n >>> 1;
      n |= n >>> 2;
      n |= n >>> 4;
      n |= n >>> 8;
      n |= n >>> 16;
      n++;
     return n;
  }
class ReadStream extends EventEmitter {
    constructor(path, options) {
        super();
        this.path = path;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.autoClose = options.autoClose || true;
        this.start = 0;
        this.end = options.end;
        this.flags = options.flags || 'r';

        this.buffers = []; // 缓存区 
        this.pos = this.start;
        this.length = 0; // 缓存区大小
        this.emittedReadable = false;
        this.reading = false; // 不是正在读取的
        this.open();
        this.on('newListener', (eventName) => {
            if (eventName === 'readable') {
                this.read();
            }
        })
    }
    read(n) { 

        if(n>this.length){
            // 变动缓存区大小  读取五个就找 2的频频放近来的
            this.highWaterMark = computeNewHighWaterMark(n)
            this.emittedReadable = true;
            this._read();
        }


        // 假如n>0 去缓存区中取吧
        let buffer=null;
        let index = 0; // 保护buffer的索引的
        let flag = true;
        if (n > 0 && n <= this.length) { // 读的内容 缓存区中有这么多
            // 在缓存区中取 [[2,3],[4,5,6]]
            buffer = Buffer.alloc(n); // 这是要返回的buffer
            let buf;
            while (flag&&(buf = this.buffers.shift())) {
                for (let i = 0; i < buf.length; i++) {
                    buffer[index++] = buf[i];
                    if(index === n){ // 拷贝够了 不须要拷贝了
                        flag = false;
                        this.length -= n;
                        let bufferArr = buf.slice(i+1); // 掏出留下的部份
                        // 假如有剩下的内容 在放入到缓存中
                        if(bufferArr.length > 0){
                            this.buffers.unshift(bufferArr);
                        }
                        break;
                    }
                }
            }
        }
        // 当前缓存区 小于highWaterMark时在去读取
        if (this.length == 0) {
            this.emittedReadable = true;
        }
        if (this.length < this.highWaterMark) {
            if(!this.reading){
                this.reading = true;
                this._read(); // 异步的
            }
        }
        return buffer
    }
    // 封装的读取的要领
    _read() {
        // 当文件翻开后在去读取
        if (typeof this.fd !== 'number') {
            return this.once('open', () => this._read());
        }
        // 上来我要喝水 先倒三升水 []
        let buffer = Buffer.alloc(this.highWaterMark);
        fs.read(this.fd, buffer, 0, buffer.length, this.pos, (err, bytesRead) => {
            if (bytesRead > 0) {
                // 默许读取的内容放到缓存区中
                this.buffers.push(buffer.slice(0, bytesRead));
                this.pos += bytesRead; // 保护读取的索引
                this.length += bytesRead;// 保护缓存区的大小
                this.reading = false;
                // 是不是须要触发readable事宜
                if (this.emittedReadable) {
                    this.emittedReadable = false; // 下次默许不触发
                    this.emit('readable');
                }
            } else {
                this.emit('end');
                this.destroy();
            }
        })
    }
    destroy() {
        if (typeof this.fd !== 'number') {
            return this.emit('close')
        }
        fs.close(this.fd, () => {
            this.emit('close')
        })
    }
    open() {
        fs.open(this.path, this.flags, (err, fd) => {
            if (err) {
                this.emit('error', err);
                if (this.autoClose) {
                    this.destroy();
                }
                return
            }
            this.fd = fd;
            this.emit('open');
        });
    }
}

module.exports = ReadStream;
    原文作者:标记层叠样式球
    原文地址: https://segmentfault.com/a/1190000014148205
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞