Hadoop:MapReduce应用

article/2025/9/25 9:17:52

文章目录

  • 一、Join多种应用
    • 1.1 Reduce Join
    • 1.2 Map Join
  • 二、计数器应用
  • 三、数据清洗(ETL)
  • 四、MapReduce开发总结

一、Join多种应用

1.1 Reduce Join

Reduce Join工作原理:

Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段打标志)分开,最后进行合并即可。

案例:

需求: 将商品信息表中数据根据商品pid合并到订单数据表中
order.txt:

idpidamount
1001011
1002022
1003033
1004014

pd.txt:

pidpname
01小米
02华为
03联想

期望获得数据:

idpnameamount
1001小米1
1004小米4
1002华为2
1003格力3

代码实现:

OrderBean实体:

public class OrderBean implements WritableComparable<OrderBean> {private String id;private String pid;private int amount;private String pname;public OrderBean() {}@Overridepublic int compareTo(OrderBean o) {int compare = this.pid.compareTo(o.pid);if (compare == 0) {return o.pname.compareTo(this.pname);} else {return compare;}}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(id);out.writeUTF(pid);out.writeInt(amount);out.writeUTF(pname);}@Overridepublic void readFields(DataInput in) throws IOException {this.id = in.readUTF();this.pid = in.readUTF();this.amount = in.readInt();this.pname = in.readUTF();}//省略getter、setter、toString方法...
}

Mapper类

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {private OrderBean orderBean = new OrderBean();private String fileName;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {FileSplit fs = (FileSplit) context.getInputSplit();fileName = fs.getPath().getName();}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] fields = value.toString().split("\t");//根据文件名来创建OrderBean对象if ("order.txt".equals(fileName)){orderBean.setId(fields[0]);orderBean.setPid(fields[1]);orderBean.setAmount(Integer.parseInt(fields[2]));orderBean.setPname("");}else {orderBean.setPid(fields[0]);orderBean.setPname(fields[1]);orderBean.setId("");orderBean.setAmount(0);}context.write(orderBean,NullWritable.get());}
}

