经由过程源码剖析 Node.js 中导流(pipe)的完成

Node.js中,流(Stream)是其浩瀚原生对象的基类,它对处置惩罚潜伏的大文件供应了支撑,也笼统了一些场景下的数据处置惩罚和通报。在它对外暴露的接口中,最为奇异的,莫过于导流(pipe)要领了。鉴于近期本身正在浏览Node.js中的部份源码,也来从源码层面分享下导流的详细完成。

正题

以下是一个关于导流的简朴例子:

'use strict'
import {createReadStream, createWriteStream} from 'fs'

createReadStream('/path/to/a/big/file').pipe(createWriteStream('/path/to/the/dest'))

再连系官方文档,我们可以把pipe要领的主要功能分解为:

  • 不停从泉源可读流中取得一个指定长度的数据。

  • 将获取到的数据写入目的可写流。

  • 均衡读取和写入速率,防备读取速率大大凌驾写入速率时,涌现大批滞留数据。

好,让我们追随Node.js项目里lib/_stream_readable.jslib/_stream_writable.js中的代码,逐一剖析这三个主要功能的完成。

读取数据

刚创建出的可读流只是一个纪录了一些初始状况的空壳,内里没有任何数据,而且其状况不属于官方文档中的活动形式(flowing mode)和停息形式(paused mode)中的任何一种,算是一种伪停息形式,因为此时实例的状况中纪录它是不是为停息形式的变量还不是规范的布尔值,而是null,但又可经由历程将停息形式转化为活动形式的行动(挪用实例的resume()要领),将可读流切换至活动形式。在外部代码中,我们可以手动监听可读流的data事宜,让其进入活动形式:

// lib/_stream_readable.js
// ...

Readable.prototype.on = function(ev, fn) {
  var res = Stream.prototype.on.call(this, ev, fn);

  if (ev === 'data' && false !== this._readableState.flowing) {
    this.resume();
  }

  // ...

  return res;
};

可见,可读流类经由历程二次封装父类(EventEmitter)的on()要领,替我们在监听data事宜时,将流切换至了活动形式。而最先读取数据的行动,则存在于resume()要领挪用的内部要领resume_()中,让我们一窥终究:

// lib/_stream_readable.js
// ...

function resume_(stream, state) {
  if (!state.reading) {
    debug('resume read 0');
    stream.read(0);
  }

  // ...
}

经由历程向可读流读取一次空数据(大小为0),将会触发实例层面完成的_read()要领,最先读取数据,然后应用读到的数据触发data事宜:

// lib/_stream_readable.js
// ...

Readable.prototype.read = function(n) {
  // ...
  // 此次推断的意图为,假如可读流的缓冲中已满,则只空触发readable事宜。
  if (n === 0 &&
      state.needReadable &&
      (state.length >= state.highWaterMark || state.ended)) {
    if (state.length === 0 && state.ended)
      endReadable(this);
    else
      emitReadable(this);
    return null;
  }
  
  // 若可读流已被传入了终止符(null),且缓冲中没有遗留数据,则终了这个可读流
  if (n === 0 && state.ended) {
    if (state.length === 0)
      endReadable(this);
    return null;
  }

  // 若现在缓冲中的数据大小为空,或未凌驾设置的警戒线,则举行一次数据读取。
  if (state.length === 0 || state.length - n < state.highWaterMark) {
    doRead = true;
  }

  if (doRead) {
    // ...
    this._read(state.highWaterMark);
  }

  // ...

  if (ret !== null)
    this.emit('data', ret);

  return ret;
};

可见,在可读流的read()要领内部,经由历程挪用在实例层面完成的_read(size)要领,取得了一段(设置的警戒线)大小的数据,然则,你可能会迷惑,这只是读取了一次数据啊,抱负状况下,应该是轮回挪用_read(size)直至取完一切数据才对啊!?实在,这部份的逻辑存在于我们完成_read(size)要领时,在其内部挪用的this.push(data)要领中,在末了其会挪用私有要领maybeReadMore_(),再次触发read(0),接着在read(0)函数的代码中再次推断可读流是不是可以终了,不然再举行一次_read(size)读取:

