重庆分公司,新征程启航

为企业提供网站建设、域名注册、服务器等服务

Java 同步锁ReentrantLock与抽象同步队列AQS

AbstractQueuedSynchronizer 抽象同步队列,它是个模板类提供了许多以锁相关的操作,常说的AQS指的就是它。AQS继承了AbstractOwnableSynchronizer类,AOS用于保存线程对象,保存什么线程对象呢?保存锁被独占的线程对象

创新互联公司2013年成立,是专业互联网技术服务公司,拥有项目成都网站建设、网站制作网站策划,项目实施与项目整合能力。我们以让每一个梦想脱颖而出为使命,1280元扬中做网站,已为上家服务,为扬中各地企业和个人服务,联系电话:13518219792

抽象同步队列AQS除了实现序列化标记接口,并没有实现任何的同步接口,该类提供了许多同步状态获取和释放的方法给自定义同步器使用,如ReentrantLock的内部类Sync。抽象同步队列支持独占式或共享式的的获取同步状态,方便实现不同类型的自定义同步器。一般方法名带有Shared的为共享式,比如,尝试以共享式的获取锁的方法inttryAcquireShared(int),而独占式获取锁方法为booleantryAcquire(int)

AQS是抽象同步队列,其重点就是同步队列如何操作同步队列

同步队列

双向同步队列,采用尾插法新增节点,从头部的下一个节点获取操作节点,节点自旋获取同步锁,实现FIFO(先进先出)原则。

理解节点中的属性值作用

  • prev:前驱节点;即当前节点的前一个节点,之所以叫前驱节点,是因为前一个节点在使用完锁之后会解除后一个节点的阻塞状态;

  • next:后继节点;即当前节点的后一个节点,之所以叫后继节点,是因为“后继有人”了,表示有“下一代”节点承接这个独有的锁????;

  • nextWaiter:表示指向下一个Node.CONDITION状态的节点(本文不讲述Condition队列,在此可以忽略它);

  • thread:节点对象中保存的线程对象,节点都是配角,线程才是主角;

  • waitStatus:当前节点在队列中的等待状态;waitStatus = CANCELLED = 1,表示线程已经取消(该状态下的节点为作废节点,将从队列中断开);

    • waitStatus = SIGNAL = -1,表示线程处于请求释放的状态,后继线程需要阻塞等待(该状态下的节点线程处于阻塞等待状态或获取锁未释放状态);

    • waitStatus = CONDITION = -2,表示线程正在等待

    • waitStatus = PROPAGATE = -3,在共享情况下,表示下一个被请求的shared应该无条件传播;

    • waitStatus = 0,表示节点初始化时的默认值(int类型成员变量的默认值)。

注意1:节点对象中的prev、next和nextWaiter都是一个完整的Node节点对象,也就是说每个节点都保存了前后节点的对象,如果没有则为null。

注意2:head节点是个虚节点(prev=null、thread=null),但head本身是一个实际存在的节点对象,起到标记队列的开头;尾节点tail节点的next=null,等待新节点插入。

头节点head为什么是虚节点

重点必须明确知道头节点head为什么是虚节点!!!这很重要。

原因是,当前节点在获取到锁????之后,它这个线程对象就会被保存到AOS(AbstractOwnableSynchronizer)中的exclusiveOwnerThread 对象,(一开头就提到过了,在这里再强调一次),所以在队列中,头节点是无需存储Thread对象的了。那为什么设计成这样呢?因为存在临界情况就是只有一个线程获取锁资源时,无需初始化生成同步队列,直接获取同步锁即可。只有存在锁未释放同时又进来了新的线程时,才会去初始化同步队列,并为未释放锁的线程占个位置,这个位置就是头节点head,表明前面还有个线程在使用资源。

