1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
| public class CyclicBarrier {
// 内部类,存放 broken 标记,表示屏障是否被损坏,损坏后无法正常工作
private static class Generation {
boolean broken = false;
}
// 可重入锁
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
// 屏障阻挡的线程数,构造方法传入的初始化值
private final int parties;
// 屏障释放时执行的方法
private final Runnable barrierCommand;
// 当前的 Generation 对象,每一轮都会有一个新的 Generation 对象,存放 broken 标记
private Generation generation = new Generation();
// 当前还需要阻挡几个线程,每次 -1
private int count;
// 开启下一轮屏障(count==0 或 reset)
private void nextGeneration() {
// 唤醒所有等待线程
trip.signalAll();
// 重置 count
count = parties;
generation = new Generation();
}
// 破坏当前屏障,变为不可用状态,可以使用 reset 恢复
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 加锁,有多个线程会调用 await() 只有一个线程能够进入同步方法
lock.lock();
try {
final Generation g = generation;
if (g.broken) // 屏障是否损坏
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // 达到条件,可打破屏障
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null) // 执行打破屏障的命令
command.run();
ranAction = true;
nextGeneration(); // 开启下一轮循环,里面会唤醒所有等待的线程
return 0;
} finally {
if (!ranAction) // (1) 假如 command 出现异常,破坏屏障
breakBarrier();
}
}
// 当前还未达到冲破屏障的的条件
// 一直循环等待,直到到达打破屏障的条件,中断,或超时
for (;;) {
try {
if (!timed)
trip.await(); // 未达到超时时间,进行等待,直到被唤醒
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) { // (2) 等待到达条件时,线程被中断,破坏屏障
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken) // 线程被唤醒之后,屏障被破坏,直接抛出异常
throw new BrokenBarrierException();
if (g != generation) // 返回当前线程是第几个到达的线程
return index;
if (timed && nanos <= 0L) { // (3) 等待超时,破坏屏障
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
public int getParties() {
return parties;
}
// 开始等待 返回当前线程是第几个到达的线程
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
// 判断屏障是否被破坏
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock(); // 需加锁访问,其他线程可能在执行 dowait()
try {
return generation.broken;
} finally {
lock.unlock();
}
}
// 重置操作 先破坏屏障,再进行下一轮循环屏障
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
// 获取等待线程数
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}
}
|