Java 中的 CyclicBarrier 多线程同步机制使用方法

2017-08-21 From 程序之心 By 丁仪

CyclicBarrier 是一种同步机制,它能够对处理一些算法线程实现同步。换句话讲,它就是一个所有线程必须等待的一个栅栏,直到所有线程都到达这里,然后所有线程才可以继续做其他事情。

回顾

在下图的流程中,线程1和线程2都到达第一个栅栏后才能够继续运行。如果线程1先到线程2后到,则线程1需要等待线程2到达栅栏处,然后两个线程才能继续运行。

使用方法

CyclicBarrier 提供了两个构造函数,可以传入线程个数和所有线程都到达栅栏后执行的操作。使用时,先创建 CyclicBarrier 对象,然后在需要等待的地方调用 await 方法即可。await 方法会等待所有线程到达才返回:当所有线程都到达时返回当前线程到达的次序;如果等待过程中出现超时,第一个到达的线程会收到超时异常 TimeoutException,同时其他线程被 broken 并抛出 BrokenBarrierException 异常。相关方法定义如下。

//构造函数,提供线程数和操作
public CyclicBarrier(int parties, Runnable barrierAction)
//构造函数,仅提供线程数
public CyclicBarrier(int parties)
//返回需要等待的线程数
public int getParties()
//一直等待其他线程到达
public int await() throws InterruptedException, BrokenBarrierException 
//等待其他线程到达,并指定超时时间
public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException 
//查看是否broken
public boolean isBroken() 
//复位barrier
public void reset() 
//返回正在barrier处等待的线程数
public int getNumberWaiting()

使用示例

如下的示例,演示了如何使用 CyclicBarrier 实现线程同步。CyclicBarrierTest 测试类定义了线程数 threadNums 和 CyclicBarrier 实例 barrier,在构造函数中创建了 CyclicBarrier 实例,当所有线程都到达栅栏处时打印到达提示, 在 test 方法中创建线程线程启动后调用 await 方法等待,并打印当前线程是第几个到达栅栏。

public class CyclicBarrierTest {
    private int threadNums;
    private CyclicBarrier barrier;
    
    public CyclicBarrierTest(int threadNums){
        this.threadNums = threadNums;
        barrier = new CyclicBarrier(threadNums, () -> println("All " 
            + threadNums + " threads reached barrier"));
    }
    
    private void println(String msg){
        SimpleDateFormat sdf = new SimpleDateFormat("[YYYY-MM-dd HH:mm:ss:SSS] ");
        System.out.println(sdf.format(new Date()) + msg);
    }
    
    public void test(){
        for(int i = 0; i < threadNums; i ++){
            new Thread(() -> {
                println(Thread.currentThread().getName() + " start!");
                try {
                    int index = threadNums - barrier.await();
                    println(Thread.currentThread().getName() 
                        + " arrive "+ index);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

以启动 6 个线程为例,执行结果如下。6 个线程几乎同时启动,1 毫秒后同时到达栅栏并打印出 “All 6 threads reached barrier”,6 个线程到达的次序是 Thread-5、Thread-4、Thread-0、Thread-3、Thread-1、Thread-2。

[2017-08-20 12:15:54:053] Thread-5 start!
[2017-08-20 12:15:54:053] Thread-4 start!
[2017-08-20 12:15:54:053] Thread-0 start!
[2017-08-20 12:15:54:053] Thread-3 start!
[2017-08-20 12:15:54:053] Thread-1 start!
[2017-08-20 12:15:54:053] Thread-2 start!
[2017-08-20 12:15:54:054] All 6 threads reached barrier
[2017-08-20 12:15:54:055] Thread-2 arrive 6
[2017-08-20 12:15:54:055] Thread-5 arrive 1
[2017-08-20 12:15:54:055] Thread-4 arrive 2
[2017-08-20 12:15:54:055] Thread-3 arrive 4
[2017-08-20 12:15:54:055] Thread-0 arrive 3
[2017-08-20 12:15:54:055] Thread-1 arrive 5

如下的示例增加了 test2 方法,来测试 CyclicBarrier 的其他方法。各个线程的创建时间差了 300 ms,创建线程时打印出此时栅栏处正在等待的线程个数,await 等待的时间为 1s 。当 CyclicBarrier 等待超时时,超时的线程打印超时提示和总的线程数,被 broken 的线程打印 broken 提示。

public class CyclicBarrierTest {
    private int threadNums;
    private CyclicBarrier barrier;
    
    public CyclicBarrierTest(int threadNums){
        this.threadNums = threadNums;
        barrier = new CyclicBarrier(threadNums, () -> 
            println("All " + threadNums + " threads reached barrier"));
    }
    
    private void println(String msg){
        SimpleDateFormat sdf = new SimpleDateFormat("[YYYY-MM-dd HH:mm:ss:SSS] ");
        System.out.println(sdf.format(new Date()) + msg);
    }
    
    public void test2() throws Exception{
        for(int i = 0; i < threadNums; i ++){
            new Thread(() -> {
                println(Thread.currentThread().getName() + " start with " 
                    + barrier.getNumberWaiting() + " threads waiting");
                    try {
                        barrier.await(1, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        println(Thread.currentThread().getName() + " broken "
                            + barrier.isBroken());
                    } catch (TimeoutException e) {
                        println(Thread.currentThread().getName() 
                            + " time out to wait " + barrier.getParties()
                            + " threds");
                    }
            }).start();
            Thread.sleep(300);
        }
    }
}

从理论上分析,按照 300 ms 间隔创建线程,超时时间为 1s ,则在第 4 个线程创建并等待时发生超时并 broken 其他线程,创建第 5 个线程时已经没有线程等待。

以 6 个线程为例,执行结果如下。Thread-0、Thread-1、Thread-2 依次启动并等待,此时并没有超时一切正常。Thread-3 启动后,时间已经达到 1s,此时 Thread-0 等待超时打印超时提示,Thread-1、Thread-2、Thread-3 同时收到 break 消息进入 broken 状态并打印 broken 提示。在此之后,Thread-4、Thread-5 创建并启动,已经没有线程在栅栏处等待,直接进入 broken 状态并打印 broken 提示。

[2017-08-20 12:41:26:610] Thread-0 start with 0 threads waiting
[2017-08-20 12:41:26:887] Thread-1 start with 1 threads waiting
[2017-08-20 12:41:27:189] Thread-2 start with 2 threads waiting
[2017-08-20 12:41:27:492] Thread-3 start with 3 threads waiting
[2017-08-20 12:41:27:613] Thread-0 time out to wait 6 threds
[2017-08-20 12:41:27:613] Thread-2 broken true
[2017-08-20 12:41:27:613] Thread-3 broken true
[2017-08-20 12:41:27:613] Thread-1 broken true
[2017-08-20 12:41:27:794] Thread-4 start with 0 threads waiting
[2017-08-20 12:41:27:794] Thread-4 broken true
[2017-08-20 12:41:28:099] Thread-5 start with 0 threads waiting
[2017-08-20 12:41:28:100] Thread-5 broken true

本文来源:程序之心,转载请注明出处!

本文地址:https://chengxuzhixin.com/blog/article/200045.html

发表感想

© 2016 - 2022 chengxuzhixin.com All Rights Reserved.

浙ICP备2021034854号-1    浙公网安备 33011002016107号