多线程基础

多线程 JUC 并发工具类

多线程 JUC 并发工具类:CountDownLatch、CyclicBarrier、Semaphore、Condition 使用场景,注意事项。

#多线程基础#JUC#并发

JUC 常用并发工具类

较为核心的 CountDownLatch、Semaphore、Condition、CyclicBarrier

这四个工具类主要位于 java.util.concurrent (JUC) 包下,大部分基于 AQS (AbstractQueuedSynchronizer) 实现,AQS 说明可查看我这篇文章


1. CountDownLatch (倒计时门闩)

1.1 概念

CountDownLatch 的逻辑很简单:减法计数。 它允许一个或多个线程等待其他线程完成操作,初始化一个计数器(count),每当一个线程完成任务后,调用 countDown() 使计数器减 1,当计数器变为 0 时,在 await() 上等待的线程会被唤醒。

1.2 代码示例

示例一

// 模拟场景:主线程等待 3 个服务组件加载完毕
CountDownLatch latch = new CountDownLatch(3);

for (int i = 1; i <= 3; i++) {
  new Thread(() -> {
    System.out.println(Thread.currentThread().getName() + " 开始加载...");
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {}
    System.out.println(Thread.currentThread().getName() + " 加载完成");
    latch.countDown(); // 计数器 -1
  }, "Service-" + i).start();
}

System.out.println("主线程等待所有服务加载...");
latch.await(); // 阻塞等待,直到 count = 0
System.out.println("所有服务加载完毕,系统启动!");
JUC 常用并发工具演示 CountDownLatch
倒计时门闩模型:主线程在 await() 处等待,直到所有子任务 countDown() 归零。
当前计数
3
Main Thread
运行中
Service-1
加载中...
Service-2
加载中...
Service-3
加载中...
[05:02:33] 初始化 CountDownLatch(3)

示例二-跑步

/**
 * CountDownLatchDemo 用跑步场景演示一等多和多等一
 * <p>
 * 一等多场景: 多个选手准备,裁判发令枪,选手起跑;
 * <p>
 * 多等一场景: 多个选手跑完,裁判等待所有选手到达终点,宣布比赛结束;
 *
 * @author suremotoo
 * @date 2026/01/06 11:30
 */
public class CountDownLatchDemo {


  private static final CountDownLatch START_LATCH = new CountDownLatch(1);
  private static final CountDownLatch FINISH_LATCH = new CountDownLatch(3);

  public static void main(String[] args) {
    Runnable player = () -> {
      try {
        System.out.println(Thread.currentThread().getName() + " 已准备就绪,等待发令枪...");
        START_LATCH.await(); // 所有人准备集合,等待裁判发令
        System.out.println(Thread.currentThread().getName() + " 起跑!");
        Thread.sleep((long) (Math.random() * 10000)); // 模拟跑步时间
        System.out.println(Thread.currentThread().getName() + " 到达终点!");
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        FINISH_LATCH.countDown(); // 选手到达终点,计数器减一
      }
    };

    // 启动选手线程
    for (int i = 1; i <= 3; i++) {
      new Thread(player, "选手 " + i).start();
    }

    try {
      Thread.sleep(1000); // 模拟裁判准备时间, 检查信号枪等等
      System.out.println("裁判:预备,开始!");
      START_LATCH.countDown(); // 发令枪响,选手起跑

      FINISH_LATCH.await(); // 等待所有选手到达终点

      System.out.println("裁判:比赛结束!");
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

}

1.3 使用场景

最典型的就是多源数据汇总,比如你要导出一个复杂的 Excel,需要从 5 个不同的微服务拉数据,你可以开 5 个线程去跑,主线程用 await() 等它们全拿回来再最后组装。

1.4 注意事项

    1. 计数器减到 0 后,无法重置,如果需要重置,请使用 CyclicBarrier
    1. countDown() 最好放在 finally 块中,防止线程报错导致计数器永远不归零,万一某个子线程抛了异常没执行到扣减,主线程会死等下去,会引发事故。

2. CyclicBarrier (循环栅栏)

2.1 概念

相比之下,CyclicBarrier 是做加法,它可以让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。

通俗点说,它更像是一个「集合点」,大家(线程)都到了,才能进行下一步,而且还能顺便触发一个回调。

2.2 代码示例

// 模拟场景:3 个线程分别收集数据,都收集完后,触发一次汇总操作
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
    System.out.println(">> 所有线程到达屏障,开始执行汇总任务 <<");
});