Reducer类:

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {@Overrideprotected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {Iterator<NullWritable> vars = values.iterator();//指针下移获取第一个OrderBeanvars.next();String pname = key.getPname();while (vars.hasNext()) {//指针下移,其对应的key也变化了vars.next();key.setPname(pname);context.write(key, NullWritable.get());}}
}

分组Comparator类:

public class OrderComparator extends WritableComparator {public OrderComparator() {super(OrderBean.class, true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {OrderBean oa = (OrderBean) a;OrderBean ob = (OrderBean) b;return oa.getPid().compareTo(ob.getPid());}
}

驱动Driver

public class OrderDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(OrderDriver.class);job.setMapperClass(OrderMapper.class);job.setReducerClass(OrderReducer.class);job.setMapOutputKeyClass(OrderBean.class);job.setMapOutputValueClass(NullWritable.class);job.setOutputKeyClass(OrderBean.class);job.setOutputValueClass(NullWritable.class);//设置分组Comparatorjob.setGroupingComparatorClass(OrderComparator.class);FileInputFormat.setInputPaths(job, new Path("D:\\MyFile\\test"));//指定_SUCCESS文件的位置FileOutputFormat.setOutputPath(job, new Path("d:\\output"));boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}

缺点:
Reduce Join合并的操作是在Reduce阶段完成的,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易产生数据倾斜。

解决方案: 使用Map Join

1.2 Map Join

使用场景:

Map Join适用于一张表非常小、另一表非常大的场景。

Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据压力,尽可能的减少数据倾斜。

实现方式:

  1. DistributedCacheDriver缓存小文件
  2. MapsetUp()方法中读取缓存文件

代码:

Mapper类:

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {private OrderBean orderBean = new OrderBean();private Map<String, String> pMap = new HashMap<>();@Overrideprotected void setup(Context context) throws IOException, InterruptedException {URI[] cacheFiles = context.getCacheFiles();String path = cacheFiles[0].getPath();/*** 使用FSDataInputStream会中文乱码*/
//        FileSystem fs = FileSystem.get(context.getConfiguration());
//        FSDataInputStream fis = fs.open(new Path(path));BufferedReader fis = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8"));String line;while (StringUtils.isNotEmpty(line = fis.readLine())) {String[] fields = line.split("\t");pMap.put(fields[0], fields[1]);}IOUtils.closeStream(fis);}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] fields = value.toString().split("\t");String pname = pMap.get(fields[1]);pname = pname == null ? "" : pname;orderBean.setId(fields[0]);orderBean.setPid(fields[1]);orderBean.setAmount(Integer.parseInt(fields[2]));orderBean.setPname(pname);context.write(orderBean, NullWritable.get());}
}

驱动Driver

public class OrderDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(OrderDriver.class);job.setMapperClass(OrderMapper.class);FileInputFormat.setInputPaths(job, new Path("D:\\MyFile\\test"));//指定_SUCCESS文件的位置FileOutputFormat.setOutputPath(job, new Path("d:\\output"));//加载缓存数据job.addCacheFile(new URI("file:///d:/MyFile/cache/pd.txt"));//Map端Join的逻辑不需要Reduce阶段,设置ReduceTask数量为0job.setNumReduceTasks(0);boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}

二、计数器应用

Hadoop为每个作业维护若干内置计数器,以描述多项指标。例如,某些计数器记录已处理的字节数和记录数,使用户监控已处理的输入数据量和已产生的输出数据量。

  1. 采用枚举的方式统计计数
enum MyCounter{MALFORORMED,NORMAL}
//对枚举定义的自定义计数器加1
context.getCounter(MyCounter.MALFORORMED).increment(1);
  1. 采用计数组、计数器名称的方式统计
context.getCounter("counterGroup","counter").increment(1);
  1. 计数结果在程序运行后的控制台上查看

三、数据清洗(ETL)

在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。

需求: 去除日志中字段长度小于等于11的日志
在这里插入图片描述Mappper类:

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>{Text k = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 1 获取1行数据String line = value.toString();// 2 解析日志boolean result = parseLog(line,context);// 3 日志不合法退出if (!result) {return;}// 4 设置keyk.set(line);// 5 写出数据context.write(k, NullWritable.get());}// 2 解析日志private boolean parseLog(String line, Context context) {// 1 截取String[] fields = line.split(" ");// 2 日志长度大于11的为合法if (fields.length > 11) {// 系统计数器context.getCounter("map", "true").increment(1);return true;}else {context.getCounter("map", "false").increment(1);return false;}}
}

驱动Driver:

public class LogDriver {public static void main(String[] args) throws Exception {// 1 获取job信息Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 2 加载jar包job.setJarByClass(LogDriver.class);// 3 关联mapjob.setMapperClass(LogMapper.class);// 4 设置最终输出类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);// 设置reducetask个数为0job.setNumReduceTasks(0);// 5 设置输入和输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 6 提交job.waitForCompletion(true);}
}

计数效果:
在这里插入图片描述

四、MapReduce开发总结

在编写MapReduce程序时,需要考虑的几个方面:

①输入数据接口:InputFormat

  1. 默认使用的实现类是:TextInputFormat
  2. TextInputFormat的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为value返回
  3. KeyValueTextInputFormat每一行均为一条记录,被分隔符分割为keyvalue。默认分隔符是tab (\t)
  4. NlineInputFormat按照指定的行数N来划分切片。
  5. CombineTextInputFormat可以把多个小文件合并成一个切片处理,提高处理效率
  6. 用户还可以自定义InputFormat

②逻辑处理接口:Mapper

用户根据业务需求实现其中三个方法:map()setup()cleanup ()

Partitioner分区

有默认实现HashPartitioner,逻辑是根据key的哈希值和numReduces来返回一个分区号;key.hashCode()&Integer.MAXVALUE % numReduces

如果业务上有特别的需求,可以自定义分区。

Comparable排序

当我们用自定义的对象作为key来输出时,就必须要实现WritableComparable接口,重写其中的compareTo()方法

部分排序:对最终输出的没一个文件进行内部排序
全排序:对所有数据进行排序,通常只有一个Reduce
二次排序:排序的条件有两个
辅助排序:可以让不同的key进入到同一个ReduceTask

Combiner合并

Combiner合并可以提高程序执行效率,减少IO传输。但是使用时必须不能影响原有的业务处理结果。

Reduce端分组:Groupingcomparator

Reduce端对key进行分组。应用于:在接收的KeyBean对象时,想让一个或几个字段相同(全部字段比较不相同)的Key进入到同一个Reduce方法时,可以采用分组排序。

⑦逻辑处理接口:Reducer

用户根据业务需求实现其中三个方法: reduce()setup()cleanup()

⑧输出数据接口:OutputFormat

默认实现类是TextOutputFormat,功能逻辑是:将每一个KV对向目标文本文件中输出为一行。

用户还可以自定义OutputFormat


http://chatgpt.dhexx.cn/article/q8GNpv0f.shtml

相关文章

大数据分析项目实例:Hadoop数据分析应用场景

对于海量数据价值的挖掘&#xff0c;需要通过大数据分析来实现&#xff0c;而这些数据由于具有不同于传统数据的新特征&#xff0c;传统的数据分析技术和工具都不能高效的进行处理&#xff0c;因而才有了基于大数据技术平台进行大数据分析的需求。今天&#xff0c;我们以Hadoop…

Hadoop常见场景

本篇文章主要列举一些Hadoop常用场景 ​ 主要是以下几种 ​ 高可用集群 ​ 节点新增/减少/拉黑 ​ HDFS数据迁移 ​ 大量小文件存储 ​ 高可用集群 ​ 一句话概括 双namenode消除单点故障 ​ 过程&#xff1a; ​ 对active Namenode进行的任何操作&#xff0c;都会同…

Hadoop的优势及大数据平台系统架构典型行业应用场景

扩容能力强&#xff1a;Hadoop可以部署在数百台并行运行的廉价服务器集群&#xff0c;能提供成百上千TB的数据节点上运行的高度可扩展的存储与计算平台。 成本低&#xff1a;Hadoop可以通过普通廉价的服务器集群分布式处理数据&#xff0c;从而降低成本。 高效率&#xff1a;…

大数据利器:Hadoop的十大应用场景[转]

【IT168 评论】谁在用Hadoop?这是个问题。在大数据背景下&#xff0c;Apache Hadoop已经逐渐成为一种标签性&#xff0c;业界对于这一开源分布式技术的了解也在不断加深。但谁才是Hadoop的最大用户呢?首先想到的当然是它的“发源地”&#xff0c;像Google这样的大型互联网搜索…

金三银四、金九银十 面试宝典 Spring、MyBatis、SpringMVC面试题 超级无敌全的面试题汇总(超万字的面试题,让你的SSM框架无可挑剔)

Spring、MyBatis、SpringMVC 框架 - 面试宝典 又到了 金三银四、金九银十 的时候了&#xff0c;是时候收藏一波面试题了&#xff0c;面试题可以不学&#xff0c;但不能没有&#xff01;&#x1f941;&#x1f941;&#x1f941; 一个合格的 计算机打工人 &#xff0c;收藏夹里…

Spring 常见面试题

目录 Spring 基础 1、什么是 Spring 框架? 2、Spring 包含的模块有哪些&#xff1f; 3、Spring,Spring MVC,Spring Boot 之间什么关系? Spring IoC 4、谈谈自己对于 Spring IoC 的了解 5、什么是 Spring Bean&#xff1f; 6、将一个类声明为 Bean 的注解有哪些? 7、…

Spring框架常见面试题

1. 你对Spring框架的理解(特点)&#xff1f; Spring框架有哪些模块 &#xff1f; Spring&#xff0c;一种用来简化企业应用级开发的一种开源框架。简化开发&#xff1a;它对常用的API做了封装&#xff0c;比如对JDBC的封装&#xff0c;使用Spring JDBC访问数据库&#xff0c;就…

【面试】Spring面试题

文章目录 Spring概述什么是spring?Spring的俩大核心概念Spring框架的设计目标&#xff0c;设计理念&#xff0c;和核心是什么Spring的优缺点是什么&#xff1f;Spring有哪些应用场景Spring由哪些模块组成&#xff1f;Spring 框架中都用到了哪些设计模式&#xff1f;详细讲解一…

Spring框架面试精华知识

咳咳&#xff0c;这里整理一些关于WEB框架的一些知识点&#xff0c;来帮助更多的人进行面试&#xff0c;主要的还是讲解一些关于目前企业用得比较流行的框架&#xff0c;还有就是一些比较前沿的框架&#xff0c;比如阿里开源的Dubbo框架及其开源框架Zookeeper等等&#xff0c;都…

Spring框架-面试题核心概念

目录 1.Spring框架的作用是什么&#xff1f; 2. 什么是DI&#xff1f; 3.什么是AOP&#xff1f; 4.Spring常用注解 5.Spring中的设计模式 6.Spring支持的几种bean的作用域 7.Spring中Bean的生命周期&#xff1f; 8.Spring中的事务管理 9.Spring中的依赖注入方式有几种 10.Sprin…

MyBatis+Spring+SpringMVC框架面试题整理(一)

目录 SpringMVC 的工作原理 谈谈你对SpringMVC的理解 SpringMVC 常用注解都有哪些? Spring 的常用注解 如何开启注解处理器和适配器? 如何解决 get 和 post 乱码问题? 谈谈你对 Spring 的理解 Spring 中的设计模式(解释) 简单介绍一下 Spring bean 的生命周期&…

面试官:看你简历上写熟悉Spring框架,谈谈对Spring的理解

今天只谈一下我们在面试中的如果面试官问到Spring&#xff0c;你应该怎么去回答&#xff0c;来获取面试官的青睐。在我的印象中&#xff0c;Spring框架可以说是Java世界里面最为成功的框架了&#xff0c;在企业的实际的应用里面&#xff0c;大部分的企业架构都是基于Spring框架…

说实话,面试这么问Spring框架的问题,我快扛不住了

面试官&#xff1a;Spring Framework有用过吧? 小小白&#xff1a;用过(有些心虚&#xff0c;因为Spring框架中内容太多了)。 面试官&#xff1a;在applicationgContext.xml文件中定义了一个bean&#xff0c;id为authService&#xff0c;通过ApplicationContext实例对象的getB…

【java面试】框架篇之Spring

1.你如何理解Spring? 具体来说Spring是一个轻量级的容器&#xff0c;用于管理业务相关对象的。核心功能主要为&#xff1a;IOC,AOP,MVC。 IOD&#xff1a;控制反转&#xff0c;将对象的创建过程交给容器&#xff0c;让容器管理对象的生命周期如创建&#xff0c;初始化&#…

spring框架-如何面试(四)

回顾&#xff1a; spring框架-认识spring框架&#xff08;一&#xff09; spring框架-认识IOC&#xff08;二&#xff09; spring框架-认识AOP&#xff08;三&#xff09; 面试官关于spring最喜欢、也是概率最大的提问 谈谈你对spring的理解 spring的官方定义&#xff1a;…

Spring框架面试题总结(面试必备)

1. spring 概述部分 1.1 什么是spring? 1.2 spring框架的核心? 1.3 spring框架的优缺点 1.4 spring5 的主要模块构成 1.5 Spring 框架中都用到了哪些设计模式&#xff1f; 1.6 讲解一下核心容器&#xff08;spring context应用上下文) 模块 2. spring的控制反转&#xff08…

cmos管宽长比,OC, OD门和线与逻辑,传输门,竞争冒险,三态门

https://blog.csdn.net/vivid117/article/details/100187137 pmos,nmos宽长比https://blog.csdn.net/qq_34070723/article/details/89291200 cmos宽长比&#xff1a; 1.CMOS的宽长比 关于COMS原理及结构图可以参考[1]COMS原理及门电路设计. 栅在源漏方向的长度称作栅的长L&…

OD门OC门

什么是集电极开路&#xff08;OC&#xff09;&#xff1f; % {0 V: X2 {1 I2 s E( y; n! S 我们先来说说集电极开路输出的结构。集电极开路输出的结构如图1所示&#xff0c;右边的那个三极管集电极什么都不接&#xff0c;所以叫做集电极开路&#xff08;左边的三极管为反相之用…

最强深入浅出上拉、OC门、OD门 —— 电子电路基础2

写在前面&#xff1a;对于本科阶段数电课程课时比较少的学校&#xff0c;这一块老师很可能就跳过了&#xff0c;对于这一块的知识来源&#xff0c;清华大学阎石的数电课本在第三章CMOS门电路和TTL门电路。 先看一些电路&#xff1b; &#xff08;1&#xff09;为什么有些时候单…

女朋友去应聘硬件工程师,面试官考她OC门、OD门和推挽输出!

硬件工程师的笔试题中经常会考OC门和OD门&#xff0c;有的放在面试中问&#xff0c;多个OC或者多个OD门能组成线与结构&#xff0c;线与结构考的更频繁&#xff0c;还有一个推挽输出&#xff0c;有一些单片机的GPIO用的就是这种结构&#xff0c;如最常见的STM32。 原创博主AirC…