白洁血战Node.js并发编程 002 异步

这篇文章大面积重写了,更准确和严格的描述了Node.js的运行时模型,但本文中的部分例子被移除了。

请阅读:Reactor Model

前言

异步(Asynchronous)在不同的上下文下可以有很多不同的解释;在Node.js上下文里,它指的是如何对待一个或多个过程,所以我们先来谈过程。

过程

这里说的过程(Process)是抽象的概念,实际的实现可能是进程(另一种定义的Process),线程(thread),协程(coroutine/fiber),甚至只是一个函数(function)。

过程(Process)是一种解决问题的方法,或者说是解法域模型:把一个复杂问题拆解成多个简单问题,每个问题用一个过程解决,通过创建和销毁过程,以及过程通讯,来完成整个计算任务。

计算科学家们(尤其在谈并发编程的时候)喜欢举下面的例子来说明这种思维方式:

问题是求解整数n之内的所有质数(n > 2)。

解法的关键点是为每一个已知的质数建立一个Process,算法如下:

  1. 第一个质数是2,我们建立一个Process,标记为P2;
  2. 我们开始把从2到n的数依次发送给P2;
  3. P2的逻辑是:

    1. 如果这个数可以被2整除,扔掉它;
    2. 遇到第一个不能被2整除的数,实际上是3,它创建下一个Process,记为P3;
    3. 之后P2把所有不能被2整除的数都扔给P3;
  4. P3的逻辑和P2一样,只要把2换成3;

这个过程继续下去,会创建P5, P7, P11…等等。当所有的数都被处理完之后,这些Process本身就是问题的答案。

这个例子展示了用Process建立模型解决问题的基本要素:

  1. 每个Process是一个计算任务;
  2. Process可以创建,称为fork;例子中没有,但当然它也可以销毁,称为join;
  3. Process之间可以通讯;

这个例子是不是很优雅不在我们的讨论之列,即使它很优雅,大部分实际的编程问题没有这种优雅特性。

在这里我们只强调一点:用Process建模是用divide and conquer的办法解决问题的一种方式,同样的方式也存在于Component-based, Object-Oriented等多种编程模型技术中。

串行组合过程

我们先看最传统的基于blocking i/o编程的程序,如果把每个function都理解为一个process,这个程序的运行过程如何理解。

在所有的命令式语言中,函数都具有良好的可组合性(Composibility),即可以通过函数调用实现函数的组合;一个函数在使用者看来,不知道它是一个简单的函数还是复杂的函数组合,即函数和函数组合具有自相似性,或者说在语法上具有递归特性。所以我们只需回答针对一次函数组合,如何从Process的角度理解即可。

在一个父函数中调用子函数,父函数本身可以看作一个Process,P1,在它调用子函数时,可以理解为创建(fork)了一个新的Process,P2,然后P1被阻塞(blocked),一直等到P2完成并通过通讯把执行结果返还给P1(join),然后P1继续执行。

如果这样去理解我们可以看到,传统的基于blocking i/o的编程:

  1. 程序运行时可以由一组Process的组合来描述;
  2. 在任何时刻,只有一个Process在运行,Process之间的组合在运行时只有串行,没有并发;

异步

在上述传统的blocking io编程模式下,整个程序可能被一个i/o访问block住,这不是一个充分利用CPU计算资源的方式,尤其对于网络编程来说,它几乎是不可接受的。

所以操作系统本身提供了基于socket的回调函数机制,或者文件i/o的non-blocking访问,让应用程序可以充分利用处理器资源,减少执行等待时间;代价是开发者需要书写并发过程。

对于Node.js而言,在代码层面上,创建一个过程,是大家熟知的形式,例如:

fs.readdir('directory path', (err, files) => {
    
})

这里的fs.readdir是一个Node API里的文件i/o操作,它也可以是开发者自己封装的过程。

这个函数调用后立即同步返回,在返回时,创建了一个过程,这个过程的结果没有同步获得,从这个意义上说,我们称之为异步函数

如果连续调用两次这样的函数,在应用内就创建了两个过程,我们称之为并发

对事件模型的Node.js而言,这样来实现非阻塞和并发编程有两个显而易见的优势:

  1. 它不需要额外的语法原语去实现fork,一个函数即可创建一个process,包括构造函数;
  2. Node.js是单线程执行的,所以process之间的通讯是同步的;

同步的意思需要这样理解:假如用线程或者进程来实现process,process A只能通过异步通讯通知process B自己发生了某种状态迁移,因为这个通讯是异步的,所以process B不能相信收到的消息是它收到消息那个时刻的process A的真实状态(但是可以相信它是一个历史状态),双方的逻辑也必须健壮到对对方的状态假设是历史状态甚至错误状态之上,就像tcp协议那样。

