线程池
2026/1/15大约 4 分钟
线程池
为什么使用线程池
- 降低资源消耗:重复利用已创建的线程,减少线程创建和销毁的开销
- 提高响应速度:任务到达时,无需等待线程创建即可执行
- 提高线程可管理性:统一管理线程,避免无限制创建线程导致系统崩溃
ThreadPoolExecutor 核心参数
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 空闲线程存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 工作队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 拒绝策略
)参数详解
| 参数 | 说明 |
|---|---|
| corePoolSize | 核心线程数,即使空闲也不会被回收(除非设置 allowCoreThreadTimeOut) |
| maximumPoolSize | 最大线程数,当队列满时会创建新线程,直到达到此值 |
| keepAliveTime | 非核心线程空闲时的存活时间 |
| workQueue | 存放待执行任务的阻塞队列 |
| threadFactory | 创建线程的工厂,可自定义线程名称等 |
| handler | 当线程池和队列都满时的拒绝策略 |
线程池工作流程
工作队列类型
ArrayBlockingQueue
// 有界队列,基于数组
new ArrayBlockingQueue<>(100)LinkedBlockingQueue
// 可选有界队列,基于链表
new LinkedBlockingQueue<>() // 无界(Integer.MAX_VALUE)
new LinkedBlockingQueue<>(100) // 有界SynchronousQueue
// 不存储元素,每个插入操作必须等待一个移除操作
new SynchronousQueue<>()PriorityBlockingQueue
// 优先级队列,任务按优先级执行
new PriorityBlockingQueue<>()拒绝策略
// 1. AbortPolicy(默认):抛出 RejectedExecutionException
new ThreadPoolExecutor.AbortPolicy()
// 2. CallerRunsPolicy:由调用线程执行任务
new ThreadPoolExecutor.CallerRunsPolicy()
// 3. DiscardPolicy:直接丢弃任务
new ThreadPoolExecutor.DiscardPolicy()
// 4. DiscardOldestPolicy:丢弃队列中最老的任务
new ThreadPoolExecutor.DiscardOldestPolicy()
// 5. 自定义拒绝策略
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 记录日志、持久化任务等
log.warn("任务被拒绝: {}", r.toString());
}
}Executors 工厂方法
注意
阿里巴巴 Java 开发手册建议不要使用 Executors 创建线程池,而是通过 ThreadPoolExecutor 手动创建,以便更好地控制参数。
newFixedThreadPool
// 固定大小线程池
// 问题:使用无界队列,可能导致 OOM
ExecutorService executor = Executors.newFixedThreadPool(10);newCachedThreadPool
// 可缓存线程池,线程数无上限
// 问题:最大线程数为 Integer.MAX_VALUE,可能创建大量线程
ExecutorService executor = Executors.newCachedThreadPool();newSingleThreadExecutor
// 单线程线程池,保证任务顺序执行
ExecutorService executor = Executors.newSingleThreadExecutor();newScheduledThreadPool
// 定时任务线程池
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
// 延迟执行
scheduler.schedule(() -> System.out.println("延迟执行"), 1, TimeUnit.SECONDS);
// 固定频率执行
scheduler.scheduleAtFixedRate(() -> System.out.println("固定频率"), 0, 1, TimeUnit.SECONDS);
// 固定延迟执行
scheduler.scheduleWithFixedDelay(() -> System.out.println("固定延迟"), 0, 1, TimeUnit.SECONDS);推荐的线程池配置
CPU 密集型任务
// 线程数 = CPU 核心数 + 1
int cpuCores = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
cpuCores + 1,
cpuCores + 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy()
);IO 密集型任务
// 线程数 = CPU 核心数 * 2 或 CPU 核心数 / (1 - 阻塞系数)
int cpuCores = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
cpuCores * 2,
cpuCores * 2,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy()
);自定义线程工厂
ThreadFactory threadFactory = new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "my-pool-" + threadNumber.getAndIncrement());
thread.setDaemon(false);
thread.setPriority(Thread.NORM_PRIORITY);
return thread;
}
};线程池监控
ThreadPoolExecutor executor = ...;
// 获取线程池状态
int poolSize = executor.getPoolSize(); // 当前线程数
int activeCount = executor.getActiveCount(); // 活跃线程数
int queueSize = executor.getQueue().size(); // 队列中任务数
long completedTaskCount = executor.getCompletedTaskCount(); // 已完成任务数
long taskCount = executor.getTaskCount(); // 总任务数线程池关闭
// 优雅关闭:不再接受新任务,等待已提交任务执行完成
executor.shutdown();
// 等待终止
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
// 超时后强制关闭
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}CompletableFuture
基本使用
// 异步执行
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Hello";
});
// 获取结果
String result = future.get();链式调用
CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World") // 转换结果
.thenAccept(System.out::println) // 消费结果
.thenRun(() -> System.out.println("Done")); // 执行后续操作组合多个 Future
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
// 等待所有完成
CompletableFuture.allOf(future1, future2).join();
// 任意一个完成
CompletableFuture.anyOf(future1, future2).join();
// 合并两个结果
future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2)
.thenAccept(System.out::println);异常处理
CompletableFuture.supplyAsync(() -> {
if (true) throw new RuntimeException("出错了");
return "Hello";
})
.exceptionally(ex -> {
System.out.println("异常: " + ex.getMessage());
return "默认值";
})
.thenAccept(System.out::println);