一、Combiner
1.Combiner是MR程序中Mapper和Reduce之外的一种组件
2.Combiner组件的父类就是Reducer
3.Combiner和Reducer之间的区别在于运行的位置
4.Reducer是每一个接收全局的Map Task 所输出的结果
5.Combiner是在MapTask的节点中运行
6.每一个map都会产生大量的本地输出,Combiner的作用就是对map输出的结果先做一次合并,以较少的map和reduce节点中的数据传输量
7.Combiner的存在就是提高当前网络IO传输的性能,也是MapReduce的一种优化手段。
8.实现自定义的Combiner
1.因为源码中的Combiner是继承于Reducer,我们使用自己的Combiner就需要继承Reducer并重写reduce方法
2.job中设置job.setCombinerClass(自定义Combiner的类.class)
ps:需要注意一个点就是:Combiner就是一次Reducer类中reduce方法的实现,所以这里的KV需要和Reducer的KV是一致的
实际开发一定是先实现Mapper之后,知道了KV,然后再根据需要实现自定义的Combiner中的KV
Combiner.javaimport java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class CombinerDemo extends Reducer<Text, Text, Text, Text>{@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {//因为Combiner就相当于在Mapper实现了reduce方法//所以逻辑和实际Reducer中的reduce方法一致int sum = 0;for (Text t : values) {sum += Integer.parseInt(t.toString());}context.write(key, new Text(sum + ""));}
}WordCountCombiner.javaimport java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountCombiner {//实现MapReduce/*1.实现Mapper端的逻辑* KEYIN:文件中读取的偏移量-->LongWritable(固定的) * VALUEIN:文件中实际读取的内容-->Text* KEYOUT:Mapper处理完成后传递给Reducer中的KEYIN的数据类型-->不固定,根据需求来* VALUEOUT:Mapper端处理完成后传递给Reducer中的VALUEIN的数据类型-->不固定,根据需求来* */public static class MyMapper extends Mapper<LongWritable, Text, Text, Text>{/* 进入Map处理逻辑之前会执行一次的方法*/@Overrideprotected void setup(Context context)throws IOException, InterruptedException {}/** 需要实现Mapper端的处理逻辑* key:是文件中数据的偏移量,数据类型是由泛型中定义得来的KEYIN* value:是文件中实际的内容,数据类型是泛型中定义得来的VALUEIN* context:将处理过后产生的KV,写成文件输出*/@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {//第一种String [] words = value.toString().split(" ");for (String word : words) {context.write(new Text(word), new Text("1"));}//第二种/** 通过StringTokenizer类来进行拆分,默认是空格\t,\n,\r,\f* 需要向 使用迭代器一样使用这个对象*//* String line = value.toString();//若文件中的数据分割方式较多,那么建议使用这个类进行拆分//这个类实现了枚举迭代器,所以它提供了一些类似于迭代一样的获取数据方式StringTokenizer st = new StringTokenizer(line);while(st.hasMoreTokens()) {//返回是否还有分隔符-->即判断是否还有其他分隔符//返回从当前位置到分隔符之间的字符串-->获取下一个元素
// st.nextToken();//取出元素context.write(new Text(st.nextToken()), new Text("1"));}*/}/* * 在Map处理逻辑之后会执行一次,可以处理一些逻辑*/@Overrideprotected void cleanup(Context context)throws IOException, InterruptedException {}}//实现Reducer端的逻辑/** Reducer相当于对Mapper端处理过后的数据进行一个实际的处理业务* KEYIN-->Mapper处理过后输出key的数据类型,由Mapper的泛型中第三个参数决定* VALUE-->Mapper处理过后输出value的数据类型,由Mapper的泛型中第四个参数决定* KEYOUT-->Reducer端处理完数据之后要写出key的数据类* VALUEOUT-->Reducer处理完数据之后,要写出value的 数据类*/public static class MyReduce extends Reducer<Text, Text, Text, Text>{/* 执行Reducer端先执行一次的方法*/@Overrideprotected void setup(Context context) throws IOException, InterruptedException {}/**reduce方法是处理业务的核心逻辑 *key: 是从Mapper端处理完成后,产生key的数据*values-->是从 Mapper端处理完成之后相同key的values的数据集合*context-->写出实际 处理完成后KV的方法 */@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {//因为Combiner就相当于在Mapper实现了reduce方法//所以逻辑和实际Reducer中的reduce方法一致int sum = 0;for (Text t : values) {sum += Integer.parseInt(t.toString());}context.write(key, new Text(sum + ""));}/* * 执行完reduce方法执行的方法* */@Overrideprotected void cleanup(Context context)throws IOException, InterruptedException {}}/*** 实现job,完成作业配置*/public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//1.获取配置对象Configuration conf = new Configuration();//2.创建Job对象(创建作业)/** 这个方法一共有两个参数版本* getInstance(conf) --------直接传入配置对象* getInstance(conf,"WordCountCombiner")---传入配置对象和类的名字*/Job job = Job.getInstance(conf);//3.设置运行job的classjob.setJarByClass(WordCountCombiner.class);//4.设置Mapper端的运行类和输出key,输出value的数据类型job.setMapperClass(MyMapper.class);job.setMapOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);//5.读取数据来源//这两个方法处理是一样的,只是最后一个参数不同,/** FileInputFormat.addInputPath(job, new Path("input/data1"));* * add:证明只有 一个路径,* * FileInputFormat.setInputPaths(job, new Path("input/data1"));* set证明后面是可变参数,多个* * 因为当前运行的是本地MR,所以数据是 从本地读取,若需要在集群中运行,这个位置的参数应该是args[0]*///FileInputFormat.addInputPath(job, new Path("input/data1"));FileInputFormat.setInputPaths(job, new Path("input/data1"));//优化设置//一般可以写分区设置,多文件输出设置,Combiner设置/** 并不是所有job都适用于Combiner,只有操作满足结合规律才可以进行设置* 如 求和,求最大值,topN 等可以使用Combiner* * Combiner不一定需要存在,只有数据量较大,需要做优化的时候可以使用*/job.setCombinerClass(CombinerDemo.class);//6.社会Reducer端的运行类和输出key和输出value的数据类型job.setReducerClass(MyReduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);//7.处理完文件之后输出的路径//ps:因为当前运行的是本地MR,所以数据是写到本地的,若需要再集群中运行,这个位置的参数应该是args[1]//数据是存储到HDFS中FileOutputFormat.setOutputPath(job, new Path("output1"));//8.提交作业int isok = job.waitForCompletion(true)?0:-1;System.exit(isok);}
}
word.txt里面的内容