预览。
先给出一个基础类代码。
const EventEmitter = require('events')
const debug = require('debug')('transform')
class Transform extends EventEmitter {
constructor (options) {
super()
this.concurrency = 1
Object.assign(this, options)
this.pending = []
this.working = []
this.finished = []
this.failed = []
this.ins = []
this.outs = []
}
push (x) {
this.pending.push(x)
this.schedule()
}
pull () {
let xs = this.finished
this.finished = []
this.schedule()
return xs
}
isBlocked () {
return !!this.failed.length || // blocked by failed
!!this.finished.length || // blocked by output buffer (lazy)
this.outs.some(t => t.isBlocked()) // blocked by outputs transform
}
isStopped () {
return !this.working.length && this.outs.every(t => t.isStopped())
}
root () {
return this.ins.length === 0 ? this : this.ins[0].root()
}
pipe (next) {
this.outs.push(next)
next.ins.push(this)
return next
}
print () {
debug(this.name,
this.pending.map(x => x.name),
this.working.map(x => x.name),
this.finished.map(x => x.name),
this.failed.map(x => x.name),
this.isStopped())
this.outs.forEach(t => t.print())
}
schedule () {
// stop working if blocked
if (this.isBlocked()) return
this.pending = this.ins.reduce((acc, t) => [...acc, ...t.pull()], this.pending)
while (this.working.length < this.concurrency && this.pending.length) {
let x = this.pending.shift()
this.working.push(x)
this.transform(x, (err, y) => {
this.working.splice(this.working.indexOf(x), 1)
if (err) {
x.error = err
this.failed.push(x)
} else {
if (this.outs.length) {
this.outs.forEach(t => t.push(y))
} else {
if (this.root().listenerCount('data')) {
this.root().emit('data', y)
} else {
this.finished.push(y)
}
}
}
this.schedule()
this.root().emit('step', this.name, x.name)
})
}
}
}
module.exports = Transform
这段代码目前还是雏形。
Transform
类的设计类似node里的stream.Transform
,但是它的设计目的不是buffering或流性能,而是作为并发编程的基础模块。
如果你熟悉流式编程,Transform
的设计就很容易理解;在内部,Transform
维护四个队列:
pending
是input bufferworking
是当前正在执行的任务finished
是output buffer,它的目的不是为了buffer输出,而是在没有其他输出办法的时候作一下buffer。failed
是失败的任务
Transform
可以组合成DAG(Directed Acyclic Graph)使用,ins
和outs
用来存储前置和后置Transform
的引用,pipe
方法负责设置这种双向链接;最常见的情况是双向链表,即ins
和outs
都只有一个对象。但把他们设计成数组就可以允许fan-in, fan-out的结构。
push
和pull
是write和read的等价物。
schedule
是核心函数,它的任务是填充working
队列。在构造函数的参数里应该提供一个名字为transform
的异步函数,schedule
使用这个函数运行任务,在运行结束后,根据结果把任务推到failed
队列、推到下一个Transformer
、用root节点的emit输出、或者推到自己的finished
队列里。
Transform
设计的核心思想,就是把并发任务的状态,不使用对象属性来编码,只使用队列位置来编码;任何一个子任务,在任何时刻,仅存在于一个Transform
对象的某个队列中。换句话说,它等于把并发任务用资源来建模。如果你熟悉restful api对过程或状态的建模方式就很容易理解这一点。
在
Transform
中,任何transform
异步函数的返回,都是一个step
;step
是用Transform
实现并发组合的最重要概念;
每一次transform
函数返回,都会发生改变自己的队列或向后续的Transform
对象push
任务的动作,这个push
动作会触发后续Transform
的schedule
方法;step
结束时自己的schedule
方法也会被调用,它会重新填充任务。在这些动作结束后,所有Transform
的队列变化,就是整个组合任务状态机的下一个状态。
这个状态是显式的,可以打印出来看,对debug非常有帮助;虽然异步i/o会让这种状态具有不确定性,但至少这里坚持了组合状态机模型在处理并发问题时的同步原则,每个step
结束时整体做一次状态迁移,这个状态迁移可以良好定义和观察,这是Event模型下并发编程和Thread模型的重要区别。后者遇到并发逻辑引起的微妙错误时,很难捕捉现场分析,因为每一个Thread是黑盒。
从transform
返回开始到emit(step)
之间的一连串连锁动作都是中间过程,最终实现一次完整的状态迁移,这个过程必须是同步的。不应在这里出现异步、setImmediate或者process.nextTick等调用,这会带来额外的不确定因素和极难发现和修复的bug。
在前面很长一段时间的并发编程实践中,我指出过Promise的race/settle和错误处理逻辑在一些场景下的困难。Promise的过程逻辑不完备。我也花了很多力气试图在Process代数层面上把error, success, finish, race, settle, abort, pause, resume, 和他们的组合逻辑定义出来,但最终发现这很困难,因为实际编程中各种处理情况太多了。
所以在Transform
的设计中,这些逻辑全部被抛弃了,因为事实上它们都不是真正的基础并发逻辑。
Transform
试图实现组合的基础并发逻辑只有一个:stopped
。stopped
的定义非常简单:在一次step
结束时,所有的Transform
的working
队列为空,就是(整体的)stopped
。这里要再次强调前述的step
结束时同步方法的必要性,如果你在schedule
里使用了异步方法调用,那么这个stopped
的判断就可能是错的,因为schedule
可能会在event loop里放置了一个马上就会产生新的working
任务的动作,而isStopped()
的判断就错了。
stopped
时,整体组合状态可能是success, error, paused, 等等,都不难判断,但目前代码尚未稳定,我不打算加入语法糖。
在blocking i/o和同步的编程模式下,因果链和代码书写形式是一致的,但是在异步编程下,因果是异步和并发的,你只能去改变因,然后去观察果,这是很多程序员不适应异步编程的根本原因,因为它要改变思维的习惯。
使用Transform
来处理并发编程,仍然是在试图重建这个因果链,即使他们是并发的,但是我们要有一个办法把他们串起来;
前面说到的isStopped()
是观察到的果,能够影响它的因,是isBlocked()
函数,这个函数在schedule
中被调用,如果估值为true
,就会阻止schedule
继续向working
队列调度任务。
这里写的isBlocked()
的代码实现只是一个例子;可以阻止schedule
的原因可能有很多,比如出现错误,或者输出buffer满了,这些可以由实现者自己去定义。他们是policy,isBlocked()
本身是mechanism。这个策略的粒度是每个Transform
对象都可以有自己的策略。比如一个删除临时文件的操作,结果是无关痛痒的,那么它不该因为error就block。
isBlocked()
逻辑可以象示例代码里那样向下chain起来,即只要有后续任务block了,前置任务就该停下来;这在绝大多数情况下都是合理的逻辑。因为虽然我们写的是流式处理办法,但是我们不是在处理octet-stream,追求性能的buffering和flow control都没什么意义,如果前面任务在copy文件后面的任务要移动到目标文件夹,如果目标文件夹出了问题前面快速移动了大量文件最终也无法成功。
如果组合状态机停止了,向其中的任何一个Transform
对象执行push或者pull操作都可以让整个状态机继续动起来。从root节点push
是常见情况,从leaf节点pull
也是,向中间节点push
也是可能的;
资源建模的一个好处是你可以把状态呈现给用户,如果一个复制文件的任务因为文件名冲突而fail,你还可以让用户选择处理策略,例如覆盖或者重命名,在用户选择了操作之后,代码会从某个Transform
对象的failed
队列中取走一个对象,修改策略参数后重新push进去,那么这个状态机可以继续执行下去;这种可处理的错误不该成为block整个状态机工作(复制其他文件和文件夹)的原因,除非他们积累到可观的数量,在Transform
模式下这些都非常容易实现,开发者可以很简单的编写isBlocked()
的策略;
和node的stream一样,Transform
是lazy的,纯粹的push machine可能会在中间节点buffer大量的任务,这对把任务作为流处理来说是不合适的;同时,Lazy对于停下来的组合状态机能继续run起来很重要,pull
方法就是这个设计目的,它的schedule
逻辑和push
一样,只是方向相反;如果设置了Leaf节点会因为输出缓冲而block,它就可以block整个状态机(或者其中的一部分),这在某些情况下也是有用的功能,如果整个状态机的输出因为某种原因暂时无法被立刻消费掉。
abort
逻辑没有在代码中实现,但它很容易,可以遍历所有的Transform
,如果working
队列中的对象有abort
方法,就调用它;这不是个立即的中止,该对象仍然要通过callback返回才能stop。如果要全局的block,可以把所有的Leaf Node都pipe到一个sink节点去,把这个sink节点强制设置成isBlocked,可以block全部。pause
和resume
也是非常类似的逻辑。
当然你可能会遇到类似finally的逻辑是必须去执行的,即使在发生错误的时候,它意味着这个Transform
要向前传递isBlocked
信息,但是它的Schedule方法不必停止工作。它可以一直运行到把所有队列任务都处理完为止。
重载schedule
方法也是可能的;例如你的任务之间有前后依赖的逻辑,你就可以重载schedule
方法实现自己的调度方式。另外这里的schedule
代码只基于transform函数,很显然如果transform本身是一个Transform
对象它也应该工作,实现组合过程,包括Sequencer,Parallel等等,这些都是需要实现的。
总而言之,isBlocked
和schedule
是分开的逻辑,它们有各自不同的设计目的和使命,你可以重载它们获得自己想要的结果。所以写在这里的代码,重要的不是他们的实现,而是其机制设计和界面设计,以及接口承诺;所有逻辑都是足够原子化的,每个函数只做一件事,isBlocked
是因,可以根据需要选择策略,isStopped
是果,通过step观察和实现后续逻辑。应该避免通过向基类添加新方法来扩展能力,因为Transform
使用队列和任务描述状态,这个描述是完备的,机制也是完善的。
就像我在另一篇介绍JavaScript语言的文章里写的一样,如果针对问题的模型具有完备性,即使抽象,也可以通过组合基本操作和概念获得更多的特性,而不是在模型上增加概念,除非你认为模型不够完备。
软件工程中不是什么地方都要上状态机(automaton)这么严格的模型工具,项目软件里写到bug数量足够低就可以了,但是如果你要写系统软件或者对正确性有苛刻要求的东西,如果你没有用状态机建模,那么实际上你没有完备设计。
当然有了完备设计也不意味着软件没bug了,但一个好的设计可以让你对问题的理解、遇到问题时找到原因,有极大的帮助。
在复杂系统中,上述的同步方法状态机组合,和Hierarchical的状态机组合,是我们目前已知的两种具有完备性的模型方法。但是两者不同。虽然Transform
的组合看起来是一个Hierarchy,但是它就像你在纸上画一棵树,它仍然是二维的,每个step
的整体状态联动的迁移只是在populate一次状态迁移的范围,并不是几何级数的增加状态组合;所以我们仍然可以构筑一个线性的因果链,每个step
因果因果这样的继续下去,和没有并发的状态机是一样。
本质上这是数学归纳法:如果我们能证明如果n正确,那么n+1是正确的,这就可以证明chain下去的状态组合即使是无穷也是正确的。
第二段代码是使用的一个示例,这个class没有必要,是为了保证和老代码接口兼容,因为有一些项目内其他代码的依赖性就不解释了,很容易看明白大概逻辑;列在这里只是展示一下Transform
使用时pipe过程的代码样子。
const Promise = require('bluebird')
const path = require('path')
const fs = Promise.promisifyAll(require('fs'))
const EventEmitter = require('events')
const debug = require('debug')('dircopy')
const rimraf = require('rimraf')
const Transform = require('../lib/transform')
const { forceXstat } = require('../lib/xstat')
const fileCopy = require('./filecopy')
class DirCopy extends EventEmitter {
constructor (src, tmp, files, getDirPath) {
super()
let dst = getDirPath()
let pipe = new Transform({
name: 'copy',
concurrency: 4,
transform: (x, callback) =>
(x.abort = fileCopy(path.join(src, x.name), path.join(tmp, x.name),
(err, fingerprint) => {
delete x.abort
if (err) {
callback(err)
} else {
callback(null, (x.fingerprint = fingerprint, x))
}
}))
}).pipe(new Transform({
name: 'stamp',
transform: (x, callback) =>
forceXstat(path.join(tmp, x.name), { hash: x.fingerprint },
(err, xstat) => err
? callback(err)
: callback(null, (x.uuid = xstat.uuid, x)))
})).pipe(new Transform({
name: 'move',
transform: (x, callback) =>
fs.link(path.join(tmp, x.name), path.join(dst, x.name), err => err
? callback(err)
: callback(null, x))
})).pipe(new Transform({
name: 'remove',
transform: (x, callback) => rimraf(path.join(tmp, x.name), () => callback(null))
})).root()
let count = 0
// drain data
pipe.on('data', data => this.emit('data', data))
pipe.on('step', (tname, xname) => {
debug('------------------------------------------')
debug(`step ${count++}`, tname, xname)
pipe.print()
if (pipe.isStopped()) this.emit('stopped')
})
files.forEach(name => pipe.push({ name }))
pipe.print()
this.pipe = pipe
}
}
module.exports = DirCopy