// 初始化队列的方法private Node enq(final Node node) {
    // 死循环
    for (;;) {
        Node t = tail;
        // 没头没尾时
        if (t == null) { // Must initialize
            // 生成头节点(尾节点)
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 有头有尾后,才把需要等待的线程节点加入队列中
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

在独占线程释放锁时,判断head是否为null,即可知道同步队列是否存在,如果同步队列不存在,那么无需执行尝试唤醒后继节点那些操作了。

在同个时间节点中,单个线程只需要操作AOS的Thread对象和AQS的state状态即可实现同步锁和锁的可重入性。

线程加入同步队列的过程

在锁被占用时,获取锁失败后,当前线程被封装成Node节点并加入到队列尾部。

tryAcquire(arg)返回false时,

执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg))操作,

addWaiter(Node.EXCLUSIVE) 为新增节点到同步队列,队列未初始化时会执行enq完成初始化后再新增节点到队列。

因此,新增节点分为首次生成同步队列同时新增节点和在原有同步队列中插入新增节点。

首次生成同步队列新增节点:通过enq(final Node node)方法,先初始化头节点(虚节点),再通过原子操作compareAndSetTail方法从队列尾部插入新节点。

在原有同步队列中新增节点:通过原子操作compareAndSetTail方法从队列尾部插入新节点。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

private Node addWaiter(Node mode) {
    // 使用当前线程生成新节点
    Node node = new Node(Thread.currentThread(), mode);
    // 获取同步器的尾节点
    Node pred = tail;
    if (pred != null) {
        // 第一步:新节点的prev节点指向尾部节点(pred=tail)
        node.prev = pred;
        // 第二部:CAS比较尾节点,相等就让tail=node
        if (compareAndSetTail(pred, node)) {
            // 第三步:pred=旧的tail,即旧的尾节点的next节点指向新节点
            pred.next = node;
            return node;
        }
    }
    // 当tail=null 时执行,逻辑相类似的;enq初始化的head节点为虚节点
    enq(node);
    return node;
}

