UDAF和UDF的介绍

article/2025/9/2 6:23:47

目录

UDF介绍

UDAF简介

关于UDAF的一个误区

使用UDF

在SQL语句中使用UDF

 直接对列应用UDF(脱离sql)

UDAF使用

继承UserDefinedAggregateFunction

继承Aggregator


 

UDF介绍

UDF(User Define Function),即用户自定义函数,Spark的官方文档中没有对UDF做过多介绍,猜想可能是认为比较简单吧。

几乎所有sql数据库的实现都为用户提供了扩展接口来增强sql语句的处理能力,这些扩展称之为UDXXX,即用户定义(User Define)的XXX,这个XXX可以是对单行操作的UDF,或者是对多行操作的UDAF,或者是UDTF,本次主要介绍UDF。

UDF的UD表示用户定义,既然有用户定义,就会有系统内建(built-in),一些系统内建的函数比如abs,接受一个数字返回它的绝对值,比如substr对字符串进行截取,它们的特点就是在执行sql语句的时候对每行记录调用一次,每调用一次传入一些参数,这些参数通常是表的某一列或者某几列在当前行的值,然后产生一个输出作为结果。

适用场景:UDF使用频率极高,对于单条记录进行比较复杂的操作,使用内置函数无法完成或者比较复杂的情况都比较适合使用UDF。

UDAF简介

先解释一下什么是UDAF(User Defined Aggregate Function),即用户定义的聚合函数,聚合函数和普通函数的区别是什么呢,普通函数是接受一行输入产生一个输出,聚合函数是接受一组(一般是多行)输入然后产生一个输出,即将一组的值想办法聚合一下。

 

关于UDAF的一个误区

我们可能下意识的认为UDAF是需要和group by一起使用的,实际上UDAF可以跟group by一起使用,也可以不跟group by一起使用,这个其实比较好理解,联想到mysql中的max、min等函数,可以:

1

select max(foo) from foobar group by bar;

表示根据bar字段分组,然后求每个分组的最大值,这时候的分组有很多个,使用这个函数对每个分组进行处理,也可以:

1

select max(foo) from foobar;

这种情况可以将整张表看做是一个分组,然后在这个分组(实际上就是一整张表)中求最大值。所以聚合函数实际上是对分组做处理,而不关心分组中记录的具体数量。

 

使用UDF

在SQL语句中使用UDF

在sql语句中使用UDF指的是在spark.sql("select udf_foo(…)")这种方式使用UDF,套路大致有以下几步:

1. 实现UDF,可以是case class,可以是匿名类

2. 注册到spark,将类绑定到一个name,后续会使用这个name来调用函数

3. 在sql语句中调用注册的name调用UDF

下面是一个简单的示例:

