大数据常见问题:数据倾斜的原理及处理方案

article/2025/8/25 21:59:19

什么是数据倾斜

Hadoop能够进行对海量数据进行批处理的核心,在于它的分布式思想,通过多台服务器(节点)组成集群,共同完成任务,进行分布式的数据处理。

理想状态下,一个任务是由集群下所有机器共同承担执行任务,每个节点承担的任务应该相近,但实际上在并行处理过程中,分配到每台节点的数据量并不是均匀的,当大量的数据分配到某一个节点时(假设10个节点,5亿数据),那么原本只需要1小时完成的工作,变成了其中9个节点不到1小时就完成了工作,而分配到了大量数据的节点,花了5个小时才完成

从最终结果来看,就是这个处理10亿数据的任务,集群花了5个小时才最终得出结果。大量的数据集中到了一台或者几台机器上计算,这些数据的计算速度远远低于平均计算速度,导致整个计算过程过慢,这种情况就是发生了数据倾斜。

为什么会导致数据倾斜

数据倾斜发生的原因

1)、key分布不均匀

某些key的数量过于集中,存在大量相同值的数据

存在大量异常值或空值。

2)、业务数据本身的特性

例如某个分公司或某个城市订单量大幅提升几十倍甚至几百倍,对该城市的订单统计聚合时,容易发生数据倾斜。

3)、建表时考虑不周

4)、某些SQL语句本身就有数据倾斜

两个表中关联字段存在大量空值,或是关联字段的数据不统一,例如在A表中值是大写,B表中值是小写等情况。

发生数据倾斜的SQL操作

数据倾斜发生时的现象

MapReduce任务:

主要表现在ruduce阶段卡在99.99%,一直99.99%不能结束。

  • 有一个多几个reduce卡住
  • 各种container报错OOM
  • 读写的数据量极大,至少远远超过其它正常的reduce伴随着数据倾斜,会出现任务被kill等各种诡异的表现。

Spark任务:

  • 绝大多数task执行得都非常快,但个别task执行的极慢。
  • 单个Executor执行时间特别久,整体任务卡在某个stage不能结束
  • Executor lost,OOM,Shuffle过程出错。
  • 正常运行的任务突然失败
  • 用SparkStreaming做实时算法时候,一直会有executor出现OOM的错误,但是其余的executor内存使用率却很低。

为什么要处理?

首先要知道数据倾斜的危害

1.任务长时间挂起,资源利用率下降

在分阶段执行任务的场景下,阶段与阶段之间通常存在数据上的依赖关系,后一阶段任务需要等前一阶段执行完成才能开始

举个例子

Stage1在Stage0之后执行,假如Stage1依赖Stage0产生的数据结果,那么Stage1必须等待Stage0执行完成后才能开始,如果这时Stage0因为数据倾斜问题,导致任务执行时长过长,或者直接挂起,那么Stage1将一直处于等待状态,整个作业也就一直挂起。这个时候,资源被这个作业占据,但是却只有极少数task在执行,造成计算资源的严重浪费,利用率下降

2.引发内存溢出,导致任务失败

数据发生倾斜时,可能导致大量数据集中在少数几个节点上,在计算执行中由于要处理的数据超出了单个节点的能力范围,最终导致内存被撑爆,报OOM异常,直接导致任务失败。

3.作业执行时间超出预期,导致后续依赖数据结果的作业出错

有时候作业与作业之间,并没有构建强依赖关系,而是通过执行时间的前后时间差来调度,当前置作业未在预期时间范围内完成执行,那么当后续作业启动时便无法读取到其所需要的最新数据,从而导致连续出错

数据倾斜的处理方案

1.参数调节

【1】通用优化:提高shuffle并行度

Spark的shuffle并行度默认值是200,建议根据服务器的情况进行调整。一般是集群cpu总和的2-3倍。当发生数据倾斜的时候,适当增大并行度,可以将原本被分配到同一个Task的不同Key分配到不同Task。

