• 作者:老汪软件技巧
  • 发表时间:2024-09-14 17:03
  • 浏览量:

前言

先讲讲几个概念,并发,同步异步(先不涉及IO阻塞),后面AQS源码是JDK11

并发:在操作系统中,同个处理器上有多个进程(线程)同时运行即并发,并发可分为同步和互斥

互斥和同步

互斥:某一资源同时只允许一个访问者对其进行访问,具有性和排它性。但互斥无法限制访问者对资源的访问顺序,即访问是无序的

同步:互斥的基础上,通过其它机制实现访问者对资源的有序访问。所以AQS也叫抽象队列同步器,这里实现有序访问的机制就是Queued,其实也可以不顺序执行。

同步:分布在不同进程之间的若干代码片段(如具体某个方法A),它们的运行必须严格按照规定的某种先后次序来运行(在同步代码块上就不是并发的了,那单个线程那直接就是同步了)。能让线程或者进程保持同步的实现叫同步器,通俗就是锁(但现实中锁并不能保证顺序只能保证互斥,但大致可以这样说)

异步:异步和同步是相对的,异步就是彼此独立,不需要等待其他线程处理完了再进行,可以同时并发执行,更无顺序。所以这里也可以看出讨论异步时就是涉及多个进程(线程)

在接口开发中,比如接口service A方法里调用了B方法。但是B方法太耗时了而且不依赖B方法的返回值,所以新开了一个线程处理B方法,A立马返回,这里我们也叫异步接口调用,因为两个线程确实没有互斥和顺序关系。

异步非阻塞IO其实最开始优化版本也是这样做的,最开始处理IO请求的线程会阻塞在accept处(等待IO数据),第一版优化就是new新的线程去等待accept,所以总的来说异步就是不用等待。

CAS,CLH,AQS(AbstractQueuedSynchronizer)和ReentrantLock

先给出结论ReentrantLock基于AQS,而AQS是Java中对CLH的实现,CLH是依赖CAS+自旋+双向链表的

CAS(Compare-and-Swap)机制是一种原子操作,用于实现线程安全和无锁(无Synchronized)并发控制。它通过比较和替换的方式,确保只有一个线程能够成功地更新共享变量的值(并返回true和false),加上if判断其实这里已经可以实现了锁的功能了,再实现等地线程顺序问题就完美了

这里谈CAS的原子性体现在:1、取原值 2、和期望值比较 3、赋期望值 分了三步进行是不能保证原子性的。CAS本质还是Lock,但是通过硬件层次去保证原子性,比synchronized在jvm层次通过一个监听者作为锁来保证原子性更快。

另外在32位系统中,64位数据的简单的读写操作都是不具备原子性的。所以可以使用Java中的CAS来操作或者volatile修饰变量。但在64位系统已经不存在这个问题了,所以我们可以认为单个变量的非复合操作的读写都是具有原子性的。(i++这种典型的复合操作不具有原子性)

volatile变量的特性

1.保证可见性,可以保证单个读写操作的原子性不保证复合操作的原子性 (在64位系统中可直接认为不具备原子性)

a.当写一个volatile变量时,JMM会把该线程本地内存中的变量强制刷新到主内存中去

b.这个写操作会导致其他线程中的volatile变量缓存无效

2.禁止指令重排

CLH

CLH 队列锁是由 Craig, Landin, Hagersten 三位大佬提出的,因此被称为 CLH 队列锁,它是一种基于链表的CAS以及自旋锁等待队列。它由一系列节点组成,每个节点代表一个等待锁的线程。这些节点按照FIFO(先进先出) 的原则组织在一起,形成一个双向队列

这里要注意:head,tail是同步器两个变量而已不是一个数据节点,且第一个节点是空节点。为什么这样设计和细节后面结合Java中的实现讨论。

同时这里可以看到CLH是FIFO的所以他本身就是公平的,Java中不公平锁只体现在新来的节点会先尝试tryAcquire抢占同步器的时候。

CLH队列的优势

公平性:CLH队列保证了线程按照先来先服务的顺序获取锁,避免了线程饥饿的问题。这对于需要保证公平性的场景非常重要高效性:由于CLH队列是基于链表结构实现的,每个线程只需要关注其前驱节点的状态,而不需要关注整个队列的情况。这降低了线程之间的通信开销,提高了并发性能。[可扩展性]:CLH队列的设计具有良好的可扩展性。它可以通过增加或减少节点来动态地调整队列的长度,适应不同并发场景的需求。

CLH队列的优势

CPU资源消耗:在高并发环境下,大量的线程自旋[等待可能]会消耗大量的CPU资源,导致系统性能下降。[自旋等待]:线程在获取锁之前会进行自旋等待,如果锁被长时间占用,线程会一直空转,造成CPU的浪费。

