Netty(三) — EventLoopGroup和EventLoop

​ 基于Netty 4.1.34版本,分析了 Netty 中NIO相关的 EventLoopGroupEventLoop 的核心功能和实现原理,重点涵盖了JDK Selector 的性能瓶颈与 Netty 优化策略事件循环线程的阻塞与唤醒机制(含空轮询 Bug 修复方案)、IO 事件与任务调度细节,并配套流程图直观展示 EventLoop 工作流程

EventLoopGroup

在 NIO 模型中,Netty 默认的实现是 NioEventLoopGroup,它是 NioEventLoop 的管理组件,本质上就是一个线程池(每个线程绑定一个 EventLoop)。它实现的核心接口 EventExecutorGroup,具备以下特点:

  • 投递普通任务:提交Runnable到指定的 EventLoop 执行
  • 投递延时任务到EventLoop中执行
  • 轮询选择 EventLoop:通过内部的 EventExecutorChooser,为新注册的 Channel 分配下一个可用的 EventLoop

职责总结

​ EventLoopGroup 只负责:维护 EventLoop 数组轮询分配Runnable / Channel交给目标 EventLoop 线程处理
​ 具体的 IO 读写、任务执行和定时任务调度由 EventLoop 自己完成。

关键参数

io.netty.eventLoopThreads = CPU核心数 * 2

通过NioEventLoopGroup的构造方法可以看到,默认线程数为CPU核心数 * 2,可以通过上面的系统参数设置

EventLoop

EventLoop 是 Netty 最核心的组件,NIO 模式下的实现类为 NioEventLoop。 每个 NioEventLoop 在实例化时都会创建一个 Selector,所有注册到该 NioEventLoopChannel 都会绑定到它的内部 Selector;并且只会启动 一个专属线程,该线程循环执行三类任务(事件循环机制):

  1. 处理 Selector 上已就绪的 IO 事件
  2. 执行普通任务队列中的 Runnable
  3. 执行延时任务队列中到期的定时任务

Selector优化

背景

​ 调用 JDK Selector#select 系列方法时,已就绪的 IO 事件会被封装成 SelectionKey 对象,并存放到 sun.nio.ch.SelectorImpl#selectedKeys 集合中,同时通过 publicSelectedKeys 暴露给外部遍历。这两个集合都是 HashSet,在高频 IO 场景下存在几个性能问题:

  • 增删开销addremove 操作不是绝对 O(1)时间复杂度,存在hash冲突,且可能触发链表/红黑树调整

  • GC 压力:操作过程中需要频繁地创建一些辅助的数据结构(Node, TreeNode等)

  • 遍历效率:HashSet 的迭代器访问模式不够连续(链表或红黑树),CPU 缓存命中率低

优化

​ Netty 在 NioEventLoop 构造时会通过 openSelector 方法反射替换 SelectorImplselectedKeyspublicSelectedKeys,其实现为SelectedSelectionKeySet,具有如下特征:

  • 数组实现不会有其他辅助的数据结构,减少GC且是绝对的 O(1) 插入(直接放入数组末尾),没有哈希运算

  • 不支持 remove:因为一次事件循环中 Netty 会处理完所有就绪事件,所以SelectedSelectionKeySetSelector在select前都会调用reset来清空数组,避免了逐个删除的开销

  • 遍历时顺序访问数组,CPU 缓存友好

​ 这种替换极大减少了 Selector 就绪事件处理过程中的 对象创建、Hash 运算和结构调整,在高并发场景下性能收益显著

// SelectedSelectionKeySetSelector包装后的Selector,每次调用select时都进行reset
private Selector selector;
// JDK原生的Selector实现
private Selector unwrappedSelector;
// sun.nio.ch.SelectorImpl内部字段selectedKeys和publicSelectedKeys的替代品
private SelectedSelectionKeySet selectedKeys;

private final SelectorProvider provider;

Channel注册

