Java源码篇-Future


​ jdk中Future接口实现类相关源码解析。包括FutureTask和 ScheduledFutureTask

Future接口表示一个异步操作的结果,即未来的结果,同时实现了 RunnableFuture 接口。提供了如下的一些基础方法可获取、判断和取消等操作

  • get()阻塞直到计算完成并返回结果(支持超时设置)

  • isDone()非阻塞检查任务是否完成(成功/失败/取消)

  • cancel(boolean mayInterruptIfRunning):尝试取消任务,参数决定是否中断执行中的线程

  • isCancelled():判断任务是否被取消

其实现类为FutureTask,就是用它来实现Callable接口的功能


1.1 FutureTask

1.1.1 重点字段和方法

public class FutureTask<V> implements RunnableFuture<V> {
    // state字段,表示了当前Future的状态,取值为如下字段
    private volatile int state;

    private static final int NEW          = 0; // 初始状态,新建
    private static final int COMPLETING   = 1; // 正在结束
    private static final int NORMAL       = 2; // 正常执行完毕
    private static final int EXCEPTIONAL  = 3; // 异常执行完毕
    private static final int CANCELLED    = 4; // 前一个状态必须是NEW,已取消(未中断)
    private static final int INTERRUPTING = 5; // 前一个状态必须是NEW,正在中断(中断)
    private static final int INTERRUPTED  = 6; // 取消成功的才可以设置,中断完成(中断)

    // 待运行的Callable任务
    private Callable<V> callable;
    // Callable执行的结果。如果出现执行的过程中异常,则保存的是异常对象
    private Object outcome;
    // 运行Callable#call方法的线程(也即是运行业务代码的线程)
    private volatile Thread runner;
    // 当这里面的Callable还未执行完,却有其他线程调用Future#get()方法,
    // 	会将其他线程阻塞并构造为等待节点,维持一个链表结构,以便在Callable执行完毕后唤醒并回调
    private volatile WaitNode waiters;


    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    // 执行业务方法
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex); // 异常结束,将状态设为EXCEPTIONAL,如果等待队列有节点,则唤醒对应的线程
                }
                if (ran)
                    set(result); // 正常结束,将状态设为NORMAL,如果等待队列有节点,则唤醒对应的线程
            }
        } finally {
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    // 阻塞获取
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING) // 还未完全结束Callabke,进入等待
            s = awaitDone(false, 0L);
        return report(s);
    }
    // 根据state判断是否需要阻塞并做对于的事
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) { // 当前线程支持响应中断
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) { // Callable运行完毕,且result已经设置完毕
                if (q != null)
                    q.thread = null;
                return s;
            }
            // 进入到以下分支,就代表Callable还未完全执行完毕

            else if (s == COMPLETING) // cannot time out yet  Callable运行完毕,但正在设置result,让出执行时间,等待下次判断
                Thread.yield();
            else if (q == null) // 第一次循环,构造等待节点
                q = new WaitNode();
            else if (!queued) // 还未加入等待队列,则将节点加入到等待队列中
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) { // 是否运行超时判断
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else // 暂停当前线程,等待任务执行完毕的唤醒
                LockSupport.park(this);
        }
    }
    // 任务执行完毕,唤醒等待队列的所有节点
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }
}

1.1.2 总结

  1. FutureTask根据内部的state字段来判断当前任务运行到了哪个阶段并作出对于的抉择,也使用volatile修饰保证它在多线程环境下的可见性
    • state状态流转
      • NEWCOMPLETINGNORMAL(成功)
      • NEWCOMPLETINGEXCEPTIONAL(失败)
      • NEWCANCELLED/INTERRUPTED(取消)
  2. 如果想获取任务执行的结果,要使用get来获取结果,get是个阻塞的方法。当任务还未执行完毕时,会将调用get的方法阻塞并构造成WaitNode,再通过内部的next字段链接下一个WaitNode,形成一个链表结构。当任务执行完毕后,内部调用的finishCompletion方法会判断等待链表是否为空,不为空就代表有线程在获取结果时被阻塞了,这时唤醒阻塞队列的所有线程,最终,调用get方法的线程返回结果。即使用 WaitNode 链表 + CAS 操作管理阻塞线程,避免显式锁开销
  3. 只会允许任务执行一次,状态不可逆转

