博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
简要理解锁、同步器之间的关系
阅读量:6454 次
发布时间:2019-06-23

本文共 8498 字,大约阅读时间需要 28 分钟。

hot3.png

自定义独占锁

package com.lock;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.AbstractQueuedSynchronizer;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;/** * Created by cxx on 2018/1/16. * * 独占锁示例 */public class Mutex implements Lock {    //静态内部类,自定义同步器    private static class Sync extends AbstractQueuedSynchronizer{        //是否处于占用状态        protected boolean isHeldExclusively(){            return getState() == 1;        }        //当状态为0的时候,获取锁        public boolean tryAcquire(int acquires){            if (compareAndSetState(0,1)){                setExclusiveOwnerThread(Thread.currentThread());                return true;            }            return false;        }        //释放锁,将状态设置为0        public boolean tryRelease(int releases){            if (getState() == 0) {                throw new IllegalMonitorStateException();            }            setExclusiveOwnerThread(null);            setState(0);            return true;        }        //返回一个condition,每个condition包含了一个condition队列        Condition newCondition(){            return new ConditionObject();        }    }    /***     * 将操作代理到Sync上即可     */    private final Sync sync = new Sync();    @Override    public void lock() {        sync.acquire(1);    }    @Override    public void lockInterruptibly() throws InterruptedException {        sync.acquireInterruptibly(1);    }    @Override    public boolean tryLock() {        return sync.tryAcquire(1);    }    public boolean isLocked(){        return sync.isHeldExclusively();    }    public boolean hasQueuedThreads(){        return sync.hasQueuedThreads();    }    @Override    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {        return sync.tryAcquireNanos(1,unit.toNanos(time));    }    @Override    public void unlock() {        sync.release(1);    }    @Override    public Condition newCondition() {        return sync.newCondition();    }}

如代码所示,独占锁实现了在同一时刻只能用一个线程获取到锁,而其他获取锁的线程只能处于同步等待队列中等待,只有获取锁的线程释放了锁,后继的线程才能够获取锁。

#锁、同步器、使用者

  • 锁是面向使用者的,定义了使用者与锁交互的接口,隐藏了实现细节。
  • 同步器面向的是锁的实现着,简化了锁的实现方式,屏蔽了同步状态管理、线程的队列、等待与唤醒等底层操作。
  • 锁和同步器很好的隔离了使用者和实现着所需要关注的领域。

如上的mutex锁一样,具体的实现代理到sync,mutex只需要向用户定义交互方式即可。

AQS的方法

公有方法:

独占锁的获取与释放(包括了对同步队列的操作)

  • acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法;

  • release(int arg):独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒;

共享锁的获取与释放(包括了对同步队列的操作)

  • acquireShared(int arg):共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;
  • releaseShared(int arg):共享式释放同步状态;

需要由子类实现的保护方法

独占锁的获取与释放的具体实现(没有对同步队列的操作,功能单一)

  • tryAcquire(int arg):独占式获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态才能获取同步状态;
  • tryRelease(int arg):独占式释放同步状态;

共享锁的获取与释放(没有对同步队列的操作)

  • tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于0则表示获取成功,否则获取失败;
  • tryReleaseShared(int arg):共享式释放同步状态;

操作队列同步器的状态

  • getState():返回同步状态的当前值;
  • setState(int newState):设置当前同步状态;
  • compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性;

其他的方法

  • isHeldExclusively():当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占;
  • acquireInterruptibly(int arg):与acquire(int arg)相同,但是该方法响应中断,当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException异常并返回;
  • tryAcquireNanos(int arg,long nanos):超时获取同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true;
  • acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断;
  • tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制;

AQS的全部方法如图所示

输入图片说明

输入图片说明

输入图片说明

AQS小结

  • 在基于AQS构建的同步器中,只能在一个时刻发生阻塞,从而降低上下文切换的开销,提高了吞吐量。同时在设计AQS时充分考虑了可伸缩行,因此J.U.C中所有基于AQS构建的同步器均可以获得这个优势。

  • AQS的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态。

  • AQS使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。它提供了三个方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))来对同步状态state进行操作,当然AQS可以确保对state的操作是安全的。

  • AQS通过内置的FIFO同步队列来完成资源获取线程的排队工作,如果当前线程获取同步状态失败时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程

  • 当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。

CLH 同步队列的实现

在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),其定义如下:

static final class Node {    /** 共享 */    static final Node SHARED = new Node();    /** 独占 */    static final Node EXCLUSIVE = null;    /**     * 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态;     */    static final int CANCELLED =  1;    /**     * 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行     */    static final int SIGNAL    = -1;    /**     * 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,改节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中     */    static final int CONDITION = -2;    /**     * 表示下一次共享式同步状态获取将会无条件地传播下去     */    static final int PROPAGATE = -3;    /** 等待状态 */    volatile int waitStatus;    /** 前驱节点 */    volatile Node prev;    /** 后继节点 */    volatile Node next;    /** 获取同步状态的线程 */    volatile Thread thread;    Node nextWaiter;    final boolean isShared() {        return nextWaiter == SHARED;    }    final Node predecessor() throws NullPointerException {        Node p = prev;        if (p == null)            throw new NullPointerException();        else            return p;    }    Node() {    }    Node(Thread thread, Node mode) {        this.nextWaiter = mode;        this.thread = thread;    }    Node(Thread thread, int waitStatus) {        this.waitStatus = waitStatus;        this.thread = thread;    }}

状态(waitStatus)

  • CANCELLED :值为1,由于在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待,节点进入该状态将不会发生变化。
  • SIGNAL:值为 -1,后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行。
  • CONDITION: 值为-1,节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中。
  • Propagate:值为 -3,表示下一次共享式同步状态获取将会无条件地被传播下去。
  • Initial:值为0,初始状态。