在Node.js里没有这个烦恼,因为这些process的代码都在同一个线程内运行,无论是process A还是process B遇到事件:

  1. 都可以同步读取的对方的真是状态
  2. 都可以通过方法调用或者消息机制让自己和另一方实现一次同步和同时的状态迁移

    • 同步的意思是通过同步函数完成(在一个Node event loop的tick内)
    • 同时的意思是process A s1 -> s2和process B t1 -> t2同时迁移

同时的特性被称为(shared transition),可以看作两个LTS(Labelled Transition System)的交互(interaction),也可以用Petri Net描述。它可以让过程组合容易具有完备的状态和状态迁移定义。

从上述这个意义上说,在并发模型上,Node领先Go或者任何其他异步过程通讯的语言一大截;但是反过来说,Node的单线程执行模型对于计算而言,没能利用到更多的处理器,是它的显著缺点。但是对于io,Node则完全不输用底层语言编程的多线程程序。

Callback Hell与CPS

这是被谈论最多的话题。

如何用Node.js书写异步与并发,和开发者面对的问题有关。如果在写一个微服务程序,fork过程主要发生在http server对象的事件里,或者express的router里;同时fork出来的每一个过程,我们用串行组合过程的方式完成它,即如果使用callback形式的异步函数嵌套下去,最终会得到一个可以利用到Node的异步并发特性,但是形式上非常难读的代码,这被称为Callback Hell。

在ES7中出现的async/await语法一推出就大受欢迎:

  1. 它比较好的解决了Callback Hell的问题,
  2. 书写条件流程在形式上也回归了传统的代码形式,
  3. 能catch所有的错误
  4. 通过Promise.all提供最简单的并发(fork/join)支持

在串行组合过程时,开发者最关心的问题是如何让过程连续下去,所以从这个意义上说callback函数,或者对应的promise,async/await,也被一些开发者称为Continuation Passing Style(CPS)。

这样说在概念上和实践上都没有问题。但是这件事情在整个Node并发编程上是非常微不足道的。因为这样的模型没有考虑到一个重要的问题:过程可以是有态的

过程的状态

当我们在传统的blocking i/o模式下编程,书写一个表示过程的函数,或者在Node.js里用callback形式或async语法的函数,书写一个表示过程的函数,其状态可以这样表述:

P: s -> 0

它只有两个状态,在运行(s),或者结束了(0)。结束的意思是这个过程不会对程序逻辑和系统状态产生任何未来影响

我们优先关心结束,而不是关心它的成功、失败、返回值,因为前者是对任何过程都普适的状态描述,可以理解为语法,后者是针对每个具体过程都不同的语义。

当然不是所有的过程都会自发结束,比如用setInterval创建的周期性fire的时钟,调用了listen方法的http server,或者打开了文件句柄的fs.WriteStream,如果他们没有遇到严重错误导致自发结束,他们需要使用者的进一步触发(trigger)才能结束。

对于setInterval而言这个触发是clearInterval,对于http server而言这个触发是close,对于fs.WriteStream而言,这个触发是end,无论哪种情况,开发者应该从抽象的角度去理解这个问题:

  1. 如果从Process模型去理解,它们需要使用者发送message
  2. 如果从状态角度去理解,它们需要使用者触发event(形式上可以是method call)

我们举两个例子来说明这种状态更为丰富的过程,即无法用P过程表示的有态过程。

Q过程

第一个例子,我们考虑一个有try/catch逻辑的过程,如果async/await函数形式来写它大体是这样的代码:

async function MyProcess () {
    try {
        // 1 do some await
    } catch (e) {
        // 2 
          // 3 do some await
          throw e
    } 
}

这个Process开始执行后就进入了s状态,它有可能在try block成功完成任务,即s -> 0。它也可能遇到错误走向catch block,但是错误并不是从2的位置抛出,让使用者可以立刻获知和采取行动,它被推迟到3结束。

这样做的好处是这个过程仍然可以用s -> 0来描述,但是,从并发编程的角度说,使用者的error handler逻辑的执行时间被推迟了。可能很多情况下3的逻辑的时间足够短,并不需要去计较,从practical的角度说,我也会经常这样写代码,因为它容易。

但是从Process状态定义的完备角度说,这是个设计缺陷。

同样的过程我们可以换成Event Emitter的形式来实现,Emitter的实现不强制同时抛出error(或data)和抛出finish这两个事件,这对于callback形式或者async函数是强制的:返回即停止。