缺陷:它没有从根本上改变数据倾斜的本质和问题,只是说尽可能地去缓解和减轻

shuffle reduce task的数据压力,以及数据倾斜的问题。

【2】map端聚合,启动负载均衡

适用范围:group by 造成的数据倾斜

set hive.map.aggr=true; --在map中会做部分聚集操作,效率更高但需要更多的内存set hive.groupby.skewindata=true; --默认false,数据倾斜时负载均衡

数据倾斜时负载均衡,当选项设定为true,它使计算变成了两个mapreduce,先在第一个中在 shuffle 过程 partition 时随机给 key 打标记,使每个key 随机均匀分布到各个 reduce 上计算,但是这样只能完成部分计算,因为相同key没有分配到相同reduce上,所以需要第二次的mapreduce,这次就回归正常 shuffle,但是数据分布不均匀的问题在第一次mapreduce已经有了很大的改善。

【3】Reduce join 改为Map join

适用范围:小表 join 大表

set hive.auto.convert.join = true; -- hive是否自动根据文件量大小,选择将common join转成map join 。set hive.mapjoin.smalltable.filesize =25000000; --大表小表判断的阈值,如果表的大小小于该值25Mb,则会被判定为小表。则会被加载到内存中运行,将commonjoin转化成mapjoin。一般这个值也就最多几百兆的样子。

原理:mapjoin优化就是在Map阶段完成join工作,而不是像通常的common join在Reduce阶段按照join的列值进行分发数据到每个Reduce上进行join工作。这样避免了Shuffle阶段,从而避免了数据倾斜。

注意:这个操作会将所有的小表全量复制到每个map任务节点,然后再将小表缓存在每个map节点的内存里与大表进行join工作。小表的大小的不能太大,一般也就几百兆,否则会出现OOM报错。

【4】开启Skewed Join

hadoop 中默认是使用hive.exec.reducers.bytes.per.reducer = 1000000000

也就是每个节点的reduce 默认是处理1G大小的数据,如果你的join 操作也产生了数据倾斜,那么你可以在hive 中设定

set hive.optimize.skewjoin = true;set hive.skewjoin.key = skew_key_threshold (default = 100000)

建议每次运行比较复杂的sql 之前都可以设一下这个参数. 如果你不知道设置多少,可以就按官方默认的1个reduce 只处理1G 的算法,那么 skew_key_threshold = 1G/平均行长. 或者默认直接设成250000000 (差不多算平均行长4个字节)

当我们开启Skew Join之后,在运行时,会对数据进行扫描并检测哪个key会出现倾斜,对于会倾斜的key,用map join做处理,不倾斜的key正常处理。

2.SQL优化

【1】count(distinct)优化

慎用distinct  先用group去重,再count子查询

--优化前
select count(distinct a) from test ;--优化后
select count x.a 
from 
(select a from test group by a ) x 

【2】聚合类操作,发生数据倾斜

解决方法:阶段拆分-两阶段聚合

在需要聚合的key前加一个随机数的前后缀,这样就能得到非常均匀的key,然后按这个加工之后的key进行第一次聚合之后,再对聚合的结果,按照原始key进行二次聚合,这样基本就不可能出现数据倾斜了

【3】大表join大表发生数据倾斜

  • 大表拆分,倾斜部分单独处理
  • 中间表分桶排序后join

举例:

假设有有两张表

table_a表是一张近3个月买家交易明细表,字段如下,数据量较大

buyer_id(买家id)

seller_id(卖家id)

order_num(订单数)

1113839839

1001

5

table_b表是一张所有卖家信息表,数据量也很大,不满足mapjoin的条件。

seller_id(卖家id)

seller_level(卖家评级)

1001

A

SELECT t1.buyer_id ,sum(case when t2.seller_level = 'A' then order_num end) as a_num,sum(case when t2.seller_level = 'B' then order_num end) as b_num,sum(case when t2.seller_level = 'C' then order_num end) as c_num
FROM table_a  t1  --买家交易明细表
INNER JOIN table_b t2 --卖家信息表
ON t1.seller_id = t2.seller_id
GROUP BY t1.buyer_id;

