经由过程源码剖析 Node.js 中 cluster 模块的主要功能完成

尽人皆知,Node.js中的JavaScript代码实行在单线程中,非常软弱,一旦涌现了未捕捉的非常,那末全部运用就会崩溃。这在很多场景下,尤其是web运用中,是无法忍受的。一般的处理方案,就是运用Node.js中自带的cluster模块,以master-worker形式启动多个运用实例。但是人人在享用cluster模块带来的福祉的同时,不少人也最先猎奇:

  1. 为何我的运用代码中明显有app.listen(port);,但cluter模块在屡次fork这份代码时,却没有报端口已被占用?

  2. Master是如何将吸收的要求通报至worker中举行处置惩罚然后相应的?

让我们从Node.js项目的lib/cluster.js中的代码里,来一勘终究。

题目一

为了获得这个题目的解答,我们先从worker历程的初始化看起,master历程在fork事情历程时,会为其附上环境变量NODE_UNIQUE_ID,是一个从零最先的递增数:

// lib/cluster.js
// ...

function createWorkerProcess(id, env) {
  // ...
  workerEnv.NODE_UNIQUE_ID = '' + id;

  // ...
  return fork(cluster.settings.exec, cluster.settings.args, {
    env: workerEnv,
    silent: cluster.settings.silent,
    execArgv: execArgv,
    gid: cluster.settings.gid,
    uid: cluster.settings.uid
  });
}

随后Node.js在初始化时,会依据该环境变量,来推断该历程是不是为cluster模块fork出的事情历程,假如,则实行workerInit()函数来初始化环境,不然实行masterInit()函数。

workerInit()函数中,定义了cluster._getServer要领,这个要领在任何net.Server实例的listen要领中,会被挪用:

// lib/net.js
// ...

function listen(self, address, port, addressType, backlog, fd, exclusive) {
  exclusive = !!exclusive;

  if (!cluster) cluster = require('cluster');

  if (cluster.isMaster || exclusive) {
    self._listen2(address, port, addressType, backlog, fd);
    return;
  }

  cluster._getServer(self, {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags: 0
  }, cb);

  function cb(err, handle) {
    // ...

    self._handle = handle;
    self._listen2(address, port, addressType, backlog, fd);
  }
}

你能够已猜到,题目一的答案,就在这个cluster._getServer函数的代码中。它重要干了两件事:

  • 向master历程注册该worker,若master历程是第一次吸收到监听此端口/描述符下的worker,则起一个内部TCP服务器,来负担监听该端口/描述符的职责,随后在master中记录下该worker。

  • Hack掉worker历程中的net.Server实例的listen要领里监听端口/描述符的部份,使其不再负担该职责。

关于第一件事,因为master在吸收,通报要求给worker时,会相符肯定的负载平衡划定规矩(在非Windows平台下默以为轮询),这些逻辑被封装在RoundRobinHandle类中。故,初始化内部TCP服务器等操纵也在此处:

// lib/cluster.js
// ...

function RoundRobinHandle(key, address, port, addressType, backlog, fd) {
  // ...
  this.handles = [];
  this.handle = null;
  this.server = net.createServer(assert.fail);

  if (fd >= 0)
    this.server.listen({ fd: fd });
  else if (port >= 0)
    this.server.listen(port, address);
  else
    this.server.listen(address);  // UNIX socket path.

  /// ...
}

关于第二件事,因为net.Server实例的listen要领,终究会挪用本身_handle属性下listen要领来完成监听行动,故在代码中修正之:

// lib/cluster.js
// ...

function rr(message, cb) {
  // ...
  // 此处的listen函数不再做任何监听行动
  function listen(backlog) {
    return 0;
  }

  function close() {
    // ...
  }
  function ref() {}
  function unref() {}

  var handle = {
    close: close,
    listen: listen,
    ref: ref,
    unref: unref,
  };
  // ...
  handles[key] = handle;
  cb(0, handle); // 传入这个cb中的handle将会被赋值给net.Server实例中的_handle属性
}

// lib/net.js
// ...
function listen(self, address, port, addressType, backlog, fd, exclusive) {
  // ...

  if (cluster.isMaster || exclusive) {
    self._listen2(address, port, addressType, backlog, fd);
    return; // 仅在worker环境下转变
  }
    
  cluster._getServer(self, {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags: 0
  }, cb);

  function cb(err, handle) {
    // ...
    self._handle = handle; 
    // ...
  }
}

至此,第一个题目便已恍然大悟了,总结下:

  • 端口仅由master历程中的内部TCP服务器监听了一次。

  • 不会涌现端口被反复监听报错,是因为,worker历程中,末了实行监听端口操纵的要领,已被cluster模块主动hack。

题目二

处理了题目一,题目二的处理就晴明轻松很多了。经由过程题目一我们已得知,监听端口的是master历程中建立的内部TCP服务器,所以第二个题目的处理,动手点就是该内部TCP服务器接办衔接时,实行的操纵。Cluster模块的做法是,监听该内部TCP服务器的connection事宜,在监听器函数里,有负载平衡地遴选出一个worker,向其发送newconn内部音讯(音讯体对象中包括cmd: 'NODE_CLUSTER'属性)以及一个客户端句柄(即connection事宜处置惩罚函数的第二个参数),相干代码以下:

// lib/cluster.js
// ...

function RoundRobinHandle(key, address, port, addressType, backlog, fd) {
  // ...
  this.server = net.createServer(assert.fail);
  // ...

  var self = this;
  this.server.once('listening', function() {
    // ...
    self.handle.onconnection = self.distribute.bind(self);
  });
}

RoundRobinHandle.prototype.distribute = function(err, handle) {
  this.handles.push(handle);
  var worker = this.free.shift();
  if (worker) this.handoff(worker);
};

RoundRobinHandle.prototype.handoff = function(worker) {
  // ...
  var message = { act: 'newconn', key: this.key };
  var self = this;
  sendHelper(worker.process, message, handle, function(reply) {
    // ...
  });
};

Worker历程在吸收到了newconn内部音讯后,依据通报过来的句柄,挪用现实的营业逻辑处置惩罚并返回:

// lib/cluster.js
// ...

// 该要领会在Node.js初始化时由 src/node.js 挪用
cluster._setupWorker = function() {
  // ...
  process.on('internalMessage', internal(worker, onmessage));

  // ...
  function onmessage(message, handle) {
    if (message.act === 'newconn')
      onconnection(message, handle);
    // ...
  }
};

function onconnection(message, handle) {
  // ...
  var accepted = server !== undefined;
  // ...
  if (accepted) server.onconnection(0, handle);
}

至此,题目二也获得了处理,也总结一下:

  • 一切要求先同一经由内部TCP服务器。

  • 在内部TCP服务器的要求处置惩罚逻辑中,有负载平衡地遴选出一个worker历程,将其发送一个newconn内部音讯,随音讯发送客户端句柄。

  • Worker历程吸收到此内部音讯,依据客户端句柄建立net.Socket实例,实行详细营业逻辑,返回。

末了

Node.js中的cluster模块除了上述提到的功用外,实在还供应了非常丰富的API供master和worker历程之前通讯,关于差别的操纵系统平台,也供应了差别的默许行动。本文仅遴选了一条功用线举行了剖析论述。假如人人有闲,非常引荐完全明白一下cluster模块的代码完成。

参考:

    原文作者:菜菜蔡伟
    原文地址: https://segmentfault.com/a/1190000004613969
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