Reactor Model

简介

Reactor模型是一种在事件模型下的并发编程模型。

Reactor模型首先是一个概念模型;它可以描述在Node.js中所有的并发和异步编程行为,包括基础的异步API,EventEmitter对象,以及第三方库或实际项目中的已有的异步过程代码。

其次,Reactor模型定义了一组简单但严格的设计规则,这些规则涵盖异步过程的行为、状态、和组合设计。这些规则可以用于设计和实现一个Node.js应用的全部行为逻辑。

对于已有的Node.js API、库和项目代码,大多数情况下它们的行为和状态设计已经遵循了Reactor模型的要求,但是在组合实现上,可能一些代码不符合要求,但可以通过重构使之符合Reactor模型规范,通常不必大面积重写。

第三,Reactor模型没有提供任何代码,包括库函数或者基础类代码,也不要求统一的代码形式,只要应用Reactor模型的设计规则即可。

Reactor模型是与Process模型对等的并发模型,它适用于使用事件模型和支持异步io的编程环境例如,Node.js。和设计模式(Design Pattern)中的解决某类问题的行为型模式相比,Reactor是事件模型下通用的行为模型,所以我们不把Reactor模型看作一种设计模式,设计模式可以在Reactor模型之上实现。

动机

Node.js已经问世8年,但对很多开发者而言它仍然是一个较新的语言环境;它独特的结合了事件模型和非阻塞io,非阻塞性在代码中通过代码单元的组合,把几乎所有代码都变成了异步形式。

对基于Process模型实现并发编程的问题,既不缺乏抽象的理论模型,也不缺乏解决各种具体问题的设计模式,且不限于开发语言,甚至存在语言针对高并发问题设计(例如Go);但是在事件模型和结合了异步过程的Node.js中,我在近两年的高强度开发中几乎没有读到过在这方面具有统一的模型抽象、系统、全面、同时象GoF设计模式那样直接指导编程实践的文章。

这是建立Reactor模型和写下这篇文章的目标。

方法论

我们从三个方面阐述Reactor模型:

  1. 什么是Reactor?它是什么?有什么基础特性?
  2. 如何实现组合模式?通过组合我们不仅仅可以用Reactor描述一个组件的行为,还可以用它来描述整个应用。
  3. 如何使用Reactor模型应对并发编程中的各种问题?

这篇文章阐述1和2的内容,3是下一篇文章的内容。

Reactor

一个Reactor表示一个正在执行的过程,它是一个概念【见脚注1】;和Process模型中的Process概念一样,是对一个过程的抽象。

在实际的Node.js代码中,程序运行时每次调用一个异步函数,或者用new关键字创建了一个表示过程的对象,在应用中都创建了一个Reactor;在这个异步函数执行结束,或者过程对象抛出最后的事件(通常为finish或者close)时,这个Reactor的生命周期就结束了;不管是否称之为Reactor,开发者对它并不陌生。

我们用略微严格的方式来看这个有生命周期的动态过程,具有哪些特性和设计规则。

特性(Properties)

一个Reactor具有五个基本特性:

  1. Reactor是反应式的(Reactive);
  2. Reactor具有输入输出(io);
  3. Reactor是异步的(asynchornous);
  4. Reactor是有态的(stateful);
  5. Reactor是动态的(dynamic);

Reactive

Reactor本质上是状态机(state machine);虽然应用Reactor模型不强制要求开发者使用设计模式中的State Pattern代码形式,但要求开发者为每个Reactor在心里构建一个状态机模型。

Reactor的内部行为用State/Event来描述,它是Reactor的实现(Implementation)方法。在一个Reactor收到事件时——这个事件可能来自内部,也可能来自外部——它会根据状态机行为设计更新自己的状态,这个过程我们称为React或Reaction。

Input & Output

Reactor具有输入输出;在形式上:

  • 对于一个异步函数,我们可以说它的参数是输入,返回结果是输出(包括错误);
  • 对于一个EventEmitter对象,调用它的方法可以看作是输入,它emit的事件都可以看作输出;
  • 如果一个对象向外提供callback形式或者async形式的异步方法,那么调用该方法可以看作输入,该方法的返回可以看作是输出;

与内部的State/Event状态机实现不同,input/output描述的是Reactor的外部界面(interface)。

一个Reactor的input,在内部看都可以理解为事件,虽然其中一些可能导致Reactor的状态迁移,而另一些不会;在讨论Reactor的状态机行为时,我们使用input event或external event表示Reactor收到的外部事件。

Reactor可以产生output,用于输出数据或通知外部自己发生了状态迁移;Reactor模型主要关心后面一种情况。

