一、MapReduce编程规范
MapReduce的开发一共又八个步骤,其中Map阶段分为2个步骤,Shuffle阶段4个步骤,Reduce阶段分为2个步骤。
1.1 步骤流程
Map阶段2个步骤
- 设置InputFormat类,将数据切分为key-value(k1和v1)对,输入到第二步;
- 自定义Map逻辑,将第一步的结果转换为另外的Key-Value(k2和v2)对,输出结果。
Shuffle阶段的4个步骤
- 对输出的Key-Value进行分区;
- 对不同分区的数据按照相同的Key排序;
- (可选)对分组过的数据初步规约,降低数据的网络拷贝;
- 对数据进行分组,相同key的value放入同一个集合中。
Reduce阶段2个步骤
- 对多个Map任务的结果进行排序以及合并,编写Reduce函数实现自己的逻辑,对输入的key-value进行处理,转为新的key-value(k3和v3)输出;
- 设置OutputFormat处理并保存Reduce输出的key-value数据。
1.1.1 Map阶段
Map函数的输入来自于分布式文件系统的文件块,这些文件块的格式是任意的,可以是文档,也可以是二进制格式。
文件块是一系列元素的集合,这些元素也是任意类型的,同一个元素不能跨文件块存储。
Map函数将输入的元素转换成<key,value>形式的键值对,键和值的类型也是任意的,其中键不同于一般的标志属性,即键没有唯一性。
Map阶段输出的结果是许多<key,1>
具有相同key的键值对会被发送到同一个Reduce那里。
1.1.2 Shuffle阶段
1.1.3 Reduce阶段
1.2 InputFormat
平时我们写MapReduce程序的时候,在设置输入格式的时候,总会调用形如job.setInputFormatClass(KeyValueTextInputFormat.class)来保证输入文件按照我们想要的格式被读取。
所有的输入格式都继承于InputFormat,这是一个抽象类,其子类有专门用于读取普通文件的FileInputFormat,用来读取数据库的DBInputFormat等等。
其实,一个输入格式InputFormat,主要无非就是要解决如何将数据分割成分片(比如多少行为一个分片),以及如何读取分片中的数据(比如按行读取)。前者由getSplits()完成,后者由RecordReader完成。
不同的InputFormat都会按自己的实现来读取输入数据并产生输入分片,一个输入分片会被单独的map task作为数据源。
下面我们先看看这些输入分片(inputSplit)是什么样的。
1.2.1 InputSplit
Mappers的输入是一个一个的输入分片,称InputSplit。InputSplit是一个抽象类,它在逻辑上包含了提供给处理这个InputSplit的Mapper的所有K-V对。
public abstract class InputSplit {public abstract long getLength() throws IOException, InterruptedException;public abstract String[] getLocations() throws IOException, InterruptedException;
}
- getLength()用来获取InputSplit的大小,以支持对InputSplits进行排序;
- getLocations()则用来获取存储分片的位置列表。
我们来看一个简单InputSplit子类:FileSplit。
public class FileSplit extends InputSplit implements Writable {private Path file;private long start;private long length;private String[] hosts;FileSplit() {}public FileSplit(Path file, long start, long length, String[] hosts) {this.file = file;this.start = start;this.length = length;this.hosts = hosts;}//序列化、反序列化方法,获得hosts等等……
}
从上面的源码我们可以看到,一个FileSplit是由文件路径,分片开始位置,分片大小和存储分片数据的hosts列表组成,由这些信息我们就可以从输入文件中切分出提供给单个Mapper的输入数据。这些属性会在Constructor设置,我们在后面会看到这会在InputFormat的getSplits()中构造这些分片。
我们再看CombineFileSplit
public class CombineFileSplit extends InputSplit implements Writable {private Path[] paths;private long[] startoffset;private long[] lengths;private String[] locations;private long totLength;public CombineFileSplit() {}public CombineFileSplit(Path[] files, long[] start, long[] lengths, String[] locations) {initSplit(files, start, lengths, locations);}public CombineFileSplit(Path[] files, long[] lengths) {long[] startoffset = new long[files.length];for (int i = 0; i < startoffset.length; i++) {startoffset[i] = 0;}String[] locations = new String[files.length];for (int i = 0; i < locations.length; i++) {locations[i] = "";}initSplit(files, startoffset, lengths, locations);}private void initSplit(Path[] files, long[] start, long[] lengths, String[] locations) {this.startoffset = start;this.lengths = lengths;this.paths = files;this.totLength = 0;this.locations = locations;for(long length : lengths) {totLength += length;}}//一些getter和setter方法,和序列化方法
}
与FileSplit类似,CombineFileSplit同样包含文件路径,分片起始位置,分片大小和存储分片数据的host列表。
由于CombineFileSplit是针对小文件的,它把很多小文件包在一个InputSplit内,这样一个Mapper就可以处理很多小文件。
要知道我们上面的FileSplit是对应一个输入文件的,也就是说如果用FileSplit对应的FileInputFormat来作为输入格式,那么即使文件特别小,也是单独计算成一个输入分片来处理的。当我们的输入是由大量小文件组成的,就会导致有同样大量的InputSplit,从而需要同样大量的Mapper来处理,这将很慢,想想有一堆map task要运行!!这是不符合Hadoop的设计理念的,Hadoop是为处理大文件优化的。
最后介绍TagInputSplit,这个类就是封装了一个InputSplit,然后加了一些tags在里面满足我们需要这些tags数据的情况,我们从下面就可以一目了然。
class TaggedInputSplit extends InputSplit implements Configurable, Writable {private Class<? extends InputSplit> inputSplitClass;private InputSplit inputSplit;@SuppressWarnings("unchecked")private Class<? extends InputFormat> inputFormatClass;@SuppressWarnings("unchecked")private Class<? extends Mapper> mapperClass;private Configuration conf;//getters and setters,序列化方法,getLocations()、getLength()等
}
1.2.2 InputFormat
现在我们对InputSplit的概念有了一些了解,我们继续看它是怎么被使用和计算出来的。
通过使用InputFormat,MapReduce框架可以做到:
- 验证作业的输入的正确性
- 将输入文件切分成逻辑的InputSplits,一个InputSplit将被分配给一个单独的Mapper task
- 提供RecordReader的实现,这个RecordReader会从InputSplit中正确读出一条一条的K-V对供Mapper使用。
public abstract class InputFormat<K, V> {public abstract List<InputSplit> getSplits(JobContext context) throws IOException,InterruptedException;public abstract RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException,InterruptedException;
}
上面是InputFormat的源码,getSplits用来获取由输入文件计算出来的InputSplits,我们在后面会看到计算InputSplits的时候会考虑到输入文件是否可分割、文件存储时分块的大小和文件大小等因素;
createRecordReader()提供了前面第三点所说的RecordReader的实现,以将K-V对从InputSplit中正确读出来,比如LineRecordReader就以偏移值为key,一行的数据为value,这就使得所有其createRecordReader()返回了LineRecordReader的InputFormat都是以偏移值为key,一行数据为value的形式读取输入分片的。
**PathFilter被用来进行文件筛选,这样我们就可以控制哪些文件要作为输入,哪些不作为输入。**PathFilter有一个accept(Path)方法,当接收的Path要被包含进来,就返回true,否则返回false。可以通过设置mapred.input.pathFilter.class来设置用户自定义的PathFilter。
public interface PathFilter {boolean accept(Path path);
}
FileInputFormat是InputFormat的子类,它包含了一个MultiPathFilter,这个MultiPathFilter由一个过滤隐藏文件(名字前缀为’-‘或’.’)的PathFilter和一些可能存在的用户自定义的PathFilters组成,MultiPathFilter会在listStatus()方法中使用,而listStatus()方法又被getSplits()方法用来获取输入文件,也就是说实现了在获取输入分片前先进行文件过滤。
private static class MultiPathFilter implements PathFilter {private List<PathFilter> filters;public MultiPathFilter(List<PathFilter> filters) {this.filters = filters;}public boolean accept(Path path) {for (PathFilter filter : filters) {if (!filter.accept(path)) {return false;}}return true;}
}
1.2.3 TextInputFormat
我们来看看FileInputFormat的几个子类。
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {@Overridepublic RecordReader<LongWritable, Text> createRecordReader(InputSplit split,TaskAttemptContext context) {return new LineRecordReader();}@Overrideprotected boolean isSplitable(JobContext context, Path file) {CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(file);return codec == null;}
}
二、案例
2.1 单词统计
2.1.1 数据准备
-
创建一个新的文件
cd /export/servers vim wordcount.txt
-
向其中放入英文句子并保存
-
上传到HDFS
hdfs dfs -mkdir /wordcount/ hdfs dfs -put wordcount.txt /wordcount/
2.1.2 Mapper
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;/*Mapper<KeyIn,ValueIn,KeyOut,ValueOut>KeyIn:k1ValueIn:v1KeyOut:k2ValueOut:v2*/public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {/*参数:key:k1 行偏移量value:v1 每一行的文本数据content:上下文对象*/@Overrideprotected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { Text text=new Text();LongWritable longWritable=new LongWritable();//get values stringString valueString = value.toString();//spile string String wArr[] = valueString.split(" ");//map out key/value for (String word:wArr) {text.set(word);longWritable.set(1);context.write(text,longWritable);}}
}
2.1.3 Reducer
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;/*Reducer<KeyIn,ValueIn,KeyOut,ValueOut>KeyIn:k2ValueIn:v2KeyOut:k3ValueOut:v3*/public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>
{ /*参数:key:新k2 行偏移量v2s:集合 新v2content:上下文对象*/@Override protected void reduce(Text key, Iterable<LongWritable> v2s,Context context) throws IOException, InterruptedException {Iterator<LongWritable> it = v2s.iterator(); //define var sum long sum = 0; // iterator count arr while(it.hasNext()){ sum += it.next().get(); }context.write(key,new LongWritable(sum)); }
}
2.1.4 定义主类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.Job;
public class TestMapReducer {public static void main(String[] args) throws Exception{ Configuration conf = new Configuration();conf.set("fs.default.name", "hdfs://ubuntu:9000"); // step1 : get a job Job job = Job.getInstance(conf); //step2: set jar main class job.setJarByClass(TestMapReducer.class); //step3: set map class and reducer class job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); //step4: set map reduce output type job.setMapOutputKeyClass(Text.class); //k2job.setMapOutputValueClass(LongWritable.class); //v3 job.setOutputKeyClass(Text.class); //k3job.setOutputValueClass(LongWritable.class); //v3// set input/output typejob.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(TextOutputFormat.class);//step5: set key/value output file format and input/output path TextInputFormat.setInputPath(job, new Path("hdfs://node01:8020/wordcount")); TextOutputFormat.setOutputPath(job, new Path("hdfs://node01:8082/output")); //step6: commit job job.waitForCompletion(true); }
}
2.1.5 MapReduce运行模式
- 集群运行模式
- 将MapReduce程序提交给Yarn集群,分发到很多节点上并发执行;
- 处理的数据和输出结果应该位于HDFS文件系统;
- 提交集群的实现步骤:将程序打成JAR包,并上传,然后在集群上用hadoop命令启动
执行命令:hadoop jar xxx.jar 主类名
- 本地运行模式
- MapReduce程序是在本地以单进程的形式运行;
- 处理的数据及输出结果在本地文件系统
// 这样是指本地文件系统
TextInputFormat.setInputPaths(job, new Path("file:///home/zcc/Desktop/simple/zcc.txt"));
TextOutputFormat.setOutputPath(job, new Path("file:///home/zcc/Desktop/simple/output"));
三、MapReduce分区
3.1 分区概述
在MapReduce中,通过我们指定分区,会将同一个分区的数据发送到同一个Reduce中进行处理。
3.2 编程实现
案例说明:将大于15的数字和小于等于15的数字分离。
3.2.1 定义Mapper
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;import java.io.IOException;
/** k1:行偏移量* v1:行文本数据* * k2:行文本数据* v2:NullWritable,v2不需要设置,但又必须有,因此使用Null占位*/public class PartitionMap extends Mapper<LongWritable,Text,Text,NullWritable>{@Overrideprotected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException {context.write(value,NullWritable.get());}
}
3.2.2 自定义Partitioner
主要的分区逻辑就在这里,通过Partitioner将数据分发给不同的Reducer。
我们需要继承Partitioner,重写getPartition方法,默认的分区规则是只要key相同就是同一个分区。
使用setPartitionerClass设置自定义的parttitioner。
package partition;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;public class MyPartitoner extends Partitioner<Text,NullWritable>{/**定义分区规则,反回对应的分区编号*/@Overridepublic int getPartition(Text text,NullWritable nullWritable,int i) {String[] split=text.toString().split("\t");String numStr=split[5];if(Integer.parseInt(numStr)>15) {return 1;}return 0;}
}
3.2.3 定义Reducer
3.2.4 编写主类
package partition;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;public class JobMain extends Configured implements Tool{@Overridepublic int run(String[] arg0) throws Exception {//1. 获得Job实例Job job = Job.getInstance(super.getConf(),"partition_mapreduce");//2. 对job任务进行配置//第一步:设置输入类和输入的路径job.setInputFormatClass(TextInputFormat.class);TextInputFormat.addInputPath(job, new Path("hdfs://ubuntu:8020/input"));//第二步:设置Mapper类和数据类型job.setMapperClass(PartitionMap.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);//第三步:指定分区类job.setPartitionerClass(MyPartitioner.class);//第四步:指定Reducer类和数据类型job.setReducerClass(PartitionReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);//设置Reducer Task的个数job.setNumReduceTasks(2);//第五步:指定输出类和输出路径job.setOutputFormatClass(TextOutputFormat.class);TextOutputFormat.setOutputPath(job,new Path("hdfs://ubuntu:8020/out/partition_out"));boolean b=job.waitForCompletion(true);return b?1:0;}public static void main(String[] args) throws Exception{Configuration configuration=new Configuration();int code=ToolRunner.run(configuration,new JobMain(),args);System.out.println(code);}}
四、排序和序列化
4.1 概述
- 序列化是指把结构化对象转化为字节流。
- 反序列化是指序列化的逆过程,把字节流转为结构化对象。当要在进程间传递对象或持续化对象的时候,就需要序列化对象成字节流;反之当要将接收到或从磁盘中读取的字节流转为对象时,就要进行反序列化。
- Hadoop的序列化机制(writable)具有精简高效的特点。
- Writable时Hadoop的序列化格式,Hadoop定义了Writable接口,一个类要支持可序列化只需要实现这个接口即可。
- 另外Writable有一个子接口,WritableComparable,它是即可实现序列化,也可以对key进行比较。
4.2 编程实现
要求:对下列数据进行排序,首先对第一列进行排序,第一列相同时,对第二列进行排序。
数据格式
a 1
a 9
b 3
a 7
b 8
b 10
a 5
4.2.1 自定义类型和比较器
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;public class SortBean implements WritableComparable<SortBean>{private String word;private int num;@Overridepublic void readFields(DataInput in) throws IOException {this.word=in.readUTF();this.num=in.readInt();}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(word);out.writeInt(num);}@Overridepublic int compareTo(SortBean o) {int res=this.word.compareTo(o.getWord());if(res==0) {return this.num-o.getNum();}return res;}public String getWord() {return word;}public void setWord(String word) {this.word = word;}public int getNum() {return num;}public void setNum(int num) {this.num = num;}@Overridepublic String toString() {return word+"\t"+num;}}
4.2.2 Mapper
import org.apache.hadoop.io.Text;import java.io.IOException;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;public class SortMapper extends Mapper<LongWritable,Text,SortBean,NullWritable>{@Overrideprotected void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException {String[] split = value.toString().split("\t");SortBean sortBean=new SortBean();sortBean.setWord(split[0]);sortBean.setNum(Integer.parseInt(split[1]));context.write(sortBean,NullWritable.get());}}
4.2.3 Reducer
import java.io.IOException;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;public class SortReducer extends Reducer<SortBean,NullWritable,SortBean,NullWritable>{@Overrideprotected void reduce(SortBean key,Iterable<NullWritable> values,Context context) throws IOException, InterruptedException {context.write(key,NullWritable.get());}}
4.2.4 主类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;public class JobMain extends Configured implements Tool{@Overridepublic int run(String[] arg0) throws Exception {//1. Job job = Job.getInstance(super.getConf(),"partition_mapreduce");job.setInputFormatClass(TextInputFormat.class);TextInputFormat.addInputPath(job, new Path("hdfs://ubuntu:8020/input"));job.setMapperClass(SortMapper.class);job.setMapOutputKeyClass(SortBean.class);job.setMapOutputValueClass(NullWritable.class);job.setReducerClass(SortReducer.class);job.setOutputKeyClass(SortBean.class);job.setOutputValueClass(NullWritable.class);job.setNumReduceTasks(2);job.setOutputFormatClass(TextOutputFormat.class);TextOutputFormat.setOutputPath(job,new Path("hdfs://ubuntu:8020/out/partition_out"));boolean b=job.waitForCompletion(true);return b?1:0;}public static void main(String[] args) throws Exception{Configuration configuration=new Configuration();int code=ToolRunner.run(configuration,new JobMain(),args);System.out.println(code); }
}
五、规约Combiner
5.1 概述
又叫合并。
每一个map都可能产生大量的本地输出,Combiner的作用就是对map端的输出先做一次合并(也可以说是局部reduce),以减少在map和reduce节点之间的数据传输量,提高网络IO性能,是MapReduce的一种优化手段。
- combiner是MR程序中,除Mapper和Reducer之外的一种组件;
- combiner的父类就是Reducer;
- combiner和reducer的区别在于运行的位置
- combiner是在每一个maptask所在节点运行
- reducer是接收全局所有Mapper的输出结果
- combiner的意义就是对每一个maptask的输出进行局部汇总。
5.2 实现步骤
- 自定义一个combiner继承Reducer,重写reducer方法;
- 在job中设置job.setCombinerClass()
combiner能够应用的前提是不能影响最终的业务逻辑,而且combiner的输出kv,要与reducer的输入kv类型对应。
lic static void main(String[] args) throws Exception{Configuration configuration=new Configuration();int code=ToolRunner.run(configuration,new JobMain(),args);System.out.println(code); }
}