这样就给使用者提供了选择:

  1. 如果它选择在error/data之后立刻开始后续逻辑,这种情况下我们称之为race。
  2. 如果它选择必须在finish之后才开始后续逻辑,这种情况我们称之为settle。

race和settle都是join逻辑。但他们不必是互斥的(exclusive or),使用者也完全可以在error(或data)的时候触发一个后续动作,在settle的时候触发另一个动作。这样的模型才是普适的和充分并发的。

更为重要的,无论你采用什么样的模型去封装一个单元,一个重要的设计原则是,这个封装应该提供机制(Mechanism),而不是策略(Policy),选择策略是使用者的自由,不是实现者的决策。

如果分开两次抛出error和finish,使用者有自由选择race,或者settle,甚至both,这是使用者的Policy。

在这种情况下,我们可以用下述状态描述来表示这个过程:

Q: s -> 0 | s -> e -> 0

约定:我们用s,或者s1, s2, …表示可能成功的状态,或者说(意图上)走在走向成功的路上,用e,或者e1, e2, e3…表示明确放弃成功尝试,走在尽可能快结束的路上的状态。0只代表结束,对成功失败未置可否。

在这个Q过程定义中,所有的->状态迁移都是过程的自发迁移,不包含使用者触发的强制迁移。在后面的例子中我们会加入强制迁移逻辑。

在这里我们先列出一个重要观点(没有试图去prove它,所以目前不称之为定理或者结论):完整的Q过程是无法简化成s -> 0的P过程的,所以它也无法应用到串行组合P过程的编程模式中。

在开发早期对Q过程有充分认知是非常必要的,因为开发者可能从很小的逻辑开始写代码,把他们写成P过程,然后层层封装P过程,等到他们发现某个逻辑需要用Q过程来描述时,整个代码结构可能都坍塌了,需要推倒重来。

这是我为什么说async/await的CPS实现不那么重要的原因。在Node中基于P过程构建整个程序是很简单的,如果设计允许这样做,那么恭喜你。如果设计上不允许这样做,你需要仔细理解这篇文章说的Q过程,和对应的代码实现里需要遵循的设计原则。

Q过程的问题本身不限于Node编程,用fiber,coroutine,thread或者process实现并发一样会遇到这个问题。要实现完整的Q过程逻辑必须用特殊的语法创建过程,例如Go语言的Goroutine,或者fibjs里的Coroutine;使用者和实现者之间需要通过Channel或对等的方式实现通讯。实现者的s/e/0等状态对使用者来说是显式的。

在Node里做这件事情事实上是比其他方式简单的,因为前面说的,inter-process communication是同步的,它比异步通讯要简化得多。

销毁过程

Node/JavaScript社区可以说很不重视从语法角度做抽象设计了。

http.ClientRequest里的取消被称为abort,Node 8.x之后stream对象可以被使用者直接销毁,方法名是destroy,但是 Node 8很新,你不能指望大量的已有代码和第三方库在可见的未来都能遵循一致的设计规则。在Promise的提案里,开发者选择了cancel来表示这个操作。

我在这篇文档里选择了destroy,用这个方法名来表示使用者主动和显式销毁(放弃)一个过程。新设计的引入需要小小的修改一下Q过程的定义:

Q: s -> 0 | s => e -> 0

=>在这里用于表示它可以是一个自发迁移,也可以是一个被使用者强制的迁移;如果是后者,它必须是一个同步迁移,这在实现上没有困难,能够使用同步的时候我们没必要去使用异步,徒然增加状态数量却没有收益。

在Node生态圈里有不少可以取消的过程对象,例如http.ClientRequest具有abort方法,因此相应的requestsuperagent等第三方加强版的封装,也都支持了abort方法。

但是绝大多数情况下它们都不符合上述Q过程定义,而是把他们定义成了:

P: s => 0

即状态s到0的迁移,可能是自发的,可能是被强制的(同步的)。这些库能这么做是因为:它们处理的是网络通讯,Node在底层提供了abort去清理一个socket connection,除此之外没有其他负担,所以在abort函数调用后,即使后续还有一些操作在库里完成,你仍然可以virtually的当作他们都结束了,因为它符合我们前面对结束的定义“不会对程序逻辑和系统状态产生任何未来影响”。

很多库都选择了这样的简化设计,这个设计在使用者比较小心的前提下也能纳入到“用串行Process”来构造逻辑的框架里,因为大部分库都采用了一个callback形式的异步函数同步返回句柄的方式。

let r = request('something', (err, res) => {
  
})

// elsewhere
r.abort()

这个写法不是解决destroy问题的银子弹,因为它没办法同时提供race和settle逻辑。而且在r.abort()之后,callback函数是否该被调用,也是一个争议话题,不同的库或者代码可能处理方式不一致,是一个需要注意的坑。

