多线程基础

多线程 JUC 并发工具类

多线程 JUC 并发工具类:CountDownLatch、CyclicBarrier、Semaphore、Condition 使用场景,注意事项。

#多线程基础#JUC#并发

JUC 常用并发工具类

较为核心的 CountDownLatch、Semaphore、Condition、CyclicBarrier

这四个工具类主要位于 java.util.concurrent (JUC) 包下,大部分基于 AQS (AbstractQueuedSynchronizer) 实现,AQS 说明可查看我这篇文章


1. CountDownLatch (倒计时门闩)

1.1 概念

“做减法”。 它允许一个或多个线程等待其他线程完成操作,初始化一个计数器(count),每当一个线程完成任务后,调用 countDown() 使计数器减 1,当计数器变为 0 时,在 await() 上等待的线程会被唤醒。

1.2 代码示例

示例一

// 模拟场景:主线程等待 3 个服务组件加载完毕
CountDownLatch latch = new CountDownLatch(3);

for (int i = 1; i <= 3; i++) {
    new Thread(() -> {
        System.out.println(Thread.currentThread().getName() + " 开始加载...");
        try { Thread.sleep(1000); } catch (InterruptedException e) {}
        System.out.println(Thread.currentThread().getName() + " 加载完成");
        latch.countDown(); // 计数器 -1
    }, "Service-" + i).start();
}

System.out.println("主线程等待所有服务加载...");
latch.await(); // 阻塞等待,直到 count = 0
System.out.println("所有服务加载完毕,系统启动!");

示例二-跑步

/**
 * CountDownLatchDemo 用跑步场景演示一等多和多等一
 * <p>
 * 一等多场景: 多个选手准备,裁判发令枪,选手起跑;
 * <p>
 * 多等一场景: 多个选手跑完,裁判等待所有选手到达终点,宣布比赛结束;
 *
 * @author suremotoo
 * @date 2026/01/06 11:30
 */
public class CountDownLatchDemo {


  private static final CountDownLatch START_LATCH = new CountDownLatch(1);
  private static final CountDownLatch FINISH_LATCH = new CountDownLatch(3);

  public static void main(String[] args) {
    Runnable player = () -> {
      try {
        System.out.println(Thread.currentThread().getName() + " 已准备就绪,等待发令枪...");
        START_LATCH.await(); // 所有人准备集合,等待裁判发令
        System.out.println(Thread.currentThread().getName() + " 起跑!");
        Thread.sleep((long) (Math.random() * 10000)); // 模拟跑步时间
        System.out.println(Thread.currentThread().getName() + " 到达终点!");
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        FINISH_LATCH.countDown(); // 选手到达终点,计数器减一
      }
    };

    // 启动选手线程
    for (int i = 1; i <= 3; i++) {
      new Thread(player, "选手 " + i).start();
    }

    try {
      Thread.sleep(1000); // 模拟裁判准备时间, 检查信号枪等等
      System.out.println("裁判:预备,开始!");
      START_LATCH.countDown(); // 发令枪响,选手起跑

      FINISH_LATCH.await(); // 等待所有选手到达终点

      System.out.println("裁判:比赛结束!");
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

}

1.3 使用场景

  • 多服务启动检测:Spring Boot 启动时,等待数据库连接、缓存连接、MQ 连接都校验通过后,再对外暴露 HTTP 端口。
  • 并行数据处理:Excel 导出时,开启 10 个线程分别查询 10 张表的数据,主线程等待它们全部查完,再进行组装。

1.4 注意事项

  • 不可重用:计数器减到 0 后,无法重置,如果需要重置,请使用 CyclicBarrier
  • 异常处理countDown() 最好放在 finally 块中,防止线程报错导致计数器永远不归零,主线程死锁。

2. CyclicBarrier (循环栅栏)

2.1 概念

“做加法”。 它可以让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。

2.2 代码示例

// 模拟场景:3 个线程分别收集数据,都收集完后,触发一次汇总操作
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
    System.out.println(">> 所有线程到达屏障,开始执行汇总任务 <<");
});

