Java源码篇-Future
jdk中Future接口实现类相关源码解析。包括FutureTask和 ScheduledFutureTask
Future接口表示一个异步操作的结果,即未来的结果,同时实现了 Runnable
和 Future
接口。提供了如下的一些基础方法可获取、判断和取消等操作
get()
:阻塞直到计算完成并返回结果(支持超时设置)isDone()
:非阻塞检查任务是否完成(成功/失败/取消)cancel(boolean mayInterruptIfRunning)
:尝试取消任务,参数决定是否中断执行中的线程isCancelled()
:判断任务是否被取消
其实现类为FutureTask,就是用它来实现Callable接口的功能
1.1 FutureTask
1.1.1 重点字段和方法
public class FutureTask<V> implements RunnableFuture<V> {
// state字段,表示了当前Future的状态,取值为如下字段
private volatile int state;
private static final int NEW = 0; // 初始状态,新建
private static final int COMPLETING = 1; // 正在结束
private static final int NORMAL = 2; // 正常执行完毕
private static final int EXCEPTIONAL = 3; // 异常执行完毕
private static final int CANCELLED = 4; // 前一个状态必须是NEW,已取消(未中断)
private static final int INTERRUPTING = 5; // 前一个状态必须是NEW,正在中断(中断)
private static final int INTERRUPTED = 6; // 取消成功的才可以设置,中断完成(中断)
// 待运行的Callable任务
private Callable<V> callable;
// Callable执行的结果。如果出现执行的过程中异常,则保存的是异常对象
private Object outcome;
// 运行Callable#call方法的线程(也即是运行业务代码的线程)
private volatile Thread runner;
// 当这里面的Callable还未执行完,却有其他线程调用Future#get()方法,
// 会将其他线程阻塞并构造为等待节点,维持一个链表结构,以便在Callable执行完毕后唤醒并回调
private volatile WaitNode waiters;
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 执行业务方法
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex); // 异常结束,将状态设为EXCEPTIONAL,如果等待队列有节点,则唤醒对应的线程
}
if (ran)
set(result); // 正常结束,将状态设为NORMAL,如果等待队列有节点,则唤醒对应的线程
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
// 阻塞获取
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING) // 还未完全结束Callabke,进入等待
s = awaitDone(false, 0L);
return report(s);
}
// 根据state判断是否需要阻塞并做对于的事
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) { // 当前线程支持响应中断
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) { // Callable运行完毕,且result已经设置完毕
if (q != null)
q.thread = null;
return s;
}
// 进入到以下分支,就代表Callable还未完全执行完毕
else if (s == COMPLETING) // cannot time out yet Callable运行完毕,但正在设置result,让出执行时间,等待下次判断
Thread.yield();
else if (q == null) // 第一次循环,构造等待节点
q = new WaitNode();
else if (!queued) // 还未加入等待队列,则将节点加入到等待队列中
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) { // 是否运行超时判断
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else // 暂停当前线程,等待任务执行完毕的唤醒
LockSupport.park(this);
}
}
// 任务执行完毕,唤醒等待队列的所有节点
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
}
1.1.2 总结
- FutureTask根据内部的state字段来判断当前任务运行到了哪个阶段并作出对于的抉择,也使用volatile修饰保证它在多线程环境下的可见性。
- state状态流转:
- NEW
→
COMPLETING→
NORMAL(成功)
- NEW
→
COMPLETING→
EXCEPTIONAL(失败)
- NEW
→
CANCELLED/INTERRUPTED(取消)
- NEW
- state状态流转:
- 如果想获取任务执行的结果,要使用get来获取结果,get是个阻塞的方法。当任务还未执行完毕时,会将调用get的方法阻塞并构造成WaitNode,再通过内部的next字段链接下一个WaitNode,形成一个链表结构。当任务执行完毕后,内部调用的finishCompletion方法会判断等待链表是否为空,不为空就代表有线程在获取结果时被阻塞了,这时唤醒阻塞队列的所有线程,最终,调用get方法的线程返回结果。即使用
WaitNode
链表 + CAS 操作管理阻塞线程,避免显式锁开销 - 只会允许任务执行一次,状态不可逆转
1.2 ScheduledFutureTask
ScheduledFutureTask继承了FutureTask,当向定时任务线程池投递任务时(Runnable或Callable),都会将其封装为ScheduledFutureTask
1.2.1 重点字段和方法
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
/**
* 当前任务的id(自增的),代表了进入队列的顺序 <br/>
* 当两个定时任务下次执行时间一致时,sequenceNumber越小就会越早执行
*/
private final long sequenceNumber;
// 下次执行当前任务的纳秒时间戳
private long time;
/**
* 执行定时任务的纳秒间隔时间
* 大于0:代表固定的频率,不受任务的执行所花费的时间影响
* 等于0:代表不是一个重复的任务(只会执行一次
* 小于0:代表固定的时间间隔,基于任务执行完毕后的时间计算。(任务执行完后再基于当前时间计算下次执行时间)
*/
private final long period;
// 当前任务在数组中的索引
int heapIndex;
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
// 比较方法,决定了放入数组的位置
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0) // 当前任务的下次执行时间更长,返回正数
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
/**
* false: 代表当前任务为一次性任务
* true: 定时任务
*/
public boolean isPeriodic() {
return period != 0;
}
// 当前任务执行完毕后,用来计算下次执行时间
private void setNextRunTime() {
long p = period;
// p为两次执行时间的时间间隔的纳秒值
if (p > 0) // p大于0,即为固定时间执行的任务,基于初始运行时间计算下一次的执行时间
time += p;
else // p小于0,为基于完成任务的时间来执行,基于任务运行完的时间,来计算出下一次的执行时间
time = triggerTime(-p);
}
// 主方法,运行当前定时任务
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic) // 非定时任务,当作普通任务直接调用FutureTask的run方法运行
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) { // 运行定时任务,且运行成功(没抛异常)
setNextRunTime(); // 设置下一次执行时间
reExecutePeriodic(outerTask); // 再把当前任务重新入队列
}
}
/**
父类的FutureTask中的方法,运行并重置状态,用于任务的多次执行
* 正常执行时:不会修改运行状态(也就是说这个操作不会修改state字段值,保持初始值,以支持重复执行)。
* 出现异常时:还是将state设为EXCEPTIONAL,也就是说一个定时任务要是抛出了异常,之后就不会再执行它了
*/
protected boolean runAndReset() {
if (state != NEW || // 不为NEW状态的都不执行
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // 不设置返回结果。多次执行的任务就不该有执行结果
ran = true;
} catch (Throwable ex) {
setException(ex); // 抛出异常,修改state为EXCEPTIONAL,以后也不在执行它
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
}
1.2.2 总结
ScheduledFutureTask
通过 period
字段判断任务类型:0
表示一次性任务,>0
表示固定频率,<0
表示固定延迟。
在 run()
方法中,若任务为周期性任务,执行完当前任务后会计算下次执行时间,并将自身重新提交至基于小顶堆的 DelayedWorkQueue
中,以维持调度。
- 基于纳秒时间精度,避免
System.currentTimeMillis()
的系统时间变动干扰。 - 复用
FutureTask
的任务封装机制,增强任务调度能力。 - 精简实现,不依赖额外锁,主要通过最小堆和
Delayed
接口实现调度。