还有很多更为复杂的场景的例子。不一一列举了。我们倒回来回顾一下Node API本身,相当多的重要组件都是采用Event Emitter封装的,包括child,stream,fs.stream等等。他们基本上都可以用Q过程描述,但很难纳入到P过程和P过程串行组合中去。

小结

在这一部分内容里我们提出了用Process Model来分析问题的方法,它是一个概念模型,不仅限于分析Event Model的Node/JavaScript,同样可以用于多进程,多线程,或者协程编程的场景。

基于过程的分析我们看出Node的特点在于可以用函数这样简单的语法形式创建并发过程,也指出了Node的一大优势是过程通讯是保证同步的。

最后我们提出了过程可以是有态的,一个相对通用的Q过程状态定义。这是一个难点,但并不是Node的特色,任何双方有态通讯的编程都困难,例如实现一个tcp协议,但是在并发编程上我们回避不了这个问题。

Node的异步和并发编程可以简单的分为两部分:

  1. 如何封装一个有态异步过程(Q过程)
  2. 如何写出健壮的过程组合

前者是Node独有的特色,它不难,但是要有一套规则;后者不是Node独有的,而且Node写起来只会比多线程编程更容易,如果在Node里写不好,在其他语言里也一样。

通常的情况是开发者在设计上做一点妥协,牺牲一点并发特性,串行一些逻辑,在可接受的情况下这是Practical的,因为它会大大降低错误处理的难度,但遇到设计需求上不能妥协的场景,开发者可能在错误处理上永远也写不对。

这篇文章的后面部分会讲解第一部分,如何封装异步过程。如何组合这些异步过程,包括等待、排队、调度、fork/join、和如何写出极致的并发和健壮的错误处理,是另外一篇文章的内容。

异步过程

异步过程这个名字不是很恰当,过程本身就是过程,没什么同步异步的,同步或者异步指的是使用者的使用方式。但在没有歧义的情况下我们先用这个称呼。

在Node里一般有两种方式书写一个异步过程:callback形式的异步函数,和Event Emitter。

还有一种不一般的形式是把一个Event Emitter劈成多个callback形式的异步函数,这个做法的一个收益是,和OO里的State Pattern一样,把状态空间劈开到多个代码block之内;很显然它不适合写复杂的状态迁移,会给使用者带来负担,但在只有两三个串行状态的情况下,它可以使用,如果你偏爱function形式的语法,唾弃Class的话。

异步函数

异步函数的状态定义是:

P: s -> 0

它没有返回值(或句柄),它唯一的问题是要保证异步。初学者常犯的错误是写出这样的代码:

const doSomething = (args, callback) => {
  if (!isValid(args)) {
    return callback(new Error('invalid args'))
  }
}

这是个很严重的错误,因为doSomething并不是保证异步的,它是可能异步可能同步的,取决于args是否合法。对于使用者而言,如果象下面这样调用,doElse()和doAnotherThing()谁先执行就不知道了。这样的逻辑在Node里是严格禁止的。

doSomething((err, data) => {
    doAnotherThing()
})

doElse()

正确的写法很简单,使用process.nextTick让callback调用发生在下一个tick里。

const doSomething = (args, callback) => {
  if (!isValid(args)) {
    process.nextTick(() => callback(new Error('invalid args')))
    return
  }
}

异步函数可以同步返回一个对象句柄或者函数(就像前面说的http.ClientRequest),如果这样做它是一个Event Emitter形式的等价版本。我们先讨论完Emitter形式的异步过程,再回头看这个形式,它事实上是一个退化形式。

Event Emitter

我们直接考虑一个Q过程:

P: s -> 0 | s => e -> 0

代码大体上是这样一个样子,:

class QProcess extends EventEmitter {
  constructor() {
    super()
  }

  doSomething () {

  }

  destroy() {

  }
  
  end() {
      
  }
}

这个过程对象可以emit error, data, finish三个event。

用Process Model来理解它,这个class的方法,相当于使用者过程向实现者过程发送消息,而这个class对象emit的事件,相当于实现者向使用者发送消息。从Process模型思考有益于我们梳理使用者和实现者之间的各种约定。

使用者调用new QProcess时创建(fork)了一个新的Process。绝大多数Process不需要有额外的方法,因为大部分参数直接通过constructor提供了。

Builder Pattern在Node和Node的第三方库中很常用,例如superagent,它提供了很多的方法,这些方法都是在构造时使用的,他们都是同步方法,而且没有fork过程,直到最后使用者调用end(callback)之后才fork了过程,我们不仔细讨论这个形式,它很容易和这里的QProcess对应,唯一的区别是end的含义和这里定义的不同。

