多线程 JUC 并发工具类
多线程 JUC 并发工具类:CountDownLatch、CyclicBarrier、Semaphore、Condition 使用场景,注意事项。
JUC 常用并发工具类
较为核心的 CountDownLatch、Semaphore、Condition、CyclicBarrier
这四个工具类主要位于 java.util.concurrent (JUC) 包下,大部分基于 AQS (AbstractQueuedSynchronizer) 实现,AQS 说明可查看我这篇文章。
1. CountDownLatch (倒计时门闩)
1.1 概念
CountDownLatch 的逻辑很简单:减法计数。
它允许一个或多个线程等待其他线程完成操作,初始化一个计数器(count),每当一个线程完成任务后,调用 countDown() 使计数器减 1,当计数器变为 0 时,在 await() 上等待的线程会被唤醒。
1.2 代码示例
示例一
// 模拟场景:主线程等待 3 个服务组件加载完毕
CountDownLatch latch = new CountDownLatch(3);
for (int i = 1; i <= 3; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 开始加载...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
System.out.println(Thread.currentThread().getName() + " 加载完成");
latch.countDown(); // 计数器 -1
}, "Service-" + i).start();
}
System.out.println("主线程等待所有服务加载...");
latch.await(); // 阻塞等待,直到 count = 0
System.out.println("所有服务加载完毕,系统启动!");
示例二-跑步
/**
* CountDownLatchDemo 用跑步场景演示一等多和多等一
* <p>
* 一等多场景: 多个选手准备,裁判发令枪,选手起跑;
* <p>
* 多等一场景: 多个选手跑完,裁判等待所有选手到达终点,宣布比赛结束;
*
* @author suremotoo
* @date 2026/01/06 11:30
*/
public class CountDownLatchDemo {
private static final CountDownLatch START_LATCH = new CountDownLatch(1);
private static final CountDownLatch FINISH_LATCH = new CountDownLatch(3);
public static void main(String[] args) {
Runnable player = () -> {
try {
System.out.println(Thread.currentThread().getName() + " 已准备就绪,等待发令枪...");
START_LATCH.await(); // 所有人准备集合,等待裁判发令
System.out.println(Thread.currentThread().getName() + " 起跑!");
Thread.sleep((long) (Math.random() * 10000)); // 模拟跑步时间
System.out.println(Thread.currentThread().getName() + " 到达终点!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
FINISH_LATCH.countDown(); // 选手到达终点,计数器减一
}
};
// 启动选手线程
for (int i = 1; i <= 3; i++) {
new Thread(player, "选手 " + i).start();
}
try {
Thread.sleep(1000); // 模拟裁判准备时间, 检查信号枪等等
System.out.println("裁判:预备,开始!");
START_LATCH.countDown(); // 发令枪响,选手起跑
FINISH_LATCH.await(); // 等待所有选手到达终点
System.out.println("裁判:比赛结束!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
1.3 使用场景
最典型的就是多源数据汇总,比如你要导出一个复杂的 Excel,需要从 5 个不同的微服务拉数据,你可以开 5 个线程去跑,主线程用 await() 等它们全拿回来再最后组装。
1.4 注意事项
-
- 计数器减到 0 后,无法重置,如果需要重置,请使用
CyclicBarrier。
- 计数器减到 0 后,无法重置,如果需要重置,请使用
-
countDown()最好放在finally块中,防止线程报错导致计数器永远不归零,万一某个子线程抛了异常没执行到扣减,主线程会死等下去,会引发事故。
2. CyclicBarrier (循环栅栏)
2.1 概念
相比之下,CyclicBarrier 是做加法,它可以让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。
通俗点说,它更像是一个「集合点」,大家(线程)都到了,才能进行下一步,而且还能顺便触发一个回调。
2.2 代码示例
// 模拟场景:3 个线程分别收集数据,都收集完后,触发一次汇总操作
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println(">> 所有线程到达屏障,开始执行汇总任务 <<");
});
for (int i = 1; i <= 3; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 正在收集数据...");
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " 到达屏障");
barrier.await(); // 等待其他人
System.out.println(Thread.currentThread().getName() + " 冲破屏障,继续后续工作");
} catch (Exception e) { e.printStackTrace(); }
}, "Worker-" + i).start();
}
2.3 使用场景
比如开始游戏加载那样的场景,等待 10 名玩家都加载进度达到 100% 后,同时进入战场。
2.4 注意事项
它和 CountDownLatch 最大的不同是 CyclicBarrier 可重用,当屏障打开后,内部计数器会自动重置,可以用于多轮次的并发任务。
另外要注意一个异常处理: BrokenBarrierException,如果等待过程中某个线程中断或超时,栅栏会损坏,所有在等待的线程都会抛出此异常。
2.5 异同对比
| 特性 | CountDownLatch | CyclicBarrier |
|---|---|---|
| 侧重点 | 一个等多个 (也可多个等一个),侧重于任务完成的“状态” | 多个互相等,侧重于线程到达同步点 |
| 可重用性 | 一次性,计数器归零后失效 | 可循环使用,屏障打开后自动重置 |
| 动作触发 | 调用 countDown() 仅减计数,线程继续执行 | 调用 await() 会阻塞,直到所有人到齐 |
| 构造参数 | 仅接收数值 | 可接收数值 + Runnable 回调 |
3. Semaphore (信号量)
3.1 概念
Semaphore 就是限流,用来控制同时访问特定资源的线程数量,通过 acquire() 获取许可,release() 释放许可。
3.2 代码示例
// 模拟场景:只有 2 个停车位,但有 5 辆车要停
Semaphore semaphore = new Semaphore(2);
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
try {
semaphore.acquire(); // 抢占车位
System.out.println(Thread.currentThread().getName() + " 抢到车位,停车中...");
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + " 离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // 释放车位
}
}, "Car-" + i).start();
}
3.3 使用场景
接口限流:防止瞬间高并发把服务打挂,例如,限制某个 API 接口每秒只能处理 50 个请求。不过目前的实际项目中基本是不会用 Semaphore 的,更多是用 Guava RateLimiter 或 Sentinel 一些框架,但底层思想类似。
3.4 注意事项
支持公平性选择,new Semaphore(int permits, boolean fair),fair 设置为 true 可以实现公平锁(先来后到),但性能会下降,默认是非公平。
还有就是一定要在 finally 中释放,否则信号量泄漏,拿走的用于不归还,后面能用的就越来越少,直到卡死。
4. Condition (条件对象)
4.1 概念
它是 Object.wait() 和 notify() 的升级版,必须配合 Lock (如 ReentrantLock) 使用。它支持多个等待队列,可以分组唤醒,意味着你可以精准地唤醒“某类”等待的线程,而不是像 notifyAll() 那样一股脑全唤醒。
举个例子就应该更明了,notifyAll() 就像是在寝室楼下喊一嗓子,把所有人都吵醒;而 Condition 是你可以精准地给张三发个微信,让他醒来干活。
4.2 代码示例
// 模拟场景:生产者-消费者 (交替打印 A 和 B)
Lock lock = new ReentrantLock();
Condition conditionA = lock.newCondition();
Condition conditionB = lock.newCondition();
// 标志位
boolean isATurn = true;
// 线程 A
new Thread(() -> {
lock.lock();
try {
if (!isATurn) conditionA.await(); // 不是我的回合,去 A 队列睡
System.out.println("A");
isATurn = false;
conditionB.signal(); // 唤醒 B 队列的一个线程
} catch (Exception e) {} finally { lock.unlock(); }
}).start();
// 线程 B (类似逻辑,省略)
4.3 使用场景
ArrayBlockingQueue 内部就是用两个 Condition (notEmpty 和 notFull) 来分别控制“队列满了生产者等待”和“队列空了消费者等待”的逻辑。
如果是复杂的线程通信,需要严格控制线程执行顺序的场景也可以考虑。
用法注意:必须在 lock.lock() 和 lock.unlock() 之间使用,否则抛出 IllegalMonitorStateException。
Tip
如果想对 Condition 有更多了解, 可查看我的 AQS 篇文章的 Condition 部分。
5. 总结
- CountDownLatch:主线程想等一堆子线程干完活。
- Semaphore:资源有限,用来控制并发资源,限流、资源控制场景。
- CyclicBarrier:一帮线程步调必须一致,且可能要循环好几轮,复杂计算、多阶段任务聚合。
- Condition:写底层组件(如阻塞队列、生产者消费者模型),需要精细控制线程唤醒。
6. 个人使用感受
我个人在项目中很少用到这些工具类,但是我常常会在测试时用到,比如写个小工具类,用来测试个别接口的并发性能,比如下面这个:
public static void concurrentTest(int concurrentReqs) throws InterruptedException {
// 模拟并发请求数量 (例如 5 个请求同时进来)
final int CONCURRENT_REQUESTS = concurrentReqs;
System.out.println("====== 开始并发压力测试 ======");
System.out.println("模拟并发数: " + CONCURRENT_REQUESTS);
// 模拟线程池 (多线程同时发起请求)
ExecutorService requestPool = Executors.newFixedThreadPool(CONCURRENT_REQUESTS);
// 闭锁:用于确保所有线程“同时”开始请求(模拟瞬时高峰)
CountDownLatch startGate = new CountDownLatch(1);
// 闭锁:用于等待所有任务结束
CountDownLatch endGate = new CountDownLatch(CONCURRENT_REQUESTS);
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger failCount = new AtomicInteger(0);
long globalStart = System.currentTimeMillis();
for (int i = 0; i < CONCURRENT_REQUESTS; i++) {
final int index = i;
requestPool.submit(() -> {
try {
// 所有线程在这里阻塞,等待主线程发令
startGate.await();
System.out.println("[Client-Thread-" + index + "] 请求已提交...");
long t1 = System.currentTimeMillis();
// --- 核心调用 ---
Office2PdfReq req = new Office2PdfReq();
String wordUrl = wordUrlsList.get(index % wordUrlsList.size());
System.out.println("wordUrl = " + wordUrl);
req.setFileUrl(wordUrl);
req.setTraceId(IDUtil.getSecureRandomId());
req.setChannel("X01");
req.setOperationUserId(361);
req.setRequestTime(new Date());
ServiceResult<String> serviceResult = service.word2Pdf(req);
System.out.println("serviceResult = " + serviceResult);
System.out.println("-----------");
// ---------------
long t2 = System.currentTimeMillis();
System.out.println("[Client-Thread-" + index + "] ✔️ 转换成功,耗时: " + (t2 - t1) + "ms");
successCount.incrementAndGet();
} catch (Exception e) {
System.err.println("[Client-Thread-" + index + "] ❌ 转换失败: " + e.getMessage());
failCount.incrementAndGet();
} finally {
endGate.countDown(); // 完成一个任务
}
});
}
// 发令枪响,所有线程同时开始执行
System.out.println(">>> 所有线程就绪,3秒后开始轰炸...");
Thread.sleep(3000);
startGate.countDown();
// 等待所有任务完成
endGate.await();
long globalEnd = System.currentTimeMillis();
long totalTime = globalEnd - globalStart;
System.out.println("\n====== 测试报告 ======");
System.out.println("总耗时: " + totalTime + " ms");
System.out.println("平均每个文档耗时: " + (totalTime / CONCURRENT_REQUESTS) + " ms");
System.out.println("成功: " + successCount.get());
System.out.println("失败: " + failCount.get());
requestPool.shutdown();
System.exit(0);
}