CheckPoint的一些探寻

article/2025/9/19 5:19:47


由于上项目的模块计算部分依赖于spark,那么在spark的使用上,需要针对不同规模和形式的数据,都要能最大限度的做到数据变换,模型计算等计算的稳定性支持。这也是elemental目前急需优化的瓶颈所在。这里,我们针对下面的场景所遇到的问题进行一部分探讨:

在数据规模过大,无法cache到memory上

  1. DataFrame在transform多次后,进行action后,生成的Physical Plan过长
  2. DataFrame和RDD在transform多次后,进行action后,生成的DAG过长

理论

  • Cache
  1. 当我们对DataFrame进行cache操作时,我们会对到目前生成的DataFrame的logicalPlan进行一次执行,并将每个partition计算完的结果保存为CachedBatch的格式,最终保存到CachedData的List数组中。对应的RDD也变为PersistRDD格式。
  2. 而经过Cache的DataFrame,在后续的计算中,
  3. 正常情况下在数据规模不大的情况下,我们只需要对DataFrame和RDD进行cache操作,就可以解决前面提到的问题。当然我们也可以选择Cache到磁盘上来应对数据规模比较大的情况。
  • CheckPoint
  1. 当我们对RDD进行Checkpoint操作时,只是暂时在此加上标记,表明该RDD需要被CheckPoint。
  2. 而在后续进行action操作时,在runJob计算完RDD之后,才会进行doCheckpoint的活动,也就是具体RDD进行Checkpoint实际的过程。在这个过程中,RDD的生成过程实际上要进行第二次的计算。
  3. DataFrame在进行CheckPoint的操作中,默认参数eager为true,也就是对应的InternalRdd在checkpoint函数之后,会默认进行一次简单的count的action操作,这样就完成了DataFrame数据的checkpoint,当然后续还会清理掉相对应的前序依赖,以达到降低DAG和physicalPlan复杂度的目的。

探寻

测试步骤如下:

val df1 = df.withColumn
val df2 = df1.groupBy.sum
val df3 = df2.withColumn

我们用进行count的过程作为DAG与plan分析的样本(进行checkpoint的操作的,分别对df1,df2,df3进行checkpoint之后,在进行count过程)

1.DataFrame进行checkpoint对比
没有使用checkpoint的情况下,logicPlan变化为