Java AQS源码实现

并发同步异步_同步阻塞异步阻塞其他阻塞_

AQS是基于CLH实现的同步器,同时做了很多扩展点。用来实现不同类型的锁,比如公平锁、非公平锁、信号量(可以理解为有多把钥匙的锁)等等。其中AQS主要代码如下,我们暂时只讨论公平锁和非公平锁情形:

Node节点类

static final class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;
    //当前线程已经被取消
    static final int CANCELLED = 1;
    //等待解锁
    static final int SIGNAL = -1;
    //等待条件
    static final int CONDITION = -2;
    //CountDownLatch等工具中使用到,暂时用不到
    static final int PROPAGATE = -3;
    //节点中线程状态,默认为0
    volatile int waitStatus;
    //前驱节点
    volatile Node prev;
    //后驱节点
    volatile Node next;
    //当前节点对应线程
    volatile Thread thread;
    //暂时用不到
    Node nextWaiter;
    final Node predecessor() {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    final boolean compareAndSetWaitStatus(int expect, int update) {
        //大写的可以看做就是对应小写waitStatus值CAS操作
        return WAITSTATUS.compareAndSet(this, expect, update);
    }
    final boolean compareAndSetNext(Node expect, Node update) {
        return NEXT.compareAndSet(this, expect, update);
    }
    final void setPrevRelaxed(Node p) {
        PREV.set(this, p);
    }
}

AQS核心代码如下

public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
    // 头节点
    private transient volatile Node head;
    // 尾结点
    private transient volatile Node tail;
    // 同步状态,抢占或重入后会+1,释放锁会-1,更新state成功代表抢占锁成功。叫同步器或者叫锁吧(连同钥匙一起)好理解
    private volatile int state;
    //这里并没有使用CAS进行操作。主要用在锁重入的时候,因为只有一个线程
    protected final void setState(int newState) {
        state = newState;
    }
    //CAS更新tail
    private final boolean compareAndSetTail(Node expect, Node update) {
        return TAIL.compareAndSet(this, expect, update);
    }
    
    //CAS更新state!!!这个方法给具体锁实现调用!!!比如ReentrantLock的tryAcquire,state CAS更新成功就代表抢占到了锁
    protected final boolean compareAndSetState(int expect, int update) {
        return STATE.compareAndSet(this, expect, update);
    }
    //更新head头节点,空白节点,一个线程抢占成功后会将其前驱节点设置为头节点
    //所以这里当一个线程运行完成后释放时并不会移动头节点而是会唤醒下一个节点,下一个节点抢占成功后让他去移动头节点
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
    //获取锁方法
    public final void acquire(int arg) {
        //先尝试获取,成功该刚发就运行完成不会阻塞后续代码运行
        //失败就会将当前线程包装成Node添加到队尾同时再尝试抢占同步器
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            //如果线程状态是中断这里会再次调用中断方法,之前的中断被重置了
            selfInterrupt();
    }
    //具体如何抢占逻辑交给具体锁类型实现
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    //自旋CAS抢占同步器,抢占失败判断是否需要Park最后返回线程中断状态
    final boolean acquireQueued(final Node node, int arg) {
        boolean interrupted = false;
        try {
            //自旋抢占同步器state
            for (; ; ) {
                //p当前node的前驱节点
                final Node p = node.predecessor();
                //如果p是头节点则再尝试抢占一下
                if (p == head && tryAcquire(arg)) {
                    //成功将当前节点设置为头节点,清空头节点内容,并返回中断状态
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                //判断是否需要Park在抢占失败后
                //其实shouldParkAfterFailedAcquire逻辑是由当前节点的前驱节点是否是头节点决定的,只要前驱节点不是头节点就Park
                if (shouldParkAfterFailedAcquire(p, node))
                    //更新中断状态,在Park时线程可能被中断,中断了继续抢占直到成功返回中断状态
                    interrupted |= parkAndCheckInterrupt();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            if (interrupted)
                selfInterrupt();
            throw t;
        }
    }
    //在抢占同步器state失败后,判断是否需要Park
    //waitStatus默认都是0可想这个函数会走两次,第一次赋前驱节点为Node.SIGNAL并返回false,第二次进来就返回true了
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //获取前置节点等待状态如果是Node.SIGNAL等待信号,自己就可以安心Park了
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            //大于0的就只有取消状态1,跳过所有取消状态的
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
        }
        return false;
    }
    private final boolean parkAndCheckInterrupt() {
        //Park让线程休眠,避免一直空跑浪费CPU,被唤醒后会返回当前线程是否中断并清除中断状态
        //Park遇到中断不会抛出Interrupt异常,而是解除Park继续运行后续代码!!!这里和Synchronized不同Synchronized等待过程不能响应中断!!!
        //Thread.interrupted()为啥要这句?因为需要知道是正常运行结束还是中断结束,它返回是否中断,同时清除中断状态
        LockSupport.park(this);
        return Thread.interrupted();
    }
    //将线程信息封装成Node连接到队尾,THREAD.set(this, Thread.currentThread());
    private Node addWaiter(Node mode) {
        Node node = new Node(mode);
        for (; ; ) {
            //old尾结点
            Node oldTail = tail;
            if (oldTail != null) {
                //设置新node的头节点为尾结点,这里不需要CAS只针对当前线程
                node.setPrevRelaxed(oldTail);
                //CAS抢占将尾结点设置为新node,
                if (compareAndSetTail(oldTail, node)) {
                    //成功的node可以更新old尾结点的next为自己,走到这里已经成功将当前节点连接到队尾了
                    oldTail.next = node;
                    return node;
                }
                //失败的自旋
            } else {
                //当前尾结点为空初始化队列
                initializeSyncQueue();
            }
        }
    }
    //初始化队列,第一个节点设置为空Node
    private final void initializeSyncQueue() {
        Node h;
        if (HEAD.compareAndSet(this, null, (h = new Node())))
            tail = h;
    }
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;
        node.thread = null;
        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
        Node predNext = pred.next;
        node.waitStatus = Node.CANCELLED;
        if (node == tail && compareAndSetTail(node, pred)) {
            pred.compareAndSetNext(predNext, null);
        } else {
            int ws;
            if (pred != head &&
                    ((ws = pred.waitStatus) == Node.SIGNAL ||
                            (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
                    pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    pred.compareAndSetNext(predNext, next);
            } else {
                unparkSuccessor(node);
            }
            node.next = node; // help GC
        }
    }
    static void selfInterrupt() {
        //再次将线程中断,因为之前清除了中断状态,这里还是设置回去可以将具体中断逻辑交给使用者处理
        Thread.currentThread().interrupt();
    }
//    ================================================== 释放逻辑
    //释放锁方法
    public final boolean release(int arg) {
        //尝试释放,其实这块都没并发了
        if (tryRelease(arg)) {
            Node h = head;
            //唤醒下个节点,这里为啥需要waitStatus != 0?
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    //具体如何释放逻辑交给具体锁类型实现
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
    //唤醒head节点后面不为取消状态的节点,node为head指向节点
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            //这一步将node的waitStatus设置为了0但并没有判断是否成功
            node.compareAndSetWaitStatus(ws, 0);
        //取头节点的next节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node p = tail; p != node && p != null; p = p.prev)
                if (p.waitStatus <= 0)
                    s = p;
        }
        //找到第一个状态不为取消状态的节点唤醒
        if (s != null)
            LockSupport.unpark(s.thread);
    }
}