这里提供了两个特殊方法,destroy和end,前者是销毁过程使用的,它在错误处理时很常见。在串行组合过程的编程方式下,我们没有任何理由去destroy其中的一个正在执行的过程;但是在并发编程下,一个过程即使没有发生任何错误,也可能因为并发进行的另一个过程发生错误而被销毁,从这个角度说,Q过程都应该提供destroy方法。

end在这里是对应stream.Writable的end逻辑;一个写入流(我们同样把他当Process看),如果没有end它永远不会结束;这种过程对象和等价于for-loopsetInterval不同,在绝大多数情况下我们不会希望它是永远不结束的。

doSomething是一个通用的写法,如果QProcess封装的过程需要尽早开始工作,但是它也需要不断的接受数据,doSomething用于完成这个通讯,典型的例子是stream.Writable上的write方法。write方法不同于end方法的地方在于,write方法不会导致QProcess出现对使用者而言显式的状态迁移,但end方法是的。

error事件,当然可以argue说一个QProcess可以在抛出error后继续工作,但是我们不讨论这个场景;我们假定QProcess在抛出error后无法继续走在可能成功的s路线上;如果它可以继续,那么那个error请使用其他event name,当作一种data类型来处理。

Node的Emitter必须提供error handler,否则它会直接抛出错误导致整个应用退出,所以从这个意义上说,error是critical的。符合我们说的限制。

data事件,它类似write,它应该表示QProcess仍然走在可能成功的s路线上。

finish事件,它符合前面我们说过的过程结束的定义。

实现者承诺

1. 异步保证

在任何情况下,QProcess都不允许在使用者调用方法时,同步Emit事件。

在Event Emitter形式下的Q过程,仍然要遵循异步保证。它的出发点是一致的,如果没有这个保证,使用者无法知道它调用方法之后的代码,和event handler里的代码哪一个先执行。如果遇到同步的错误,QProcess仍然需要象异步函数一样,用process.nextTick()来处理。

2. emit时的状态完备性

如果QProcess需要emit事件,它必须保证自己处于一个对使用者而言,显式且完整的状态下。

QProcess内部的实现也会侦听其他过程的事件,这些事件的到来可能会导致QProcess执行一连串的动作。

例子:如果一个QProcess内部包含一个ChildProcess对象,在QProcess处于s状态时,它抛出了error,这时过程已经无法继续,QProcess执行一连串的动作向e状态迁移:

transition: s -> action 1 -> action 2 -> action 3 -> e

emit error的时间点在进入e状态之后,emit error的含义是通知使用者发生了致命错误,而且QProcess已经迁移至e状态

在action过程中随意的emit error是严格禁止的。

因为:使用者可能在error handler中调用QProcess的方法,无论是同步还是异步方法。如果严格要求QProcess在有效状态下emit,那么QProcess的实现承诺就是这些方法在有效状态下可用。如果写成在action 1之后也允许emit error,对QProcess的要求就提升到在任何transition action时都可用,这种是无厘头的挑战自我设计,毫无意义。而且即使你成功的挑战了自我,使用者也带来了额外的负担,它的handler里对QProcess的状态假设是什么?是状态迁移中?这明显不合理。

Q过程对使用者是显式有态的,是它的执行逻辑的依据,所以这里应该消除歧义,杜绝错误。这种错误是严重的协议错误。

3. 只emit一次,承诺状态

QProcess在内部的一次event handler中只允许emit一次,而且承诺状态:

  1. 如果emit error,表示QProcess处于e状态
  2. 如果emit data,表示QProcess处于s状态
  3. 如果emit finish,表示QProcess处于0状态

有两种常见的错误情况。

第一个例子:假如QProcess的第一个操作,例如通过fs.createReadStream()创建一个文件输入流,因为文件不存在立刻死亡了。这时它有这样一些选择:

  1. 先emit error,然后同步emit finish;
  2. 先emit error,然后异步emit finish;
  3. 只emit error,不再emit finish;
  4. 只emit finish,不emit error;

正确的做法是4。因为QProcess已经结束,是显式状态0,emit finish通知使用者自己发生了状态迁移(s -> 0)是正确的做法。至于错误,推荐的做法是直接在finish里提供,在对象上创建标记让使用者去读取也是可以的。

在这样的设计下,emit error的语义被缩减的了,即如果QProcess emit error,说明它一定处于e状态,不是0状态,这有助于使用者使用(代码路径分开原则,后述)。

