Java源码篇-锁
jdk中AQS实现类相关源码解析。包括 ReentrantLock,Condition,CountDownLatch,Semaphore,ReentrantReadWriteLock
1 ReentrantLock
基于AQS实现的一种可重入互斥锁,所以只允许一个线程获取到锁。获取到锁时state设为1,当获取到锁的线程尝试重入时,便会增加state,同理需要将state减到0才会释放锁
1.1 非公平锁(NonfairSync)
lock
- java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire:利用CAS尝试设置state,能设置成功,代表获取到锁,成功返回。设置失败,代表已经被其他线程获取了锁,返回失败
- 返回失败后将当前线程构造为Node节点,设置到同步队列的链表中进入到java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued方法:死循环获取当前Node的前一个节点(同步队列的首节点是成功获取到锁的节点),如果前驱结点为首节点,当前Node才有资格获取锁。如果还是获取不到,就调用java.util.concurrent.locks.LockSupport#park(java.lang.Object)方法阻塞当前线程,等待其他线程唤醒再去竞争锁
unlock
- java.util.concurrent.locks.ReentrantLock.Sync#tryRelease:复原state(将其归0),exclusiveOwnerThread设为null
- java.util.concurrent.locks.AbstractQueuedSynchronizer#release:在tryRelease成功后,使用java.util.concurrent.locks.LockSupport#unpark方法唤醒同步队列首节点的下一个节点里的线程,让他再去尝试获取锁
1.2 公平锁(FairSync)
lock
和非公平锁很像,不同的部分就在覆盖了java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquire这个方法和非公平锁略有不同。在新的线程获取锁失败,并将自己构造为Node节点并放入同步队列链表后,还会通过调用java.util.concurrent.locks.AbstractQueuedSynchronizer#hasQueuedPredecessors方法
unlock:和非公平锁一样
核心代码
// ReentrantLock的公平锁第一次尝试获取锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && // 测试当前线程是否是等待最久的线程
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
/**
* 查询是否有线程等待获取的时间长于当前线程
* 判断是否存在队列中第二个Node(因为首节点是个空节点),且第二个节点中的线程是否是当前线程
* 也就是说:判断同步队列中当前节点是否有前驱结点
* true:代表当前线程不是等待最久的线程或压根就没有等待的线程
* false:在代表当前线程已经是等待最久的线程(毕竟队列越前面,则代表进去的越久)<p/>
* 只有公平锁才需要用到这个方法,来判断当前线程是否等待时间最长
*/
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
// 用 h != t 来做判断是因为调用这个方法的线程此时还没有进入等待队列
// 如果 h != t,则代表队列中有线程在等待获取锁
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
1.3 总结
1.3.1 为什么叫公平锁和非公平锁
根据上面的分析,公平锁在获取锁是总是会先判断当前线程是否是等待最久的线程。所以,就算是同步队列存在大量Node,且有线程第一次在获取锁,那么,下一次获取到锁的线程也一定是同步队列的首节点的下一个节点,即必须排队。(首节点就是当前获取到锁的节点,只有获取成功了,同步才会更新首节点)
非公平锁中:对于已经进入同步队列的线程来说,也只能首节点的下一个节点里的线程能尝试获取锁。但对于还未构造成Node加入到同步队列的线程来说,这个线程和首节点的下一个节点里的线程能竞争获取锁,所以非公平。但对于已经进入同步队列的线程来说,前驱结点是一定比后面的节点先获取到锁的
1.3.2 各自优势
- 公平锁:防止线程饥饿,分散性很好,适合线程等待时间敏感的场景
- 非公平锁:更快。一是获取锁是不用判断当前线程是否是等待最久的线程。二是上下文交换没有公平锁频繁。在存在大量锁竞争的前提下,可以肯定,公平锁上下文切换很频繁,获取锁后的线程再次获取锁时是一定会阻塞的。而非公平锁则不一样,下一次获取到锁的线程仍可能是上一次获取到锁的线程,没有上下文切换
2 Condition
等待通知接口,代替Object原生的wait和notify,其具体实现为AQS里的ConditionObject(定义在AQS里的非静态内部类,所以使用了AQS部分方法来实现其功能)。只有获取到锁的线程才能调用Condition的阻塞和唤醒方法。三个核心组件如下
- 等待队列:使用 Node 节点串联,与 AQS 同步队列共用 Node 结构
- 状态转换:Node 在等待队列和同步队列之间的转换
- 线程控制:包括阻塞、唤醒、中断处理等机制
主要字段
// 等待队列中的首节点
private transient Node firstWaiter;
// 等待队列中的尾节点
private transient Node lastWaiter;
2.1 Condition#await
流程
- 首先将当前线程构造为等待节点,并加入到等待队列的末尾
- 其次释放锁资源(能够await的线程一定是获取到锁的),同时唤醒同步队列的第二个节点,让其尝试获取锁
- 死循环判断当前节点是否为同步节点(等待节点在等待队列里,是一定要阻塞的。同步节点在同步队列里,是可以并被唤醒并尝试获取锁的),await到这里线程就阻塞了
- 当被唤醒后,当前节点一定被加入了同步队列,再尝试获取锁,如果能获取到,代表就可以返回了。如果获取不到,就表示当前同步块被其他线程暂用了,也还是阻塞。不过下一次被唤醒后就会通过同步队列的唤醒方式来尝试获取锁
代码
public final void await() throws InterruptedException {
if (Thread.interrupted()) // 响应中断
throw new InterruptedException();
// 构建等待节点并加入等待队列
Node node = addConditionWaiter();
// 先检查当前线程是否已获取到锁,否则抛异常。然后完全释放锁并且唤醒同步队列中的第二个节点
int savedState = fullyRelease(node);
int interruptMode = 0;
// 死循环判断当前节点是否在等待队列中
// 等待队列中的节点一定要阻塞,而同步队列中的节点是可以被唤醒的
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 当signal后,需要重新获取锁,要复原现场,需要重新持有上一次所持有的所有的state值
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0) // 当前节点有中断
reportInterruptAfterWait(interruptMode);
}
/**
* 将当前线程构造为一个等待节点,并加入到等待队列的尾部,并通过nextWaiter字段建立联系 <br/>
* 注意:等待队列建立关联用的是nextWaiter字段,不是prev和next字段
*/
private Node addConditionWaiter() {
Node lastW = lastWaiter; // 尾节点
// If lastWaiter is cancelled, clean out.
if (lastW != null && lastW.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
lastW = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (lastW == null)
firstWaiter = node;
else
lastW.nextWaiter = node;
lastWaiter = node;
return node;
}
// ====================以下为AQS中的方法===================
// 判断这个节点是否在同步队列上
// false -> 这个节点在等待队列上
// true -> 这个节点在同步队列上
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
/**
* 当前节点尝试获取锁
* 返回true -> 获取锁的过程有中断
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node prevNode = node.predecessor();
// 只有当前节点的前驱结点为首节点,当前节点里的线程才有资格获取锁
// 只可能有一个线程获取成功(即获取锁),所以设置首节点不需要同步了
if (prevNode == head && tryAcquire(arg)) {
setHead(node);
prevNode.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(prevNode, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2.2 Condition#signal和signalAll
分析
singal的目的很简单,就是将等待队列的首节点转移到同步队列的尾节点,signalAll则是将等待队列中的所有节点都转移到同步节点。signal方法本身不能唤醒线程,只是让这些节点里的线程有资格被唤醒,可以将signal和排队买票做类比
- 等待队列相当于候补区
- signal 相当于叫号,让候补区的人去正式排队区(同步队列)
- 但叫号本身并不会直接让人拿到票,还需要排队区的人按顺序获取票(锁)
代码
private void doSignal(Node first) {
do {
// 将首节点的nextWaiter转移到首节点,如果nextWaiter为空,则表示队列中只有一个节点,且首尾相同
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null; // gc处理
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
// 将当前的等待节点转换为同步节点,并加入到同步队列的末尾
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); // 前驱节点被取消了,或者设置为SIGNAL失败
return true;
}
2.3 总结
Condition实现了等待通知,当一个线程进入同步块后,就可以调用await,释放自己获取的锁资源,将自己阻塞。内部实现是首先将当前线程构造成一个等待节点,加入ConditionObject的等待队列的末尾,再释放锁资源,之后唤醒同步队列的第二个节点让其尝试获取锁。而当其他进入同步块的线程调用signal后,会将等待队列的首节点转移到同步队列,并将其变成同步节点,最后再使用同步队列的唤醒机制等待被唤醒。
所以signal并不能直接唤醒一个await的线程,最佳使用案例就是消费者发送者机制,比如阻塞队列。
3 CountDownLatch
CountDownLatch为共享锁实现,只能使用一次。用来“卡点”,阻塞的线程需要等待其他线程准备好了后(countDown直到AQS里的state为0),才继续被唤醒执行后面的代码。
在CountDownLatch中,AQS里的state值并不表示可获取到锁的次数,而是java.util.concurrent.CountDownLatch#countDown state值的次数后会释放所有调用了**java.util.concurrent.CountDownLatch#await()**的线程
内部的同步器Sync主要方法
/**
获取共享锁,只有AQS的state为0才能获取到
通过这个接口就可以猜到,当state为0时(拉下了所有门闩),总会返回1,代表获取锁成功。
并依次传播下去递归调用这个方法,直到同步队列的所有Node里的线程全部唤醒,这就是CountDownLatch的原理
*/
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 释放共享锁,state第一次被减为0才释放成功,也就表示了CountDownLatch只能用一次
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
await方法会阻塞当前线程,直到其他线程“拉下所有门闩”。阻塞的线程会构造为共享节点加入同步队列,只有队首节点的下一个节点才有资格尝试获取锁,获取不到就LockSupport#park
countDown会将state值减小1,当state将为0时,释放同步队列里的第二个共享节点里的线程。当这个线程释放后,就能成功获取到锁了,将这个事件传播下去,一次唤醒同步队列里的所有共享节点。至此,所有被阻塞的线程都被唤醒且会成功获取到锁,最终从await方法里返回
4 Semaphore
信号量,共享锁实现。可以利用构造器指定令牌(permits)的数量。当线程到达时,获取(acquire)指定数量的令牌,当没有可用令牌(premits为0)时,阻塞线程,等待令牌的释放(release)再被唤醒后继续执行。基于此,即可实现共享锁(permits大于1),也可实现不可重入的互斥锁(permits为1)
也分为公平锁和分公平锁,其判断方式完全和ReentrantLock一致。
非公平锁允许准备进入同步块的线程(还未加入同步队列)和同步队列中的第二个节点竞争获取锁。而公平锁则只允许同步队列中第二个节点里的线程能尝试获取锁。
其实现方式就是将state设为我们允许并发运行的线程数量,每当一个线程获取到锁后,将state - 1,如果state为0则阻塞所有准备进入同步块的线程,并将其构造为共享节点加入同步队列。每当有线程从同步块退出时,将state + 1,并根据是否非公平来唤醒同步队列的第二个节点来尝试获取锁
5 ReentrantReadWriteLock
读写锁,支持并发的读或互斥的写。读写锁分别各自实现,读锁使用共享锁,写锁使用互斥锁。ReentrantReadWriteLock内部的ReadLock和WriteLock都使用了内部同一个Sync对象来实现读写加锁的功能,在Sync内,他将AQS的state转换为二进制,高十六位表示读状态位,低十六位表示写状态位。由于读是共享的,所以state的高十六位表示了当前有多少个线程在读,在此期间写锁是禁用的。而低十六位是写锁,所以只可能有一个线程,但可能数字大于1(这是就表示写锁重入了)。当写锁被占用是,读是不允许的
static final int SHARED_SHIFT= 16; // 读状态位 写状态位
static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 0000000000000001 0000000000000000
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;//0000000000000000 1111111111111111
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;//0000000000000000 1111111111111111
// 获取共享锁冲入次数(读锁专用)
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 获取排他锁冲入次数(写锁专用)
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
读写锁都支持重入,但写锁只能让当前线程重入,并且要解锁时需要unlock重入的次数。
支持锁降级但不支持锁升级
锁降级:即一个线程在持有写锁的情况下,可以继续获取读锁,然后释放写锁,从而将写锁降级为读锁。在某些场景下很有用,比如
- 在写操作完成后,仍然需要保持对数据的读访问权限
- 避免其他线程在写锁释放后立即获取写锁,导致数据不一致