本文重点

20210423210107.png

前情提要

上文中详细介绍了 AQS 源码的执行流程和核心思想, 如下。

  • CAS
  • 自旋
  • LockSupport.park() unpark()
  • 双端队列

AQS 中 tryAcquire / tryRelease, tryAcquireShared / tryReleaseShared 都需要具体子类根据不同的策略来实现,而具体的排队逻辑、控制加锁及释放都是在 AQS 中实现的

AQS 的子类

1、ReentrantLock

  • 独占锁
  • 可重入
  • 公平与非公平
  • 通过 CAS 设置 state,失败则进入 AQS 排队逻辑
  • 如果是当前线程重入,则 state + 1

用法如下:

ReentrantLock lock = new ReentrantLock(true);
lock.lock();
try {
    //do something
} finally {
    lock.unlock();
}

2、ReentrantReadWriteLock

  • 提供 ReadLock / WriteLock
  • 读读共享, 读写、写写互斥
  • ReadLock.lock() 执行 acquireShared()
  • ReadLock.lock() 执行 acquire()
  • 适合读多写少的场景

一个缓存字典的示例:

class RWDictionary {
   private final Map<String, Data> m = new TreeMap<String, Data>();
   private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
   private final Lock r = rwl.readLock();
   private final Lock w = rwl.writeLock();

   public Data get(String key) {
     r.lock();
     try {
        return m.get(key); 
     }
     finally {
        r.unlock(); 
     }
   }
   public String[] allKeys() {
     r.lock();
     try {
        return m.keySet().toArray(); 
     }
     finally {
        r.unlock(); 
     }
   }
   public Data put(String key, Data value) {
     w.lock();
     try {
        return m.put(key, value);
     }
     finally {
        w.unlock(); 
     }
   }
   public void clear() {
     w.lock();
     try {
        m.clear();
     }
     finally {
        w.unlock(); 
     }
   }
 }

3、CountDownLatch

  • 共享锁
  • 核心是计数,初始化 state = count
  • countDown() 是释放锁, state -1
  • await(), 是尝试获取锁,获取不到则进入队列等待
  • 适用于主线程等待各个子线程执行完毕后再继续执行

实例:

package com.lyqiang.aqs;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * @author lyqiang
 */
public class CountDownLatchTest {

    public static void main(String[] args) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(5);
        for (int i = 0; i < 5; i++) {
            Student student = new Student(i, countDownLatch);
            new Thread(student).start();
        }
        countDownLatch.await();
        System.out.println("所有同学都上完车了,请系好安全带,老司机发车了");
    }
}

class Student implements Runnable {

    private final int sno;

    private final CountDownLatch countDownLatch;

    public Student(int sno, CountDownLatch countDownLatch) {
        this.sno = sno;
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        long prepare = (long) (Math.random() * 1000);
        try {
            TimeUnit.MILLISECONDS.sleep(prepare);
            System.out.println(sno + " 号同学准备了" + prepare + "毫秒后上车了");
        } catch (InterruptedException e) {
            //
        } finally {
            countDownLatch.countDown();
        }
    }
}

输出结果:

2 号同学准备了476毫秒后上车了
3 号同学准备了537毫秒后上车了
0 号同学准备了811毫秒后上车了
1 号同学准备了918毫秒后上车了
4 号同学准备了970毫秒后上车了
所有同学都上完车了,请系好安全带,老司机发车了

4、StampedLock

  • 带有版本戳的读写锁
  • tryOptimisticRead() 得到版本号,validate()可以校验版本号
  • 在读操作时,可以不加锁,读完数据,校验版本号一致则执行逻辑,不一致则再获取读锁
  • 乐观锁,不必在每次读操作时都加读锁,进一步提高性能
public class Point {
    private final StampedLock stampedLock = new StampedLock();

    private double x;
    private double y;

    public void move(double deltaX, double deltaY) {
        long stamp = stampedLock.writeLock(); // 获取写锁
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            stampedLock.unlockWrite(stamp); // 释放写锁
        }
    }

    public double distanceFromOrigin() {
        long stamp = stampedLock.tryOptimisticRead(); // 获得一个乐观读锁
        // 注意下面两行代码不是原子操作
        // 假设x,y = (100,200)
        double currentX = x;
        // 此处已读取到x=100,但x,y可能被写线程修改为(300,400)
        double currentY = y;
        // 此处已读取到y,如果没有写入,读取是正确的(100,200)
        // 如果有写入,读取是错误的(100,400)
        if (!stampedLock.validate(stamp)) { // 检查乐观读锁后是否有其他写锁发生
            stamp = stampedLock.readLock(); // 获取一个悲观读锁
            try {
                currentX = x;
                currentY = y;
            } finally {
                stampedLock.unlockRead(stamp); // 释放悲观读锁
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }
}

5、Semaphore

  • 信号量
  • 初始化,设置运行线程数量 state
  • 获取锁 state - 1, 释放锁 state + 1
  • 类似限流的思想
package com.lyqiang.aqs;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * @author lyqiang
 */
public class SemaphoreTest {

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < 5; i++) {
            Driver driver = new Driver(i, semaphore);
            new Thread(driver).start();
        }
    }
}

class Driver implements Runnable {

    private final int dno;

    private final Semaphore semaphore;

