emqttd 0.9.0版本重新设计了MQTT连接会话管理
每个MQTT客户端连接,不管是否持久的(Persistent),都启动一个连接会话进程。
该会话进程管理:
- 客户端的全部订阅(Subscription)。
- 服务器发动到客户端的,已发送未确认的Qos1/2消息。
- 客户端发送到服务端,未接收到PUBREL的QoS2消息。
- 客户端离线时,持久会话保存离线的Qos1/2消息。
- 可选设置,持久会话保存离线的QoS0消息。
MQTT客户端连接可以重新启用(Resume)在其他集群节点上的会话(Session)。
消息队列(Message Queue)和飞行窗口(Inflight Window)
每个MQTT连接会话,创建一个简单的内存消息队列,和一个正在处理消息的飞行窗口。
设计如下:
|<----------------- Max Len ----------------->|
-----------------------------------------------
IN -> | Pending Messages | Inflight Window | -> Out
-----------------------------------------------
|<--- Win Size --->|
飞行窗口(Inflight Window)保存当前正在发送未确认的Qos1/2消息。窗口值越大,吞吐越高;窗口值越小,消息顺序越严格。
当客户端离线或者飞行窗口(Inflight Window)满时,消息缓存到队列。
如果消息队列满,先丢弃Qos0消息,或者丢弃最早进入队列的消息。
MQTT QoS1/2消息分配全局唯一的消息ID
每一条QoS1/2消息,分配一个全局唯一的、时间序列的消息ID,用于端到端的消息处理。
PktId <-- --> MsgId <-- --> MsgId <-- --> PktId
|<--- Qos --->|<---PubSub--->|<-- Qos -->|
全局唯一消息ID结构:
--------------------------------------------------------
| Timestamp | NodeID + PID | Sequence |
|<------- 64bits ------->|<--- 48bits --->|<- 16bits ->|
--------------------------------------------------------
- 64bits时间戳: erlang:system_time if Erlang >= R18, otherwise os:timestamp
- Erlang节点ID: 编码为2字节
- Erlang进程PID: 编码为4字节
- 进程内部序列号: 2字节的进程内部序列号
总结,emqttd-0.9.0版本的会话、队列、唯一消息Id设计,配合新版broker扩展Hooks可以实现端到端的消息处理。