for (int i = 1; i <= 3; i++) {
    new Thread(() -> {
        System.out.println(Thread.currentThread().getName() + " 正在收集数据...");
        try {
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + " 到达屏障");
            barrier.await(); // 等待其他人
            System.out.println(Thread.currentThread().getName() + " 冲破屏障,继续后续工作");
        } catch (Exception e) { e.printStackTrace(); }
    }, "Worker-" + i).start();
}

2.3 使用场景

  • 多线程计算合并:比如对一个巨大的银行流水表进行统计,分为 4 个线程计算 4 个季度的流水,所有线程都算完后,触发回调任务把 4 个季度的结果相加。
  • 游戏开发:MOBA 游戏中,等待 10 名玩家都加载进度达到 100% 后,同时进入战场。

2.4 注意事项

  • 可重用:当屏障打开后,内部计数器会自动重置,可以用于多轮次的并发任务。
  • BrokenBarrierException:如果等待过程中某个线程中断或超时,栅栏会损坏,所有在等待的线程都会抛出此异常。

3. Semaphore (信号量)

3.1 概念

“限流器”。 它用来控制同时访问特定资源的线程数量,通过 acquire() 获取许可,release() 释放许可。

3.2 代码示例

// 模拟场景:只有 2 个停车位,但有 5 辆车要停
Semaphore semaphore = new Semaphore(2);

for (int i = 1; i <= 5; i++) {
    new Thread(() -> {
        try {
            semaphore.acquire(); // 抢占车位
            System.out.println(Thread.currentThread().getName() + " 抢到车位,停车中...");
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getName() + " 离开车位");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release(); // 释放车位
        }
    }, "Car-" + i).start();
}

3.3 使用场景

  • 接口限流 (Rate Limiting):防止瞬间高并发把服务打挂。例如,限制某个 API 接口每秒只能处理 50 个请求(虽然现在更多用 Guava RateLimiter 或 Sentinel,但原理类似)。
  • 数据库连接池:虽然连接池本身有实现,但 Semaphore 是实现连接池资源借出的底层思想。

3.4 注意事项

  • 公平性new Semaphore(int permits, boolean fair)fair 设置为 true 可以实现公平锁(先来后到),但性能会下降,默认是非公平。
  • Release 必须执行:一定要在 finally 中释放,否则信号量泄漏,系统吞吐量会越来越低直到卡死。

4. Condition (条件对象)

4.1 概念

“精准通知”。 它是 Object.wait()notify() 的升级版,必须配合 Lock (如 ReentrantLock) 使用。它支持多个等待队列,意味着你可以精准地唤醒“某类”等待的线程,而不是像 notifyAll() 那样一股脑全唤醒。

4.2 代码示例

// 模拟场景:生产者-消费者 (交替打印 A 和 B)
Lock lock = new ReentrantLock();
Condition conditionA = lock.newCondition();
Condition conditionB = lock.newCondition();
// 标志位
boolean isATurn = true;

// 线程 A
new Thread(() -> {
    lock.lock();
    try {
        if (!isATurn) conditionA.await(); // 不是我的回合,去 A 队列睡
        System.out.println("A");
        isATurn = false;
        conditionB.signal(); // 唤醒 B 队列的一个线程
    } catch (Exception e) {} finally { lock.unlock(); }
}).start();

// 线程 B (类似逻辑,省略)

4.3 使用场景

  • 阻塞队列源码ArrayBlockingQueue 内部就是用两个 Condition (notEmptynotFull) 来分别控制“队列满了生产者等待”和“队列空了消费者等待”的逻辑。
  • 复杂的线程通信:需要严格控制线程执行顺序的场景。

4.4 注意事项

  • 死锁风险:必须在 lock.lock()lock.unlock() 之间使用,否则抛出 IllegalMonitorStateException
  • 虚假唤醒:在使用 await() 时,建议总是使用 while 循环检查条件,而不是 if

5. 总结与对比

  1. CountDownLatch: 用来等别人干完,常用,业务逻辑中等待异步结果
  2. Semaphore: 用来控制并发资源,限流、资源控制场景
  3. CyclicBarrier: 用来等大家到齐,复杂计算、多阶段任务聚合
  4. Condition: 用来精细化线程通信,通常在造轮子写中间件时用,业务代码直接用的少,直接用 BlockingQueue 更多