Aggregate [count(1) AS count#22L]
+- Project [id#3, double#4, plusOne#5, (id#3 % 9) AS idType#10]+- LogicalRDD [id#3, double#4, plusOne#5]Aggregate [count(1) AS count#41L]
+- Aggregate [idType#10], [idType#10, sum(cast(double#4 as bigint)) AS sum(double)#33L]+- Project [id#3, double#4, plusOne#5, (id#3 % 9) AS idType#10]+- LogicalRDD [id#3, double#4, plusOne#5]Aggregate [count(1) AS count#56L]
+- Project [idType#10, sum(double)#33L, (cast(sum(double)#33L as double) / cast(10 as double)) AS rst#46]+- Aggregate [idType#10], [idType#10, sum(cast(double#4 as bigint)) AS sum(double)#33L]+- Project [id#3, double#4, plusOne#5, (id#3 % 9) AS idType#10]+- LogicalRDD [id#3, double#4, plusOne#5]

由于是依赖关系,出现上述情况是合理的。

那么,在使用checkpoint之后

Aggregate [count(1) AS count#83L]
+- Project [id#64, double#65, plusOne#66, (id#64 % 9) AS idType#71]+- LogicalRDD [id#64, double#65, plusOne#66]Aggregate [count(1) AS count#108L]
+- Aggregate [idType#71], [idType#71, sum(cast(double#65 as bigint)) AS sum(double)#100L]+- LogicalRDD [id#64, double#65, plusOne#66, idType#71]Aggregate [count(1) AS count#129L]
+- Project [idType#71, sum(double)#100L, (cast(sum(double)#100L as double) / cast(10 as double)) AS rst#119]+- LogicalRDD [idType#71, sum(double)#100L]

很显然,LogicalPlan得到了抑制。与此相对应的PhysicalPlan也会得到缩减。

DAG的变化,这里只枚举df3的过程就可以说明问题
test1-3
图1.没有checkpoint情况下,df3进行count的DAG
test2-3
图2.在df2进行checkpoint情况下,df3进行count的DAG

对比,可以知道Stage已经得到了减少(图1在PhysicalPlan优化后才为3个Stage,实际上LogicalPlan已经为4个Stage),而且图1是从最开的df走流程下来的,而图2是直接从前面一个df2的checkpoint点出来的。

2.RDD进行checkpoint对比
使用RDD进行上述类似的操作,DAG的缩减也是一致,这里我们可以看一下RDD的recursive dependencies信息对比

(4) MapPartitionsRDD[64] at map at AlexTestJob.scala:115 []|  ShuffledRDD[63] at groupBy at AlexTestJob.scala:115 []+-(4) MapPartitionsRDD[62] at groupBy at AlexTestJob.scala:115 []|  MapPartitionsRDD[61] at map at AlexTestJob.scala:109 []|  ParallelCollectionRDD[60] at parallelize at AlexTestJob.scala:106 []
(4) MapPartitionsRDD[71] at map at AlexTestJob.scala:141 []|  ReliableCheckpointRDD[72] at count at AlexTestJob.scala:147 []

这是rdd2过程后的是否使用checkpoint的toDebugString的对比

3.DataFrame在循环体进行checkpoint对比
这里我们采用下面的逻辑代码进行测试

var df
(0 until 5).foreach {idx=>
df = df.withColumn(s"addCol_$idx",df.col("id")+idx)
}

LogicalPlan对比分析

'Project [*, (id#97 + 4) AS idType_4#134]
+- Project [id#97, double#98, plusOne#99, idType_0#104, idType_1#110, idType_2#117, (id#97 + 3) AS idType_3#125]+- Project [id#97, double#98, plusOne#99, idType_0#104, idType_1#110, (id#97 + 2) AS idType_2#117]+- Project [id#97, double#98, plusOne#99, idType_0#104, (id#97 + 1) AS idType_1#110]+- Project [id#97, double#98, plusOne#99, (id#97 + 0) AS idType_0#104]+- LogicalRDD [id#97, double#98, plusOne#99]

每次迭代都进行checkpoint之后

'Project [*, (id#3 + 4) AS idType_4#75]
+- LogicalRDD [id#3, double#4, plusOne#5, idType_0#15, idType_1#27, idType_2#41, idType_3#57]

这样的缩减,对于在模型计算过程中,多次迭代缩减DAG过程都存在实际意义

4.checkpoint与cache(DISK_ONLY)
cache只保存在DISK_ONLY可以理解为localCheckpoint的过程

结论

  1. 无论cache还是checkpoint操作,本质上是部分保存中间结果,减少后续过程重复计算。cache更倾向于保存比较频繁使用的,数据规模比较小的数据,且保存在内存中意义更大一些。checkpoint则相对于言没有数据规模的限制。
  2. checkpoint一次,会进行2次计算,这是额外开销。
  3. cache到磁盘上,是1次计算,但该次cache的结果仅能在该driver上运行的程序调用。实际上在elemental中是符合使用的。只是需要考虑对应的driver所在的机器的磁盘空间是否足够。
  4. checkpoint到alluxio,算是方便统一管理。checkpoint更大的优势是在于SparkStream上的优势,具有可恢复性。
  5. 一个RDD无论是否被标记为checkpoint,只要进行过实际性质action操作之后,该RDD就会被标记为已经checkpoint。例如:
RDD.checkpoint
RDD.count

这样是可以成功checkpoint的,但是:

RDD.count
RDD.checkpoint
RDD.count

无法被checkpoint。因此,选择checkpoint,之后马上action。最佳方案为:

RDD.checkpoint
RDD.persist(DISK_ONLY)

http://chatgpt.dhexx.cn/article/tIMAuj11.shtml

相关文章

Flink学习1-基础概念

Flink学习1-基础概念 Flink系列文章 更多Flink系列文章请点击Flink系列文章 更多大数据文章请点击大数据好文推荐 摘要 本文是作者学习Flink的一些文档整理、记录和心得体会,希望与大家共同学习探讨。 1 Flink简介 1.1 概念 Apache Flink是一个开源的分布式流…

Ranger架构剖析

Ranger介绍 2016年,Hadoop迎来了自己十周岁生日。过去的十年,Hadoop雄霸武林盟主之位,号令天下,引领大数据技术生态不断发展壮大,一时间百家争鸣,百花齐放。然而,兄弟多了不好管,为了…

Ranger架构

一、Ranger介绍 随着大数据技术生态不断发展壮大,为了抢占企业级市场,各厂商都迭代出自己的一套访问控制体系,不管是老牌系统(比如HDFS、HBase),还是生态新贵(比如Kafka、Alluxio)&…

大数据技术栈概貌

一、大数据技术栈概貌 pig:要使用 Apache Pig 分析数据,程序员需要使用Pig Latin语言编写脚本。所有这些脚本都在内部转换为Map和Reduce任务。sqoop 是 apache 旗下一款“Hadoop 和关系数据库服务器之间传送数据”的工具。导入数据:MySQL&…

一节课轻松通关 Spark

大数据跟我学系列文章007-三节课轻松通关 Spark (一) 文章目录 大数据跟我学系列文章007-三节课轻松通关 Spark (一)前言第01讲: MapReduce:计算框架和编程模型第02讲:Hadoop:集群的…

基于深度强化学习的连接查询优化

Krishnan, S., et al. (2018). "Learning to optimize join queries with deep reinforcement learning." 如何优化 SQL 连接是数据库社区数十年来一直在研究的一个大问题。伯克利 RiseLab 公布的一项研究表明,深度强化学习可以被成功地应用在优化 SQL 连…

Flink流处理框架总结

Flink流处理框架 第一部分 Flink 概述第 1 节 什么是 Flink第 2 节 Flink 特点第 2 节 Flink 应用场景第 4 节 Flink 核心组成及生态发展第 5 节 Flink 处理模型:流处理与批处理第 6 节 流处理引擎的技术选型 第二部分 Flink快速应用第 1 节 单词统计案例&#xff0…

Presto(Trino)动态过滤与优化器

Presto 系列文章目录- 动态过滤与谓词下推 文章目录 Presto 系列文章目录- 动态过滤与谓词下推trino性能提升新特性Dynamic partition pruning动态分区裁剪设计注意事项执行未来的工作 Hive connector延迟执行动态过滤器 动态过滤分析和确认动态过滤器收集阈值维度表布局局限性…

AI 应用的全流程存储加速方案技术解析和实践分享

AI 应用对存储系统的挑战是全面的,从离应用最近的数据计算如何加速,到离应用最远的数据存储如何管理,到数据存储和数据计算之间如何高效流通,再到不同应用之间的资源调度如何协调 …… 这其中每一个环节的低效,都有可能…

flink部署-1.13

1. 版本说明 本文档内容基于 flink-1.13.x,其他版本的整理,请查看本人博客的 flink 专栏其他文章。 2. 概述 Flink 是一种通用性框架,支持多种不同的部署方式。 本章简要介绍 Flink 集群的组成部分、用途和可用实现。如果你只是想在本地启…

Spark2.1.0——存储体系概述

本书在5.7节曾介绍过存储体系的创建,那时只为帮助读者了解SparkEnv,现在是时候对Spark的存储体系进行详细的分析了。简单来讲,Spark存储体系是各个Driver、Executor实例中的BlockManager所组成的。但是从一个整体出发,把各个节点的BlockManager看成存储体系的一部分,那么存…

探秘百度数据工厂Pingo的多存储后端数据联合查询技术

作者介绍:张志宏,2013年加入百度大数据部,曾作为核心成员参与百度大数据平台的搭建。目前是百度数据工厂Pingo核心团队的技术负责人。 Pingo是来自百度的离线大数据集成开发平台,使用Spark作为计算引擎,深度整合了资源…

Tachyon与Ignite系统对比

1. Alluxio(原Tachyon)内存文件系统 1.1 系统概述 Alluxio(原Tachyon)是以内存为中心(memory-centric)的虚拟的分布式存储系统,拥有高性能和容错能力,能够为集群框架(如Spark、MapReduce)提供可靠的内存级…

百度案例:利用Alluxio实现安全的即插即用分布式文件系统

全文共3361字,预计学习时长7分钟 本文介绍了百度如何依靠开源项目Alluxio,在一个企业大数据分析解决项目Pingo中创建了一个安全、模块化和可扩展的分布式文件系统服务。 在这篇文章中,你将学习如何依靠Alluxio来实现一个统一的分布式文件系统服务,以及如何在Alluxio之上添…

如何通过API调用alluxio 文件系统

如何通过API调用alluxio 文件系统 一般来说,不论从spark,hfds等等大数据分布式框架,甚至使用公有云api,来说,与集群进行交互不外乎以下这些套路: 设置configuration(有很重要的主节点host&…

office产品密钥如何找回

明明自己有正版office产品密钥却因为时间太久种种原因找不到了,登录微软账户查看信息也没有,怎么办?? 不要着急,只要你还记得你当时激活office的微软账号和密码,可以登录微软账户,follow me&am…

qq恢复官方网站服务器繁忙,qq恢复官方网站

删除好友,又后悔了!!?? 想把好友找回来? 世上没有后悔药,但是系统是还是人性化的, 所以,QQ官方给广大用户提供了好友找回功能。登录qq以恢复官方网络以恢复朋友操作。 在qq恢复官方网…

手机计算机怎么恢复出厂设置密码,如何找回手机锁屏密码?

原标题:如何找回手机锁屏密码? 世界上最遥远的距离不是我站在你面前你却不知道我爱你,而是手机明明在手上偏偏忘了锁屏密码! 我们总是会有犯傻的时候,比如突然想到一个数字觉得还不错,于是想着给手机改个密…

Vue + element + Springboot 通过邮箱找回密码

Vue element Springboot 通过邮箱找回密码 需求分析一、导入二、流程分析 详细设计一、前端界面设计1. 登录界面2. 重置密码界面 二、后端代码设计1. JavaMail配置2. QQ邮箱开启STMP授权3. 配置applicaiton.yml文件4. 新建文件夹5. 邮件配置:6. User相关类&#xf…

5年前带留言的公众号还可以找回,让我来教你该怎么做

今天注定是很神奇的一天,因为今天我居然找回了5年前注册的公众号,一个带留言功能的公众号。 了解公众号的人可能都知道,腾讯在2018年3月宣布暂停新注册公众号的留言功能,这之后注册的公众号都不具备留言功能。 这成了很多号主运营…