package cc11001100.spark.sql.udfimport org.apache.spark.sql.SparkSessionobject SparkUdfInSqlBasicUsageStudy {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[*]").appName("SparkUdfStudy").getOrCreate()import spark.implicits._// 注册可以在sql语句中使用的UDFspark.udf.register("to_uppercase", (s: String) => s.toUpperCase())// 创建一张表Seq((1, "foo"), (2, "bar")).toDF("id", "text").createOrReplaceTempView("t_foo")spark.sql("select id, to_uppercase(text) from t_foo").show()}}

运行结果:

image

 

 直接对列应用UDF(脱离sql)

在sql语句中使用比较麻烦,还要进行注册什么的,可以定义一个UDF然后将它应用到某个列上:

package cc11001100.spark.sql.udfimport org.apache.spark.sql.{SparkSession, functions}object SparkUdfInFunctionBasicUsageStudy {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[*]").appName("SparkUdfStudy").getOrCreate()import spark.implicits._val ds = Seq((1, "foo"), (2, "bar")).toDF("id", "text")val toUpperCase = functions.udf((s: String) => s.toUpperCase)ds.withColumn("text", toUpperCase('text)).show()}}

运行效果:

image

 

需要注意的是受Scala limit 22限制,自定义UDF最多接受22个参数,不过正常情况下完全够用了。

 

UDAF使用

继承UserDefinedAggregateFunction

使用UserDefinedAggregateFunction的套路:

1. 自定义类继承UserDefinedAggregateFunction,对每个阶段方法做实现

2. 在spark中注册UDAF,为其绑定一个名字

3. 然后就可以在sql语句中使用上面绑定的名字调用

 

下面写一个计算平均值的UDAF例子,首先定义一个类继承UserDefinedAggregateFunction:

package cc11001100.spark.sql.udafimport org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._object AverageUserDefinedAggregateFunction extends UserDefinedAggregateFunction {// 聚合函数的输入数据结构override def inputSchema: StructType = StructType(StructField("input", LongType) :: Nil)// 缓存区数据结构override def bufferSchema: StructType = StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)// 聚合函数返回值数据结构override def dataType: DataType = DoubleType// 聚合函数是否是幂等的,即相同输入是否总是能得到相同输出override def deterministic: Boolean = true// 初始化缓冲区override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0) = 0Lbuffer(1) = 0L}// 给聚合函数传入一条新数据进行处理override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {if (input.isNullAt(0)) returnbuffer(0) = buffer.getLong(0) + input.getLong(0)buffer(1) = buffer.getLong(1) + 1}// 合并聚合函数缓冲区override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)}// 计算最终结果override def evaluate(buffer: Row): Any = buffer.getLong(0).toDouble / buffer.getLong(1)}

然后注册并使用它:

package cc11001100.spark.sql.udafimport org.apache.spark.sql.SparkSessionobject SparkSqlUDAFDemo_001 {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[*]").appName("SparkStudy").getOrCreate()spark.read.json("data/user").createOrReplaceTempView("v_user")spark.udf.register("u_avg", AverageUserDefinedAggregateFunction)// 将整张表看做是一个分组对求所有人的平均年龄spark.sql("select count(1) as count, u_avg(age) as avg_age from v_user").show()// 按照性别分组求平均年龄spark.sql("select sex, count(1) as count, u_avg(age) as avg_age from v_user group by sex").show()}}

使用到的数据集:

{"id": 1001, "name": "foo", "sex": "man", "age": 20}
{"id": 1002, "name": "bar", "sex": "man", "age": 24}
{"id": 1003, "name": "baz", "sex": "man", "age": 18}
{"id": 1004, "name": "foo1", "sex": "woman", "age": 17}
{"id": 1005, "name": "bar2", "sex": "woman", "age": 19}
{"id": 1006, "name": "baz3", "sex": "woman", "age": 20}

运行结果:

image

image

 

继承Aggregator

还有另一种方式就是继承Aggregator这个类,优点是可以带类型:

package cc11001100.spark.sql.udafimport org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}/*** 计算平均值**/
object AverageAggregator extends Aggregator[User, Average, Double] {// 初始化bufferoverride def zero: Average = Average(0L, 0L)// 处理一条新的记录override def reduce(b: Average, a: User): Average = {b.sum += a.ageb.count += 1Lb}// 合并聚合bufferoverride def merge(b1: Average, b2: Average): Average = {b1.sum += b2.sumb1.count += b2.countb1}// 减少中间数据传输override def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.countoverride def bufferEncoder: Encoder[Average] = Encoders.product// 最终输出结果的类型override def outputEncoder: Encoder[Double] = Encoders.scalaDouble}/*** 计算平均值过程中使用的Buffer** @param sum* @param count*/
case class Average(var sum: Long, var count: Long) {
}case class User(id: Long, name: String, sex: String, age: Long) {
}

运行结果:

image


http://chatgpt.dhexx.cn/article/dNmietsp.shtml

相关文章

FLUENT UDF并行化(1)

来源:ANSYS FLUENT UDF帮助文档,翻译自用,如有错误,欢迎指出! 本章概述了并行ANSYS Fluent的用户定义函数(UDF)及其用法。有关并行UDF功能的详细信息,请参见以下部分,本…

FLUENT UDF并行化(2)

来源:ANSYS FLUENT UDF帮助文档,翻译自用,如有错误,欢迎指出! 本章概述了并行ANSYS Fluent的用户定义函数(UDF)及其用法。有关并行UDF功能的详细信息,请参见以下部分,本…

udf开发入门(python udf、hive udf)

开发前的声明 udf开发是在数据分析的时候如果内置的函数解析不了的情况下去做的开发,比方说你只想拆分一个字段,拼接一个字段之类的,就不要去搞udf了,这种基本的需求自带函数完全支持,具体参数可参考文档: …

UDF的入门科普

新入门的小伙伴们好像对udf有一些疑问,那么今天就给大家整理一些udf的学习资料供大家参考。(公众号:刘华强仿真笔记) 01 UDF的基本概念 UDF的定义? UDF 是用户自己用C语言写的一个函数,可以和FLUENT动态链接 用UDF…

FLUENT中初识UDF——UDF的简单使用(1)

FLUENT中初识UDF——UDF的简单使用(1) 注意:不包含其他ANSYS基本操作流程!!!1:UDF的编写:2:ANSYS中导入UDF函数:2.1:首先打开Fluent,导…

对window的注册表进行优化

Regclean pro是一款优秀的注册表扫描、清理工具,由微软金牌合作伙伴Systweak开发。 它具有强大的Windows注册表检测及修复功能,可以帮助用户轻松而有效的清理、修复Windows 系统注册表中缺省的、被破坏的或残缺的系统参数,轻松提升系统性能。…

Windows注册表基本管理配置

一.注册表优化 注册表的优化分为几点: 1.系统安装是产生的无用信息 (1) 删除多余的时区 路径:计算机\HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows NT\CurrentVersion\Time Zones 删除除了China Standard Time中国时区以外的其他时区…

Windows与网络基础-19注册表维护与优化

目录 一、注册表维护 1.1注册表被破坏后的常见现象 1.2注册表被破坏的原因 1.3备份注册表 1.4恢复注册表 1.5锁定和解锁注册表 二、注册表的优化 2.1删除多余的DLL文件 2.2安装卸载应用程序的垃圾信息 2.3系统安装时产生的无用信息 2.3.1删除多余时区(必…

Win7注册表优化工具箱

软件名称:Win7注册表优化工具箱 软件版本:1.0.0.0 软件大小 1.04MB 适用环境: win7 软件性质: 国产软件 - 系统工具 - 优化设置 下载地址 http://pan.baidu.com/share/link?shareid104303&uk4160867570# 本文转自hai…

windows注册表

第一课 注册表基础 一、什么是注册表 注册表是windows操作系统、硬件设备以及客户应用程序得以正常运行和保存设置的核心“数据库”,也可以说是一个非常巨大的树状分层结构的数据库系统。 注册表记录了用户安装在计算机上的软件和每个程序的相互关联信息&#…

玩转电脑|盘点一下Windows 10 注册表系统优化【 InsCode Stable Diffusion 美图活动一期】

目录 什么是注册表 组成 Windows 注册表的层次结构 如何在 Windows 11/10 中创建注册表项 1、使用注册表编辑器 2、使用命令行 3、使用记事本创建REG文件 注意:在本文中主要就是使用记事本创建REG文件来修改。 注册表代码 修改状态栏透明度 任务栏时间显…

计算机系统的优化具体操作,注册表优化电脑内存的详细操作步骤

注册表优化电脑内存 当我们在使用电脑出现内存不足的情况下,通常会选择升级内存的方式来解决相关问题。而加了一个内存条之后,通常你会发现运行速度并没有等到比较显著的改善。只有在运行一些大程序的时候才能感觉到有所提升。这是因为系统会照顾到使用低…

win10清理注册表的方法

如果不需要某款软件之后,我们将其从电脑上卸载之后,如果不清理干净注册表信息的话可能日积月累会对电脑运行造成影响。那么win10如何清理注册表多余信息呢?今天小编就教下大家win10清理注册表的方法。 具体的方法如下: 1、在桌面…

Reg Organizer v8.75 注册表及系统清理优化工具

前言 Reg Organizer是一个非常好用的编辑、整理注册表文件软件,在导入reg文件前,会先分析结构,供您确认。它是一个功能丰富的应用程序,旨在编辑、清理和维护注册表,修复系统中的错误,并提高计算机性能。深…

暴力解决注册表删不掉的问题

今天在使用注册表编辑器修改注册表权限的时候,不小心把所有用户对某个注册表的权限全部拒绝了。等到我再想修改的时候,右键点击该文件,选择了“权限”后,弹出的编辑界面的“组或用户名”一栏中没有出现任何内容。倒腾了半天也实在…

Windows与网络基础:注册表基础和注册表维护与优化

学习目标 1.理解注册表概念 2.掌握注册表维护及优化方法 目录 一、注册表基础1、概述2、早期的注册表3、Windows 95后的注册表4、注册表结构4.1、注册表以树状结构进行呈现4.1.1子树(实际只有两棵子树,为了方便操作,分成了5棵子树)4.1.2、项…

No1.Windows10系统优化设置注册表项

Windows10.22H2.19045.2193 2022‎年‎11‎月‎4‎日 Windows Registry Editor Version 5.00#在锁屏界面上显示通知、提醒和VoIP来电、播放声音-关闭 [HKEY_CURRENT_USER\Software\Microsoft\Windows\CurrentVersion\Notifications\Settings] "NOC_GLOBAL_SETTING_AL…

注册表的维护与优化,硬核实操讲解

作者介绍大家好,我是柒烨~ 系列专栏 《网络安全 --> Windows与网络基础》 学习是一个好习惯,更是你和我每个人的成长手段,如何灵活应用且高效的去使用才是关键!不要让生活中的拖延占据你过多的时间,更要和生活中的…

Windows与网络基础-19-注册表维护与优化

目录 一、 注册表维护 1.1 注册表被破坏后的常见现象 1.2 注册表被破坏的原因 1.3 备份注册表 1.4 恢复注册表 1.5 注册表锁定和解锁 二、注册表优化 2.1 清除多余的DLL文件 2.2 安装卸载应用程序的垃圾信息 2.3 系统安装时产生的无用信息 一、 注册表维护 1.1 注册表被…