AQS
AQS AbstractQueuedSynchronized
是一个用来构建锁和同步器的框架,是对 CAS 的一种封装和丰富,AQS 引入了独占锁,共享锁等性质。
使用 AQS 能够基于 Java API,简单高效地构建出应用广泛的大量同步器,用于 Java 多线程之间的同步。
核心思想
AQS 使用一个 volatile int state
成员变量表示同步状态。
使用 CAS compareAndSetState
对该同步状态进行原子操作,实现对其值的修改。
1
2
| // 共享变量,使用 volatile 保障可见性
private volatile int state;
|
AQS 通过内置的 FIFO
双向队列来完成同步状态的管理,获取资源线程的排队工作。
1
2
| private transient volatile Node head;
private transient volatile Node tail;
|
当请求的共享资源空闲时,将当前请求资源的线程设置为有效的工作线程,并将共享资源设置为锁定状态。
如果被请求的共享资源被占用,将暂时获取不到锁的线程构造成一个 Node 加入到队列中。
资源的共享方式
独占
当一个线程以独占模式获取锁时,其他任何线程都必须等待。
共享
当一个线程以共享模式获取锁时,其他也想以共享模式获取锁的线程也能够一起访问共享资源,但其他想以独占模式获取锁的线程需要等待。
- CountDownLatch
- CyclicBarrier
- Semaphore
实现
CountDownLatch
CountDownLatch
同步工具允许一条或多条线程,等待其他线程中的一组操作完成后,再继续执行。
CountDownLatch
任务分为 n
个子线程执行,state
将被初始化为 n
,这 n
个线程将会并发执行,每个子线程执行完后调用 countDown()
方法,state
会通过 CAS - 1
。当所有的子线程都执行完之后,state=0
,会 unpack 主线程,继续后续动作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
| public class Demo {
static class SearchTask implements Runnable {
private Integer id;
private CountDownLatch latch;
public SearchTask(int id, CountDownLatch latch) {
this.id = id;
this.latch = latch;
}
// **子线程**
// 各自寻找相应目标的龙珠
@Override
public void run() {
System.out.println("🏃♂️ 开始寻找" + id + "号龙珠");
int seconds = ThreadLocalRandom.current().nextInt(20);
try {
Thread.sleep(seconds * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("👏 花了" + seconds + "秒,找到了" + id + "号龙珠");
// 每当一个任务完成,就调用一次 countDown() 方法
latch.countDown();
}
}
// **主线程**
// 等子线程都执行完毕后,找到所有龙珠,召唤神龙
public static void main(String[] args) {
List<Integer> idList = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
// state 初始化为任务的个数
CountDownLatch latch = new CountDownLatch(idList.size());
for (int id : idList) {
Thread thread = new Thread(new SearchTask(id, latch));
thread.start();
}
try {
latch.await(); // 等待其他线程完成
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("找到所有龙珠了!召唤神龙 🐲");
}
}
|
在这个例子中,任务分为 7 个子线程执行,将 CountDownLatch
的值初始化为与线程相同的次数。主线程通过 await()
等待其他任务完成。
这几个子任务并发执行,每个子线程执行完成后 countDown()
一次,state
会通过 CAS 操作 -1
。
直到所有的子线程都执行完毕之后,state=0
,主线程会结束等待,继续执行后续任务。
CyclicBarrier
CyclicBarrier
用于多个线程相互等待对方执行到某个状态后,这些线程再继续执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
| public class Demo {
private static int count = 5;
// CyclicBarrier 可以设置子线程全部到达 barrier,执行的任务
private static CyclicBarrier barrier = new CyclicBarrier(count, () -> {
System.out.println("👌 人员到齐!准备爬山!");
});
public static void main(String[] args) {
// 子线程,模拟 5 个同学相约爬山
for (int i = 1; i <= 5; i++) {
final String name = i + "同学";
new Thread(() -> {
System.out.println("👋 " + name + "准备出发去集合点了。");
int time = ThreadLocalRandom.current().nextInt(10);
try {
TimeUnit.SECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("🦶 " + name + "花了 " + time + " 秒到达集合点。");
try { // 等待所有的同学(子线程)运行到此处
barrier.await(); // 当 await 的线程足够,再继续执行线程后续任务
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(name + "和大家一起爬山了。⛰");
}, i + "同学").start();
}
}
}
|
在这个例子中,有 5 个子线程分别开始执行(5 个同学从家里出发),每个线程到达某个状态后 await()
进行等待(每个同学到达集合点会等待其他同学到达),当线程全部达到之后,“屏障”会释放,线程将继续执行(人员到齐后开始爬山)。
CyclicBarrier 可以多次使用,可以自动或手动重置计数。
使用 barrier.reset() 会进行重置,等待的线程会抛出 BrokenBarrierException
1
2
3
4
5
| // CyclicBarrier 的计数
count = 5
// 子线程的数量
i = 10
|
将子线程的数量设置为 10
,循环屏障设置为 5
,将会得到如下结果:每有 5
个,线程到达“屏障”点,就会释放一次。
Semaphore
通过使用 Semaphore
,我们可以决定某个资源同一时间能够被访问的最大线程数,它相当于对某个资源的访问进行了流量控制。
是一个可以被 n 个线程占用的排它锁,可以在最开始设定 Semaphore
许可证数量,每个线程都可以获得 1 个或多个许可证,当许可证耗尽或不足以供其他线程获取时,其他线程将被阻塞。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2); // 有三个许可,假设每个线程只获取一个许可时,最多两个线程可访问资源
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
try {
semaphore.acquire(); // 无参为申请一个许可证,没有许可的线程会进行等待
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
System.out.println("申请许可证成功");
TimeUnit.SECONDS.sleep(5); // 模拟任务耗时
} finally {
semaphore.release();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
|
ReentrantLock
相比于 synchronized
,都支持 ✨可重入,还具备如下特点:
- ✨ 等待可中断
- ✨ 可以设置超时时间
- ✨ 公平锁
- ✨ 支持多个条件变量
ReentrantLock
需要使用 try-finally
手动加锁和解锁
1
2
3
4
5
6
| reentrantLock.lock();
try {
// 临界区
} finally {
reentrantLock.unlock();
}
|
ReentrantLock 默认实现为非公平锁
1
2
3
| public ReentrantLock() {
sync = new NonfairSync();
}
|
可重入
一个线程获取了🔒 锁之后,可以多次获取同一把锁。
一个线程在执行一个带锁的方法,该方法中又调用了另一个需要相同锁的方法,则改线程可以直接获取锁,并执行该方法,可重入锁可有效地避免死锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| public static void main(String[] args) {
method1();
}
private static ReentrantLock lock = new ReentrantLock();
public static void method1() {
lock.lock(); // 第一次获取🔒 锁
try {
System.out.println("do something in method 1");
method2()
} finally {
lock.unlock();
}
}
public static void method2() {
lock.lock(); // 第二次获取🔒 锁(重入)
try {
System.out.println("do something in method 2");
} finally {
lock.unlock();
}
}
|
等待可中断
lock.lockInterruptibly()
- 没有竞争就会获取锁
- 有竞争进入阻塞队列,但是可以被中断(throw InterruptedException)
使用 lock.lock()
,即使进行 interrupt()
也不会让线程等待中断。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Thread thread = new Thread(() -> {
try {
lock.lockInterruptibly(); // 尝试获取可中断锁
} catch (InterruptedException e) {
System.out.println("锁在等待过程中被其他线程中断");
return;
}
try {
System.out.println("成功获得了锁 🔒");
} finally {
lock.unlock();
}
});
lock.lock();
System.out.println("主线程获取了锁 🔒");
thread.start();
try {
Thread.sleep(1000);
System.out.println("主线程打断 thread");
thread.interrupt();
Thread.sleep(1000);
} finlaly {
lock.unlock();
}
}
|
超时时间
boolean tryLock()
:获取锁失败,立即返回boolean tryLock(long timeout, TimeUnit unit)
:获取锁失败,等待超时时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| Thread thread = new Thread(() -> {
try {
if (!lock.tryLock(2, TimeUnit.SECONDS)) {
System.out.println("等待 2s 后,成功获取了锁");
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
System.out.println("成功获取了锁");
} finally {
lock.unlock();
}
});
|
公平锁
ReentrantLock
默认是非公平锁
1
2
3
4
5
6
7
| public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
|
- 公平锁:获取锁的线程根据进入等待队列的顺序获取锁
- 非公平锁:新线程先尝试插队获取锁,插队失败再进入等待队列排序等待
1
2
| // 获取公平锁
ReentrantLock lock = new ReentrantLock(true);
|
多个条件变量
ReentrantLock
支持多个条件变量,可以对等待的线程进行精准唤醒。
使用 await()
,线程会释放获取的锁,进入对应的 conditionObject 进行等待;被 singnal()
唤醒后,线程重新去竞争锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
| // 三个线程循环打印 ABC 10 次
public class ThreadDemo {
static ReentrantLock lock = new ReentrantLock();
static Condition conditionA = lock.newCondition();
static Condition conditionB = lock.newCondition();
static Condition conditionC = lock.newCondition();
// 0->print A 1->print B 2->print C
static int commonState = 0;
static class MyTread implements Runnable {
private int state;
public MyThread(int state) {
this.state = state;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
lock.lock();
try {
if (state == 0) {
while (commonState != 0) {
conditionA.await();
}
System.out.println("A");
commonState = 1;
conditionB.singnal();
}
if (state == 1) {
while (commonState != 1) {
conditionB.await();
}
System.out.println("B");
commonState = 2;
conditionC.singnal();
}
if (state == 2) {
while (commonState != 2) {
conditoinC.await();
}
System.out.println("C");
commonState = 0;
conditionA.singnal();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
public static void main(String[] args) {
new Thread(new MyThread(0)).start();
new Thread(new MyThread(1)).start();
new Thread(new MyThread(2)).start();
}
}
|
ReentrantReadWriteLock
参考