Netty源码分析——Reactor的processSelectedKeys

Netty源码分析——Reactor的processSelectedKeys

上一篇我们已经看过了Reactor轮训注册到selector的channel。这篇看下如何处理这些IO事件。

process

run方法中:

final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}

这里的ioRatio是指处理io事件的比例。默认50,也就是说,如果花5s处理io事件,就花5s处理任务。这里不细说,重点看processSelectedKeys

if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}

分两种,第一种是优化过的,第二种是普通的。Netty会尝试获取权限去操作原生Selector,如果可以,selectedKeys不为null,都是走的优化过的处理方式。我们看下如何操作和处理原生Selector

优化原生selector

初始化NioEventLoop时,会去打开一个原生的Selector

//初始化优化过的selectionKeySet
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
//利用反射修改这两个字段,使用替换过的KeySet
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}

selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
if (maybeException instanceof Exception) {
//如果出现异常,则selectedKeys为null,返回一个原生selector
selectedKeys = null;
Exception e = (Exception) maybeException;
return new SelectorTuple(unwrappedSelector);
}
//如果没有出现异常,把设置好的KeySet维护到NioEventLoop的selectedKeys字段里
//这里的selectedKeySet就是设置到Selector中的selectedKeySet,也就是说,select以后key会直接放到NioEventLoop里
selectedKeys = selectedKeySet;

继续看一下,原生selector里这俩字段:

protected Set<SelectionKey> selectedKeys = new HashSet();
protected HashSet<SelectionKey> keys = new HashSet();
private Set<SelectionKey> publicKeys;
private Set<SelectionKey> publicSelectedKeys;
protected SelectorImpl(SelectorProvider var1) {
super(var1);
if (Util.atBugLevel("1.4")) {
this.publicKeys = this.keys;
this.publicSelectedKeys = this.selectedKeys;
} else {
this.publicKeys = Collections.unmodifiableSet(this.keys);
this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);
}
}

这两个就是包装了一下的HashSetselector在调用select()族方法的时候,如果有IO事件发生,就会往里面的两个field中塞相应的selectionKey

这里selectedKeys就是,可以准备被处理的key集合,而publicSelectedKeys实际上就是selectedKeys(代码里我们也可以看出来),只不过这个publicSelectedKeys会通过一些方法暴露出去使用(这也是为什么publicKeys被设置为不可修改 —— 不可添加也不能移除)。

Netty的SelectedSelectionKeySet如何做到优化,这个还是要看源码:

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
SelectionKey[] keys;
int size;
SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}
@Override
public boolean remove(Object o) {
return false;
}
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}

keys[size++] = o;
if (size == keys.length) {
increaseCapacity();
}

return true;
}
private void increaseCapacity() {
SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
System.arraycopy(keys, 0, newKeys, 0, size);
keys = newKeys;
}
}

很简单,就是把新的SelectionKey放到了下一个位置,如果满了就扩容。这里好处是,add操作永远都是O(1)的时间复杂度,而HashSet需要更长的add时间。

优化到此为止,继续说Process,我们看一下processSelectedKeysOptimized

for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}

if (needsToSelectAgain) {
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}

这里其实也能体会到优化过的SelectedSelectionKeySet的好处,这个结构非常巧妙,因为我们并不需要根据hash值(或者说Key)去取,而是遍历,这么说来,数组的性能远大于HashSet。

这里有一步:selectedKeys.keys[i] = null;,这里是解决了一个bug,我之前并没了解过,也是在读别人的文章时发现的。这里解释一下,这是解决有些Channel已经关闭了,但是在数组中可能仍然存在SelectionKey,比如存在在整个数组的最末端,比如数组长度100,这个SelectionKey就放在第99这个槽位上。如果说这个时间点往后,一直只有10个SelectionKey准备好被处理,那么这些最末尾的SelectionKey由于没被删除,会一直存在影响GC。而且由于准备好的SelectionKey一直是10个,程序也不会处理到最后这几个Key,不会发现对应的Channel已经关掉了而把这些key置为null。

这里我们注意一下SelectedSelectionKeySet这个结构,这个结构是不允许remove操作的。这也解释了为什么这个SelectionKey会一直存在。

这里其实我还有一个疑问:for (int i = 0; i < selectedKeys.size; ++i)语句,是轮训整个selectedKeys.size,就算这个SelectionKey是放在尾部,也一定会被遍历到。为什么官方解释上说是不会被遍历到呢?而且,下面的要说的select again过程同样会进行一次reset操作(下面会细说),那么这里还有必要做这个置空操作么?希望有了解的朋友说一下。

继续来说,取出来的k上绑定的对象,这里不考虑这个NioTask,看AbstractNioChannel。这里要从绑定说起了。

绑定

这里要追溯到EventLoopGroup注册一个Channel这里了。这里不一步一步看了,直接看AbstractNioChannel#doRegister

boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}

javaChannel()的返回值是java原生channel,这里其实是将AbstractNioChannel内部的jdk类SelectableChannel对象注册到原生Selector对象上去,并且将AbstractNioChannel作为SelectableChannel对象的一个attachment附属上,这样再jdk轮询出某条SelectableChannel有IO事件发生时,就可以直接取出AbstractNioChannel进行后续操作。

具体的processSelectedKey(SelectionKey k, AbstractNioChannel ch)我们放到另外一篇里讲(Boss和Worker),这里说下基本流程:

  1. 对于Boss来说,轮询到的是基本上就是连接事件,后续的事情就通过他的pipeline将连接扔给一个Worker处理。
  2. 对于Worker来说,轮询到的基本上都是io读写事件,后续的事情就是通过他的pipeline将读取到的字节流传递给每个channelHandler来处理。

这些东西都会在Boss和Worker篇中说到。

跳过这些,继续看processSelectedKeysOptimized

if (needsToSelectAgain) {
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}

private void selectAgain() {
needsToSelectAgain = false;
try {
selector.selectNow();
} catch (Throwable t) {
}
}

select again过程:把当前往后的所有位置的SelectionKey都置为null并且立即执行一次不等待的select。什么时候需要select again,这个在cancel方法里:

void cancel(SelectionKey key) {
key.cancel();
cancelledKeys ++;
if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0;
needsToSelectAgain = true;
}
}

//调用cancel的地方,就是接触注册方法
protected void doDeregister() throws Exception {
eventLoop().cancel(selectionKey());
}

Channelselector上移除的时候,调用cancel函数将key取消,并且当被去掉的key到达CLEANUP_INTERVAL的时候,设置needsToSelectAgain为true,CLEANUP_INTERVAL默认值为256。也就是说,对于每个NioEventLoop而言,每隔256个Channelselector上移除的时候,就标记needsToSelectAgain为true。

上面也说了select again的过程,这里这么做主要是把还没轮训过的SelectionKey清理掉。结合上面我们提出过的问题,主要是防止一些SelectionKey一直在尾部由于Channel已经关闭而永远不更新,保证SelectionKey尽可能有效。

这里其实上面已经提出过问题了,就是说既然我们在select again触发时,会清理掉一些SelectionKey而且一定包含数组尾部的,那还有必要在Process循环开始的时候进行一次置为null的操作么?

后记

后面又看了一下上文中最后的问题,Process循环开始的时候进行一次置为null的操作,应该是及时清理(当前位置),而Channel在某个Selector上解除注册256次以后,触发的select again清理,应该主要目的是清理尾部(当前位置以后的所有)key集合。

    原文作者:java集合源码分析
    原文地址: https://juejin.im/entry/5b0c06776fb9a00a04068604
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