MapReduce之购物篮分析
购物篮分析是一个流行的数据挖掘技术,在市场营销中这个技术可以揭示不同商品或商品组之间的相似度
MBA原理
通过MapReduce框架,设计相关的解决方案生成交易的关联规则,从而查找最常见的商品对
应用领域
- 信用卡交易分析
- 电话呼叫模式分析
- 欺诈识别
- 电信服务交易分析
- 大型在线零售商的每日/每周交易分析
样例输入
crackers,bread,banana
crackers,coke,butter,coffee
crackers,bread
crackers,bread
crackers,bread
crackers,bread,coffee
butter,coke
butter,coke,bread,crackers
样例输出
mapper阶段任务
mapper阶段的map()函数根据购物篮子中的商品生成如下键-值对
[<crackers,icecream>,1]
[<crackers,coke>,1]
但是在程序自动分类过程中会出现如下现象
购物篮T1:crackers,icecream,coke
购物篮T2:icecream,coke,crackers
根据关联规则,对于T1会生成如下规则
[(crackers,icecream),1]
[(crackers,coke),1]
[(icecream,coke),1]
对于T2则会生成如下规则
[(icecream,coke),1]
[(icecream,crackers),1]
[(coke,crackers),1]
从中我们可以看到,有六对规则,但是我们发现(crackers,icecream)和(icecream,crackers)是一样的,在这里会被分成不同的规则,所以在生成规则之前需要对商品按照字母顺序进行排序,就可以避免这个问题
mapper阶段编码
在这个过程中使用Collections.sort()对每个组合中的商品按字母顺序进行排序
package com.deng.MBA;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class MBAMapper extends Mapper<LongWritable,Text,Text,IntWritable>{int numberOfPairs;public static final int DEFAULT_NUMBER_OF_PAIRS=2;protected void setup(Context context) throws IOException,InterruptedException{this.numberOfPairs=context.getConfiguration().getInt("number.of.pairs",DEFAULT_NUMBER_OF_PAIRS);}public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{String line=value.toString();List<String> items=convertItems(line);if((items==null)||items.isEmpty()){return ;}generateMapperOutput(numberOfPairs,items,context);}private static List<String> convertItems(String line){if((line==null)||line.length()==0){return null;}String[] tokens=line.split(",");if((tokens==null)||(tokens.length==0)){return null;}List<String> items=new ArrayList<String>();for(String token:tokens){if(token!=null){items.add(token.trim());}}return items;}private void generateMapperOutput(int numberOfPairs, List<String> items, Context context)throws IOException, InterruptedException{List<List<String>> sortedCombinations =Combination.findSortedCombinations(items,numberOfPairs);for(List<String> itemsList: sortedCombinations){context.write(new Text(itemsList.toString()),new IntWritable(1));}}
}
其中Combination是一个简单的工具类,使用Combination.findSortedCombinations(elements,n)方法可以生成给定集合中所有包含n个元素的有序组合,如下所示
假设购物篮为{a,b,c}
假设生成具有两个商品的规则,则分类结果如下所示
[a,b],[a,c],[b,c]
Combination编码
package com.deng.MBA;import java.util.List;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

/**
 * Utility that enumerates the distinct, sorted n-element combinations of a collection.
 */
public class Combination {

    /**
     * Returns every distinct combination of {@code n} different elements taken from
     * {@code elements}. Each combination is sorted in natural order, and the
     * combinations appear in the order in which they are first generated.
     *
     * <p>For {@code [a, b, c]} and {@code n = 2} the result is
     * {@code [[a, b], [a, c], [b, c]]}. {@code n = 0} yields a single empty
     * combination; an {@code n} larger than the number of distinct elements
     * yields an empty list.
     *
     * <p>Duplicates are detected with a hash set, so {@code T} must implement
     * {@code hashCode()} consistently with {@code equals()}.
     *
     * @param elements the items to combine; repeated items are treated as one
     * @param n        the size of each combination, must not be negative
     * @param <T>      element type, ordered by its natural ordering
     * @return a new mutable list of sorted combinations
     * @throws IllegalArgumentException if {@code n} is negative
     */
    public static <T extends Comparable<? super T>> List<List<T>> findSortedCombinations(Collection<T> elements,int n){
        // A negative size would otherwise recurse until the stack overflows.
        if(n<0){
            throw new IllegalArgumentException("n must not be negative: "+n);
        }
        if(n==0){
            List<List<T>> base=new ArrayList<List<T>>();
            base.add(new ArrayList<T>());
            return base;
        }

        // Insertion-ordered set: keeps first-seen order while rejecting duplicates
        // without a linear scan of everything generated so far.
        Set<List<T>> unique=new LinkedHashSet<List<T>>();
        for(List<T> combination:findSortedCombinations(elements,n-1)){
            for(T element:elements){
                if(combination.contains(element)){
                    continue;
                }
                List<T> list=new ArrayList<T>(combination);
                list.add(element);
                // Sorting makes [a, b] and [b, a] collapse into the same combination.
                Collections.sort(list);
                unique.add(list);
            }
        }
        return new ArrayList<List<T>>(unique);
    }