EventLoopGroup#register 用于将 Channel 绑定到当前 EventLoop 线程,核心调用链如下

  1. io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)
  2. io.netty.channel.AbstractChannel.AbstractUnsafe#register
  3. io.netty.channel.AbstractChannel.AbstractUnsafe#register0
  4. io.netty.channel.nio.AbstractNioChannel#doRegister

​ 在 AbstractNioChannel#doRegister 中,Channel 会被注册到当前 EventLoopSelector 上(此时仅完成绑定,并未关注任何事件)。自此,Channel 生命周期中的所有 IO 事件都将由该 EventLoop 线程独立处理

事件循环

​ EventLoop 线程是 Netty 的核心执行单元,不仅负责 Channel 的 I/O 事件处理,还实现了 ScheduledExecutorService 接口,可同时处理 延时任务普通 Runnable 任务

​ EventLoop线程采用 懒启动 机制:只有首次调用 execute() 提交任务时,才会通过 CAS 启动专属线程,并在 io.netty.channel.nio.NioEventLoop#run 方法中进入循环

线程的阻塞和唤醒

事件循环开始时,会先根据当前任务队列状态决定本轮的 select 策略,具体逻辑如下:

  • 有任务 → 调用 selectNow(),立即返回已就绪事件数(非阻塞)。提高任务响应速度
  • 无任务 → 返回 SelectStrategy.SELECT,随后调用阻塞的 select() 等待 IO 事件。减少CPU空转
 try {
    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
        case SelectStrategy.CONTINUE:
            continue;

        case SelectStrategy.BUSY_WAIT:
            // fall-through to SELECT since the busy-wait is not supported with NIO

        // ============= NIO只会走如下两种case ===================
        case SelectStrategy.SELECT:
            select(wakenUp.getAndSet(false));


            if (wakenUp.get()) {
                selector.wakeup();
            }
        default: // 其他策略不做处理
    }
} catch (IOException e) {
    rebuildSelector0();
    handleLoopException(e);
    continue;
}
阻塞select

核心方法NioEventLoop#select代码如下,总结下其主要功能:

  1. 超时阻塞Selector

    ​ 首先会根据最近的延时任务来算超时时间(没任务就默认1秒),保证不会因为一直堵在那里

  2. 解决早期JDK NIO的空轮询bug

    ​ 早期JDK(6,7,8) NIO的Selector在调用超时select方法时,可能触发空轮询的bug,即立即返回,造成CPU的空转,利用率飙升

    ​ Netty 为了解决这个问题,会对连续调用 select() 过程中空返回的次数进行计数。如果连续超过 512 次(io.netty.selectorAutoRebuildThreshold),说明可能触发了空轮询的 bug。此时,Netty 会重建一个新的 Selector,并将之前注册的所有 Channel 迁移到新 Selector 上,类似于给 Selector 做一次重启,避免空轮询导致的 CPU 空转

  3. 满足以下任意条件时退出阻塞循环,开始处理事件和任务

    1. IO事件准备好
    2. 外部线程触发的唤醒(oldWakenUp为true)
    3. 内部被标记为已唤醒(wakenUp标志)
    4. 存在待执行的普通Runnable任务
    5. 存在已到期的延时任务
  4. 中断响应

    如果当前线程被中断,立马退出阻塞,保证能及时响应中断信号,不被卡住

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        // 本次循环调用select的次数统计,用于检测过早返回现象(即select无效唤醒)
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        // select阻塞截止时间(deadLine)
        // 1. 取决于最近的定时任务时间,保证select不会阻塞超过这段时间,避免错过定时任务
        // 2. 没有定时任务则默认为1秒后
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

        for (;;) {
            // 定时检测Selector的时间
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            // 超时判断:如果截止时间已到或超时。保证至少调用了一次select再跳出循环
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

            // 若有待执行任务且未被唤醒,立即非阻塞selectNow再跳出循环,避免长时间阻塞
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }
            // 调用阻塞select,最多阻塞timeoutMillis毫秒,等待IO事件或被唤醒
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt++;
            /*
             * 检测是否满足退出阻塞循环条件:
             * 1. 有IO事件准备好
             * 2. 外部线程触发的唤醒(oldWakenUp为true)
             * 3. 内部被标记为已唤醒(wakenUp标志)
             * 4. 存在待执行的普通任务
             * 5. 存在已到期的定时任务
             */
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                break;
            }
            // 中断支持,跳出循环
            if (Thread.interrupted()) {
                selectCnt = 1;
                break;
            }

            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // select本次阻塞时长正常,超过了预期timeout
                // 说明未出现select过早返回(空轮询)问题,重置计数器避免误判
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { 
                // select连续过早返回次数超过了阈值(默认512次),则重建Selector以避免CPU空转导致的死循环
                selector = selectRebuildSelector(selectCnt);
                selectCnt = 1;
                break;
            }

            currentTimeNanos = time;
        }

        // 当select连续过早返回次数过多,打印调试日志,方便问题排查
        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",selectCnt - 1, selector);
            }
        }
    } catch (CancelledKeyException e) { // debug日志
    }
}
唤醒

