Featured image of post Java 并发 - AQS

Java 并发 - AQS

AQS

AQS AbstractQueuedSynchronized 是一个用来构建锁和同步器的框架,是对 CAS 的一种封装和丰富,AQS 引入了独占锁,共享锁等性质。

使用 AQS 能够基于 Java API,简单高效地构建出应用广泛的大量同步器,用于 Java 多线程之间的同步。

核心思想

AQS 使用一个 volatile int state 成员变量表示同步状态。

使用 CAS compareAndSetState 对该同步状态进行原子操作,实现对其值的修改。

1
2
// 共享变量,使用 volatile 保障可见性
private volatile int state;

AQS 通过内置的 FIFO 双向队列来完成同步状态的管理,获取资源线程的排队工作。

1
2
private transient volatile Node head;
private transient volatile Node tail;

当请求的共享资源空闲时,将当前请求资源的线程设置为有效的工作线程,并将共享资源设置为锁定状态。

如果被请求的共享资源被占用,将暂时获取不到锁的线程构造成一个 Node 加入到队列中。

资源的共享方式

独占

当一个线程以独占模式获取锁时,其他任何线程都必须等待。

  • ReentrantLock

共享

当一个线程以共享模式获取锁时,其他也想以共享模式获取锁的线程也能够一起访问共享资源,但其他想以独占模式获取锁的线程需要等待。

  • CountDownLatch
  • CyclicBarrier
  • Semaphore

实现

CountDownLatch

CountDownLatch 同步工具允许一条或多条线程,等待其他线程中的一组操作完成后,再继续执行。

