Java源码篇-AQS


​ 总结了LockSupport的作用,并从源码分析了AbstractQueuedSynchronizer的实现逻辑

1 LockSupport

1.1 总结

Java中实现当前线程的阻塞和定时阻塞,并提供唤醒指定线程的工具,在内部使用sun.misc.Unsafe来实现这一系列的操作。在AQS中普遍被使用

  • 阻塞当前线程:通过 park() 方法使当前线程进入等待状态。
  • 定时阻塞:通过 parkNanos(long nanos)parkUntil(long deadline) 方法使当前线程在指定时间内等待。
  • 唤醒指定线程:通过 unpark(Thread thread) 方法唤醒指定的处于等待状态的线程。

1.2 核心代码

/**
 * 唤醒指定的线程(如果该线程被park了)
 * 如果线程先被unpark(解除等待)了,那么该线程下一次调用park(进入等待)则不起作用,也就不会被阻塞
 */
public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}

/**
 * 阻塞当前线程,并设置一个blocker(俗称阻塞器,这个只是用来jstack查看,并不能通过notifyAll来唤醒阻塞的线程)
 * blocker只能用来调试和诊断,并不影响线程的阻塞和唤醒
 */
public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L);
    setBlocker(t, null);
}

/**
 * 定时等待,阻塞当前线程指定的纳秒数,当时间到达时就自动唤醒(定时任务会调用)
 */
public static void parkNanos(long nanos) {
    if (nanos > 0)
        UNSAFE.park(false, nanos);
}

/**
 * 定时等待,阻塞当前线程直到指定的时间戳(deadline)到来就自动唤醒(定时任务会调用)
 */
public static void parkUntil(long deadline) {
    UNSAFE.park(true, deadline);
}

2 AbstractQueuedSynchronizer

​ Node是AQS的核心内部类,它是构建同步器的基础数据结构,通过不同的配置可以实现同步队列,也可实现等待队列

2.1 同步队列

当线程尝试获取锁时,未获取到锁的线程会被构造成一个Node,利用CAS放入同步尾部作为尾节点,等待被唤醒。同步队列关联的是整个锁,一对一的关系。而同步队列中的Node又根据nextWaiter字段判断当前Node是共享节点还是独占节点

  • Node之间通过prev和next指针构成双向链表

  • 头节点(head)代表当前持有锁的线程

  • 包含waitStatus字段标记节点状态

    • CANCELLED(1): 线程已取消
    • SIGNAL(-1): 后继节点需要唤醒
    • CONDITION(-2): 节点在等待队列中
    • PROPAGATE(-3): 共享锁需要向后传播
    • 0: 初始状态
  • 使用nextWaiter区分共享/互斥模式

    • 共享节点:共享锁的实现(Semaphore、CountDownLatch等)。nextWaiter字段为固定的Node#SHARED。释放当前节点的线程后,还具有向后传播的能力(根据state的值判断是否需要释放后继共享节点里的线程)

    • 互斥节点:互斥锁的实现(ReentrantLock等),nextWaiter字段为Node#EXCLUSIVE(即null),只会释放当前节点里的线程

2.2 等待队列

​ 当已经获取到锁的线程触发java.util.concurrent.locks.Condition#await()方法阻塞自己,让出锁时。会将当前线程构造成一个Node(等待节点,状态为CONDITION),利用CAS放入等待队列尾部。等待队列关联的是Condition。所以,当ReentrantLock构造多个Condition时,就有多个等待队列,ReentrantLock和等待队列可以为一对多,而Condition和等待队列时一对一。而当其他线程获取当前锁(ReentrantLock)的线程调用java.util.concurrent.locks.Condition#signal等方法时,便会将等待队列的首节点转入到同步队列的尾节点,并重新设置Node的状态

  • 单向链表结构,只使用nextWaiter指针

  • nextWaiter字段为等待队列中下一个等待节点的指针

  • 当调用signal()时,节点从等待队列转移到同步队列过程中的状态变化如下

    1. CONDITION -> 0
    2. 入队同步队列
    3. 等待获取锁

2.3 核心代码

