Java并发编程系列:深入分析AQS原理_aqs 队列获得锁之后为什么执行thread.currentthread().interrupt()-程序员宅基地

技术标签: Java杂货铺  


AQS又称为队列同步器,它是用来构建锁或其他同步组件的基础框架,它是实现ReentrangLock、Semaphore等同步工具的基础。本文将会详细的阐述AQS实现的细节问题。

数据结构定义

AQS内部通过int类型的state控制锁的状态,当state=0时,则说明没有任何线程占有共享资源的锁,当state>0时,则说明有线程目前正在使用共享变量,其他线程必须加入同步队列进行等待。同步队列为FIFO的双向队列,竞争失败的线程会被添加至队尾。

// 同步队列的头部
private transient volatile Node head;
// 同步队列的尾部
private transient volatile Node tail;
// 同步状态
private volatile int state;

Node节点的定义:

//标识线程的状态
volatile int waitStatus;
//等待队列的前驱节点
volatile Node prev;
//等待队列的后继节点
volatile Node next;
//当前节点的线程
volatile Thread thread;
//条件队列的等待节点
Node nextWaiter;
//判断当前节点是否是共享节点
final boolean isShared() {
    
 return nextWaiter == SHARED;
}
  • 1 CANCELLED:该节点的线程可能由于超时或被中断而处于被取消(作废)状态,一旦处于这个状态,节点状态将一直 处于CANCELLED,因此应该从队列中移除
  • -1 SIGNAL:表示该节点处于等待唤醒状态,后继节点会被挂起,因此在当前节点释放锁或被取消之后必须唤醒其后继结点
  • -2 CONDITION:该节点的线程处于等待条件状态,不会被当作是同步队列上的节点,直到被唤醒(signal),设置其值为0,重新进入阻塞状态
  • 0:新加入的节点

在锁的获取时,并不一定只有一个线程才能持有这个锁,所以此时有了独占模式和共享模式的区别,通过nextWaiter来区分。
还有一个点是公平锁和非公平锁,它是由子类来实现的。在ReentrantLock中有FairSync和NonFairSync来实现。

下面以ReentrantLock为例,解释锁的获取和释放流程。

获取锁

获取锁的流程为Lock.lock -> Sync.lock -> AQS.acquire -> Sync.tryAcquire -> AQS.addWaiter ->AQS.acquireQueued,我们按照这个流程逐步分析。TODO 流程图

# Lock.lock -> Sync.lock

获取锁可以通过ReentrantLock中的lock、lockInterruptibly、tryLock,此三个方法的意义在ReentrantLock的文章中已经详细阐述。

