基于Netty的IdleStateHandler实现Mqtt心跳
IdleStateHandler
解析
最近研究jetlinks
编写的基于Netty
的mqtt-client
(https://github.com/jetlinks/netty-mqtt-client),总结若干知识点.
Netty
中,实现心跳机制较为简单,主要依赖于IdleStateHandler
判断channel
的读写超时.
/**
* Creates a new instance firing {@link IdleStateEvent}s.
*
* @param readerIdleTimeSeconds
* an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
* will be triggered when no read was performed for the specified
* period of time. Specify {@code 0} to disable.
* @param writerIdleTimeSeconds
* an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
* will be triggered when no write was performed for the specified
* period of time. Specify {@code 0} to disable.
* @param allIdleTimeSeconds
* an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
* will be triggered when neither read nor write was performed for
* the specified period of time. Specify {@code 0} to disable.
*/
public IdleStateHandler(
int readerIdleTimeSeconds,
int writerIdleTimeSeconds,
int allIdleTimeSeconds) {
this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
TimeUnit.SECONDS);
}
以上是IdleStateHandler
的构造函数,主要依赖于三个参数readerIdleTimeSeconds
,writerIdleTimeSeconds
以及allIdleTimeSeconds
.
如果难于理解英文注释,可参考< > https://segmentfault.com/a/1190000006931568一文中的解释:
- readerIdleTimeSeconds, 读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的 IdleStateEvent 事件.
- writerIdleTimeSeconds, 写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事件.
- allIdleTimeSeconds, 读/写超时. 即当在指定的时间间隔内没有读或写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件.
在IdleStateHandler
中,分别通过如下函数实现对channel
读写操作事件的跟踪:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
reading = true;
firstReaderIdleEvent = firstAllIdleEvent = true;
}
ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
lastReadTime = ticksInNanos();
reading = false;
}
ctx.fireChannelReadComplete();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
// Allow writing with void promise if handler is only configured for read timeout events.
if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
ctx.write(msg, promise.unvoid()).addListener(writeListener);
} else {
ctx.write(msg, promise);
}
}
// Not create a new ChannelFutureListener per write operation to reduce GC pressure.
private final ChannelFutureListener writeListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
lastWriteTime = ticksInNanos();
firstWriterIdleEvent = firstAllIdleEvent = true;
}
};
其中:
channelRead
: 判断channel
是否有数据可读取;channelReadComplete
: 判断channel
是否有数据可读取;write
: 判断channel
是否有数据写(通过writeListener
判断当前写操作是否执行成功).
IdleStateHandler
在channel
激活或注册时,会执行initialize
函数,根据读写超时时间创建对应的定时任务.
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
// Initialize early if channel is active already.
if (ctx.channel().isActive()) {
initialize(ctx);
}
super.channelRegistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// This method will be invoked only if this handler was added
// before channelActive() event is fired. If a user adds this handler
// after the channelActive() event, initialize() will be called by beforeAdd().
initialize(ctx);
super.channelActive(ctx);
}
private void initialize(ChannelHandlerContext ctx) {
// Avoid the case where destroy() is called before scheduling timeouts.
// See: https://github.com/netty/netty/issues/143
switch (state) {
case 1:
case 2:
return;
}
state = 1;
initOutputChanged(ctx);
lastReadTime = lastWriteTime = ticksInNanos();
if (readerIdleTimeNanos > 0) {
// 创建读超时判断定时任务
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
// 创建写超时判断定时任务
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
// 创建读写超时判断定时任务
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
此处,我们将剖析AllIdleTimeoutTask
任务.
此任务,会判断在超时时间段内,是否有读写操作:
- 有读或者写操作,则重新创建定时任务,等待下次执行;
- 没有读或者写操作,则创建
IdleStateEvent
对象,通过ChannelHandlerContext
通知注册了用户事件触发器的handler
(即handler
重载了userEventTriggered
函数).
private final class AllIdleTimeoutTask extends AbstractIdleTask {
AllIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
}
@Override
protected void run(ChannelHandlerContext ctx) {
long nextDelay = allIdleTimeNanos;
if (!reading) {
nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
}
if (nextDelay <= 0) {
// Both reader and writer are idle - set a new timeout and
// notify the callback.
allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstAllIdleEvent;
firstAllIdleEvent = false;
try {
if (hasOutputChanged(ctx, first)) {
return;
}
IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Either read or write occurred before the timeout - set a new
// timeout with shorter delay.
allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
了解了IdleStateHandler
,我们接下来学习如何编写Mqtt
的心跳handler
.
Mqtt
心跳handler
以下是jetlinks
编写的Mqtt
心跳handler
代码,我们截取部分代码学习.
final class MqttPingHandler extends ChannelInboundHandlerAdapter {
private final int keepaliveSeconds;
private ScheduledFuture<?> pingRespTimeout;
MqttPingHandler(int keepaliveSeconds) {
this.keepaliveSeconds = keepaliveSeconds;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof MqttMessage)) {
ctx.fireChannelRead(msg);
return;
}
MqttMessage message = (MqttMessage) msg;
if (message.fixedHeader().messageType() == MqttMessageType.PINGREQ) {
this.handlePingReq(ctx.channel());
} else if (message.fixedHeader().messageType() == MqttMessageType.PINGRESP) {
this.handlePingResp();
} else {
ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
}
}
/**
* IdleStateHandler,在连接处于idle状态超过设定时间后,会发送IdleStateEvent
* 接收到IdleStateEvent,当前类会发送心跳包至server,保持连接
*
* @param ctx 上下文
* @param evt 事件
* @throws Exception 异常
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
// 确认监听事件为IdleStateEvent,即发送心跳包至server
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.WRITER_IDLE) {
this.sendPingReq(ctx.channel());
}
}
}
/**
* 发送心跳包至server端,并建立心跳超时断开连接任务
* 此处,先行创建心跳超时任务,后续再发送心跳包(避免收到心跳响应时,心跳超时任务未建立完成)
*
* @param channel 连接
*/
private void sendPingReq(Channel channel) {
// 创建心跳超时,断开连接任务
if (this.pingRespTimeout == null) {
this.pingRespTimeout = channel.eventLoop().schedule(() -> {
MqttFixedHeader disconnectHeader =
new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
channel.writeAndFlush(new MqttMessage(disconnectHeader)).addListener(ChannelFutureListener.CLOSE);
//TODO: what do when the connection is closed ?
}, this.keepaliveSeconds, TimeUnit.SECONDS);
}
// 创建心跳包,并发送至Mqtts Server
MqttFixedHeader pingHeader = new MqttFixedHeader(MqttMessageType.PINGREQ, false, MqttQoS.AT_MOST_ONCE, false, 0);
channel.writeAndFlush(new MqttMessage(pingHeader));
}
/**
* 处理ping resp,取消ping超时任务(断开连接)
*/
private void handlePingResp() {
if (this.pingRespTimeout != null && !this.pingRespTimeout.isCancelled() && !this.pingRespTimeout.isDone()) {
this.pingRespTimeout.cancel(true);
this.pingRespTimeout = null;
}
}
}
函数解析:
(1) 接收超时事件,发送心跳请求
MqttPingHandler
中重载了userEventTriggered
函数,用以接收ChannelHandlerContext
传递的事件,代码中会判断事件是否为IdleStateEvent
.
如果当前接收事件为IdleStateEvent
,则说明当前channel
在超时时间内未发生读写事件,则客户端发送Mqtt
心跳请求.
(2) 发送心跳请求,建立请求响应超时关闭连接任务
sendPingReq
函数中(以下两步操作,顺序可任意安排):
- 建立心跳请求响应超时判断任务,如果在一定时长内未接收到心跳响应,则会关闭连接;
- 构建
Mqtt
心跳包,发送至远端服务器.
(3) 取消心跳响应超时关闭连接任务
channelRead
读取数据,判断是否是Mqtt
的心跳响应包.
如果是,则执行handlePingResp
函数,取消心跳响应超时关闭连接任务.
handler
添加
ch.pipeline().addLast("idleStateHandler",
new IdleStateHandler(keepAliveTimeSeconds, keepAliveTimeSeconds, 0));
ch.pipeline().addLast("mqttPingHandler",
new MqttPingHandler(MqttClientImpl.this.clientConfig.getKeepAliveTimeSeconds()));
只需要以上两句代码,就可以完成Mqtt
心跳维持功能.