// 同步队列专属的头尾节点。
// 因为只有在同步队列里的线程才需要被唤醒。等待队列里的线程如果要被唤醒,需要先加入到同步队列
private transient volatile Node head;
private transient volatile Node tail;
// 核心,可获取到锁的次数
// - ReentrantLock: 表示重入次数
// - Semaphore: 表示剩余许可数
// - CountDownLatch: 表示剩余计数
private volatile int state;
// 自旋的阈值(纳秒)。当超时等待时间小于这个值时,就不会再暂停线程,而是自旋。因为这个时间已经很少了,考虑到阻塞线程后上线文切换会消耗时间,就没必要再阻塞了
static final long spinForTimeoutThreshold = 1000L;
// 获取到独占锁的线程
private transient Thread exclusiveOwnerThread;

/**
    留给子类实现的尝试获取共享锁的方法,共享锁获取,返回AQS里state的剩余值 
    1:返回值 > 0,代表当前线程获取成功,且state还有剩余值,表示可以继续传播给下一个共享节点线程,让其尝试获取锁 
    2:返回值 = 0,代表当前线程获取成功,但state值刚好被用完,那么下一个共享节点线程就不应该被唤醒了(因为这时已经获取不到state的剩余值了)
    3:返回值 < 0,代表当前线程都没获取成功,直接获取失败,阻塞等待被其他线程唤醒后在尝试获取
*/
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

// 获取共享锁
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) { // 首节点的下个节点才有资格获取锁(首节点就是获取到锁的节点)
                int r = tryAcquireShared(arg);
                if (r >= 0) { // 至少当前线程获取成功了,但可能state值已经被用完了
                    // 获取成功,传播给下一个共享Node,根据state的剩余值来判断是否需要唤醒下一个共享Node里的线程
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) // 不能获取到锁线程就park
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
// 释放共享锁(Semaphore会使用)
private void doReleaseShared() {
      // 必要的循环
      // 1. CAS操作可能失败需要重试
    // 2. 在设置head的过程中可能有新的节点入队
    // 3. 传播机制要求必须确保传播状态正确设置
    for (;;) {
        Node h = head;
          // h != tail 检查确保队列中还有后继节点
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

/*
    将目标节点(参数node)设为同步队列的尾部(使用CAS来解决并发问题)。
    所以,在这整个过程中,链表中除首节点外其余节点的prev在任何时刻都不会为空;
        但除尾节点外其余节点的next字段有可能为空 (刚好走完第②步,还没走到第③步)
*/
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // 初始化同步队列,设置一个空Node为首尾节点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t; // 先将目标节点的prev设置程原尾节点 ①
            if (compareAndSetTail(t, node)) { // CAS设置尾节点 ②
                t.next = node; // 设置成功了,才把原尾节点的next设为目标节点(现尾节点)③
                return t;
            }
        }
    }
}

// 唤醒目标节点(参数node)的最近下一个可唤醒节点中的线程
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 首节点的下个节点唤醒失败时,就从尾节点向前遍历,直到找到距首节点最近的可唤醒的节点
    // 目的是避免并发时(节点入队列和唤醒),倒数第二个节点(甚至不止)的next字段为空,导致拿不到其实已经入队列里的后续节点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

// ============内部的Node数据结构=================
static final class Node {
    // 共享锁
    static final Node SHARED = new Node();
    // 互斥锁
    static final Node EXCLUSIVE = null;
    //  取消获取锁
    static final int CANCELLED =  1;

    static final int SIGNAL    = -1;
    // 等待condition唤醒(等待队列才会用到这个状态)
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
        // 当前节点的状态
    volatile int waitStatus;
    // 同步队列专用
    volatile Node prev;
    // 同步队列专用
    volatile Node next;
    // 等待线程
    volatile Thread thread;
    // 1. 当前Node为同步队列中的共享节点时:SHARED
    // 2. 当前Node为同步队列中的独占节点时:null
    // 3. 当前Node为等待队列中的节点时:下一个等待节点的指针
    Node nextWaiter;
        // 判断当前节点是互斥锁,还是共享锁
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
        // 当前节点的前驱结点
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
}