TLDR;
这篇文章的作风是在致敬 Jim 先生;致敬,致敬,懂吗,不是剽窃,顺序员的事怎么能叫剽窃。
固然我对 Node.js 的 stream 也是现学现卖,有运用不当的处所,敬请指出。
原文链接 迎接 star。
写这篇文章的初志是年前看 SICP 的时刻,第二章引见组织数据笼统的时刻有提到 Lisp 对序列的处置惩罚采纳相似『信号流』的体式格局。所以很天然的就想到了 Node.js 中的 pipe 体式格局,于是就一向想用 pipe 的体式格局尝试一下。
同 Jim 先生的这篇 文章 中形貌的一样, 我也是懒癌发生发火,从年末拖到今年年终,然后在年终又看到了 Jim 先生 的博客,深受启示,终究下定决心要最先码了…… 然后,嗯,又拖到昨天。促使我下定决心要写的重要原因是昨天部门的年会!横竖年会跟我这类死肥宅也没多大关联,在人人 happy 的时刻构想了下代码完成,回家用了一晚上的时刻补上了代码。
Jim 先生在他的文章内里也说了,JS 的那些数组操纵 (map
/ reduce
/filter
) 啥的,每次挪用的时刻都邑举行一次完全的遍历。试想一下假如有一个第一个数是1,长度是 1亿 的递增为 1 的数组,须要把一切的数组都乘 3,再消除个中的奇数,假如用 (map
/filter
) 的要领,只需也须要轮回 一亿五千万次;那末假如有其他方法能只轮回一亿次,是否是节省了大批的内存资本和轮回斲丧的时候。
空话不多说,直接上代码吧。
pipe
在编写代码时,我们应该有一些要领将顺序像衔接水管一样衔接起来 — 当我们须要猎取一些数据时,能够去经由过程”拧”其他的部份来到达目标。这也应该是IO应有的体式格局。 — Doug McIlroy. October 11, 1964
关于 node 的 stream 能够看看这篇 文章。
下面是代码部份,全部代码我是在边学 pipe 边用一晚上的时候急急写就的,懒癌发生发火,也不想再重构了,列位相公考究看吧,求别喷代码。
进口
const stream = require('stream')
const last = Symbol()
// 在 selfArray 中吸收一个真正的数组
// 返回一个可读流
// 假如再做的精细点,能够做成可读可写流,如许就可以经由过程掌握流的大小,来掌握内存的大小,别几亿条数据直接撑爆内存了
// 不过对背面 reduce 的处置惩罚就比较贫苦
function selfArray(a) {
const rs = new stream.Readable({
objectMode: true
})
a.forEach((v, index) => {
rs.push(v)
})
rs.push(last)
rs.push(null)
return rs
}
上面的 selfArray 在流的最背面 push 了一个 Symbol 对象来标志全部流的输入完毕,留待为以后 reduce 的运用。
Map
/Filter
/Reduce
的完成
function forEach(callback) {
const ws = new stream.Writable({
objectMode: true
})
let index = 0
ws._write = function (chunk, enc, next) {
if (chunk !== last) {
callback(chunk, index++)
next()
}
}
return ws
}
function filter(callback) {
const trans = new stream.Transform({
readableObjectMode: true,
writableObjectMode: true
})
let index = 0
trans._transform = function (chunk, enc, next) {
if (chunk === last) {
next(null, last)
} else {
let condition = callback(chunk, index++)
if (condition) {
this.push(chunk)
}
next()
}
}
return trans
}
function map(callback) {
const trans = new stream.Transform({
readableObjectMode: true,
writableObjectMode: true
})
let index = 0
trans._transform = function (chunk, enc, next) {
if (chunk === last) {
next(null, last)
} else {
next(null, callback(chunk, index++))
}
}
return trans
}
function reduce(callback, initial) {
const trans = new stream.Transform({
readableObjectMode: true,
writableObjectMode: true
})
let index = 0,
current = initial,
prev = initial
trans._transform = function (chunk, enc, next) {
if (chunk === last) {
if (index > 1) {
prev = callback(prev, current, index - 1)
}
this.push(prev)
this.push(last)
return next(null, last)
}
if (initial === void 0 && index === 0) {
prev = chunk
}
if (index > 0) {
prev = callback(prev, current, index - 1)
}
current = chunk
index++
next()
}
return trans
}
上面的代码在 reduce 的完成轻微贫苦了一些,reduce 对没有初始值,原始数组为空的前提下有种种差别的处置惩罚状况,翻看了下 MDN 的诠释又本身完成了下。
运用
selfArray([9, 2, 6, 3, 5, 6, 7, 1, 4, 4])
.pipe(map(v => v * 3))
.pipe(filter(v => v % 2))
.pipe(reduce((p, c) => p + c, 0))
.pipe(forEach(v => {
console.log('pipe 盘算末了的结果是:', v)
}))
为了悦目我有意把种种括号都删掉了。嗯,看起来还挺圆满,我们来测试下
selfArray([9, 2, 6, 3, 5, 6, 7, 1, 4, 4])
.pipe(map(v => {
console.log('map:', v)
return v * 3
}))
.pipe(filter(v => {
console.log('filter:', v)
return v % 2
}))
.pipe(reduce((p, c) => {
console.log('reduce:', p, c)
return p + c
}, 0))
.pipe(forEach(v => {
console.log('pipe 盘算末了的结果是:', v)
}))
加上 log 以后能够看到结算结果是:
map: 9
filter: 27
map: 2
filter: 6
map: 6
filter: 18
map: 3
filter: 9
reduce: 0 27
map: 5
filter: 15
reduce: 27 9
map: 6
filter: 18
map: 7
filter: 21
reduce: 36 15
map: 1
filter: 3
reduce: 51 21
map: 4
filter: 12
map: 4
filter: 12
reduce: 72 3
pipe 盘算末了的结果是: 75
从上面的 log 能够看到, 第一个数 9 先执行了 map
,然后在 3 以后就直接进入了 filter
,此时第 2 个数 2 也最先被 map
处置惩罚,然后被 filter
处置惩罚,然则因为 3 以后是偶数不会被 reduce
吸收, reduce
会一向比及第二个奇数,也就是 3 进入以后才会被处置惩罚… 嗯,直到终究的盘算结果是 75, 被 forEach
斲丧。
总结
虽然我没有像 Jim 先生一样举行机能测试,然则猜想也晓得 pipe 的体式格局在数目比较小的时刻肯定要弱于一般体式格局,pipe 的优点在于数据量比较大的时刻,能够运用比较小的内存,尽快的处置惩罚数组中前置的数据。