MapReduce中Combiner的作用

article/2025/10/8 8:35:14

问题提出:

众所周知,Hadoop框架使用Mapper将数据处理成一个<key,value>键值对,再网络节点间对其进行整理(shuffle),然后使用Reducer处理数据并进行最终输出。   

 

在上述过程中,我们看到至少两个性能瓶颈:

  1. 如果我们有10亿个数据,Mapper会生成10亿个键值对在网络间进行传输,但如果我们只是对数据求最大值,那么很明显的Mapper只需要输出它所知道的最大值即可。这样做不仅可以减轻网络压力,同样也可以大幅度提高程序效率。
  2. 使用专利中的国家一项来阐述数据倾斜这个定义。这样的数据远远不是一致性的或者说平衡分布的,由于大多数专利的国家都属于美国,这样不仅Mapper中的键值对、中间阶段(shuffle)的键值对等,大多数的键值对最终会聚集于一个单一的Reducer之上,压倒这个Reducer,从而大大降低程序的性能。

 

目标:

MapReduce中的Combiner就是为了避免map任务和reduce任务之间的数据传输而设置的,Hadoop允许用户针对map task的输出指定一个合并函数。即为了减少传输到Reduce中的数据量。它主要是为了削减Mapper的输出从而减少网络带宽和Reducer之上的负载。

Without Combiner:

With Combiner:

可以看出,使用Combiner之后,能够节省很大的带宽。

 

数据格式转换:

map: (K1, V1) → list(K2,V2)

combine: (K2, list(V2)) → list(K3, V3)

reduce: (K3, list(V3)) → list(K4, V4)

注意:combine的输入和reduce的完全一致,输出和map的完全一致

 

使用注意:

对于Combiner有几点需要说明的是:

  1. 有很多人认为这个combiner和map输出的数据合并是一个过程,其实不然,map输出的数据合并只会产生在有数据spill出的时候,即进行merge操作。

  2. 与mapper与reducer不同的是,combiner没有默认的实现,需要显式的设置在conf中才有作用。

  3. 并不是所有的job都适用combiner,只有操作满足结合律的才可设置combiner。combine操作类似于:opt(opt(1, 2, 3), opt(4, 5, 6))。如果opt为求和、求最大值的话,可以使用,但是如果是求中值的话,不适用。

  4. 一般来说,combiner和reducer它们俩进行同样的操作。

但是:特别值得注意的一点,一个combiner只是处理一个结点中的的输出,而不能享受像reduce一样的输入(经过了shuffle阶段的数据),这点非常关键。具体原因查看下面的数据流解释。

 

插入了Combiner的MapReduce数据流

Combiner:前面展示的流水线忽略了一个可以优化MapReduce作业所使用带宽的步骤,这个过程叫Combiner,它在Mapper之后Reducer之前运行。Combiner是可选的,如果这个过程适合于你的作业,Combiner实例会在每一个运行map任务的节点上运行。Combiner会接收特定节点上的Mapper实例的输出作为输入,接着Combiner的输出会被发送到Reducer那里,而不是发送Mapper的输出。Combiner是一个“迷你reduce”过程,它只处理单台机器生成的数据(特别重要,作者在做一个矩阵乘法的时候,没有领会到这点,把它当成一个完全的reduce的输入数据来处理,结果出错。)。

词频统计是一个可以展示Combiner的用处的基础例子,上面的词频统计程序为每一个它看到的词生成了一个(word,1)键值对。所以如果在同一个文档内“cat”出现了3次,(”cat”,1)键值对会被生成3次,这些键值对会被送到Reducer那里。通过使用Combiner,这些键值对可以被压缩为一个送往Reducer的键值对(”cat”,3)。现在每一个节点针对每一个词只会发送一个值到reducer,大大减少了shuffle过程所需要的带宽并加速了作业的执行。这里面最爽的就是我们不用写任何额外的代码就可以享用此功能!如果你的reduce是可交换及可组合的,那么它也就可以作为一个Combiner。你只要在driver中添加下面这行代码就可以在词频统计程序中启用Combiner。

 

例子

 

输入文件

test1.txt

hello darren
hello zhang
hello java

test2.txt

hello hadoop
hello spark

工具类

package com.darren.hadoop.util;import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;public class HDFSUtil {private static final Logger LOG = Logger.getLogger(HDFSUtil.class);/*** Delete file* * @param conf* @param hdfsPath File path* @throws IOException*/public static void deleteHDFSFile(Configuration conf, String hdfsPath) throws IOException {FileSystem fileSystem = FileSystem.get(conf);Path path = new Path(hdfsPath);if (fileSystem.exists(path)) {fileSystem.delete(path, true);LOG.info("HDFS deleted: " + path);}}
}