但因为二八法则,某些卖家会拥有成千上万的买家,而大部分卖家的买家数量是较少的,两个表关联会发生数据倾斜。

优化方案1:转为map join 避免shuffle

tableb由于数据量较大,无法直接map join ,所以考虑通过过滤3个月内没有发生交易的卖家,来减少table_b的数据量,使其达到满足map join 的情况。

SELECT t1.buyer_id ,sum(case when t4.seller_level = 'A' then order_num end) as a_num,sum(case when t4.seller_level = 'B' then order_num end) as b_num,sum(case when t4.seller_level = 'C' then order_num end) as c_num
FROM table_a  t1
INNER JOIN (SELECTt2.seller_id,t2.seller_levelFROMtable_b t2INNER JOIN (SELECTseller_idFROM table_aGROUP BY seller_id	) t3ON t2.seller_id = t3.seller_id) t4  --过滤掉table_b中没有出现在table_a中的seller_id
ON t1.seller_id = t4.seller_id
GROUP BY t1.buyer_id;

此方案在有些时候能够起作用,但大部分情况并不能解决问题,比如table_b经过过滤后,数量依然很大,还是不满足map join的条件。

优化方案2:拆表一分为二,倾斜部分单独处理

首先创建一个临时表temp_b,存放大卖家(买家超过1万个)的名单,然后tablea分别join temp_b和table_b,结果union all 起来即可。此方案的适用情况最多,且最有效。

--临时表存放90天内买家超过10000的大卖家
INSERT OVERWRITE TABLE temp_b
SELECTt1.seller_id,t2.seller_level
FROM
(
SELECTseller_id,count(buyer_id) as buy_num
FROM table_a
GROUP BY seller_id
) t1
LEFT JOIN 
(
SELECTseller_id,seller_level
FROM table_b
) t2
ON t1.seller_id = t2.seller_id
AND t1.buy_num > 10000;--获取最终结果
SELECT t7.buyer_id ,sum(case when t7.seller_level = 'A' then t7.order_num end) as a_num,sum(case when t7.seller_level = 'B' then t7.order_num end) as b_num,sum(case when t7.seller_level = 'C' then t7.order_num end) as c_num
FROM
(
SELECT/*+mapjoin(t2)*/t1.buyer_id,t1.seller_id,t1.order_num,t2.seller_level
FROM table_a t1
LEFT JOIN temp_b t2
ON t1.seller_id = t2.seller_id
UNION ALL  --针对大卖家map join 其他卖家正常join
SELECTt3.buyer_id,t3.seller_id,t3.order_num,t6.seller_level
FROM table_a t3
LEFT JOIN (SELECTseller_id,seller_levelFROM table_b t4LEFT JOIN temp_b t5ON t4.seller_id = t5.seller_idWHERE t5.seller_id is null) t6
ON t3.seller_id = t6.seller_id
) t7
GROUP BY t7.buyer_id;

【4】join发生数据倾斜通用方法

解决方法:随机前缀和RDD扩容

sql实现方法:

1、将大表的倾斜key筛选出来,随机打上前后缀,假如为1-10的数字。

2、将较小表cross join(笛卡尔积) 1-10数字共10行,再通过拼接,形成与大表对应的前后缀。(假如较小表数据量为10万,此时扩容至100万)

3、此时将筛选过的大表与扩容的较小表关联,关联成功后,通过字段加工复原。

4、将大表其他无倾斜数据与较小表关联,union all 前3步处理后得到的数据。

原理如下图

【5】空值产生的数据倾斜

解决方法:给空值进行随机赋值

场景:如日志中,常会有信息丢失的问题,比如日志中的 user_id,如果取其中的 user_id 和 用户表中的user_id 关联,会碰到数据倾斜的问题。

其他方法

1. 增加reduce 的jvm内存

2. 增加reduce 个数

