一文搞懂excel的饼图制作(excel的饼图怎么做)
582
2022-05-29
文章目录
概述
CPU密集型 vs IO密集型
计算密集型任务
IO密集型
简单示例
Fork/Join常用的类
RecursiveTask 实现 并行计算
RecursiveAction
Fork/Join执行流程
最佳实践
概述
分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。
它是 ExecutorService 接口的一个实现,它把子任务分配给线程池(称为 ForkJoinPool )中的工作线程。
CPU密集型 vs IO密集型
通常来讲,任务可以划分为计算密集型和IO密集型
计算密集型任务
特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。
这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。
计算密集型任务由于主要消耗CPU资源,因此,代码运行效率至关重要。Python这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用C语言编写。
IO密集型
涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。
对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。
IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少 。
简单示例
来看个最简单的求和
public class ForkJoinTest { private static int[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; public static void main(String[] args) { System.out.println("result=> " + calc()); System.out.println("result=> " + calcByStream()); } private static int calc() { int result = 0; for (int i = 0; i < data.length; i++) { result += data[i]; } return result; } private static Long calcByStream() { return LongStream.rangeClosed(0,10).reduce(0, Long::sum); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Fork/Join常用的类
ForkJoinTask:我们要使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务。它提供在任务中执行 fork() 和 join() 操作的机制 。
通常情况下我们不需要直接继承 ForkJoinTask 类,而只需要继承它的子类,Fork/Join 框架提供了以下两个子类:
RecursiveAction:用于没有返回结果的任务。 比如写数据到磁盘,然后就退出了。 一个RecursiveAction可以把自己的工作分割成更小的几块, 这样它们可以由独立的线程或者CPU执行。 我们可以通过继承来实现一个RecursiveAction
RecursiveTask :用于有返回结果的任务。 可以将自己的工作分割为若干更小任务,并将这些子任务的执行合并到一个集体结果。 可以有几个水平的分割和合并
CountedCompleter: 在任务完成执行后会触发执行一个自定义的钩子函数
ForkJoinPool :ForkJoinTask 需要通过 ForkJoinPool 来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
RecursiveTask 实现 并行计算
要把任务提交到这个池,必须创建 RecursiveTask
要定义 RecursiveTask, 只需实现它唯一的抽象方法compute
protected abstract R compute();
1
这个方法同时定义了将任务拆分成子任务的逻辑,以及无法再拆分或不方便再拆分时,生成单个子任务结果的逻辑。
if (任务足够小或不可分) { 顺序计算该任务 } else { 将任务分成两个子任务 递归调用本方法,拆分每个子任务,等待所有子任务完成 合并每个子任务的结果 }
1
2
3
4
5
6
7
一般来说并没有确?的标准决定一个任务是否应该再拆分,但有几种试探方法可以帮助你
事实上,这只不过是著名的分治算法的并行版本而已。
现在编写一个方法来并行对前n个自然数求和就很简单了。你只需把想要的数字数组传给ForkJoinSumCalculator 的构造函数:
public static long forkJoinSum(long n) { long[] numbers = LongStream.rangeClosed(1, n).toArray(); ForkJoinTask
1
2
3
4
5
这里用了一个 LongStream 来生成包含前n个自然数的数组,然后创建一个 ForkJoinTask( RecursiveTask 的父类),并把数组传递给 ForkJoinSumCalculator 的公共构造函数。
最后,创建了一个新的 ForkJoinPool ,并把任务传给它的调用方法 。在ForkJoinPool 中执行时,最后一个方法返回的值就是 ForkJoinSumCalculator 类定义的任务结果。
在实际应用时,使用多个 ForkJoinPool 是没有什么意义的。正是出于这个原因,一般来说把它实例化一次,然后把实例保存在静态字段中,使之成为单例,这样就可以在软件中任何部分方便地重用了。这里创建时用了其默认的无参数构造函数,这意味着想让线程池使用JVM能够使用的所有处理器。更确切地说,该构造函数将使用 Runtime.availableProcessors 的返回值来决定线程池使用的线程数。请注意 availableProcessors 方法虽然看起来是处理器,但它实际上返回的是可用内核的数量,包括超线程生成的虚拟内核。
当把 ForkJoinSumCalculator 任务传给 ForkJoinPool 时,这个任务就由池中的一个线程执行,这个线程会调用任务的 compute 方法。
该方法会检查任务是否小到足以顺序执行,如果不够小则会把要求和的数组分成两半,分给两个新的 ForkJoinSumCalculator ,而它们也由ForkJoinPool 安排执行。
因此,这一过程可以递归重复,把原任务分为更小的任务,直到满足不方便或不可能再进一步拆分的条件(本例中是求和的项目数小于等于10 000)。
这时会顺序计算每个任务的结果,然后由分支过程创建的(隐含的)任务二叉树遍历回到它的根。接下来会合并每个子任务的部分结果,从而得到总任务的结果。
package com.artisan.java8; import java.util.concurrent.RecursiveTask; public class AccumulatorRecursiveTask extends RecursiveTask
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
RecursiveAction
package com.artisan.java8; import java.util.concurrent.RecursiveAction; import java.util.concurrent.atomic.AtomicInteger; public class AccumulatorRecursiveAction extends RecursiveAction { private final int start; private final int end; private final int[] data; private final int LIMIT = 3; public AccumulatorRecursiveAction(int start, int end, int[] data) { this.start = start; this.end = end; this.data = data; } @Override protected void compute() { if ((end - start) <= LIMIT) { for (int i = start; i < end; i++) { AccumulatorHelper.accumulate(data[i]); } } else { int mid = (start + end) / 2; AccumulatorRecursiveAction left = new AccumulatorRecursiveAction(start, mid, data); AccumulatorRecursiveAction right = new AccumulatorRecursiveAction(mid, end, data); left.fork(); right.fork(); left.join(); right.join(); } } static class AccumulatorHelper { private static final AtomicInteger result = new AtomicInteger(0); static void accumulate(int value) { result.getAndAdd(value); } public static int getResult() { return result.get(); } static void rest() { result.set(0); } } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
Fork/Join执行流程
最佳实践
虽然分支/合并框架还算简单易用,不幸的是它也很容易被误用
对一个任务调用 join 方法会阻塞调用方,直到该任务做出结果。因此,有必要在两个子任务的计算都开始之后再调用它。否则,你得到的版本会比原始的顺序算法更慢更复杂,因为每个子任务都必须等待另一个子任务完成才能启动。
不应该在 RecursiveTask 内部使用 ForkJoinPool 的 invoke 方法。相反,你应该始终直接调用 compute 或 fork 方法,只有顺序代码才应该用 invoke 来启动并行计算。
Java 任务调度
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。