在这个例子下如果选择1呢?你怎么考虑如果在emit error之后:

  1. 实现者对自己的状态承诺是什么?
  2. 使用者如果在error handler里调用QProcess的同步方法,它会被强制状态迁移吗?如果迁移了,那么随后emit finish还能成立吗?
  3. 使用者的error handler方法和finish handler方法是可能异步可能同步执行的,使用者要保证在这样的情况下也OK吗?

第二个例子:假如QProcess处于s状态,抛出了data事件,对QProcess而言它不知道这个data是否非法,但是使用者可能有额外的逻辑认定这个data是错误的,这个时候它调用了QProcess的destroy方法,这个方法要求QProcess的强制状态迁移s -> e

如果遵循这一条设计要求,这种设计就是很安全的。否则连续的emit的第二次的emit对状态的假设就没法确认了,难道使用者还需要在第二次emit之前去检查一下自己的状态吗?

error的处理有另外一种设计路径。在s状态下emit error,然后使用者调用destroy方法强制其进入e状态;逻辑上是对的,也具有数学美感,因为它没区分s和e的处理方式;但我倾向于不要给使用者制造负担,实现者的代码写一次,使用者的代码写很多次,这样设计需要使用者在每一次都要去调用destroy,相对麻烦。当然如果你有足够的理由局部去做这样的设计,可以的。

4. Destroy是同步方法

回到Process模型上来。

如果一个Process的实现是用操作系统进程、线程来实现,同步destroy的可能性是没有的,只能发送一个message或signal,对应的进程或者线程在未来处理这个消息,对于使用者而言它仍然可能在destroy之后获得data之类的message,当然这也不是很麻烦,使用者只要建立一个状态作为guard,表示Process已经被destroy了,忽略除了exit之外的消息即可。

在Node里面,逻辑上是一样的,但是实现者的destroy的代码可以同步执行,它也是同步迁移到e状态的,使用者不需要建立guard变量来记录实现者的状态;按照Node的stream习惯,实现者应该有一个成员变量,destroyed,设置为bool类型,供使用者检查实现者状态。

5. End是同步方法

逻辑上是和Destroy一样的,不同之处在于实现者都处于s状态。

在一些错误处理情况下,使用者可能会根据一个过程对象是否end采取不同的错误处理策略。尚未end的过程一般被抛弃了,通常也无法继续进行;但是已经end的过程可能会等待它完成。即对一组并发的过程可能采用不同的策略。

但是在设计角度说,还是前面那句话,要坚持mechanism和policy分离的原则。实现者应该提供这个机制而不是强制使用者必须用某种策略,策略是使用者的逻辑。它可以全部抛弃尚未完成的Process,也可以只抛弃尚未end的,对于大文件传输来说这可以避免不必要的再次传输,毕竟网络传输已经完成,只是文件尚未完全写入文件系统。

使用者策略

下面来看使用者策略和需要注意的问题,其中一些问题的讨论会给使用者和实现者都增加新规则。

Mutex

Event Handler是一种状态资源。

假如我们在用状态机方法写一个对象(不需要是上述的过程),这个对象的某个状态有一个时钟,它在进入这个状态时需要建立这个时钟,在超时时向另一个状态迁移,但是它也可以在计时状态下收到其他事件从而迁出这个状态,这是它必须清除这个时钟,这是状态机编程下的资源处理原则:在enter时创建,在exit时清理。为什么要清理呢?即使不考虑浪费了一个系统时钟资源,这个时钟挂上了一个callback,我们必须阻止它的执行,否则系统逻辑就错了。

所以从这个意义上说,Event Emitter上的所有Listener,也要被看作是资源。需要做相应的清理,否则可能会导致错误。

例子,启动一个子进程,等待它返回一个消息,作为过程需要的结果。

let c = child.spawn('some child process')
c.on('error', err => {})
c.on('message', message => {})
c.on('exit', (code, signal) => {
    
})

这段常见的代码形式最终如何封装取决于我们的设计要求,如果允许省事我们可以先把它封装成s -> 0,即使用最简单的回调函数形式。

我们看一下这三个handler,他们都是我们需要在开始的时候就挂载上去的,包括exit(finish),因为子进程可能没有完成工作就意外死亡了。但是exit有点特殊,如果error先发生了,我们就不关心message和exit了,如果message先发生了,我们也不再关心error和exit了,这意味着我们已经拿到了正确的结果,即使在这个结果之后发生了子程序的意外退出,无所谓了,而如果exit先发生了,那么我们也不用关心error和message了,因为ChildProcess应该承诺这是finish。

所以看起来非常简单的代码,仔细分析一下才会发现三者是互斥的。不管结果如何,first win。所以代码写成这个样子了:

