Java源码篇-锁

​ jdk中AQS实现类相关源码解析。包括 ReentrantLock,Condition,CountDownLatch,Semaphore,ReentrantReadWriteLock

1 ReentrantLock

​ 基于AQS实现的一种可重入互斥锁,所以只允许一个线程获取到锁。获取到锁时state设为1,当获取到锁的线程尝试重入时,便会增加state,同理需要将state减到0才会释放锁

1.1 非公平锁(NonfairSync)

lock

  1. java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire:利用CAS尝试设置state,能设置成功,代表获取到锁,成功返回。设置失败,代表已经被其他线程获取了锁,返回失败
  2. 返回失败后将当前线程构造为Node节点,设置到同步队列的链表中进入到java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued方法:死循环获取当前Node的前一个节点(同步队列的首节点是成功获取到锁的节点),如果前驱结点为首节点,当前Node才有资格获取锁。如果还是获取不到,就调用java.util.concurrent.locks.LockSupport#park(java.lang.Object)方法阻塞当前线程,等待其他线程唤醒再去竞争锁

unlock

  1. java.util.concurrent.locks.ReentrantLock.Sync#tryRelease:复原state(将其归0),exclusiveOwnerThread设为null
  2. java.util.concurrent.locks.AbstractQueuedSynchronizer#release:在tryRelease成功后,使用java.util.concurrent.locks.LockSupport#unpark方法唤醒同步队列首节点的下一个节点里的线程,让他再去尝试获取锁

1.2 公平锁(FairSync)

lock

​ 和非公平锁很像,不同的部分就在覆盖了java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquire这个方法和非公平锁略有不同。在新的线程获取锁失败,并将自己构造为Node节点并放入同步队列链表后,还会通过调用java.util.concurrent.locks.AbstractQueuedSynchronizer#hasQueuedPredecessors方法

unlock:和非公平锁一样

核心代码


// ReentrantLock的公平锁第一次尝试获取锁
protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() && // 测试当前线程是否是等待最久的线程
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
} 

/** 
 * 查询是否有线程等待获取的时间长于当前线程
 * 判断是否存在队列中第二个Node(因为首节点是个空节点),且第二个节点中的线程是否是当前线程
 * 也就是说:判断同步队列中当前节点是否有前驱结点
 * true:代表当前线程不是等待最久的线程或压根就没有等待的线程
 * false:在代表当前线程已经是等待最久的线程(毕竟队列越前面,则代表进去的越久)<p/>
 * 只有公平锁才需要用到这个方法,来判断当前线程是否等待时间最长
 */