addTaskWakesUpwakenUp 都用于控制 EventLoop 的唤醒逻辑。当主线程通过 execute() 向 EventLoop 提交 非 NonWakeupRunnable 任务时:

  1. 如果 addTaskWakesUpfalse(如 NioEventLoop),表示仅向任务队列添加任务不会自动唤醒,需要显式唤醒
  2. 此时会尝试将 wakenUpfalse 设置为 true,成功则调用 Selector#wakeup(),唤醒阻塞在 select() 上的 EventLoop 线程
  3. 被唤醒的线程会从阻塞中恢复,继续执行包括普通 Runnable 在内的后续任务

注意:定时任务在投递时会被包装为普通 Runnable,由该 Runnable 负责向延时队列提交任务,因此同样具备触发 wakeUp 的能力

/**
 *
 * 1. addTaskWakesUp
 *    表示向 taskQueue 添加任务时,是否能自动唤醒 EventLoop 线程
 *    - true  :添加任务即可唤醒线程(由任务队列本身负责唤醒,例如 DefaultEventLoop 使用的阻塞队列)
 *    - false :添加任务不会自动唤醒,需要手动调用 wakeup()。如 NioEventLoop,线程阻塞在 Selector#select,而不是 taskQueue
 * 
 *  NioEventLoop设为了false
 *
 */
private final boolean addTaskWakesUp;
/**
 *  一个原子标志,用于避免重复调用 Selector#wakeup()
 *    - 当需要唤醒时,如果之前未设置为 true,则设置为 true 并实际唤醒
 *    - 事件循环开始时会重置为 false
 */
private final AtomicBoolean wakenUp = new AtomicBoolean();

IO事件处理

NioEventLoop 既能管 SocketChannel,也能管 ServerSocketChannel
所以它的 processSelectedKey 方法是通用的,会去检查四种事件类型,但实际上每个 Channel 只会注册自己感兴趣的事件,因此一个 SelectionKey 只会触发自己该处理的那几种:

  • SocketChannel
    • OP_CONNECT:确保连接建立,并触发 Netty 的 ChannelActive 事件
    • OP_WRITE:把发送缓冲区里的数据刷出去
    • OP_READ:从Channel里读数据
  • ServerSocketChannel
    • OP_ACCEPT:接收新连接,并把它注册到对应的 EventLoop
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    // ... 省略SelectionKey无效的处理

    try {
        // bitmask数据结构,检测每一位可知对于的事件是否就绪
        int readyOps = k.readyOps();
        // 先处理 OP_CONNECT:确保 TCP 连接已经完成
        // SocketChannel专属:SocketChannel在connect服务端时,如果未及时连接成功,才注册OP_CONNECT事件。代码在NioSocketChannel.doConnect里
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // 取消OP_CONNECT的监听(避免JDK的OP_CONNECT未取消导致 select() 永远立即返回的bug)
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // 处理SocketChannel的OP_WRITE事件:尝试发送积压的数据
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }

        // 特殊情况:JDK 的 bug 可能导致 readyOps == 0,但实际上还是有事件需要处理
        // OP_READ(SocketChannel):从 socket 读取数据
        // OP_ACCEPT(ServerSocketChannel):接收新连接并注册到 EventLoop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