Mapper文件

package com.darren.hadoop.wordcount.combiner;import java.io.IOException;
import java.util.StringTokenizer;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class WordCoundMapper extends Mapper<LongWritable, Text, Text, IntWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {Text word = new Text();IntWritable one = new IntWritable(1);String line = value.toString();StringTokenizer tokenizer = new StringTokenizer(line);while (tokenizer.hasMoreTokens()) {String token = tokenizer.nextToken();word.set(token);context.write(word, one);}}
}

Reduce文件

package com.darren.hadoop.wordcount.combiner;import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class WordCoundReducer extends Reducer<Text, IntWritable, Text, IntWritable> {@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {int sum = 0;for (IntWritable value : values) {int count = value.get();sum += count;}context.write(key, new IntWritable(sum));}}

没有Combiner的Driver文件

package com.darren.hadoop.wordcount.combiner;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;import com.darren.hadoop.util.HDFSUtil;public class WordCount extends Configured implements Tool {private static final Logger LOG = Logger.getLogger(WordCount.class);private static final String INPUT_PATH = "test-in/wordcount";private static final String OUTPUT_PATH = "test-out/wordcount";public static void main(String[] args) throws Exception {int rtnStatus = -1;try {rtnStatus = ToolRunner.run(new Configuration(), new WordCount(), args);} catch (Exception e) {LOG.error("WordCount Driver", e);}LOG.info("WordCount Driver Status Code :" + rtnStatus);System.exit(rtnStatus);}@Overridepublic int run(String[] args) throws Exception {long start = System.currentTimeMillis();Configuration conf = getConf();Job job = Job.getInstance(conf);job.setJarByClass(WordCount.class);job.setJobName("WordCount");job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);job.setMapperClass(WordCoundMapper.class);job.setReducerClass(WordCoundReducer.class);job.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(TextOutputFormat.class);// job.setCombinerClass(WordCoundCombiner.class);// set reduce numberjob.setNumReduceTasks(1);LOG.info(String.format("No of Reducers: %s", job.getNumReduceTasks()));// delete the output pathHDFSUtil.deleteHDFSFile(conf, OUTPUT_PATH);FileInputFormat.addInputPath(job, new Path(INPUT_PATH));FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));job.waitForCompletion(true);// closethis.close();long end = System.currentTimeMillis();double s = (end - start) / 1000.0;double m = s / 60.0;double h = m / 60.0;LOG.info("Total Cost: [" + s + "] s");LOG.info("Total Cost: [" + m + "] m");LOG.info("Total Cost: [" + h + "] h");return 0;}private void close() throws IOException {FileSystem fileSystem = FileSystem.get(getConf());fileSystem.close();}
}

执行结果日志

	Map-Reduce FrameworkMap input records=5Map output records=10Map output bytes=101Map output materialized bytes=133Input split bytes=270Combine input records=0Combine output records=0Reduce input groups=6Reduce shuffle bytes=133Reduce input records=10Reduce output records=6Spilled Records=20Shuffled Maps =2Failed Shuffles=0Merged Map outputs=2GC time elapsed (ms)=9CPU time spent (ms)=0Physical memory (bytes) snapshot=0Virtual memory (bytes) snapshot=0Total committed heap usage (bytes)=913833984

分析

Map input records=5 Map output records=10,从输入文件的数据来看是正确的。

Combine input records=0 Combine output records=0,因为没有使用Combiner,输入输出是0也是正确的。

Reduce input records=10 Reduce output records=6,从输入文件和mapper结果分析也是正确的。

那么我们来看一下使用Combiner的结果是怎样的。

Combiner文件

package com.darren.hadoop.wordcount.combiner;import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class WordCoundCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {int sum = 0;for (IntWritable value : values) {int count = value.get();sum += count;}context.write(key, new IntWritable(sum));}}

带有Combiner的Driver文件,把注释去掉

// job.setCombinerClass(WordCoundCombiner.class);job.setCombinerClass(WordCoundCombiner.class);

执行结果日志

	Map-Reduce FrameworkMap input records=5Map output records=10Map output bytes=101Map output materialized bytes=97Input split bytes=270Combine input records=10Combine output records=7Reduce input groups=6Reduce shuffle bytes=97Reduce input records=7Reduce output records=6Spilled Records=14Shuffled Maps =2Failed Shuffles=0Merged Map outputs=2GC time elapsed (ms)=0CPU time spent (ms)=0Physical memory (bytes) snapshot=0Virtual memory (bytes) snapshot=0Total committed heap usage (bytes)=1016070144

