如何处理Spark数据倾斜

article/2025/8/25 17:40:33

一、什么是数据倾斜

在分布式集群计算中,数据计算时候数据在各个节点分布不均衡,某一个或几个节点集中80%数据,而其它节点集中20%甚至更少数据,出现了数据计算负载不均衡的现象。

数据倾斜在MR编程模型中是十分常见的,用最通俗的话来讲,数据倾斜就是大量的相同key被分配到一个partition里,而其它partition被分配了少量的数据。这时候我们就认为是数据倾斜了


二、数据倾斜的影响

造成了“少数人累死,多数人闲死”的情况,这种情况是我们不能接受的,这也违背了分布式计算的初衷。集群中一个或几个节点要承受着巨大的计算压力,而其它节点计算完毕后要一直等待忙碌的节点计算完成,也拖累了整体的计算时间,直接导致计算效率低下,甚至出现最终executor撑爆,OOM结果。

小结如下:

(1)拖慢运行速度,计算效率低下。特别是对于一些几十亿到百亿的大表,倾斜情况下不夸张的情况下,运行十几个小时也不一定结束

(2)撑爆executor,OOM风险。大量的数据涌向同一个task中的executor执行,直接结果是OOM率,任务运行失败


三、数据倾斜的解决

1、数据倾斜的定位及解决

对于spark-program应用而言

(1)倾斜key的定位方法:

选取key,对数据进行抽样,统计出现的次数,根据出现次数大小排序取出前几个

df.select("key").sample(false,0.1).(k=>(k,1)).reduceBykey(_+_).map(k=>(k._2,k._1)).sortByKey(false).take(10)

经过分析,倾斜的数据主要有以下三种情况:

(1)null(空值)或是一些无意义的信息()之类的,大多是这个原因引起。

(2)无效数据,大量重复的测试数据或是对结果影响不大的有效数据。

(3)有效数据,业务导致的正常数据分布。

(2)解决办法

对于第(1),(2)种情况,直接对数据进行过滤即可。对于第(3)种情况则需要进行一些特殊操作,常见的有以下几种做法。

(1)将异常的key过滤出来单独处理,最后与正常数据的处理结果进行union操作

(2)或对所有key先添加随机值,进行操作后,去掉随机值,再继续后续操作

对于spark-sql应用而言

首先,可以对尝试distribute by rand()操作,打乱数据分布

其次,可通过配置开启倾斜key检测
 

由于Join语义限制,对于A left join skewed B之类的场景,无法对B进行划分处理,否则会导致数据正确性问题,这也是Spark项目所面临的难题。如果开启以上功能依然不能处理数据倾斜,可以通过开启倾斜key检测功能来定位是哪些key导致了倾斜或膨胀,继而进行过滤等处理。

spark.sql.adaptive.enabled=true

spark.sql.adaptive.shuffle.detectSkewness=true (默认false,由于采样计算会导致性能回归,正常任务不要开启)

其他参数:

