youyichannel

志于道,据于德,依于仁,游于艺!

0%

AQS详解

AQS简介

AQS 全称是 AbstractQueuedSynchronizer,抽象队列同步器。

AQS是用来构建锁或者其他同步器组件的重量级基础框架,是整个JUC体系的基石;

AQS通过内置的自定义的CLH队列来完成获取资源的线程的排队工作,将每个要抢占资源的线程封装成Node节点来实现锁的分配,同时存在一个volatile变量state表示持有锁的状态。

AQS核心数据结构CLH参考资料:https://mp.weixin.qq.com/s/jEx-4XhNGOFdCo4Nou5tqg

AQS是JUC体系中最重要的基石

AQS原理

核心思想

AQS核心思想:

  • 如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态
  • 如果被请求的共享资源被占用,此时需要一套线程阻塞等待以及被唤醒时锁分配的机制,这套机制AQS是基于CLH锁实现的

CLH 锁是对自旋锁的一种改进,是一个虚拟的双向队列(不存在队列实例,仅存在结点之间的关联关系),暂时获取不到锁的线程将被加入到该队列中。AQS 将每条请求共享资源的线程封装成一个 CLH 队列锁的一个结点Node来实现锁的分配。在 CLH 队列锁中,一个节点表示一个线程,它保存着线程的引用thread、 当前节点在队列中的状态waitStatus、前驱节点prev、后继节点next

AQS使用成员变量state表示同步状态,通过内置的FIFO线程等待队列来完成获取资源线程的排队工作。

state变量被volatile修饰,用于展示当前临界资源的获锁情况:

// 共享变量,使用volatile关键字修饰保证线程可见性
private volatile int state;

除此之外,状态信息state可以通过protected final修饰的getState()setState()compareAndSetState()进行操作

//返回同步状态的当前值
protected final int getState() {
return state;
}
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

稍后我们会通过深入ReentrantLock类的方式来理解AQS

AQS资源共享方式

AQS定义了两种资源共享方式:

  • Exclusive:独占,只有一个线程能执行,比如ReentrantLock
  • Share:共享,多个线程可以同一执行,比如SemaphoreCountDownLatch

从ReentrantLock看AQS

PS:

  • 从方法lock()unlock()作为突破口
  • AQS#state的值:没占用 - 0, 占用 - 1, 可重入锁 - > 1
  • CLH队列中,第一个节点是哨兵节点

代码实例:

public class Demo {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
// 带入一个银行办理业务的案例来模拟AQS如何进行线程的管理和通知唤醒机制
// 3个线程模拟3个来银行网点受理窗口办理业务的顾客
// A顾客就是第一个顾客,此时受理窗口没有任何人,A可以直接去办理
new Thread(() -> {
lock.lock();
try {
System.out.println("===== A thread come in");

try {
TimeUnit.SECONDS.sleep(20);
} catch (Exception e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}, "A").start();

//第二个顾客(第二个线程) => 由于受理业务的窗口只有一个(只能一个线程持有锁),此时B只能等待,进入候客区
new Thread(() -> {
lock.lock();
try {
System.out.println("===== B thread come in");
} finally {
lock.unlock();
}
}, "B").start();

//第三个顾客(第三个线程)=> 由于受理业务的窗口只有一个(只能一个线程持有锁),此时C只能等待,进入候客区
new Thread(() -> {
lock.lock();
try {
System.out.println("===== C thread come in");
} finally {
lock.unlock();
}
}, "C").start();
}
}

lock()

先来看ReentrantLock中的公平锁和非公平锁

接着来看ReentrantLock的构造器函数

/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}

/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

可以看出,ReentrantLock默认创建的是非公平锁,只有传入的fair参数为true时才创建公平锁。

接着来看lock()方法中的核心方法tryAcquire()

1)非公平锁:

static final class NonfairSync extends Sync {
// ...
protected final boolean tryAcquire(int acquires) {
// 实际调用的父类Sync的onfairTryAcquire()
return nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
// ...
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

2)公平锁

static final class FairSync extends Sync {
// ...
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

可以很明显看出公平锁和非公平锁的tryAcquire()的唯一区别就在于公平锁在获取同步状态时多了一个限制条件!hasQueuedPredecessors()hasQueuedPredecessors()方法是公平锁在加锁时判断等待队列中是否存在有效节点的方法:

public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

接下来看lock()方法(以非公平锁为例):

接下来以一张图示的方式展示acquire()方法的走向:

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

tryAcquire(arg)

以非公平锁为例:

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

可以看出非公平锁的tryAcquire()方法调用的是NonFairSync的父类Sync中的nonfairTryAcquire()方法

nonfairTryAcquire()返回结果:

  • false:继续推进条件,调用下一个方法addWaiter()
  • true:结束

addWaiter(Node.EXCLUSIVE)

此时假设线程C也来办理业务。

addWaiter()

CLH双向链表,第一个节点是哨兵节点,并不存储任何信息,仅起到占位的作用。第一个有效数据的节点是从第二个节点开始的。

enq(node)

线程B入队后的队列:

同理线程C也会入队:

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

可以看出acquireQueued()方法会调用shouldParkAfterFailedAcquire()parkAndCheckInterrupt()方法和setHead()方法;

我们先来看第一个方法shouldParkAfterFailedAcquire()

// 第一次调用:pred => 前一个节点(哨兵节点),node => 当前节点(线程B)
// 第二次调用:pred => 前一个节点(线程B),node => 当前节点(线程C)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// ws = 0 (第一次调用)
// ws = -1 (第二次调用) => 直接返回true, ws == Node.SIGNAL
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 将前一个节点的waitStatus设置为-1
// => 哨兵节点和线程B的waitStatus都是SIGNAL(-1)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

接着我们来看第二个方法parkAndCheckInterrupt()

private final boolean parkAndCheckInterrupt() {
// 当前线程挂起,程序不会继续执行
LockSupport.park(this);
// 在调用unpark()方法后,会往这里向下执行
return Thread.interrupted();
}

我们再回来看acquireQueued()方法:

final boolean acquireQueued(final Node node, int arg) {
// 取消排队标志
boolean failed = true;
try {
// 是否中断标志
boolean interrupted = false;
for (;;) {
// 获取到上一个节点,线程B的上一个节点是哨兵节点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 执行到这里,说明线程A已经unpark,线程B获取了permit
// 设置head为线程B
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 只有以下的if块执行完毕后,才会去执行上述的if块
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

最后我们来看setHead()方法:

private void setHead(Node node) {
head = node; // 将head指向节点B
node.thread = null; // 将B节点的线程置为null
node.prev = null; // 将B节点的prev置为null
}

unlock()

现在线程A办理完了业务,需要释放锁了unlock()

public void unlock() {
sync.release(1);
}

可以看出调用的Syncrelease()方法:

public final boolean release(int arg) {
// 尝试释放锁
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

tryRelease()

protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 1 - 1 = 0
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
// 将当前持有者线程设置为null
setExclusiveOwnerThread(null);
}
// 设置状态为 0
setState(c);
return free;
}

接着回到release()方法,它还调用了unparkSuccessor()方法:

private void unparkSuccessor(Node node) {
// 获取到哨兵节点和线程B等待状态
int ws = node.waitStatus;
if (ws < 0) // 将哨兵节点状态改为0
compareAndSetWaitStatus(node, ws, 0);

Node s = node.next; // 获取节点B
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 解锁线程B,获取permit通信证
LockSupport.unpark(s.thread);
}

上述代码执行完毕后,哨兵节点和节点B的waitStatus从-1改为0;B、C线程获取到permit。

总结

  1. 业务场景:有三个线程A、B、C去银行办理业务,A线程最先抢到执行权开始办理业务,那么B、C两个线程就在CLH队列里面排队,注意傀儡结点和B结点的状态都会改为-1
  2. 当A线程办理好业务,离开的时候,会把傀儡结点的waitStatus从-1改为0、将status从1改为0,将当前线程置为null
  3. 这个时候如果B上位,首先将status从0改为1(表示占用),把thread置为线程B,然后就把第一个傀儡结点给清除掉了(GC),这个时候原来的B结点重新成为傀儡结点

常用同步工具类

Semaphore(信号量)

简介

synchronizedReentrantLock 都是一次只允许一个线程访问某个资源,而Semaphore(信号量)可以用来控制同时访问特定资源的线程数量。

Semaphore的使用比较简单,此处假设有N(N > 3)个线程来获取Semaphore中的共享资源,我们需要保证一次最多只有三个线程来获取该共享资源,其他线程均被阻塞;只有在获取到共享资源的线程释放了共享资源后,其他阻塞的线程才能获取到:

// 初始化共享变量
final Semaphore semaphore = new Semaphore(3);

// 获取一个permit
semaphore.acquire();

// 释放一个permit
semaphore.release();

当初始的资源数为1的时候,Semaphore退化成排它锁。

Semaphore模式:

  • 公平模式:调用acquire()方法的顺序就是获取许可证的顺序
  • 非公平模式:抢占式的

构造器函数:

public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

可以看出,初始化Semaphore必须指定许可的数量,还可以传入fair参数来指定生成公平模式还是非公平模式,默认是非公平模式。

📢注意:Semaphore 通常用于那些资源有明确访问数量限制的场景比如限流,仅限于单机系统。

原理

Semaphore是共享锁的一种实现,它默认构造AQS的state = permits,即许可证的数量,只有拿到许可证的线程才可以访问共享资源。

获取许可

以无参的acquire()方法为例,线程调用acquire()尝试获取许可证:

  • state > 0:获取成功
  • state <= 0:许可证数量不足,获取失败

如果获取成功,Semaphore会尝试使用CAS操作去修改state = state - 1;如果获取失败,会创建一个Node节点加入等待队列,挂起当前线程(同ReentrantLock

// 获取一个许可证
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 获取一个或者多个许可证
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}

acquireSharedInterruptibly()方法是AQS中的默认实现:

// 共享模式下获取许可证,获取成功则返回,失败则加入等待队列,挂起线程
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取许可证,arg为获取许可证个数,当获取失败时,则创建一个节点加入等待队列,挂起当前线程。
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

再以非公平模式(NonfairSync)为例,看看 tryAcquireShared 方法的实现:

protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}

// 非公平的共享模式获取许可证
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 当前可用许可证数量
int available = getState();
// 尝试获取许可证,当前可用许可证数量小于等于0时,返回负值,表示获取失败,
// 当前可用许可证大于0时才可能获取成功,CAS失败了会循环重新获取最新的值尝试获取
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

释放许可

以无参 release() 方法为例,线程调用release() 尝试释放许可证,并使用 CAS 操作去修改 state = state + 1。释放许可证成功之后,同时会唤醒等待队列中的一个线程。被唤醒的线程会重新尝试去修改 state 的值 state=state-1 ,如果 state > 0 则获取令牌成功,否则重新进入等待队列,挂起线程。

// 释放一个许可证
public void release() {
sync.releaseShared(1);
}

// 释放一个或者多个许可证
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}

releaseShared()方法是AQS中的默认实现:

// 释放共享锁,如果 tryReleaseShared 返回 true,就唤醒等待队列中的一个或多个线程。
public final boolean releaseShared(int arg) {
//释放共享锁
if (tryReleaseShared(arg)) {
//释放当前节点的后置等待节点
doReleaseShared();
return true;
}
return false;
}

tryReleaseShared 方法是Semaphore 的内部类 Sync 重写的一个方法:

protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
// 可用许可证+1
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// CAS修改state的值
if (compareAndSetState(current, next))
return true;
}
}