Reactor可以组合,其组合方式和OO中的组合逻辑一样,同样的我们把一个Reactor包含的Reactor称为其成员(member)。成员抛出(output)的事件对于Reactor的内部状态机实现而言是事件,但它不是input,因为它来自内部。我们使用internal event表示一个Reactor收到的来自成员的事件。

Asynchronous

在任何情况下,一个Reactor都不允许在input时同步产生output。这就是Reactor模型中对异步的定义,它被定义成了一个Reactor的内禀属性。

这个要求对于异步函数来说是一种常识;但对于EventEmitter对象而言,一些实际代码并没有做到这个承诺(甚至没有试图这样去做);在Reactor Model中,这个行为是强制要求的。

Stateful

Reactor是有态的。

在实现组合(包括有并发的组合)时,Reactor需要清楚的知道每个成员的状态,至少一个成员是否在进行中还是已结束是必须清楚的,所以Reactor的状态迁移定义至少是running -> stopped

和input/output一样,这里说的状态指的是Reactor的外部状态,而非其内部实现的完整状态机状态。因为Reactor是对过程的通用抽象,在绝大多数情况下,在外部看只需为其建立很少的状态,例如:正常运行、已经发生过错误、已经被强制终止,已经结束等等。

我们用显式状态(explicit state)一词来表述Reactor的外部状态【见脚注2】。

Dynamic

Reactor是动态的包含两个意思:

  1. 它可以被动态创建和销毁的【见脚注3】。
  2. Reactor是运行时概念,而不是代码层面的编译时的概念;如果用面向对象编程来类比,它对应object,而不是class。

Summary

Reactor不是一个高深或复杂的概念,它甚至可以认为是从代码中总结出来的。

输入输出和动态性写在这里是为了看起来略微严谨,实际上几乎所有的模型的基础构件,Process,Object, Actor等等,都有输入输出和动态性。

Reactive特性是Reactor的内部实现要求,写在这里是为编程考虑;如果要严格(形式化)定义Reactor,它不是必须的要素,因为Reactor概念是黑盒抽象,如何实现是白盒特性。

所以Reactor真正特别的地方只有两点:

  1. 它封装了异步的概念,在Reactor模型中只有这样一个异步定义,毫无歧义,而且可以说是毫无实现负担;
  2. 它显式有态,但是不复杂;建立显式状态的唯一目的是为了实现组合,它与一个Reactor的目的或实现细节无关,应该把它理解为一种语法(Syntax)而不是一种语义(Semantics)。

简单的说,Reactor表示一个异步和显式有态的过程。

代码示例

在Node.js里Reactor通常有两种代码形式:

  1. 异步函数
  2. EventEmitter的继承类

下面我们看看如何把它们理解成Reactor,其中一些代码例子展示了简单的封装。

异步函数

fs.readdir('some path', (err, data) => {
  ...
})

调用一个异步函数可以创建一个Reactor(实例)

把一个异步看作一个Reactor的构造函数,这看上去有些古怪。但我们应该这样理解:虽然这个异步函数可能没有返回任何对象引用,但是调用之后系统实实在在的启动了一个过程,这个过程在未来会对系统行为产生影响,调用过和没调用过,系统整体的状态不一样。如果我们用状态来描述系统,它是系统状态的一部分。

这里有个哲学问题可以说一下,通常我们说状态的本质是系统的输入(事件)历史。假如系统没有建立任何状态模型,理论上,如果系统记录了全部的输入历史,不考虑性能,系统总是可以完成同样行为的;从这个意义上说,状态的本质是输入历史。

但是在调用一个异步函数后,我们说有一个未来(Future)是系统状态的一部分,这个未来是系统自身的历史行为的造成的,但它并非是一个输入的历史。

换一个角度看,在JavaScript里调用一个异步函数等价于创建一个promise,处于pending状态,那么这个promise可以看作是这个Reactor的一个显式表达。它符合Reactor的四个定义。

异步函数的返回,可以看作它产生了output,这个output除了可以返回error/data(也可以不返回任何值),更重要的,它向外部emit了一个finish事件。

在状态约定上,这种Reactor的状态数量是最少的,只有两个状态:在执行,记为S,和结束,记为0;它的状态迁移图可以简单的写成:

S -> 0

其中->符号用于表示一种自发迁移,即这种迁移不是因为使用者通过input强制的。

Spawn a Child Process

let x = child.spawn('a command')
x.on('error', err => { /* do something */ })
x.on('message', message => { /* do something */ })
x.on('exit', (code, signal) => { /* do something */})

// somewhere
x.send('a message')

// elsewhere
x.kill()

一个ChildProcess对象可以看作一个Reactor;调用它的sendkill方法都视为input,它emit的error, message, exit等事件都应该看作output;

ChildProcess对象的状态定义取决于执行的程序(command)和使用者的约定。Node.js提供的ChildProcess是一个通用对象,它有很多状态设计的可能。