spark.sql.adaptive.shuffle.sampleSizePerPartition (默认100,每个Task中的采样数,如果Task数量不大,可以酌情调大

在spark-ui上看到hot keys,如下图中的hot keys


2、Spark常见倾斜的场景及解决

Spark的数据倾斜一般由Shuffle时数据不均匀导致的,一般有三类算子会产生Shuffle:Aggregation (groupBy)、Window、Join

Aggregation

建议打散key进行二次聚合:采用对非constant值、与key无关的列进行hash取模

以DataFrame API示例:

dataframe

.groupBy(col("key"), pmod(hash(col("some_col")), 100)).agg(max("value").as("partial_max"))

.groupBy(col("key")).agg(max("partial_max").as("max"))

Window

目前支持该模式下的倾斜window,(仅支持spark3.0)

select (... row_number() over(partition by ... order by ...) as rn)where rn [==|<=|<] k and other conditionsspark.sql.rankLimit.enabled=true (目前支持基于row_number的topK计算逻辑)

Join

Shuffled Join

在spark2.4版本

开启功能:

spark.sql.adaptive.enabled=true
spark.shuffle.statistics.verbose=true
spark.sql.adaptive.skewedJoin.enabled=true
spark.sql.adaptive.allowAdditionalShuffle=true

如果不能处理,建议用户自行定位热点数据进行处理

在spark3.0版本

数据倾斜(Join)

spark.sql.adaptive.enabled=true
spark.sql.adaptive.skewJoin.enabled=true (默认true)
spark.sql.adaptive.skewJoin.enhance.enabled=true (通用倾斜算法,可处理更多场景)
spark.sql.adaptive.forceOptimizeSkewedJoin=true (允许插入额外shuffle,可处理更多场景)

其他参数:

spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB (默认为256MB,分区大小超过该阈值才可被识别为倾斜分区,如果希望调整的倾斜分区小于该阈值,可以酌情调小)

spark.sql.adaptive.skewJoin.skewedPartitionFactor=5 (默认为5,分区大小超过中位数Xfactor才可被识别为倾斜分区,一般不需要调整)

spark.sql.adaptive.skewJoin.enhance.maxJoins=5 (默认5,通用倾斜算法中,如果shuffled join超过此阈值则不处理,一般不需要调整)

spark.sql.adaptive.skewJoin.enhance.maxSplitsPerPartition=1000 (默认1000,通用倾斜算法中,尽量使得每个倾斜分区的划分不超过该阈值,一般不需要调整)

数据膨胀(Join)

在开启数据倾斜功能的基础上,额外开启:

spark.sql.adaptive.skewJoin.inflation.enabled=true (默认false,由于采样计算会导致性能回归,正常任务不要开启)

spark.sql.adaptive.skewJoin.inflation.factor=50 (默认为100,预估的分区输出大小超过中位数Xfactor才可被识别为膨胀分区,由于预估算法存在误差,一般不要低于50)

spark.sql.adaptive.shuffle.sampleSizePerPartition=500 (默认100,每个Task中的采样数,基于该采样数据预估Join之后的分区大小,如果Task数量不大,可以酌情调大)

参考案例(该膨胀导致后续stage反复失败,运行时长超10h)

Read: Max/Read=5.7,Write: Max/Med=142

开启功能后:整个job 23min运行完


参考:

解决spark中遇到的数据倾斜问题_breeze_lsw的博客-CSDN博客_spark中的数据倾斜问题解决spark中遇到的数据倾斜问题一. 数据倾斜的现象多数task执行速度较快,少数task执行时间非常长,或者等待很长时间后提示你内存不足,执行失败。二. 数据倾斜的原因数据问题key本身分布不均匀(包括大量的key为空)key的设置不合理spark使用问题shuffle时的并发度不够计算方式有误三. 数据倾斜的后果spark中一个stage的执行时间受限于最后那个执行完的task,https://blog.csdn.net/lsshlsw/article/details/52025949

参考:Performance Tuning - Spark 3.2.1 Documentationhttps://spark.apache.org/docs/latest/sql-performance-tuning.html#optimizing-skew-join


http://chatgpt.dhexx.cn/article/UEb54eXd.shtml

相关文章

数据倾斜

数据倾斜 转载声明 本文大量内容系转载自以下文章&#xff0c;有删改&#xff0c;并参考其他文档资料加入了一些内容&#xff1a; Spark性能优化指南——高级篇 作者&#xff1a;李雪蕤 出处&#xff1a;美团技术团队博客漫谈千亿级数据优化实践&#xff1a;数据倾斜&#x…

大数据常见问题:数据倾斜的原理及处理方案

什么是数据倾斜 Hadoop能够进行对海量数据进行批处理的核心&#xff0c;在于它的分布式思想&#xff0c;通过多台服务器&#xff08;节点&#xff09;组成集群&#xff0c;共同完成任务&#xff0c;进行分布式的数据处理。 理想状态下&#xff0c;一个任务是由集群下所有机器…

数据倾斜问题

一、什么是数据倾斜 简单来说&#xff0c;就是在数据计算的时候&#xff0c;数据会分配到不同的task上执行&#xff0c;当数据分配不均匀导致某些大批量数据分配到某几个task上就会造成计算不动或者异常的情况。 二、数据倾斜表现形式 1、大部分的task在计算的时候计算的特别…

数据倾斜常见原因和解决办法

数据倾斜在MapReduce编程模型中十分常见&#xff0c;多个节点并行计算&#xff0c;如果分配的不均&#xff0c;就会导致长尾问题&#xff08;大部分节点都完成了任务&#xff0c;一直等待剩下的节点完成任务&#xff09;&#xff0c;本文梳理了常见的发生倾斜的原因以及相应的解…

数据倾斜产生,原因及其解决方案

目录 第七章 数据倾斜 7.1 数据倾斜的产生&#xff0c;表现与原因 7.1.1 数据倾斜的定义 7.1.2 数据倾斜的危害 7.1.3 数据倾斜发生的现象 7.2 数据倾斜倾斜造成的原因 7.3 几种常见的数据倾斜及其解决方案 7.3.1 空值引发的数据倾斜 7.3.2 不同数据类型引发的数据倾斜…

数据倾斜原理与解决方法

数据倾斜的概念 数据倾斜这四个字经常会在学习MapReduce中遇到。所谓数据分区&#xff0c;就是数据分区分布因为数据本身或者分区方法的原因变得极为不一致&#xff0c;大量的数据被划分到了同一个区。由于Reducer Task每次处理一个区的数据&#xff0c;这导致Reducer Task处理…

什么是缓存穿透、缓存雪崩、缓存击穿

缓存穿透 缓存穿透 &#xff1a;缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在&#xff0c;这样缓存永远不会生效&#xff0c;这些请求都会打到数据库&#xff0c;失去了缓存保护后端存储的意义。 解决方案 缓存空值 如果访问数据库后还未命中&#xff0c;则把一…

缓存穿透和缓存击穿

一、背景介绍 几乎所有互联网公司都采用缓存的方案来解决瞬时流量超高&#xff0c;或者长期流量过高的问题。但使用缓存存在风险——缓存穿透和缓存击穿&#xff1a;简单的讲就是如果该数据原本就不存在&#xff0c;那么就会发生缓存穿透&#xff1b;如果缓存内容因为各种原因…

缓存穿透,缓存雪崩,缓存击穿

一&#xff0c;缓存穿透 原因&#xff1a;一个请求来访问某个数据&#xff0c;发现缓存中没有&#xff0c;直接去DB中访问。此种情况就是穿透。(正常情况下缓存跟数据库中数据都是存在&#xff0c;异常情况下会导致) 特点:因传递了非法的key,导致缓存跟数据库中都无法查询 方…

如何避免缓存穿透、缓存击穿、缓存雪崩?

如何避免缓存穿透、缓存击穿、缓存雪崩&#xff1f; 缓存穿透 先来看一下缓存穿透&#xff0c;顾名思义&#xff0c;是指业务请求穿过了缓存层&#xff0c;落到持久化存储上。在大多数场景下&#xff0c;我们应用缓存是为了承载前端业务请求&#xff0c;缓存被击穿以后&#x…

缓存穿透、缓存击穿、缓存雪崩解决方案

微信搜索【程序员囧辉】&#xff0c;关注这个坚持分享技术干货的程序员。 前言 ​ 我一个QPS不到10的项目&#xff0c;天天问我缓存穿透、缓存击穿、缓存雪崩&#xff0c;我是真滴难。 可能大家经常会有这种感受&#xff0c;但是只要是面试要问的题目&#xff0c;就算用不上&…

缓存穿透 缓存击穿 缓存雪崩 这三者是什么 如何处理

通常我们使用缓存中间件的方式 将数据库的热点数据缓存到Redis中 尽量去缓存中查找数据,目的就是为了减轻数据库的压力 那什么是 缓存穿透,缓存击穿 与 缓存雪崩 呢 ? 缓存穿透 当Redis中不存在某个key时,将对数据库进行查询操作 但如果数据库也不存在 就会造成每一个请求即…

应对缓存击穿的解决方法

一.什么样的数据适合缓存? 分析一个数据是否适合缓存,我们要从访问频率、读写比例、数据一致性等要求去分析. 二.什么是缓存击穿 在高并发下,多线程同时查询同一个资源,如果缓存中没有这个资源,那么这些线程都会去数据库查找,对数据库造成极大压力,缓存失去存在的意义.打个…

redis缓存击穿

缓存击穿&#xff1a; 缓存击穿是指&#xff0c;针对某个访问非常频繁的热点数据的请求&#xff0c;无法在缓存中进行处理&#xff0c;紧接着&#xff0c;访问该数据的大量请求&#xff0c;一下子都发送到了后端数据库&#xff0c;导致了数据库压力激增&#xff0c;会影响数据…

【缓存】缓存穿透、缓存击穿、缓存雪崩及其解决方案

文章目录 缓存穿透缓存击穿缓存雪崩大量数据同时过期Redis 故障宕机 总结来源 用户的数据一般都是存储于数据库&#xff0c;数据库的数据是落在磁盘上的&#xff0c;磁盘的读写速度可以说是计算机里最慢的硬件了。 当用户的请求&#xff0c;都访问数据库的话&#xff0c;请求数…

Redis 缓存击穿,缓存穿透,缓存雪崩原因+解决方案

一、前言 在我们日常的开发中&#xff0c;无不都是使用数据库来进行数据的存储&#xff0c;由于一般的系统任务中通常不会存在高并发的情况&#xff0c;所以这样看起来并没有什么问题&#xff0c;可是一旦涉及大数据量的需求&#xff0c;比如一些商品抢购的情景&#xff0c;或者…

Redis缓存击穿、雪崩、穿透!(超详细)

缓存的击穿、穿透和雪崩应该是再熟悉不过的词了&#xff0c;也是面试常问的高频试题。 不过&#xff0c;对于这三大缓存的问题&#xff0c;有很多人背过了解决方案&#xff0c;却少有人能把思路给理清的。 而且&#xff0c;网络上仍然充斥着&#xff0c;大量不太靠谱的解决方案…

缓存穿透、缓存击穿、缓存雪崩如何应对

参考连接&#xff1a;redis避免缓存穿透为什么缓存空对象而不是null&#xff1f; - 知乎 缓存穿透&#xff1a;key对应的数据在数据源并不存在&#xff0c;每次针对此key的请求从缓存获取不到&#xff0c;请求都会到数据源&#xff0c;从而可能压垮数据源。比如用一个不存在的…

缓存穿透、缓存击穿、缓存雪崩区别和解决方案

一、缓存处理流程 前台请求&#xff0c;后台先从缓存中取数据&#xff0c;取到直接返回结果&#xff0c;取不到时从数据库中取&#xff0c;数据库取到更新缓存&#xff0c;并返回结果&#xff0c;数据库也没取到&#xff0c;那直接返回空结果。 二、缓存穿透 描述&#xff1a; …

缓存穿透,缓存雪崩,缓存击穿的超详解

文章目录 1、缓存穿透问题的解决思路2、缓存雪崩问题及解决思路3、缓存击穿问题及解决思路 1、缓存穿透问题的解决思路 缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在&#xff0c;这样缓存永远不会生效&#xff0c;这些请求都会打到数据库&#xff0c;失去了缓存的…