function doSomething(args, callback) {
  let c = child.spawn('some child process, with args')
  c.on('error', err => {
    c.removeAllListeners()
    c.on('error', () => {})
    callback(err)
  })
  
  c.on('message', message => {
    c.removeAllListeners()
    c.on('error', () => {})
    callback(null, message)
  })
  
  c.on('exit', (code, signal) => {
      // if ChildProcess is trusted, no need to remove any listeners
    callback(new Error(`unexpected exit with code ${code} and signal ${signal}`))
  })
}

在error和message handler里都清除了与之互斥的代码路径。有一个小小的坑是Emitter的error handler必须提供,否则会造成全局错误,这是Node的设计,所以我们塞上一个function mute它。另外这里的child对象是我们自己创建的,这样粗暴的removeAllListeners就可以了。如果是外部传入的,不能这样暴力,而只能清除自己装上去的handler。

这段代码通常的写法是在function内放一个闭包变量,例如let finished = false,然后在所有的event handler里面做guard,如果只有一层逻辑,这样写是OK的,但是闭包变量做guard在逻辑多的时候,尤其是出现等待同步逻辑的时候,很无力。它没有清楚的看到所有的状态空间,容易导致错误。

习惯上我把这个设计原则称为mutex(mutual exclusive),当然有时候不一定是双方互斥,象上面的例子就是多方的。mutex当然在thread model下有其他的含义,但是在Node.js的上下文下没有那个含义的mutex,我们姑且用这个词。

这里顺便给一个拆除pipe的例子,因为这种情况太场景了,写不对的开发者不在少数。

假定在代码中有ws是write stream,rs是read stream,调用过rs.pipe(ws)

rs.removeAllListeners()
ws.removeAllListeners()
rs.on('error', () => {})
ws.on('error', () => {})
rs.unpipe()
rs.destroy()  // node 8.x specific
ws.destroy()  // node 8.x specific
  1. 这里removeAllListeners()是简化的写法,如果stream不是自己创建的,需指定listener。
  2. 拆除和重装listeners应该在调用方法之前,因为我们不确定Node stream遵循了写在前面的设计原则,即它会不会同步emit error或其他事件。
  3. destroy是node 8.x才有的方法,如果是早期的版本没有这个方法,会报错。

代码分开原则

这个原则表述起来很容易。

  1. 空间上分开,例如error是走向错误的代码(e-path),data是走向可能获得结果的代码(s-path);这一点在Event Emitter的形式上(error和data handler)已经得到保证,在callback或者finish handler上需要自己检查,但不是很大的麻烦;
  2. 时间上分开,这个要麻烦一点,它的意思是说最好在每个状态下只给所有的过程对象装上对这个状态向下一个状态迁移的handler代码,在真正迁移时,先移除所有旧的handler,然后装上新的handler。

比如把上面P过程实现的代码进化成过程,即考虑使用者对finish事件是有兴趣的,它可能需要采取settle逻辑,而不仅仅是race。

在这种情况下:

  1. s -> 0一步完成且得到message的成功路径是没有的。成功的路径只能是s1 -> s2 -> 0,其中s1没有message,s2有。
  2. s -> 0一步完成未获得message的路径是有的。
  3. s -> e -> 0的路径是有的,先遇到错误,迁移到e状态,然后迁移到完成。
  4. 实际上还可能遇到s1 -> s2 -> e -> 0的情况,即错误发生在获取message之后;与之相反的,也可能遇到先得到error然后得到message的可能,那么这两种情况我们都抛弃了,我们的设计目的是使用者对finish有兴趣,不是为了穷举状态空间。
class DoSomething extends EventEmitter {
  
  constructor(args) {
    super()
    this.destroyed = false
    this.finished = false
    
    this.c = child.spawn('some child process, with args')
    c.on('error', err => {
      // enter e state
      this.destroyed = true
      c.removeAllListeners()
      c.on('error', () => {})
      c.on('exit', (code, signal) => {
        this.finished = true
        this.emit('finish')
      })
      this.emit('error', err)
    })
    
    c.on('message', message => {
      // s1 -> s2
      this.message = message
      c.removeAllListeners()
      c.on('error', () => {})
      c.on('exit', (code, signal) => {
        this.finished = true
        this.emit('finish')
      })
      this.emit('data', this.message)
    })
    
    c.on('exit', (code, signal) => {
      // -> 0
      this.finished = true
      this.emit('finish', new Error('unexpected exit'))
    })
  }
  
  destroy () {
    if (this.finished || this.destroyed) return
    this.destroyed = true
    this.c.removeAllListeners()
    this.c.on('error', () => {})
    this.c.on('exit', () => {
      this.finished = true
      this.emit('finished')
    })
    this.c.kill()
  }
}

