AbstractQueuedSynchronizer框架浅析
1.概述
AbstractQueuedSynchronizer(AQS)抽象类提供一个实现@H_301_5@阻塞锁和依赖于@H_301_5@FIFO等待队列的@H_301_5@同步器的框架。
AQS被设计用来作为众多同步器的基类,例如ReentrantLock、Semaphore、CountDownLatch、FutureTask以及ReentrantReadWriteLock。AQS依赖于一个@H_301_5@整数值,用来代表状态。AQS的子类可以通过覆写protect方法来改变状态,并且定义状态代表具体什么含义。不同的同步器对状态的含义解释不同:
- 在ReentrantLock中,AQS的同步状态用于保存@H_301_5@锁获取操作的次数;
- 在FutureTask中,AQS同步状态被用来保存@H_301_5@任务的状态;
- 在Semaphore和CountDownLatch中,AQS的同步状态用于保存@H_301_5@当前可用许可的数量;
针对AQS中的状态,只可以通过AQS的getState()、setState()和compareAndSetState()方法来改变状态。
同步器类应该使用私有的AQS子类来实现功能,而不应该直接继承AQS类。AQS类没有实现任何同步接口,它只是定义了一些可以被具体同步器和锁调用的方法,例如acquireInterruptibly方法。AQS子类需要自己实现以下方法:
- tryAcquire()
- tryRelease()
- tryAcquireShared()
- tryReleaseShared()
- isHeldExclusively()
这些方法默认会抛出UnsupportedOperationException异常。在这些方法实现中,可以使用getState()、setState()和compareAndSetState()来获取或修改AQS的状态。这些方法的实现必须是@H_301_5@线程安全的,并且应该实现简单且没有阻塞。只有这些方法可以被子类覆写,其他的AQS的公开方法都是final方法。
AQS同时支持@H_301_5@独占操作模式(例如ReentrantLock)和@H_301_5@非独占操作模式(例如Semaphore和CountDownLatch)。当以独占模式获取锁时,只有一个线程能访问成功,其他线程都访问失败;而以非独占模式获取锁时,多个线程可以同时访问成功。不同操作模式的线程都在同一个FIFO队列中等待。通常,AQS的子类只支持一种操作模式(独占或非独占),但也有同时支持两种操作模式的同步器,例如ReadWriteLock的子类,它的读取锁是非独占操作模式,而写入锁是独占操作模式。
由于在将线程放入FIFO等待队列之前,需要尝试一次acquire,因此有可能新的acquire线程可以获取成功,尽管等待队列中还有其他线程阻塞等待,这是一种@H_301_5@非公平策略。然而,可以在tryAcquire()或者tryAcquireShared()方法中禁止线程抢占,具体是通过hasQueuedPredecessors()方法判断等待队列中是否有线程在阻塞等待,如果有线程阻塞等待,则让tryAcquire()或者tryAcquireShared()方法返回false,这样的话,acquire线程会被放入等待队列的尾部,然后唤醒阻塞等待的线程,这是一种@H_301_5@公平的策略。
AQS类为同步器的状态、参数的获取和释放,以及内部FIFO等待队列,提供了一个高效的和可扩展的基础。当这些不能满足你的要求时,你可以自定义java.util.concurrent.atomic原子类,自定义java.util.Queue类,以及LockSupport类来提供阻塞支持。
AQS框架的一个类图如下所示:
2.源码分析
2.1 AbstractQueuedSynchronizer类的继承关系
AQS类继承了AbstractOwnableSynchronizer类,实现了Serializable接口。AbstractOwnableSynchronizer类主要是用来保存同步器被哪个线程独占使用。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ...... } public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { protected AbstractOwnableSynchronizer() { } //The current owner of exclusive mode synchronization. private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }
2.2 Node内部类
在AQS类中,定义了一个FIFO等待队列节点的内部类Node。
static final class Node { /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3; /* * 非负值意味着该节点不需要signal。 * 该值默认初始化为0,表示普通的同步节点,主要是通过CAS方法来修改该变量值。 */ volatile int waitStatus; /* * 指向前继节点,当前节点需要依赖于前继节点来检查waitStatus。 * 在入队列的时候赋值,在出队列的时候为null。 */ volatile Node prev; /* * 指向后继节点,当前节点依赖后继节点来唤醒释放 * 在入队列的时候赋值,在出队列的时候为null。 */ volatile Node next; /* * 将该节点入队列的线程,在构造节点的时候初始化,使用完之后变为null */ volatile Thread thread; /* * 指向下一个在条件等待,或者是特定的SHARED值的节点。 * 条件队列只有在获取独占锁时才可以被访问,我们需要一个单链表队列来保存节点,当他们在条件上等待时。 */ Node nextWaiter; /** * Returns true if node is waiting in shared mode. */ final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread,Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread,int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
等待队列是CLH锁队列的变种,CLH锁通常被用来实现自旋锁。在节点中的waitStatus域保存了线程是否需要被阻塞的信息。当一个节点的前继节点被释放了,节点将会收到通知。在等待队列中的每一个节点,都充当着特定通知形式的监控器,并且持有一个等待线程。节点中的waitStatus域并不控制线程是否能获取到锁。如果一个线程是队列中的第一个线程,并不能保证它能竞争成功获取到锁,而只是给予了它参与竞争的权利。
为了将一个Node节点放入到LCH锁队列中,只需要将该Node节点拼接到队列尾部就行;如果为了出队列,则只需设置队列的头指针位置head。
+------+ prev +-----+ +-----+ head | | <---- | | <---- | | tail +------+ +-----+ +-----+
插入节点到LCH队列中,只需一个在tail域上的原子操作。相似的,出队列仅仅需要更新head域,但是出队列还需要做一些额外的工作,来决定新的head节点的后继节点是哪个,其中需要考虑的因素有线程是否被取消了,操作是否超时了以及线程是否被中断了。
在Node节点中的prev域主要用来处理节点被@H_301_5@取消的情况。如果一个Node节点被取消了,那边它的后继节点需要重新连接到一个未被取消的前继节点。
在Node节点中的next域主要被用来实现@H_301_5@阻塞机制。在每个节点中都保存有线程ID,因此当需要唤醒下一个节点时,只需要通过遍历next域,找到后继节点,通过节点获取到线程ID,从而知道该唤醒哪个线程。
@H_301_5@等待状态只能取以下几个值:
- SIGNAL:该节点的后继节点当前是阻塞的,因此当前节点在释放和取消之后,必须唤醒它的后继节点。为了避免竞争,acquire方法必须首先进入SIGNAL等着状态,然后再尝试原子获取,这个获取过程可能会失败或者阻塞。
- CANCELLED:该节点由于超时或者中断了被取消了。节点进入了CANCELLED状态之后,就不会再发生状态的变化了。特别地,处于CANCELLED状态节点的线程不会再被阻塞了。
- CONDITION:该节点当前处于一个条件队列中。经过转移之后,该节点将会被作为同步队列的节点使用,此时节点的状态会被设置为0。
- PROPAGATE:releaseShared方法应该传递给其他Node节点。在doReleaseShared方法中,确保传递会继续。
- 0:非以上任何状态;
等待状态值使用数值来简化使用,@H_301_5@非负值意味着节点不需要被通知唤醒,因此大多数代码只需检查数值的正负就可以知道是否需要唤醒了。
2.3 AQS类的重要成员变量
/* * 等待队列的头节点,延迟初始化 * 除了初始化可以修改head值,还可以通过setHead方法设置 * 只要head节点存在,那么该节点的waitStatus状态将不会是CANCELLED */ private transient volatile Node head; /* * 等待队列的尾节点,延迟初始化 * 只能通过enq方法来添加一个新的等待节点到队列中,从而来修改tail值 */ private transient volatile Node tail; /* * 同步状态 */ private volatile int state;
可以看到,在AQS类中,有三个比较重要的成员变量,其中两个是表示等待队列的头指针和尾指针。另外一个表示同步的状态。
与state相关的方法有三个:
/* * 获取当前同步状态 */ protected final int getState() { return state; } /* * 设置新的同步状态 */ protected final void setState(int newState) { state = newState; } /* * 采用CAS方法来更新同步状态 * expect 期望的值 * update 更新为新的值 */ protected final boolean compareAndSetState(int expect,int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this,stateOffset,expect,update);//采用了sun.misc.Unsafe类来实现CAS操作 }
在AQS中使用到了sun.misc.Unsafe类来实现CAS操作,Unsafe类的compareAndSwapInt()和compareAndSwapLong()等方法包装了CAS操作,虚拟机在内部对这些方法做了特殊处理,即时编译出来的结果就是一条平台相关的处理器CAS指令。
与head和tail相关的方法有两个:
/* * 将队列的头结点设置为node结点 * 该方法只被acquire系列方法调用 */ private void setHead(Node node) { head = node;//让head指针指向node结点 node.thread = null; node.prev = null; } /* * 插入node节点到队列中,必要时初始化 * 返回node节点的前继节点,即原来的尾节点 */ private Node enq(final Node node) { for (;;) { Node t = tail; // 尾节点为空,则先进行初始化操作,创建一个新的node节点 if (t == null) { // Must initialize if (compareAndSetHead(new Node()))//head节点被成功初始化后,将tail节点指向head节点 tail = head; } else { node.prev = t; if (compareAndSetTail(t,node)) {//将tail节点更新为node节点 t.next = node; return t;//返回node的前继节点 } } } }
可以看到,head和tail节点的初始化操作是在setHead()和enq()方法中进行的,同时更新操作也是在这两个方法中进行的。
2.4 AQS类暴露给子类实现的方法
/* * 尝试以独占模式获取 * 该方法应该查询对象的状态是否允许以独占模式获取,如果允许,则获取。 * 该方法总是被执行acquire方法的线程执行,如果该方法失败了,则acquire方法将该线程放入队列中,直到有其他线程调用了release方法来发送通知信号。 * 如果获取成功了,则返回true。 */ protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } /* * 尝试修改状态来反映以独占模式释放 * 该方法总是被执行释放的线程触发 * 如果该对象处于完全释放的状态则返回true */ protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } /* * 尝试以非独占模式来获取 * 该方法应该查询是否对象允许以非独占模式来获取,如果允许,则获取。 * 该方法总是被执行acquire方法的线程执行。如果该方法失败了,则acquire方法将该线程放入队列中,直到有其他线程调用了release方法来发送通知信号 * 返回值为负值,表示失败; * 返回0,表示以非独占模式获取成功,但后续的以非独占模式获取将失败 * 返回正值,表示以非独占模式获取成功,后续的以非独占模式获取也会成功 */ protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } /* * 尝试修改状态来反映以非独占模式释放 * 该方法总是被执行释放的线程触发 * 返回true,表示以非独占模式释放成功 */ protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } /* * 如果同步器是被当前线程以独占模式访问,则返回true。 * 该方法在每次调用非等待ConditionObject方法时被触发。 */ protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); }
2.5 AQS类定义的acquire系列方法
/* * 以独占模式获取,屏蔽中断 * 至少触发一次tryAcquire()方法,如果获取成功,直接返回 * 如果获取失败,则线程入队列,然后通过tryAcquire()不断尝试,直至成功 * 该方法可以被用来实现Lock.lock()方法 */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE),arg)) selfInterrupt(); }
tryAcquire()方法是空实现,需要子类覆写该方法,实现具体的获取操作。addWaiter()方法主要是为当前线程创建一个新的Node节点,并把该Node节点以指定的模式存放入队列中。
/* * 为当前线程创建一个Node节点,并且设置为指定mode,最后将该node节点放入等待队列中 * @param mode mode有两种取值:Node.EXCLUSIVE和Node.SHARED,分别代表以独占模式和非独占模式 * @return 返回新的Node节点 */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(),mode);//给当前线程创建一个新的Node节点,节点模式为mode // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred;//将node节点的前继指针指向尾节点 if (compareAndSetTail(pred,node)) {//更新tail指针指向node节点 pred.next = node;//将原来tail节点的后继指针指向node节点 return node;//这样node节点成功链接到tail节点后面,并更新tail节点指向node节点了 } } // 如果前面将node节点入队列失败,则再通过enq()方法入队列,其实现思想和上面的过程一致 enq(node); return node; }
addWaiter()方法首先根据mode模式给当前线程创建一个node节点,然后将该node节点放入队列的尾部。acquireQueued()方法以独占不可中断方式获取。
/* * 为队列中的线程,以独占不可中断模式获取 * 该方法被条件等待和acquire方法使用 * @param node 节点 * @param arg 获取的参数 * @return 在等待的过程中被中断了,则返回true */ final boolean acquireQueued(final Node node,int arg) { boolean Failed = true; try { boolean interrupted = false; for (;;) {//无限循环 final Node p = node.predecessor();//获取当前节点的前继节点 if (p == head && tryAcquire(arg)) {//如果前继节点等于head节点,并且尝试获取成功 setHead(node);//更新head节点为node节点 p.next = null; // help GC Failed = false; return interrupted;//返回是否中断了 } // 在获取失败之后,判断是否需要挂起该线程,如果需要挂起,则通过LockSupport.lock()方法挂起该线程,等线程被唤醒后判断是否被中断过 if (shouldParkAfterFailedAcquire(p,node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (Failed)//发生了异常,则取消获取操作 cancelAcquire(node);//取消获取操作 } }
在acquireQueued()方法中,从尾节点开始循环前向遍历,如果当前节点的前继节点是头节点,并且tryAcquire()方法返回true了,则更新头结点,并返回。如果在向前遍历的过程中,遇到了节点获取失败需要挂起时,则会通过LockSupport的park()将当前线程挂起。
/* * 检查和更新获取失败节点的状态 * 如果线程应该被阻塞,则返回true * 该方法需要满足pred == node.prev * @param pred node节点的前继节点,保存状态 * @param node node节点 * @return 如果线程应该被阻塞,则返回true */ private static boolean shouldParkAfterFailedAcquire(Node pred,Node node) { int ws = pred.waitStatus;//获取前继节点的等待状态 if (ws == Node.SIGNAL)// SIGNAL状态 /* * This node has already set status asking a release * to signal it,so it can safely park. */ return true; if (ws > 0) {//CANCELLED状态 /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev;//向前遍历搜索,找出前继节点 } while (pred.waitStatus > 0); pred.next = node; } else {// waitStatus为0或者为PROPAGATE /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal,but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred,ws,Node.SIGNAL);//设置waitStatus为SIGNAL } return false; }
在shouldParkAfterFailedAcquire()方法中,根据pred节点的等待状态做出相应的处理:
- SIGNAL状态:说明该节点已经更新了状态,请求释放,因此可以返回true
- CANCELLED状态:说明节点已经被取消了,忽略前继节点状态为CANCELLED的节点,同时更新node和pred节点值
- PROPAGATE状态或0:说明节点需要更新状态为SIGNAL。
可以看到,shouldParkAfterFailedAcquire()方法只有节点等待状态为SIGNAL时,才会返回true。后续才会执行parkAndCheckInterrupt()方法。
/* * 停止当前线程参与系统调度,即挂起当前线程,并返回当前线程是否被中断了 */ private final boolean parkAndCheckInterrupt() { LockSupport.park(this);//挂断当前线程 return Thread.interrupted();//返回线程是否被中断了 }
LockSupport.park()方法:
/* * 挂起线程 * 当发生了以下几种情况,可以唤醒线程: * 1.其他线程触发了LockSupport.unlock()方法,唤醒线程 * 2.其他线程触发了Thread.interrupt()方法,打断当前线程 * 3.该调用无条件返回了 * 该方法不会记录什么原因导致该方法返回了,因此方法调用者,需要自己重新检查导致线程挂起的条件 * @param blocker 导致线程挂起的同步器 */ public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t,blocker); UNSAFE.park(false,0L); setBlocker(t,null); }
从前面的一些方法调用可以看到,acquire方法主要分为两种情况来处理:
- 第一次通过tryAcquire()方法获取成功了,则直接返回;
- 当第一次tryAcquire()失败时,接下来为当前线程创建一个以独占操作模式的Node节点,并把Node节点放入等待队列的尾部。然后在acquireQueued()方法中,从尾节点开始向前,循环从等待队列中取出节点,判断是否允许获取操作。如果不允许获取,即需要阻塞,则通过LockSupport.lock()方法挂起当前线程。当其他线程解除了当前线程的阻塞,或者是发生了中断,则返回继续下一个循环。最终只有遍历到头结点head,并且tryAcquire()方法返回true时,才会从acquireQueued()方法中退出,代表获取完成了。
@H_301_5@2.5.2 acquireInterruptibly()方法
/* * 以独占模式获取,如果发生了中断,则停止获取 * 在获取之前,首先检查线程是否被中断过,然后至少尝试一次tryAcquire()方法 * 如果第一次tryAcquire()方法失败,则将线程放入等待队列中,然后循环调用tryAcquire()方法,直至返回成功或者线程被中断了 * 该方法可以被用来实现Lock.lockInterruptibly() */ public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted())//如果线程被中断了,则直接返回异常 throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
当tryAcquire()失败时,则调用doAcquireInterruptibly()方法重复的获取。
/* * 以独占可中断模式获取 */ private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE);//给当前线程,创建一个独占模式的节点,并把节点放入到等待队列尾部 boolean Failed = true; try { for (;;) {//从尾节点开始循环处理 final Node p = node.predecessor();//当前节点的前继节点 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC Failed = false; return; } // 在获取失败之后,判断是否需要挂起该线程,如果需要挂起,则通过LockSupport.lock()方法挂起该线程,等线程被唤醒后判断是否被中断过 if (shouldParkAfterFailedAcquire(p,node) && parkAndCheckInterrupt()) throw new InterruptedException();// 如果线程被中断过,则抛出异常 } } finally { if (Failed) cancelAcquire(node); } }
可以看到doAcquireInterruptibly()方法与acquireQueued()实现大致相似,唯一的不同之处是,doAcquireInterruptibly()检测到线程被中断之后,会抛出一个中断异常。
@H_301_5@2.5.3 acquireShared()方法
/* * 以非独占模式获取,屏蔽中断 * 至少会触发调用一次tryAcquireShared() * 如果调用tryAcquireShared()失败了,则将该线程放入等待队列中,并且会不断的尝试tryAcquireShared()方法,直到返回成功 */ public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
在tryAcquireShared()方法获取失败,会调用doAcquireShared()继续重复的获取。
/* * 以非独占不可中断模式获取 */ private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED);//为当前线程创建一个非独占模式的Node节点,并把给Node节点放入到等待队列的尾部 boolean Failed = true; try { boolean interrupted = false; for (;;) {//从尾节点开始循环,不断向前遍历节点 final Node p = node.predecessor(); if (p == head) {//遍历到头节点 int r = tryAcquireShared(arg); if (r >= 0) {//获取成功了,直接返回 setHeadAndPropagate(node,r);//更新头结点和同步状态 p.next = null; // help GC if (interrupted) selfInterrupt(); Failed = false; return; } } // 在获取失败之后,判断是否需要挂起该线程,如果需要挂起,则通过LockSupport.lock()方法挂起该线程,等线程被唤醒后判断是否被中断过 if (shouldParkAfterFailedAcquire(p,node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (Failed) cancelAcquire(node); } }
doAcquireShared()方法与acquireQueued()方法实现类似,唯一不同之处是获取成功之后,会调用setHeadAndPropagate()方法来更head节点以及同步状态。
private void setHeadAndPropagate(Node node,int propagate) { Node h = head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller,* or was recorded (as h.waitStatus either before * or after setHead) by a prevIoUs operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode,* or we don't know,because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups,but only when there are multiple * racing acquires/releases,so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
在以下几种情况下,会对Node节点的后继节点进行判断是否需要释放:
- propagate大于0;
- 之前的头结点head为空;
- 之前的头结点head的waitStatus状态小于0;
- 当前的头结点head为空;
- 当前的头结点head的waitStatus状态小于0;
在满足以上几种情况后,如果后继节点为空,或者后继节点是非独占模式的,则执行释放操作。
/* * 非独占模式的释放 * 通知后继节点并且确保释放的传递 */ private void doReleaseShared() { /* * Ensure that a release propagates,even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not,status is set to PROPAGATE to * ensure that upon release,propagation continues. * Additionally,we must loop in case a new node is added * while we are doing this. Also,unlike other uses of * unparkSuccessor,we need to know if CAS to reset status * fails,if so rechecking. */ for (;;) {// 从头结点开始循环 Node h = head; // 如果头结点不为空,并且头结点不等于尾节点 if (h != null && h != tail) { int ws = h.waitStatus;//等待状态 if (ws == Node.SIGNAL) {//等待状态为SIGNAL if (!compareAndSetWaitStatus(h,Node.SIGNAL,0))//1.将Node节点的状态更新为0 continue; // loop to recheck cases unparkSuccessor(h);//唤醒h节点的后继节点 }else if (ws == 0 && !compareAndSetWaitStatus(h,Node.PROPAGATE))//2.将Node节点的状态更新为PROPAGATE continue; // loop on Failed CAS } if (h == head) // loop if head changed break; } }
可以看到,在doReleaseShared()方法中,主要的工作有:
- 先将头节点从SIGNAL状态更新为0,并且唤醒头结点的后继节点;
- 将头节点的状态从0更新为PROPAGATE;
如果头结点在更新状态的时候没有发生改变,则退出循环;
/*
唤醒Node的后继节点
*/
private void unparkSuccessor(Node node) {
/*- If status is negative (i.e.,possibly needing signal) try
- to clear in anticipation of signalling. It is OK if this
- fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node,0);
/*
- Thread to unpark is held in successor,which is normally
- just the next node. But if cancelled or apparently null,
- traverse backwards from tail to find the actual
- non-cancelled successor.
*/
Node s = node.next;//Node的后继节点
if (s == null || s.waitStatus > 0) {// 后继节点为空,或者后继节点的等待状态大于0,则尝试从尾部节点开始到Node节点为止,寻找最靠近Node节点的等待状态小于等于0的节点
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 存在后继节点,其等待状态小于等于0
if (s != null)
LockSupport.unpark(s.thread);//唤醒该节点关联的线程
}
在unparkSuccessor()方法中,主要是唤醒Node的后继节点中等待状态小于等于0的节点。
@H_301_5@2.5.4 acquireSharedInterruptibly()方法
/* * 以非独占模式获取,如果发生了中断,则停止获取 * 在获取之前,首先检查线程是否被中断过,然后至少尝试一次tryAcquireShared()方法 * 如果调用tryAcquireShared()失败了,则将该线程放入等待队列中,并且会不断的尝试tryAcquireShared()方法,直到返回成功 */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted())//线程被中断了 throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
在tryAcquireShared()返回失败后,会调用doAcquireSharedInterruptibly()方法重复尝试获取。
/* * 以非独占可中断模式获取 */ private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean Failed = true; try { for (;;) { final Node p = node.predecessor();//获取前继节点 if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node,r); p.next = null; // help GC Failed = false; return; } } // 在获取失败之后,判断是否需要挂起该线程,如果需要挂起,则通过LockSupport.lock()方法挂起该线程,等线程被唤醒后判断是否被中断过 if (shouldParkAfterFailedAcquire(p,node) && parkAndCheckInterrupt()) throw new InterruptedException();//抛出中断异常 } } finally { if (Failed) cancelAcquire(node); } }
可以看到doAcquireSharedInterruptibly()方法与doAcquireShared()方法实现类似,唯一的不同之处是,在线程等待的过程中,如果被中断了,则会抛出中断异常。
@H_301_5@2.5.5 tryAcquireNanos()
/* * 尝试以独占模式获取,如果发生了中断则停止,如果超时了,则返回失败 * 在获取之前,首先检查线程是否被中断过,然后至少尝试一次tryAcquire()方法 * 如果调用tryAcquire()失败了,则将该线程放入等待队列中,并且会不断的尝试tryAcquire()方法,直到返回成功,或者被中断,或者超时了。 * 该方法可以被用来实现Lock.tryLock(long,TimeUnit)方法 */ public final boolean tryAcquireNanos(int arg,long nanosTimeout) throws InterruptedException { if (Thread.interrupted())//检查线程是否被中断了 throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg,nanosTimeout); }
当tryAcquire()方法返回失败时,会去调用doAcquireNanos()方法,重复尝试获取。
/* * 以独占超时模式获取 */ private boolean doAcquireNanos(int arg,long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L)// 如果超时时间小于0,则直接返回 return false; final long deadline = System.nanoTime() + nanosTimeout;//计算截止时间 final Node node = addWaiter(Node.EXCLUSIVE);//为当前线程创建一个独占模式的Node节点,并把Node放入到等待队列中 boolean Failed = true; try { for (;;) { final Node p = node.predecessor();//前继节点 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC Failed = false; return true; } nanosTimeout = deadline - System.nanoTime();//计算剩余时间 if (nanosTimeout <= 0L)//超时了,直接返回 return false; if (shouldParkAfterFailedAcquire(p,node) && nanosTimeout > spinForTimeoutThreshold)//在获取失败后,如果需要让线程挂起,则通过LockSupport的parkNanos()方法,让线程挂起指定的时间 LockSupport.parkNanos(this,nanosTimeout); if (Thread.interrupted())//如果线程被中断了,则抛出异常 throw new InterruptedException(); } } finally { if (Failed) cancelAcquire(node); } }
doAcquireNanos()方法与acquireQueued()方法实现很类似,不同之处在于,doAcquireNanos()加了一个超时判断,如果超时了,则直接返回。另外,doAcquireNanos()使用带超时时间的LockSupport.parkNanos()方法来暂停线程。
@H_301_5@2.5.6 tryAcquireSharedNanos()方法
/* * 尝试以非独占模式获取,如果发生了中断,则停止,如果超时了,则返回失败 * 在获取之前,首先检查线程是否被中断过,然后至少尝试一次tryAcquireShared()方法 * 如果调用tryAcquireShared()失败了,则将该线程放入等待队列中,并且会不断的尝试tryAcquireShared()方法,直到返回成功,或者被中断,或者超时了。 */ public final boolean tryAcquireSharedNanos(int arg,long nanosTimeout) throws InterruptedException { if (Thread.interrupted())//线程被中断了 throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg,nanosTimeout); }
当tryAcquireShared()方法返回失败时,会去调用doAcquireSharedNanos()不断重复尝试获取。
/* * 以非独占超时模式获取 */ private boolean doAcquireSharedNanos(int arg,long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L)// 如果超时时间小于0,则直接返回 return false; final long deadline = System.nanoTime() + nanosTimeout;//计算截止时间 final Node node = addWaiter(Node.SHARED);//为当前线程创建一个非独占模式的Node节点,并把Node放入到等待队列中 boolean Failed = true; try { for (;;) { final Node p = node.predecessor();//前继节点 if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node,r); p.next = null; // help GC Failed = false; return true; } } nanosTimeout = deadline - System.nanoTime();//计算剩余时间 if (nanosTimeout <= 0L)//超时了,直接返回 return false; if (shouldParkAfterFailedAcquire(p,node) && nanosTimeout > spinForTimeoutThreshold) //在获取失败后,如果需要让线程挂起,则通过LockSupport的parkNanos()方法,让线程挂起指定的时间 LockSupport.parkNanos(this,nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (Failed) cancelAcquire(node); } }
doAcquireSharedNanos()方法与doAcquireNanos()方法实现类似,唯一的区别是doAcquireSharedNanos()方法中是以非独占模式去获取状态,调用的是tryAcquireShared()方法去获取状态。
至此,已经介绍了所有公开的与acquire相关的final方法。下面看看所有公开的与release相关的final方法。
2.6 AQS类里面定义的release方法
/* * 以独占模式释放 * 如果tryRelease()方法返回true,则至少有一个线程非阻塞 * 该方法可以用来实现Lock.unlock()方法 */ public final boolean release(int arg) { if (tryRelease(arg)) {//tryRelease()返回true,则返回true Node h = head;//头结点 if (h != null && h.waitStatus != 0) unparkSuccessor(h);//唤醒头结点的后继节点 return true; } return false; }
可以看到,在release()方法中,首先会调用tryRelease()方法尝试释放,如果释放成功,则返回true;如果释放失败,则直接返回false。在tryRelease()方法返回成功后,还会根据头结点的等待状态来判断是否需要唤醒头结点的后继节点。
@H_301_5@2.6.2 releaseShared()方法
/* * 以非独占模式释放 * 如果tryReleaseShared()方法返回true,则至少有一个线程非阻塞 */ public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) {//tryReleaseShared()返回true,则返回true doReleaseShared(); return true; } return false; }
doReleaseShared()方法在5.3中的acquireShared()方法中有介绍到,其主要是先将头节点从SIGNAL状态更新为0,并且唤醒头结点的后继节点,然后将头节点的状态从0更新为PROPAGATE。
3.AQS的使用
我们通常不是直接继承AQS类,而是将相应的功能委托为私有的AQS子类来实现。下面是AQS类源码中介绍的两个使用范例:
- @H_301_5@使用范例1
下面是一个不可重入互斥锁Mutex,使用0代表非锁定状态,使用1代表锁定状态。虽然不可重入锁不需要严格记录持有锁的当前线程,但是在Mutex类中,实现了记录当前持有锁的线程,这样更容易监控。另外,Mutex类也支持Condition条件,并且暴露了一些方法给外部使用。
public class Mutex implements Lock,Serializable { private static class Sync extends AbstractQueuedSynchronizer{ // Reports whether in locked state @Override protected boolean isHeldExclusively() { return getState() == 1; } // Acquires the lock if state is zero @Override protected boolean tryAcquire(int acquires) { assert acquires == 1; if (compareAndSetState(0,1)){ setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // Releases the lock by setting state to zero @Override protected boolean tryRelease(int releases) { assert releases == 1; if (getState() == 0){ throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null); setState(0); return true; } // Provides a Condition Condition newCondition(){ return new ConditionObject(); } // Deserializes properly private void readObject(ObjectInputStream s) throws IOException,ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } } // The sync object does all the hard work. We just forward to it. private final Sync sync = new Sync(); @Override public void lock() { sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquire(1); } @Override public boolean tryLock(long time,TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1,unit.toNanos(time)); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); } public boolean isLocked(){ return sync.isHeldExclusively(); } public boolean hasQueuedThreads(){ return sync.hasQueuedThreads(); } }
- @H_301_5@使用范例2
下面是一个实现类似闭锁CountDownLatch功能的类,它是以非独占模式获取和释放。
public class BooleanLatch { private static class Sync extends AbstractQueuedSynchronizer{ boolean isSignalled(){ return getState() != 0; } @Override protected int tryAcquireShared(int ignore) { return isSignalled() ? 1 : -1; } @Override protected boolean tryReleaseShared(int ignore) { setState(1); return true; } } private final Sync sync = new Sync(); public boolean isSignalled(){ return sync.isSignalled(); } public void signal(){ sync.releaseShared(1); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } }原文链接:https://www.f2er.com/javaschema/282405.html