Netty(四) — 异步编程的实现和思考
分析了 Netty 的 Future 与 Promise 的实现与设计理念,并在此基础上总结了异步编程概念的应用及个人思考
Future
Netty 的 Future 在继承 JDK 标准接口 java.util.concurrent.Future 的基础上,扩展了一系列方法。其中最核心的扩展是 addListeners 接口
JDK的 Future 主要通过阻塞或轮询方式获取结果,而Netty 提供的 GenericFutureListener 支持在 Future 完成时(无论是成功、失败还是取消)自动触发回调
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
Promise
Promise继承了Netty的Future接口,同时新增了如下接口,用于主动设置结果
public interface Promise<V> extends Future<V> {
/**
* 设置结果并标记为成功状态,同时通知所有监听器。不成功会抛异常
*/
Promise<V> setSuccess(V result);
/**
* 尝试设置结果并标记为成功状态
*/
boolean trySuccess(V result);
/**
* 设置异常并标记为失败状态,同时通知所有监听器。不成功会抛异常
*/
Promise<V> setFailure(Throwable cause);
/**
* 尝试设置异常并标记为失败状态,同时通知所有监听器。
*
*/
boolean tryFailure(Throwable cause);
}
DefaultPromise
Netty中对Promise的默认实现类为DefaultPromise,我们看一下一些核心功能是如何实现的
核心字段
/**
* listener 嵌套调用的最大深度。
*
* Netty 在触发回调时,允许 listener 调用 listener。
* 为避免递归过深导致 StackOverflow,这里限制最大调用深度,默认值为 8
* 可通过系统属性 "io.netty.defaultPromise.maxListenerStackDepth" 修改
*/
private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
// result 字段的原子更新器,用于实现无锁 CAS 更新
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
// void 类型 Promise 的统一成功标记
private static final Object SUCCESS = new Object();
// 中间状态:标记当前 Promise 已经不可取消(Uncancellable)
private static final Object UNCANCELLABLE = new Object();
// canceled状态的固定结果,封装了CancellationException异常
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
new CancellationException(), DefaultPromise.class, "cancel(...)"));
// Promise 当前的执行结果(既用来表示最终的结果,也可用来获取当前Promise的执行状态)
private volatile Object result;
// listener的执行器
private final EventExecutor executor;
// 多个listeners是DefaultFutureListeners,单个是GenericFutureListener
private Object listeners;
// 等待 Promise 完成的线程数
private short waiters;
// 标记当前是否正在通知 listener。避免并发下重复触发
private boolean notifyingListeners;
阻塞
实现比较简单,老一套的状态检查 + Object#wait 挂起等待操作。只是调用时需要确保当前调用线程不能是该 Promise 绑定的 EventExecutor 线程
其余的超时阻塞也差不多,没必要看了
public Promise<V> await() throws InterruptedException {
if (isDone()) {
return this;
}
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
// 确保当前Promise的EventExecutor线程不能wait
checkDeadLock();
synchronized (this) {
// 通过检测result字段判断是否已完结
while (!isDone()) {
// waiters++
incWaiters();
try {
// Object的wait方法
wait();
} finally {
decWaiters();
}
}
}
return this;
}
listener触发
整体逻辑不复杂,主要有以下几个关键点:
- 线程调度:保证 Listener 始终在当前 Promise 绑定的
EventExecutor线程中执行 - 递归保护:通过
ThreadLocal维护调用栈深度,防止过深的嵌套触发 - 并发控制:依赖
synchronized和notifyingListeners标志位,确保同一批 Listener 不会被重复执行
private void notifyListeners() {
EventExecutor executor = executor();
// 确保 listener 在对应的 EventExecutor 线程中执行
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();
// 使用 ThreadLocal 跟踪调用栈深度,避免递归过深导致栈溢出
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}
private void notifyListenersNow() {
Object listeners;
synchronized (this) {
// 已在执行 listener 或没有待处理的 listener,直接返回
if (notifyingListeners || this.listeners == null) {
return;
}
notifyingListeners = true;
listeners = this.listeners;
this.listeners = null;
}
for (;;) {
// 首先安全的调用所有listener(不会抛出异常)
if (listeners instanceof DefaultFutureListeners) {
notifyListeners0((DefaultFutureListeners) listeners);
} else {
notifyListener0(this, (GenericFutureListener<?>) listeners);
}
synchronized (this) {
if (this.listeners == null) {
// 本轮 listener 全部执行完成,重置状态并结束
notifyingListeners = false;
return;
}
// 期间可能有新的 listener 被添加,继续执行
listeners = this.listeners;
this.listeners = null;
}
}
}
其它操作
其它设置结果的接口基本都是CAS更新result字段,再notify所有的waiter线程和内部的listeners。都比较简单就不展示了
总结
Future 本质上是 只读视图,而其子接口 Promise 在此基础上增加了 写能力,允许调用方主动设置结果。
与JDK的FutureTask相比,Netty 的 DefaultPromise 实现要简单的多:
- 状态与结果合一:通过一个
result字段同时表示状态与保存结果,而不是维护一整套复杂的状态机 - 同步机制更轻量:阻塞与唤醒基于
Object#wait/notify,由synchronized保证并发安全,不依赖 AQS - 事件驱动友好:内置回调机制(
addListener),与 Netty 的异步/事件模型天然契合
思考
通过前面的实现分析可以看到,DefaultPromise 本身非常轻量,所以 Netty 才能放心地在框架里到处使用 Future 和 Promise。它们已经不再是只有做异步操作时才会用到的工具,而更像是一种通用的 异步编程规范
无论一个接口内部到底是不是异步实现的,只要它返回一个 Future,调用方就能用统一的方式来判断:任务是不是完成了、结果是否正确、有没有抛异常。如果还没完成,还可以配合定时任务、超时控制,甚至加上 fallback 策略,让调用逻辑更健壮
更加灵活的是我们甚至可以 先有 Promise,后有结果。哪怕操作还没开始,就能提前创建好一个 Promise,再挂上监听器,等未来某个时刻真正触发结果时,回调会第一时间触发。比如 io.netty.channel.Channel#closeFuture 就是经典例子:我们在代码里随时都能拿到 closeFuture,只要一旦 Channel 真关闭,Netty 内部调用 trySuccess 触发完成,所有在等它的逻辑(无论是阻塞线程还是异步监听器)都会立刻被唤醒
所以Netty 的 Future/Promise 并不只是异步 IO 的产物,更多的是一种通用异步语义的工具。有了它,整个框架里无论是异步还是同步,正常结果还是异常情况,都能通过统一的方式来表达和处理。也正因如此,在很多基于 Netty 的框架中(比如 Redisson),可以看到异步编程被广泛采用,充分利用了这种统一的语义优势