    public Driver(int dno, Semaphore semaphore) {
        this.dno = dno;
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        long stay = (long) (Math.random() * 10000);
        System.out.println(dno + " 号老司机开到停车场入口");
        try {
            semaphore.acquire();
            System.out.println(dno + " 号老司机进入了停车场");
            TimeUnit.MILLISECONDS.sleep(stay);
            System.out.println(dno + " 号老司机停留了" + stay + "毫秒后离开了");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release();
        }
    }
}

输出结果:

0 号老司机开到停车场入口
4 号老司机开到停车场入口
3 号老司机开到停车场入口
2 号老司机开到停车场入口
1 号老司机开到停车场入口
3 号老司机进入了停车场
4 号老司机进入了停车场
0 号老司机进入了停车场
0 号老司机停留了1346毫秒后离开了
2 号老司机进入了停车场
3 号老司机停留了3446毫秒后离开了
1 号老司机进入了停车场
2 号老司机停留了7403毫秒后离开了
4 号老司机停留了9503毫秒后离开了
1 号老司机停留了6618毫秒后离开了

基于 ReentrantLock 实现的类

1、CyclicBarrier

  • 循环屏障
  • 设置 parties,即线程总数
  • 线程调用 await, count - 1, 直到 count = 0 , 所有线程才执行
  • 和 CountDownLatch 区别
    • CountDownLatch 是所有线程都执行完,再向下执行主线程
    • CyclicBarrier 是所有线程都准备好,再各自执行线程自己的逻辑
    • 例子:CountDownLatch 是等所有人上车后,再发车;CyclicBarrier是所有人准备好,人到齐了再开始上车
package com.lyqiang.aqs;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

/**
 * @author lyqiang
 */
public class CyclicBarrierTest {

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有同学都到了,老司机准备出发,3,2,1");
            }
        });
        for (int i = 0; i < 5; i++) {
            Student student = new Student(i, cyclicBarrier);
            new Thread(student).start();
        }
    }

}

class Student implements Runnable {

    private final int sno;
    private final CyclicBarrier cyclicBarrier;

    public Student(int sno, CyclicBarrier cyclicBarrier) {
        this.sno = sno;
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
        long prepare = (long) (Math.random() * 1000);
        try {
            TimeUnit.MILLISECONDS.sleep(prepare);
            System.out.println(sno + " 号同学准备了" + prepare + "毫秒后到达起跑线");
            cyclicBarrier.await();
            System.out.println(sno + " 号同学踩油门出发");
        } catch (Exception e) {
            //
        }
    }
}

输出结构:

4 号同学准备了353毫秒后到达起跑线
0 号同学准备了422毫秒后到达起跑线
2 号同学准备了669毫秒后到达起跑线
3 号同学准备了816毫秒后到达起跑线
1 号同学准备了962毫秒后到达起跑线
所有同学都到了,老司机准备出发,3,2,1
1 号同学踩油门出发
4 号同学踩油门出发
0 号同学踩油门出发
2 号同学踩油门出发
3 号同学踩油门出发

2、CopyOnWriteArrayList

  • 写时复制的 ArrayList
  • 是 ArrayList 线程安全的一种实现方式
  • add 、set、remove 方法底层实现是重新复制一个新的数组来操作,执行完再赋值给原有引用
  • add 、set、remove 中使用了 ReentrantLock 来加锁
public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        //复制一份
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}
    
public E set(int index, E element) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        E oldValue = get(elements, index);

        if (oldValue != element) {
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len);
            newElements[index] = element;
            setArray(newElements);
        } else {
            // Not quite a no-op; ensures volatile write semantics
            setArray(elements);
        }
        return oldValue;
    } finally {
        lock.unlock();
    }
}

public E remove(int index) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        E oldValue = get(elements, index);
        int numMoved = len - index - 1;
        if (numMoved == 0)
            setArray(Arrays.copyOf(elements, len - 1));
        else {
            Object[] newElements = new Object[len - 1];
            System.arraycopy(elements, 0, newElements, 0, index);
            System.arraycopy(elements, index + 1, newElements, index,
                                 numMoved);
            setArray(newElements);
        }
        return oldValue;
    } finally {
        lock.unlock();
    }
}

3、CopyOnWriteArraySet

  • 写时复制的 Set 集合
  • 底层使用了 CopyOnWriteArrayList,额外判断元素是否存在,保证没有重复元素
public boolean addIfAbsent(E e) {
    Object[] snapshot = getArray();
    return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
        addIfAbsent(e, snapshot);
}

4、ArrayBlockingQueue

  • 一个由数组支持的有界阻塞队列
  • 初始化时需指定大小
  • 使用 notEmpty、notFull 来通知等待锁的线程
  • offer、put 方法使用了 ReentrantLock 来加锁
    • offer:队列满时,返回false
    • put:队列满时则等待
  • peek、poll、take 方法同样使用 ReentrantLock 来加锁
    • peek:获取但不移除队列的头元素
    • poll:获取且移除队列的头元素
    • take:获取且移除队列的头元素,如果为空则等待
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}
    
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}    

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}
    
public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}
    
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}    

5、LinkedBlockingQueue

  • 一个基于链表的阻塞队列
  • 同样使用 notEmpty、notFull 来通知等待锁的线程
  • 逻辑基本与 ArrayBlockingQueue 类似,不在赘述