普通和延时任务处理

​ 每个 EventLoop 都有自己的普通任务队列延时任务队列,它们在一次循环中能占用的执行时间由 ioRatio 控制。其表示 IO 处理时间在一次循环中的比例默认 50%,也就是 IO 时间 = 任务执行时间

为了减少 nanoTime() 的性能开销,Netty 会每执行 64 (硬编码的)个任务才检查一次是否超时。所以,如果某个 Runnable 本身执行很慢,就会拖长任务阶段的时间,从而影响 IO 事件的及时处理

/**
 * IO 事件处理时间占总循环时间的比例(百分比)
 * 
 * EventLoop 既要处理 Channel 的 IO 事件,也要执行普通任务(Runnable)和定时任务,
 * 这个比例决定了一次循环中 IO 事件和任务执行的时间分配,默认 50:50
 */
private volatile int ioRatio = 50;
// 当前EventLoop线程的普通任务队列(调用 execute() 提交的 Runnable 会进入这里)
private final Queue<Runnable> taskQueue;
// 当前EventLoop线程的定时任务队列(小顶堆),定时任务会封装成 ScheduledFutureTask 放在这里
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;

protected boolean runAllTasks(long timeoutNanos) {
    // 把已到期的定时任务转移到普通任务队列
    fetchFromScheduledTaskQueue();
    Runnable task = pollTask();
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }

    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        // 安全执行任务(捕获并吞掉 Throwable,防止任务异常影响循环)
        safeExecute(task);

        runTasks++;

        // 每执行 64 个任务检查一次是否超时(减少调用 nanoTime() 的频率,避免性能开销)
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) { // 超时就结束任务执行
                break;
            }
        }

        task = pollTask();
        if (task == null) { // 没有任务了,直接跳出循环
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    // after hook:即执行tailTasks里的任务
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

shutdown检测

​ 每轮循环结束时都会检测 isShuttingDown(),以实现 优雅关闭。其主要做了如下几件事:

  • 关闭当前 EventLoop 管理的所有 Channel
  • 取消所有定时任务
  • 执行剩余普通任务,无任务时运行 shutdownHooks
  • 满足静默期或超时条件后才退出线程

总结

​ 综合上面的分析,可发现Netty 在 NioEventLoop 上做了不少魔改和优化,不仅绕开了 JDK NIO 的一些坑,还顺带提升了性能:

  • 先说空轮询 bug —— Netty 会监控空轮询的次数,一旦发现触发了这个问题,就直接重建 Selector,彻底规避早期 JDK NIO 里 Selector.select() 立刻返回导致的 CPU 空转
  • 再说提速 —— 它会用反射把 JDK Selector 内部的关键字段selectedKeyspublicSelectedKeys)换成自己优化过的版本,这样不仅处理速度更快,还能明显减少 GC 压力

另外,EventLoop 线程跟普通的线程池不一样:

  • 每个 EventLoop 都有自己独立的任务队列和延时队列
  • 普通任务的特点是多线程投递、单线程消费,所以Netty 选择用 MpscQueue(多生产者单消费者的无锁队列)来减少竞争和锁开销
  • 延时任务都是 EventLoop 自己投递(外部投递也会包装成普通 Runnable 再投递),这种单线程生产和消费的场景下,Netty 就用上了自己实现的 无锁 DefaultPriorityQueue 来处理延时任务,既简单又高效

事件循环流程图

netty-eventloop-process.png

相关链接