// lib/_stream_readable.js
// ...

Readable.prototype.push = function(chunk, encoding) {
  var state = this._readableState;
  // ...
  return readableAddChunk(this, state, chunk, encoding, false);
};

function readableAddChunk(stream, state, chunk, encoding, addToFront) {
  // ...
  if (er) {
    stream.emit('error', er);
  } else if (chunk === null) {
    state.reading = false;
    onEofChunk(stream, state); // 当传入终止符时,将可读流的终了标识(state.ended)设为true
  }
  // ...
      maybeReadMore(stream, state);
    }
  } 

  // ...
}

function maybeReadMore(stream, state) {
  if (!state.readingMore) {
    // ...
    process.nextTick(maybeReadMore_, stream, state);
  }
}

function maybeReadMore_(stream, state) {
    // ...
    stream.read(0);
}

function onEofChunk(stream, state) {
  if (state.ended) return;
  // ...
  state.ended = true;
  // ...
}

好的,此时从可读流中读取数据的悉数中心流程已完成了,让我们归结一下:

  • 刚创建出的可读流只是一个空壳,保存着一些初始状况。

  • 监听它的data事宜,将会自动挪用该可读流的resume()要领,使流切换至活动形式。

  • resume()要领的内部函数_resume()中,对可读流举行了一次read(0)挪用。

  • read(0)挪用的内部,起首搜检流是不是相符了终了前提,若相符,则终了之。不然挪用实例完成的_read(size)要领读取一段预设的警戒线(highWaterMark)大小的数据。

  • 在实例完成_read(size)函数时内部挪用的this.push(data)要领里,会先推断的读到的数据是不是为终了符,如果,则将流的状况设为终了,然后再一次对可读流挪用read(0)

写入数据

和可读流一样,刚创建出的可写流也只是一个纪录了相干状况(包含预设的写入缓冲大小)的空壳。直接挪用它的write要领,该要领会在其内部挪用writeOrBuffer函数来对数据是不是可以直接一次性悉数写入举行推断:

// lib/_stream_writable.js
// ...

function writeOrBuffer(stream, state, chunk, encoding, cb) {
  // ...
  var ret = state.length < state.highWaterMark;

  // 纪录可写流是不是须要动身drain事宜
  if (!ret)
    state.needDrain = true;

  if (state.writing || state.corked) {
    // 若可写流正在被写入或被人工壅塞,则先将写入操纵列队
    // ...
  } else {
    doWrite(stream, state, false, len, chunk, encoding, cb);
  }

  return ret;
}

function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  // ...
  if (writev)
    stream._writev(chunk, state.onwrite);
  else
    stream._write(chunk, encoding, state.onwrite);
  // ...
}

从代码中可知,在writeOrBuffer函数纪录下了数据是不是可以被一次性写入后,挪用了实例层完成的_write()_writev()要领举行了现实的写入操纵。那末,假如不能一次性写入终了,那末在真正写入终了时,又是怎样举行关照的呢?嗯,答案就在设置的state.onwrite回调中:

// lib/_stream_writable.js
// ...

function onwrite(stream, er) {
  // ...

  if (er)
    onwriteError(stream, state, sync, er, cb);
  else {
    // ...
    if (sync) {
      process.nextTick(afterWrite, stream, state, finished, cb);
    } else {
      afterWrite(stream, state, finished, cb);
    }
  }
}

function afterWrite(stream, state, finished, cb) {
  if (!finished)
    onwriteDrain(stream, state);
  // ...
}

function onwriteDrain(stream, state) {
  if (state.length === 0 && state.needDrain) {
    state.needDrain = false;
    stream.emit('drain');
  }
}

