MapReduce 编程实战

article/2025/10/16 17:54:03

MapReduce 采用了「分而治之」的思想。在分布式计算中,MapReduce 框架负责处理并行编程中分布式存储、工作调度、负载均衡、容错均衡、容错处理以及网络通信等复杂问题,把处理过程高度抽象为两个函数:map,把一个任务分解成多个任务;reduce,将任务的处理结果汇总。MapReduce 处理的数据集(或任务)必须具备这样的特点:待处理的数据集可以分解成许多小的数据集,而且每一个小数据集都可以完全并行地进行处理。

在 Hadoop 中,每个 MapReduce 任务都被初始化为一个 Job,每个 Job 又可以分为两种阶段:map 阶段和 reduce 阶段,分别用两个函数表示。map 函数接收一个 <key, value> 形式的输入,同样产生一个 <key, value> 形式的输出;Hadoop 函数接收一个如 <key, List> 形式的输入,然后对 list 进行处理,每个 reduce 产生 0 或 1 个输出,reduce 的输出也是 <key, value> 形式的。

 

数据去重 

编写测试文件并上传

cat

 cat

put

put 

 

上传jar包,执行程序

jar

jar

查看输出文件

cat

 cat

数据排序

编写测试文件并上传

cat

cat

 

put

put

 

上传jar包,并执行程序

jar

查看输出文件

cat

cat

 

单表关联

编写测试文件并上传

cat

cat

 

上传jar包,并执行程序

jar

jar

 

查看输出文件

cat

cat

 

参考代码

数据去重

