多线程 JUC 并发工具类
多线程 JUC 并发工具类:CountDownLatch、CyclicBarrier、Semaphore、Condition 使用场景,注意事项。
JUC 常用并发工具类
较为核心的 CountDownLatch、Semaphore、Condition、CyclicBarrier
这四个工具类主要位于 java.util.concurrent (JUC) 包下,大部分基于 AQS (AbstractQueuedSynchronizer) 实现,AQS 说明可查看我这篇文章。
1. CountDownLatch (倒计时门闩)
1.1 概念
“做减法”。
它允许一个或多个线程等待其他线程完成操作,初始化一个计数器(count),每当一个线程完成任务后,调用 countDown() 使计数器减 1,当计数器变为 0 时,在 await() 上等待的线程会被唤醒。
1.2 代码示例
示例一
// 模拟场景:主线程等待 3 个服务组件加载完毕
CountDownLatch latch = new CountDownLatch(3);
for (int i = 1; i <= 3; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 开始加载...");
try { Thread.sleep(1000); } catch (InterruptedException e) {}
System.out.println(Thread.currentThread().getName() + " 加载完成");
latch.countDown(); // 计数器 -1
}, "Service-" + i).start();
}
System.out.println("主线程等待所有服务加载...");
latch.await(); // 阻塞等待,直到 count = 0
System.out.println("所有服务加载完毕,系统启动!");
示例二-跑步
/**
* CountDownLatchDemo 用跑步场景演示一等多和多等一
* <p>
* 一等多场景: 多个选手准备,裁判发令枪,选手起跑;
* <p>
* 多等一场景: 多个选手跑完,裁判等待所有选手到达终点,宣布比赛结束;
*
* @author suremotoo
* @date 2026/01/06 11:30
*/
public class CountDownLatchDemo {
private static final CountDownLatch START_LATCH = new CountDownLatch(1);
private static final CountDownLatch FINISH_LATCH = new CountDownLatch(3);
public static void main(String[] args) {
Runnable player = () -> {
try {
System.out.println(Thread.currentThread().getName() + " 已准备就绪,等待发令枪...");
START_LATCH.await(); // 所有人准备集合,等待裁判发令
System.out.println(Thread.currentThread().getName() + " 起跑!");
Thread.sleep((long) (Math.random() * 10000)); // 模拟跑步时间
System.out.println(Thread.currentThread().getName() + " 到达终点!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
FINISH_LATCH.countDown(); // 选手到达终点,计数器减一
}
};
// 启动选手线程
for (int i = 1; i <= 3; i++) {
new Thread(player, "选手 " + i).start();
}
try {
Thread.sleep(1000); // 模拟裁判准备时间, 检查信号枪等等
System.out.println("裁判:预备,开始!");
START_LATCH.countDown(); // 发令枪响,选手起跑
FINISH_LATCH.await(); // 等待所有选手到达终点
System.out.println("裁判:比赛结束!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
1.3 使用场景
- 多服务启动检测:Spring Boot 启动时,等待数据库连接、缓存连接、MQ 连接都校验通过后,再对外暴露 HTTP 端口。
- 并行数据处理:Excel 导出时,开启 10 个线程分别查询 10 张表的数据,主线程等待它们全部查完,再进行组装。
1.4 注意事项
- 不可重用:计数器减到 0 后,无法重置,如果需要重置,请使用
CyclicBarrier。 - 异常处理:
countDown()最好放在finally块中,防止线程报错导致计数器永远不归零,主线程死锁。
2. CyclicBarrier (循环栅栏)
2.1 概念
“做加法”。 它可以让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。
2.2 代码示例
// 模拟场景:3 个线程分别收集数据,都收集完后,触发一次汇总操作
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println(">> 所有线程到达屏障,开始执行汇总任务 <<");
});
for (int i = 1; i <= 3; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 正在收集数据...");
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " 到达屏障");
barrier.await(); // 等待其他人
System.out.println(Thread.currentThread().getName() + " 冲破屏障,继续后续工作");
} catch (Exception e) { e.printStackTrace(); }
}, "Worker-" + i).start();
}
2.3 使用场景
- 多线程计算合并:比如对一个巨大的银行流水表进行统计,分为 4 个线程计算 4 个季度的流水,所有线程都算完后,触发回调任务把 4 个季度的结果相加。
- 游戏开发:MOBA 游戏中,等待 10 名玩家都加载进度达到 100% 后,同时进入战场。
2.4 注意事项
- 可重用:当屏障打开后,内部计数器会自动重置,可以用于多轮次的并发任务。
- BrokenBarrierException:如果等待过程中某个线程中断或超时,栅栏会损坏,所有在等待的线程都会抛出此异常。
3. Semaphore (信号量)
3.1 概念
“限流器”。
它用来控制同时访问特定资源的线程数量,通过 acquire() 获取许可,release() 释放许可。
3.2 代码示例
// 模拟场景:只有 2 个停车位,但有 5 辆车要停
Semaphore semaphore = new Semaphore(2);
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
try {
semaphore.acquire(); // 抢占车位
System.out.println(Thread.currentThread().getName() + " 抢到车位,停车中...");
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + " 离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // 释放车位
}
}, "Car-" + i).start();
}
3.3 使用场景
- 接口限流 (Rate Limiting):防止瞬间高并发把服务打挂。例如,限制某个 API 接口每秒只能处理 50 个请求(虽然现在更多用 Guava RateLimiter 或 Sentinel,但原理类似)。
- 数据库连接池:虽然连接池本身有实现,但 Semaphore 是实现连接池资源借出的底层思想。
3.4 注意事项
- 公平性:
new Semaphore(int permits, boolean fair),fair设置为true可以实现公平锁(先来后到),但性能会下降,默认是非公平。 - Release 必须执行:一定要在
finally中释放,否则信号量泄漏,系统吞吐量会越来越低直到卡死。
4. Condition (条件对象)
4.1 概念
“精准通知”。
它是 Object.wait() 和 notify() 的升级版,必须配合 Lock (如 ReentrantLock) 使用。它支持多个等待队列,意味着你可以精准地唤醒“某类”等待的线程,而不是像 notifyAll() 那样一股脑全唤醒。
4.2 代码示例
// 模拟场景:生产者-消费者 (交替打印 A 和 B)
Lock lock = new ReentrantLock();
Condition conditionA = lock.newCondition();
Condition conditionB = lock.newCondition();
// 标志位
boolean isATurn = true;
// 线程 A
new Thread(() -> {
lock.lock();
try {
if (!isATurn) conditionA.await(); // 不是我的回合,去 A 队列睡
System.out.println("A");
isATurn = false;
conditionB.signal(); // 唤醒 B 队列的一个线程
} catch (Exception e) {} finally { lock.unlock(); }
}).start();
// 线程 B (类似逻辑,省略)
4.3 使用场景
- 阻塞队列源码:
ArrayBlockingQueue内部就是用两个 Condition (notEmpty和notFull) 来分别控制“队列满了生产者等待”和“队列空了消费者等待”的逻辑。 - 复杂的线程通信:需要严格控制线程执行顺序的场景。
4.4 注意事项
- 死锁风险:必须在
lock.lock()和lock.unlock()之间使用,否则抛出IllegalMonitorStateException。 - 虚假唤醒:在使用
await()时,建议总是使用while循环检查条件,而不是if。
5. 总结与对比
- CountDownLatch: 用来等别人干完,常用,业务逻辑中等待异步结果
- Semaphore: 用来控制并发资源,限流、资源控制场景
- CyclicBarrier: 用来等大家到齐,复杂计算、多阶段任务聚合
- Condition: 用来精细化线程通信,通常在造轮子写中间件时用,业务代码直接用的少,直接用
BlockingQueue更多
CountDownLatch vs CyclicBarrier 的区别
| 特性 | CountDownLatch (倒计时门闩) | CyclicBarrier (循环栅栏) |
|---|---|---|
| 侧重点 | 一个等多个 (也可多个等一个),侧重于任务完成的“状态”。 | 多个互相等,侧重于线程到达同步点。 |
| 可重用性 | 一次性,计数器归零后失效。 | 可循环使用,屏障打开后自动重置。 |
| 动作触发 | 调用 countDown() 仅减计数,线程继续执行。 | 调用 await() 会阻塞,直到所有人到齐。 |
| 构造参数 | 仅接收数值。 | 可接收数值 + Runnable 回调 (这是亮点)。 |
简单选择建议
- 如果你需要 主线程等待子线程 干完活再继续:选 CountDownLatch。
- 如果你需要控制 并发度(比如只允许 10 个线程同时访问):选 Semaphore。
- 如果你需要 多线程步调一致(大家都在这等着,等人齐了一起跑):选 CyclicBarrier。
实际使用
我个人在项目中很少用到这些工具类,但是我常常会在测试时用到,比如写个小工具类,用来测试个别接口的并发性能,比如下面这个:
public static void concurrentTest(int concurrentReqs) throws InterruptedException {
// 模拟并发请求数量 (例如 5 个请求同时进来)
final int CONCURRENT_REQUESTS = concurrentReqs;
System.out.println("====== 开始并发压力测试 ======");
System.out.println("模拟并发数: " + CONCURRENT_REQUESTS);
// 模拟线程池 (多线程同时发起请求)
ExecutorService requestPool = Executors.newFixedThreadPool(CONCURRENT_REQUESTS);
// 闭锁:用于确保所有线程“同时”开始请求(模拟瞬时高峰)
CountDownLatch startGate = new CountDownLatch(1);
// 闭锁:用于等待所有任务结束
CountDownLatch endGate = new CountDownLatch(CONCURRENT_REQUESTS);
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger failCount = new AtomicInteger(0);
long globalStart = System.currentTimeMillis();
for (int i = 0; i < CONCURRENT_REQUESTS; i++) {
final int index = i;
requestPool.submit(() -> {
try {
// 所有线程在这里阻塞,等待主线程发令
startGate.await();
System.out.println("[Client-Thread-" + index + "] 请求已提交...");
long t1 = System.currentTimeMillis();
// --- 核心调用 ---
Office2PdfReq req = new Office2PdfReq();
String wordUrl = wordUrlsList.get(index % wordUrlsList.size());
System.out.println("wordUrl = " + wordUrl);
req.setFileUrl(wordUrl);
req.setTraceId(IDUtil.getSecureRandomId());
req.setChannel("X01");
req.setOperationUserId(361);
req.setRequestTime(new Date());
ServiceResult<String> serviceResult = service.word2Pdf(req);
System.out.println("serviceResult = " + serviceResult);
System.out.println("-----------");
// ---------------
long t2 = System.currentTimeMillis();
System.out.println("[Client-Thread-" + index + "] ✔️ 转换成功,耗时: " + (t2 - t1) + "ms");
successCount.incrementAndGet();
} catch (Exception e) {
System.err.println("[Client-Thread-" + index + "] ❌ 转换失败: " + e.getMessage());
failCount.incrementAndGet();
} finally {
endGate.countDown(); // 完成一个任务
}
});
}
// 发令枪响,所有线程同时开始执行
System.out.println(">>> 所有线程就绪,3秒后开始轰炸...");
Thread.sleep(3000);
startGate.countDown();
// 等待所有任务完成
endGate.await();
long globalEnd = System.currentTimeMillis();
long totalTime = globalEnd - globalStart;
System.out.println("\n====== 测试报告 ======");
System.out.println("总耗时: " + totalTime + " ms");
System.out.println("平均每个文档耗时: " + (totalTime / CONCURRENT_REQUESTS) + " ms");
System.out.println("成功: " + successCount.get());
System.out.println("失败: " + failCount.get());
requestPool.shutdown();
System.exit(0);
}