分析

Map input records=5 Map output records=10,从输入文件的数据来看是正确的。

Combine input records=10 Combine output records=7,输入和Map输出一致,但是输出为什么是7而不是6。从输入文件可知,两个输入文件,明显都小于128M,显然会划分成两个Map Task, 每一个Map Task对应一个Combiner,所以两个Combiner的输入输出分别是:

// 输入
hello 1
hello 1
hello 1
darren 1
zhang 1
java 1
// 输出
hello 3
darren 1
zhang 1
java 1
// 输入
hello 1
hello 1
hadoop 1
spark 1
// 输出
hello 2
hadoop 1
spark 1

所以,输入是10,输出是7而不是6

Reduce input records=7 Reduce output records=6,输入是Combiner的输出结果,7是正确的,最终结果6也是正确的。

 

参考:

https://blog.csdn.net/guoery/article/details/8529004

https://blog.csdn.net/ipolaris/article/details/8723782


http://chatgpt.dhexx.cn/article/Wf4xw94k.shtml

相关文章

Combiner

一、Combiner 1.Combiner是MR程序中Mapper和Reduce之外的一种组件 2.Combiner组件的父类就是Reducer 3.Combiner和Reducer之间的区别在于运行的位置 4.Reducer是每一个接收全局的Map Task 所输出的结果 5.Combiner是在MapTask的节点中运行 6.每一个map都会产生大量的本地输出…

【MapReduce】Combiner详解

Combiner详解 解析Combiner是什么&#xff1f;为什么会出现Combiner&#xff1f;如何使用 CombinerCombiner注意点 代码实现MapperReduceDriver运行日志加上Combiner 解析 Combiner是什么&#xff1f;为什么会出现Combiner&#xff1f; Combiner是一个本地化的reduce操作&…

最简单的js去除首尾空格

function trimStr(str){return str.replace(/(^\s*)|(\s*$)/g,""); } a runoob console.log(trimStr(a));

js去除字符串空格

1、去除字符串内 “所有” 的空格 var str " 1 1 "; var g str.replace(/\s*/g,""); console.log(g) 2、去除字符串内 “两头” 的空格 var str " 1 1 "; var g str.replace(/^\s*|\s*$/g,""); console.log(g);3、去除字符串内…

JS去空格方法

1.trim() 我们知道trim()在IE所支持的版本中&#xff0c;只有IE9以上能支持&#xff0c;所以下面第二个会介绍另一种方法。 2.replace 以下图所示&#xff0c;如果直接这样输入&#xff0c;replace只能去掉一个空格 如果要去掉多个空格用正则表达式&#xff0c;如下图所示&am…

js去掉空格方法-简单一行原生js代码实现

str为要去除空格的字符串: 1、去掉所有空格 strstr.replace(/\s/g,""); //js去掉所有空格 \s表示查找空格带上加好表示连续的空格2、js去掉两头空格 strstr.replace(/^\s|\s$/g,"");//js去掉两头空格3、js去掉左空格 strstr.replace( /^\s*/, ); //…

js字符串去掉前后空格回车换行

问题&#xff1a; 需要规范用户在textarea框中输入的数据&#xff0c;需去掉字符串前后空格回车换行&#xff08;字符串中间的不需要管&#xff09; 解决&#xff1a; 直接使用trim()方法。 var str row.serviceNameModifyList;strstr.trim();//把数据进行去前后的空格和换行案…

【Android -- 蓝牙】蓝牙配对和蓝牙连接

文章目录 一、蓝牙配对二、蓝牙连接 一、蓝牙配对 搜索到蓝牙设备后&#xff0c;将设备信息填充到listview中&#xff0c;点击listiew则请求配对 蓝牙配对有点击配对和自动配对&#xff0c;点击配对就是我们选择设备两个手机弹出配对确认框&#xff0c;点击确认后配对 自动配…

蓝牙协议之配对和绑定学习笔记

蓝牙配对及绑定专业术语 术语描述BDA蓝牙设备地址RPAResolvable Private Address的缩写&#xff0c;可解析的蓝牙设备地址&#xff0c;它会周期性的变化IRK全称&#xff1a;Identity Resolving Key&#xff0c;用于解析蓝牙设备地址的密钥STKShort Term Key&#xff0c;短期密…

android开发之蓝牙配对连接的方法

