AbstractQueuedSynchronizer框架
2016 年 04 月 01 日
java

    日常开发中,大多数程序员并不会直接接触AbstractQueuedSynchronizer(AQS)类,但其在并发工具中缺无处不在,并作为内部的标准同步器,如ReentrantLockSemaphoreJava线程池中的Worker等。本文将介绍AQS相关的实现细节。

  • 什么是AbstractQueuedSynchronizer(AQS)

  • AQS负责管理同步器类中的状态,它管理了一个整数状态信息,可以通过getStatesetStatecompareAndSetState等方法进行操作。这个整数状态的意义由子类来赋予,如ReentrantLock中该状态值表示所有者线程已经重复获取该锁的次数Semaphore中该状态值表示剩余的许可数量。可以看下使用的AbstractQueuedSynchronizer的并发工具类:

  • AbstractQueuedSynchronizer(AQS)实现

  • AQS定义比较简单,继承自AbstractOwnableSynchronizer接口:

  • AbstractOwnableSynchronizer

  • 当一个同步器可以由单个线程独占时,AbstractOwnableSynchronizer定义了基础的创建锁和相关同步器的方法,但其本身并不管理维护这些信息,而是交由子类去实现:

    public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
    
        private static final long serialVersionUID = 3737899427754241961L;
    
        protected AbstractOwnableSynchronizer() { }
    
        /**
         * 当前独占同步器的线程
         */
        private transient Thread exclusiveOwnerThread;
    
        /**
         * 设置当前独占同步器的线程
         */
        protected final void setExclusiveOwnerThread(Thread t) {
            exclusiveOwnerThread = t;
        }
    
        /**
         * 获取当前独占同步器的线程
         */
        protected final Thread getExclusiveOwnerThread() {
            return exclusiveOwnerThread;
        }
    }
        
  • AbstractQueuedSynchronizer

  • AbstractQueuedSynchronizer内部使用CLH锁(CLH锁是一种基于链表的可扩展高性能公平的自旋锁,申请线程不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋)的变种来实现对线程的阻塞。CLH锁的链表中的节点被抽象为Node

    static final class Node {
        
        /**
         * 标记节点正以共享模式等待
         */
        static final Node SHARED = new Node();
    
        /**
         * 标记节点正以独占模式等待
         */ 
        static final Node EXCLUSIVE = null;
    
        // ===== 以下表示节点的等待状态 =====
        
        /** 
         * 表示当前的线程被取消
         */
        static final int CANCELLED =  1;
    
        /** 
         * 表示当前节点的后继节点包含的线程需要运行,也就是unpark
         */
        static final int SIGNAL    = -1;
    
        /** 
         * 表示当前节点在等待condition,也就是在condition队列中
         */
        static final int CONDITION = -2;
    
        /** 
         * 示当前场景下后续的acquireShared能够得以执行
         */
        static final int PROPAGATE = -3;
    
        /** 
         * 状态
         */
        volatile int waitStatus;
    
        /**
         * 前驱节点,比如当前节点被取消时,那就需要前驱节点和后继节点来完成连接。
         */
        volatile Node prev;
    
        /**
         * 后继结点
         */
        volatile Node next;
    
        /**
         * 入队列时的当前线程
         */
        volatile Thread thread;
    
        /**
         * 存储condition队列中的后继节点。
         */
        Node nextWaiter;
    
        ...
    }
        

    其中AbstractQueuedSynchronizer维护的链表结构大致如下:

  • ReentrantLock

  • 可以先从ReentrantLock的实现来探究AbstractQueuedSynchronizer的作用。ReentrantLock内部封装了一个Sync类,来实现基本的lockunlock操作:

     public class ReentrantLock implements Lock, java.io.Serializable {
       
        // 同步器,用于实现锁机制
        private final Sync sync;
    
        /**
         * 基础的同步器实现
         */
        abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = -5179523762034025860L;
    
            /**
             * 由公平锁和非公平锁实现
             */
            abstract void lock();
    
            /**
             * 非公平锁时,尝试加锁
             */
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                // 同步器状态
                int c = getState();
                if (c == 0) {
                	// 若同步器状态为初始状态,则尝试加锁
                    if (compareAndSetState(0, acquires)) {
                    	// 设置锁的占用线程
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                } else if (current == getExclusiveOwnerThread()) {
                	// 当前线程已经加锁过,则设置state为锁的重入次数+1
                    int nextc = c + acquires;
                    if (nextc < 0) // 超出了锁重入的最大次数
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    
            /**
             * 尝试释放同步器
             */ 
            protected final boolean tryRelease(int releases) {
                // 释放后的新状态
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    // 非占用线程
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                	// state归零,释放成功
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    
            /* 
             * 当前线程是否独占该锁
             */
            protected final boolean isHeldExclusively() {
                return getExclusiveOwnerThread() == Thread.currentThread();
            }
    
            /* 
             * 创建一个条件对象
             */
            final ConditionObject newCondition() {
                return new ConditionObject();
            }
    
            /* 
             * 获取当前独占线程
             */
            final Thread getOwner() {
                return getState() == 0 ? null : getExclusiveOwnerThread();
            }
    
            /*
             * 获取锁被重入的次数
             */
            final int getHoldCount() {
                return isHeldExclusively() ? getState() : 0;
            }
    
            /* 
             * 锁是否被占用
             */
            final boolean isLocked() {
                return getState() != 0;
            }
    
            /**
             * 从对象流中反序列化锁对象
             */
            private void readObject(java.io.ObjectInputStream s)
                throws java.io.IOException, ClassNotFoundException {
                s.defaultReadObject();
                // 重置为初始状态
                setState(0); 
            }
        }
    
        /**
         * 非公平锁
         */
        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            /**
             * 加锁
             */
            final void lock() {
            	// 先尝试直接加锁,即抢占式
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                	// 失败后,就排队抢锁
                    acquire(1);
            }
    
            /**
             * 尝试获取锁
             */
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    
        /**
         * 公平锁
         */
        static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;
    
            final void lock() {
            	// 直接进行排队抢锁,保持公平
                acquire(1);
            }
    
            /**
             * 尝试获取锁
             */
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                	// 若没有其他线程已经在等待队列中,则尝试加锁
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                } else if (current == getExclusiveOwnerThread()) {
                	// 当前线程已经占有锁,则重入次数 + acquires
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
    
        /**
         * 默认非公平锁
         */
        public ReentrantLock() {
            sync = new NonfairSync();
        }
    
        /**
         * 请求锁
         */
        public void lock() {
            sync.lock();
        }
    
        /**
         * 尝试加锁,可被中断
         */
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }
    
        /**
         * 尝试加锁
         */
        public boolean tryLock() {
            return sync.nonfairTryAcquire(1);
        }
    
        /**
         * 加锁,具有超时限制
         */
        public boolean tryLock(long timeout, TimeUnit unit)
                throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(timeout));
        }
    
        /**
         * 解锁
         */
        public void unlock() {
            sync.release(1);
        }
    
        /**
         * 创建一个条件对象
         */
        public Condition newCondition() {
            return sync.newCondition();
        }
    
        /**
         * 获取锁的重入次数
         */
        public int getHoldCount() {
            return sync.getHoldCount();
        }
    
        /**
         * 锁是否被当前线程持有
         */
        public boolean isHeldByCurrentThread() {
            return sync.isHeldExclusively();
        }
    
        /**
         * 锁是否已被持有
         */
        public boolean isLocked() {
            return sync.isLocked();
        }
    
        /**
         * 是否是公平锁
         */
        public final boolean isFair() {
            return sync instanceof FairSync;
        }
    
        /**
         * 获取占用锁的线程
         */
        protected Thread getOwner() {
            return sync.getOwner();
        }
    
        /**
         * 是否有等待的线程
         */
        public final boolean hasQueuedThreads() {
            return sync.hasQueuedThreads();
        }
    
        /**
         * 判断线程是否在等待队列中
         */
        public final boolean hasQueuedThread(Thread thread) {
            return sync.isQueued(thread);
        }
    
        /**
         * 获取等待队列长度,并发时,不是绝对精确
         */
        public final int getQueueLength() {
            return sync.getQueueLength();
        }
    
        /**
         * 获取等待的线程集合,不是绝对精确
         */
        protected Collection<Thread> getQueuedThreads() {
            return sync.getQueuedThreads();
        }
    
        /**
         * 判断是否有线程在某条件上等待
         */
        public boolean hasWaiters(Condition condition) {
            if (condition == null)
                throw new NullPointerException();
            if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
                throw new IllegalArgumentException("not owner");
            return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
        }
    
        /**
         * 获取在某条件上等待的线程数
         */
        public int getWaitQueueLength(Condition condition) {
            if (condition == null)
                throw new NullPointerException();
            if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
                throw new IllegalArgumentException("not owner");
            return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
        }
    
        /**
         * 获取在某条件上等待的线程集
         */
        protected Collection<Thread> getWaitingThreads(Condition condition) {
            if (condition == null)
                throw new NullPointerException();
            if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
                throw new IllegalArgumentException("not owner");
            return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
        }
    }   
        
  • ReentrantLock执行lock()时,主要是通过AbstractQueuedSynchronizeracquire()方法实现:
  • // ReentrantLock.lock()   
    public void lock() {
        sync.lock();
    }   
    
    // FairSync.sync()
    final void lock() {
        acquire(1);
    }
    
    // 获取锁
    public final void acquire(int arg) {
        // 尝试获取锁:
        // 1. 成功时,直接返回
        // 2. 失败时,以独占的方式将当前线程入队addWaiter,并且等待自旋等待acquireQueued
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // 若等待返回了,则中断自己
            selfInterrupt();
    }
    
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // 先尝试直接入队尾,
        // 并发时有可能失败,则通过enq入队
        Node pred = tail;
        if (pred != null) {
            // 已经有线程在等待,则尝试直接设置node为tail
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                // 链接旧的tail.next -> node
                pred.next = node;
                return node;
            }
        }
        // 等待队列为空 或 并发时compareAndSetTail失败,则尝试继续插入等待节点
        enq(node);
        return node;
    }
    
    // 新节点入队,返回旧的尾节点
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) {
            	// 等待队列此时为空,初始化头节点
                if (compareAndSetHead(new Node()))
                	// 初始化尾节点
                    tail = head;
            } else {
            	// 等待队列不为空
            	// 链接新节点的前驱节点为尾节点
                node.prev = t;
                // 设置新节点为尾节点
                if (compareAndSetTail(t, node)) {
                	// 链接旧尾节点的后驱节点为新节点 
                    t.next = node;
                    return t;
                }
            }
        }
    }
    
    // 自旋等待
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 从当前节点往前找到头节点
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                	// 已获取到锁,设置新的头节点,相当于节点出队
                    setHead(node);
                    // 释放掉等待节点,头节点是没有next属性的
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 当请求锁失败后,检查节点是否被需要阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
        	// 若排队失败,则取消获取锁的请求
            if (failed)
                cancelAcquire(node);
        }
    }
    
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 获取前驱节点的状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * 节点的前驱节点状态为SIGNAL时,表示该节点已经请求过需要被唤醒,可以安全地阻塞
             */
            return true;
        if (ws > 0) {
            /*
             * 若前驱节点已被取消,则忽略这些取消的节点,继续往前查找未
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * 此时前驱节点的状态为0或PROPAGATE(-3),此时需要一个唤醒节点信号,但没必要阻塞线程
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    private final boolean parkAndCheckInterrupt() {
        // 阻塞当前线程
        LockSupport.park(this);
        // 线程是否被中断,且重置中断状态
        return Thread.interrupted();
    }
    
    // 尝试取消请求
    private void cancelAcquire(Node node) {
        // 忽略不存在的节点
        if (node == null)
            return;
        // 节点线程置空
        node.thread = null;
    
        // 忽略取消的节点
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
    
        // predNext节点,表示node节点前的第一个非取消状态节点的后继节点  
        Node predNext = pred.next;
    
        // 将节点状态设置为取消
        node.waitStatus = Node.CANCELLED;
    
        // 如果当前节点是尾节点,设置新的尾节点
        if (node == tail && compareAndSetTail(node, pred)) {
        	// 将node的后继节点置空
            compareAndSetNext(pred, predNext, null);
        } else {
        	// 若node不为尾节点,即为链表中间的节点
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                //如果node的前驱节点不是头节点,那么需要给当前节点的后继节点一个"等待唤醒"的标记,  
                //即将当前节点的前驱节点等待状态设置为SIGNAL,然后将其设置为当前节点的后继节点的前驱节点
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
            	// 唤醒node节点的后继节点。  
                unparkSuccessor(node);
            }
            // 让取消节点的next引用会指向自己
            node.next = node;
        }
    }
        

    下图揭示了从同步器获取锁时,内部的等待队列的状态变化图:

    初始状态
    当只有一个线程t1进行lock()操作时,由于tryAcquire()将返回true,不用进行等待,等待队列状态不变。
    若在线程t1还未unlock()线程t2就进行了lock()操作,此时等待队列将被初始化,并将线程t2插入等待队列
    此时,若线程t3也进行lock()操作:

    以上则是ReentrantLock加锁(lock)机制,下面则是ReentrantLock解锁(unlock)机制

    // ReentrantLock.unlock()	
    public void unlock() {
    	sync.release(1);
    }
    
    // Sync.release()
    public final boolean release(int arg) {
    	// 尝试解锁,由子类ReentrantLock区定义
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
            	// 唤醒第一个节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    // ReentrantLock.tryRelease()
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
        	// 仅当state = 0时,才算释放锁成功,即统一线程的lock()次数必须与unlock()次数相同
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
    
    // AbstractQueuedSynchronizer.unparkSuccessor()
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
        // 寻找有效的后继节点
       
        Node s = node.next;
        // 后继节点不存在,或状态为取消时,则查询最前面的一个非取消的节点
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
        	// 唤醒对应节点锁在线程
            LockSupport.unpark(s.thread);
    }
    
    	

    一旦 LockSupport.unpark(s.thread);执行完,对应的等待节点将被唤醒:

    private final boolean parkAndCheckInterrupt() {
        // 唤醒后返回
        LockSupport.park(this);
        return Thread.interrupted();
    }
    
    final boolean acquireQueued(final Node node, int arg) {
        ... 
        try {
           	...
            for (;;) {
                // 设置新的head节点,等待节点得到执行
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; 
                    failed = false;
                    return interrupted;
                }
               
            }
        } finally {
            ...
        }
    }
    	

    以上,则是AbstractQueuedSynchronizer同步器的基本实现机制,其作为很多并发工具的基础,规范了如何阻塞唤醒线程,相比普通的锁机制(如synchronized),其通过自旋等待精确唤醒,可以提高一些并发时的性能。

  • 参考文献

好人,一生平安。