S -> 0

考虑最简单的情况:执行的命令会在结束时通过ipc返回正确结果,如果它遇到错误,异常退出。在任何情况下我们不试图kill子进程。在这种情况下,它可以被封装成S->0的状态定义。

const spawnChild1 = (command, opts, callback) => {
  let x
  const mute = () => {
    x.removeAllListeners()
    x.on('error', () => {})
  }

  x = child.spawn(command, opts)
  x.on('error', err => {
    mute()
    x.kill()
    callback(err)
  })

  x.on('message', message => {
    mute()
    callback(null, message)
  })

  x.on('exit', () => {
    mute()
    callback(new Error('unexpected exit'))
  })
}

在这里我们不关心子进程在什么时间最终结束,即不需要等到exit事件到来即可返回结果;设计逻辑是error, messageexit是互斥的(exclusive OR),无论谁先到来我们都认为过程结束,first win。

这段代码虽然没有用变量显式表达状态,但应该理解为它在spawn后立刻进入S状态,任何事件到来都向0状态迁移。

按照状态机的设计原则,进入一个状态时应该创建该状态所需资源,退出一个状态时应该清理,在这里应该把event handler看作一种每状态资源(state-specific resource);在不同状态下,即使是相同的event名称也应提供不同的函数对象作为event handler(或不提供)。

所以mute函数的意思是:清除在S状态下的event handlers,装上0状态下的event handlers。使用Reactor Model编程,这种代码形式是极推荐的,它有利于分开不同状态下事件处理逻辑的代码路径,这和在Object Oriented语言里使用State Pattern分开代码路径是一样的工程收益。

S => 0

我们使用a ~> b来表示一个Reactor可以被input强制其从a状态迁移至b状态,用a => b表示这个状态迁移既可以是input产生的强制迁移,也可以自发迁移。

习惯上我们使用S,或者S0, S1, …表示过程可能成功的正常状态,用E表示其被销毁或者发生错误但是尚未执行停止。

S => 0的状态定义的意思是Reactor可以自发完成,也可能被强制销毁。

原则上是不应该定义S ~> 0或者S => 0这样的状态迁移的,而应该定义成S ~> E -> 0或者S => E -> 0,因为大多数过程都不可能同步结束,他们只能同步迁移到一个错误状态,再自发迁移到结束。

但常见的情况是一个过程是只读操作,出错时不需要再考虑后续行为,或者在强制销毁之后,其后续执行不会对系统未来的状态产生任何影响,那么我们可以认为它已经结束。

允许这样做的另一个原因是可以少定义一个状态,在书写并发时会相对简单。

在这种情况下spawnChild2可以写成一个class,也可以象下面这样仍然写成一个异步函数,但同步返回一个对象引用;如果使用者只需要destroy方法,返回一个函数也是可以的,但未来要给这个对象增加新方法(input)就麻烦了。

const spawnChild2 = (command, opts, callback) => {
  let x
  const mute = () => {
    x.removeAllListeners()
    x.on('error', () => {})
  }

  x = child.spawn(command, opts)
  x.on('error', err => {
    mute()
    x.kill()
    callback(err)
  })

  x.on('message', message => {
    mute()
    callback(null, message)
  })

  x.on('exit', () => {
    mute()
    callback(new Error('unexpected exit'))
  })

  return {
    destroy: function() {
      x.mute()
      x.kill()
    }
  }
}

这里的设计是如果destroy被使用者调用,callback函数不会返回了,这样的函数形式设计可能有争议,后面会讨论。

S -> 0 | S => E -> 0

这是一个更为复杂的情况,使用者可能期望等待到子程序真正结束,以确定其不再对系统的未来产生任何影响。这时就不能设计S => 0这样的简化。

S -> 0和前面一样,表示子程序可以直接结束;S => E表示子程序可能发生错误,或者被强制销毁,进入E状态;E -> 0表示其最终自发迁移到结束。

在这种情况下,需要使用EventEmitter的继承类形式了。

class SpawnChild3 extends EventEmitter {

  constructor(command, opts) {
    super()

    let x = child.spawn(command, opts)
    x.on('error', err => { // S state
      // s -> e, we are not interested in message event anymore.
      this.mute()
      this.on('exit', () => {
        mute()
        this.emit('finish')
      })

      // notify user
      this.emit('error', err)
    })

    x.on('message', message => { // S state
      // stay in s, but we don't care any further error
      this.message = message
      this.mute()
      this.on('exit', () => {
        this.mute()
        this.emit('finish')
      })
    })

    x.on('exit', () => { // S state
      this.mute()
      this.emit('finish', new Error('unexpected exit'))
    })

    this.x = x
  }

