AQS
AQS AbstractQueuedSynchronized 是一个用来构建锁和同步器的框架,是对 CAS 的一种封装和丰富,AQS 引入了独占锁,共享锁等性质。
使用 AQS 能够基于 Java API,简单高效地构建出应用广泛的大量同步器,用于 Java 多线程之间的同步。
核心思想
AQS 使用一个 volatile int state 成员变量表示同步状态。
使用 CAS compareAndSetState 对该同步状态进行原子操作,实现对其值的修改。
| 1
2
 | // 共享变量,使用 volatile 保障可见性
private volatile int state;
 | 
AQS 通过内置的 FIFO 双向队列来完成同步状态的管理,获取资源线程的排队工作。
| 1
2
 | private transient volatile Node head;
private transient volatile Node tail;
 | 
当请求的共享资源空闲时,将当前请求资源的线程设置为有效的工作线程,并将共享资源设置为锁定状态。
如果被请求的共享资源被占用,将暂时获取不到锁的线程构造成一个 Node 加入到队列中。
资源的共享方式
独占
当一个线程以独占模式获取锁时,其他任何线程都必须等待。
共享
当一个线程以共享模式获取锁时,其他也想以共享模式获取锁的线程也能够一起访问共享资源,但其他想以独占模式获取锁的线程需要等待。
- CountDownLatch
- CyclicBarrier
- Semaphore
实现
CountDownLatch
CountDownLatch 同步工具允许一条或多条线程,等待其他线程中的一组操作完成后,再继续执行。
CountDownLatch 任务分为 n 个子线程执行,state 将被初始化为 n,这 n 个线程将会并发执行,每个子线程执行完后调用 countDown() 方法,state 会通过 CAS - 1。当所有的子线程都执行完之后,state=0,会 unpack 主线程,继续后续动作。
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
 | public class Demo {
    static class SearchTask implements Runnable {
        private Integer id;
        private CountDownLatch latch;
        public SearchTask(int id, CountDownLatch latch) {
            this.id = id;
            this.latch = latch;
        }
        // **子线程**
        // 各自寻找相应目标的龙珠
        @Override
        public void run() {
            System.out.println("🏃♂️ 开始寻找" + id + "号龙珠");
            int seconds = ThreadLocalRandom.current().nextInt(20);
            try {
                Thread.sleep(seconds * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("👏 花了" + seconds + "秒,找到了" + id + "号龙珠");
            
            // 每当一个任务完成,就调用一次 countDown() 方法
            latch.countDown();
        }
    }
    // **主线程**
    // 等子线程都执行完毕后,找到所有龙珠,召唤神龙
    public static void main(String[] args) {
        List<Integer> idList = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        
        // state 初始化为任务的个数
        CountDownLatch latch = new CountDownLatch(idList.size());
        
        for (int id : idList) {
            Thread thread = new Thread(new SearchTask(id, latch));
            thread.start();
        }
        try {
            latch.await();  // 等待其他线程完成
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("找到所有龙珠了!召唤神龙 🐲");
    }
}
 | 

在这个例子中,任务分为 7 个子线程执行,将 CountDownLatch 的值初始化为与线程相同的次数。主线程通过 await() 等待其他任务完成。
这几个子任务并发执行,每个子线程执行完成后 countDown() 一次,state 会通过 CAS 操作 -1。
直到所有的子线程都执行完毕之后,state=0,主线程会结束等待,继续执行后续任务。
CyclicBarrier
CyclicBarrier 用于多个线程相互等待对方执行到某个状态后,这些线程再继续执行。

|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
 | public class Demo {
    private static int count = 5;
    // CyclicBarrier 可以设置子线程全部到达 barrier,执行的任务
    private static CyclicBarrier barrier = new CyclicBarrier(count, () -> {
        System.out.println("👌 人员到齐!准备爬山!");
    });
    public static void main(String[] args) {
        // 子线程,模拟 5 个同学相约爬山
        for (int i = 1; i <= 5; i++) {
            final String name = i + "同学";
            new Thread(() -> {
                System.out.println("👋 " + name + "准备出发去集合点了。");
                int time = ThreadLocalRandom.current().nextInt(10);
                try {
                    TimeUnit.SECONDS.sleep(time);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("🦶 " + name + "花了 " + time + " 秒到达集合点。");
                try {  // 等待所有的同学(子线程)运行到此处
                    barrier.await();  // 当 await 的线程足够,再继续执行线程后续任务
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(name + "和大家一起爬山了。⛰");
            }, i + "同学").start();
        }
    }
}
 | 

在这个例子中,有 5 个子线程分别开始执行(5 个同学从家里出发),每个线程到达某个状态后 await() 进行等待(每个同学到达集合点会等待其他同学到达),当线程全部达到之后,“屏障”会释放,线程将继续执行(人员到齐后开始爬山)。
CyclicBarrier 可以多次使用,可以自动或手动重置计数。
使用 barrier.reset() 会进行重置,等待的线程会抛出 BrokenBarrierException
| 1
2
3
4
5
 | // CyclicBarrier 的计数
count = 5
// 子线程的数量
i = 10
 | 
将子线程的数量设置为 10,循环屏障设置为 5,将会得到如下结果:每有 5 个,线程到达“屏障”点,就会释放一次。

Semaphore
通过使用 Semaphore,我们可以决定某个资源同一时间能够被访问的最大线程数,它相当于对某个资源的访问进行了流量控制。
是一个可以被 n 个线程占用的排它锁,可以在最开始设定 Semaphore 许可证数量,每个线程都可以获得 1 个或多个许可证,当许可证耗尽或不足以供其他线程获取时,其他线程将被阻塞。
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
 | public static void main(String[] args) {
    Semaphore semaphore = new Semaphore(2);  // 有三个许可,假设每个线程只获取一个许可时,最多两个线程可访问资源
    for (int i = 0; i < 3; i++) {
        new Thread(() -> {
            try {
                try {
                    semaphore.acquire();  // 无参为申请一个许可证,没有许可的线程会进行等待 
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                try {
                    System.out.println("申请许可证成功");
                    TimeUnit.SECONDS.sleep(5);  // 模拟任务耗时 
                } finally {
                    semaphore.release();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}
 | 
ReentrantLock
相比于 synchronized,都支持 ✨可重入,还具备如下特点:
- ✨ 等待可中断
- ✨ 可以设置超时时间
- ✨ 公平锁
- ✨ 支持多个条件变量
ReentrantLock 需要使用 try-finally 手动加锁和解锁
| 1
2
3
4
5
6
 | reentrantLock.lock();
try {
    // 临界区
} finally {
    reentrantLock.unlock();
}
 | 

ReentrantLock 默认实现为非公平锁
| 1
2
3
 | public ReentrantLock() {
    sync = new NonfairSync();
}
 | 
可重入
一个线程获取了🔒 锁之后,可以多次获取同一把锁。
一个线程在执行一个带锁的方法,该方法中又调用了另一个需要相同锁的方法,则改线程可以直接获取锁,并执行该方法,可重入锁可有效地避免死锁。
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
 | public static void main(String[] args) {
    method1();
}
private static ReentrantLock lock = new ReentrantLock();
public static void method1() {
    lock.lock();  // 第一次获取🔒 锁
    try {
        System.out.println("do something in method 1");
        method2()
    } finally {
        lock.unlock();
    }
}
public static void method2() {
    lock.lock();  // 第二次获取🔒 锁(重入)
    try {
        System.out.println("do something in method 2");
    } finally {
        lock.unlock();
    }
}
 | 
等待可中断
lock.lockInterruptibly()
- 没有竞争就会获取锁
- 有竞争进入阻塞队列,但是可以被中断(throw InterruptedException)
使用 lock.lock(),即使进行 interrupt() 也不会让线程等待中断。
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
 | public static void main(String[] args) {
    ReentrantLock lock = new ReentrantLock();
    Thread thread = new Thread(() -> {
        try {
            lock.lockInterruptibly();  // 尝试获取可中断锁
        } catch (InterruptedException e) {
            System.out.println("锁在等待过程中被其他线程中断");
            return;
        }
        try {
            System.out.println("成功获得了锁 🔒");
        } finally {
            lock.unlock();
        }
    });
    lock.lock();
    System.out.println("主线程获取了锁 🔒");
    thread.start();
    try {
        Thread.sleep(1000);
        System.out.println("主线程打断 thread");
        thread.interrupt();
        Thread.sleep(1000);
    } finlaly {
        lock.unlock();
    }
}
 | 
超时时间
- boolean tryLock():获取锁失败,立即返回
- boolean tryLock(long timeout, TimeUnit unit):获取锁失败,等待超时时间
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
 | Thread thread = new Thread(() -> {
    try {
        if (!lock.tryLock(2, TimeUnit.SECONDS)) {
            System.out.println("等待 2s 后,成功获取了锁");
            return;
        } 
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    try {
        System.out.println("成功获取了锁");
    } finally {
        lock.unlock();
    }
});
 | 
公平锁
ReentrantLock 默认是非公平锁
| 1
2
3
4
5
6
7
 | public ReentrantLock() {
    sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
 | 
- 公平锁:获取锁的线程根据进入等待队列的顺序获取锁
- 非公平锁:新线程先尝试插队获取锁,插队失败再进入等待队列排序等待
| 1
2
 | // 获取公平锁
ReentrantLock lock = new ReentrantLock(true);
 | 
多个条件变量
ReentrantLock 支持多个条件变量,可以对等待的线程进行精准唤醒。
使用 await(),线程会释放获取的锁,进入对应的 conditionObject 进行等待;被 singnal() 唤醒后,线程重新去竞争锁。
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
 | // 三个线程循环打印 ABC 10 次
public class ThreadDemo {
    static ReentrantLock lock = new ReentrantLock();
    static Condition conditionA = lock.newCondition();
    static Condition conditionB = lock.newCondition();
    static Condition conditionC = lock.newCondition();
    // 0->print A  1->print B  2->print C
    static int commonState = 0;
    static class MyTread implements Runnable {
        private int state;
        public MyThread(int state) {
            this.state = state;
        }
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                lock.lock();
                try {
                    if (state == 0) {
                        while (commonState != 0) {
                            conditionA.await();
                        }
                        System.out.println("A");
                        commonState = 1;
                        conditionB.singnal();
                    }
                    if (state == 1) {
                        while (commonState != 1) {
                            conditionB.await();
                        }
                        System.out.println("B");
                        commonState = 2;
                        conditionC.singnal();
                    }
                    if (state == 2) {
                        while (commonState != 2) {
                            conditoinC.await();
                        }
                        System.out.println("C");
                        commonState = 0;
                        conditionA.singnal();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }
    public static void main(String[] args) {
        new Thread(new MyThread(0)).start();
        new Thread(new MyThread(1)).start();
        new Thread(new MyThread(2)).start();
    }
}
 | 
ReentrantReadWriteLock
参考