CountDownLatch与CyclicBarrier的应用与区别

猪小花1号2018-09-05 09:44

作者:廖祥俐


CountDownLatch和CyclicBarrier是Java JDK中提供的辅助类用来简化并发编程,在此分别介绍CountDownLatch和CyclicBarrier的作用,常用Api,使用场景,并给出简单的示例,最后对一些注意点以及两者的不同点进行阐述。


CountDownLatch

1,简介

CountDownLatch 允许一个或多个线程等待其他线程完成操作,首先初始化一个计数值N,然后通过调用 await可以让当前线程阻塞,直到计数值N在被调用N次countDown置为0后,再继续执行。


2,常用Api:

1,构造方法

public CountDownLatch(int count) // 创建一个新的 CountDownLatch,它将在计数值count减为0时启动

2,主要方法

public void await() throws InterruptedException;   //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException;  //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public void countDown();  //将count值减1

3,使用场景

一个线程需要等待其它线程完成操作后才能进行后续的操作。

比如在进行歌曲匹配时,当传入的歌曲songId量很大(如50w),单线程执行会导致处理速度过慢,这时候希望能有多个线程同时进行处理,提高匹配的效率,在所有匹配完成后,需要往数据库里插入一条匹配完成的记录。

4,使用示例

int countSize = 5;
CountDownLatch countDownLatch = new CountDownLatch(countSize);

for(int i=0; i<countSize; i++){ // 如果这里为i<countSize-1,则主线程会一直在await()这里hold住,因为计数器不会归0;如果这里为i<countSize+1,则主线程等待到第countSize个countDown后,就会去执行await()后面的任务
    new CuntDownTestThread(countDownLatch).start();
}
countDownLatch.await();
System.out.println("wait all finished."); // 这里会等到把计数器countDown到0才执行
----------------------------------------------------------------------------
class CuntDownTestThread extends Thread{

    private CountDownLatch countDownLatch;

    public CuntDownTestThread(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run(){
        try {
            Thread.sleep((long)(Math.random()*10000));
        } catch (InterruptedException e) {
            // TODO 
        }
        System.out.println("before count down. Thread id:" + Thread.currentThread().getName());
        doMatchTask();
        this.countDownLatch.countDown();
        System.out.println("after count down. Thread id:" + Thread.currentThread().getName());
    }

}

5,注意点

1)初始化值

初始化值如下:

CountDownLatch countDownLatch = new CountDownLatch(countSize);

若countSize小于0,则会直接抛出异常 若countSize等于0,计数器就是零,调用await方法时不会阻塞当前线程 只有当countSize大于0时,调用await方法会阻塞当前线程。

2)忘记countDown或其它原因导致没有countDown

在初始化CountDownLatch 的计数器值N后(大于0),如果由于某种原因,countDown的执行次数小于N,则会导致计数器值一直大于0,则会导致调用了await方法的线程一直处于等待状态。

3)如果是当前线程是等待N个线程完成操作,然后执行后续的任务,则注意在每个线程里,有且仅有一次

countDownLatch.countDown();

4)如果当前线程在await时,不关心N个线程是否执行到countDown而使得计数器置为0,则可通过设置超时时间来进行控制,当达到超时时间后,将计数器置为0,当前线程继续执行。

await(long timeout, TimeUnit unit)


CyclicBarrier

1,简介

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续工作。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier当前线程已经到达了屏障,然后当前线程被阻塞


2,常用Api

1,构造方法

CyclicBarrier(int parties) //创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。
CyclicBarrier(int parties, Runnable barrierAction) // 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。

2,常用方法

int    await()  // 在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
int    await(long timeout, TimeUnit unit) //  在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间。
int    getNumberWaiting() //返回当前在屏障处等待的参与者数目。
int     getParties() //返回要求启动此 barrier 的参与者数目。
boolean     isBroken() // 查询此屏障是否处于损坏状态。
void     reset() // 将屏障重置为其初始状态。

3,使用场景

需要所有的子任务都完成时,才执行主任务,这个时候就可以选择使用CyclicBarrier 

如在进行版权库匹配任务时,需要把曲库数据迁移至版权库,版权库中的匹配结果显示中才会有相应的数据,即多个线程一起迁移数据,等数据迁移完之后,再分别执行匹配任务。

4,使用示例

int countSize = 5;
CyclicBarrier cyclicBarrier = new CyclicBarrier(countSize);

for(int i=0; i<countSize; i++){
    new CyclicBarrierTestThread(cyclicBarrier).start();
}
System.out.println("wait all finished."); // 这里会直接执行
----------------------------------------------------------------------------------------
class CyclicBarrierTestThread extends Thread{
    private CyclicBarrier cyclicBarrier;

    public CyclicBarrierTestThread(CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
    }
    @Override
    public void run(){
        try {
            Thread.sleep((long)(Math.random()*10000));
        } catch (InterruptedException e) {
            // TODO 
        }
        try {
            System.out.println("before await. Thread id:" + Thread.currentThread().getName());
                        doImportTask() ; // 导入数据
            this.cyclicBarrier.await(); // 线程在此阻塞,直到集齐
                        doMatchTask();  // 进行匹配操作
            System.out.println("after await. Thread id:" + Thread.currentThread().getName());
        } catch (InterruptedException e) {
            // TODO
        } catch (BrokenBarrierException e) {
            // TODO
        }
    }
}

5,注意点

1)对于指定计数值parties,若由于某种原因,没有足够的线程调用CyclicBarrier的await,则所有调用await的线程都会被阻塞;

2)同样的CyclicBarrier也可以调用await(timeout, unit),设置超时时间,在设定时间内,如果没有足够线程到达,则解除阻塞状态,继续工作;

3)通过reset重置计数,会使得进入await的线程出现BrokenBarrierException;

4)如果采用是CyclicBarrier(int parties, Runnable barrierAction) 构造方法,执行barrierAction操作的是最后一个到达的线程


一些区别

1,同一个线程调用CountDownLatch的countDown可以将计数减为0,使等待线程解除阻塞状态;而对于CyclicBarrier,如果调用多次await时,线程会在第一个await处阻塞,直到集齐所有成员通过屏障,再到第二个await阻塞,进行下一轮“集齐成员”...,可以阅读源码,如下注释:

// CyclicBarrier 中 private int dowait(boolean timed, long nanos) 部分源码
int index = --count;
if (index == 0) {  // tripped
         boolean ranAction = false;
          try {
                  final Runnable command = barrierCommand; // 这里的command 是初始构造函数为CyclicBarrier(int parties, Runnable barrierAction) 的barrierAction
                   if (command != null)
                       command.run();
                   ranAction = true; // 如果command执行异常,这里就会被跳过
                   nextGeneration();  // 下一个轮次(重置count等)
                   return 0;
            } finally {
                   if (!ranAction)
                       breakBarrier();
          }
}

2,CountDownLatch的计数器值减为0后,不能再重置;而CyclicBarrier可以通过reset重置,并且在计数器值减为0后,会自动重置(进入下一个Cyclic)。

3,CountDownLatch调用await时,是当前线程阻塞等待计数值置为0;而CyclicBarrier是每个调用await的线程都进行阻塞,直到调用await的线程达到计数值

4,CyclicBarrier提供了其它辅助方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量,isBroken方法用来知道阻塞的线程是否被中断;.CountDownLatch提供了getCount()方法可以获取当前的计数值。



网易云大礼包:https://www.163yun.com/gift

本文来自网易实践者社区,经作者廖祥俐授权发布