  // internal function
  mute() {
    this.x.removeAllListeners()
    this.x.on('error', () => {})
  }

  destroy() {
    this.mute()
    this.x.kill()
  }
}

stream

Node.js有非常易用的stream实现,各种stream都可以理解成一个Reactor,只是状态更多一点。

对Readable stream,通常前面的S -> 0 | S => E -> 0可以描述其状态。在Node 8.x版本之后,有destroy方法可用。

S1 -> S2 -> 0 | (S1 | S2) => E -> 0

对于Writable stream,S状态可能需要区分S0和S1,区别是S0是end方法尚未被调用的状态,S1是end方法已经被调用的状态。

区分这两种状态的原因是:在使用者遇到错误时,它可能希望尚未end的Writable Stream需要抛弃,但已经end的Writable Stream可以等待其结束,不必destroy。

在这种情况下,严格的状态表述可以写成S1 -> S2 -> 0 | (S1 | S2) => E -> 0

Node.js里的对象设计和Reactor Model的设计要求高度相似,但不是完全一致。一般而言stream不需要再封装使用。实际上熟悉Reactor Model之后前面的spawn child也没有封装的必要,代码中稍微写一下是使用了哪个状态设计即可。

Summary

在实际工程中,实现一个Reactor应该尽可能提供destroy方法和诚实汇报结束(finish)事件;否则经过组合后的Reactor将无法完成这两个功能。如果不从最细的粒度做好准备,在粗粒度上销毁一个复杂组件并等到其全部内部过程都完成清理和结束,就会成为无法完成的工作。

在这一节里我们看到了四种最基本的显式状态定义:

S -> 0                                    # 可以表示简单的异步过程
S => 0                                    # 可以表示可以取消的只读操作或网络请求
S -> 0 | S => E -> 0                    # 可以表示绝大多数以结果为目标,会发生错误和可销毁的过程
S0 -> S1 -> 0 | (S0 | S1) => E -> 0        # 可以表示Writable Stream等需要操作才可以结束的过程

对于工程实践中的绝大多数情况,这四种显式状态定义就够用了。

组合(Composition)

在上一节我们把在Node.js代码中调用异步函数或new关键子定义为创建Reactor;在实际代码中,除了Node.js API之外的异步函数或者EventEmitter类都是开发者自己定义的,在代码上,它们需要调用其他的异步函数或者创建类成员实例来实现;在这一节,我们来理解父函数或者父类创建的Reactor,和它使用的子含说或者子类创建的Reactor对象之间的关系,这种关系也是动态的,我们把它定义为Reactor的组合。

组合指的是两个Reactor之间的包含关系,借用OO编程里的术语,我们把被包含的Reactor称为成员。有时候我们也会用父Reactor和子Reactor来表述这种关系。

组合关系不是一种并发关系,这就像一个函数调用了另一个函数,我们不会说他们是并发的。

组合关系是一种动态关系,因为Reactor是一个过程,在运行时,它的子过程(即成员)是不断的被创建和结束的。

例子

在Node.js中书写一个异步函数或者EventEmitter,都是在实现组合,这里给两个例子。

异步函数

const lsdir = (dirPath, callback) => {
  fs.readdir(dirPath, (err, entries) => {
    if (err) return callback(err)
    if (entries.length === 0) return callback(null, [])
    let count = entries.length
    let stats = []
    entries.forEach(entry => {
      let entryPath = path.join(dirPath, entry)
      fs.lstat(entryPath, (err, stat) => {
        if (!err) stats.push(Object.assign(stat, { entry }))
        if (!--count) callback(null, stats)
      })
    })
  })
}

这段代码中的执行分为两个阶段,先是readdir过程,readdir结束后(可能)并发一组lstat过程,每个readdir或者lstat过程都是一个Reactor,他们都和lsdir过程构成了组合关系。在这里所有的Reactor都采用了异步函数的形式,也都采用了S -> 0的状态定义。

Emitter

class Hash extends EventEmitter {
  
  constructor(rs, filePath) {
    super()
    this.ws = fs.createWriteStream(filePath)
    this.hash = crypto.createHash('sha256')
    this.destroyed = false
    this.finished = false

    this.destroy = () => {
      if (this.destroy || this.finished) return
      this.destroyed = true
      rs.removeListener('error', this.error)
      rs.removeListener('data', this.data)
      rs.removeListener('end', this.end)
      this.ws.removeAllListeners()
      this.ws.on('error', () => {})
      this.ws.destroy()
    }

    this.error = err => (this.destroy(), this.emit(err))    
    this.data = data => (this.ws.write(data), this.hash.update(data))
    this.end = () => (this.ws.end(), this.digest = this.hash.digest('hex'))

    rs.on('error', this.error) 
    rs.on('data', this.data)
    rs.on('end', this.end)
    ws.on('error', this.error)
    ws.on('finish', () => (this.finished = true, this.emit('finish')))
  }
}

