Stream
流可以是可读的、可写的、或者是可读写的。所有的流都是EventEmitter的实例。
对象模式
所有使用Node.js API创建的流对象都只能操作strings和Buffer(或Unit8Array)。但是一些第三方流的实现,能够操作其他类型的javascript值(除了null,它在流处理中有特殊意义),这种类型的流被认为是工作在‘对象模式’
缓冲
Writable和Readable流都会将数据存储到内部的缓存(buffer)中
- 可读流的实现调用stream.push(chunk)时,数据被放到缓存中。如果流的消费者没有调用stream.read()方法,就会始终存在于内部队列中,直到被消费。缓存的大小取决于传递给流构造函数的highWaterMark选项。当内部可读缓存的大小达到highWaterMark指定的阀值时,流会暂停从底层资源读取数据,直到当前缓存的数据被消费。
- 可写流通过反复调用writable.write(chunk)方法将数据放到缓存。当内部可写缓存的总大小小于highWaterMark指定的阀值时,调用writable.write()返回true,一旦达到或超过highWaterMark,调用writable.write()返回false,此时应该停止向流中写入数据,直到drain事件被触发。
可读的流Readable
例如fs.createReadStream()
可读流(Readable streams)是对提供数据的源头(source)的抽象
两种工作模式:flowing(流动模式)和paused(暂停模式)
- flowing模式下:可读流自动从系统底层读取数据,并通过EventEmitter()接口的事件尽快将数据提供给应用。
- paused模式下:必须显式调用stream.read()方法来从流中读取数据片段。
初始工作模式都是paused的Readable流,可以通过三种途径切换到flowing模式。
- 监听data事件
- 调用stream.resume()方法
- 调用stream.pipe()方法将数据发送到Writable
可读流可以通过两种方式切换到paused
- 如果可读流没有桥接可写流成为管道,调用stream.pause()实现
- 如果可读流桥接了若干可写流组成了管道,通过取消data事件监听,并调用stream.unpipe()方法移除所有管道目标实现
-
close
事件:
在流或其底层资源(比如一个文件)关闭后触发。事件触发后,该流将不会再触发任何事件。
-
data
事件:
在流将数据传递给消费者时触发。当流转换到flowing模式时会触发该事件。处理器的参数是Buffer对象,如果你调用了Readable的setEncoding(encoding)方法,处理器的参数就是String对象。
-
end
事件:
在流中再没有数据可供消费时触发。
const readable = readableStreamSomehow()
readable.on('data', (chunk) => {
console.log(`received ${chunk.length} bytes of buffer data.`)
})
readable.on('end', () => {
console.log('no more data.')
})
-
error
事件:
通常底层系统内部出错从而不能产生数据,或当流的实现试图传递错误数据时发生。回调函数将接收到一个Error对象。
-
readable
事件:
将在流中有数据可供读取时触发。stream.read()返回可用的数据。
const readable = readableStreamSomehow()
readable.on('readable', () => {
// 有一些数据可读
})
在到达流数据尾部时,该事件也会触发。触发顺序在end事件之前。stream.read()返回null
// foo.txt是一个空文件
const fs = require('fs')
const rr = fs.createReadStream('foo.txt')
rr.on('readable', () => {
console.log('readable', rr.read) // null
})
rr.on('end', () => {
console.log('end') // end
})
-
readable.pipe(destination[,options])
:
绑定一个writable到readable上,形成一个管道,并将所有数据传给绑定的writable。可以在单个可读流上绑定多个可写流。
const r = fs.createReadStream('file.txt')
const z = zlib.createGzip()
const w = fs.createWriteStream('file.txt.gz')
r.pipe(z).pipe(w)
默认情况下,当源可读流触发end事件时,目标流也会调用stream.end()方法从而结束写入。要禁止这一默认行为,end选项应该指定为false。
reader.pipe(writer, {end: false})
reader.on('end', () => {
writer.end('goodbye)
})
如果可读流在处理时发生错误,目标可写流不会自动关闭。 如果发生错误,需要手动关闭所有流以避免内存泄漏。
一般来说,建议开发人员避免使用’readable’事件和readable.read()方法,使用readable.pipe()或’data’事件代替。
-
writable.unpipe([destination])
:
readable.unpipe()方法将之前通过stream.pipe()方法绑定的流分离
如果 destination 没有传入, 则所有绑定的流都会被分离.
如果传入 destination, 但它没有被pipe()绑定过,则该方法不作为.
可写的流Writable
例如fs.createWriteStream()
Writable streams是destination的一种抽象,这种destination允许数据写入
-
close
事件:
在流或者底层资源(比如一个文件)关闭后触发,事件触发后该流将不会再触发任何事件。
-
drain
事件`:
如果调用stream.write(chunk)方法返回false,流将在适当的时机触发drain事件,这时才可以继续向流中写入数据。
-
error
事件`:
在写入数据出错或者使用管道(pipe)出错时触发,事件发生时,回调函数仅会接收到一个Error参数。注意:error事件发生时,流并不会关闭。
-
finish事件
:
在调用了stream.end()方法,且缓冲区数据都已经传给底层系统之后,finish事件将被触发。
-
pipe事件
:
在可读流上调用stream.pipe()方法,并在目标流向中添加当前可写流时,将会在可写流上触发pipe事件。
const writer = writeStreamSomehow()
const reader = readStreamSomehow()
writer.on('pipe', (src) => {
console.log('piping into the writer')
assert.equal(src, reader)
})
reader.pipe(writer)
-
writable.end([chunk][,encoding][,callback])
:
调用writable.end()方法表明接下来没有数据要被写入writable,通过传入可选的chunk和encoding参数,可以在关闭流之前再写入一段数据。如果传入了可选的callback函数,将作为finish事件的回掉函数。
在调用了stream.end()方法之后,再调用stream.write()方法会导致错误。
// 写入hello 并用world结束写入
const file = fs.createWriteStream('example.txt)
file.write('hello, ')
file.end('wrold!)
// 后面不允许再写入数据
-
writable.write(chunk[,encoding][,callback])
:
向流中写入数据,并在数据处理完成后调用callback。我们建议,一旦write()返回false,在’drain’事件触发前,不能写入任何数据块。
writable.uncork()
可读写的流Duplex
例如net.Socket()
在读写过程中可以修改和变换数据的Duplex流Transform
例如zlib.createDeflate()