可见,在回调函数的实行中,会对该可写流该次被写入的数据是不是凌驾了警戒线的状况举行推断,假如是,则触发drain事宜,举行关照。

我们也可以挪用end()要领来表明要终了这个写入流,并举行末了一次写入,end()要领的内部终究会挪用endWritable()函数来说可写流的状况切换为已终了:

// lib/_stream_writable.js
// ...

function endWritable(stream, state, cb) {
  // ...
  state.ended = true;
  stream.writable = false;
}

此时,向可写流中写入数据的悉数中心流程已完成了,这个流程和可写流的轮回读取流程差别,它是直线的,归结一下:

  • 刚创建出的可写流只是一个空壳,保存着一些初始状况。

  • 挪用write()要领,其内部的writeOrBuffer()检测该次写入的数据是不是须要被暂存在缓冲区中。

  • writeOrBuffer()函数挪用实例完成的_write()_writev()要领,举行现实的写入,完成后挪用回调函数state.onwrite

  • 回调函数中检测该次写入是不是被缓冲,如果,触发drain事宜。

  • 反复以上历程,直至挪用end()要领终了该可写流。

导流

在摸清了从可读流中读数据,和向可写流中写数据完成的中心流程后,Node.js中完成导流的中心流程实在已呼之欲出了。起首,为了最先从源可读流读取数据,在pipe()要领的内部,它主动为源可读流添加了data事宜的监听函数:

// lib/_stream_readable.js
// ...

Readable.prototype.pipe = function(dest, pipeOpts) {
  // ...

  src.on('data', ondata);
  function ondata(chunk) {
      // ...
      src.pause();
    }
  }

  // ...
  return dest;
};

从代码中可见,若向目的可写流写入一次数据时,目的可写流示意该次写入它须要举行缓冲,则主动将源可读流切换至停息形式。那末,源可读流经由历程什么手腕得知可以再次读取数据并写入呢?嗯,经由历程监听目的可写流的drain事宜:

// lib/_stream_readable.js
// ...

Readable.prototype.pipe = function(dest, pipeOpts) {
  // ...
  var ondrain = pipeOnDrain(src);
  dest.on('drain', ondrain);

  // ...
  return dest;
};

function pipeOnDrain(src) {
  return function() {
    var state = src._readableState;
    
    // 目的可写流可能会存在屡次写入须要举行缓冲的状况,需确保一切须要缓冲的写入都
    // 完成后,再次将可读流切换至活动形式。
    if (state.awaitDrain)
      state.awaitDrain--;
    if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
      state.flowing = true;
      flow(src);
    }
  };
}

末了,监听源可读流的终了事宜,对应着终了目的可写流:

// lib/_stream_readable.js
// ...

Readable.prototype.pipe = function(dest, pipeOpts) {
  // ...
  var endFn = doEnd ? onend : cleanup;
  if (state.endEmitted)
    process.nextTick(endFn);
  else
    src.once('end', endFn);

  function onend() {
    debug('onend');
    dest.end();
  }

  // ...
  return dest;
};

因为前面的铺垫,现实导流操纵的中心流程实在完成得异常轻松,归结一下:

  • 主动监听源可读流的data事宜,在该事宜的监听函数中,向目的可写流写入数据。

  • 若目的可写流示意该写入操纵须要举行缓冲,则马上将源可读流切换至停息形式。

  • 监听目的可写流的drain事宜,当目的可写流里一切须要缓冲的写入操纵都终了后,将流从新切换回活动形式。

  • 监听源可读流的end事宜,响应地终了目的可写流。

末了

Node.js中流的现实完成实在异常巨大,庞杂,精巧。每个流的内部,都治理着大批状况。本文仅仅只是在巨大的流的完成中,挑选了一条主线,举行了论述。人人假如有闲,异常引荐完整地浏览一遍其完成。

参考:

    原文作者:菜菜蔡伟
    原文地址: https://segmentfault.com/a/1190000004424485
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