Netty(三) — EventLoopGroup和EventLoop
基于Netty 4.1.34版本,分析了 Netty 中NIO相关的 EventLoopGroup 与 EventLoop 的核心功能和实现原理,重点涵盖了JDK Selector 的性能瓶颈与 Netty 优化策略、事件循环线程的阻塞与唤醒机制(含空轮询 Bug 修复方案)、IO 事件与任务调度细节,并配套流程图直观展示 EventLoop 工作流程
EventLoopGroup
在 NIO 模型中,Netty 默认的实现是 NioEventLoopGroup,它是 NioEventLoop 的管理组件,本质上就是一个线程池(每个线程绑定一个 EventLoop)。它实现的核心接口 EventExecutorGroup,具备以下特点:
- 投递普通任务:提交Runnable到指定的 EventLoop 执行
- 投递延时任务到EventLoop中执行
- 轮询选择 EventLoop:通过内部的 EventExecutorChooser,为新注册的 Channel 分配下一个可用的 EventLoop
职责总结:
EventLoopGroup 只负责:维护 EventLoop 数组 → 轮询分配Runnable / Channel → 交给目标 EventLoop 线程处理
具体的 IO 读写、任务执行和定时任务调度由 EventLoop 自己完成。
关键参数
io.netty.eventLoopThreads = CPU核心数 * 2
通过NioEventLoopGroup的构造方法可以看到,默认线程数为CPU核心数 * 2,可以通过上面的系统参数设置
EventLoop
EventLoop 是 Netty 最核心的组件,NIO 模式下的实现类为 NioEventLoop。 每个 NioEventLoop 在实例化时都会创建一个 Selector,所有注册到该 NioEventLoop 的 Channel 都会绑定到它的内部 Selector;并且只会启动 一个专属线程,该线程循环执行三类任务(事件循环机制):
- 处理
Selector上已就绪的 IO 事件 - 执行普通任务队列中的 Runnable
- 执行延时任务队列中到期的定时任务
Selector优化
背景
调用 JDK Selector#select 系列方法时,已就绪的 IO 事件会被封装成 SelectionKey 对象,并存放到 sun.nio.ch.SelectorImpl#selectedKeys 集合中,同时通过 publicSelectedKeys 暴露给外部遍历。这两个集合都是 HashSet,在高频 IO 场景下存在几个性能问题:
增删开销:
add、remove操作不是绝对 O(1)时间复杂度,存在hash冲突,且可能触发链表/红黑树调整GC 压力:操作过程中需要频繁地创建一些辅助的数据结构(Node, TreeNode等)
遍历效率:HashSet 的迭代器访问模式不够连续(链表或红黑树),CPU 缓存命中率低
优化
Netty 在 NioEventLoop 构造时会通过 openSelector 方法反射替换 SelectorImpl 的 selectedKeys 和 publicSelectedKeys,其实现为SelectedSelectionKeySet,具有如下特征:
数组实现:不会有其他辅助的数据结构,减少GC。且是绝对的 O(1) 插入(直接放入数组末尾),没有哈希运算
不支持 remove:因为一次事件循环中 Netty 会处理完所有就绪事件,所以SelectedSelectionKeySetSelector在select前都会调用reset来清空数组,避免了逐个删除的开销
遍历时顺序访问数组,CPU 缓存友好
这种替换极大减少了 Selector 就绪事件处理过程中的 对象创建、Hash 运算和结构调整,在高并发场景下性能收益显著
// SelectedSelectionKeySetSelector包装后的Selector,每次调用select时都进行reset
private Selector selector;
// JDK原生的Selector实现
private Selector unwrappedSelector;
// sun.nio.ch.SelectorImpl内部字段selectedKeys和publicSelectedKeys的替代品
private SelectedSelectionKeySet selectedKeys;
private final SelectorProvider provider;
Channel注册
EventLoopGroup#register 用于将 Channel 绑定到当前 EventLoop 线程,核心调用链如下
- io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)
- io.netty.channel.AbstractChannel.AbstractUnsafe#register
- io.netty.channel.AbstractChannel.AbstractUnsafe#register0
- io.netty.channel.nio.AbstractNioChannel#doRegister
在 AbstractNioChannel#doRegister 中,Channel 会被注册到当前 EventLoop 的 Selector 上(此时仅完成绑定,并未关注任何事件)。自此,Channel 生命周期中的所有 IO 事件都将由该 EventLoop 线程独立处理
事件循环
EventLoop 线程是 Netty 的核心执行单元,不仅负责 Channel 的 I/O 事件处理,还实现了 ScheduledExecutorService 接口,可同时处理 延时任务 与 普通 Runnable 任务。
EventLoop线程采用 懒启动 机制:只有首次调用 execute() 提交任务时,才会通过 CAS 启动专属线程,并在 io.netty.channel.nio.NioEventLoop#run 方法中进入循环
线程的阻塞和唤醒
事件循环开始时,会先根据当前任务队列状态决定本轮的 select 策略,具体逻辑如下:
- 有任务 → 调用
selectNow(),立即返回已就绪事件数(非阻塞)。提高任务响应速度 - 无任务 → 返回
SelectStrategy.SELECT,随后调用阻塞的select()等待 IO 事件。减少CPU空转
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
// ============= NIO只会走如下两种case ===================
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default: // 其他策略不做处理
}
} catch (IOException e) {
rebuildSelector0();
handleLoopException(e);
continue;
}
阻塞select
核心方法NioEventLoop#select代码如下,总结下其主要功能:
超时阻塞Selector
首先会根据最近的延时任务来算超时时间(没任务就默认1秒),保证不会因为一直堵在那里
-
早期JDK(6,7,8) NIO的Selector在调用超时select方法时,可能触发空轮询的bug,即立即返回,造成CPU的空转,利用率飙升
Netty 为了解决这个问题,会对连续调用
select()过程中空返回的次数进行计数。如果连续超过 512 次(io.netty.selectorAutoRebuildThreshold),说明可能触发了空轮询的 bug。此时,Netty 会重建一个新的 Selector,并将之前注册的所有 Channel 迁移到新 Selector 上,类似于给 Selector 做一次重启,避免空轮询导致的 CPU 空转 满足以下任意条件时退出阻塞循环,开始处理事件和任务
- 有IO事件准备好
- 外部线程触发的唤醒(oldWakenUp为true)
- 内部被标记为已唤醒(wakenUp标志)
- 存在待执行的普通Runnable任务
- 存在已到期的延时任务
中断响应
如果当前线程被中断,立马退出阻塞,保证能及时响应中断信号,不被卡住
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
// 本次循环调用select的次数统计,用于检测过早返回现象(即select无效唤醒)
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
// select阻塞截止时间(deadLine)
// 1. 取决于最近的定时任务时间,保证select不会阻塞超过这段时间,避免错过定时任务
// 2. 没有定时任务则默认为1秒后
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
// 定时检测Selector的时间
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
// 超时判断:如果截止时间已到或超时。保证至少调用了一次select再跳出循环
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// 若有待执行任务且未被唤醒,立即非阻塞selectNow再跳出循环,避免长时间阻塞
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
// 调用阻塞select,最多阻塞timeoutMillis毫秒,等待IO事件或被唤醒
int selectedKeys = selector.select(timeoutMillis);
selectCnt++;
/*
* 检测是否满足退出阻塞循环条件:
* 1. 有IO事件准备好
* 2. 外部线程触发的唤醒(oldWakenUp为true)
* 3. 内部被标记为已唤醒(wakenUp标志)
* 4. 存在待执行的普通任务
* 5. 存在已到期的定时任务
*/
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
// 中断支持,跳出循环
if (Thread.interrupted()) {
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// select本次阻塞时长正常,超过了预期timeout
// 说明未出现select过早返回(空轮询)问题,重置计数器避免误判
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// select连续过早返回次数超过了阈值(默认512次),则重建Selector以避免CPU空转导致的死循环
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
// 当select连续过早返回次数过多,打印调试日志,方便问题排查
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) { // debug日志
}
}
唤醒
addTaskWakesUp 与 wakenUp 都用于控制 EventLoop 的唤醒逻辑。当主线程通过 execute() 向 EventLoop 提交 非 NonWakeupRunnable 任务时:
- 如果
addTaskWakesUp为false(如 NioEventLoop),表示仅向任务队列添加任务不会自动唤醒,需要显式唤醒 - 此时会尝试将
wakenUp从false设置为true,成功则调用Selector#wakeup(),唤醒阻塞在select()上的 EventLoop 线程 - 被唤醒的线程会从阻塞中恢复,继续执行包括普通 Runnable 在内的后续任务
注意:定时任务在投递时会被包装为普通 Runnable,由该 Runnable 负责向延时队列提交任务,因此同样具备触发 wakeUp 的能力
/**
*
* 1. addTaskWakesUp
* 表示向 taskQueue 添加任务时,是否能自动唤醒 EventLoop 线程
* - true :添加任务即可唤醒线程(由任务队列本身负责唤醒,例如 DefaultEventLoop 使用的阻塞队列)
* - false :添加任务不会自动唤醒,需要手动调用 wakeup()。如 NioEventLoop,线程阻塞在 Selector#select,而不是 taskQueue
*
* NioEventLoop设为了false
*
*/
private final boolean addTaskWakesUp;
/**
* 一个原子标志,用于避免重复调用 Selector#wakeup()
* - 当需要唤醒时,如果之前未设置为 true,则设置为 true 并实际唤醒
* - 事件循环开始时会重置为 false
*/
private final AtomicBoolean wakenUp = new AtomicBoolean();
IO事件处理
NioEventLoop 既能管 SocketChannel,也能管 ServerSocketChannel。
所以它的 processSelectedKey 方法是通用的,会去检查四种事件类型,但实际上每个 Channel 只会注册自己感兴趣的事件,因此一个 SelectionKey 只会触发自己该处理的那几种:
- SocketChannel
- OP_CONNECT:确保连接建立,并触发 Netty 的
ChannelActive事件 - OP_WRITE:把发送缓冲区里的数据刷出去
- OP_READ:从Channel里读数据
- OP_CONNECT:确保连接建立,并触发 Netty 的
- ServerSocketChannel
- OP_ACCEPT:接收新连接,并把它注册到对应的
EventLoop上
- OP_ACCEPT:接收新连接,并把它注册到对应的
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
// ... 省略SelectionKey无效的处理
try {
// bitmask数据结构,检测每一位可知对于的事件是否就绪
int readyOps = k.readyOps();
// 先处理 OP_CONNECT:确保 TCP 连接已经完成
// SocketChannel专属:SocketChannel在connect服务端时,如果未及时连接成功,才注册OP_CONNECT事件。代码在NioSocketChannel.doConnect里
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// 取消OP_CONNECT的监听(避免JDK的OP_CONNECT未取消导致 select() 永远立即返回的bug)
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// 处理SocketChannel的OP_WRITE事件:尝试发送积压的数据
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
// 特殊情况:JDK 的 bug 可能导致 readyOps == 0,但实际上还是有事件需要处理
// OP_READ(SocketChannel):从 socket 读取数据
// OP_ACCEPT(ServerSocketChannel):接收新连接并注册到 EventLoop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
普通和延时任务处理
每个 EventLoop 都有自己的普通任务队列和延时任务队列,它们在一次循环中能占用的执行时间由 ioRatio 控制。其表示 IO 处理时间在一次循环中的比例,默认 50%,也就是 IO 时间 = 任务执行时间
为了减少 nanoTime() 的性能开销,Netty 会每执行 64 (硬编码的)个任务才检查一次是否超时。所以,如果某个 Runnable 本身执行很慢,就会拖长任务阶段的时间,从而影响 IO 事件的及时处理
/**
* IO 事件处理时间占总循环时间的比例(百分比)
*
* EventLoop 既要处理 Channel 的 IO 事件,也要执行普通任务(Runnable)和定时任务,
* 这个比例决定了一次循环中 IO 事件和任务执行的时间分配,默认 50:50
*/
private volatile int ioRatio = 50;
// 当前EventLoop线程的普通任务队列(调用 execute() 提交的 Runnable 会进入这里)
private final Queue<Runnable> taskQueue;
// 当前EventLoop线程的定时任务队列(小顶堆),定时任务会封装成 ScheduledFutureTask 放在这里
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
protected boolean runAllTasks(long timeoutNanos) {
// 把已到期的定时任务转移到普通任务队列
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
// 安全执行任务(捕获并吞掉 Throwable,防止任务异常影响循环)
safeExecute(task);
runTasks++;
// 每执行 64 个任务检查一次是否超时(减少调用 nanoTime() 的频率,避免性能开销)
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) { // 超时就结束任务执行
break;
}
}
task = pollTask();
if (task == null) { // 没有任务了,直接跳出循环
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
// after hook:即执行tailTasks里的任务
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
shutdown检测
每轮循环结束时都会检测 isShuttingDown(),以实现 优雅关闭。其主要做了如下几件事:
- 关闭当前 EventLoop 管理的所有 Channel
- 取消所有定时任务
- 执行剩余普通任务,无任务时运行 shutdownHooks
- 满足静默期或超时条件后才退出线程
总结
综合上面的分析,可发现Netty 在 NioEventLoop 上做了不少魔改和优化,不仅绕开了 JDK NIO 的一些坑,还顺带提升了性能:
- 先说空轮询 bug —— Netty 会监控空轮询的次数,一旦发现触发了这个问题,就直接重建 Selector,彻底规避早期 JDK NIO 里
Selector.select()立刻返回导致的 CPU 空转 - 再说提速 —— 它会用反射把 JDK Selector 内部的关键字段(
selectedKeys、publicSelectedKeys)换成自己优化过的版本,这样不仅处理速度更快,还能明显减少 GC 压力
另外,EventLoop 线程跟普通的线程池不一样:
- 每个 EventLoop 都有自己独立的任务队列和延时队列
- 普通任务的特点是多线程投递、单线程消费,所以Netty 选择用 MpscQueue(多生产者单消费者的无锁队列)来减少竞争和锁开销
- 延时任务都是 EventLoop 自己投递(外部投递也会包装成普通 Runnable 再投递),这种单线程生产和消费的场景下,Netty 就用上了自己实现的 无锁
DefaultPriorityQueue来处理延时任务,既简单又高效
事件循环流程图