PS:除了 acquire() 方法之外,另一个比较常用的与之对应的方法是 tryAcquire() 方法,该方法如果获取不到许可就立即返回 false。

CountDownLatch(倒计时器)

简介

CountDownLatch 允许 count 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。

CountDownLatch 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 使用完毕后,它不能再次被使用。

原理

Semaphore

典型应用

  1. 某一线程在开始运行前需要等待N个线程执行完毕:将 CountDownLatch 的计数器初始化为N,每当一个任务线程执行完毕,就调用countDown()将计数器减 1,当计数器的值变为 0 时,在 CountDownLatchawait() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
  2. 实现多个线程开始执行任务的最大并行性(注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行):初始化一个共享的 CountDownLatch 对象,将其计数器初始化为 1 ,多个线程在开始执行任务前首先 await(),当主线程调用 countDown() 时,计数器变为0,多个线程同时被唤醒,同时运行

CyclicBarrier(循环栅栏)

简介

CyclicBarrierCountDownLatch类似,同样可以实现线程间的技术等待,但是它的功能更强大和复杂。CountDownLatch是基于AQS实现的,而CyclicBarrier是基于ReentrantLockCondition实现的。

CyclicBarrier 的字面意思:可循环使用(Cyclic)的屏障(Barrier)

作用:让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。

原理

CyclicBarrier 内部通过一个 count 变量作为计数器,count 的初始值为 parties 属性的初始化值,每当一个线程执行到了栅栏这里,那么就将计数器减 1。如果 count == 0,表示这是最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务。

构造函数:

//每次拦截的线程数
private final int parties;
//计数器
private int count;

public CyclicBarrier(int parties) {
this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

当调用 await() 方法时,实际上调用的是 dowait(false, 0L)方法。 await() 方法就像树立起一个栅栏的行为一样,将线程挡住,当拦住的线程数量达到 parties 的值时,栅栏才会打开,线程才得以通过执行。

public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

// 当线程数量或者请求数量达到 count 时 await 之后的方法才会被执行
private int count;

private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
// 上锁
lock.lock();
try {
final Generation g = generation;

if (g.broken)
throw new BrokenBarrierException();

// 线程中断,抛出异常
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// count 减1
int index = --count;
// 当 count 数量减为 0 之后说明最后一个线程已经到达栅栏了
// 也就是达到了可以执行await 方法之后的条件
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 将 count 重置为 parties 属性的初始化值
// 唤醒之前等待的线程
// 下一波执行开始
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();

if (g != generation)
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

参考文章

  • https://blog.csdn.net/TZ845195485/article/details/118517936
  • https://javaguide.cn/java/concurrent/aqs.html
  • https://pdai.tech/md/java/thread/java-thread-x-lock-AbstractQueuedSynchronizer.html