    /** Small demo: prints all 2-element combinations of five letters. */
    public static void main(String[] args) {
        List<String> elements = Arrays.asList("a", "b", "c", "d", "e");
        List<List<String>> combinations = findSortedCombinations(elements, 2);
        System.out.println(combinations);
    }
}
reduce阶段任务
这个阶段就是对规则的支持度进行统计
reduce阶段编码
package com.deng.MBA;
import java.io.IOException;
import java.io.WriteAbortedException;import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer for market basket analysis: adds up the counts received for one item
 * combination and writes the combination together with that total.
 */
public class MBAReduce extends Reducer<Text,IntWritable,Text,IntWritable> {

    /**
     * Writes {@code (key, total)} where total is the sum of all values for the key.
     *
     * @param key     the item combination
     * @param values  the counts collected for this combination
     * @param context sink for the summed result
     */
    public void reduce(Text key,Iterable<IntWritable> values,Context context)
            throws IOException,InterruptedException{
        IntWritable total=new IntWritable(sumOf(values));
        context.write(key,total);
    }

    /** Adds up the int payload of every count in the iterable. */
    private static int sumOf(Iterable<IntWritable> counts){
        int running=0;
        for(IntWritable count:counts){
            running=running+count.get();
        }
        return running;
    }
}
完整代码
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.util.*;public class MBADriver {public static class Combination {public static <T extends Comparable<? super T>> List<List<T>>findSortedCombinations(Collection<T> elements, int n){List<List<T>> result =new ArrayList<List<T>>();if(n==0){result.add(new ArrayList<T>());return result;}List<List<T>> combinations=findSortedCombinations(elements,n-1);for(List<T> combination:combinations){for(T element:elements){if(combination.contains(element)){continue;}List<T> list=new ArrayList<T>();list.addAll(combination);if(list.contains(element)){continue;}list.add(element);Collections.sort(list);if(result.contains(list)){continue;}result.add(list);}}return result;}}public static class MBAMapper extends Mapper<LongWritable,Text,Text,IntWritable> {int numberOfPairs;public static final int DEFAULT_NUMBER_OF_PAIRS=2;protected void setup(Context context) throws IOException,InterruptedException{this.numberOfPairs=context.getConfiguration().getInt("number.of.pairs",DEFAULT_NUMBER_OF_PAIRS);}public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{String line=value.toString();List<String> items=convertItems(line);if((items==null)||items.isEmpty()){return ;}generateMapperOutput(numberOfPairs,items,context);}private static List<String> convertItems(String line){if((line==null)||line.length()==0){return null;}String[] tokens=line.split(",");if((tokens==null)||(tokens.length==0)){return null;}List<String> items=new ArrayList<String>();for(String token:tokens){if(token!=null){items.add(token.trim());}}return items;}private void generateMapperOutput(int numberOfPairs, List<String> items, Context context)throws IOException, InterruptedException{List<List<String>> sortedCombinations = com.deng.MBA.Combination.findSortedCombinations(items,numberOfPairs);for(List<String> itemsList: sortedCombinations){context.write(new Text(itemsList.toString()),new IntWritable(1));}}}public static class MBAReduce extends 
Reducer<Text,IntWritable,Text,IntWritable> {public void reduce(Text key,Iterable<IntWritable> values,Context context)throws IOException,InterruptedException{int sum=0;for(IntWritable value:values){sum+=value.get();}context.write(key,new IntWritable(sum));}}public static void main(String[] args) throws Exception{FileUtil.deleteDirs("output");Configuration conf=new Configuration();String[] otherArgs=new String[]{"input/MBA","output","3"};if(otherArgs.length!=3){System.out.println("参数错误");}Job job=new Job(conf,"MBA");job.getConfiguration().setInt("number.of.pairs",Integer.parseInt(otherArgs[2]));FileInputFormat.addInputPath(job,new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));job.setJarByClass(MBADriver.class);job.setMapperClass(MBAMapper.class);job.setReducerClass(MBAReduce.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);job.setOutputValueClass(Text.class);job.setOutputKeyClass(IntWritable.class);job.setCombinerClass(MBAReduce.class);System.exit(job.waitForCompletion(true)?0:1);}
}
写在最后
代码是个很神奇的东西,多看看别人优秀的代码才能提升自己的代码风格