for (int i = 1; i <= 3; i++) {
    new Thread(() -> {
        System.out.println(Thread.currentThread().getName() + " 正在收集数据...");
        try {
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + " 到达屏障");
            barrier.await(); // 等待其他人
            System.out.println(Thread.currentThread().getName() + " 冲破屏障,继续后续工作");
        } catch (Exception e) { e.printStackTrace(); }
    }, "Worker-" + i).start();
}
JUC 常用并发工具演示 CyclicBarrier
循环栅栏模型:多个线程在同一屏障点汇合,最后一个到达后统一继续执行。
到达进度
0 / 3
Worker-1
收集数据中
Worker-2
收集数据中
Worker-3
收集数据中
[05:02:33] 初始化 CyclicBarrier(3, barrierAction)

2.3 使用场景

比如开始游戏加载那样的场景,等待 10 名玩家都加载进度达到 100% 后,同时进入战场。

2.4 注意事项

它和 CountDownLatch 最大的不同是 CyclicBarrier 可重用,当屏障打开后,内部计数器会自动重置,可以用于多轮次的并发任务。

另外要注意一个异常处理: BrokenBarrierException,如果等待过程中某个线程中断或超时,栅栏会损坏,所有在等待的线程都会抛出此异常。

2.5 异同对比

特性CountDownLatchCyclicBarrier
侧重点一个等多个 (也可多个等一个),侧重于任务完成的“状态”多个互相等,侧重于线程到达同步点
可重用性一次性,计数器归零后失效可循环使用,屏障打开后自动重置
动作触发调用 countDown() 仅减计数,线程继续执行调用 await() 会阻塞,直到所有人到齐
构造参数仅接收数值可接收数值 + Runnable 回调

3. Semaphore (信号量)

3.1 概念

Semaphore 就是限流,用来控制同时访问特定资源的线程数量,通过 acquire() 获取许可,release() 释放许可。

3.2 代码示例

// 模拟场景:只有 2 个停车位,但有 5 辆车要停
Semaphore semaphore = new Semaphore(2);

for (int i = 1; i <= 5; i++) {
    new Thread(() -> {
        try {
            semaphore.acquire(); // 抢占车位
            System.out.println(Thread.currentThread().getName() + " 抢到车位,停车中...");
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getName() + " 离开车位");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release(); // 释放车位
        }
    }, "Car-" + i).start();
}
JUC 常用并发工具演示 Semaphore
信号量模型:通过 permits 限制并发访问数量,展示获取、阻塞排队与释放唤醒。
剩余许可
2
车位 1
空闲
车位 2
空闲
Car-1
等待抢占
Car-2
等待抢占
Car-3
等待抢占
Car-4
等待抢占
等待队列: (空)
[05:02:33] 初始化 Semaphore(2)

3.3 使用场景

接口限流:防止瞬间高并发把服务打挂,例如,限制某个 API 接口每秒只能处理 50 个请求。不过目前的实际项目中基本是不会用 Semaphore 的,更多是用 Guava RateLimiter 或 Sentinel 一些框架,但底层思想类似。

3.4 注意事项

支持公平性选择,new Semaphore(int permits, boolean fair)fair 设置为 true 可以实现公平锁(先来后到),但性能会下降,默认是非公平。

还有就是一定要在 finally 中释放,否则信号量泄漏,拿走的用于不归还,后面能用的就越来越少,直到卡死。


4. Condition (条件对象)

4.1 概念

它是 Object.wait()notify() 的升级版,必须配合 Lock (如 ReentrantLock) 使用。它支持多个等待队列,可以分组唤醒,意味着你可以精准地唤醒“某类”等待的线程,而不是像 notifyAll() 那样一股脑全唤醒。

举个例子就应该更明了,notifyAll() 就像是在寝室楼下喊一嗓子,把所有人都吵醒;而 Condition 是你可以精准地给张三发个微信,让他醒来干活。

4.2 代码示例

// 模拟场景:生产者-消费者 (交替打印 A 和 B)
Lock lock = new ReentrantLock();
Condition conditionA = lock.newCondition();
Condition conditionB = lock.newCondition();
// 标志位
boolean isATurn = true;

// 线程 A
new Thread(() -> {
    lock.lock();
    try {
        if (!isATurn) conditionA.await(); // 不是我的回合,去 A 队列睡
        System.out.println("A");
        isATurn = false;
        conditionB.signal(); // 唤醒 B 队列的一个线程
    } catch (Exception e) {} finally { lock.unlock(); }
}).start();

