👨🏭 创建
1
2
3
4
5
6
7
8
9
| public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// ...
}
|
🧩 corePoolSize
核心线程数
- 每次向线程池提交一个多线程任务,都会创建一个新的核心线程,无论是否有空闲线程,直到创建的线程数量到达核心线程数,才会开始复用线程资源
- 使用
prestartAllCoreThreads()
直接初始化全部线程
🧩 maximumPoolSize
最大线程数
- 当所有线程都处于运行状态,且等待队列已满,会开始创建非核心线程,但不超过最大线程数
🧩 keepAliveTime
线程最大空闲时间
🧩 unit
最大空闲时间单位
🧩 workQueue
线程等待队列
- 核心线程数已满时,会将任务暂存到等待队列,直到线程资源可用;当等待队列已满时,会开始创建非核心线程
🧩 threadFactory
线程创建的工厂
🧩 handler
拒绝策略
- 当等待队列已满,并且达到最大线程数,新的任务会进行拒绝处理
线程池的大小设计
线程池中线程数量太少 -> 当有大量任务处理时,队列会堆积大量任务,导致相应变慢。
线程池中线程数量太多 -> 会有大量的线程抢占 CPU 资源,导致频繁的上下文切换,反而会增加任务的执行时间,影响执行效率。
主要是执行计算任务,相应很快,CPU 利用率高。推荐:核心线程数 = CPU 核心数 + 1
主要是进行 IO 操作,如硬盘读取数据等,IO 操作时间长,CPU 利用不高,推荐:核心线程数 = 2 * CPU 核心数。
拒绝策略
❌ AbortPolicy
- 拒绝任务,并抛出异常 RejectedExecutionException
❌ DiscardPolicy
❌ DiscardOldestPolicy
❌ CallerRunsPolicy
使用
1
2
3
4
| // execute 无返回值
executor.execute(() -> {
// do ...
});
|
1
2
3
4
5
6
7
| // submit 有返回值
Future<String> future = executor.submit(() -> {
// do...
return "返回值";
});
String res = future.get();
|
👓 常见的线程池
🌊 newSingleThreadExecutor
- 只有一个线程的线程池
- 唯一线程可以保证所提交的任务顺序执行
核心线程数 = 最大线程数 = 1
1
2
3
4
5
6
| public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(
new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
|
🌊 newFixedThreadPool
核心线程数 = 最大线程数 = n
1
2
3
4
5
| public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThread, nThread,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
|
🌊 newCachedThreadPool
当提交新任务时,如果线程池为空或没有空闲线程,则创建新线程执行任务。线程空闲时间超过 keepAliveTime=60s,会自动释放线程资源。长时间空闲的 CachedThreadPool 不会持有任务任何线程资源。
核心线程数 = 0
最大线程数 = Integer.MAX_VALUE
SynchronousQueue 不存储元素的阻塞队列,每一个 put() 都会等待一个 take() 操作
1
2
3
4
5
| public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
|
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor
可以用来提交定时任务,extends ThreadPoolExecutor
。
1
2
3
| public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
|
ScheduledThreadPoolExecutor
最大线程池容量为 Integer.MAX_VALUE
;都是采用的DelayedWorkQueue
作为等待队列。
1
2
3
4
5
| ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
// 延迟 3 s 后执行任务
executor.schedule(() -> System.out.println("定时任务"), 3, TimeUnit.SECONDS);
executor.shutdown();
|
1
2
3
4
| ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// 延迟 3 s 后,每 1 s 执行一次任务
executor.scheduleAtFixedRate(() -> System.out.println("定时任务"), 3, 1, TimeUnit.SECONDS);
|
ForkJoinPool
Fork -> 拆分 Pool -> 合并
把大任务拆分为多个小任务,最后汇总多个小任务的结果,得到整大任务的结果。这些小任务都是同时进行,可提高运算效率。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
| public class Demo {
public static void main(String[] args) {
ForkJoinTask<Integer> task = new SubTask(1, 1000);
int res = ForkJoinPool.commonPool().invoke(task);
System.out.println(res);
}
private static class SubTask extends RecursiveTask<Integer> {
private final int start;
private final int end;
public SubTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if (end - start < 125) { // 任务足够小,直接计算
System.out.println(Thread.currentThread().getName() + " 开始计算 " + start + "-" + end + "的值");
int result = 0;
for (int i = start; i <= end; i++) {
result += i;
}
return result;
}
// 任务太大,拆分成子任务
SubTask subTask1 = new SubTask(start, (end + start) / 2);
subTask1.fork();
SubTask subTask2 = new SubTask((end + start) / 2 + 1, end);
subTask2.fork();
return subTask1.join() + subTask2.join();
}
}
}
|
工作窃取:
子任务会被放到不同的队列中,每个队列会有对应的线程运行任务。
若某个队列中的任务计算完毕,但其他的任务队列中还有任务,会从其他队列中“窃取”任务继续执行。
参考