🚧 遗留的安全集合
如:
这些集合的实现方式简单粗暴,在方法上直接使用 synchronized
进行修饰,在进行读写等操作时,会锁住整个对象,并发性能不太好。
📦 Collections 安全集合
如:
- Collections.synchronizedCollection(Collection c)
- Collections.synchronizedList(List list)
- Collections.synchronizedMap(Map<K,V> m)
- Collections.synchronizedSet(Set s)
- …
使用该方法,传入一个线程不安全的集合,返回一个 Collections 内部的静态类对象,用于将一个不安全的集合变成一个线程安全的集合。
该方法会将传入的集合作为成员变量,每次操作时,都使用 synchronized
加锁,锁住 mutex
(Map 中为当前对象 this)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| private static class SynchronizedMap<K,V>
implements Map<K,V>, Serializable {
private static final long serialVersionUID = 1978198479659022715L;
private final Map<K,V> m; // Backing Map
final Object mutex; // Object on which to synchronize
SynchronizedMap(Map<K,V> m) {
this.m = Objects.requireNonNull(m);
mutex = this;
}
SynchronizedMap(Map<K,V> m, Object mutex) {
this.m = m;
this.mutex = mutex;
}
public int size() {
synchronized (mutex) {return m.size();} // 操作都会锁住 mutex
}
// ... 其他方法
}
|
🛠 JUC 安全集合
java.util.concurrent
下提供的线程安全的集合类,可以分为三类:
- Concurrent 系列(适合写多的场景,内部使用 CAS 优化)
- CopyOnWrite 系列(适合于读多写少的场景)
- Blocking 系列(阻塞队列)
✍ Concurrent
📖 CopyOnWrite
采用了写入时拷贝的思想,更改操作会将底层数组拷贝一份,更改在新的数组上执行,并发时读写操作达到读写分离。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements); // 替换数组
return true;
} finally {
lock.unlock();
}
}
|
读操作不进行加锁。适合于读多写少的场景。
1
2
3
4
5
6
7
8
| @SuppressWarnings("unchecked")
private E get(Object[] a, int index) {
return (E) a[index];
}
public E get(int index) {
return get(getArray(), index);
}
|
CopyOnWriteArraySet
内部由 CopyOnWriteArrayList
实现
1
2
3
4
5
6
| public class CopyOnWriteArraySet<E> extends AbstractSet<E>
implements java.io.Serializable {
public CopyOnWriteArraySet() {
al = new CopyOnWriteArrayList<E>();
}
}
|
🚧 Blocking
有界队列:有固定大小的队列。
无界队列:没有固定大小的队列
ArrayBlockingQueue
LinkedBlockingQueue
- 由界带缓冲阻塞队列 (Integer.MAX_VALUE),链表实现,可以自定义上限
LinkedBlockingDeque
DelayQueue
- 延时无界阻塞队列,只允许存放可以延时的元素,队列 head 是最先到期的,如果队列中元素没有到期,就算队列中有元素也不能获取到
SynchronousQueue
- 没有容量的同步队列,不存储元素,每一个 put 必须要等待一个 take 操作
PriorityBlockingQueue
- 无界优先阻塞队列,PriorityQueue 的线程安全版,不允许存放 null
常用方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| public interface BloockingQueue<E> extends Queue<E> {
// 队列满抛出异常
boolean add(E e);
// 无元素抛出异常
boolean remove(Object o);
//
// 队列满返回 false 成功添加 true
boolean offer(E e);
// 队列为空返回 null
E poll();
// 阻塞时等待
// 队列满时,阻塞进行等待
boolean put(E e) throws InterruptedException;
// 队列为空时,阻塞进行等待
E take() throws InterruptedException;
}
|
LinkedBlockingQueue
在 LinkedBlockingQeque
中使用了 dummy 节点和两把锁,在同一时刻,可以允许两个线程分别执行 put
和 take
。
1
2
3
| private final ReentrantLock putLock = new ReentrantLock();
private final ReentrantLock takeLock = new ReentrantLock();
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
// 计数
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 如果容量已满,进行等待
while (count.get() == capacity) {
notFull.await();
}
// 容量未满,进行入队
enqueue(node);
c = count.getAndIncrement();
// 入队后队列还有空位, 唤醒其他 put 线程
if (c + 1 < capacity)
notFull.signal(); // 只唤醒一个,减少竞争
} finally {
putLock.unlock();
}
// 如果队列中有一个元素, 唤醒 take 线程
if (c == 0)
// notEmpty.signal() 而不是 notEmpty.signalAll();减少竞争
signalNotEmpty();
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 如果队列中只有一个空位时, 叫醒 put 线程
// 如果有多个线程进行出队, 第一个线程 c == capacity, 后续线程 c < capacity
if (c == capacity)
// notFull.signal() 而不是 notFull.signalAll(),减少竞争
signalNotFull();
return x;
}
|