这段代码中Hash接受两个参数,一个Readable Stream对象和一个文件路径,它把stream写入文件,同时计算了sha256,保存在this.digest中。

具体的代码逻辑不重要,这里Hash可以构造一个过程对象,它会在内部创建一个write stream过程和一个hash计算过程,对应的Reactor的组合关系和对象和成员的组合关系是一致的,这是使用class语法的好处。

这个例子中组合得到的Hash,采用了S -> 0 | S => E -> 0的定义。(实际上这段代码有bug,在filePath指向了一个目录而非文件时,但作为例子这里暂时忽视这个问题。)

Reactor Tree

上面的例子看起来更象在解释代码,没有额外逻辑;事实也是如此,我们并不试图创造新写法,只是从Reactor组合的角度去看程序运行时过程之间的关系。和Reactor是一个运行时的动态对象一样,这个组合关系也是运行时动态的。

用Reactor组合去理解整个Node.js应用:

  1. 整个应用可以理解为“最大”的Reactor;
  2. Node.js的异步API可以理解为最小的Reactor,这些Reactor的内部实现是Node.js run-time提供的,在应用中看不到,应用中只能看到它的界面,即输入输出;
  3. 在两者之间,每一个在执行的异步函数或创建的EventEmitter对象,都可以用Reactor组合来解释;

一个在运行的Node.js应用,在任何时刻,都存在这个由过程和过程组合构成的层级结构(Hierarchy),在Reactor模型中我们把这个运行时的过程和过程的组合关系构成的层级结构称为Reactor Hierarchy,或者Reactor Tree

并发

在Reactor Tree上,我们可以获得并发的第一个定义。

如果在任何时刻这个Tree都退化成一个单向链表,程序就回到了单线程、使用blocking i/o编程和运行的方式,在Process/Thread模型中,它被称为Sequential Process;如果存在时刻,至少有一个Reactor存在两个或两个以上Children,或者等价的说,整个tree存在两个或两个以上Leaf Node,这个时候我们说它存在并发

这个并发定义是从现象观察得到的,它从Reactor(或过程)的组合关系来定义,具有数学意义上的严格性;但是它没有区分一个Reactor所表示的过程到底处于何种状态,是一个在执行的io,还是一个计算任务;两者的区别在于前者几乎不占用CPU资源,而后者可能产生显著的计算时间,在调度任务时,后者可能会造成其他任务的starving(长时间拿不到CPU资源),甚至导致系统完全不可用。

但是从概念模型角度说,我们接受这个并发定义,它简单纯粹,没有歧义。

状态通讯协议

即使不建立严格的模型和术语体系,在直觉上,用朴素的过程和过程组合来理解Reactor Tree的行为,我们也可以预见到在程序的运行时,每个过程在不断的抛出事件,它的父过程接受和处理事件,构成运行时的执行流程

我们建立Reactor模型的目的,就是要显式表述和充分理解这个执行流程, 它首先是设计的一部分,开发者需要给出其精确和完备的定义;其次,它在组合的过程中应该遵循一些简单规则,使这个流程尽可能容易理解、容易设计、容易调试、减少设计和实现的错误;这个执行流程不能是模糊的,或者在运行时陷入混沌(Chaos),包括在软件工程中逻辑单元和系统规模在不断的增长,逻辑变得越来越复杂时。

这是我们建立Reactor模型和定义Reactor组合关系的初衷;为了让Reactor之间的交互和整个Reactor Tree的执行流程更加简单、可靠、和有序,我们需要设计一套在组合Reactor时,Reactor之间的交互和通讯需要遵循的逻辑。我们把这套规则,称为Reactor模型的状态通讯协议

严格的说,我们在前面定义的Reactor时,其输入输出、异步、和有态特性,都是这个状态通讯协议的一部分;但是我们这里不追求形式化意义上的严格,我们把上述特性留给Reactor的界面定义,把其余的部分作为状态通讯协议定义。

Reactor的组合模式具有良好的递归特性和黑盒特性,即在各个粒度上都可以实现再组合,也可以在各个粒度上把一个Reactor当成(有态)黑盒看待,所以我们只需要定义在一层组合关系下的通讯协议。

在Reactor组合中,父子过程之间的通讯应遵循下述协议要求:

  1. 子Reactor如果因为内部事件触发显式状态迁移,必须emit事件通知父Reactor;

    1. 子Reactor必须先完成状态迁移(reaction),然后才能emit;
    2. emit必须是同步的;
    3. emit的事件必须表明子Reactor刚刚迁入的显式状态;
  2. 子Reactor如果因为外部事件触发显式状态迁移,禁止emit事件;

    1. emit事件违反Reactor的异步要求;
    2. 如果外部事件会触发显式状态迁移,必须是一次强制迁移;即父Reactor调用子Reactor的方法强制子Reactor迁移至某个状态,这是该方法承诺实现的,且迁移是同步的;

