Java源码篇-AQS
总结了LockSupport的作用,并从源码分析了AbstractQueuedSynchronizer的实现逻辑
1 LockSupport
1.1 总结
Java中实现当前线程的阻塞和定时阻塞,并提供唤醒指定线程的工具,在内部使用sun.misc.Unsafe来实现这一系列的操作。在AQS中普遍被使用
- 阻塞当前线程:通过
park()
方法使当前线程进入等待状态。 - 定时阻塞:通过
parkNanos(long nanos)
或parkUntil(long deadline)
方法使当前线程在指定时间内等待。 - 唤醒指定线程:通过
unpark(Thread thread)
方法唤醒指定的处于等待状态的线程。
1.2 核心代码
/**
* 唤醒指定的线程(如果该线程被park了)
* 如果线程先被unpark(解除等待)了,那么该线程下一次调用park(进入等待)则不起作用,也就不会被阻塞
*/
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
/**
* 阻塞当前线程,并设置一个blocker(俗称阻塞器,这个只是用来jstack查看,并不能通过notifyAll来唤醒阻塞的线程)
* blocker只能用来调试和诊断,并不影响线程的阻塞和唤醒
*/
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
/**
* 定时等待,阻塞当前线程指定的纳秒数,当时间到达时就自动唤醒(定时任务会调用)
*/
public static void parkNanos(long nanos) {
if (nanos > 0)
UNSAFE.park(false, nanos);
}
/**
* 定时等待,阻塞当前线程直到指定的时间戳(deadline)到来就自动唤醒(定时任务会调用)
*/
public static void parkUntil(long deadline) {
UNSAFE.park(true, deadline);
}
2 AbstractQueuedSynchronizer
Node是AQS的核心内部类,它是构建同步器的基础数据结构,通过不同的配置可以实现同步队列,也可实现等待队列
2.1 同步队列
当线程尝试获取锁时,未获取到锁的线程会被构造成一个Node,利用CAS放入同步尾部作为尾节点,等待被唤醒。同步队列关联的是整个锁,一对一的关系。而同步队列中的Node又根据nextWaiter字段判断当前Node是共享节点还是独占节点
Node之间通过prev和next指针构成双向链表
头节点(head)代表当前持有锁的线程
包含waitStatus字段标记节点状态
- CANCELLED(1): 线程已取消
- SIGNAL(-1): 后继节点需要唤醒
- CONDITION(-2): 节点在等待队列中
- PROPAGATE(-3): 共享锁需要向后传播
- 0: 初始状态
使用nextWaiter区分共享/互斥模式
共享节点:共享锁的实现(Semaphore、CountDownLatch等)。nextWaiter字段为固定的Node#SHARED。释放当前节点的线程后,还具有向后传播的能力(根据state的值判断是否需要释放后继共享节点里的线程)
互斥节点:互斥锁的实现(ReentrantLock等),nextWaiter字段为Node#EXCLUSIVE(即null),只会释放当前节点里的线程
2.2 等待队列
当已经获取到锁的线程触发java.util.concurrent.locks.Condition#await()方法阻塞自己,让出锁时。会将当前线程构造成一个Node(等待节点,状态为CONDITION),利用CAS放入等待队列尾部。等待队列关联的是Condition。所以,当ReentrantLock构造多个Condition时,就有多个等待队列,ReentrantLock和等待队列可以为一对多,而Condition和等待队列时一对一。而当其他线程获取当前锁(ReentrantLock)的线程调用java.util.concurrent.locks.Condition#signal等方法时,便会将等待队列的首节点转入到同步队列的尾节点,并重新设置Node的状态
单向链表结构,只使用nextWaiter指针
nextWaiter字段为等待队列中下一个等待节点的指针
当调用signal()时,节点从等待队列转移到同步队列过程中的状态变化如下
- CONDITION -> 0
- 入队同步队列
- 等待获取锁
2.3 核心代码
// 同步队列专属的头尾节点。
// 因为只有在同步队列里的线程才需要被唤醒。等待队列里的线程如果要被唤醒,需要先加入到同步队列
private transient volatile Node head;
private transient volatile Node tail;
// 核心,可获取到锁的次数
// - ReentrantLock: 表示重入次数
// - Semaphore: 表示剩余许可数
// - CountDownLatch: 表示剩余计数
private volatile int state;
// 自旋的阈值(纳秒)。当超时等待时间小于这个值时,就不会再暂停线程,而是自旋。因为这个时间已经很少了,考虑到阻塞线程后上线文切换会消耗时间,就没必要再阻塞了
static final long spinForTimeoutThreshold = 1000L;
// 获取到独占锁的线程
private transient Thread exclusiveOwnerThread;
/**
留给子类实现的尝试获取共享锁的方法,共享锁获取,返回AQS里state的剩余值
1:返回值 > 0,代表当前线程获取成功,且state还有剩余值,表示可以继续传播给下一个共享节点线程,让其尝试获取锁
2:返回值 = 0,代表当前线程获取成功,但state值刚好被用完,那么下一个共享节点线程就不应该被唤醒了(因为这时已经获取不到state的剩余值了)
3:返回值 < 0,代表当前线程都没获取成功,直接获取失败,阻塞等待被其他线程唤醒后在尝试获取
*/
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 获取共享锁
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) { // 首节点的下个节点才有资格获取锁(首节点就是获取到锁的节点)
int r = tryAcquireShared(arg);
if (r >= 0) { // 至少当前线程获取成功了,但可能state值已经被用完了
// 获取成功,传播给下一个共享Node,根据state的剩余值来判断是否需要唤醒下一个共享Node里的线程
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 不能获取到锁线程就park
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 释放共享锁(Semaphore会使用)
private void doReleaseShared() {
// 必要的循环
// 1. CAS操作可能失败需要重试
// 2. 在设置head的过程中可能有新的节点入队
// 3. 传播机制要求必须确保传播状态正确设置
for (;;) {
Node h = head;
// h != tail 检查确保队列中还有后继节点
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
/*
将目标节点(参数node)设为同步队列的尾部(使用CAS来解决并发问题)。
所以,在这整个过程中,链表中除首节点外其余节点的prev在任何时刻都不会为空;
但除尾节点外其余节点的next字段有可能为空 (刚好走完第②步,还没走到第③步)
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 初始化同步队列,设置一个空Node为首尾节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t; // 先将目标节点的prev设置程原尾节点 ①
if (compareAndSetTail(t, node)) { // CAS设置尾节点 ②
t.next = node; // 设置成功了,才把原尾节点的next设为目标节点(现尾节点)③
return t;
}
}
}
}
// 唤醒目标节点(参数node)的最近下一个可唤醒节点中的线程
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 首节点的下个节点唤醒失败时,就从尾节点向前遍历,直到找到距首节点最近的可唤醒的节点
// 目的是避免并发时(节点入队列和唤醒),倒数第二个节点(甚至不止)的next字段为空,导致拿不到其实已经入队列里的后续节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
// ============内部的Node数据结构=================
static final class Node {
// 共享锁
static final Node SHARED = new Node();
// 互斥锁
static final Node EXCLUSIVE = null;
// 取消获取锁
static final int CANCELLED = 1;
static final int SIGNAL = -1;
// 等待condition唤醒(等待队列才会用到这个状态)
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 当前节点的状态
volatile int waitStatus;
// 同步队列专用
volatile Node prev;
// 同步队列专用
volatile Node next;
// 等待线程
volatile Thread thread;
// 1. 当前Node为同步队列中的共享节点时:SHARED
// 2. 当前Node为同步队列中的独占节点时:null
// 3. 当前Node为等待队列中的节点时:下一个等待节点的指针
Node nextWaiter;
// 判断当前节点是互斥锁,还是共享锁
final boolean isShared() {
return nextWaiter == SHARED;
}
// 当前节点的前驱结点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
}