Netty(四) — 异步编程的实现和思考

​ 分析了 Netty 的 Future 与 Promise 的实现与设计理念,并在此基础上总结了异步编程概念的应用及个人思考

Future

​ Netty 的 Future 在继承 JDK 标准接口 java.util.concurrent.Future 的基础上,扩展了一系列方法。其中最核心的扩展是 addListeners 接口

​ JDK的 Future 主要通过阻塞或轮询方式获取结果,而Netty 提供的 GenericFutureListener 支持在 Future 完成时(无论是成功、失败还是取消)自动触发回调

Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

Promise

​ Promise继承了Netty的Future接口,同时新增了如下接口,用于主动设置结果

public interface Promise<V> extends Future<V> {

    /**
     * 设置结果并标记为成功状态,同时通知所有监听器。不成功会抛异常
     */
    Promise<V> setSuccess(V result);

    /**
     * 尝试设置结果并标记为成功状态
     */
    boolean trySuccess(V result);

    /**
     * 设置异常并标记为失败状态,同时通知所有监听器。不成功会抛异常
     */
    Promise<V> setFailure(Throwable cause);

    /**
     * 尝试设置异常并标记为失败状态,同时通知所有监听器。
     *
     */
    boolean tryFailure(Throwable cause);
}

DefaultPromise

Netty中对Promise的默认实现类为DefaultPromise,我们看一下一些核心功能是如何实现的

核心字段
  /**
   * listener 嵌套调用的最大深度。
   * 
   * Netty 在触发回调时,允许 listener 调用 listener。
   * 为避免递归过深导致 StackOverflow,这里限制最大调用深度,默认值为 8
   * 可通过系统属性 "io.netty.defaultPromise.maxListenerStackDepth" 修改
   */
  private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
          SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
  // result 字段的原子更新器,用于实现无锁 CAS 更新
  @SuppressWarnings("rawtypes")
  private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
          AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
  // void 类型 Promise 的统一成功标记
  private static final Object SUCCESS = new Object();
  // 中间状态:标记当前 Promise 已经不可取消(Uncancellable)
  private static final Object UNCANCELLABLE = new Object();
  // canceled状态的固定结果,封装了CancellationException异常
  private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
          new CancellationException(), DefaultPromise.class, "cancel(...)"));

  // Promise 当前的执行结果(既用来表示最终的结果,也可用来获取当前Promise的执行状态)
  private volatile Object result;
  // listener的执行器
  private final EventExecutor executor;
  // 多个listeners是DefaultFutureListeners,单个是GenericFutureListener
  private Object listeners;

  // 等待 Promise 完成的线程数
  private short waiters;
  // 标记当前是否正在通知 listener。避免并发下重复触发
  private boolean notifyingListeners;
阻塞

​ 实现比较简单,老一套的状态检查 + Object#wait 挂起等待操作。只是调用时需要确保当前调用线程不能是该 Promise 绑定的 EventExecutor 线程

​ 其余的超时阻塞也差不多,没必要看了

public Promise<V> await() throws InterruptedException {
        if (isDone()) {
            return this;
        }

        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }

        // 确保当前Promise的EventExecutor线程不能wait
        checkDeadLock();

        synchronized (this) {
            // 通过检测result字段判断是否已完结
            while (!isDone()) {
                // waiters++
                incWaiters();
                try {
                    // Object的wait方法
                    wait();
                } finally {
                    decWaiters();
                }
            }
        }
        return this;
    }
listener触发

​ 整体逻辑不复杂,主要有以下几个关键点:

  • 线程调度:保证 Listener 始终在当前 Promise 绑定的 EventExecutor 线程中执行
  • 递归保护:通过 ThreadLocal 维护调用栈深度,防止过深的嵌套触发
  • 并发控制:依赖 synchronizednotifyingListeners 标志位,确保同一批 Listener 不会被重复执行
private void notifyListeners() {
    EventExecutor executor = executor();
    // 确保 listener 在对应的 EventExecutor 线程中执行
    if (executor.inEventLoop()) {
        final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
        final int stackDepth = threadLocals.futureListenerStackDepth();
        // 使用 ThreadLocal 跟踪调用栈深度,避免递归过深导致栈溢出
        if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
            threadLocals.setFutureListenerStackDepth(stackDepth + 1);
            try {
                notifyListenersNow();
            } finally {
                threadLocals.setFutureListenerStackDepth(stackDepth);
            }
            return;
        }
    }

    safeExecute(executor, new Runnable() {
        @Override
        public void run() {
            notifyListenersNow();
        }
    });
}

private void notifyListenersNow() {
    Object listeners;
    synchronized (this) {
        // 已在执行 listener 或没有待处理的 listener,直接返回
        if (notifyingListeners || this.listeners == null) {
            return;
        }
        notifyingListeners = true;
        listeners = this.listeners;
        this.listeners = null;
    }
    for (;;) {
        // 首先安全的调用所有listener(不会抛出异常)
        if (listeners instanceof DefaultFutureListeners) {
            notifyListeners0((DefaultFutureListeners) listeners);
        } else {
            notifyListener0(this, (GenericFutureListener<?>) listeners);
        }
        synchronized (this) {
            if (this.listeners == null) {
                // 本轮 listener 全部执行完成,重置状态并结束
                notifyingListeners = false;
                return;
            }
            // 期间可能有新的 listener 被添加,继续执行
            listeners = this.listeners;
            this.listeners = null;
        }
    }
}
其它操作

​ 其它设置结果的接口基本都是CAS更新result字段,再notify所有的waiter线程和内部的listeners。都比较简单就不展示了

总结

Future 本质上是 只读视图,而其子接口 Promise 在此基础上增加了 写能力,允许调用方主动设置结果。

与JDK的FutureTask相比,Netty 的 DefaultPromise 实现要简单的多:

  • 状态与结果合一:通过一个 result 字段同时表示状态与保存结果,而不是维护一整套复杂的状态机
  • 同步机制更轻量:阻塞与唤醒基于 Object#wait / notify,由 synchronized 保证并发安全,不依赖 AQS
  • 事件驱动友好:内置回调机制(addListener),与 Netty 的异步/事件模型天然契合

netty-future-state.png

思考

​ 通过前面的实现分析可以看到,DefaultPromise 本身非常轻量,所以 Netty 才能放心地在框架里到处使用 Future 和 Promise。它们已经不再是只有做异步操作时才会用到的工具,而更像是一种通用的 异步编程规范

​ 无论一个接口内部到底是不是异步实现的,只要它返回一个 Future,调用方就能用统一的方式来判断:任务是不是完成了、结果是否正确、有没有抛异常。如果还没完成,还可以配合定时任务、超时控制,甚至加上 fallback 策略,让调用逻辑更健壮

​ 更加灵活的是我们甚至可以 先有 Promise,后有结果哪怕操作还没开始,就能提前创建好一个 Promise,再挂上监听器,等未来某个时刻真正触发结果时,回调会第一时间触发。比如 io.netty.channel.Channel#closeFuture 就是经典例子:我们在代码里随时都能拿到 closeFuture,只要一旦 Channel 真关闭,Netty 内部调用 trySuccess 触发完成,所有在等它的逻辑(无论是阻塞线程还是异步监听器)都会立刻被唤醒

​ 所以Netty 的 Future/Promise 并不只是异步 IO 的产物,更多的是一种通用异步语义的工具。有了它,整个框架里无论是异步还是同步,正常结果还是异常情况,都能通过统一的方式来表达和处理。也正因如此,在很多基于 Netty 的框架中(比如 Redisson),可以看到异步编程被广泛采用,充分利用了这种统一的语义优势