新年第一篇。 最近在做蓝牙开锁的小项目&#xff0c;手机去连接单片机总是出现问题&#xff0c;和手机的连接也不稳定&#xff0c;看了不少蓝牙方面的文档&#xff0c;做了个关于蓝牙连接的小结。 在做android蓝牙串口连接的时候一般会使用 BluetoothSocket tmp null; // G…

蓝牙配对

蓝牙HC05是主从一体的蓝牙串口模块&#xff0c;简单的说&#xff0c;当蓝牙设备与蓝牙设备配对连接成功后&#xff0c;我们可以忽视蓝牙内部的通信协议&#xff0c;直接将将蓝牙当做串口用。当建立连接&#xff0c;两设备共同使用一通道也就是同一个串口&#xff0c;一个设备发…

Android蓝牙自动配对Demo,亲测好使!!!

蓝牙自动配对&#xff0c;即搜索到其它蓝牙设备之后直接进行配对&#xff0c;不需要弹出配对确认框或者密钥输入框。 转载请注明出处http://blog.csdn.net/qq_25827845/article/details/52400782 源码下载地址&#xff1a;https://github.com/chaohuangtianjie994/BlueTooth-A…

HC05蓝牙模块配对指南(教程)

HC05蓝牙模块配对指南&#xff08;教程&#xff09; 1.准备两个蓝牙模块&#xff0c;一个作为主机&#xff0c;一个作为从机 本人调试过程中用到的是正点原子的HC05蓝牙模块&#xff0c;其余模块的调试大同小异。 2.进入AT状态 进入AT状态有2种方法&#xff1a; 1,上电同时…

android 实现ble蓝牙自动配对连接

蓝牙自动配对&#xff0c;即搜索到其它蓝牙设备之后直接进行配对&#xff0c;不需要弹出配对确认框或者密钥输入框。 本文章用来连接蓝牙设备ai-thinker&#xff0c;如果你要连接其他蓝牙设备&#xff0c;注意修改相关名字以及修改设备初试pin值。 将Demo安装在Android手机上…

一篇文章带你解读蓝牙配对绑定

BLE配对绑定解读 什么是低功耗蓝牙配对&#xff1f;什么又是绑定&#xff1f;配对和绑定有什么区别&#xff1f;配对有什么好处&#xff1f;如何删除绑定信息&#xff1f;如何确定配对的安全等级&#xff1f;just work的配对一定就不安全吗&#xff1f;如何开发自己的配对应用…

蓝牙配对流程(一)

一、扫描 被动扫描&#xff08;主从之间没有扫描请求与扫描响应&#xff09; 2.主动扫描&#xff08;主从之间有扫描请求与扫描响应&#xff09; 二、过滤 1、信息匹配&#xff08;是否在白名单&#xff09; 三、建立连接 1、建立连接 建立连接后的结果&#xff1a; 连接成…

蓝牙协议和配对

蓝牙协议 蓝牙协议分层 物理层&#xff08;PHA&#xff09;&#xff0c;链路层&#xff08;LL&#xff09;&#xff0c;HCI&#xff08;可选&#xff09;,GAP层&#xff0c;L2CAP&#xff0c;SMP &#xff0c; ATT &#xff0c;GATT GAP层角色总结 对于蓝牙的主机和蓝牙的从…

蓝牙(二)蓝牙搜索、配对、连接

1.搜索 从上一节我们可以知道&#xff0c;蓝牙状态发生了改变&#xff0c;并发生了回调。咱们就从回调开始。 DevicePickerFragment.java 用于蓝牙设置界面的蓝牙配置和管理 Overridepublic void onBluetoothStateChanged(int bluetoothState) {super.onBluetoothStateChange…

Android蓝牙配对

上一篇博客介绍了Android ble的一些情况。 http://blog.csdn.net/guijiaoba/article/details/41172403 蓝牙如果链接好&#xff0c;能够读写&#xff0c;基本上完成了。蓝牙还有个比较烦人的东西&#xff0c;就是蓝牙配对。 Android ble4.0使用的配对方式和原先版本的配对方式…

Android蓝牙开发(二)之蓝牙配对和蓝牙连接

上篇文章&#xff1a;https://blog.csdn.net/huangliniqng/article/details/82185983 讲解了打开蓝牙设备和搜索蓝牙设备&#xff0c;这篇文章来讲解蓝牙配对和蓝牙连接 1.蓝牙配对 搜索到蓝牙设备后&#xff0c;将设备信息填充到listview中&#xff0c;点击listiew则请求配对…