规则1是子Reactor发生自发(触发来自内部)的显式状态迁移时的行为要求和对对父Reactor做出的状态承诺;规则2是父Reactor强制子Reactor状态迁移时子Reactor的行为要求和状态承诺。这两种情况中父子Reactor之间的交互,称为Reactor组合中的状态通讯

在第一种情况中的状态通讯的代码形式是异步函数返回或者emit事件时调用(父Reactor提供的)callback函数;在第二种情况中是父Reactor调用子Reactor的(同步)方法。

在Reactor模型的组合模式下,父子Reactor的通讯必须是同步的。

连锁反应

一个Reactor的内部事件可以产生它的内部状态更新,这是Reaction;如果这个状态更新导致其显式状态迁移,按照通讯协议设计,它应该向父Reactor抛出事件,这是Communication,这个事件对父Reactor来说是内部事件,父Reactor同样做出Reaction。

这个过程可以迭代下去,成为连锁反应(chained reaction)。

在Reactor Tree上,上述连锁反应是自下而上的;它也可以自上至下,例如在父Reactor接收到data事件时,它判断数据有错误,因此强制销毁子Reactor;

这个也可能影响到并发的Reactor;例如父Reactor具有两个并发成员,a和b,在a抛出error事件时,父Reactor决定销毁并发的b过程(例如前面Hash的例子中,rs的错误处理中销毁ws);

回到整个Reactor Tree上考虑这种连锁反应;Node.js事件循环的一个tick,总是从一个基础异步API的callback开始,即最小的和原子的Reactor开始向整个应用抛出事件。

这个事件可能向上传播产生连锁反应,在向上传播的过程中可能产生向下传播,但是它不可能在向下传播的过程中再次产生向上传播,这是Reactor的异步特性保证的。换句话说,Reactor的异步特性保证整个Reactor Tree的Reaction是可以结束的。

如果Reactor没有这种异步保证,Reactor Tree上就可能出现循环(cyclic)的reaction;在理论意义上说它是livelock,在实践意义上说它是stack-overflow。

Reactive

在Reactor的基础特性中我们定义了一个Reactive,它指的是对一个Reactor的内部实现使用状态机方法,它响应(React)外部事件更新状态,作为它的行为定义。

在组合模式下我们看到第二个Reactive的含义:在Reactor Tree上,执行流程是以连锁反应的方式构成的。这和我们在Process模型下,top-down的方式控制执行流程的方式完全相反,它是bottom-up的。

在这个意义上说,不仅仅是一个Reactor单元是Reactive的,整个应用都是用Reactive的方式组成的(Composition)。

同步

我们在Reactor组合的状态通讯协议中约定了同步通讯,同时Reactor的Reaction过程也是同步的,这导致针对任何原始事件,整个Reactor Tree的Reaction是同步完成的。

要理解为什么这个同步特性重要,我们对比一下Process模型中的并发编程模型。

Process模型中定义的并发是指两个或两个以上Process在同时执行;基于Process模型并发编程只需要编写两个逻辑:Process的执行逻辑和Process之间的通讯逻辑(ipc)。

Process的编程在代码形式上是同步的(blocking);Process之间的通讯可以通过某种系统或run-time提供的ipc机制实现。

如果所有ipc都是同步的(blocking & unbuffered),这简化对Process之间交互逻辑的编程,就像一个transport层的传输协议使用了stop-and-wait(ack)方式实现,但效率上这是无法使用的;而异步实现ipc会让编写并发过程的交互逻辑显著困难。

在React模型中,React组合关系的Reaction,对应了Process模型中的ipc通讯和Process之间的交互逻辑;在Reactor模型下,我们为每个过程建立了显式状态,定义了通讯时的状态约定,Reactor可以同步获得成员的状态,可以同步的创建新过程、销毁正在执行的过程(强制状态迁移),那么Reactor之间的全部交互逻辑,都可以同步完成。这会大大简化这种逻辑的设计、实现、调试和测试。

我们在前面的定义里看到了,Reactor和Process是不同的概念,但是他们表示的都是过程;而并发编程的本质(和难点),就是在编程并发过程之间的交互和关系。在Reactor模型中的同步通讯,可以给这种编程带来的显著收益。

这种同步方法的相关理论研究与实践,参见脚注4。

确定性

