Spark 开发总结

article/2025/10/9 3:09:56

Spark 开发总结

  • 前言
  • spark UI
  • Spark API Function
    • Window.partitionBy
  • Spark udf
  • Spark 中禁止使用return
  • Spark NullPointException
  • Spark Shuffle FetchFailedException
  • spark 数据倾斜

前言

大数据开发过程中把自己积累的关于Spark的使用方法记录下来,便于不断的回顾和总结

spark UI

Spark API Function

Window.partitionBy

window 窗口函数在一组行(frame)上面执行计算,通过aggregate/window 函数给每行返回一个新的值,本质上按某个属性划分partition,在对每个partition进行聚合,使用的场景是计算每个partition中某个属性的平均值,总和,或者转化成集合;

overCategory = Window.partitionBy("depName").orderBy(desc("salary"))
df = empsalary.withColumn("salaries",collect_list("salary").over(overCategory))
.withColumn("average_salary",(avg("salary").over(overCategory)).cast("int"))
.withColumn("total_salary",sum("salary").over(overCategory))
.select("depName","empNo","name","salary","salaries","average_salary","total_salary")
df.show(20,False)

如数据集empsalary
在这里插入图片描述
在这里插入图片描述
输出结果:
在这里插入图片描述
注意:
spark函数中,只有Aggregate Functions 能够和 Window Functions搭配使用,其他类别的函数不能应用于Spark Window中,例如下面的一个例子,使用了函数array_contains,(collection functions的一种),spark会报错
在这里插入图片描述