CountDownLatch 任务分为 n 个子线程执行,state 将被初始化为 n,这 n 个线程将会并发执行,每个子线程执行完后调用 countDown() 方法,state 会通过 CAS - 1。当所有的子线程都执行完之后,state=0,会 unpack 主线程,继续后续动作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class Demo {

    static class SearchTask implements Runnable {
        private Integer id;
        private CountDownLatch latch;

        public SearchTask(int id, CountDownLatch latch) {
            this.id = id;
            this.latch = latch;
        }

        // **子线程**
        // 各自寻找相应目标的龙珠
        @Override
        public void run() {
            System.out.println("🏃‍♂️ 开始寻找" + id + "号龙珠");
            int seconds = ThreadLocalRandom.current().nextInt(20);
            try {
                Thread.sleep(seconds * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("👏 花了" + seconds + "秒,找到了" + id + "号龙珠");
            
            // 每当一个任务完成,就调用一次 countDown() 方法
            latch.countDown();
        }
    }

    // **主线程**
    // 等子线程都执行完毕后,找到所有龙珠,召唤神龙
    public static void main(String[] args) {
        List<Integer> idList = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        
        // state 初始化为任务的个数
        CountDownLatch latch = new CountDownLatch(idList.size());
        
        for (int id : idList) {
            Thread thread = new Thread(new SearchTask(id, latch));
            thread.start();
        }

        try {
            latch.await();  // 等待其他线程完成
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("找到所有龙珠了!召唤神龙 🐲");
    }
}

countdownlatch

在这个例子中,任务分为 7 个子线程执行,将 CountDownLatch 的值初始化为与线程相同的次数。主线程通过 await() 等待其他任务完成。

这几个子任务并发执行,每个子线程执行完成后 countDown() 一次,state 会通过 CAS 操作 -1

直到所有的子线程都执行完毕之后,state=0,主线程会结束等待,继续执行后续任务。

CyclicBarrier

CyclicBarrier 用于多个线程相互等待对方执行到某个状态后,这些线程再继续执行。

cyclicbarrier

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class Demo {

    private static int count = 5;
    // CyclicBarrier 可以设置子线程全部到达 barrier,执行的任务
    private static CyclicBarrier barrier = new CyclicBarrier(count, () -> {
        System.out.println("👌 人员到齐!准备爬山!");
    });

    public static void main(String[] args) {
        // 子线程,模拟 5 个同学相约爬山
        for (int i = 1; i <= 5; i++) {
            final String name = i + "同学";
            new Thread(() -> {
                System.out.println("👋 " + name + "准备出发去集合点了。");
                int time = ThreadLocalRandom.current().nextInt(10);
                try {
                    TimeUnit.SECONDS.sleep(time);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("🦶 " + name + "花了 " + time + " 秒到达集合点。");

                try {  // 等待所有的同学(子线程)运行到此处
                    barrier.await();  // 当 await 的线程足够,再继续执行线程后续任务
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }

                System.out.println(name + "和大家一起爬山了。⛰");
            }, i + "同学").start();
        }
    }

}

cyclicbarrier result

在这个例子中,有 5 个子线程分别开始执行(5 个同学从家里出发),每个线程到达某个状态后 await() 进行等待(每个同学到达集合点会等待其他同学到达),当线程全部达到之后,“屏障”会释放,线程将继续执行(人员到齐后开始爬山)。

CyclicBarrier 可以多次使用,可以自动或手动重置计数。 使用 barrier.reset() 会进行重置,等待的线程会抛出 BrokenBarrierException

1
2
3
4
5
// CyclicBarrier 的计数
count = 5

// 子线程的数量
i = 10

将子线程的数量设置为 10,循环屏障设置为 5,将会得到如下结果:每有 5 个,线程到达“屏障”点,就会释放一次。

cyclicbarrier result

Semaphore

通过使用 Semaphore,我们可以决定某个资源同一时间能够被访问的最大线程数,它相当于对某个资源的访问进行了流量控制。

是一个可以被 n 个线程占用的排它锁,可以在最开始设定 Semaphore 许可证数量,每个线程都可以获得 1 个或多个许可证,当许可证耗尽或不足以供其他线程获取时,其他线程将被阻塞。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) {
    Semaphore semaphore = new Semaphore(2);  // 有三个许可,假设每个线程只获取一个许可时,最多两个线程可访问资源

    for (int i = 0; i < 3; i++) {
        new Thread(() -> {
            try {
                try {
                    semaphore.acquire();  // 无参为申请一个许可证,没有许可的线程会进行等待 
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                try {
                    System.out.println("申请许可证成功");
                    TimeUnit.SECONDS.sleep(5);  // 模拟任务耗时 
                } finally {
                    semaphore.release();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

ReentrantLock

相比于 synchronized,都支持 ✨可重入,还具备如下特点:

  • ✨ 等待可中断
  • ✨ 可以设置超时时间
  • ✨ 公平锁
  • ✨ 支持多个条件变量

ReentrantLock 需要使用 try-finally 手动加锁和解锁

1
2
3
4
5
6
reentrantLock.lock();
try {
    // 临界区
} finally {
    reentrantLock.unlock();
}

ReentrantLock

ReentrantLock 默认实现为非公平锁

1
2
3
public ReentrantLock() {
    sync = new NonfairSync();
}

可重入

一个线程获取了🔒 锁之后,可以多次获取同一把锁。

一个线程在执行一个带锁的方法,该方法中又调用了另一个需要相同锁的方法,则改线程可以直接获取锁,并执行该方法,可重入锁可有效地避免死锁。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) {
    method1();
}

private static ReentrantLock lock = new ReentrantLock();

public static void method1() {
    lock.lock();  // 第一次获取🔒 锁
    try {
        System.out.println("do something in method 1");
        method2()
    } finally {
        lock.unlock();
    }
}

public static void method2() {
    lock.lock();  // 第二次获取🔒 锁(重入)
    try {
        System.out.println("do something in method 2");
    } finally {
        lock.unlock();
    }
}

等待可中断

lock.lockInterruptibly()

  • 没有竞争就会获取锁
  • 有竞争进入阻塞队列,但是可以被中断(throw InterruptedException)

使用 lock.lock(),即使进行 interrupt() 也不会让线程等待中断。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public static void main(String[] args) {
    ReentrantLock lock = new ReentrantLock();

    Thread thread = new Thread(() -> {
        try {
            lock.lockInterruptibly();  // 尝试获取可中断锁
        } catch (InterruptedException e) {
            System.out.println("锁在等待过程中被其他线程中断");
            return;
        }

        try {
            System.out.println("成功获得了锁 🔒");
        } finally {
            lock.unlock();
        }
    });

    lock.lock();
    System.out.println("主线程获取了锁 🔒");
    thread.start();

    try {
        Thread.sleep(1000);
        System.out.println("主线程打断 thread");
        thread.interrupt();
        Thread.sleep(1000);
    } finlaly {
        lock.unlock();
    }
}

超时时间

  • boolean tryLock():获取锁失败,立即返回
  • boolean tryLock(long timeout, TimeUnit unit):获取锁失败,等待超时时间
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
Thread thread = new Thread(() -> {
    try {
        if (!lock.tryLock(2, TimeUnit.SECONDS)) {
            System.out.println("等待 2s 后,成功获取了锁");
            return;
        } 
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    try {
        System.out.println("成功获取了锁");
    } finally {
        lock.unlock();
    }
});

公平锁

ReentrantLock 默认是非公平锁

1
2
3
4
5
6
7
public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
  • 公平锁:获取锁的线程根据进入等待队列的顺序获取锁
  • 非公平锁:新线程先尝试插队获取锁,插队失败再进入等待队列排序等待
1
2
// 获取公平锁
ReentrantLock lock = new ReentrantLock(true);

多个条件变量

ReentrantLock 支持多个条件变量,可以对等待的线程进行精准唤醒。

使用 await(),线程会释放获取的锁,进入对应的 conditionObject 进行等待;被 singnal() 唤醒后,线程重新去竞争锁。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// 三个线程循环打印 ABC 10 次
public class ThreadDemo {
    static ReentrantLock lock = new ReentrantLock();
    static Condition conditionA = lock.newCondition();
    static Condition conditionB = lock.newCondition();
    static Condition conditionC = lock.newCondition();

    // 0->print A  1->print B  2->print C
    static int commonState = 0;

    static class MyTread implements Runnable {
        private int state;
        public MyThread(int state) {
            this.state = state;
        }

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                lock.lock();
                try {
                    if (state == 0) {
                        while (commonState != 0) {
                            conditionA.await();
                        }
                        System.out.println("A");
                        commonState = 1;
                        conditionB.singnal();
                    }
                    if (state == 1) {
                        while (commonState != 1) {
                            conditionB.await();
                        }
                        System.out.println("B");
                        commonState = 2;
                        conditionC.singnal();
                    }
                    if (state == 2) {
                        while (commonState != 2) {
                            conditoinC.await();
                        }
                        System.out.println("C");
                        commonState = 0;
                        conditionA.singnal();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    public static void main(String[] args) {
        new Thread(new MyThread(0)).start();
        new Thread(new MyThread(1)).start();
        new Thread(new MyThread(2)).start();
    }
}

ReentrantReadWriteLock

参考