Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。Fork/Join框架要完成两件事情:

  1.任务分割:首先Fork/Join框架需要把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割

  2.执行任务并合并结果:分割的子任务分别放到双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都放在另外一个队列里,启动一个线程从队列里取数据,然后合并这些数据。

  在Java的Fork/Join框架中,使用两个类完成上述操作

  1.ForkJoinTask:我们要使用Fork/Join框架,首先需要创建一个ForkJoin任务。该类提供了在任务中执行fork和join的机制。通常情况下我们不需要直接集成ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了两个子类:

    a.RecursiveAction:用于没有返回结果的任务

    b.RecursiveTask:用于有返回结果的任务

  2.ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行

  任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务(工作窃取算法)。

Fork/Join框架的实现原理

  ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool,而ForkJoinWorkerThread负责执行这些任务。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
 * Fork/Join是jdk7新增的并行计算框架,采用工作窃取算法,将一个大的任务拆分为多个小的任务,最终汇总各个小的任务的计算结果。
 * 比如计算1-100之类数字相加的和,假定阀值为30(即开始位置和结束位置相减<=30即可进行计算),如果>30则进行拆分。
 * 计算过程如下:
 * 拆分为1-50,51-100
 * 1-50拆分为1-25,26-50
 * 51-100拆分为51-75,76-100
 * 计算1-25的和,计算26-50的和==》合并为结果1
 * 计算51-75的和,计算76-100的和==》合并为结果2
 * 合并结果1,2
 * @author Donny
 * @version 1.0
 * @date 2018/08/25
 */
public class FormJoinDemo {

    static class ComputeTask extends RecursiveTask<Long> {
        private final static int THREAD_HOLD = 100;
        private int[] data;
        private int start;
        private int end;

        public ComputeTask(int[] data, int start, int end) {
            this.data = data;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            Long sum = 0L;

            // 如果任务足够小,那么直接计算
            if (end - start <= THREAD_HOLD) {
                for (int i = start; i < end; i++) {
                    sum += data[i];
                }

                System.out.println(String.format("Compute %d ~ %d = %d", start, end, sum));

            } else {
                // 如果任务过大,则一分为二
                int middle = (end + start) / 2;
                System.out.println(String.format("Split %d~%d ==> %d~%d,%d~%d", start, end, start, middle, middle + 1, end));
                ComputeTask left = new ComputeTask(data, start, middle);
                ComputeTask right = new ComputeTask(data, middle + 1, end);
                invokeAll(left, right);// 注意:这里使用invokeAll,而不要使用left.fork()和right.fork()

                Long leftSum = left.join();
                Long rightSum = right.join();
                sum = leftSum + rightSum;
                System.out.println(String.format("Result = %d + %d ==> %d", leftSum, rightSum, sum));
            }

            return sum;
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();

        int[] data = new int[1000];
        fillData(data);
        Long sum = pool.invoke(new ComputeTask(data, 0, data.length));
        System.out.println("Fork/join sum:" + sum);
    }

    private static void fillData(int[] data) {
        for (int i = 0; i < data.length; i++) {
            data[i] = i;
        }
    }
}

上面的代码使用Fork/Join框架来计算一个元素个数为1000的数组的和。

执行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Split 0~1000 ==> 0~500,501~1000
Split 501~1000 ==> 501~750,751~1000
Split 0~500 ==> 0~250,251~500
Split 501~750 ==> 501~625,626~750
Split 0~250 ==> 0~125,126~250
Compute 0 ~ 125 = 7750
Compute 126 ~ 250 = 23250
Result = 7750 + 23250 ==> 31000
Split 251~500 ==> 251~375,376~500
Compute 251 ~ 375 = 38750
Compute 376 ~ 500 = 54250
Result = 38750 + 54250 ==> 93000
Result = 31000 + 93000 ==> 124000
Split 751~1000 ==> 751~875,876~1000
Compute 751 ~ 875 = 100750
Compute 876 ~ 1000 = 116250
Result = 100750 + 116250 ==> 217000
Compute 626 ~ 750 = 85250
Compute 501 ~ 625 = 69750
Result = 69750 + 85250 ==> 155000
Result = 155000 + 217000 ==> 372000
Result = 124000 + 372000 ==> 496000
Fork/join sum:496000

这里要注意的是:在拆分任务时不要调用fork()方法将任务推给别的线程执行,而使用invokeAll()。

这是因为执行compute()方法的线程本身也是一个Worker线程,当对两个子任务调用fork()时,这个Worker线程就会把任务分配给另外两个Worker,但是它自己却停下来等待不干活了!这样就白白浪费了Fork/Join线程池中的一个Worker线程

原因参考:https://www.liaoxuefeng.com/article/001493522711597674607c7f4f346628a76145477e2ff82000

Fork/Join实现介绍:https://blog.csdn.net/yinwenjie/article/details/71524140