overCategory = Window.partitionBy("depName")df = empsalary.withColumn(
"average_salary_in_dep",array_contains(col("hobby"),"game").over(overCategory)).withColumn("total_salary_in_dep",sum("salary").over(overCategory))
df.show()
## pyspark.sql.functions.array_contains(col,value)
## Collection 函数,return True if the array contains the given value.The collection elements and value must be of the same type
df = spark.createDataFrame([(['a', 'b', 'c'],),([],)],['data'])
df.select(array_contains(df.data,'a')).collect()
[Row(array_contains(data,a) = True,Row(array_contains(data,a) = False)]

参考:https://knockdata.github.io/spark-window-function-pyspark/

Spark udf

// 对于array<struct>类型的column 执行 udf(array<struct> => array<struct>) 返回结果仍是array<struct>的情况下,需要声明schema类型
val tmpschema = tmpadEffect.schema("crowd_dimen_group").dataType
val tmpcrowdDimensFilter = udf((crowd_dimen_groups: Seq[Row]) => {crowd_dimen_groups.filter(x => x.getAs[String]("frontend_dimension_id") == "306931917")}, tmpschema)

Spark 中禁止使用return

太头疼了,只能用if else , 代码不美观,后续有时间找找比较好的编码方式

Spark NullPointException

Spark开发中经常因为NullPointException问题导致整个任务失败不断重试,浪费大量时间;
最有效的解决方法是:
1、先尽量分步骤保存各阶段运行结果;
2、在上一步结果的基础上,不断来定位产生NullPointException的原因;
3、不要心怀侥幸,觉得随便加个判空就能解决问题,避免因判空不生效导致不断重试;
4、根据NullPoint的变量检查对应的DataFrame是否包含空值,然后添加过滤条件;

Spark Shuffle FetchFailedException

shuffle分为shuffle write和shuffle read两部分。
shuffle write的分区数由上一阶段的RDD分区数控制,shuffle read的分区数则是由Spark提供的一些参数控制。

shuffle write可以简单理解为类似于saveAsLocalDiskFile的操作,将计算的中间结果按某种规则临时放到各个executor所在的本地磁盘上。

shuffle read的时候数据的分区数则是由spark提供的一些参数控制。可以想到的是,如果这个参数值设置的很小,同时shuffle read的量很大,那么将会导致一个task需要处理的数据非常大。结果导致JVM crash,从而导致取shuffle数据失败,同时executor也丢失了,看到Failed to connect to host的错误,也就是executor lost的意思。有时候即使不会导致JVM crash也会造成长时间的gc。

解决办法
知道原因后问题就好解决了,主要从shuffle的数据量和处理shuffle数据的分区数两个角度入手。

减少shuffle数据

思考是否可以使用map side join或是broadcast join来规避shuffle的产生。

将不必要的数据在shuffle前进行过滤,比如原始数据有20个字段,只要选取需要的字段进行处理即可,将会减少一定的shuffle数据。

SparkSQL和DataFrame的join,group by等操作

通过spark.sql.shuffle.partitions控制分区数,默认为200,根据shuffle的量以及计算的复杂度提高这个值。

Rdd的join,groupBy,reduceByKey等操作

通过spark.default.parallelism控制shuffle read与reduce处理的分区数,默认为运行任务的core的总数(mesos细粒度模式为8个,local模式为本地的core总数),官方建议为设置成运行任务的core的2-3倍。

提高executor的内存

通过spark.executor.memory适当提高executor的memory值。

是否存在数据倾斜的问题

空值是否已经过滤?异常数据(某个key数据特别大)是否可以单独处理?考虑改变数据分区规则。

spark 数据倾斜

  • 提高shuffle并行度:增大shuffle read task参数值,让每个task处理比原来更少的数据;适应所有场景,简单有效可作为首选方案,缺点是缓解的效果有限;对于group,join shuffle类语句,可以通过设置spark.sql.shuffle.partitions来调整并发;
  • 两阶段聚合:适用于groupbykey分组聚合,reducebykey等shuffle算子,思路:首先通过map给每个key打上n以内的随机数前缀进行局部聚合,并进行reducebykey的局部聚合,然后再次map将key的前缀随机数去掉再次进行全局聚合;可以让整个过程中原本一个task处理的数据分摊到多个task做局部聚合,规避单task数据过量,缺点是仅适用于聚合类的Shuffle操作,无法适用于join类的shuffle操作
  • 广播broadcast: 对RDD或Spark SQL使用join类操作或语句,且join操作的RDD比较小(百兆或1,2G);使用broadcast和map类算子实现join的功能替代原本的join,彻底规避shuffle。对较小RDD直接collect到内存,并创建broadcast变量;并对另外一个RDD执行map类算子,在该算子的函数中,从broadcast变量(collect出的较小RDD)与当前RDD中的每条数据依次比对key,相同的key执行你需要方式的join;
  • 采样倾斜key拆分join:适用于两个表都很大,但大key占比很小;对join导致的倾斜是因为某几个key,可将原本RDD中的倾斜key从原RDD拆分出来得到新RDD,并以加随机前缀的方式打散n份做join,将倾斜key对应的大量数据分摊到更多task上来规避倾斜;不适用于大量倾斜key;
  • 随机前缀加扩容RDD进行join:适用场景:RDD中有大量key导致倾斜
    • 1.首先查看RDD/Hive表中数据分布并找到造成倾斜的RDD/表;
    • 2.对倾斜RDD中的每条数据打上n以内的随机数前缀;
    • 3.对另外一个正常RDD的每条数据扩容n倍,扩容出的每条数据依次打上0到n的前缀;
    • 4.对处理后的两个RDD进行join。与采样倾斜key方案不同在于这里对不倾斜RDD中所有数据进行扩大n倍,而不是找出倾斜key进行扩容, 效果非常显著;缺点是扩容需要大内存
  • 实际中需要结合业务全盘考虑,可先提升Shuffle的并行度,最后针对数据分布选择后面方案中的一种或多种灵活应用。

http://chatgpt.dhexx.cn/article/A1cTnY29.shtml

相关文章

Spark 开发入门

文章目录 Spark是什么DAG有向无环图 spark环境搭建Spark开发pyspark使用pythonSpark初始化创建RDD数据的读取和保存文本文件Json文件 RDD的转换操作RDD的行动操作集合操作mysql读取 Spark是什么 整个Hadoop生态圈分为分布式文件系统HDFS、计算框架MapReduce以及资源调度框架Ya…

Spark开发——Spark简介及入门

目录 什么是Spark&#xff1f; Spark有哪些特点和优势 1.计算速度 2.易用性 3.通用性 4.兼容性 Spark架构 Spark基本概念 Spark结构设计 使用Scala语言实现Spark本地词频统计 什么是Spark&#xff1f; Spark它是一个用于大规模数据处理的实时计算引擎。 Spark有哪些…

Spark开发指南

目 录 6 Spark开发指南 6.1 概述 6.2 开发环境准备 6.2.1 Java开发环境准备 6.2.2 Scala开发环境准备 6.3 开发指引 6.4 Java代码样例 6.5 Scala代码样例 6.6 对外接口 6.6.1 Java API 6.6.2 Scala API 6.6.3 Python API 6.6.4 Web UI 6.6.5 JDBC 6 Spark开发指南…

PIX飞控电流计设置

在 测量电池电压 一栏输入用电压表测得的电池电压&#xff0c;保存。

pixhawk飞控接口含义

官方文档&#xff1a;https://docs.px4.io/v1.9.0/en/flight_controller/pixhawk.html 1——spektrum DSM receiver2&#xff0c;3——远程通信口&#xff0c;接数传4——串口5——SPI6——电源口7——飞控的安全开关&#xff0c;长按启动解锁8——蜂鸣器9——串口10——GPS11—…

PIX飞控不能解锁问题总结

摘自&#xff1a;https://baijiahao.baidu.com/s?id1640767431717207814&wfrspider&forpc PIX飞控不能解锁问题总结 给力蹦小勇士 发布时间&#xff1a;19-08-0222:55 一、飞控故障或没校准 在地面站里飞行数据菜单查看报错。假如加速度计和地磁没校准&#xff0c;…

富斯i6航模遥控器配apm(pix)飞控mission planner疑难杂症解决策略(上)

提示&#xff1a;仅适用于新手入门参考。 目录 前言 在missionplanner调试遥控器出现信号异常&#xff0c;飞行调试出现操作异常如何处理&#xff0c;在硬件无损的前提下&#xff0c;如何进行简易调试&#xff0c;下文将介绍入门的处理办法。 一、切换飞行模式时突然出现油门…

Mission Planner初学者安装调试教程指南(APM或PIX飞控)3——连接与烧录

Mission Planner初学者安装调试教程指南&#xff08;APM或PIX飞控&#xff09;3——连接与烧录 目录 1.连接方式 2.烧录固件 1.连接方式 通常可以使用micro USB数据线直接连接APM&#xff08;pixhawk&#xff09;&#xff0c;将数据线一头接入电脑usb口&#xff0c;另一头接…

pixhawk飞控板的硬件构成

具体介绍 pixhawk是一款高性能的飞控板&#xff0c;它能用于固定翼&#xff0c;多旋翼&#xff0c;直升机&#xff0c;小车等多种应用。它能被用于研究用&#xff0c;玩耍用&#xff0c;甚至直接用于做产品。这款飞控其实是将PX4FMU和PX4IO做了一个封装&#xff0c;将两部分PC…

Mission Planner初学者安装调试教程指南(APM或PIX飞控)5——规划航点航线

目录 1.卫星地图上规划普通航点 2.曲线航点 3.规划多边形区域 4.环形航线 5. 曲线航线 6.设置网格 7.特殊航线 学习地面站&#xff0c;不可避免要触及英文指令。通过经常使用日常积累&#xff0c;可以熟练掌握各个指令的含义。serve the people heart and soul&#xff0…

OPENMV结合PIX飞控实现四轴定点 循迹 2017电赛

本文章代码已上传Github&#xff1a; https://github.com/Kevincoooool/2017_Follow 有兴趣的可以加个STAR 自从17年国赛之后&#xff0c;自己做了openmv&#xff0c;加了很多群&#xff0c;也了解到很多人都在想着这个题。 第一版 第二版 第三版 我们做国赛的时候实现了全…

飞行控制器Pixhawk简介

作者&#xff1a;华清远见讲师 Pixhawk是一款由PX4开源项目设计并由3DR公司制造生产的高级自动驾驶仪系统。其前身是APM&#xff0c;由于APM的处理器已经接近满负荷&#xff0c;没有办法满足更复杂的运算处理&#xff0c;所以硬件厂商采用了目前最新标准的32位ARM处理器&#x…

pixhawk飞控解锁方法

1. pixhawk飞控解锁方法是&#xff1a;油门(throttl)拉到最低,偏航角&#xff08;yaw&#xff09;拉到最右边。

如何用开源飞控PIXHAWK进行二次开发?

著作权归作者所有。 商业转载请联系作者获得授权&#xff0c;非商业转载请注明出处。 作者&#xff1a;我是肉包子 链接&#xff1a;http://www.zhihu.com/question/38874663/answer/84239995 来源&#xff1a;知乎 以下所描述的都是针对px4原生固件&#xff0c;此外&#xff0…

APM和PIX飞控日志分析入门贴

我们在飞行中&#xff0c;经常会碰到各种各样的问题&#xff0c;经常有模友很纳闷&#xff0c;为什么我的飞机会这样那样的问题&#xff0c;为什么我的飞机会炸机&#xff0c;各种问题得不到答案是一件非常不爽的问题&#xff0c;在APM和PIX飞控中&#xff0c;都有记录我们整个…

开源飞控APM与PIXHAWK

一 APM 官网地址&#xff1a;http://ardupilot.org/ APM&#xff08;ArduPilotMega&#xff09; 是在2007年由DIY无人机社区&#xff08;DIY Drones&#xff09;推出的飞控产品&#xff0c;是当今最为成熟的开源硬件项目。APM基于Arduino的开源平台&#xff0c;对多处硬件做出了…

Mission Planner初学者安装调试教程指南(APM或PIX飞控)1——下载与版本

目录 1.概述 2.下载与版本 3.关于 ArduPilot wquav 1.概述 Misson Planner简称MP&#xff0c;图标为黑底大写白色字体MP加一个绿色固定翼飞机&#xff0c;是可以调试APM或者PIX飞控的地面站软件&#xff0c;可以运行在windows系统和Linux系统&#xff08;非直接安装&#x…

Pix4飞控常见问题解决方法(二)

一、无法解锁&#xff08;黄灯闪烁&#xff09; 无法解锁的原因会有多种&#xff0c;请按照如下步骤进行检查&#xff1a; 1、初始设置是否全部完成 a、机架类型选择是否正确&#xff0c;或者你根本就没有选择&#xff1f; 注意&#xff0c;新版本的飞控固件在默认参数情况下&…

Pix4飞控硬件平台框架(一)

硬件平台简介 本文只是为了让大家简单入门为主&#xff0c;所以我选择的硬件学习平台是Pixhawk系列的mRoPixhawk&#xff0c;兼容原始版本Pixhawk1&#xff0c;基于Pixhawk-project FMUv3开源硬件设计&#xff0c;修正了将原始版本flash限制在1MB这个bug&#xff0c;需要深入学…

Mission Planner中级应用(APM或PIX飞控)3——APM飞控安装双GPS测试 APM双GPS

目录 1.未得到答案和技术指导 2.第一次实验失败 3.完全废掉了解锁功能 4.调整RX/TX位置 5.成功解锁 6.广阔室外的探索 山重水复疑无路&#xff0c;柳暗花明又一村 ——Mission Planner中级应用&#xff08;APM或PIX飞控&#xff09;3——APM飞控安装双GPS测试 APM双GPS。…