自定义独占锁
package com.lock;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.AbstractQueuedSynchronizer;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;/** * Created by cxx on 2018/1/16. * * 独占锁示例 */public class Mutex implements Lock { //静态内部类,自定义同步器 private static class Sync extends AbstractQueuedSynchronizer{ //是否处于占用状态 protected boolean isHeldExclusively(){ return getState() == 1; } //当状态为0的时候,获取锁 public boolean tryAcquire(int acquires){ if (compareAndSetState(0,1)){ setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } //释放锁,将状态设置为0 public boolean tryRelease(int releases){ if (getState() == 0) { throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null); setState(0); return true; } //返回一个condition,每个condition包含了一个condition队列 Condition newCondition(){ return new ConditionObject(); } } /*** * 将操作代理到Sync上即可 */ private final Sync sync = new Sync(); @Override public void lock() { sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquire(1); } public boolean isLocked(){ return sync.isHeldExclusively(); } public boolean hasQueuedThreads(){ return sync.hasQueuedThreads(); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1,unit.toNanos(time)); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); }}
如代码所示,独占锁实现了在同一时刻只能用一个线程获取到锁,而其他获取锁的线程只能处于同步等待队列中等待,只有获取锁的线程释放了锁,后继的线程才能够获取锁。
#锁、同步器、使用者
- 锁是面向使用者的,定义了使用者与锁交互的接口,隐藏了实现细节。
- 同步器面向的是锁的实现着,简化了锁的实现方式,屏蔽了同步状态管理、线程的队列、等待与唤醒等底层操作。
- 锁和同步器很好的隔离了使用者和实现着所需要关注的领域。
如上的mutex锁一样,具体的实现代理到sync,mutex只需要向用户定义交互方式即可。
AQS的方法
公有方法:
独占锁的获取与释放(包括了对同步队列的操作)
-
acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法;
-
release(int arg):独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒;
共享锁的获取与释放(包括了对同步队列的操作)
- acquireShared(int arg):共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;
- releaseShared(int arg):共享式释放同步状态;
需要由子类实现的保护方法
独占锁的获取与释放的具体实现(没有对同步队列的操作,功能单一)
- tryAcquire(int arg):独占式获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态才能获取同步状态;
- tryRelease(int arg):独占式释放同步状态;
共享锁的获取与释放(没有对同步队列的操作)
- tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于0则表示获取成功,否则获取失败;
- tryReleaseShared(int arg):共享式释放同步状态;
操作队列同步器的状态
- getState():返回同步状态的当前值;
- setState(int newState):设置当前同步状态;
- compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性;
其他的方法
- isHeldExclusively():当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占;
- acquireInterruptibly(int arg):与acquire(int arg)相同,但是该方法响应中断,当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException异常并返回;
- tryAcquireNanos(int arg,long nanos):超时获取同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true;
- acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断;
- tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制;
AQS的全部方法如图所示
AQS小结
-
在基于AQS构建的同步器中,只能在一个时刻发生阻塞,从而降低上下文切换的开销,提高了吞吐量。同时在设计AQS时充分考虑了可伸缩行,因此J.U.C中所有基于AQS构建的同步器均可以获得这个优势。
-
AQS的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态。
-
AQS使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。它提供了三个方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))来对同步状态state进行操作,当然AQS可以确保对state的操作是安全的。
-
AQS通过内置的FIFO同步队列来完成资源获取线程的排队工作,如果当前线程获取同步状态失败时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程
-
当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。
CLH 同步队列的实现
在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),其定义如下:
static final class Node { /** 共享 */ static final Node SHARED = new Node(); /** 独占 */ static final Node EXCLUSIVE = null; /** * 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态; */ static final int CANCELLED = 1; /** * 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行 */ static final int SIGNAL = -1; /** * 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,改节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中 */ static final int CONDITION = -2; /** * 表示下一次共享式同步状态获取将会无条件地传播下去 */ static final int PROPAGATE = -3; /** 等待状态 */ volatile int waitStatus; /** 前驱节点 */ volatile Node prev; /** 后继节点 */ volatile Node next; /** 获取同步状态的线程 */ volatile Thread thread; Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { } Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; }}
状态(waitStatus)
- CANCELLED :值为1,由于在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待,节点进入该状态将不会发生变化。
- SIGNAL:值为 -1,后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行。
- CONDITION: 值为-1,节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中。
- Propagate:值为 -3,表示下一次共享式同步状态获取将会无条件地被传播下去。
- Initial:值为0,初始状态。
入队
- compareAndSetTail(Node expect,Node update)方法来确保节点能够被线程安全添加。
- enq(final Node node),同步器通过 “死循环”来保证节点的正确添加,在“死循环”中只有通过CAS将节点设置成为尾节点之后,当前线程才能够从该方法返回,否则,当前线程不断地尝试设置。
- acquireQueued ,进入一个自旋的过程,每个节点都在自省地观察,当条件满足,获取到同步状态,就可以从这个自旋过程中退出。
第一,头结点是成功获取到同步状态的节点,而头结点的线程释放了同步状态之后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头结点。
第二,维护同步队列的FIFO原则。
AQS:阻塞和唤醒线程
在线程获取同步状态时如果获取失败,则加入CLH同步队列,通过通过自旋的方式不断获取同步状态,但是在自旋的过程中则需要判断当前线程是否需要阻塞,其主要方法在acquireQueued():
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true;
通过这段代码我们可以看到,在获取同步状态失败后,线程并不是立马进行阻塞,需要检查该线程的状态,检查状态的方法为 shouldParkAfterFailedAcquire(Node pred, Node node) 方法,该方法主要靠前驱节点判断当前线程是否应该被阻塞,代码如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //前驱节点 int ws = pred.waitStatus; //状态为signal,表示当前线程处于等待状态,直接放回true if (ws == Node.SIGNAL) return true; //前驱节点状态 > 0 ,则为Cancelled,表明该节点已经超时或者被中断了,需要从同步队列中取消 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } //前驱节点状态为Condition、propagate else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
这段代码主要检查当前线程是否需要被阻塞,具体规则如下:
-
如果当前线程的前驱节点状态为SINNAL,则表明当前线程需要被阻塞,调用unpark()方法唤醒,直接返回true,当前线程阻塞
-
如果当前线程的前驱节点状态为CANCELLED(ws > 0),则表明该线程的前驱节点已经等待超时或者被中断了,则需要从CLH队列中将该前驱节点删除掉,直到回溯到前驱节点状态 <= 0 ,返回false
-
如果前驱节点非SINNAL,非CANCELLED,则通过CAS的方式将其前驱节点设置为SINNAL,返回false
如果 shouldParkAfterFailedAcquire(Node pred, Node node) 方法返回true,则调用parkAndCheckInterrupt()方法阻塞当前线程:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
parkAndCheckInterrupt() 方法主要是把当前线程挂起,从而阻塞住线程的调用栈,同时返回当前线程的中断状态。其内部则是调用LockSupport工具类的park()方法来阻塞该方法。
当线程释放同步状态后,则需要唤醒该线程的后继节点:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //唤醒后继节点 unparkSuccessor(h); return true; } return false; }
调用unparkSuccessor(Node node)唤醒后继节点:
private void unparkSuccessor(Node node) { //当前节点状态 int ws = node.waitStatus; //当前状态 < 0 则设置为 0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //当前节点的后继节点 Node s = node.next; //后继节点为null或者其状态 > 0 (超时或者被中断了) if (s == null || s.waitStatus > 0) { s = null; //从tail节点来找可用节点 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //唤醒后继节点 if (s != null) LockSupport.unpark(s.thread); }
可能会存在当前线程的后继节点为null,超时、被中断的情况,如果遇到这种情况了,则需要跳过该节点,但是为何是从tail尾节点开始,而不是从node.next开始呢?原因在于node.next仍然可能会存在null或者取消了,所以采用tail回溯办法找第一个可用的线程。最后调用LockSupport的unpark(Thread thread)方法唤醒该线程。