public final boolean hasQueuedPredecessors() {
    Node t = tail; 
    Node h = head;
    Node s;
    // 用 h != t 来做判断是因为调用这个方法的线程此时还没有进入等待队列
    // 如果 h != t,则代表队列中有线程在等待获取锁
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

1.3 总结

1.3.1 为什么叫公平锁和非公平锁

​ 根据上面的分析,公平锁在获取锁是总是会先判断当前线程是否是等待最久的线程所以,就算是同步队列存在大量Node,且有线程第一次在获取锁,那么,下一次获取到锁的线程也一定是同步队列的首节点的下一个节点,即必须排队。(首节点就是当前获取到锁的节点,只有获取成功了,同步才会更新首节点)

非公平锁中:对于已经进入同步队列的线程来说,也只能首节点的下一个节点里的线程能尝试获取锁。但对于还未构造成Node加入到同步队列的线程来说,这个线程和首节点的下一个节点里的线程能竞争获取锁,所以非公平。但对于已经进入同步队列的线程来说,前驱结点是一定比后面的节点先获取到锁的

1.3.2 各自优势

  • 公平锁:防止线程饥饿,分散性很好,适合线程等待时间敏感的场景
  • 非公平锁:更快。一是获取锁是不用判断当前线程是否是等待最久的线程。二是上下文交换没有公平锁频繁。在存在大量锁竞争的前提下,可以肯定,公平锁上下文切换很频繁,获取锁后的线程再次获取锁时是一定会阻塞的。而非公平锁则不一样,下一次获取到锁的线程仍可能是上一次获取到锁的线程,没有上下文切换

2 Condition

等待通知接口,代替Object原生的wait和notify,其具体实现为AQS里的ConditionObject(定义在AQS里的非静态内部类,所以使用了AQS部分方法来实现其功能)。只有获取到锁的线程才能调用Condition的阻塞和唤醒方法。三个核心组件如下

  • 等待队列:使用 Node 节点串联,与 AQS 同步队列共用 Node 结构
  • 状态转换:Node 在等待队列和同步队列之间的转换
  • 线程控制:包括阻塞、唤醒、中断处理等机制

主要字段

// 等待队列中的首节点
private transient Node firstWaiter;
// 等待队列中的尾节点
private transient Node lastWaiter;

2.1 Condition#await

流程

  • 首先将当前线程构造为等待节点,并加入到等待队列的末尾
  • 其次释放锁资源(能够await的线程一定是获取到锁的),同时唤醒同步队列的第二个节点,让其尝试获取锁
  • 死循环判断当前节点是否为同步节点(等待节点在等待队列里,是一定要阻塞的。同步节点在同步队列里,是可以并被唤醒并尝试获取锁的),await到这里线程就阻塞了
  • 当被唤醒后,当前节点一定被加入了同步队列,再尝试获取锁,如果能获取到,代表就可以返回了。如果获取不到,就表示当前同步块被其他线程暂用了,也还是阻塞。不过下一次被唤醒后就会通过同步队列的唤醒方式来尝试获取锁

代码

public final void await() throws InterruptedException {
    if (Thread.interrupted()) // 响应中断
        throw new InterruptedException();
    // 构建等待节点并加入等待队列
    Node node = addConditionWaiter();
    // 先检查当前线程是否已获取到锁,否则抛异常。然后完全释放锁并且唤醒同步队列中的第二个节点
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 死循环判断当前节点是否在等待队列中
    // 等待队列中的节点一定要阻塞,而同步队列中的节点是可以被唤醒的
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 当signal后,需要重新获取锁,要复原现场,需要重新持有上一次所持有的所有的state值
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0) // 当前节点有中断
        reportInterruptAfterWait(interruptMode);
}
/**
 *   将当前线程构造为一个等待节点,并加入到等待队列的尾部,并通过nextWaiter字段建立联系 <br/>
 *  注意:等待队列建立关联用的是nextWaiter字段,不是prev和next字段
 */
