Java8 - 一文搞定Fork/Join 框架

网友投稿 536 2022-05-29

文章目录

概述

CPU密集型 vs IO密集型

计算密集型任务

IO密集型

简单示例

Fork/Join常用的类

RecursiveTask 实现 并行计算

RecursiveAction

Fork/Join执行流程

最佳实践

概述

分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。

它是 ExecutorService 接口的一个实现,它把子任务分配给线程池(称为 ForkJoinPool )中的工作线程。

CPU密集型 vs IO密集型

通常来讲,任务可以划分为计算密集型和IO密集型

计算密集型任务

特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。

这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。

计算密集型任务由于主要消耗CPU资源,因此,代码运行效率至关重要。Python这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用C语言编写。

IO密集型

涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。

对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。

IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少 。

简单示例

来看个最简单的求和

public class ForkJoinTest { private static int[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; public static void main(String[] args) { System.out.println("result=> " + calc()); System.out.println("result=> " + calcByStream()); } private static int calc() { int result = 0; for (int i = 0; i < data.length; i++) { result += data[i]; } return result; } private static Long calcByStream() { return LongStream.rangeClosed(0,10).reduce(0, Long::sum); } }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

Fork/Join常用的类

ForkJoinTask:我们要使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务。它提供在任务中执行 fork() 和 join() 操作的机制 。

通常情况下我们不需要直接继承 ForkJoinTask 类,而只需要继承它的子类,Fork/Join 框架提供了以下两个子类:

RecursiveAction:用于没有返回结果的任务。 比如写数据到磁盘,然后就退出了。 一个RecursiveAction可以把自己的工作分割成更小的几块, 这样它们可以由独立的线程或者CPU执行。 我们可以通过继承来实现一个RecursiveAction

RecursiveTask :用于有返回结果的任务。 可以将自己的工作分割为若干更小任务,并将这些子任务的执行合并到一个集体结果。 可以有几个水平的分割和合并

CountedCompleter: 在任务完成执行后会触发执行一个自定义的钩子函数

ForkJoinPool :ForkJoinTask 需要通过 ForkJoinPool 来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

RecursiveTask 实现 并行计算

要把任务提交到这个池,必须创建 RecursiveTask 的一个子类,其中 R 是并行化任务(以及所有子任务)产生的结果类型,或者如果任务不返回结果,则是 RecursiveAction 类型 。

要定义 RecursiveTask, 只需实现它唯一的抽象方法compute

protected abstract R compute();

1

这个方法同时定义了将任务拆分成子任务的逻辑,以及无法再拆分或不方便再拆分时,生成单个子任务结果的逻辑。

if (任务足够小或不可分) { 顺序计算该任务 } else { 将任务分成两个子任务 递归调用本方法,拆分每个子任务,等待所有子任务完成 合并每个子任务的结果 }

1

2

3

4

5

6

7

一般来说并没有确?的标准决定一个任务是否应该再拆分,但有几种试探方法可以帮助你

事实上,这只不过是著名的分治算法的并行版本而已。

现在编写一个方法来并行对前n个自然数求和就很简单了。你只需把想要的数字数组传给ForkJoinSumCalculator 的构造函数:

public static long forkJoinSum(long n) { long[] numbers = LongStream.rangeClosed(1, n).toArray(); ForkJoinTask task = new ForkJoinSumCalculator(numbers); return new ForkJoinPool().invoke(task); }

1

2

3

4

5

这里用了一个 LongStream 来生成包含前n个自然数的数组,然后创建一个 ForkJoinTask( RecursiveTask 的父类),并把数组传递给 ForkJoinSumCalculator 的公共构造函数。

最后,创建了一个新的 ForkJoinPool ,并把任务传给它的调用方法 。在ForkJoinPool 中执行时,最后一个方法返回的值就是 ForkJoinSumCalculator 类定义的任务结果。

在实际应用时,使用多个 ForkJoinPool 是没有什么意义的。正是出于这个原因,一般来说把它实例化一次,然后把实例保存在静态字段中,使之成为单例,这样就可以在软件中任何部分方便地重用了。这里创建时用了其默认的无参数构造函数,这意味着想让线程池使用JVM能够使用的所有处理器。更确切地说,该构造函数将使用 Runtime.availableProcessors 的返回值来决定线程池使用的线程数。请注意 availableProcessors 方法虽然看起来是处理器,但它实际上返回的是可用内核的数量,包括超线程生成的虚拟内核。

当把 ForkJoinSumCalculator 任务传给 ForkJoinPool 时,这个任务就由池中的一个线程执行,这个线程会调用任务的 compute 方法。

该方法会检查任务是否小到足以顺序执行,如果不够小则会把要求和的数组分成两半,分给两个新的 ForkJoinSumCalculator ,而它们也由ForkJoinPool 安排执行。

因此,这一过程可以递归重复,把原任务分为更小的任务,直到满足不方便或不可能再进一步拆分的条件(本例中是求和的项目数小于等于10 000)。

这时会顺序计算每个任务的结果,然后由分支过程创建的(隐含的)任务二叉树遍历回到它的根。接下来会合并每个子任务的部分结果,从而得到总任务的结果。

package com.artisan.java8; import java.util.concurrent.RecursiveTask; public class AccumulatorRecursiveTask extends RecursiveTask { private final int start; private final int end; private final int[] data; private final int LIMIT = 3; public AccumulatorRecursiveTask(int start, int end, int[] data) { this.start = start; this.end = end; this.data = data; } @Override protected Integer compute() { if ((end - start) <= LIMIT) { int result = 0; for (int i = start; i < end; i++) { result += data[i]; } return result; } int mid = (start + end) / 2; AccumulatorRecursiveTask left = new AccumulatorRecursiveTask(start, mid, data); AccumulatorRecursiveTask right = new AccumulatorRecursiveTask(mid, end, data); left.fork(); Integer rightResult = right.compute(); Integer leftResult = left.join(); return rightResult + leftResult; } }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

RecursiveAction

package com.artisan.java8; import java.util.concurrent.RecursiveAction; import java.util.concurrent.atomic.AtomicInteger; public class AccumulatorRecursiveAction extends RecursiveAction { private final int start; private final int end; private final int[] data; private final int LIMIT = 3; public AccumulatorRecursiveAction(int start, int end, int[] data) { this.start = start; this.end = end; this.data = data; } @Override protected void compute() { if ((end - start) <= LIMIT) { for (int i = start; i < end; i++) { AccumulatorHelper.accumulate(data[i]); } } else { int mid = (start + end) / 2; AccumulatorRecursiveAction left = new AccumulatorRecursiveAction(start, mid, data); AccumulatorRecursiveAction right = new AccumulatorRecursiveAction(mid, end, data); left.fork(); right.fork(); left.join(); right.join(); } } static class AccumulatorHelper { private static final AtomicInteger result = new AtomicInteger(0); static void accumulate(int value) { result.getAndAdd(value); } public static int getResult() { return result.get(); } static void rest() { result.set(0); } } }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

Java8 - 一文搞定Fork/Join 框架

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

Fork/Join执行流程

最佳实践

虽然分支/合并框架还算简单易用,不幸的是它也很容易被误用

对一个任务调用 join 方法会阻塞调用方,直到该任务做出结果。因此,有必要在两个子任务的计算都开始之后再调用它。否则,你得到的版本会比原始的顺序算法更慢更复杂,因为每个子任务都必须等待另一个子任务完成才能启动。

不应该在 RecursiveTask 内部使用 ForkJoinPool 的 invoke 方法。相反,你应该始终直接调用 compute 或 fork 方法,只有顺序代码才应该用 invoke 来启动并行计算。

Java 任务调度

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:解锁华为云新技能-AIOT开发全流程(1)【设备接入-ESP端侧数据收集[MQTT]-实时数据分析】(步步截图较详细)
下一篇:跨云业务迁移时需要注意什么?
相关文章