MapReduce快速入门系列(9) | Shuffle之Combiner合并

网友投稿 876 2022-05-30

Hello,大家好!博主上篇讲解了分区,这篇要讲的是合并操作。如何讲解这个章节呢?首先先对什么是合并进行解释,然后通过案例进行证明。

目录

一. Combiner合并的简单介绍

二. 通过图片了解使用Combiner和不使用的区别

三. 代码实现

3.1 编写Mapper类

3.2 编写Reducer类

3.3 编写Driver驱动类

四. 对比及结论

一. Combiner合并的简单介绍

今天我们讲的是Shuffle中的第七步

每一个 map 都可能会产生大量的本地输出,Combiner 的作用就是

对map 端的输出先做一次合并,以减少在 map 和 reduce 节点之间的数据传输量,以提高网络IO 性能

,是 MapReduce 的一种优化手段之一。

1. Combiner是MR程序中Mapper和Reducer之外的一种组件。

2. Combiner组件的父类就是Reducer。

3. Combiner和Reducer的区别在于运行的位置

Combiner是在每一个MapTask所在的节点运行

Reducer是接收全局所有Mapper的输出结果

4. Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减少网络传输量。

二. 通过图片了解使用Combiner和不使用的区别

1. 未使用combiner的网络开销

2. 使用combiner的网络开销

可以很明显的看出在combiner阶段,通过合并同一个区中相同key的value值,减小了后续的数据传输,从而提高了网络的io!

但在MapReduce中,combiner是默认不开启的。为什么呢?是因为数据合并并不适用所有的业务需求,如果是计算个数,求和combiner还能发挥它的优势!但如果是求平均数,combiner必不可免的会影响到最终的结果,使结果变得不可靠!所以当我们需要到combiner时,需要手动开启。

3.

自定义Combiner实现步骤

①自定义一个Combiner继承Reducer,重写Reduce方法

public class WordcountCombiner extends Reducer{ @Override protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException { // 1 汇总操作 int count = 0; for(IntWritable v :values){ count += v.get(); } // 2 写出 context.write(key, new IntWritable(count)); } }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

②在Job驱动类中设置:

job.setCombinerClass(WordcountCombiner.class);

三. 代码实现

注:

用于对比的程序源代码为《MapReduce系列(2) | 统计输出给定的文本文档每一个单词出现的总次数》中的源代码,有想进行对比的同学,可以自行复制创建对比

(其实本源码就比源代码多一行)。

3.1 编写Mapper类

package com.buwenbuhuo.wordcount; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * @author 卜温不火 * @create 2020-04-22 21:24 * com.buwenbuhuo.wordcount - the name of the target package where the new class or interface will be created. * mapreduce0422 - the name of the current project. */ public class WcMapper extends Mapper { Text k = new Text(); IntWritable v = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取一行 String line = value.toString(); // 2 切割 String[] words = line.split(" "); // 3 输出 for (String word : words) { k.set(word); context.write(k, v); } } }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

3.2 编写Reducer类

package com.buwenbuhuo.wordcount; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * @author 卜温不火 * @create 2020-04-22 21:24 * com.buwenbuhuo.wordcount - the name of the target package where the new class or interface will be created. * mapreduce0422 - the name of the current project. */ public class WcReducer extends Reducer{ int sum; IntWritable v = new IntWritable(); @Override protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException { // 1 累加求和 sum = 0; for (IntWritable count : values) { sum += count.get(); } // 2 输出 v.set(sum); context.write(key,v); } }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

3.3 编写Driver驱动类

package com.buwenbuhuo.wordcount; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * @author 卜温不火 * @create 2020-04-22 21:24 * com.buwenbuhuo.wordcount - the name of the target package where the new class or interface will be created. * mapreduce0422 - the name of the current project. */ public class WcDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 1 获取配置信息以及封装任务 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 2 设置jar加载路径 job.setJarByClass(WcDriver.class); // 3 设置map和reduce类 job.setMapperClass(WcMapper.class); job.setReducerClass(WcReducer.class); // 4 设置map输出 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // // 仅此一行添加 job.setCombinerClass(WcReducer.class); // 5 设置最终输出kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 6 设置输入和输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 7 提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

MapReduce快速入门系列(9) | Shuffle之Combiner合并

四. 对比及结论

1. 对比

2.

Combiner能够应用的前提是不能影响最终的业务逻辑

,而且,Combiner的输出kv应该跟Reducer的输入kv类型要对应起来。

本次的分享就到这里了,大家有什么疑惑或者好的建议可以在评论区积极留言。受益的小伙伴们不要忘了关注我呀!!!

MapReduce 网络

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:PHP实现微信小程序支付完整版,可以借鉴!
下一篇:gitbook安装指南
相关文章