Featured image of post Java并发 - 线程池

Java并发 - 线程池

👨‍🏭 创建

1
2
3
4
5
6
7
8
9
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // ...
}
  • 🧩 corePoolSize 核心线程数

    • 每次向线程池提交一个多线程任务,都会创建一个新的核心线程,无论是否有空闲线程,直到创建的线程数量到达核心线程数,才会开始复用线程资源
    • 使用 prestartAllCoreThreads() 直接初始化全部线程
  • 🧩 maximumPoolSize 最大线程数

    • 当所有线程都处于运行状态,且等待队列已满,会开始创建非核心线程,但不超过最大线程数
  • 🧩 keepAliveTime 线程最大空闲时间

    • 非核心线程超过最大空闲时间,会自动销毁
  • 🧩 unit 最大空闲时间单位

  • 🧩 workQueue 线程等待队列

    • 核心线程数已满时,会将任务暂存到等待队列,直到线程资源可用;当等待队列已满时,会开始创建非核心线程
  • 🧩 threadFactory 线程创建的工厂

  • 🧩 handler 拒绝策略

    • 当等待队列已满,并且达到最大线程数,新的任务会进行拒绝处理

线程池的大小设计

线程池中线程数量太少 -> 当有大量任务处理时,队列会堆积大量任务,导致相应变慢。

线程池中线程数量太多 -> 会有大量的线程抢占 CPU 资源,导致频繁的上下文切换,反而会增加任务的执行时间,影响执行效率。

  • 💻 CPU 密集型

主要是执行计算任务,相应很快,CPU 利用率高。推荐:核心线程数 = CPU 核心数 + 1

  • 💽 IO 密集型

主要是进行 IO 操作,如硬盘读取数据等,IO 操作时间长,CPU 利用不高,推荐:核心线程数 = 2 * CPU 核心数。

拒绝策略

  • ❌ AbortPolicy

    • 拒绝任务,并抛出异常 RejectedExecutionException
  • ❌ DiscardPolicy

    • 不抛出异常,直接丢弃新任务
  • ❌ DiscardOldestPolicy

    • 丢弃最老的任务
  • ❌ CallerRunsPolicy

    • 将任务交给调用线程执行

使用

1
2
3
4
// execute  无返回值
executor.execute(() -> {
    // do ...
});
1
2
3
4
5
6
7
// submit  有返回值
Future<String> future = executor.submit(() -> {
    // do...
    return "返回值";
});

String res = future.get();

👓 常见的线程池

🌊 newSingleThreadExecutor

  • 只有一个线程的线程池
  • 唯一线程可以保证所提交的任务顺序执行

核心线程数 = 最大线程数 = 1

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(
        new ThreadPoolExecutor(1, 1,
                                 0L, TimeUnit.MILLISECONDS,
                                 new LinkedBlockingQueue<Runnable>()));
}

🌊 newFixedThreadPool

  • 固定线程数的线程池

核心线程数 = 最大线程数 = n

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThread, nThread,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>());
}

🌊 newCachedThreadPool

  • 根据任务会不断创建新线程的线程池

当提交新任务时,如果线程池为空或没有空闲线程,则创建新线程执行任务。线程空闲时间超过 keepAliveTime=60s,会自动释放线程资源。长时间空闲的 CachedThreadPool 不会持有任务任何线程资源。

核心线程数 = 0

最大线程数 = Integer.MAX_VALUE

SynchronousQueue 不存储元素的阻塞队列,每一个 put() 都会等待一个 take() 操作

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                    60L, TimeUnit.SECONDS,
                                    new SynchronousQueue<Runnable>());
}

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 可以用来提交定时任务,extends ThreadPoolExecutor

1
2
3
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

ScheduledThreadPoolExecutor 最大线程池容量为 Integer.MAX_VALUE;都是采用的DelayedWorkQueue 作为等待队列。

1
2
3
4
5
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

// 延迟 3 s 后执行任务
executor.schedule(() -> System.out.println("定时任务"), 3, TimeUnit.SECONDS);
executor.shutdown();
1
2
3
4
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

// 延迟 3 s 后,每 1 s 执行一次任务
executor.scheduleAtFixedRate(() -> System.out.println("定时任务"), 3, 1, TimeUnit.SECONDS);

ForkJoinPool

Fork -> 拆分 Pool -> 合并

把大任务拆分为多个小任务,最后汇总多个小任务的结果,得到整大任务的结果。这些小任务都是同时进行,可提高运算效率。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class Demo {
  public static void main(String[] args) {
    ForkJoinTask<Integer> task = new SubTask(1, 1000);
    int res = ForkJoinPool.commonPool().invoke(task);
    System.out.println(res);
  }

  private static class SubTask extends RecursiveTask<Integer> {
    private final int start;
    private final int end;

    public SubTask(int start, int end) {
      this.start = start;
      this.end = end;
    }

    @Override
    protected Integer compute() {
      if (end - start < 125) {  // 任务足够小,直接计算
        System.out.println(Thread.currentThread().getName() + " 开始计算 " + start + "-" + end + "的值");
        int result = 0;
        for (int i = start; i <= end; i++) {
          result += i;
        }
        return result;
      }

      // 任务太大,拆分成子任务
      SubTask subTask1 = new SubTask(start, (end + start) / 2);
      subTask1.fork();

      SubTask subTask2 = new SubTask((end + start) / 2 + 1, end);
      subTask2.fork();

      return subTask1.join() + subTask2.join();
    }
  }
}

工作窃取:

子任务会被放到不同的队列中,每个队列会有对应的线程运行任务。

若某个队列中的任务计算完毕,但其他的任务队列中还有任务,会从其他队列中“窃取”任务继续执行。

参考