private Node addConditionWaiter() {
    Node lastW = lastWaiter; // 尾节点
    // If lastWaiter is cancelled, clean out.
    if (lastW != null && lastW.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        lastW = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (lastW == null)
        firstWaiter = node;
    else
        lastW.nextWaiter = node;
    lastWaiter = node;
    return node;
}

// ====================以下为AQS中的方法===================
// 判断这个节点是否在同步队列上
// false -> 这个节点在等待队列上
// true -> 这个节点在同步队列上
final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    return findNodeFromTail(node);
}
/**
* 当前节点尝试获取锁
* 返回true -> 获取锁的过程有中断
*/
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node prevNode = node.predecessor();
            // 只有当前节点的前驱结点为首节点,当前节点里的线程才有资格获取锁
            // 只可能有一个线程获取成功(即获取锁),所以设置首节点不需要同步了
            if (prevNode == head && tryAcquire(arg)) {
                setHead(node);
                prevNode.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(prevNode, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

2.2 Condition#signal和signalAll

分析

singal的目的很简单,就是将等待队列的首节点转移到同步队列的尾节点,signalAll则是将等待队列中的所有节点都转移到同步节点。signal方法本身不能唤醒线程,只是让这些节点里的线程有资格被唤醒,可以将signal和排队买票做类比

  • 等待队列相当于候补区
  • signal 相当于叫号,让候补区的人去正式排队区(同步队列)
  • 但叫号本身并不会直接让人拿到票,还需要排队区的人按顺序获取票(锁)

代码

private void doSignal(Node first) {
    do {
        // 将首节点的nextWaiter转移到首节点,如果nextWaiter为空,则表示队列中只有一个节点,且首尾相同
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null; // gc处理
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
// 将当前的等待节点转换为同步节点,并加入到同步队列的末尾
final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread); // 前驱节点被取消了,或者设置为SIGNAL失败
    return true;
}

2.3 总结

​ Condition实现了等待通知,当一个线程进入同步块后,就可以调用await,释放自己获取的锁资源,将自己阻塞。内部实现是首先将当前线程构造成一个等待节点,加入ConditionObject的等待队列的末尾,再释放锁资源,之后唤醒同步队列的第二个节点让其尝试获取锁。而当其他进入同步块的线程调用signal后,会将等待队列的首节点转移到同步队列,并将其变成同步节点,最后再使用同步队列的唤醒机制等待被唤醒。

​ 所以signal并不能直接唤醒一个await的线程,最佳使用案例就是消费者发送者机制,比如阻塞队列。

3 CountDownLatch

CountDownLatch为共享锁实现,只能使用一次。用来“卡点”,阻塞的线程需要等待其他线程准备好了后(countDown直到AQS里的state为0),才继续被唤醒执行后面的代码。

在CountDownLatch中,AQS里的state值并不表示可获取到锁的次数,而是java.util.concurrent.CountDownLatch#countDown state值的次数后会释放所有调用了**java.util.concurrent.CountDownLatch#await()**的线程

内部的同步器Sync主要方法

/**
    获取共享锁,只有AQS的state为0才能获取到
    通过这个接口就可以猜到,当state为0时(拉下了所有门闩),总会返回1,代表获取锁成功。
    并依次传播下去递归调用这个方法,直到同步队列的所有Node里的线程全部唤醒,这就是CountDownLatch的原理
*/
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

// 释放共享锁,state第一次被减为0才释放成功,也就表示了CountDownLatch只能用一次
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

await方法会阻塞当前线程,直到其他线程“拉下所有门闩”。阻塞的线程会构造为共享节点加入同步队列,只有队首节点的下一个节点才有资格尝试获取锁,获取不到就LockSupport#park

countDown会将state值减小1,当state将为0时,释放同步队列里的第二个共享节点里的线程。当这个线程释放后,就能成功获取到锁了,将这个事件传播下去,一次唤醒同步队列里的所有共享节点。至此,所有被阻塞的线程都被唤醒且会成功获取到锁,最终从await方法里返回

4 Semaphore

信号量,共享锁实现。可以利用构造器指定令牌(permits)的数量。当线程到达时,获取(acquire)指定数量的令牌,当没有可用令牌(premits为0)时,阻塞线程,等待令牌的释放(release)再被唤醒后继续执行。基于此,即可实现共享锁(permits大于1),也可实现不可重入的互斥锁(permits为1)

也分为公平锁和分公平锁,其判断方式完全和ReentrantLock一致。

非公平锁允许准备进入同步块的线程(还未加入同步队列)和同步队列中的第二个节点竞争获取锁。而公平锁则只允许同步队列中第二个节点里的线程能尝试获取锁。

其实现方式就是将state设为我们允许并发运行的线程数量,每当一个线程获取到锁后,将state - 1,如果state为0则阻塞所有准备进入同步块的线程,并将其构造为共享节点加入同步队列。每当有线程从同步块退出时,将state + 1,并根据是否非公平来唤醒同步队列的第二个节点来尝试获取锁

5 ReentrantReadWriteLock

​ 读写锁,支持并发的读或互斥的写。读写锁分别各自实现,读锁使用共享锁,写锁使用互斥锁。ReentrantReadWriteLock内部的ReadLock和WriteLock都使用了内部同一个Sync对象来实现读写加锁的功能,在Sync内,他将AQS的state转换为二进制,高十六位表示读状态位,低十六位表示写状态位。由于读是共享的,所以state的高十六位表示了当前有多少个线程在读,在此期间写锁是禁用的。而低十六位是写锁,所以只可能有一个线程,但可能数字大于1(这是就表示写锁重入了)。当写锁被占用是,读是不允许的

static final int SHARED_SHIFT= 16;                     //   读状态位            写状态位
static final int SHARED_UNIT  = (1 << SHARED_SHIFT); // 0000000000000001 0000000000000000
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;//0000000000000000 1111111111111111
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;//0000000000000000 1111111111111111
// 获取共享锁冲入次数(读锁专用)
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
// 获取排他锁冲入次数(写锁专用)
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
  • 读写锁都支持重入,但写锁只能让当前线程重入,并且要解锁时需要unlock重入的次数。

  • 支持锁降级但不支持锁升级

    锁降级:即一个线程在持有写锁的情况下,可以继续获取读锁,然后释放写锁,从而将写锁降级为读锁。在某些场景下很有用,比如

    • 在写操作完成后,仍然需要保持对数据的读访问权限
    • 避免其他线程在写锁释放后立即获取写锁,导致数据不一致