AQS简介
AQS 全称是
AbstractQueuedSynchronizer
,抽象队列同步器。
AQS是用来构建锁或者其他同步器组件的重量级基础框架,是整个JUC体系的基石;
AQS通过内置的自定义的CLH队列来完成获取资源的线程的排队工作,将每个要抢占资源的线程封装成Node节点来实现锁的分配,同时存在一个volatile
变量state表示持有锁的状态。
AQS核心数据结构CLH参考资料:https://mp.weixin.qq.com/s/jEx-4XhNGOFdCo4Nou5tqg
AQS是JUC体系中最重要的基石
AQS原理
核心思想
AQS核心思想:
如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态
如果被请求的共享资源被占用,此时需要一套线程阻塞等待以及被唤醒时锁分配的机制,这套机制AQS是基于CLH锁实现的
CLH
锁是对自旋锁的一种改进,是一个虚拟的双向队列(不存在队列实例,仅存在结点之间的关联关系),暂时获取不到锁的线程将被加入到该队列中。AQS
将每条请求共享资源的线程封装成一个 CLH
队列锁的一个结点Node
来实现锁的分配。在 CLH
队列锁中,一个节点表示一个线程,它保存着线程的引用thread
、
当前节点在队列中的状态waitStatus
、前驱节点prev
、后继节点next
。
AQS使用成员变量state
表示同步状态,通过内置的FIFO线程等待队列来完成获取资源线程的排队工作。
state
变量被volatile
修饰,用于展示当前临界资源的获锁情况:
private volatile int state;
除此之外,状态信息state
可以通过protected final
修饰的getState()
、setState()
、compareAndSetState()
进行操作
protected final int getState () { return state; } protected final void setState (int newState) { state = newState; } protected final boolean compareAndSetState (int expect, int update) { return unsafe.compareAndSwapInt(this , stateOffset, expect, update); }
稍后我们会通过深入ReentrantLock
类的方式来理解AQS
AQS资源共享方式
AQS定义了两种资源共享方式:
Exclusive
:独占,只有一个线程能执行,比如ReentrantLock
Share
:共享,多个线程可以同一执行,比如Semaphore
和CountDownLatch
从ReentrantLock看AQS
PS:
从方法lock()
和unlock()
作为突破口
AQS#state
的值:没占用 - 0, 占用 - 1, 可重入锁 - >
1
CLH队列中,第一个节点是哨兵节点
代码实例:
public class Demo { public static void main (String[] args) { ReentrantLock lock = new ReentrantLock (); new Thread (() -> { lock.lock(); try { System.out.println("===== A thread come in" ); try { TimeUnit.SECONDS.sleep(20 ); } catch (Exception e) { e.printStackTrace(); } } finally { lock.unlock(); } }, "A" ).start(); new Thread (() -> { lock.lock(); try { System.out.println("===== B thread come in" ); } finally { lock.unlock(); } }, "B" ).start(); new Thread (() -> { lock.lock(); try { System.out.println("===== C thread come in" ); } finally { lock.unlock(); } }, "C" ).start(); } }
lock()
先来看ReentrantLock中的公平锁和非公平锁
接着来看ReentrantLock的构造器函数
public ReentrantLock () { sync = new NonfairSync (); } public ReentrantLock (boolean fair) { sync = fair ? new FairSync () : new NonfairSync (); }
可以看出,ReentrantLock默认创建的是非公平锁,只有传入的fair
参数为true
时才创建公平锁。
接着来看lock()
方法中的核心方法tryAcquire()
:
1)非公平锁:
static final class NonfairSync extends Sync { protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire(acquires); } } abstract static class Sync extends AbstractQueuedSynchronizer { final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } }
2)公平锁
static final class FairSync extends Sync { protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } }
可以很明显看出公平锁和非公平锁的tryAcquire()
的唯一区别就在于公平锁在获取同步状态时多了一个限制条件!hasQueuedPredecessors()
。hasQueuedPredecessors()
方法是公平锁在加锁时判断等待队列中是否存在有效节点的方法:
public final boolean hasQueuedPredecessors () { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
接下来看lock()
方法(以非公平锁为例):
接下来以一张图示的方式展示acquire()
方法的走向:
public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire(arg)
以非公平锁为例:
protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire(acquires); }
可以看出非公平锁的tryAcquire()
方法调用的是NonFairSync
的父类Sync
中的nonfairTryAcquire()
方法
nonfairTryAcquire()
返回结果:
false:继续推进条件,调用下一个方法addWaiter()
true:结束
addWaiter(Node.EXCLUSIVE)
此时假设线程C也来办理业务。
addWaiter()
:
CLH双向链表,第一个节点是哨兵节点,并不存储任何信息,仅起到占位的作用。第一个有效数据的节点是从第二个节点开始的。
enq(node)
线程B入队后的队列:
同理线程C也会入队:
acquireQueued(addWaiter(Node.EXCLUSIVE),
arg)
final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
可以看出acquireQueued()
方法会调用shouldParkAfterFailedAcquire()
、parkAndCheckInterrupt()
方法和setHead()
方法;
我们先来看第一个方法shouldParkAfterFailedAcquire()
:
private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; }
接着我们来看第二个方法parkAndCheckInterrupt()
:
private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }
我们再回来看acquireQueued()
方法:
final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
最后我们来看setHead()
方法:
private void setHead (Node node) { head = node; node.thread = null ; node.prev = null ; }
unlock()
现在线程A办理完了业务,需要释放锁了unlock()
:
public void unlock () { sync.release(1 ); }
可以看出调用的Sync
的release()
方法:
public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
tryRelease()
:
protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException (); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; }
接着回到release()
方法,它还调用了unparkSuccessor()
方法:
private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
上述代码执行完毕后,哨兵节点和节点B的waitStatus
从-1改为0;B、C线程获取到permit。
总结
业务场景:有三个线程A、B、C去银行办理业务,A线程最先抢到执行权开始办理业务,那么B、C两个线程就在CLH队列里面排队,注意傀儡结点和B结点的状态都会改为-1
当A线程办理好业务,离开的时候,会把傀儡结点的waitStatus从-1改为0、将status从1改为0,将当前线程置为null
这个时候如果B上位,首先将status从0改为1(表示占用),把thread置为线程B,然后就把第一个傀儡结点给清除掉了(GC),这个时候原来的B结点重新成为傀儡结点
常用同步工具类
Semaphore(信号量)
简介
synchronized
和 ReentrantLock
都是一次只允许一个线程访问某个资源,而Semaphore
(信号量)可以用来控制同时访问特定资源的线程数量。
Semaphore
的使用比较简单,此处假设有N(N > 3)
个线程来获取Semaphore
中的共享资源,我们需要保证一次最多只有三个线程来获取该共享资源,其他线程均被阻塞;只有在获取到共享资源的线程释放了共享资源后,其他阻塞的线程才能获取到:
final Semaphore semaphore = new Semaphore (3 );semaphore.acquire(); semaphore.release();
当初始的资源数为1的时候,Semaphore
退化成排它锁。
Semaphore
模式:
公平模式:调用acquire()
方法的顺序就是获取许可证的顺序
非公平模式:抢占式的
构造器函数:
public Semaphore (int permits ) { sync = new NonfairSync (permits ); } public Semaphore (int permits , boolean fair) { sync = fair ? new FairSync (permits ) : new NonfairSync (permits ); }
可以看出,初始化Semaphore必须指定许可的数量,还可以传入fair
参数来指定生成公平模式还是非公平模式,默认是非公平模式。
📢注意:Semaphore
通常用于那些资源有明确访问数量限制的场景比如限流,仅限于单机系统。
原理
Semaphore
是共享锁的一种实现,它默认构造AQS的state = permits
,即许可证的数量,只有拿到许可证的线程才可以访问共享资源。
获取许可
以无参的acquire()
方法为例,线程调用acquire()
尝试获取许可证:
state > 0
:获取成功
state <= 0
:许可证数量不足,获取失败
如果获取成功,Semaphore会尝试使用CAS操作去修改state = state - 1
;如果获取失败,会创建一个Node节点加入等待队列,挂起当前线程(同ReentrantLock
)
public void acquire () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public void acquire (int permits ) throws InterruptedException { if (permits < 0 ) throw new IllegalArgumentException (); sync.acquireSharedInterruptibly(permits ); }
acquireSharedInterruptibly()
方法是AQS中的默认实现:
public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); }
再以非公平模式(NonfairSync
)为例,看看
tryAcquireShared
方法的实现:
protected int tryAcquireShared (int acquires) { return nonfairTryAcquireShared(acquires); } final int nonfairTryAcquireShared (int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
释放许可
以无参 release()
方法为例,线程调用release()
尝试释放许可证,并使用 CAS
操作去修改
state = state + 1
。释放许可证成功之后,同时会唤醒等待队列中的一个线程。被唤醒的线程会重新尝试去修改
state
的值 state=state-1
,如果
state > 0
则获取令牌成功,否则重新进入等待队列,挂起线程。
public void release () { sync.releaseShared(1 ); } public void release (int permits ) { if (permits < 0 ) throw new IllegalArgumentException (); sync.releaseShared(permits ); }
releaseShared()
方法是AQS中的默认实现:
public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; }
tryReleaseShared
方法是Semaphore
的内部类
Sync
重写的一个方法:
protected final boolean tryReleaseShared (int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) throw new Error ("Maximum permit count exceeded" ); if (compareAndSetState(current, next)) return true ; } }
PS:除了 acquire()
方法之外,另一个比较常用的与之对应的方法是 tryAcquire()
方法,该方法如果获取不到许可就立即返回 false。
CountDownLatch(倒计时器)
简介
CountDownLatch
允许 count
个线程阻塞在一个地方,直至所有线程的任务都执行完毕。
CountDownLatch
是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当
CountDownLatch
使用完毕后,它不能再次被使用。
原理
同Semaphore
典型应用
某一线程在开始运行前需要等待N个线程执行完毕:将
CountDownLatch
的计数器初始化为N,每当一个任务线程执行完毕,就调用countDown()
将计数器减
1,当计数器的值变为 0 时,在
CountDownLatch
上await()
的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
实现多个线程开始执行任务的最大并行性(注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行):初始化一个共享的
CountDownLatch
对象,将其计数器初始化为 1
,多个线程在开始执行任务前首先 await()
,当主线程调用
countDown()
时,计数器变为0,多个线程同时被唤醒,同时运行
CyclicBarrier(循环栅栏)
简介
CyclicBarrier
和CountDownLatch
类似,同样可以实现线程间的技术等待,但是它的功能更强大和复杂。CountDownLatch
是基于AQS实现的,而CyclicBarrier
是基于ReentrantLock
和Condition
实现的。
CyclicBarrier
的字面意思:可循环使用(Cyclic)的屏障(Barrier)
作用:让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。
原理
CyclicBarrier
内部通过一个 count
变量作为计数器,count
的初始值为 parties
属性的初始化值,每当一个线程执行到了栅栏这里,那么就将计数器减 1。如果
count == 0
,表示这是最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务。
构造函数:
private final int parties;private int count;public CyclicBarrier (int parties) { this (parties, null ); } public CyclicBarrier (int parties, Runnable barrierAction) { if (parties <= 0 ) throw new IllegalArgumentException (); this .parties = parties; this .count = parties; this .barrierCommand = barrierAction; }
当调用 await()
方法时,实际上调用的是
dowait(false, 0L)
方法。 await()
方法就像树立起一个栅栏的行为一样,将线程挡住,当拦住的线程数量达到
parties
的值时,栅栏才会打开,线程才得以通过执行。
public int await () throws InterruptedException, BrokenBarrierException { try { return dowait(false , 0L ); } catch (TimeoutException toe) { throw new Error (toe); } } private int count;private int dowait (boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this .lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException (); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException (); } int index = --count; if (index == 0 ) { boolean ranAction = false ; try { final Runnable command = barrierCommand; if (command != null ) command.run(); ranAction = true ; nextGeneration(); return 0 ; } finally { if (!ranAction) breakBarrier(); } } for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L ) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException (); if (g != generation) return index; if (timed && nanos <= 0L ) { breakBarrier(); throw new TimeoutException (); } } } finally { lock.unlock(); } }
参考文章
https://blog.csdn.net/TZ845195485/article/details/118517936
https://javaguide.cn/java/concurrent/aqs.html
https://pdai.tech/md/java/thread/java-thread-x-lock-AbstractQueuedSynchronizer.html