Java源码篇-线程池
对jdk的ThreadPoolExecutor和ScheduledThreadPoolExecutor进行了详细的源码分析
1 ThreadPoolExecutor
1.1 重要字段
//状态控制器,初始值: 1110 0000 0000 0000 0000 0000 0000 0000
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 29位
// 0001 1111 1111 1111 1111 1111 1111 1111
// 1110 0000 0000 0000 0000 0000 0000 0000 取反后
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 运行中:111 00000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
// 不再接受新任务的入队列,但已经入队列还未还未的任务还可以继续执行
// 000 00000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 不接受新任务入队列,也不处理队列中的任务,中断正在处理任务的worker
// 001 00000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
// 全部完成,任务终止,worker数为0
// 010 00000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
// 011 00000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
// 计算线程池的状态
private static int runStateOf(int c) { return c & ~CAPACITY; } // 后29位为0,前3为跟随c
// 计算线程池有多少工作线程
private static int workerCountOf(int c) { return c & CAPACITY; } // 前3位为0,后面29为跟随 c
private static int ctlOf(int rs, int wc) { return rs | wc; }
// 任务队列
private final BlockingQueue<Runnable> workQueue;
// 主锁
private final ReentrantLock mainLock = new ReentrantLock();
// 工作线程的Set
private final HashSet<Worker> workers = new HashSet<Worker>();
private final Condition termination = mainLock.newCondition();
// 池已经创建的线程最大数(一个动态值,线程池整个周期同时存在的最多线程数)
private int largestPoolSize;
// 完成的任务数
private long completedTaskCount;
// 创建线程的工厂
private volatile ThreadFactory threadFactory;
// 拒绝策略
private volatile RejectedExecutionHandler handler;
// 非核心线程数的保持时间
private volatile long keepAliveTime;
// 是否允许核心线程过期
private volatile boolean allowCoreThreadTimeOut;
// 核心线程数
private volatile int corePoolSize;
// 最大线程数
private volatile int maximumPoolSize;
ThreadPoolExecutor利用一个int类型的数来同时保存当前线程池状态和工作线程的数量,高3为用来表示当前线程的状态,低29为用来保存工作线程的数量。通过位运算实现状态和数量的原子性操作,避免单独维护两个变量时的竞态条件
ThreadPoolExecutor内部的Worker就是工作线程的抽象,每一个Worker都是一个工作线程。同时,Worker又继承了AQS可以充当锁的角色,目的是更好的让外部知道当前worker是否正在运行,以帮助回收或中断Worker。worker运行时(获取到任务后开始运行)会加锁,通过测试当前worker是否加上锁或者是否可以获得当前worker的锁,便可知道worker是否繁忙,有助于worker的清理
1.2 核心方法
1.2.1 shutdown(平滑关闭)
将当前线程池状态设为SHUTDOWN状态,再中断空闲的Worker(判断Worker是否空闲就通过它的锁方法)。所以,执行了这个方法后,正在执行的任务不会被中断,且已经存在workQueue中的Runnable也可以被执行,但是不能放入新的Runnable
1.2.2 shutdownNow(立即关闭)
将当前线程池状态设为STOP状态,将所有Worker设置为中断位,且倒出workQueue中的所有Runnable。所以,执行了这个方法后,正在运行的任务如果检测了中断位就会立即退出,如果没检测就还是会执行完,而已经存在workQueue中的Runnable将不会被执行,会将这些Runnable返回给调用者,让调用者处理
/**
* 平滑关闭线程池:
* 1. 将线程池状态设为SHUTDOWN,此时:
* - 继续执行已提交的任务(包括正在执行的和队列中的)
* - 拒绝新任务提交(execute()会抛出RejectedExecutionException)
* 2. 仅中断空闲Worker(通过tryLock()判断)
*
* 注意:正在执行的任务不会被中断,调用者需确保任务有合理的终止逻辑
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();// 检查每个worker线程是否可以修改
advanceRunState(SHUTDOWN); // CAS操作更新状态为SHUTDOWN
interruptIdleWorkers(); // interrupt所有空闲的worker
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
/**
* 立即关闭线程池:
* 1. 将线程池状态设为STOP,此时:
* - 中断所有Worker(无论是否在执行任务)
* - 丢弃队列中未执行的任务
* - 拒绝新任务提交
* 2. 返回被丢弃的任务列表供调用者处理
*
* 注意:
* - 正在执行的任务是否停止取决于任务是否响应中断
* - 典型使用场景:需要快速释放资源的紧急关闭
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);// 设置当前线程池状态为STOP
interruptWorkers();// interrupt所有Worker
tasks = drainQueue(); // 将任务队列中的task全部丢弃给方法调用者
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
/**
* 尝试终止线程池的最终状态转换:
* 1. 检查是否满足终止条件(3种直接返回的情况):
* - RUNNING状态:还有任务在执行
* - 已经是TIDYING/TERMINATED状态:避免重复操作
* - SHUTDOWN状态但队列不空:等待任务处理完成
* 2. 如果仍有活跃Worker,尝试中断单个空闲Worker
* 3. 最终状态转换:
* SHUTDOWN/STOP -> TIDYING -> TERMINATED
*/
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 三种情况下直接退出
// 1.线程池处于Running状态,还在运行
// 2.线程池状态大于TIDYING,代表当前线程池已经终结
// 3.shutdown状态,并且任务队列不为空,代表需等待这些任务完成
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated(); // hook方法
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
1.2.3 execute(投递任务)
// 执行execute的方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取当前线程池状态
int c = ctl.get();
// 判断是否小于核心线程数,是则新建线程运行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 核心数满了,并且当前线程池状态为Running,加到等待队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 等待队列满了,新建线程,但不能大于最大线程数
else if (!addWorker(command, false))
// 创建失败,直接调用拒绝策略
reject(command);
}
1.2.4 worker的运行和阻塞
// Worker的Runnable方法
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 利用阻塞队列,一直循环取任务执行(阻塞队列为空时会阻塞当前想取出元素的线程)
// 如果getTask为null,就代表会终结当前工作线程
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // hook before
Throwable thrown = null;
try {
task.run(); // 真正的运行Runnable
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // hook after
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
// 核心方法之一,从阻塞队列中取任务
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
// 死循环取任务
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 允许核心线程过期和非核心线程都可以超时取任务
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 原子性的尝试减少一个工作线程,减少成功才返回结束线程
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果是超时取任务,时间结束后还是取不到,则设置timedOut为true,下次循环就可以直接返回null退出了,这样,这个Worker也就终结了
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 不为null才返回,就不用担心返回null而终结了当前线程
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
1.3 总结
execute Runnable的流程
- 先判断线程池的工作线程数量是否小于核心线程数,小于核心线程数直接新建线程来执行
- 如果核心线程数满了,则将Runnable投入到workQueue中
- 如果workQueue满了,则创建非核心线程来继续执行任务
- 如果线程池中的工作现场数量到达了maximumPoolSize的值,则使用拒绝策略来执行任务
Worker的工作流程
调用getTask取任务来执行,如果取出的任务为空,则这个Worker也就结束了(终结了)。getTask不为空的话,还是先进性一系列的线程池状态校验,在执行hook函数(beforeExecute),在真正的执行这个Runnable,再执行hook函数(afterExecute),最后再将completedTasks加1,表示当前Worker完成的任务总数
getTask流程(实现线程超时回收的关键)
- 先进行一系列的状态校验
- 判断是否允许超时(满足任意一个就行)
- allowCoreThreadTimeOut为true(都允许核心线程超时了,那没任务的情况下线程池就不该有worker线程)
- 当前线程池的工作线程数量大于核心线程数量就允许超时
- 判断是否触发减少工作线程数量的机制,然后使用CAS减少工作线程数量,减少成功才返回null,结束当前工作线程
- 通过阻塞队列取Runnable,如果不允许超时,则会一直阻塞到这。如果允许超时,则会超时等待keepAliveTime纳秒取Runnable,如果取不出来,则设置一次已经超时,再来循环一次,来判断是否该减少工作线程
2 ScheduledThreadPoolExecutor
2.1 ScheduledExecutorService
public interface ScheduledExecutorService extends ExecutorService {
/**
* 创建一个一次性的延迟(定时)任务。
* 框架中cron表达式就是通过此接口实现(只需要在任务完成后,在计算下一次的执行时间,再用此方法定时执行,以此类推)
*/
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
/**
* 执行Callable接口的任务,也是一个一次性的定时任务
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
/**
* 基于固定的频率执行定时任务
* 例:初始执行任务的时间戳:当前时间戳(调用时)+ initialDelay
* 第二次执行:初始执行任务开始时的时间戳 + period
* 第三次执行:第二次执行任务开始时的时间戳 + period
*
* 典型场景:严格周期性的任务,如:
* - 每分钟采集一次系统指标
* - 每5秒发送心跳包
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
/**
* 基于固定的周期执行定时任务
* 例:初始执行任务的时间戳:当前时间戳(调用时)+ initialDelay
* 第二次执行:初始任务执行完结时的时间戳 + delay
* 第三次执行:第二次任务执行完结时的时间戳 + delay
*
* 典型场景:需要冷却时间的任务,如:
* - 数据库批量处理(保证每次处理完成后再间隔)
* - 异步结果轮询(避免密集请求)
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
2.2 DelayedWorkQueue
为ScheduledThreadPoolExecutor内部固定的阻塞队列,基于小顶堆数据结构实现。
投递的每个任务被封装后都扔进DelayedWorkQueue中,按照任务被执行的时间戳进行小顶堆排序,堆顶就刚好是队列中下个需要执行的任务。同时基于Leader-Follower 模式进行线程调度的优化,只有leader进行延时等待堆首任务,其余线程直接阻塞等待
核心字段和方法如下
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
// 数组初始容量
private static final int INITIAL_CAPACITY = 16;
// 数组实现的最小顶堆结构,queue[0]始终都是最快需要被执行的那个任务
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
// leader线程,定时等待queue[0]任务的那个线程
private Thread leader = null;
private final Condition available = lock.newCondition();
// 向堆尾新加入任务,进行上移(和父节点换个位置)调整位置
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
int parent = (k - 1) >>> 1; // 父节点的索引
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0) // 调整完毕,直接break
break;
// 当前节点的下次执行时间更快,继续递归向上遍历,直到放到合适的位置
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
/**
* Sifts element added at top down to its heap-ordered spot.
* Call only when holding lock.<p/>
* 元素下移操作(弹出堆顶元素后,将堆尾元素放置到堆顶再重新调整下移)
*/
private void siftDown(int k, RunnableScheduledFuture<?> key) {
int half = size >>> 1;
while (k < half) {
// 较小值节点的索引(初始为左子节点)
int child = (k << 1) + 1;
// 小值,初始为左子节点
RunnableScheduledFuture<?> c = queue[child];
// 右子节点索引
int right = child + 1;
if (right < size && c.compareTo(queue[right]) > 0) // 如果右子节点更小,则将c替换为右子节点,同时替换child为右子节点索引
c = queue[child = right];
if (key.compareTo(c) <= 0) // 目标元素比最小的子节点元素还小,目的就达成了,直接break
break;
// 左右子节点中较小的节点和父节点交换
queue[k] = c;
setIndex(c, k);
// 替换目标索引,继续将参数k向下比较
k = child;
}
queue[k] = key;
setIndex(key, k);
}
/**
重写的offer方法(该方法就是线程池投递任务的方法)
*/
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) { // 数组中还没有任务,直接放在首位
queue[0] = e;
setIndex(e, 0);
} else { // 已存在定时任务,看是否需要调整位置
siftUp(i, e);
}
if (queue[0] == e) {// 代表向队列添加了一个需要最快执行的任务
// 需要重置leader线程,并唤醒一个阻塞的线程(可能为无限阻塞的,也可能为上个定时等待的leader线程)来定时等待这个任务
// 换种角度,如果唤醒的是上个定时等待的leader线程,那肯定是很赚的,因为不需要启动多个定时等待的线程了
// 如果唤醒的不是上个leader线程,那就会存在多个定时等待的线程,这是没法避免的
leader = null;
available.signal(); // 将阻塞的线程从等待队列转移到同步队列,当下面的unlock后再唤醒阻塞线程
}
} finally {
lock.unlock();
}
return true;
}
/*
重写的take方法
所以,多线程多任务且没有任务需要立即执行造成的结果就是:
1、1个leader线程定时等待队首任务(实时的向线程池添加最快需要执行的任务,可能存在多个定时等待的线程,且至少他们曾经是leader线程)
2、其余全部线程无限期等待,最大程度的减少资源损耗(因为任务都有顺序,没必要同时让所有线程都定时等待,给底层的通知增加压力)
总结:
1、当没有任务时:所有线程都无限等待,没有leader线程,等待任务入队列的唤醒
2、当有任务时:唤醒的线程成为leader线程,当这个leader线程等待到期时,
取消自己为leader线程(另一种说法就是自己变成了follower线程),
唤醒一个无限期等待的线程,然后自己就去执行这个到期的任务,被唤醒的线程就会变成新的leader线程。一直这么循环下去
3、当实时向线程池添加最快需要执行的任务时:会取消当前leader线程,并唤醒一个阻塞的线程,让其成为新的leader线程
*/
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null) // 不存在定时任务,所有线程都在这等待
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0) // 时间已过,弹出队首任务去执行它
return finishPoll(first);
// 进入下面,无论怎样都要等待,所以直接把first置为null,下次循环再获取
// 因为可能多个线程走到下面,都持有了队首的引用。避免出现RunnableScheduledFuture运行完了但不能及时回收的情况
// 当然,也只有一次性的RunnableScheduledFuture才会回收,定时任务都是循环使用这个RunnableScheduledFuture的
first = null; // don't retain ref while waiting
if (leader != null) // 由leader存在,其他线程只需要无限期等待就行
available.await();
else { // 没有leader存在,设置当前线程为leader,并定时等待(时间就为最近待执行的那个任务的距离下次执行时间间隔)
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay); // 定时等待
} finally { // 时间一到,说明队首任务可执行了,但当前线程可能不是leader线程了,需要判断一下再置空
if (leader == thisThread) // 必须判断,有可能实时的添加了一个最快需要执行的线程,导致当前线程被取消了leader
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
// 任务取出来了,leader为空且存在队首任务,需要唤醒一个无限等待的线程
// 让其成为leader线程并继续定时等待
available.signal();
lock.unlock();
}
}
}
2.3 总结
ScheduledThreadPoolExecutor本质还是个线程池,内部的DelayedWorkQueue就是工作队列。投递的定时任务和普通任务都会封装为ScheduledFutureTask,并最终放入DelayedWorkQueue里的那个数组(只不过定时任务有延时,可能会放在队列中的任何位置。而普通任务封装的ScheduledFutureTask执行时间就是当前而已,始终会放到队列的队首并立马执行)
DelayedWorkQueue实现了BlockingQueue,是基于数组的最小顶堆的数据结构实现,以此保证数组的第一个位置就是最近需要被执行的任务。结构图和特点如下
ScheduledThreadPoolExecutor还使用了Leader-Follower模式,leader线程定时等待工作队列中第一个任务,其余线程一般就都无限期等待(如果向工作队列添加的是一个最快需要被执行的任务,可能就有多个定时等待的线程,但leader线程始终都会是最快需要被执行任务的线程)。
为什么使用Leader-Follower模式:
避免资源的浪费。定时任务再怎么排序,也只会有一个是最快需要执行的任务(时间相同会根据sequenceNumber排序),只需要设计一个定时等待线程等待这个最快需要执行的任务。当这个最快需要执行的任务触发后,再设计一个新的leader线程等待下一个最近的定时任务。理想的情况下,定时任务线程池只会有一个定时等待的线程(Leader线程),其余线程要么正在运行定时任务,要么全部无限期阻塞(Follower线程),最大程度的避免资源浪费(无限期等待的线程不用想其它的,乖乖等待被其他线程唤醒就行。而定时等待的线程需要在时间到达后被唤醒,至少需要被定时器监视以用来执行唤醒操作)
- 固定周期:受执行时常影响,只有当任务结束后才相对于结束时间来计算任务的下次执行时间
- 固定频率:不受任务的执行时常所影响,当任务投递到队列时就可以预判到以后任何执行该任务的时间
一个被投递的周期任务首先会封装成ScheduledFutureTask,再根据其下次执行时间放在DelayedWorkQueue的某个位置。如果放在了DelayedWorkQueue的队首,则使用定时任务线程池里的线程超时等待,以便时间到达后开始执行。正常执行完毕则会先根据其是固定周期任务还是固定频率的任务来计算下次执行时间并修赋值到ScheduledFutureTask的time字段,再将这个任务再次入队列,这样递归去执行。执行中如果抛出了异常,则会将ScheduledFutureTask的state修改为异常,之后就不再执行这个任务了