每次 HTTP 请求,都是一个 request 到 response 的过程,所以 HTTP 服务器不需要维护任何状态,
而 WebSocket 需要 server 维护所有的连接状态,Ruby 并不很擅长处理并发。ActionCable 为了性能和易用性,用到了很多有趣的技术。
比如 message 的 pub/sub 使用了 redis 的 pub/sub (也有其他的 adapter)。
在并发处理上,使用了 current-ruby 的 ThreadPoolExecutor,而处理 WebSocket 的连接,使用了 nio4r 和 websocket-driver。
下面主要介绍一下这两种技术和 I/O 模型。
websocket-driver
websocket-driver 主要处理 WebSocket 协议,并对 I/O 层做了解耦,也就是说,可以用不同的方式来处理 I/O 事件。同时提供了 :open,:message,:close 等一系列事件,方便使用。
使用 websocket-driver 的 Server-side scoket 需要实现两个方法,url 和 write(string)url
。
url 是用户用来连接 websocket 的url。
write(string)
方法用来将数据写入到 stream 内。
之后,当 I/O 监听到数据的时候,再调用 WebSocket::Driver#parse
方法对数据进行解析。
解析完成后,WebSocket 会通过 :message 事件将解析好的数据传回。
websocket-driver README 给出的 example 使用的是 EventMachine 处理 I/O 操作,而 ActionCable 则直接使用了 nio4r。
I/O 模型
I/O 操作,就是在主存(main memory)和外设之间做数据的复制操作。
而外设一般都比主存慢很多,为了处理速度不匹配问题,可以使用不同的方法对 I/O 进行操作,既 I/O 模型。
同步阻塞IO(Blocking IO)
同步阻塞 I/O,即传统的 I/O 模型。
简单来说,就是当进行 I/O 操作的时候,线程被挂起(suspend),直到 I/O 操作进行完,再执行程序之后的操作。也就是说,在进行 IO 操作的时候,线程一直处于等在状态,所以称为 Blocking IO,Ruby 提供了 IO.read 方法。
同步非阻塞IO(Non-blocking IO)
进行 IO 操作时,无论系统是否准备好数据,都直接返回。如果没有准备好,则返回 error,当用户收到 error 的时候,可以再次尝试 IO 操作。Ruby 提供了IO.read_nonblock 方法。
IO多路复用(IO Multiplexing)
通过 select
让 Kernal 挂起(suspend)当前线程,当一个或者多个 I/O 事件发生的时候,再把控制权交给应用程序。
IO多路复用适用于同时需要监听多个 IO 对象的场景。
select 的时间复杂度是 O(n),Linux 提供了更高效的版本 epoll
,epoll 时间复杂度是 O(log n))。thin 提供了是否使用 epoll 的选项。Ruby 提供了 IO.select 方法。
nio4r
nio4r 可以说是 java.nio 的 Ruby 实现,但提供了更简单的接口。
nio4r 主要有两个部分:
Selectors: 用来同时监控多个 I/O 对象
Monitors: 用来追踪所关心的 I/O 事件,比如读。
require "nio"
server = TCPServer.new("127.0.0.1", 12345)
selector = NIO::Selector.new
3.times do
client = server.accept
_monitor = selector.register(client, :r)
end
ready = selector.select
首先是创建一个 NIO::Selector对象,然后将所关心的 I/O 事件通过 NIO::Selector#registor 注册到 selector 对象上。
最后在一个循环内,调用 NIO::Selector#select 方法,选出 ready 的事件。
在 ActionCable 中处理 I/O 的类是 Connection::StreamEventLoop,初始化 selector 在 @nio ||= NIO::Selector.new
,然后由 StreamEventLoop#attach 方法将 I/O 事件注册到 nio selector 上。
最后由 run 方法来处理事件监听 run,使用 IO.read_nonblock
对 stream 进行读取操作。