public void lock() {
    
    sync.lock();
}
public void lockInterruptibly() throws InterruptedException {
    
    sync.acquireInterruptibly(1);
}
public boolean tryLock() {
    
    return sync.nonfairTryAcquire(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

可以看出内部都是通过sync来实现,抽象类Sync继承了AQS,并且Sync的实现类为FairSync和NonFairSync。因此调用根据构造方法实例化出的FairSync或着NonFairSync的lock方法:

// Fair
final void lock() {
    
    acquire(1);
}
// NonFair
final void lock() {
    
    // 以cas方式尝试将AQS中的state从0更新为1
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

公平锁和非公平锁的lock方法这里就有区别,非公共锁先通过CAS操作去竞争锁,然后再去执行AQS实现的acquire方法。
exclusiveOwnerThread属性是AQS从父类AbstractOwnableSynchronizer中继承的属性,用来保存当前占用锁的线程。

# AQS.acquire -> Sync.tryAcquire

继续跟进AQS

public final void acquire(int arg) {
    
    if (tryAcquire(arg){
    
        return;
    }
    if(acquireQueued(addWaiter(Node.EXCLUSIVE), arg))) {
    
        selfInterrupt();
    }
}

首先执行tryAcquire方法,由具体的子类实现,不同的子类有不同的实现方式,如果失败,表示该线程获取锁失败,就调用addWaiter方法,将当前线程加入到等待队列中,然后返回当前线程的node节点。将node节点传递给acquireQueued方法,如果node节点的前驱节点是头结点,就再次尝试获取到锁,如果获取锁成功(成功返回的是false不会执行selfInterrupt方法),就将该节点设置为头结点,如果获取失败,就将当前节点的线程挂起。

下面看非公平锁的tryAcquire实现:

// NonFair
protected final boolean tryAcquire(int acquires) {
    
    return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
    
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
    
        // state为0,说明当前锁未被任何线程持有
        if (compareAndSetState(0, acquires)) {
    // CAS设置state,如果成功,则设置锁的拥有者为当前线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果是重入得情况
    else if (current == getExclusiveOwnerThread()) {
    
        int nextc = c + acquires;
        setState(nextc);// 更改进入的次数
        return true;
    }
    return false;
}

这里公平锁和非公平锁的实现几乎相同,只是多了一个!hasQueuedPredecessors()判断条件,意思是当前同步队列中如果没有正在排队的线程,才会进行后续的步骤。

# addWaiter

如果执行到addWaiter,则说明前面的tryAcquire没有抢到锁,那么会将将节点加入到等待队列。这里需要注意前面提到独享锁和共享锁,
ReentrantLock属于独享锁,并且AQS通过Node节点也就是线程的封装来表示独享/共享,因此这里传入的Mode的参数为Node.EXCLUSIVE。

private Node addWaiter(Node mode) {
    
    // 将当前线程构造为等待节点
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
    // 如果尾节点部为空
        node.prev = pred;// 将当前节点添加至队列尾部
        if (compareAndSetTail(pred, node)) {
    
            pred.next = node;
            return node;
        }
    }
    // 如果尾节点为空,或者CAS操作失败,则通过死循环更新尾节点
    enq(node);
    return node;
}

enq方法没什么好说的,死循环+CAS,返回node的前驱节点

private Node enq(final Node node) {
    
    for (;;) {
    
        Node t = tail;
        if (t == null) {
     // 如果尾节点为空,那么初始化尾和头,头节点是一个空节点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
     // 
            node.prev = t;
            if (compareAndSetTail(t, node)) {
    
                t.next = node;
                return t;
            }
        }
    }
}
# acquireQueued

在把node插入队列末尾后,它并不立即挂起该节点中线程,因为在插入它的过程中,前面的线程可能已经执行完成,
所以它会先进行自旋操作,尝试让该线程重新获取锁。代码如下:

final boolean acquireQueued(final Node node, int arg) {
    
    boolean failed = true;
    try {
    
        boolean interrupted = false;
        for (;;) {
    
            // 得到前驱节点
            final Node p = node.predecessor();
            // 如果前驱节点是head节点并且tryAcquire获取到锁
            if (p == head && tryAcquire(arg)) {
    
                setHead(node); // 设置Head为节点当前节点
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 如果当前节点前驱节点不是head或者CAS设置失败,去挂起线程
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() )
                interrupted = true;
        }
    } finally {
    
    	// 正常情况下failed = false,cancelAcquire的作用是删除节点,
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire()方法的作用是判断当前结点的前驱结点是否为SIGNAL状态,如果是则返回true。
如果为CANCELLED状态(值为1>0),即结束状态,则说明该前驱结点已没有用应该从同步队列移除,直到寻找到非CANCELLED状态的结点。倘若前驱结点的ws值不为CANCELLED,也不为SIGNAL(当从Condition的条件等待队列转移到同步队列时,结点状态为CONDITION因此需要转换为SIGNAL),那么将其转换为SIGNAL状态,以便在下轮循环中将其挂起。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    
    int ws = pred.waitStatus;// 前驱节点的状态
    if (ws == Node.SIGNAL)// 如果是等待唤醒状态,返回true,
        return true;
    if (ws > 0) {
     // >0 则为CNACLE状态,被取消了,需要将前驱节点移除
        do {
    
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
    // 
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    
        //将当前线程挂起
        LockSupport.park(this);
        //获取线程中断状态,interrupted()是判断当前中断状态,
        return Thread.interrupted();
}

parkAndCheckInterrupt()方法挂起当前线程,需要等待一个unpark()操作来唤醒它,调用interrupte方法可以中断,稳定后列表的状态为

  • 除了头节点,剩余节点都被阻塞,线程处于WAITING状态。
  • 除了尾节点,剩余节点都满足waitStatus==SIGNAL,表示释放后需要唤醒后继节点。

到此ReetrantLock内部间接通过AQS的FIFO的同步队列就完成了lock()操作。一张图总结lock的流程:
lock

释放锁

下面继续看unLock的流程:

public final boolean release(int arg) {
    
   if (tryRelease(arg)) {
    
       Node h = head;
       if (h != null && h.waitStatus != 0)
           unparkSuccessor(h);
       return true;
   }
   return false;
}

释放锁其实就两个步骤,1.释放锁,2.如果完全释放则唤醒等待的线程。先看释放锁:

protected final boolean tryRelease(int releases) {
    
    int c = getState() - releases;// 
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
    // 完全释放
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

完全释放表示ownerThread的所有重入操作均已结束,接着是唤醒后面的线程,注意这里并没有将head置为null,只是将ExclusiveOwnerThread和state初始化。

private void unparkSuccessor(Node node) {
           
    int ws = node.waitStatus;
    if (ws < 0) // 正常情况下为-1
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
    // 如果下一个节点为空或者被取消,继续从尾节点开始找离头结点最近的
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)// 状态<0 的节点
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);// 唤醒下一个节点
}

当最近可用的节点被唤醒后,会进入acquireQueued()函数的if (p == head && tryAcquire(arg))的判断,继续开始自旋。

Condition实现原理

在 https://blog.csdn.net/TheLudlows/article/details/76962006 中介绍了Condition的用法,类似于Object的wait和notify。

# await

Condition接口提供了await、signal方法,它的实现为AQS的内部类ConditionObject,在它的内部也有一个队列,称为等待队列,单向列表实现。AQS内部的队列叫做同步队列。同步队列主要用来保存阻塞的线程,而等待队列用来保存调用了await方法的线程。ConditionObject的成员变量如下:

 public class ConditionObject implements Condition, java.io.Serializable {
    
    //等待队列第一个等待结点
    private transient Node firstWaiter;
    //等待队列最后一个等待结点
    private transient Node lastWaiter;
}

等待队列的元素和同步队列中的元素都是Node类型。当一个线程调用了await()相关的方法,那么该线程将会释放锁,并构建一个Node节点封装当前线程的相关信息加入到等待队列中进行等待,直到被唤醒、中断、超时才从队列中移出。

等待队列中结点的状态只有两种即CANCELLED和CONDITION,前者表示线程已结束需要从等待队列中移除,后者表示条件结点等待被唤醒。每个Codition对象对于一个等待队列,也就是说AQS中只能存在一个同步队列,但可拥有多个等待队列。

下面分析await方法的逻辑:

public final void await() throws InterruptedException {
    
    if (Thread.interrupted())
        throw new InterruptedException();
    // 构建为Node节点,并加入队尾
    Node node = addConditionWaiter();
    // 释放当前线程锁即释放同步状态
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
    // 阻塞,直到收到信号或被中断
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 唤醒之后执行,同时判断线程是否被中断
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

添加到等待队列:

private Node addConditionWaiter() {
    
    Node t = lastWaiter;
    // lastWaiter初始化为null
    // 清除被唤醒的node
    if (t != null && t.waitStatus != Node.CONDITION) {
    
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 新建node,状态为 Node.CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)// 初始化队列
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

尽管此处没有任何线程安全的保护,但实际使用时不会出现任何线程安全问题——因为条件队列的使用要求我们在调用await或signal时持有与该条件队列唯一相关的锁。共享锁中没有实现Lock接口,因此没有newCondition方法。

final int fullyRelease(Node node) {
    
    boolean failed = true;
    try {
    
        int savedState = getState();
        if (release(savedState)) {
    // 上节讲过的release,注意参数
            failed = false;
            return savedState;// 返回状态
        } else {
    
            throw new IllegalMonitorStateException();
        }
    } finally {
    
        if (failed)// 正常情况不会进入此分支
            node.waitStatus = Node.CANCELLED;
    }
}
# signal
public final void signal() {
    
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

signal()方法做了两件事,一是判断当前线程是否持有独占锁,没有就抛出异常,从这点也可以看出只有独占模式先采用等待队列,而共享模式下是没有等待队列的,也就没法使用Condition。二是将等待队列的头节点从等待队列中删除,同时将它加入到同步队列,意思是次线程可以去竞争锁了。

private void doSignal(Node first) {
    
   do {
    
   	   firstWaiter = first.nextWaiter;// 移除首节点
       if (firstWaiter == null)
           lastWaiter = null;
       first.nextWaiter = null; // 将旧的首节点next属性置为null
       
   } while (!transferForSignal(first) && (first = firstWaiter) != null);
}     

transferForSignal 将 first节点移出等待队列,通过时修改状态,加入同步队列,根据在同步队列中的前驱节点的状态和来决定是否唤醒等待阻塞的线程。

final boolean transferForSignal(Node node) {
    
	// 设置节点状态为初始状态
   if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    Node p = enq(node);// 加入同步队列,得到前驱节点
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

其实这里不唤醒阻塞的线程也是可以的,因为此线程已经加入到同步队列中,同步队列中等待的线程是通过前驱节点的来唤醒的。但是这里为什么要多次一举?能够进入此分支说明前驱节点是CANCELLED状态,那么说明当前节点距离Head又进了一步,早些将此CANCELLED节点清除,因此将次线程唤醒,去竞争锁,同时删除无效的节点。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/TheLudlows/article/details/88696874

智能推荐

win10和linux双系统安装步骤(详细!)_怎么装双系统win10和linux-程序员宅基地

文章浏览阅读5k次,点赞11次,收藏42次。Windows10安装ubuntu双系统教程ubuntu分区方案_怎么装双系统win10和linux

从图的邻接表表示转换成邻接矩阵表示_typedef struct arcnode{int adjvex;-程序员宅基地

文章浏览阅读1.1k次。从图的邻接表表示转换成邻接矩阵表示typedef struct ArcNode{ int adjvex;//该弧指向的顶点的位置 struct ArcNode *next;//下一条弧的指针 int weight;//弧的权重} ArcNode;typedef struct{ VertexType data;//顶点信息 ArcNode *firstarc;} VNode,AdList[MAXSIZE];typedef struct{ int vexnum;//顶点数 int _typedef struct arcnode{int adjvex;

学好Python开发你一定会用到这30框架种(1)-程序员宅基地

文章浏览阅读635次,点赞18次,收藏26次。14、fabric是基于Python实现的SSH命令行工具,简化了SSH的应用程序部署及系统管理任务,它提供了系统基础的操作组件,可以实现本地或远程shell命令,包括命令执行,文件上传,下载及完整执行日志输出等功能。7、pycurl 是一个用C语言写的libcurl Python实现,功能强大,支持的协议有:FTP,HTTP,HTTPS,TELNET等,可以理解为Linux下curl命令功能的Python封装。Scipy是Python的科学计算库,对Numpy的功能进行了扩充,同时也有部分功能是重合的。

Ubuntu中anaconda图形化界面的使用_ubuntu安装anaconda后怎么运行图形化节目-程序员宅基地

文章浏览阅读4.9k次,点赞3次,收藏25次。看网上教程,跟着配置,然后装完anaconda之后,大家都继续安装pycharm,然后傻吊的以为Ubuntu下的anaconda是没有图形化界面的,只有win下面 装完anaconda之后,可以直接在jupyter下面写代码今天突然发现Ubuntu下anaconda也是有图像化界面的$ conda --version /* 查看版本 */$ conda create --name my_en..._ubuntu安装anaconda后怎么运行图形化节目

深度学习RNN-程序员宅基地

文章浏览阅读771次,点赞22次,收藏11次。只记得我在一个昏暗潮湿的地方喵喵地哭泣着。——夏目漱石《我是猫》到目前为止,我们看到的神经网络都是前馈型神经网络。(feedforward)是指网络的传播方向是单向的。具体地说,先将输入信号传给下一层(隐藏层),接收到信号的层也同样传给下一层,然后再传给下一层……像这样,信号仅在一个方向上传播。虽然前馈网络结构简单、易于理解,而且可以应用于许多任务中。不过,这种网络存在一个大问题,就是不能很好地处理时间序列数据(以下简称为“时序数据”)。

Rsync数据复制——本地数据传输_rsync本地拷贝-程序员宅基地

文章浏览阅读2.5k次。1本地数据传输类似cp的复制,实现文件,目录的增量复制。#语法模式rsync命令 参数 src源文件/目录 dest目标文件/目录1.本地文件复制# 复制hosts文件[root@chaogelinux ~]# rsync /etc/hosts /tmp/2.复制目录内容-r, --recursive 对子目录以递归模式处理# 复制/data下所有内容到/tmp[root@lb01 ~]# rsync -r /data/ /tmp/# 复制/data整个文件夹到/tmp_rsync本地拷贝

随便推点

AHAS arms调用链查询中,接口实际耗时和监听耗时差异在什么地方?_arms调用链路耗时看不懂-程序员宅基地

文章浏览阅读109次。监听耗时仅代表了 AHAS ARMS Listener(即调用链收集器)在收集并处理当前请求的调用信息时所需要的时间。它不包括网络传输、处理请求、执行操作、处理响应等其他阶段的时间,仅代表 Listener 所需的时间。通常这个时间会很短,只有几毫秒甚至更短。接口实际耗时包括了整个请求/响应周期中的所有时间,包括网络传输、处理请求、执行操作、处理响应等阶段消耗的时间。它代表了该请求在客户端发起到最终服务器响应完成所花费的总时间。_arms调用链路耗时看不懂

常见的Web应用的漏洞总结(原理、危害、防御)_web 应用中常见的漏洞及其危害有哪些-程序员宅基地

文章浏览阅读2.5k次。一、 SQL注入1.原理:SQL注入就是把SQL命令插入到Web表单然后提交到所在页面请求(查询字符串),从而达到欺骗服务器执行恶意的SQL命令。它是利用现在已有的应用程序,将SQL语句插入到数据库中执行,执行一些并非按照设计者意图的SQL语句。2.原因:根据相关技术原理,SQL注入可以分为平台层注入和代码层注入。前者由不安全的数据库配置或数据库平台的漏洞所致;后者主要是由于程序员对输入..._web 应用中常见的漏洞及其危害有哪些

离散数学——命题逻辑_离散数学命题逻辑-程序员宅基地

离散数学中的命题逻辑,包括命题的表示和联结词的运用,推理理论和常用的证明方法,如真值表法和直接证明法。还介绍了附加前提证明法或CP规则。

Spring Expression Language(SpEL)-程序员宅基地

文章浏览阅读8.6k次。Spring Expression Language(SpEL)spring的一种表达式。用来动态的获取,值、对象等。 样式: #{ …} 使用既定的规则放置在花括号中。式中的规则得以运行,可以反馈结果。理论上可以返回任何类型。 说说两种方式去设置SpELAnnotation注解。@Value()方便快捷。 你可以在里面方式任何符合SpEL规范的语句,并把它的返回值注..._spring expression language

ansible最大并发_通过这7种方法来最大程度地提高Ansible技能-程序员宅基地

文章浏览阅读1.7k次。ansible最大并发 Ansible是一种功能强大的无代理(但易于使用且轻巧)的自动化工具,自2012年推出以来一直稳步流行。这种流行在一定程度上是由于其简单性。 默认情况下,Ansible的最基本依赖项(Python和SSH)几乎在所有地方都可用,这使得Ansible可以轻松用于各种系统:服务器,工作站,Raspberry Pi,工业控制器,Linux容器,网络设备等。 Ansible可..._ansible 提升 高并发

Barcode Reader在45毫秒内实现条码识别-程序员宅基地

文章浏览阅读479次。应我的客户要求,需要找到一款可以在极短时间识别二维条码的软件以应对他们现在极其迅速的货品入库需求。正好听说过一款Dynamsoft Barcode Reader的开发包,根据其官网介绍最新版对条码检测速度比以前的版本快2倍以上。根据对Dynamsoft Barcode Reader8.8SDK包拆解,其中含了JavaScript Package /.NET Package /C/C++ Package /Python Package /Java Package /iOS Package /A..._barcode reader