由reentrantlock浅析aqs-爱代码爱编程
概要
由面及点,了解AQS的大体思想。以ReentrantLock为例,了解其使用
整体架构流程
AQS:AbstractQueuedSynchronizer,是Java并发编程中的一个重要组件,它提供了一种基于队列的同步机制,用于实现各种同步器(如锁、信号量等)。
它的核心思想是使用一个先进先出的等待队列来管理线程的竞争和等待。通过内部的state变量(int类型)来记录同步状态,并通过cas来实现对状态的原子更新。
所以AQS的两个核心:1.队列 2.加锁
以下是AQS的几个重要的内部变量:
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
/**
* The synchronization state.
*/
private volatile int state;
private transient Thread exclusiveOwnerThread;
三个重要的内部变量,state记录锁的状态,Node是一个双向的链表(等待队列),exclusiveOwnerThread是父类中的变量:独占线程
ReentrantLock大致流程:当多个线程同时请求,通过cas的方式,只有一个线程拿到了锁,可以执行代码,其他的线程则被放入队列,并阻塞。当释放锁的时候,再从队列里唤醒等待的线程,继续通过cas去抢锁执行代码,如此反复
技术细节
1.ReentrantLock源码:
private final Sync sync;
/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
如上,ReentrantLock没有直接继承AbstractQueuedSynchronizer,而是抽象出了一个内部类Sync实现AbstractQueuedSynchronizer。因为ReentrantLock内部有两种锁,公平锁和非公平锁,两种锁都继承Sync。以下详细描述公平锁:FairSync
2.AQS源码:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
如上,当程序里对代码块进行加锁,也就是调用lock()方法,实则是调用了AbstractQueuedSynchronizer的acquire方法。先看第一个方法 tryAcquire( arg),它由FairSync重写了。
3.ReentrantLock源码:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
如3:通过compareAndSetState()方法获取锁,然后通过setExclusiveOwnerThread方法,当独占线程变量设置成当前变量,以便重入锁。如果加锁成功,返回true,2就不再往下执行了。
4.AQS源码:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
如果抢锁失败,执行2中的acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法,如4,先调用addWaiter放入等待队列(放队列也使用了cas,因为存在多个线程同时往队列里放的情况,此处还用了一个循环,要确保一定放入队列成功),再调用acquireQueued方法,将当前线程阻塞,当调用unpark方法后,会去唤醒队列中的第一个线程,如此反复。
小结
个人觉得AQS设计的巧妙之处就是state,他可以实现很多东西。比如reentrantLock的重入锁,比如countDownLatch等等