1.2 ScheduledFutureTask

​ ScheduledFutureTask继承了FutureTask,当向定时任务线程池投递任务时(Runnable或Callable),都会将其封装为ScheduledFutureTask

1.2.1 重点字段和方法

 private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {
     /** 
     * 当前任务的id(自增的),代表了进入队列的顺序 <br/>
     * 当两个定时任务下次执行时间一致时,sequenceNumber越小就会越早执行
     */
    private final long sequenceNumber;
    // 下次执行当前任务的纳秒时间戳
    private long time;

    /**
     * 执行定时任务的纳秒间隔时间
     * 大于0:代表固定的频率,不受任务的执行所花费的时间影响
     * 等于0:代表不是一个重复的任务(只会执行一次
     * 小于0:代表固定的时间间隔,基于任务执行完毕后的时间计算。(任务执行完后再基于当前时间计算下次执行时间)
     */
    private final long period;
    // 当前任务在数组中的索引
    int heapIndex;

    public long getDelay(TimeUnit unit) {
        return unit.convert(time - now(), NANOSECONDS);
    }
    // 比较方法,决定了放入数组的位置
    public int compareTo(Delayed other) {
        if (other == this) // compare zero if same object
            return 0;
        if (other instanceof ScheduledFutureTask) {
            ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0) // 当前任务的下次执行时间更长,返回正数
                return 1;
            else if (sequenceNumber < x.sequenceNumber)
                return -1;
            else
                return 1;
        }
        long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
        return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
    }

    /**
     * false: 代表当前任务为一次性任务
     * true: 定时任务
     */
    public boolean isPeriodic() {
        return period != 0;
    }
    // 当前任务执行完毕后,用来计算下次执行时间
    private void setNextRunTime() {
        long p = period;
        // p为两次执行时间的时间间隔的纳秒值
        if (p > 0) // p大于0,即为固定时间执行的任务,基于初始运行时间计算下一次的执行时间
            time += p;
        else // p小于0,为基于完成任务的时间来执行,基于任务运行完的时间,来计算出下一次的执行时间
            time = triggerTime(-p);
    }
    // 主方法,运行当前定时任务
    public void run() {
        boolean periodic = isPeriodic();
        if (!canRunInCurrentRunState(periodic))
            cancel(false);
        else if (!periodic) // 非定时任务,当作普通任务直接调用FutureTask的run方法运行
            ScheduledFutureTask.super.run();
        else if (ScheduledFutureTask.super.runAndReset()) { // 运行定时任务,且运行成功(没抛异常)
            setNextRunTime(); // 设置下一次执行时间
            reExecutePeriodic(outerTask); // 再把当前任务重新入队列
        }
    }
    /**
      父类的FutureTask中的方法,运行并重置状态,用于任务的多次执行 
         * 正常执行时:不会修改运行状态(也就是说这个操作不会修改state字段值,保持初始值,以支持重复执行)。
         * 出现异常时:还是将state设为EXCEPTIONAL,也就是说一个定时任务要是抛出了异常,之后就不会再执行它了
         */
    protected boolean runAndReset() {
        if (state != NEW || // 不为NEW状态的都不执行
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // 不设置返回结果。多次执行的任务就不该有执行结果
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);  // 抛出异常,修改state为EXCEPTIONAL,以后也不在执行它
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }
 }

1.2.2 总结

ScheduledFutureTask 通过 period 字段判断任务类型:0 表示一次性任务,>0 表示固定频率,<0 表示固定延迟。

run() 方法中,若任务为周期性任务,执行完当前任务后会计算下次执行时间,并将自身重新提交至基于小顶堆的 DelayedWorkQueue 中,以维持调度

  • 基于纳秒时间精度,避免 System.currentTimeMillis() 的系统时间变动干扰。
  • 复用 FutureTask 的任务封装机制,增强任务调度能力。
  • 精简实现,不依赖额外锁,主要通过最小堆和 Delayed 接口实现调度。