Netty源码分析——Reactor的processSelectedKeys
上一篇我们已经看过了Reactor轮训注册到selector的channel。这篇看下如何处理这些IO事件。
process
run
方法中:
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
这里的ioRatio是指处理io事件的比例。默认50,也就是说,如果花5s处理io事件,就花5s处理任务。这里不细说,重点看processSelectedKeys
:
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
分两种,第一种是优化过的,第二种是普通的。Netty会尝试获取权限去操作原生Selector
,如果可以,selectedKeys
不为null,都是走的优化过的处理方式。我们看下如何操作和处理原生Selector
。
优化原生selector
初始化NioEventLoop
时,会去打开一个原生的Selector
:
//初始化优化过的selectionKeySet
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
//利用反射修改这两个字段,使用替换过的KeySet
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
if (maybeException instanceof Exception) {
//如果出现异常,则selectedKeys为null,返回一个原生selector
selectedKeys = null;
Exception e = (Exception) maybeException;
return new SelectorTuple(unwrappedSelector);
}
//如果没有出现异常,把设置好的KeySet维护到NioEventLoop的selectedKeys字段里
//这里的selectedKeySet就是设置到Selector中的selectedKeySet,也就是说,select以后key会直接放到NioEventLoop里
selectedKeys = selectedKeySet;
继续看一下,原生selector里这俩字段:
protected Set<SelectionKey> selectedKeys = new HashSet();
protected HashSet<SelectionKey> keys = new HashSet();
private Set<SelectionKey> publicKeys;
private Set<SelectionKey> publicSelectedKeys;
protected SelectorImpl(SelectorProvider var1) {
super(var1);
if (Util.atBugLevel("1.4")) {
this.publicKeys = this.keys;
this.publicSelectedKeys = this.selectedKeys;
} else {
this.publicKeys = Collections.unmodifiableSet(this.keys);
this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);
}
}
这两个就是包装了一下的HashSet
。selector
在调用select()
族方法的时候,如果有IO事件发生,就会往里面的两个field中塞相应的selectionKey
。
这里selectedKeys
就是,可以准备被处理的key集合,而publicSelectedKeys
实际上就是selectedKeys
(代码里我们也可以看出来),只不过这个publicSelectedKeys
会通过一些方法暴露出去使用(这也是为什么publicKeys
被设置为不可修改 —— 不可添加也不能移除)。
Netty的SelectedSelectionKeySet
如何做到优化,这个还是要看源码:
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
SelectionKey[] keys;
int size;
SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}
@Override
public boolean remove(Object o) {
return false;
}
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
keys[size++] = o;
if (size == keys.length) {
increaseCapacity();
}
return true;
}
private void increaseCapacity() {
SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
System.arraycopy(keys, 0, newKeys, 0, size);
keys = newKeys;
}
}
很简单,就是把新的SelectionKey
放到了下一个位置,如果满了就扩容。这里好处是,add操作永远都是O(1)的时间复杂度,而HashSet
需要更长的add时间。
优化到此为止,继续说Process,我们看一下processSelectedKeysOptimized
:
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
这里其实也能体会到优化过的SelectedSelectionKeySet
的好处,这个结构非常巧妙,因为我们并不需要根据hash值(或者说Key)去取,而是遍历,这么说来,数组的性能远大于HashSet。
这里有一步:selectedKeys.keys[i] = null;
,这里是解决了一个bug,我之前并没了解过,也是在读别人的文章时发现的。这里解释一下,这是解决有些Channel已经关闭了,但是在数组中可能仍然存在SelectionKey
,比如存在在整个数组的最末端,比如数组长度100,这个SelectionKey
就放在第99这个槽位上。如果说这个时间点往后,一直只有10个SelectionKey
准备好被处理,那么这些最末尾的SelectionKey
由于没被删除,会一直存在影响GC。而且由于准备好的SelectionKey
一直是10个,程序也不会处理到最后这几个Key,不会发现对应的Channel
已经关掉了而把这些key置为null。
这里我们注意一下SelectedSelectionKeySet
这个结构,这个结构是不允许remove
操作的。这也解释了为什么这个SelectionKey
会一直存在。
这里其实我还有一个疑问:for (int i = 0; i < selectedKeys.size; ++i)
语句,是轮训整个selectedKeys.size
,就算这个SelectionKey
是放在尾部,也一定会被遍历到。为什么官方解释上说是不会被遍历到呢?而且,下面的要说的select again过程同样会进行一次reset操作(下面会细说),那么这里还有必要做这个置空操作么?希望有了解的朋友说一下。
继续来说,取出来的k上绑定的对象,这里不考虑这个NioTask
,看AbstractNioChannel
。这里要从绑定说起了。
绑定
这里要追溯到EventLoopGroup
注册一个Channel
这里了。这里不一步一步看了,直接看AbstractNioChannel#doRegister
:
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
javaChannel()
的返回值是java原生channel,这里其实是将AbstractNioChannel
内部的jdk类SelectableChannel
对象注册到原生Selector
对象上去,并且将AbstractNioChannel
作为SelectableChannel
对象的一个attachment
附属上,这样再jdk轮询出某条SelectableChannel
有IO事件发生时,就可以直接取出AbstractNioChannel
进行后续操作。
具体的processSelectedKey(SelectionKey k, AbstractNioChannel ch)
我们放到另外一篇里讲(Boss和Worker),这里说下基本流程:
- 对于
Boss
来说,轮询到的是基本上就是连接事件,后续的事情就通过他的pipeline
将连接扔给一个Worker
处理。 - 对于
Worker
来说,轮询到的基本上都是io读写事件,后续的事情就是通过他的pipeline
将读取到的字节流传递给每个channelHandler
来处理。
这些东西都会在Boss和Worker篇中说到。
跳过这些,继续看processSelectedKeysOptimized
:
if (needsToSelectAgain) {
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
private void selectAgain() {
needsToSelectAgain = false;
try {
selector.selectNow();
} catch (Throwable t) {
}
}
select again
过程:把当前往后的所有位置的SelectionKey都置为null并且立即执行一次不等待的select。什么时候需要select again,这个在cancel
方法里:
void cancel(SelectionKey key) {
key.cancel();
cancelledKeys ++;
if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0;
needsToSelectAgain = true;
}
}
//调用cancel的地方,就是接触注册方法
protected void doDeregister() throws Exception {
eventLoop().cancel(selectionKey());
}
在Channel
从selector
上移除的时候,调用cancel
函数将key取消,并且当被去掉的key到达CLEANUP_INTERVAL
的时候,设置needsToSelectAgain
为true,CLEANUP_INTERVAL
默认值为256。也就是说,对于每个NioEventLoop
而言,每隔256个Channel
从selector
上移除的时候,就标记needsToSelectAgain
为true。
上面也说了select again
的过程,这里这么做主要是把还没轮训过的SelectionKey
清理掉。结合上面我们提出过的问题,主要是防止一些SelectionKey
一直在尾部由于Channel
已经关闭而永远不更新,保证SelectionKey
尽可能有效。
这里其实上面已经提出过问题了,就是说既然我们在select again
触发时,会清理掉一些SelectionKey
而且一定包含数组尾部的,那还有必要在Process循环开始的时候进行一次置为null的操作么?
后记
后面又看了一下上文中最后的问题,Process循环开始的时候进行一次置为null的操作,应该是及时清理(当前位置),而Channel
在某个Selector
上解除注册256次以后,触发的select again
清理,应该主要目的是清理尾部(当前位置以后的所有)key集合。