MapReduce快速入门系列(9) | Shuffle之Combiner合并

网友投稿 900 2022-05-30

Hello,大家好!博主上篇讲解了分区,这篇要讲的是合并操作。如何讲解这个章节呢?首先先对什么是合并进行解释,然后通过案例进行证明。

目录

一. Combiner合并的简单介绍

二. 通过图片了解使用Combiner和不使用的区别

三. 代码实现

3.1 编写Mapper类

3.2 编写Reducer类

3.3 编写Driver驱动类

四. 对比及结论

一. Combiner合并的简单介绍

今天我们讲的是Shuffle中的第七步

每一个 map 都可能会产生大量的本地输出,Combiner 的作用就是

对map 端的输出先做一次合并,以减少在 map 和 reduce 节点之间的数据传输量,以提高网络IO 性能

,是 MapReduce 的一种优化手段之一。

1. Combiner是MR程序中Mapper和Reducer之外的一种组件。

2. Combiner组件的父类就是Reducer。

3. Combiner和Reducer的区别在于运行的位置

Combiner是在每一个MapTask所在的节点运行

Reducer是接收全局所有Mapper的输出结果

4. Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减少网络传输量。

二. 通过图片了解使用Combiner和不使用的区别

1. 未使用combiner的网络开销

2. 使用combiner的网络开销

可以很明显的看出在combiner阶段,通过合并同一个区中相同key的value值,减小了后续的数据传输,从而提高了网络的io!

但在MapReduce中,combiner是默认不开启的。为什么呢?是因为数据合并并不适用所有的业务需求,如果是计算个数,求和combiner还能发挥它的优势!但如果是求平均数,combiner必不可免的会影响到最终的结果,使结果变得不可靠!所以当我们需要到combiner时,需要手动开启。

3.

自定义Combiner实现步骤

①自定义一个Combiner继承Reducer,重写Reduce方法

public class WordcountCombiner extends Reducer{ @Override protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException { // 1 汇总操作 int count = 0; for(IntWritable v :values){ count += v.get(); } // 2 写出 context.write(key, new IntWritable(count)); } }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

②在Job驱动类中设置:

job.setCombinerClass(WordcountCombiner.class);

三. 代码实现

注:

用于对比的程序源代码为《MapReduce系列(2) | 统计输出给定的文本文档每一个单词出现的总次数》中的源代码,有想进行对比的同学,可以自行复制创建对比

(其实本源码就比源代码多一行)。

3.1 编写Mapper类

package com.buwenbuhuo.wordcount; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * @author 卜温不火 * @create 2020-04-22 21:24 * com.buwenbuhuo.wordcount - the name of the target package where the new class or interface will be created. * mapreduce0422 - the name of the current project. */ public class WcMapper extends Mapper { Text k = new Text(); IntWritable v = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取一行 String line = value.toString(); // 2 切割 String[] words = line.split(" "); // 3 输出 for (String word : words) { k.set(word); context.write(k, v); } } }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

3.2 编写Reducer类

package com.buwenbuhuo.wordcount; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * @author 卜温不火 * @create 2020-04-22 21:24 * com.buwenbuhuo.wordcount - the name of the target package where the new class or interface will be created. * mapreduce0422 - the name of the current project. */ public class WcReducer extends Reducer{ int sum; IntWritable v = new IntWritable(); @Override protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException { // 1 累加求和 sum = 0; for (IntWritable count : values) { sum += count.get(); } // 2 输出 v.set(sum); context.write(key,v); } }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

3.3 编写Driver驱动类

package com.buwenbuhuo.wordcount; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * @author 卜温不火 * @create 2020-04-22 21:24 * com.buwenbuhuo.wordcount - the name of the target package where the new class or interface will be created. * mapreduce0422 - the name of the current project. */ public class WcDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 1 获取配置信息以及封装任务 Configuration configuration = new Configuration(); Job job = Job.getInstance(configuration); // 2 设置jar加载路径 job.setJarByClass(WcDriver.class); // 3 设置map和reduce类 job.setMapperClass(WcMapper.class); job.setReducerClass(WcReducer.class); // 4 设置map输出 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // // 仅此一行添加 job.setCombinerClass(WcReducer.class); // 5 设置最终输出kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 6 设置输入和输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 7 提交 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

MapReduce快速入门系列(9) | Shuffle之Combiner合并

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

四. 对比及结论

1. 对比

2.

Combiner能够应用的前提是不能影响最终的业务逻辑

,而且,Combiner的输出kv应该跟Reducer的输入kv类型要对应起来。

本次的分享就到这里了,大家有什么疑惑或者好的建议可以在评论区积极留言。受益的小伙伴们不要忘了关注我呀!!!

MapReduce 网络

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:PHP实现微信小程序支付完整版,可以借鉴!
下一篇:gitbook安装指南
相关文章