目录
UDF
GenericUDF
Java开发转了大数据,竟然被拉去做了非结构的ETL抽取,真的是比做后端伤脑筋,没有可借鉴的框架,只能根据数据进行抽取,第一份大数据实习,写完抽取代码后,需要写成UDF和UDTF进行使用。
简单意思:
UDF: 一对一,输入一笔数据输出一笔数据
UDTF:一对多,输入一笔数据输出多笔数据 (接受0个或多个输入然后产生多列或多行输出。)
UDAF:多对一,输入多笔数据输出一笔数据
记录一下UDF和GenericUDF的区别:
UDF属于基础的UDF:
简单的udf实现很简单,只需要继承udf,然后实现evaluate()方法就行了。evaluate()允许重载。
UDF
对于自定义函数现在需要进行总结一下:
pom文件:主要为打包文件:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>UDF</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.3.0</version><scope>provided</scope></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>2.3.0</version><scope>provided</scope></dependency></dependencies><build><sourceDirectory>src</sourceDirectory><defaultGoal>compile</defaultGoal><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
</project>
package com.demo;import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;/*** 最简单的UDF 继承 org.apache.hadoop.hive.ql.exec.UDF*/
@Description(name = "wordUDF",value = "_FUNC_(String word) - Returns result",extended ="Example:\\n > SELECT _FUNC_(\\'你好\\') FROM src LIMIT 1;\\n \\'2022新年快乐:你好\\'\""
)public class WordSingleUDF extends UDF {public String evaluate(String args) {return "2022新年快乐:"+args;}public static void main(String[] args) {System.out.println(new WordSingleUDF().evaluate("你好"));}}
进行打包上传:
1. add jar /home/zhaohai.li/tmp/UDF-1.0-SNAPSHOT-jar-with-dependencies.jar
2. create temporary function udf_word as 'com.demo.WordSingleUDF';
3. select udf_word('hello')
显示:
2022新年快乐:hello
GenericUDF
这个函数需要进行实现多个方法
GenericUDF的有点 可以处理复杂的数据类型,所以它能处理更为复杂的数据类型场景。
在进行继承GenericUDF 时需要进行实现三个方法:
必须实现的函数:ObjectInspector initialize(ObjectInspector[] arguments) //初始化操作,在函数进行初始化的时候会执行,其他时间不执行Object evaluate(DeferredObject[] arguments) //进行业务计算逻辑,处理具体的数据String getDisplayString(String[] children)//进行函数描述结果的显示,只有当函数执行一场才会显示其余的函数:
configure(MapredContext context) //在函数初始化之前,进行设置mapContext
package main.java.com.demo;import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;import java.util.ArrayList;
import java.util.Date;public class WordUDF extends GenericUDF {private static int mapTasks = 0;private static String init = "";private transient ArrayList ret = new ArrayList();@Overridepublic void configure(MapredContext context) {System.out.println(new Date() + "configure mapredContext");if (null != context) {//从jobConf中获取map数mapTasks = context.getJobConf().getNumMapTasks();}System.out.println(new Date() + "######## mapTasks [" + mapTasks + "] ..");}/*** 初始化函数 能够定义返回的数据类型** @param objectInspectors* @return* @throws UDFArgumentException*/@Overridepublic ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {System.out.println("1. init start:udfName" + this.getUdfName() + new Date());//初始化文件系统,可以在这里初始化读取文件等init = "init";//定义函数的返回类型为java的ListObjectInspector returnOI = PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.STRING);return ObjectInspectorFactory.getStandardListObjectInspector(returnOI);}/*** 评估计算业务逻辑* @param args* @return* @throws HiveException*/@Overridepublic Object evaluate(DeferredObject[] args) throws HiveException {System.out.println("2. deal with the data process " + new Date());ret.clear();if(args.length < 1) return ret;//获取第一个参数String str = args[0].get().toString();String[] s = str.split(",",-1);for(String word : s) {ret.add(word);}return ret;}@Overridepublic String getDisplayString(String[] strings) {return "Usage: Lxw1234GenericUDF(String str)";}public static void main(String[] args) {}
}
UDTF
package main.java.com.demo;import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;import java.util.ArrayList;
import java.util.List;public class MyUDTF extends GenericUDTF {private ArrayList<String> outList = new ArrayList<>();@Overridepublic StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {//1.定义输出数据的列名和类型List<String> fieldNames = new ArrayList<>();List<ObjectInspector> fieldOIs = new ArrayList<>();//2.添加输出数据的列名和类型fieldNames.add("lineToWord");fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);}@Overridepublic void process(Object[] args) throws HiveException {//1.获取原始数据String arg = args[0].toString();//2.获取数据传入的第二个参数,此处为分隔符String splitKey = args[1].toString();//3.将原始数据按照传入的分隔符进行切分String[] fields = arg.split(splitKey);//4.遍历切分后的结果,并写出for (String field : fields) {//集合为复用的,首先清空集合outList.clear();//将每一个单词添加至集合outList.add(field);//将集合内容写出forward(outList);}}@Overridepublic void close() throws HiveException {}}
UDAF
UDAF已经失效 需要去实现 implement
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2 或者 extend org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver能看到实际上
org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver也是实现的org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2
那我们直接进行继承父类 AbstractGenericUDAFResolver (自己可做选择)
要先了解UDAF的四个阶段,定义在GenericUDAFEvaluator的Mode枚举中:
COMPLETE:如果mapreduce只有map而没有reduce,就会进入这个阶段;
PARTIAL1:正常mapreduce的map阶段;
PARTIAL2:正常mapreduce的combiner阶段;
FINAL:正常mapreduce的reduce阶段;