🚧 遗留的安全集合
如:
这些集合的实现方式简单粗暴,在方法上直接使用 synchronized 进行修饰,在进行读写等操作时,会锁住整个对象,并发性能不太好。
📦 Collections 安全集合
如:
- Collections.synchronizedCollection(Collection c)
- Collections.synchronizedList(List list)
- Collections.synchronizedMap(Map<K,V> m)
- Collections.synchronizedSet(Set s)
- …
使用该方法,传入一个线程不安全的集合,返回一个 Collections 内部的静态类对象,用于将一个不安全的集合变成一个线程安全的集合。
该方法会将传入的集合作为成员变量,每次操作时,都使用 synchronized 加锁,锁住 mutex(Map 中为当前对象 this)。
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
 | private static class SynchronizedMap<K,V>
        implements Map<K,V>, Serializable {
    private static final long serialVersionUID = 1978198479659022715L;
    private final Map<K,V> m;     // Backing Map
    final Object      mutex;        // Object on which to synchronize
    SynchronizedMap(Map<K,V> m) {
        this.m = Objects.requireNonNull(m);
        mutex = this;
    }
    SynchronizedMap(Map<K,V> m, Object mutex) {
        this.m = m;
        this.mutex = mutex;
    }
    public int size() {
        synchronized (mutex) {return m.size();}  // 操作都会锁住 mutex
    }
    // ... 其他方法
}
 | 
🛠 JUC 安全集合
java.util.concurrent 下提供的线程安全的集合类,可以分为三类:
- Concurrent 系列(适合写多的场景,内部使用 CAS 优化)
- CopyOnWrite 系列(适合于读多写少的场景)
- Blocking 系列(阻塞队列)
✍ Concurrent
📖 CopyOnWrite
采用了写入时拷贝的思想,更改操作会将底层数组拷贝一份,更改在新的数组上执行,并发时读写操作达到读写分离。
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
 | public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);  // 替换数组
        return true;
    } finally {
        lock.unlock();
    }
}
 | 
读操作不进行加锁。适合于读多写少的场景。
| 1
2
3
4
5
6
7
8
 | @SuppressWarnings("unchecked")
private E get(Object[] a, int index) {
    return (E) a[index];
}
public E get(int index) {
    return get(getArray(), index);
}
 | 
CopyOnWriteArraySet 内部由 CopyOnWriteArrayList 实现
| 1
2
3
4
5
6
 | public class CopyOnWriteArraySet<E> extends AbstractSet<E>
        implements java.io.Serializable {
    public CopyOnWriteArraySet() {
        al = new CopyOnWriteArrayList<E>();
    }
}
 | 
🚧 Blocking
有界队列:有固定大小的队列。
无界队列:没有固定大小的队列
- ArrayBlockingQueue
 
- LinkedBlockingQueue
 - 由界带缓冲阻塞队列 (Integer.MAX_VALUE),链表实现,可以自定义上限
 
- LinkedBlockingDeque
 
- DelayQueue
 - 延时无界阻塞队列,只允许存放可以延时的元素,队列 head 是最先到期的,如果队列中元素没有到期,就算队列中有元素也不能获取到
 
- SynchronousQueue
 - 没有容量的同步队列,不存储元素,每一个 put 必须要等待一个 take 操作
 
- PriorityBlockingQueue
 - 无界优先阻塞队列,PriorityQueue 的线程安全版,不允许存放 null
 
常用方法
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
 | public interface BloockingQueue<E> extends Queue<E> {
    // 队列满抛出异常
    boolean add(E e);
    // 无元素抛出异常
    boolean remove(Object o);
    //
    // 队列满返回 false 成功添加 true
    boolean offer(E e);
    // 队列为空返回 null
    E poll();
    // 阻塞时等待
    // 队列满时,阻塞进行等待
    boolean put(E e) throws InterruptedException;
    // 队列为空时,阻塞进行等待
    E take() throws InterruptedException;
}
 | 
LinkedBlockingQueue
在 LinkedBlockingQeque 中使用了 dummy 节点和两把锁,在同一时刻,可以允许两个线程分别执行 put 和 take。
| 1
2
3
 | private final ReentrantLock putLock = new ReentrantLock();
private final ReentrantLock takeLock = new ReentrantLock();
 | 
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
 | public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    // 计数
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        // 如果容量已满,进行等待
        while (count.get() == capacity) {
            notFull.await();
        }
        // 容量未满,进行入队
        enqueue(node);
        c = count.getAndIncrement(); 
        // 入队后队列还有空位, 唤醒其他 put 线程
        if (c + 1 < capacity)
            notFull.signal();  // 只唤醒一个,减少竞争
    } finally {
        putLock.unlock();
    }
    // 如果队列中有一个元素, 唤醒 take 线程
    if (c == 0)
        // notEmpty.signal() 而不是 notEmpty.signalAll();减少竞争
        signalNotEmpty();
}
 | 
|  1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
 | public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // 如果队列中只有一个空位时, 叫醒 put 线程
    // 如果有多个线程进行出队, 第一个线程 c == capacity, 后续线程 c < capacity
    if (c == capacity)
        // notFull.signal() 而不是 notFull.signalAll(),减少竞争
        signalNotFull();
        return x; 
}
 |