入队

输入图片说明

  • compareAndSetTail(Node expect,Node update)方法来确保节点能够被线程安全添加。
  • enq(final Node node),同步器通过 “死循环”来保证节点的正确添加,在“死循环”中只有通过CAS将节点设置成为尾节点之后,当前线程才能够从该方法返回,否则,当前线程不断地尝试设置。
  • acquireQueued ,进入一个自旋的过程,每个节点都在自省地观察,当条件满足,获取到同步状态,就可以从这个自旋过程中退出。

第一,头结点是成功获取到同步状态的节点,而头结点的线程释放了同步状态之后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头结点。

第二,维护同步队列的FIFO原则。

AQS:阻塞和唤醒线程

在线程获取同步状态时如果获取失败,则加入CLH同步队列,通过通过自旋的方式不断获取同步状态,但是在自旋的过程中则需要判断当前线程是否需要阻塞,其主要方法在acquireQueued():

if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    interrupted = true;

通过这段代码我们可以看到,在获取同步状态失败后,线程并不是立马进行阻塞,需要检查该线程的状态,检查状态的方法为 shouldParkAfterFailedAcquire(Node pred, Node node) 方法,该方法主要靠前驱节点判断当前线程是否应该被阻塞,代码如下:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {        //前驱节点        int ws = pred.waitStatus;        //状态为signal,表示当前线程处于等待状态,直接放回true        if (ws == Node.SIGNAL)            return true;        //前驱节点状态 > 0 ,则为Cancelled,表明该节点已经超时或者被中断了,需要从同步队列中取消        if (ws > 0) {            do {                node.prev = pred = pred.prev;            } while (pred.waitStatus > 0);            pred.next = node;        }         //前驱节点状态为Condition、propagate        else {            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);        }        return false;    }

这段代码主要检查当前线程是否需要被阻塞,具体规则如下:

  • 如果当前线程的前驱节点状态为SINNAL,则表明当前线程需要被阻塞,调用unpark()方法唤醒,直接返回true,当前线程阻塞

  • 如果当前线程的前驱节点状态为CANCELLED(ws > 0),则表明该线程的前驱节点已经等待超时或者被中断了,则需要从CLH队列中将该前驱节点删除掉,直到回溯到前驱节点状态 <= 0 ,返回false

  • 如果前驱节点非SINNAL,非CANCELLED,则通过CAS的方式将其前驱节点设置为SINNAL,返回false

如果 shouldParkAfterFailedAcquire(Node pred, Node node) 方法返回true,则调用parkAndCheckInterrupt()方法阻塞当前线程:

private final boolean parkAndCheckInterrupt() {        LockSupport.park(this);        return Thread.interrupted();    }

parkAndCheckInterrupt() 方法主要是把当前线程挂起,从而阻塞住线程的调用栈,同时返回当前线程的中断状态。其内部则是调用LockSupport工具类的park()方法来阻塞该方法。

当线程释放同步状态后,则需要唤醒该线程的后继节点:

public final boolean release(int arg) {        if (tryRelease(arg)) {            Node h = head;            if (h != null && h.waitStatus != 0)				//唤醒后继节点                unparkSuccessor(h);            return true;        }        return false;    }

调用unparkSuccessor(Node node)唤醒后继节点:

private void unparkSuccessor(Node node) {        //当前节点状态        int ws = node.waitStatus;        //当前状态 < 0 则设置为 0        if (ws < 0)            compareAndSetWaitStatus(node, ws, 0);        //当前节点的后继节点        Node s = node.next;        //后继节点为null或者其状态 > 0 (超时或者被中断了)        if (s == null || s.waitStatus > 0) {            s = null;            //从tail节点来找可用节点            for (Node t = tail; t != null && t != node; t = t.prev)                if (t.waitStatus <= 0)                    s = t;        }        //唤醒后继节点        if (s != null)            LockSupport.unpark(s.thread);    }

可能会存在当前线程的后继节点为null,超时、被中断的情况,如果遇到这种情况了,则需要跳过该节点,但是为何是从tail尾节点开始,而不是从node.next开始呢?原因在于node.next仍然可能会存在null或者取消了,所以采用tail回溯办法找第一个可用的线程。最后调用LockSupport的unpark(Thread thread)方法唤醒该线程。

链接

转载于:https://my.oschina.net/u/3421984/blog/1802914

你可能感兴趣的文章
OAuth2授权原理
查看>>
微软的Framework导致该内存不能为written或read的错误?
查看>>
C#强化系列文章九:代码访问安全性使用
查看>>
顺序表的算法
查看>>
用 PyMedia 解码并播放 mp3 文件
查看>>
走在网页游戏开发的路上(一)
查看>>
优秀的Web开发人员是这样炼成的 (share)
查看>>
信息收集工具(ReconDog)
查看>>
LINUX-内核-中断分析-中断向量表(3)-arm【转】
查看>>
unity3d 学习笔记(两)
查看>>
如何安装配置Intelligent landing page for AIMS/MapGuide Ajax viewer
查看>>
DOS下从硬盘安装XP系统方法与要点
查看>>
MapGuide应用开发系列
查看>>
使用 Python 开始你的机器学习之旅【转】
查看>>
IIS Enabling HTTP Keep-Alives
查看>>
备忘录模式(Memento)
查看>>
怎样用Javascript停止或者启动AJAX Timer
查看>>
第19届Jolt大奖揭晓(转载)
查看>>
【原】iOS容易造成循环引用的三种场景,就在你我身边!
查看>>
【SQL】关于无法附加文件的错误
查看>>