// 将节点插入队列,必要时初始化(即tail=null时,也即是同步队列没有节点时初始化)。private Node enq(final Node node) {
    // 死循环
    for (;;) {
        // 第一次进来tail=null,第二次进来tail=head=new Node()
        Node t = tail;
        if (t == null) {
            // 创建一个空节点作为head节点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else { // 下面就是正常的尾插法新增节点
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

配合源码和动图理解:新增队列节点过程(三部曲)

使用尾插法新增同步队列节点

  • 第一步:新增节点的prev节点指向尾节点tail;

  • 第二步:尾节点tail 和新节点做CAS操作,即compareAndSetTail(pred,node) ,即同步器的tail节点指向新节点;

  • 第三步:旧的尾节点的next节点指向新节点(此时的新节点=尾节点tail)

最终结果图

新增节点加入队列之后,在同步队列中线程怎么等待?线程怎么获取锁呢?

节点线程获取锁

看代码前必须明确知道哪个节点是要获取锁的。头节点为虚节点,标记队列的开头,真正要获取锁的是头节点的后继节点。

获取锁过程

锁在释放时调用的关键流程:

ReentrantLock#lock() -> Sync#lock() -> AQS#acquire(1) -> NonfairSync#tryAcquire(1) 【或FairSync#tryAcquire(1)】-> AQS#addWaiter(node) -> AQS#acquireQueued(node,1) -> AQS#selfInterrupt()

关键代码????

// java.util.concurrent.locks.AbstractQueuedSynchronizer/**
 * 以独占不可中断模式获取已在队列中的线程。
 */final boolean acquireQueued(final Node node, int arg) {
    // 异常标志状态
    boolean failed = true;
    try {
        // 是否发生过中断的标志
        boolean interrupted = false;
        // 自旋锁>>>死循环:每个node都独立执行着这个死循环,直至线程被阻塞
        for (;;) {
            // 获取前驱节点(当前节点的前一个节点)
            final Node p = node.predecessor();
            // 当前节点的前驱节点是否等于头节点(虚节点),等于就会执行尝试获取锁 tryAcquire(arg)
            if (p == head && tryAcquire(arg)) {
                setHead(node); // 设置当前节点为头节点,在获取锁成功时,thread对象已经保存到AQS中的exclusiveOwnerThread了
                p.next = null; // 前驱节点的next指向null,断开前驱节点(旧的头节点)
                failed = false; // 只要当前节点node正常获取到锁,就不会执行finally的cancelAcquire(node)
                return interrupted;
            }
            // 前驱节点的waitStatus=-1时,当前node会被阻塞,防止无限循环浪费资源
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        /* 正常情况:只有return时,才会执行finally代码,而只要return,failed都等于false,
         * 所以,failed 是为了避免节点发生异常时,node没有被释放。
         * 比如:node.predecessor() 可能产生空指针异常。
         */
        if (failed)
            cancelAcquire(node);
    }
}


// java.util.concurrent.locks.AbstractQueuedSynchronizerprivate void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

为什么在尝试获取锁前要判断前驱节点是否为头节点?

因为除了阻塞被释放会让死循环继续执行的情况外,还有中断指令也会使线程从阻塞状态中被释放,所以存在任意节点提前重新执行“死循环”尝试获取锁的情况,如果不判断获取锁节点的前驱节点是否为头节点,那就会出现提前尝试获取锁,从而破坏了同步队列的先进先出(FIFO)原则,说白了,就是被插队了。

当前节点不是头节点时,执行以下代码

// java.util.concurrent.locks.AbstractQueuedSynchronizer

/**
 * 检查和更新获取锁失败的节点的状态。如果线程需要等待,则返回true,使其执行阻塞操作。
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 获取前驱节点的等待状态
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // 因为前驱节点处于请求释放的状态,所以当前节点需要阻塞等待,会返回true,从而执行后续方法进入阻塞状态
        return true;
    if (ws > 0) {
        // 前驱节点被标上取消标志了,需要跳过前驱节点并不断重试
        do {
            // 循环向前查找取消节点,把取消节点从队列中移除
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus必须为0或等于PROPAGATE=-3
         * 表示需要设置前驱节点等待状态为SIGNAL,
         * 将会在外层循环再次尝试获取锁,如果再次获取锁失败,那么就会阻塞当前线程
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

shouldParkAfterFailedAcquire(p, node) 返回true时,将会执行阻塞操作parkAndCheckInterrupt()),其通过线程阻塞工具类方法LockSupport.park(this) 来阻塞当前线程。

private final boolean parkAndCheckInterrupt() {
    // 阻塞当前线程线程调度
    LockSupport.park(this);
    // 清除当前线程的中断状态,并返回上一次的中断状态
    return Thread.interrupted();
}

注意:LockSupport.park(this) 阻塞后,需要唤醒阻塞才会执行后续操作,可通过解除阻塞LockSupport.unpark(thread) 或 中断thread.interrupt() 来唤醒阻塞。

只有shouldParkAfterFailedAcquireparkAndCheckInterrupt都返回true时,才会执行interrupted = true,即只有是中断导致阻塞结束时,才返回true,此时 selfInterrupt()重新执行一次中断操作。

// java.util.concurrent.locks.AbstractQueuedSynchronizer
final boolean acquireQueued(final Node node, int arg) {
    ......
    if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 只有是中断导致阻塞结束时,才返回true
                    interrupted = true;
    ......
}

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 中断当前线程
        selfInterrupt();
}

static void selfInterrupt() {
    // 中断当前线程
    Thread.currentThread().interrupt();
}

为什么需要再一次执行中断呢?

因为存在中断thread.interrupt() 唤醒自旋锁阻塞的情况,而Thread.interrupted() 获取中断状态并清除当前线程的中断状态,所以需要重新执行一次中断操作selfInterrupt(),将中断标志置为true。

这种获取锁的方式是非中断锁,就是无法通过中断的方式结束锁的获取,区别于中断锁,所以该方式在获取锁的过程中,不会处理中断,只是记录中断状态,Thread.interrupted() 获取中断状态后清除中断状态,所以需要重新设置中断标志为true。

如果你想要处理中断的情况,那我们可以在acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 返回true 的时候去处理。比如:抛出中断异常。

如果你需要在线程发生中断时结束获取锁,那么可以考虑使用lockInterruptibly()来获取锁。

两种方式获取锁的区别

lock()方式获取锁:自旋锁只会在正常获取到锁或发生异常时结束自旋锁(死循环)。

void lockInterruptibly() 方式获取锁:会在发生中断的情况下,抛出中断异常throw new InterruptedException(); 从而跳出自旋锁(死循环),而调用lockInterruptibly() 的方法需要捕获中断异常,做一些异常处理。

获取锁的lock()lockInterruptibly()的主要区别是:中断是否会结束锁的获取

看看源码怎么实现中断结束锁的获取

private void doAcquireInterruptibly(int arg)
 throws InterruptedException {
 final Node node = addWaiter(Node.EXCLUSIVE);
 boolean failed = true;
 try {
  for (;;) {
   final Node p = node.predecessor();
   if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return;
   }
   if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    // 只有是中断导致阻塞结束时,才抛出中断异常
    throw new InterruptedException();
  }
 } finally {
  if (failed)
   cancelAcquire(node);
 }
}

中断异常抛出后,将会执行finally 代码块,取消正在进行尝试获取锁的节点。

到此为止,lock()获取锁的概要过程

先尝试获取锁、失败就将线程封装成节点并加入到队列尾部、进入自旋锁(第一次尝试获取锁失败,将前驱节点的waitStatus改为-1;第二次尝试获取锁失败,因为前驱节点的waitStatus=-1,所以执行阻塞当前线程操作避免死循环耗费资源)、等待头节点线程释放同步状态之后,将发起解除阻塞指令或阻塞线程被中断后,后继节点再次尝试获取锁。

取消异常节点

前面提到,在AQS#shouldParkAfterFailedAcquire(pred, node) 中谈到,当节点waitStatus>0 时,也即是对带有取消状态的节点进行移除。那么节点在什么时候被改为CANCELLED 的呢?

在AQS#acquireQueued(node, arg)中,正常获取到锁时,failed都等于false,只有当发生异常时,failed 才等于true,从而执行到AQS#cancelAcquire(node)。也就是说cancelAcquire() 是用于处理获取锁过程中,对发生异常节点的进行移除。

final boolean acquireQueued(final Node node, int arg) {
    // 是否发生异常的标志
    boolean failed = true;
    try {
        ......
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                ......
                failed = false; // 只要当前节点node正常获取到锁,就不会执行finally的cancelAcquire(node)
                return interrupted;
            }
            ......
        }
    } finally {
        /* 正常情况:只有return时,才会执行finally代码,而只要return,failed都等于false,
         * 所以,failed 是为了避免节点发生异常时,node没有被移除。
         */
        if (failed)
            cancelAcquire(node);
    }
}

详细看下AQS#cancelAcquire(node) 是怎么处理的


// java.util.concurrent.locks.AbstractQueuedSynchronizer
/**
 * 取消正在进行尝试获取锁的节点
 */
private void cancelAcquire(Node node) {
    // 如果节点不存在,则忽略
    if (node == null)
        return;
    // 使当前节点变成虚节点
    node.thread = null;

    // 跳过带有“取消状态”的前驱节点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext节点
    Node predNext = pred.next;

    // 修改当前节点的waitStatus为CANCELLED=1
    node.waitStatus = Node.CANCELLED;

    // 如果当前节点为尾节点tail,则只需要移除自己即可。
    if (node == tail && compareAndSetTail(node, pred)) {
        // pred节点变成了尾节点tail,所以 pred.next=null
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        /* 当前节点的前驱节点不为头节点,则true;
         * 前驱节点的ws状态为SIGNAL,则true;ws不为SIGNAL,但ws<=0时(即不是取消状态),则CAS操作改为SIGNAL,改成功则为true;
         * 前驱节点不是虚接点,则true
         */ 
        if (pred != head
            && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
            && pred.thread != null) {
            /* 如果上述都满足,则将“当前节点的前驱节点的后继节点”指向“当前节点的后继节点”
             * 说白了,节点的next指向就是由 A->B->C 改为 A->C;B为当前节点。
             * 关于节点的prev指向就没有变 A<-B<-C 还是 A<-B<-C,也就是说B节点还没真正断开。
             * 节点的prev指向的修改需要判断ws状态是否为CANCELLED后,才做修改。
             */
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // 唤醒当前节点的后继节点的阻塞线程
            unparkSuccessor(node);
        }
        // 当前节点的后继节点指向自己
        node.next = node;
    }
}

以上都是对next节点的指向做修改,关于节点的prev指向的修改需要判断ws状态是否为CANCELLED后,才做修改,循环向前查找取消节点,把取消节点从队列中剔除。其对应源码如下

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 获取前驱节点的等待状态
    int ws = pred.waitStatus;
        ......
    if (ws > 0) {
        // 前驱节点被标上取消标志了,需要跳过前驱节点并不断重试
        do {
            // 循环向前查找取消节点,把取消节点从队列中剔除
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        ......
    }
    return false;
}

在AQS#unparkSuccessor() 通过线程阻塞工具类方法LockSupport.unpark(thread) 来唤醒后继节点的阻塞线程。AQS#unparkSuccessor() 除了取消异常节点时用到外,还在锁的释放时调用,实现功能都是--唤醒当前节点的后继节点的阻塞线程,后继节点就会继续执行自旋锁来尝试获取锁。

// java.util.concurrent.locks.AbstractQueuedSynchronizer

/**
 * 唤醒当前节点的后继节点的阻塞线程(如果存在)
 */
private void unparkSuccessor(Node node) {
    /*
     * 如果waitStatus<0,则将waitStatus 置为默认值0
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * 但如果为null 或为取消状态,则从tail向前遍历以查找到实际未取消的后继节点。
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 对后继节点的线程释放阻塞
    if (s != null)
        LockSupport.unpark(s.thread);
}

节点线程释放锁

获取锁搞懂后,释放锁就是很简单了

处理流程

锁在释放时调用的关键流程:ReentrantLock#unlock() -> Sync#release(1) -> AQS#tryRelease(1) -> LockSupport#unparkSuccessor(head)

unparkSuccessor(head)方法在前面已经讲述过不再赘述,前三个方法源码如下????

// java.util.concurrent.locks.ReentrantLock
public void unlock() {
    // ReentrantLock API 交由同步队列模板方法实现
    sync.release(1);
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final boolean release(int arg) {
    // 尝试释放锁,成功则唤醒后继节点
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 唤醒当前节点的后继节点的阻塞线程
            unparkSuccessor(h);
        return true;
    }
    return false;
}


// java.util.concurrent.locks.ReentrantLock.Sync
protected final boolean tryRelease(int releases) {
    // state:每释放1次锁就会-1(相反:重入性,每获取1次锁就会+1)
    int c = getState() - releases;
    // 当前线程是否是独占锁线程,不是就抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 当state=0时,说明获取锁的次数已经释放完,可以解除独占锁线程
    if (c == 0) {
        // 锁释放成功
        free = true;
        // 独占锁线程置为null
        setExclusiveOwnerThread(null);
    }
    // 记录每次state的变化
    setState(c);
    return free;
}

最后附上以 ReentrantLock 的lock()为例,里面几乎画出了获取锁的所有代码的执行过程

Java 可重入锁的那些事(一)

Java中的线程安全与线程同步

Java线程状态(生命周期)--一篇入魂

自己编写平滑加权轮询算法,实现反向代理集群服务的平滑分配

Java实现平滑加权轮询算法--降权和提权

Java实现负载均衡算法--轮询和加权轮询

Java全栈学习路线、学习资源和面试题一条龙

更多优质文章,请关注WX公众号:Java全栈布道师


网站标题:Java 同步锁ReentrantLock与抽象同步队列AQS
文章位置:http://cqcxhl.com/article/dsopcss.html

其他资讯

在线咨询
服务热线
服务热线:028-86922220
TOP