set mapred.reduce.tasks = [num]

3.自定义partitioner

根据现在处理的这份数据,单独写一个适合的partitioner。比如现在是按省份进行汇总数据,如果只是简单的按省份去分(这并没有错),那么数据肯定会倾斜,因为各省的数据天然不一样。我们可以通过历史数据、抽样数据或者一些常识,对数据进行人工分区,让数据按照我们自定义的分区规则比较均匀的分配到不同的task中。

常见的分区方式:

  • 随机分区:每个区域的数据基本均衡,简单易用,偶尔出现倾斜,但是特征同样也会随机打散。
  • 轮询分区:绝对不会倾斜,但是需要提前预知分成若干份,进行轮询。
  • hash散列:可以针对某个特征进行hash散列,保证相同特征的数据在一个区,但是极容易出现数据倾斜。
  • 范围分区:需要排序,临近的数据会被分在同一个区,可以控制分区数据均匀。

http://chatgpt.dhexx.cn/article/212489Du.shtml

相关文章

数据倾斜问题

一、什么是数据倾斜 简单来说,就是在数据计算的时候,数据会分配到不同的task上执行,当数据分配不均匀导致某些大批量数据分配到某几个task上就会造成计算不动或者异常的情况。 二、数据倾斜表现形式 1、大部分的task在计算的时候计算的特别…

数据倾斜常见原因和解决办法

数据倾斜在MapReduce编程模型中十分常见,多个节点并行计算,如果分配的不均,就会导致长尾问题(大部分节点都完成了任务,一直等待剩下的节点完成任务),本文梳理了常见的发生倾斜的原因以及相应的解…

数据倾斜产生,原因及其解决方案

目录 第七章 数据倾斜 7.1 数据倾斜的产生,表现与原因 7.1.1 数据倾斜的定义 7.1.2 数据倾斜的危害 7.1.3 数据倾斜发生的现象 7.2 数据倾斜倾斜造成的原因 7.3 几种常见的数据倾斜及其解决方案 7.3.1 空值引发的数据倾斜 7.3.2 不同数据类型引发的数据倾斜…

数据倾斜原理与解决方法

数据倾斜的概念 数据倾斜这四个字经常会在学习MapReduce中遇到。所谓数据分区,就是数据分区分布因为数据本身或者分区方法的原因变得极为不一致,大量的数据被划分到了同一个区。由于Reducer Task每次处理一个区的数据,这导致Reducer Task处理…

什么是缓存穿透、缓存雪崩、缓存击穿

缓存穿透 缓存穿透 :缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库,失去了缓存保护后端存储的意义。 解决方案 缓存空值 如果访问数据库后还未命中,则把一…

缓存穿透和缓存击穿

一、背景介绍 几乎所有互联网公司都采用缓存的方案来解决瞬时流量超高,或者长期流量过高的问题。但使用缓存存在风险——缓存穿透和缓存击穿:简单的讲就是如果该数据原本就不存在,那么就会发生缓存穿透;如果缓存内容因为各种原因…

缓存穿透,缓存雪崩,缓存击穿

一,缓存穿透 原因:一个请求来访问某个数据,发现缓存中没有,直接去DB中访问。此种情况就是穿透。(正常情况下缓存跟数据库中数据都是存在,异常情况下会导致) 特点:因传递了非法的key,导致缓存跟数据库中都无法查询 方…

如何避免缓存穿透、缓存击穿、缓存雪崩?

如何避免缓存穿透、缓存击穿、缓存雪崩? 缓存穿透 先来看一下缓存穿透,顾名思义,是指业务请求穿过了缓存层,落到持久化存储上。在大多数场景下,我们应用缓存是为了承载前端业务请求,缓存被击穿以后&#x…

缓存穿透、缓存击穿、缓存雪崩解决方案

微信搜索【程序员囧辉】,关注这个坚持分享技术干货的程序员。 前言 ​ 我一个QPS不到10的项目,天天问我缓存穿透、缓存击穿、缓存雪崩,我是真滴难。 可能大家经常会有这种感受,但是只要是面试要问的题目,就算用不上&…

