aqs

一、AQS是什么

java.util.concurrent.locks.AbstractQueuedSynchronizer,抽象队列同步器,是一个用来构建锁和同步器的框架。

常见的锁(如ReentrantLock)、同步器(如CountDownLatchSemaphoreCyclicBarrier)都是基于AQS来构建的。

二、了解AQS

核心数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//1. state
private volatile int state;

//2. 虚拟队列
private transient volatile Node head;
private transient volatile Node tail;

//2.1 队列节点 Node定义
Node
waitStatus
thread
prev 前节点
next 后节点
predecessor
nextWaiter

waitStatus枚举
- 0:初始化默认值
- CANCELLED 1:表示线程获取锁的请求已经取消了
- CONDITION -2:表示节点在等待队列中,节点线程等待唤醒
- PROPAGATE -3:当前线程处在SHARED情况下,该字段才会使用
- SIGNAL -1:表示线程已经准备好了,就等资源释放了

核心思想

如果被请求的共享资源空闲,就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;

如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获得不到锁的线程加入队列中。

CLH 队列

独占模式

三、源码-以ReentrantLock为例

3.1 lock

unfairLock.lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// ReentrantLock.Sync
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

// ReentrantLock.NonfairSync extends Sync
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

// AQS
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

可以看到加锁流程分为几步:

  1. compareAndSetState(0, 1),非公平锁,先尝试直接获得锁
  2. acquire 1
  3. tryAcquire 1
    1. 判断当前锁是否被占有,如果未被持有则尝试获得锁,如果被当前线程持有则计算重入次数,放在state
    2. 如果这一步获得锁失败,则进入下一步
  4. addWaiter(Node.EXCLUSIVE)
    1. 通过CAS尝试往队列尾部加一个新的节点(如果队列为空,初始化将head设置为一个空节点)
  5. acquireQueued
    1. 有一个循环逻辑,首先是尝试获得锁,需要node是第一个节点
    2. 然后判断是否需要挂起(通过LockSupport.park避免空转)

加锁流程到这里就结束了,最终的结果是线程获得了锁或者获取锁失败被放入等待队列,

acquireQueued会把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)。

这里不会一直for循环空转,满足条件(节点的waitStatus = -1)时会将线程挂起,所以我们dump线程的时候,等待的线程一般会看到是WAITING状态,就是由于这个park。

3.2 unlock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// NonfairSync
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}


//AQS
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

解锁分为这几步:

  1. 减少重入次数,如果剩余重入次数>0,说明锁还被当前线程持有,下面只分析重入次数=0的场景
  2. 设置当前线程为空,锁已经被释放了
  3. 唤醒队列中第一个waitStatus < 0的节点

四、看看CountDownLatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//CountDownLatch.Sync
Sync(int count) {
setState(count);
}

int getCount() {
return getState();
}

protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

//CountDownLatch
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void countDown() {
sync.releaseShared(1);
}

通过state来进行计数,await上阻塞的线程会进入队列。

五、总结

img

总的来说,AQS框架共分为五层,自上而下由浅入深,从AQS对外暴露的API到底层基础数据。

当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先经过第一层的API进入AQS内部方法,然后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依赖于第五层的基础数据提供层。

六、参考资料

https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html


aqs
https://yzaf.top/2024/aqs/
作者
why
发布于
2024年5月5日
许可协议