手把手教你 Fork/Join 并发处理

2017-08-09 From 程序之心 By 丁仪

ForkJoinPool 是 Java 7 加入的一个并发处理类,位于 java.util.concurrent 包。

Fork / Join 回顾

ForkJoinPool 让我们可以很方便地把任务分裂成几个更小的任务,这些分裂出来的任务也将会提交给 ForkJoinPool。任务可以继续分割成更小的子任务,只要它还能分割。分叉和合并原理包含两个递归进行的步骤。两个步骤分别是分叉步骤和合并步骤。

一个使用了分叉和合并原理的任务可以将自己分叉(分割)为更小的子任务,这些子任务可以被并发执行。如下图所示:

通过把自己分割成多个子任务,每个子任务可以由不同的 CPU 并行执行,或者被同一个 CPU 上的不同线程执行。

只有当给的任务过大,把它分割成几个子任务才有意义。把任务分割成子任务有一定开销,因此对于小型任务,这个分割的消耗可能比每个子任务并发执行的消耗还要大。

什么时候把一个任务分割成子任务是有意义的,这个界限也称作一个阀值。这要看每个任务对有意义阀值的决定。很大程度上取决于它要做的工作的种类。

当一个任务将自己分割成若干子任务之后,该任务将等待所有子任务结束。一旦子任务执行结束,该任务可以把所有结果合并到同一个结果。图示如下:

Fork / Join 的使用

Fork / Join 的使用主要涉及 ForkJoinPool 和 ForkJoinTask。ForkJoinTask 类定义了任务,实现了 Fork 和 Join 操作;ForkJoinPool 管理线程与任务的执行。

ForkJoinTask 类是一个抽象类,要求子类实现以下三个方法:

getRawResult :获取 ForkJoinTask 的执行结果;

setRawResult :设置 ForkJoinTask 的执行结果;

exec :ForkJoinTask 的执行逻辑,返回 true 表示正常返回;


为了方便开发,标准库提供了 ForkJoinTask 的一个子类 RecursiveTask。RecursiveTask 类也是一个抽象类,封装了上述 3 个方法的实现,要求子类实现一个方法 compute。这样一来,我们只需要实现 compute 一个方法就可以使用 ForkJoinTask 了。RecursiveTask 的定义如下:

public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    private static final long serialVersionUID = 5232453952276485270L;
    V result;

    protected abstract V compute();

    public final V getRawResult() {
        return result;
    }

    protected final void setRawResult(V value) {
        result = value;
    }

    protected final boolean exec() {
        result = compute();
        return true;
    }
}

以计算数组所有数字的和为例,我们定义一个 Task 类继承 RecursiveTask,在 compute 方法中把数组一分为二,创建两个 Task 实例,分别调用 fork 方法,再分别调用 join 方法获取两个 Task 的计算结果,从而得到数组所有数字的和。

public class Task extends RecursiveTask<Long> {
    private static final long serialVersionUID = 1L;
    long[] data;
    int start;
    int end;

    public Task(long[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long sum = 0;
        if(end - start < 1000){
            for(int i = start; i <= end; i++){
                sum += data[i];
            }
        }else {
            //分割任务
            int middle = (start + end) / 2;
            Task left = new Task(data,start,middle);
            Task right = new Task(data,middle + 1,end);
            left.fork();//fork 操作
            right.fork();//fork 操作
            sum = left.join() + right.join();//join操作
        }
        return sum;
    }
}

使用 ForkJoinPool 的方法很简单,创建 ForkJoinPool 实例,然后调用 invoke 方法执行 ForkJoinTask 任务即可获得计算结果。

long[] data = new long[1024*1024];
Arrays.setAll(data, i -> i);
long sum = new ForkJoinPool().invoke(
    new Task(data, 0, data.length - 1)
);
System.out.println(sum);

如果不需要获取计算的结果,比如需要执行一些没有返回值的操作,也可以调用 execute 方法。

本文来源:程序之心,转载请注明出处!

君子曰:学不可以已。
《深入理解计算机系统(原书第3版)》

主要介绍了计算机系统的基本概念,包括最底层的内存中的数据表示、流水线指令的构成、虚拟存储器、编译系统、动态加载库,以及用户应用等。书中提供了大量实际操作,可以帮助读者更好地理解程序执行的方式,改进程序的执行效率。此书以程序员的视角全面讲解了计算机系统,深入浅出地介绍了处理器、编译器、操作系统和网络环境,是这一领域的权威之作。

发表感想

© 2016 - 2024 chengxuzhixin.com All Rights Reserved.

浙ICP备2021034854号-1    浙公网安备 33011002016107号