缓存穿透 缓存击穿 缓存雪崩 这三者是什么 如何处理

通常我们使用缓存中间件的方式 将数据库的热点数据缓存到Redis中 尽量去缓存中查找数据,目的就是为了减轻数据库的压力 那什么是 缓存穿透,缓存击穿 与 缓存雪崩 呢 ? 缓存穿透 当Redis中不存在某个key时,将对数据库进行查询操作 但如果数据库也不存在 就会造成每一个请求即…

应对缓存击穿的解决方法

一.什么样的数据适合缓存? 分析一个数据是否适合缓存,我们要从访问频率、读写比例、数据一致性等要求去分析. 二.什么是缓存击穿 在高并发下,多线程同时查询同一个资源,如果缓存中没有这个资源,那么这些线程都会去数据库查找,对数据库造成极大压力,缓存失去存在的意义.打个…

redis缓存击穿

缓存击穿: 缓存击穿是指,针对某个访问非常频繁的热点数据的请求,无法在缓存中进行处理,紧接着,访问该数据的大量请求,一下子都发送到了后端数据库,导致了数据库压力激增,会影响数据…

【缓存】缓存穿透、缓存击穿、缓存雪崩及其解决方案

文章目录 缓存穿透缓存击穿缓存雪崩大量数据同时过期Redis 故障宕机 总结来源 用户的数据一般都是存储于数据库,数据库的数据是落在磁盘上的,磁盘的读写速度可以说是计算机里最慢的硬件了。 当用户的请求,都访问数据库的话,请求数…

Redis 缓存击穿,缓存穿透,缓存雪崩原因+解决方案

一、前言 在我们日常的开发中,无不都是使用数据库来进行数据的存储,由于一般的系统任务中通常不会存在高并发的情况,所以这样看起来并没有什么问题,可是一旦涉及大数据量的需求,比如一些商品抢购的情景,或者…

Redis缓存击穿、雪崩、穿透!(超详细)

缓存的击穿、穿透和雪崩应该是再熟悉不过的词了,也是面试常问的高频试题。 不过,对于这三大缓存的问题,有很多人背过了解决方案,却少有人能把思路给理清的。 而且,网络上仍然充斥着,大量不太靠谱的解决方案…

缓存穿透、缓存击穿、缓存雪崩如何应对

参考连接:redis避免缓存穿透为什么缓存空对象而不是null? - 知乎 缓存穿透:key对应的数据在数据源并不存在,每次针对此key的请求从缓存获取不到,请求都会到数据源,从而可能压垮数据源。比如用一个不存在的…

缓存穿透、缓存击穿、缓存雪崩区别和解决方案

一、缓存处理流程 前台请求,后台先从缓存中取数据,取到直接返回结果,取不到时从数据库中取,数据库取到更新缓存,并返回结果,数据库也没取到,那直接返回空结果。 二、缓存穿透 描述: …

缓存穿透,缓存雪崩,缓存击穿的超详解

文章目录 1、缓存穿透问题的解决思路2、缓存雪崩问题及解决思路3、缓存击穿问题及解决思路 1、缓存穿透问题的解决思路 缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库,失去了缓存的…

redis 缓存穿透,缓存击穿,缓存雪崩

虽然我们在使用 redis 缓存的时候非常的爽,它大大的提高了我们应用程序的性能和效率,尤其是数据查询方面,咱们不用直接去持久化的数据库中查询数据,而是到内存中查询数据即可 事物总是有两面的,用的爽的同时&#xff…

缓存穿透、缓存击穿、缓存雪崩的理解和解决方案

目录 一、缓存穿透 二、缓存击穿 三:缓存雪崩 在生产环境中,会因为很多的原因造成访问请求绕过了缓存,都需要访问数据库持久层,虽然对Redsi缓存服务器不会造成影响,但是数据库的负载就会增大,使缓存的作…