// 线程 B (类似逻辑,省略)
JUC 常用并发工具演示 Condition
条件队列模型:结合 ReentrantLock 的 await/signal 做分组等待与精准唤醒。
当前回合
A 的回合
Thread A
就绪
Thread B
就绪
Condition A 队列: (空)
Condition B 队列: (空)
输出结果: (空)
[05:02:33] 初始化 ReentrantLock + ConditionA/ConditionB,isATurn = true

4.3 使用场景

ArrayBlockingQueue 内部就是用两个 Condition (notEmptynotFull) 来分别控制“队列满了生产者等待”和“队列空了消费者等待”的逻辑。

如果是复杂的线程通信,需要严格控制线程执行顺序的场景也可以考虑。

用法注意:必须在 lock.lock()lock.unlock() 之间使用,否则抛出 IllegalMonitorStateException

Tip

如果想对 Condition 有更多了解, 可查看我的 AQS 篇文章的 Condition 部分


5. 总结

  • CountDownLatch:主线程想等一堆子线程干完活。
  • Semaphore:资源有限,用来控制并发资源,限流、资源控制场景。
  • CyclicBarrier:一帮线程步调必须一致,且可能要循环好几轮,复杂计算、多阶段任务聚合。
  • Condition:写底层组件(如阻塞队列、生产者消费者模型),需要精细控制线程唤醒。

6. 个人使用感受

我个人在项目中很少用到这些工具类,但是我常常会在测试时用到,比如写个小工具类,用来测试个别接口的并发性能,比如下面这个:

public static void concurrentTest(int concurrentReqs) throws InterruptedException {
    // 模拟并发请求数量 (例如 5 个请求同时进来)
    final int CONCURRENT_REQUESTS = concurrentReqs;

    System.out.println("====== 开始并发压力测试 ======");
    System.out.println("模拟并发数: " + CONCURRENT_REQUESTS);

    // 模拟线程池 (多线程同时发起请求)
    ExecutorService requestPool = Executors.newFixedThreadPool(CONCURRENT_REQUESTS);

    // 闭锁:用于确保所有线程“同时”开始请求(模拟瞬时高峰)
    CountDownLatch startGate = new CountDownLatch(1);
    // 闭锁:用于等待所有任务结束
    CountDownLatch endGate = new CountDownLatch(CONCURRENT_REQUESTS);

    AtomicInteger successCount = new AtomicInteger(0);
    AtomicInteger failCount = new AtomicInteger(0);

    long globalStart = System.currentTimeMillis();

    for (int i = 0; i < CONCURRENT_REQUESTS; i++) {
      final int index = i;
      requestPool.submit(() -> {
        try {
          // 所有线程在这里阻塞,等待主线程发令
          startGate.await();

          System.out.println("[Client-Thread-" + index + "] 请求已提交...");

          long t1 = System.currentTimeMillis();

          // --- 核心调用 ---
          Office2PdfReq req = new Office2PdfReq();
          String wordUrl = wordUrlsList.get(index % wordUrlsList.size());
          System.out.println("wordUrl = " + wordUrl);
          req.setFileUrl(wordUrl);
          req.setTraceId(IDUtil.getSecureRandomId());
          req.setChannel("X01");
          req.setOperationUserId(361);
          req.setRequestTime(new Date());

          ServiceResult<String> serviceResult = service.word2Pdf(req);
          System.out.println("serviceResult = " + serviceResult);
          System.out.println("-----------");
          // ---------------

          long t2 = System.currentTimeMillis();
          System.out.println("[Client-Thread-" + index + "] ✔️ 转换成功,耗时: " + (t2 - t1) + "ms");
          successCount.incrementAndGet();

        } catch (Exception e) {
          System.err.println("[Client-Thread-" + index + "] ❌ 转换失败: " + e.getMessage());
          failCount.incrementAndGet();
        } finally {
          endGate.countDown(); // 完成一个任务
        }
      });
    }

    // 发令枪响,所有线程同时开始执行
    System.out.println(">>> 所有线程就绪,3秒后开始轰炸...");
    Thread.sleep(3000);
    startGate.countDown();

    // 等待所有任务完成
    endGate.await();

    long globalEnd = System.currentTimeMillis();
    long totalTime = globalEnd - globalStart;

    System.out.println("\n====== 测试报告 ======");
    System.out.println("总耗时: " + totalTime + " ms");
    System.out.println("平均每个文档耗时: " + (totalTime / CONCURRENT_REQUESTS) + " ms");
    System.out.println("成功: " + successCount.get());
    System.out.println("失败: " + failCount.get());

    requestPool.shutdown();
    System.exit(0);
  }