ReentrantLock中公平锁实现


abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;
    /**
     * Performs non-fair tryLock.  tryAcquire is implemented in
     * subclasses, but both need nonfair try for trylock method.
     */
    @ReservedStackAccess
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        //c为0可以尝试抢占
        if (c == 0) {
            //CAS抢占同步器!!!不论自己是不是头节点都会尝试抢占
            if (compareAndSetState(0, acquires)) {
                //设置同步器对应线程为当前线程,简单设置后就运行代码并没有对队列进行任何操作留给排队的操作
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        //重入逻辑
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    @ReservedStackAccess
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        //释放,说明这里是单线程没用CAS进行,但其他锁实现不一定是单线程比如CountDownLatch
        setState(c);
        return free;
    }
}
//公平锁
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;
    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    @ReservedStackAccess
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            //公平锁区别!!!自己是头节点才会抢占
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}
//非公平锁
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

可以看到公平锁和非公平锁的释放逻辑都是一样的。只有在tryAcquire上略微不一致

公平锁只会在head为null时进行尝试抢占,只要自己不是就去排队

非公平锁一来就会抢占一次,抢占不到还是乖乖去排队总结下流程以公平锁为准

tryAcquire函数中CAS抢占state,成功后不会初始化队列,直接运行代码若第1步失败则CAS抢占队尾,这里会一直自旋直到成功插入队尾后执行acquireQueued会再判断自己上一个是不是头节点,是的话再尝试一下tryAcquire ,如果成功会设置自己为head并销毁老head如果上一个节点不是头结点或者抢占state失败就判断是否需要Park。判断逻辑,获取前一个节点状态,节点状态有如下值:CANCELLED(1)前驱节点已经取消、SIGNAL(-1)前驱节点已经设置了SIGNAL等待唤醒信号可以安心park、CONDITION(-2)代表需要等待条件、PROPAGATE(-3)暂时不管 以及默认值0。判断逻辑为:而当一个线程执行完成解锁后会设置当前线程状态为0以及head头节点的state为0,同时唤醒下一个状态不为CANCELLED(1)的线程,新的线程抢占成功后会负责移动头节点到自己读后感