CountDownLatch vs CyclicBarrier 的区别

特性CountDownLatch (倒计时门闩)CyclicBarrier (循环栅栏)
侧重点一个等多个 (也可多个等一个),侧重于任务完成的“状态”。多个互相等,侧重于线程到达同步点。
可重用性一次性,计数器归零后失效。可循环使用,屏障打开后自动重置。
动作触发调用 countDown() 仅减计数,线程继续执行。调用 await() 会阻塞,直到所有人到齐。
构造参数仅接收数值。可接收数值 + Runnable 回调 (这是亮点)。

简单选择建议

  • 如果你需要 主线程等待子线程 干完活再继续:选 CountDownLatch
  • 如果你需要控制 并发度(比如只允许 10 个线程同时访问):选 Semaphore
  • 如果你需要 多线程步调一致(大家都在这等着,等人齐了一起跑):选 CyclicBarrier

实际使用

我个人在项目中很少用到这些工具类,但是我常常会在测试时用到,比如写个小工具类,用来测试个别接口的并发性能,比如下面这个:

public static void concurrentTest(int concurrentReqs) throws InterruptedException {
    // 模拟并发请求数量 (例如 5 个请求同时进来)
    final int CONCURRENT_REQUESTS = concurrentReqs;

    System.out.println("====== 开始并发压力测试 ======");
    System.out.println("模拟并发数: " + CONCURRENT_REQUESTS);

    // 模拟线程池 (多线程同时发起请求)
    ExecutorService requestPool = Executors.newFixedThreadPool(CONCURRENT_REQUESTS);

    // 闭锁:用于确保所有线程“同时”开始请求(模拟瞬时高峰)
    CountDownLatch startGate = new CountDownLatch(1);
    // 闭锁:用于等待所有任务结束
    CountDownLatch endGate = new CountDownLatch(CONCURRENT_REQUESTS);

    AtomicInteger successCount = new AtomicInteger(0);
    AtomicInteger failCount = new AtomicInteger(0);

    long globalStart = System.currentTimeMillis();

    for (int i = 0; i < CONCURRENT_REQUESTS; i++) {
      final int index = i;
      requestPool.submit(() -> {
        try {
          // 所有线程在这里阻塞,等待主线程发令
          startGate.await();

          System.out.println("[Client-Thread-" + index + "] 请求已提交...");

          long t1 = System.currentTimeMillis();

          // --- 核心调用 ---
          Office2PdfReq req = new Office2PdfReq();
          String wordUrl = wordUrlsList.get(index % wordUrlsList.size());
          System.out.println("wordUrl = " + wordUrl);
          req.setFileUrl(wordUrl);
          req.setTraceId(IDUtil.getSecureRandomId());
          req.setChannel("X01");
          req.setOperationUserId(361);
          req.setRequestTime(new Date());

          ServiceResult<String> serviceResult = service.word2Pdf(req);
          System.out.println("serviceResult = " + serviceResult);
          System.out.println("-----------");
          // ---------------

          long t2 = System.currentTimeMillis();
          System.out.println("[Client-Thread-" + index + "] ✔️ 转换成功,耗时: " + (t2 - t1) + "ms");
          successCount.incrementAndGet();

        } catch (Exception e) {
          System.err.println("[Client-Thread-" + index + "] ❌ 转换失败: " + e.getMessage());
          failCount.incrementAndGet();
        } finally {
          endGate.countDown(); // 完成一个任务
        }
      });
    }

    // 发令枪响,所有线程同时开始执行
    System.out.println(">>> 所有线程就绪,3秒后开始轰炸...");
    Thread.sleep(3000);
    startGate.countDown();

    // 等待所有任务完成
    endGate.await();

    long globalEnd = System.currentTimeMillis();
    long totalTime = globalEnd - globalStart;

    System.out.println("\n====== 测试报告 ======");
    System.out.println("总耗时: " + totalTime + " ms");
    System.out.println("平均每个文档耗时: " + (totalTime / CONCURRENT_REQUESTS) + " ms");
    System.out.println("成功: " + successCount.get());
    System.out.println("失败: " + failCount.get());

    requestPool.shutdown();
    System.exit(0);
  }