Featured image of post Java 并发 - 并发安全的容器

Java 并发 - 并发安全的容器

🚧 遗留的安全集合

如:

  • Hashtable
  • Vector

这些集合的实现方式简单粗暴,在方法上直接使用 synchronized 进行修饰,在进行读写等操作时,会锁住整个对象,并发性能不太好。

📦 Collections 安全集合

如:

  • Collections.synchronizedCollection(Collection c)
  • Collections.synchronizedList(List list)
  • Collections.synchronizedMap(Map<K,V> m)
  • Collections.synchronizedSet(Set s)

使用该方法,传入一个线程不安全的集合,返回一个 Collections 内部的静态类对象,用于将一个不安全的集合变成一个线程安全的集合。

该方法会将传入的集合作为成员变量,每次操作时,都使用 synchronized 加锁,锁住 mutex(Map 中为当前对象 this)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private static class SynchronizedMap<K,V>
        implements Map<K,V>, Serializable {
    private static final long serialVersionUID = 1978198479659022715L;

    private final Map<K,V> m;     // Backing Map
    final Object      mutex;        // Object on which to synchronize

    SynchronizedMap(Map<K,V> m) {
        this.m = Objects.requireNonNull(m);
        mutex = this;
    }

    SynchronizedMap(Map<K,V> m, Object mutex) {
        this.m = m;
        this.mutex = mutex;
    }

    public int size() {
        synchronized (mutex) {return m.size();}  // 操作都会锁住 mutex
    }

    // ... 其他方法
}

🛠 JUC 安全集合

java.util.concurrent 下提供的线程安全的集合类,可以分为三类:

  • Concurrent 系列(适合写多的场景,内部使用 CAS 优化)
  • CopyOnWrite 系列(适合于读多写少的场景)
  • Blocking 系列(阻塞队列)

✍ Concurrent

  • ConcurrentHashMap

  • ConcurrentLinkedQueue / ConcurrentLinkedDeque

  • ConcurrentSkipListMap / ConcurrentSkipListSet

📖 CopyOnWrite

  • CopyOnWriteArrayList

采用了写入时拷贝的思想,更改操作会将底层数组拷贝一份,更改在新的数组上执行,并发时读写操作达到读写分离。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);  // 替换数组
        return true;
    } finally {
        lock.unlock();
    }
}

读操作不进行加锁。适合于读多写少的场景。

1
2
3
4
5
6
7
8
@SuppressWarnings("unchecked")
private E get(Object[] a, int index) {
    return (E) a[index];
}

public E get(int index) {
    return get(getArray(), index);
}
  • CopyOnWriteArraySet

CopyOnWriteArraySet 内部由 CopyOnWriteArrayList 实现

1
2
3
4
5
6
public class CopyOnWriteArraySet<E> extends AbstractSet<E>
        implements java.io.Serializable {
    public CopyOnWriteArraySet() {
        al = new CopyOnWriteArrayList<E>();
    }
}

🚧 Blocking

有界队列:有固定大小的队列。

无界队列:没有固定大小的队列

  • ArrayBlockingQueue

    • 有界带缓冲阻塞队列,数组实现
  • LinkedBlockingQueue

    • 由界带缓冲阻塞队列 (Integer.MAX_VALUE),链表实现,可以自定义上限
  • LinkedBlockingDeque

    • 双端队列
  • DelayQueue

    • 延时无界阻塞队列,只允许存放可以延时的元素,队列 head 是最先到期的,如果队列中元素没有到期,就算队列中有元素也不能获取到
  • SynchronousQueue

    • 没有容量的同步队列,不存储元素,每一个 put 必须要等待一个 take 操作
  • PriorityBlockingQueue

    • 无界优先阻塞队列,PriorityQueue 的线程安全版,不允许存放 null

常用方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
public interface BloockingQueue<E> extends Queue<E> {

    // 队列满抛出异常
    boolean add(E e);
    // 无元素抛出异常
    boolean remove(Object o);

    //
    // 队列满返回 false 成功添加 true
    boolean offer(E e);
    // 队列为空返回 null
    E poll();

    // 阻塞时等待
    // 队列满时,阻塞进行等待
    boolean put(E e) throws InterruptedException;
    // 队列为空时,阻塞进行等待
    E take() throws InterruptedException;

}

LinkedBlockingQueue

LinkedBlockingQeque 中使用了 dummy 节点和两把锁,在同一时刻,可以允许两个线程分别执行 puttake

1
2
3
private final ReentrantLock putLock = new ReentrantLock();

private final ReentrantLock takeLock = new ReentrantLock();
  • put
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    // 计数
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        // 如果容量已满,进行等待
        while (count.get() == capacity) {
            notFull.await();
        }
        // 容量未满,进行入队
        enqueue(node);
        c = count.getAndIncrement(); 
        // 入队后队列还有空位, 唤醒其他 put 线程
        if (c + 1 < capacity)
            notFull.signal();  // 只唤醒一个,减少竞争
    } finally {
        putLock.unlock();
    }
    // 如果队列中有一个元素, 唤醒 take 线程
    if (c == 0)
        // notEmpty.signal() 而不是 notEmpty.signalAll();减少竞争
        signalNotEmpty();
}
  • take
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // 如果队列中只有一个空位时, 叫醒 put 线程
    // 如果有多个线程进行出队, 第一个线程 c == capacity, 后续线程 c < capacity
    if (c == capacity)
        // notFull.signal() 而不是 notFull.signalAll(),减少竞争
        signalNotFull();
        return x; 
}