Reactor模型下的并发编程,其系统行为具有确定性:

  1. 在事件模型下,单线程的执行方式消除了任务调度导致的不确定性;
  2. Reactor的同步状态迁移和通讯消除了(异步)ipc通讯导致的不确定性;
  3. 虽然在任何一个时刻,整个系统无法预知下一个到来的事件会是谁?但无论是谁,我们都可以获得当前状态+下一事件=下一状态意义上的确定性。

在并发系统编程中,这种确定性是宝贵的财富。

完备性

经典的状态机方法无法scale,因为软件的实际状态空间太大了,而且是动态的。

在数学上我们解决无限(infinity)问题唯一的工具就是递归(recursion),与之等价的是归纳法(induction)。

如果我们定义了并发系统的初始状态,它的第一个事件到来,因为Reactor模型具有上述的确定性,我们可以得到下一个系统状态的定义;即使我们无法确切的预知下一个事件是谁,理论上我们可以考虑对所有可能的下一个事件,我们都可以给出一个具有确定性的整体状态迁移的设计;在这个过程中应用归纳法,我们可以得到如果确定知道系统在第N个事件处理结束后的状态,我们可以给出它在第N+1个事件到来的确定设计,这构成了在设计上的完备性。

Reactor模型中Reactor具有外部黑盒定义和组合方式保证了这种设计也是可以黑盒组合的,符合我们在软件工程中构建系统时使用的方法论要求(Divide and Conquer, or Decomposition and Composition)。

缺点

Reactor在模型上的主要缺点是:和Process模型相比,它的reactive composition流程的书写,不如在Process内书写if/then/else流程语句来得方便;但它的Reaction具有同步特性,书写上要更加便利。

Process模型和Reactor模型,或者说Thread模型和Event模型,他们构成了Duality,前者在编写过程逻辑时简单,在编写过程交互逻辑时困难,后者正好相反;所以到底孰优孰劣,取决于需要编程的问题,和实际系统实现的种种限制。

在Node.js中使用Reactor模型编程,在计算任务上有限制。因为Node.js是单线程执行的,所有的Reaction逻辑都运行在一个CPU线程下,它有收益,但是在计算密集型任务时,会遇到性能瓶颈。

有这样一些解决方法:

  1. Node.js和JavaScript直接支持异步计算过程;实际执行可以把计算任务抛到其他线程去执行,但主线程仍然采用异步函数或EventEmitter的接口形式,即计算任务也是Reactor;这种方式应该是JavaScript的正确进化路线(而不是thread和lock),但是遥遥无期;它需要JavaScript在根本上支持immutable数据结构。
  2. 用native add-on实现异步计算;这是目前可用的方式,优点是效率,缺点是需要书写C/C++代码;Node.js中内置的部分crypto函数是这样实现的;
  3. 用子进程计算;ipc的效率是较大的问题,子进程的启动时间和内存消耗也是问题,适合某些使用场景但不是好的通用问题解决方案。

小结

至此我们完整阐述完了Reactor模型,也看到了它和Process模型编程模式的差异与关系。

Reactor是动态对象,具有可组合特性,组合的Reactor Tree可描述整个应用的运行时状态。

Reactor的组合特性根本上改变了执行流程的构建方式,它在单一Reactor的Reaction代码书写上仍然使用代码控制流程,但是在Reaction级联时成为Reactive模式;整个应用都是Reactive的。

与Process模型相反,用Reactor为过程建模,过程间的通讯与交互逻辑成为同步逻辑;这既是Reactor模型的特征,也是这种编程模式的最大收益。

Reactor模型理论上具有确定性和设计完备性;实际应用中这需要严格遵循Reactor模型的行为、状态与状态通讯协议的设计规范。

并发

我们以并发作为命题和出发点,但是到目前位置并没有深入谈并发编程。

因为在Reactor模型下,我们已经构建的概念和规则,构成了这个模型的语法(Syntax);但是并发问题,由于Reactor模型的同步特性,它演化成了一个纯粹的语义(Semantics)问题(或者说算法问题)。

在并发中有一些常见的概念,都是在Process模型下建立的,包括fork/join,race/settle,push/pull,并发控制和调度;另外一些概念是属于并发编程模型本身的,不限于是Process模型还是Reactor模型,例如responsive/latency/priority,starving,fairness等等。

这些概念在Reactor模型中可以这样简单陈述:

  1. fork: 创建多个新的Reactor;
  2. join: 把多个任务从已完成队列中移除,同时把新任务推入等待队列,或直接创建Reactor;
  3. race: 在Reactor的非finish事件中创建下一步逻辑的Reactor;
  4. settle: 在Reactor的finish事件中创建下一步逻辑的Reactor;
  5. push: 在Reactor完成时立刻推入下一个任务队列;
  6. pull: 在Reactor完成时如果下一个任务队列已满,停止调度;等到下一个任务队列可以填充时向上一任务的已完成任务队列提取任务,然后用调度器推入执行队列;
  7. concurrency control: 限制并发成员的数量,使用等待队列;
  8. schedule:根据设计要求把等待队列中的任务推入执行队列;
  9. responsive:使用优先级和并发较少的执行队列;
  10. starving:使用bounded执行队列和控制任务的单次执行时间;
  11. fairness:全局调度;