其实AQS的核心就是:CAS+自旋 负责队列安全,Park(避免CPU浪费),排队(简化了线程通信)看了一遍感觉懂了,但是有些地方又有疑问:

如果A线程unPark在B线程Park之前会怎样?

unPark在Park之前会导致Park无效

Node(head) NodeA NodeB

System.out.println(1);

LockSupport.unpark(Thread.currentThread());

LockSupport.park();

System.out.println(2);

会输出1 2

当前线程把前驱节点更新为SIGNAL(-1),结果自己Park的依据是前驱节点状态是否为SIGNAL(-1),那为啥自己不直接Park还多此一举?shouldParkAfterFailedAcquire本质是什么?

其实可以直接Park的,现在的写法可以多尝试一次,即acquireQueued方法中执行了两次tryAcquire。个人感觉这里确实有些多此一举也增加了理解的难度,尝试一次就可以直接Park了shouldParkAfterFailedAcquire本质是只要前节点不是head节点当前节点就Park

为啥当前Node要更新前一个Node状态以及根据前一个节点状态决定是否自己需要Park?

虽然这块现在这里是合理的,但总感觉怪怪的。在JDK14中已经改了将状态放在了自己节点上,有些人还强行解释哈哈

# JDK8 AQS中SIGNAL标志为什么要放前面节点上,放自己节点上不好吗?

为啥移动头节点不在解锁后进行而是交给下抢占到节点进行?

这里应该是为了兼顾非公平锁的情况,如果单独只实现一个公平锁感觉逻辑要简单不少

Synchronized和ReentrantLock功能区别(网上找的,代码示例自己改的)ReentrantLock 是一个类,而 Synchronized 是 Java 中的关键字ReentrantLock 必须手动释放锁。通常需要在 finally 块中调用 unlock 方法以确保锁被正确释放, Synchronized 会自动释放锁,当同步块执行完毕时,由 JVM 自动释放,不需要手动操作ReentrantLock可以指定是公平锁还是非公平锁,而Synchronized只能是非公平锁。所谓的公平锁就是先等待的线程先获得锁ReentrantLock 可以实现多路选择通知(可以绑定多个 Condition),而 Synchronized 只能通过 wait 和 notify/notifyAll 方法唤醒一个线程或者唤醒全部线程(单路通知)ReentrantLock提供了一种能够中断等待锁的线程的机制,通过 lock.lockInterruptibly() 来实现这个机制。也就是说正在等待的线程可以选择放弃等待,改为处理其他事情。而Synchronized不具备这种特点。

这个太有用了我们现在用的并发工具都提供了等待多久中断等待!!!

public class InterruptibleLockExample {
    private final ReentrantLock lock = new ReentrantLock();
    public void processTask() {
        try {
            try {
                lock.lockInterruptibly();
                System.out.println(Thread.currentThread().getName() + "抢到锁,运行临界区代码");
                // 假设需要等待一段时间
                Thread.sleep(10000);
                System.out.println(Thread.currentThread().getName() + "结束运行临界区代码");
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getName() + "太久了!不等了!中断");
            } finally {
                lock.unlock();
                System.out.println(Thread.currentThread().getName() + "释放锁");
            }
        } catch (Exception e) {
            System.out.println(Thread.currentThread().getName() + " 临界区代码异常");
        }
        System.out.println(Thread.currentThread().getName() + " 运行非临界区代码");
    }
    public static void main(String[] args) throws InterruptedException {
        InterruptibleLockExample example = new InterruptibleLockExample();
        Thread thread1 = new Thread(example::processTask);
        thread1.setName("线程1");
        //让thread1抢到锁
        TimeUnit.MICROSECONDS.sleep(20);
        Thread thread2 = new Thread(example::processTask);
        thread2.setName("线程2");
        thread1.start();
        thread2.start();
        // thread2一直没抢到锁,等5s后中断
        TimeUnit.SECONDS.sleep(5);
        thread2.interrupt();
    }
}

ReentrantLock: 通常提供更好的性能,特别是在高竞争环境下Synchronized: 在某些情况下,性能可能稍差一些,但随着 JDK 版本的升级,性能差距已经不大了