并发工具类
2026/1/15大约 4 分钟
并发工具类
CountDownLatch 倒计时器
允许一个或多个线程等待其他线程完成操作。
使用场景
- 主线程等待多个子线程完成初始化
- 多个线程等待某个信号后同时开始
示例代码
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
int threadCount = 5;
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int index = i;
new Thread(() -> {
try {
Thread.sleep((long) (Math.random() * 1000));
System.out.println("线程 " + index + " 完成");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown(); // 计数减 1
}
}).start();
}
latch.await(); // 等待计数归零
System.out.println("所有线程执行完毕");
}
}模拟并发测试
public class ConcurrentTest {
public static void main(String[] args) throws InterruptedException {
int threadCount = 100;
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
try {
startSignal.await(); // 等待开始信号
// 执行并发操作
doSomething();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
doneSignal.countDown();
}
}).start();
}
System.out.println("准备就绪,开始并发测试");
startSignal.countDown(); // 发出开始信号
doneSignal.await(); // 等待所有线程完成
System.out.println("并发测试完成");
}
private static void doSomething() {
// 模拟业务操作
}
}CyclicBarrier 循环栅栏
让一组线程到达一个屏障点时被阻塞,直到最后一个线程到达屏障点,所有线程才会继续执行。
CountDownLatch vs CyclicBarrier
| 特性 | CountDownLatch | CyclicBarrier |
|---|---|---|
| 计数方式 | 递减 | 递减后重置 |
| 可重用 | 不可 | 可以 |
| 等待方 | 一个或多个线程等待其他线程 | 多个线程互相等待 |
| 回调 | 无 | 可设置屏障动作 |
示例代码
public class CyclicBarrierDemo {
public static void main(String[] args) {
int threadCount = 3;
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
// 所有线程到达屏障后执行
System.out.println("所有线程已到达屏障,继续执行");
});
for (int i = 0; i < threadCount; i++) {
final int index = i;
new Thread(() -> {
try {
System.out.println("线程 " + index + " 开始执行第一阶段");
Thread.sleep((long) (Math.random() * 1000));
System.out.println("线程 " + index + " 到达屏障");
barrier.await(); // 等待其他线程
System.out.println("线程 " + index + " 开始执行第二阶段");
Thread.sleep((long) (Math.random() * 1000));
barrier.await(); // 可重复使用
System.out.println("线程 " + index + " 完成");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}Semaphore 信号量
控制同时访问特定资源的线程数量。
使用场景
- 限流
- 资源池(数据库连接池、对象池)
示例代码
public class SemaphoreDemo {
public static void main(String[] args) {
// 最多允许 3 个线程同时访问
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) {
final int index = i;
new Thread(() -> {
try {
semaphore.acquire(); // 获取许可
System.out.println("线程 " + index + " 获取许可,开始执行");
Thread.sleep(2000);
System.out.println("线程 " + index + " 释放许可");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // 释放许可
}
}).start();
}
}
}限流示例
public class RateLimiter {
private final Semaphore semaphore;
public RateLimiter(int permits) {
this.semaphore = new Semaphore(permits);
}
public boolean tryAcquire() {
return semaphore.tryAcquire();
}
public void release() {
semaphore.release();
}
public void execute(Runnable task) {
if (tryAcquire()) {
try {
task.run();
} finally {
release();
}
} else {
System.out.println("请求被限流");
}
}
}Exchanger 交换器
用于两个线程之间交换数据。
public class ExchangerDemo {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
try {
String data = "来自线程A的数据";
System.out.println("线程A准备交换: " + data);
String received = exchanger.exchange(data);
System.out.println("线程A收到: " + received);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
String data = "来自线程B的数据";
System.out.println("线程B准备交换: " + data);
String received = exchanger.exchange(data);
System.out.println("线程B收到: " + received);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}Phaser 阶段器
更灵活的同步屏障,支持动态注册和注销参与者。
public class PhaserDemo {
public static void main(String[] args) {
Phaser phaser = new Phaser(3); // 3 个参与者
for (int i = 0; i < 3; i++) {
final int index = i;
new Thread(() -> {
System.out.println("线程 " + index + " 完成第一阶段");
phaser.arriveAndAwaitAdvance(); // 等待其他线程
System.out.println("线程 " + index + " 完成第二阶段");
phaser.arriveAndAwaitAdvance();
System.out.println("线程 " + index + " 完成第三阶段");
phaser.arriveAndDeregister(); // 注销
}).start();
}
}
}并发集合
ConcurrentHashMap
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 原子操作
map.put("key", 1);
map.putIfAbsent("key", 2); // 不存在才放入
map.computeIfAbsent("key", k -> 1); // 不存在才计算
map.computeIfPresent("key", (k, v) -> v + 1); // 存在才计算
map.merge("key", 1, Integer::sum); // 合并CopyOnWriteArrayList
适用于读多写少的场景,写操作时复制整个数组。
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("item");
// 迭代时不会抛出 ConcurrentModificationException
for (String item : list) {
list.add("new item"); // 写操作不影响当前迭代
}BlockingQueue 阻塞队列
// ArrayBlockingQueue - 有界数组队列
BlockingQueue<String> arrayQueue = new ArrayBlockingQueue<>(100);
// LinkedBlockingQueue - 链表队列
BlockingQueue<String> linkedQueue = new LinkedBlockingQueue<>();
// PriorityBlockingQueue - 优先级队列
BlockingQueue<String> priorityQueue = new PriorityBlockingQueue<>();
// DelayQueue - 延迟队列
BlockingQueue<Delayed> delayQueue = new DelayQueue<>();
// 阻塞操作
arrayQueue.put("item"); // 队列满时阻塞
arrayQueue.take(); // 队列空时阻塞
// 非阻塞操作
arrayQueue.offer("item"); // 队列满时返回 false
arrayQueue.poll(); // 队列空时返回 null
// 超时操作
arrayQueue.offer("item", 1, TimeUnit.SECONDS);
arrayQueue.poll(1, TimeUnit.SECONDS);生产者消费者模式
public class ProducerConsumer {
private final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
public void produce() throws InterruptedException {
int value = 0;
while (true) {
queue.put(value);
System.out.println("生产: " + value);
value++;
Thread.sleep(100);
}
}
public void consume() throws InterruptedException {
while (true) {
Integer value = queue.take();
System.out.println("消费: " + value);
Thread.sleep(200);
}
}
public static void main(String[] args) {
ProducerConsumer pc = new ProducerConsumer();
new Thread(() -> {
try { pc.produce(); } catch (InterruptedException e) { }
}).start();
new Thread(() -> {
try { pc.consume(); } catch (InterruptedException e) { }
}).start();
}
}StampedLock 邮戳锁
JDK 8 引入,支持乐观读,性能优于 ReadWriteLock。
public class StampedLockDemo {
private double x, y;
private final StampedLock lock = new StampedLock();
// 写锁
public void move(double deltaX, double deltaY) {
long stamp = lock.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
lock.unlockWrite(stamp);
}
}
// 乐观读
public double distanceFromOrigin() {
long stamp = lock.tryOptimisticRead(); // 乐观读,不加锁
double currentX = x, currentY = y;
if (!lock.validate(stamp)) { // 检查是否有写操作
stamp = lock.readLock(); // 升级为悲观读锁
try {
currentX = x;
currentY = y;
} finally {
lock.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
}