好消息是所有这些问题可以用一把锤子解决:数学上的Petri Nets模型,它直接支持并发过程,和Reactor模型的结合美好到天衣无缝。

坏消息是你可能会感到代码是外星人写的。但是他们仍然简单、易于理解、和易于调试。它的古怪不是模型带来的问题,而是Node.js中事件模型和异步过程的奇葩结合的结果。

但我们仍然认为Node.js是天赋禀异的,因为无论代码能力多强,你永远不可能打败数学;在数学上:

Concurrent System === Reactive System

Stay Tuned.

参考资料

一下参考资料中,3和4的部分章节是大力推荐想全面了解状态机方法如何应用到scalable的系统的读者阅读的。

[1] UML State Diagram,wikipedia entry

[2] David Harel,1984,paper, Statecharts: A Visual Formalism for Complex SystesmPDF download

[3] MIT OCW textbook,Ch4, State Machine,PDF download

[4] Edward A. Lee and Sanjit A. Seshia, 2017 book, Introduction to Embedded SystemsPDF download

[5] Albert Benveniste,1991,paper,The synchronous approach to reactive and real-time systemsPDF download

[6] Nicolas Halbwachs,1993,book,Synchronous Programming of Reactive SystemsPDF download

Footnotes

1 概念

在逻辑学家和语言学家那里是用一种近乎苛刻的方式理解“概念”一词的。

例如冯小刚是一个人的名字(name),这个名字所指的人,即冯小刚本人,称为这个名字的denotation (the thing denoted)。

但是我们也可以用其他的名字指冯小刚这个人,例如“徐帆的老公”或“集结号的导演”,这两个名字是不同的含义(sense),在逻辑学和语言学上把这个含义称为概念(concept)。

在这里通用的“过程”一词,“Process模型中的Process”,和“Reactor模型中的Reactor”,它们所指(denote)的事物都是一样的,但是它们的概念(concept)不同,取决于上下文,即它和外部其他概念的关系。

2 Explicit State和Super State

一个Reactor的Explicit State是其内部实现的状态机中的Super State;这里Super State符合UML State Diagram的定义,其模型来自David Harel定义的Statecharts(Hierarchical State Machine)。

在Graph的意义上说,一个Reactor的Explicit State的状态迁移图是内部状态机的状态迁移图经过Graph Contraction操作之后得到的结果。

3 动态状态机

动态性在软件领域看起来是一个常识,过程和对象都可以动态创建和销毁;但是在硬件电路设计、控制系统,最小的嵌入式系统,和类似的状态机应用领域中并不常见。

经典状态机模型不是动态的,Harel设计的Hierarchical State Machine所提供的Hierarchy并不是组件模型意义上的组合定义,它只是把复杂的Flat State Machine作抽象形成Hierarchical结构,是一种Graph变换,它无法scale。

经典状态机能够Scale的方式是通过io通讯组合(compose)状态机单元,包括级联(cascading),并行(parallel),反馈(feedback)或任意有向图组合;这种状态机模型称io automata,它是可以scale的,也是人类能够设计出具有72亿晶体管且可以可靠工作的集成电路芯片的根本;但是它仍然是静态的,因为芯片上的硬件单元并没有动态创建和销毁的可能,单元之间的通讯线路也无法动态增加。

在io automata上继续拓展出动态能力的模型称dynamic io automata,这是相对而言在理论研究方面较新的领域,其形式化工作在最近两年才有相关的学术成果。

Reactor Model完全符合dynamic io automata的定义。

4 同步方法

本文所述的Reactor模型中的同步反应和通讯,在研究领域不是新课题。

最初的研究从法国Inria大学开始,文献5是我能找到的最早的文章,法国的研究者们(主要来自Inria大学)在这个领域做了广泛的工作,并设计了多种编程语言直接支持这种同步特性。文献6是对这些工作全面翔实的介绍。

在文献4中,作者把这种应用同步方法构成的系统称为Synchronous-Reactive Models,本文所述Reactor模型和书中所述SRM完全一致。

Reactor模型的主要工作是在SRM基础上给出了组合过程时的通讯协议约定,把SRM中的概念对应到Node.js的编程实践中,并用于解决实际的并发编程问题。

    原文作者:uglee
    原文地址: https://segmentfault.com/a/1190000011534247
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