这个例子不算特别的有代表性,但是它展示了和OO编程模式下的State Pattern一样的代码路径分开原则,收益也是一样的。

destroy的实现也很容易。

代码中的这种实现方式大部分开发者都会同意:强制转换状态,且继续emit finish。但我们有另外的设计方式。

更好的Destroy设计

如果考虑error code path和success code path的分开,我推荐另一种设计方式:

destroy在调用后,过程对象不再emit finish事件;如果destroy提供了callback,在原有应该emit finish事件的地方,改为调用该callback;如果destroy没有提供callback,do nothing。换句话说,如果使用者提供了callback,它就选择了settle逻辑,如果它不提供callback,就是race了。

按照这样的设计方式,destroy的代码改为:

  destroy (callback) {
    if (this.finished || this.destroyed) return
    this.destroyed = true
    this.c.removeAllListeners()
    this.c.on('error', () => {})
    if (callback) {
      this.c.on('exit', () => {
        this.finished = true
        callback()
      })
    }
    this.c.kill()
  }

站在使用者角度讲,这样的实现更好。因为使用者可以分开让过程自发运行和强制销毁的finish handler代码。destroy操作如此特殊,它几乎只用于错误处理阶段,让使用者为此独立提供代码块是更方便的,这符合我们严格分开成功和错误处理的代码路径的原则。

退化的场景

下面来给前面用P过程状态定义实现的异步函数加装destroy方法,粗暴的方式是直接返回函数(而不是对象句柄)。

function doSomething(args, callback) {
  let destroyed = false
  let finished = false
  
  let c = child.spawn('some child process, with args')
  c.on('error', err => {
    c.removeAllListeners()
    c.on('error', () => {})
    finished = true
    callback(err)
  })
  
  c.on('message', message => {
    c.removeAllListeners()
    c.on('error', () => {})
    finished = true
    callback(null, message)
  })
  
  c.on('exit', (code, signal) => {
      // if ChildProcess is trusted, no need to remove any listeners
    finished = true
    callback(new Error(`unexpected exit with code ${code} and signal ${signal}`))
  })
  
  return (callback) => {
    if (destroyed || finished) return
    c.removeAllListeners()
    c.on('error', () => {})
    if (callback) {
      c.on('exit', () => callback())
    }
    c.kill()
  }
}

这个逻辑和之前是一样的。但是这个做法更容易出现设计形式上的争议:因为doSomething在调用时提供的callback没有保证返回。

我们说这个原则在doSomething不提供返回的时候肯定是需要保证的。但是如果象这样写,这个原则可以修改。

反对这样设计的理由是充分的,任何请求或者Listener在提供者销毁时应该得到通知,这是对的;比如在OO编程时我们经常会观察一些对象,或者向某个提供服务的组件请求服务,即使被观察对象或者服务组件销毁也应该提供返回或通知。

但是这里的设计原则和上述场景不一样。在这里我们强调Owner原则,使用者创建实现者是给自己使用的,自己是Owner,自己销毁这个实现者然后还要坚持从原有的callback或者finish handler得到一个特定的Error Code判断销毁原因,这没有必要。

至于上述的Observer Pattern的Listener,或者向Service组件的请求的实现,我们在下一篇会给出代码例子。

在这里我们强调的是,这里的callback或者finish handler,不是上述Observer/Service Request意义上的listener/callback,Observer和Service Requester并非被观察对象或者服务的Owner,他们当然无权修改对象或服务的特性,而且服务质量也必须被保证。

但是在这里,我们是在用callback或者emitter这种代码形式实现process composition所需的inter-process communication。我们有权为了使用便利而设计代码形式。

总结

为复杂Process建立显式状态,是解决问题的根本方法。如果设计要求就是具有这些状态的,你用什么方法写都一样,这不是Node特有的问题。

Node特有的问题在于它写异步有态过程需要遵循的设计原则,很多开发者不熟悉状态机方法,所以很难写的健壮。这个话题也可能有它比较独特的地方,因为Node是强制异步过程和事件模型的混合体,但是这种编程模式在嵌入式系统、内核、以及一些系统程序中,是非常常见的(大多数是C语言的,指针实现callback)。

这篇文章我会长期维护。如果需要更多的代码示例,或者有很具体的问题需要讨论,可以提出。

只要能把基础的异步过程写对,如何组合它们实现并发,并发控制,等待,同步(join),以及健壮的错误处理,是易如反掌的,那是我们下一篇的话题。

    原文作者:uglee
    原文地址: https://segmentfault.com/a/1190000011447510
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