public class RemoveDeduplication {/*** map 将输入中的 value 复制到输出数据的 key 上,并直接输出*/public static class Map extends Mapper<Object, Text, Text, Text> {// 每行数据private static Text line = new Text();@Overridepublic void map(Object key, Text value, Context context) throws IOException, InterruptedException {line = value;context.write(line, new Text(""));}}/*** reduce 将输入中的 key 复制到输出数据的 key 上,并直接输出*/public static class Reduce extends Reducer<Text, Text, Text, Text> {@Overridepublic void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {context.write(key, new Text(""));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();// 这句话很关键// conf.set("mapred.job.tracker", "192.168.1.2:9001");String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: Data Deduplication <in> <out>");System.exit(2);}Job job = Job.getInstance(conf, "Data Deduplication");job.setJarByClass(RemoveDeduplication.class);// 设置 Map、Combine 和 Reduce 处理类job.setMapperClass(Map.class);job.setCombinerClass(Reduce.class);job.setReducerClass(Reduce.class);// 设置输出类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);// 设置输入和输出目录FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}}

 

数据排序

public class DataSort {/*** map 将输入中的 value 化成 IntWritable 类型,作为输出的 key*/public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {private static IntWritable data = new IntWritable();@Overridepublic void map(Object key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();data.set(Integer.parseInt(line));context.write(data, new IntWritable(1));}}/*** reduce 将输入中的 key 复制到输出数据的 key 上,然后根据输入的 value-list 中元素的个数决定 key 的输出次数。* 用全局 lineNum 来代表 key 的位次*/public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {private static IntWritable lineNum = new IntWritable(1);@Overridepublic void reduce(IntWritable key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {for (IntWritable val : values) {context.write(lineNum, key);lineNum = new IntWritable(lineNum.get() + 1);}}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();// 这句话很关键// conf.set("mapred.job.tracker", "192.168.1.2:9001");String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: Data Sort <in> <out>");System.exit(2);}Job job = Job.getInstance(conf, "Data Sort");job.setJarByClass(DataSort.class);// 设置 Map 和 Reduce 处理类job.setMapperClass(Map.class);job.setReducerClass(Reduce.class);// 设置输出类型job.setOutputKeyClass(IntWritable.class);job.setOutputValueClass(IntWritable.class);// 设置输入和输出目录FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}}

 

单表关联

public class SimpleTableJoin {public static int time = 0;/*** map 将输出分割 child 和 parent,然后正序输出一次作为右表,* 反序输出一次作为左表,需要注意的是在输出的 value 中必须加上左右表的区别标识。*/public static class Map extends Mapper<Object, Text, Text, Text> {@Overridepublic void map(Object key, Text value, Context context) throws IOException, InterruptedException {String childName;String parentName;// 左右表标识String relationType;// 输入的一行预处理文本StringTokenizer itr = new StringTokenizer(value.toString());String[] values = new String[2];if (itr.hasMoreTokens()) {values[0] = itr.nextToken();}if (itr.hasMoreTokens()) {values[1] = itr.nextToken();}if (values[0].compareTo("child") != 0) {childName = values[0];parentName = values[1];// 输出左表relationType = "1";context.write(new Text(values[1]), new Text(relationType + "+" + childName + "+" + parentName));// 输出右表relationType = "2";context.write(new Text(values[0]), new Text(relationType + "+" + childName + "+" + parentName));}}}public static class Reduce extends Reducer<Text, Text, Text, Text> {@Overridepublic void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {// 输出表头if (0 == time) {context.write(new Text("grandchild"), new Text("grandparent"));time++;}int grandchildnum = 0;String[] grandchild = new String[10];int grandparentnum = 0;String[] grandparent = new String[10];for (Text value : values) {String record = value.toString();int len = record.length();int i = 2;if (0 == len) {continue;}// 取得左右表标识char relationType = record.charAt(0);// 定义孩子和父母变量StringBuilder childName = new StringBuilder();StringBuilder parentName = new StringBuilder();// 获取 value-list 中 value 的 childwhile (record.charAt(i) != '+') {childName.append(record.charAt(i));i++;}i = i + 1;// 获取 value-list 中 value 的 parentwhile (i < len) {parentName.append(record.charAt(i));i++;}// 左表,取出 child 放入 grandchildrenif ('1' == relationType) {grandchild[grandchildnum] = childName.toString();grandchildnum++;}// 右表,取出 parent 放入 grandparentif ('2' == relationType) {grandparent[grandparentnum] = parentName.toString();grandparentnum++;}}// grandchild 和 grandparent 数组求笛卡尔儿积if (0 != grandchildnum && 0 != grandparentnum) {for (int m = 0; m < grandchildnum; m++) {for (int n = 0; n < grandparentnum; n++) {// 输出结果context.write(new Text(grandchild[m]), new Text(grandparent[n]));}}}}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();// 这句话很关键// conf.set("mapred.job.tracker", "192.168.1.2:9001");String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: Single Table Join <in> <out>");System.exit(2);}Job job = Job.getInstance(conf, "Single Table Join");job.setJarByClass(SimpleTableJoin.class);// 设置 Map 和 Reduce 处理类job.setMapperClass(Map.class);job.setReducerClass(Reduce.class);// 设置输出类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);// 设置输入和输出目录FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}}

 


http://chatgpt.dhexx.cn/article/jUMsJYfQ.shtml

相关文章

实验三-MapReduce编程

前提&#xff1a;安装好Hadoop 参考文章&#xff1a; MapReduce编程实践(Hadoop3.1.3)_厦大数据库实验室博客 实验要求 基于MapReduce执行“词频统计”任务。 将提供的A&#xff0c;B&#xff0c;C文件上传到HDFS上&#xff0c;之后编写MapReduce代码并将其部署到hadoop&…

MapReduce编程模型

1.MapReduce简介 MapReduce是一个分布式运算程序的编程框架&#xff0c;核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序&#xff0c;并发运行在Hadoop集群上。 一个完整的mapreduce程序在分布式运行时有三类实例进程&#xff1a; MRAppMaste…

MapReduce编程框架

1、MapReduce思想 MapReduce思想在生活中处处可见。我们或多或少都曾接触过这种思想。MapReduce的思想核心是分而治之&#xff0c;充分利用了并行处理的优势。即使是发布过论文实现分布式计算的谷歌也只是实现了这种思想&#xff0c;而不是自己原创。 MapReduce任务过程是分为…

MapReduce编程实践

MapReduce编程实践 重要知识点&#xff1a; MapReduce是一种分布式并行编程模型,是Hadoop核心子项目之一,如果已经安装了Hadoop&#xff0c;就不需要另外安装MapReduce。主要的理论知识点包括&#xff1a;MapReduce概述、MapReduce的工作流程&#xff0c;WordCount实例分析&a…

mapreduce 编程模型

MapReduce是在总结大量应用的共同特点的基础上抽象出来的分布式计算框架&#xff0c;它适用的应用场景往往具有一个共同的特点&#xff1a;任务可被分解成相互独立的子问题。基于该特点&#xff0c;MapReduce编程模型给出了其分布式编程方法&#xff0c;共分5个步骤&#xff1a…

MapReduce编程基础

&#xff08;一&#xff09;实现词频统计的基本的MapReduce编程。 ①在/user/hadoop/input文件夹(该文件夹为空)&#xff0c;创建文件wordfile1.txt和wordfile2.txt上传到HDFS中的input文件夹下。 文件wordfile1.txt的内容如下&#xff1a; I love Spark I love Hadoop 文件wor…

(超详细)MapReduce工作原理及基础编程

MapReduce工作原理及基础编程&#xff08;代码见文章后半部分&#xff09; JunLeon——go big or go home 目录 MapReduce工作原理及基础编程&#xff08;代码见文章后半部分&#xff09; 一、MapReduce概述 1、什么是MapReduce&#xff1f; 2、WordCount案例解析MapRed…

【小白视角】大数据基础实践(五) MapReduce编程基础操作

目录 1. MapReduce 简介1.1 起源1.2 模型简介1.3 MRv1体系结构1.4 YARN1.4.1 YARN体系结构1.4.2 YARN工作流程 2. MapReduce 工作流程3. Java Api要点4. 实验过程最后 1. MapReduce 简介 1.1 起源 在函数式语言里&#xff0c;map表示对一个列表&#xff08;List&#xff09;中…

MapReduce编程

一、MapReduce编程规范 MapReduce的开发一共又八个步骤&#xff0c;其中Map阶段分为2个步骤&#xff0c;Shuffle阶段4个步骤&#xff0c;Reduce阶段分为2个步骤。 1.1 步骤流程 Map阶段2个步骤 设置InputFormat类&#xff0c;将数据切分为key-value&#xff08;k1和v1&#x…

SSL/TLS

SSL/TLS 一、SSL/TLS1.1 历史发展1.2 使用场景1.3 解决的问题1.4 工作流程 二、对称加密&#xff08;Symmetric Cryptography&#xff09;2.1 工作原理2.2 翻转攻击2.3 认证加密&#xff08;Authentication Encryption&#xff09;2.4 Diffie-Hellman2.5 KDF2.6 Diffie-Hellman…

HTTPS,SSL,TLS

SSL TLS secure sockets layer 安全套接字层&#xff0c;Netscape公司研发。 transport layer security 安全传输层协议 定义 协议 年份 SSL 1.0 未知 SSL 2.0 1995 SSL 3.0 1996 TLS 1.0 1999 TLS 1.1 2006 TLS 1.2 2008 TLS 1.3 2018 IETF&#xff08;The…

TLS传输协议

TLS&#xff1a;安全传输层协议&#xff08;TLS&#xff09;用于在两个通信应用程序之间提供保密性和数据完整性。 该协议由两层组成&#xff1a;TLS 记录协议&#xff08;TLS Record&#xff09;和 TLS 握手协议&#xff08;TLS Handshake&#xff09;。 传输层安全性协议&a…

LVGL misc tlsf算法(lv_tlsf.c)

更多源码分析请访问:LVGL 源码分析大全 目录 1、概述2、算法特点3、同类型算法举例1、概述 LVGL采用的内存分配器是使用的tlsf算法。因为这个算法只是一个实时系统常用的算法,可以看作是一个工具,对LVGL本身并没有逻辑上的关联,所以这里只介绍一下算法的基本知识,就不过…

TLS/SSL 协议详解(17) Certificate verify

发送这个类型的握手需要2个前提条件 &#xff08;1&#xff09;&#xff1a;服务器端请求了客户端证书 &#xff08;2&#xff09;&#xff1a;客户端发送了非0长的证书 此时&#xff0c;客户端想要证明自己拥有该证书&#xff0c;必然需要私钥签名一段数据发给服务器验证。 …

HTTPS之TLS证书

文章目录 一. TLS概述1. TLS概述2. HTTPS 协议栈与 HTTP 的唯一区别3. TLS协议版本 二. TLS证书格式1. 概述2. 示例&#xff1a;知乎网站证书解析(mac系统)3. 通过openssl获取证书的含义 三. 证书链&#xff08;Certificate Chain&#xff09;1. 背景2. 概述3. 背景问题的解释 …

SSL和TLS简单概述

SSL和TLS简单概述 本文不会只有几个比较重要的概念,科普性质的文章,方便自己记忆,极大概率存在缺陷 如果想了解这方面的内容&#xff0c;请参阅官方文档。 SSL和TLS TLS是更安全版本的ssl,先出的的ssh,一个基于加密机制的应用,之后为了方便给其他应用层使用然后引入了ssl,最…

动态内存管理——tlsf

定义 TLSF(全称Two-Level Segregated Fit) 源码 https://github.com/mattconte/tlsf 代码 结构体 typedef struct block_header_t {/* 指向上一个物理块。*/struct block_header_t * prev_phys_block;/* 此块的大小&#xff0c;不包括块头。*/size_t size;/* 下一个和上一…

SSL与TLS协议详解

写在最前面的话&#xff1a;这篇文章是我借鉴了Eric Rescorla的《SSL and TLS》一书之后对该书的前半部分内容整合而做。如您需要开发围绕SSL、TLS的程序建议参阅原著或者RFC相关文档。 一、关于SSL、TLS与HTTPS的三两事 什么是SSL、TLS&#xff1a; 众所周知&#xff0c;真…

TLS协议/SSL协议

历史背景 SSL(Secure Socket Layer 安全套接层)是基于HTTPS下的一个协议加密层&#xff0c;最初是由网景公司&#xff08;Netscape&#xff09;研发&#xff0c;后被IETF&#xff08;The Internet Engineering Task Force - 互联网工程任务组&#xff09;标准化后写入&#xf…

TLS加密体系

谈到这个词&#xff0c;可能大家的第一印象就是加密&#xff0c;而对TLS了解甚少。那么在介绍 TLS 加密体系之前先来讲一讲加密。 一提到加密&#xff0c;可能很多人脑海中会浮现出电视剧里特务的场景&#xff0c;他们拿出一台电报机&#xff0c;“滴滴滴滴”按下情报报文&…