Flink中的数据倾斜与解决方案实践

article/2025/8/25 17:44:39

什么是数据倾斜

在使用一些大数据处理框架进行海量数据处理的过程中,可能会遇到数据倾斜的问题,由于大数据处理框架本身架构的原因,在框架层面,数据倾斜问题是无法避免的,只能在业务层面来缓解或者避免。

因为要处理海量的数据,常用的大数据处理框架都会采用分布式架构,将海量数据分成多个小的分片,再将每个小分片分配给不同的计算节点来处理,通过对计算节点进行横向扩容,来快速提升框架的数据处理性能,因此即使是海量数据,也可以在较短的时间内完成处理,但是也正是由于这种架构设计,导致了数据倾斜问题的产生。

试想,如果小分片中的数据分布不均匀,有某个或者某几个小分片中包含了80%的数据量,那么处理这些分片的计算节点压力就会比较大,就会导致,整个分布式集群中,大部分节点是空闲的,只有某几个比较繁忙,无法使计算资源得到重复利用,最终导致框架的整体效率比较低。

如何解决数据倾斜

因为产生数据倾斜问题的直接原因就是数据分布不均匀,要解决这个问题最直接的方法就是:在业务层面将数据的分布变得均匀一些,让分布式集群中每个计算节点的资源得到重复利用。

因为数据分布不均匀是业务层面的问题,将数据分布变均匀的方案,也要结合业务场景来设计,下面我们以wordCount为例来演示以下数据倾斜问题,以及相应的解决方案。后面当遇到数据倾斜问题时,希望对你有一定的启发。

实践案例

下面是一个wordCount的程序的实现:

public class ShuffleWindowFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvBase.getStreamEnv(9091);DataStreamSource<Tuple2<String,Integer>> sourceStream = env.addSource(StreamExecutionEnvBase.getRandomStringSource(1000000, 1));(sourceStream.keyBy(new KeySelector<Tuple2<String,Integer>, String>() {@Overridepublic String getKey(Tuple2<String,Integer> value) throws Exception {return value.f0 ;}}).timeWindow(Time.seconds(5)).process(new ProcessWindowFunction<Tuple2<String,Integer>, String, String, TimeWindow>() {Map<String, Integer> map = new HashMap<>();@Overridepublic void process(String s, Context context, Iterable<Tuple2<String,Integer>> iterable, Collector<String> collector) throws Exception {for (Tuple2<String,Integer> tuple2 : iterable) {String key = tuple2.f0;if (map.containsKey(key)) {map.put(key, map.get(key) + 1);} else {map.put(key, 1);}}for (Map.Entry<String, Integer> entry : map.entrySet()) {collector.collect(String.format("key:%s,count:%s", entry.getKey(), entry.getValue()));}}})).setParallelism(8).print("total").setParallelism(8);env.execute("shuffle stream");}
}public class StreamExecutionEnvBase {public static StreamExecutionEnvironment getStreamEnv(Integer webUiPort) {Configuration conf = new Configuration();conf.setString("rest.port",String.valueOf(webUiPort));StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);return env;}public static SourceFunction<Tuple2<String,Integer>> getRandomStringSource(int count, Integer sleep) {return new SourceFunction<Tuple2<String,Integer>>() {Random random = new Random();String[] values = {"hadoop","flink","spark","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis","redis"};volatile boolean running = true;int c = count;@Overridepublic void run(SourceContext<Tuple2<String,Integer>> ctx) throws Exception {while (running && c-- > 0) {String target = values[c % values.length];ctx.collect(new Tuple2<>(target,random.nextInt(100)));TimeUnit.MILLISECONDS.sleep(sleep);}}@Overridepublic void cancel() {running = false;System.out.println("cancel job ...");}};}
}

上面的source代码中,word为"redis"的数据量比较大,明显多于其他word的数据量。此时,对于处理"reids"的subTask的压力会比较大,我们可以通过flink的监控来进行验证,具体如下图:
在这里插入图片描述


在上图中,可以发现subTask0处理的数据量,是其他SubTask的40倍左右,此时产生了明显的数据倾斜问题。
为了解决"redis"倾斜的问题,我们可以将"redis"生成key的过程进行优化,将生成的key进行"打散",具体实现过程如下:
 sourceStream.keyBy(new KeySelector<Tuple2<String,Integer>, String>() {@Overridepublic String getKey(Tuple2<String,Integer> value) throws Exception {if("redis".equals(value.f0))return value.f0 + value.f1;return value.f0;}}

这样word为redis的数据,生成的key就会被打散成多个,打散的多个key分散到不同的subTask中处理,这样数据倾斜的问题就得到了解决,执行结果如下图:

在这里插入图片描述

到这里,我们解决了数据倾斜的问题,但是细心的读者会发现,将key打散后,数据倾斜问题虽然解决了,但是sink到下游的数据量也变多了,也就是说,发送到下游的数据聚合度降低了,数据变得更散了,如下图:

优化前:在这里插入图片描述
优化后:

在这里插入图片描述

结合之前文章Flink中的Window计算-增量计算&全量计算,我们可以知道,发送下游数据量变多的原因:key变多了,每个key都会对应一个 ProcessWindowFunction 实例,也就是 ProcessWindowFunction 实例个数变得更多,聚合结果 "map"的聚合度就变小了,发送到下游的数据量也就变得更多了。

如果flink的下游是存储层,如mysql,那么大批量的数据写到mysql中,对mysql的并发处理能力和存储都会是巨大的挑战。

那么该如何解决这个问题呢?解决方案其实也很简单:对发送给下游的数据,进行二次聚合,将分散的数据再次聚合一下,具体实现,可以参考如下代码:

 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvBase.getStreamEnv(9091);DataStreamSource<Tuple2<String,Integer>> sourceStream = env.addSource(StreamExecutionEnvBase.getRandomStringSource(1000000, 1));(sourceStream.keyBy(new KeySelector<Tuple2<String,Integer>, String>() {@Overridepublic String getKey(Tuple2<String,Integer> value) throws Exception {if("redis".equals(value.f0))return value.f0 + value.f1;return value.f0;}}).timeWindow(Time.seconds(5)).process(new ProcessWindowFunction<Tuple2<String,Integer>, String, String, TimeWindow>() {Map<String, Integer> map = new HashMap<>();@Overridepublic void process(String s, Context context, Iterable<Tuple2<String,Integer>> iterable, Collector<String> collector) throws Exception {for (Tuple2<String,Integer> tuple2 : iterable) {String key = tuple2.f0;if (map.containsKey(key)) {map.put(key, map.get(key) + 1);} else {map.put(key, 1);}}for (Map.Entry<String, Integer> entry : map.entrySet()) {collector.collect(String.format("key:%s,count:%s", entry.getKey(), entry.getValue()));}}})).map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {String[] split = value.split(",");return new Tuple2<String,Integer>(split[0].split(":")[1],Integer.valueOf(split[1].split(":")[1]));}}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}}).timeWindow(Time.seconds(5)).process(new ProcessWindowFunction<Tuple2<String,Integer>, String, String, TimeWindow>() {Map<String, Integer> map = new HashMap<>();@Overridepublic void process(String s, Context context, Iterable<Tuple2<String,Integer>> iterable, Collector<String> collector) throws Exception {for (Tuple2<String,Integer> tuple2 : iterable) {String key = tuple2.f0;if (map.containsKey(key)) {map.put(key, map.get(key) + 1);} else {map.put(key, 1);}}for (Map.Entry<String, Integer> entry : map.entrySet()) {collector.collect(String.format("key:%s,count:%s", entry.getKey(), entry.getValue()));}}}).setParallelism(8).print().setParallelism(4);env.execute("shuffle stream");}

执行效果,如下图所示:
在这里插入图片描述
此时,数据倾斜问题得到了解决,同时发送给下游的数据量也变小了。这时,可能还会有读者剔除疑问:二次聚合的过程中是否还会产生数据倾斜?答案是:会的。

只是这个数据倾斜的程度是可控的,因为第一次聚合后的数据量的最大值为:业务key的个数 * 离散度(可能的后缀的个数,在上面的例子中是:random.nextInt(100),也就是100种)。倾斜问题,不会随着数据量增大而增大,这种倾斜问题不会产生太大影响,基本可以忽略。


http://chatgpt.dhexx.cn/article/CYdcob0C.shtml

相关文章

spark处理数据倾斜的案例

在前期的工作遇到了很多数据倾斜的案例&#xff0c;在此记录下解决的心得 1) 大表join小表: 执行某段sql&#xff0c;出现了Executor OOM的现象&#xff0c;查看其stage的状况: 第3个stage读取了21.1G的数据&#xff0c;并shuffle写入了2.6G的数据&#xff0c;由于两个表根据字…

redis之数据倾斜如何处理

写在前面 我们在使用Redis分片集群时&#xff0c;集群最好的状态就是每个实例可以处理相同或相近比例的请求&#xff0c;但如果不是这样&#xff0c;则会出现某些实例压力特别大&#xff0c;而某些实例特别空闲的情况发生&#xff0c;本文就一起来看下这种情况是如何发生的以及…

实操 | Hive 数据倾斜问题定位排查及解决

Hive 数据倾斜怎么发现&#xff0c;怎么定位&#xff0c;怎么解决 多数介绍数据倾斜的文章都是以大篇幅的理论为主&#xff0c;并没有给出具体的数据倾斜案例。当工作中遇到了倾斜问题&#xff0c;这些理论很难直接应用&#xff0c;导致我们面对倾斜时还是不知所措。 本文首发在…

数据倾斜原理及解决方案

导读 相信很多接触MapReduce的朋友对数据倾斜这四个字并不陌生,那么究竟什么是数据倾斜?又该怎样解决这种该死的情况呢? 何为数据倾斜? 在弄清什么是数据倾斜之前,我想让大家看看数据分布的概念: 正常的数据分布理论上都是倾斜的&#xff0c;就是我们所说的20-80原理&…

spark 数据倾斜调优

数据倾斜应该算是一个比较麻烦的问题&#xff0c;笔者也是刚刚开始学习相关的调优&#xff0c;将看到的比较全面、清晰的几种解决方案整合了一下&#xff0c;并加上了一些理解与心得&#xff0c;供参考&#xff01; 首先&#xff0c;需要对spark执行计划有一定的基础与理解&am…

如何解决mysql数据倾斜_数据倾斜解决方案

1)聚合原数据(主要操作的是hive数据库中的数据&#xff0c;先通过hive sql将相同key的数据聚合成一条数据&#xff0c;再进行map操作) 当没办法聚合成一条数据时&#xff1a;增大key粒度&#xff0c;从而key的数量会减少&#xff0c;但是每个key对应的数据量会增大&#xff0c…

数据倾斜及其解决方式

数据倾斜是大数据领域绕不开的拦路虎&#xff0c;当你所需处理的数据量到达了上亿甚至是千亿条的时候&#xff0c;数据倾斜将是横在你面前一道巨大的坎。很可能有几周甚至几月都要头疼于数据倾斜导致的各类诡异的问题。 数据倾斜是指&#xff1a;mapreduce程序执行时&#xff0…

Hive 数据倾斜

数据倾斜&#xff0c;即单个节点任务所处理的数据量远大于同类型任务所处理的数据量&#xff0c;导致该节点成为整个作业的瓶颈&#xff0c;这是分布式系统不可能避免的问题。从本质来说&#xff0c;导致数据倾斜有两种原因&#xff0c;一是任务读取大文件&#xff0c;二是任务…

Spark数据倾斜解决

一、数据倾斜表现 数据倾斜就是数据分到各个区的数量不太均匀,可以自定义分区器,想怎么分就怎么分。 Spark中的数据倾斜问题主要指shuffle过程中出现的数据倾斜问题&#xff0c;是由于不同的key对应的数据量不同导致的不同task所处理的数据量不同的问题。 例如&#xff0c;redu…

大数据篇--数据倾斜

文章目录 一、什么是数据倾斜二、结合Shuffle1.结合mapreduce的shshuffle来说&#xff1a;&#xff08;1&#xff09;Shuffle 机制&#xff08;2&#xff09;Shuffle 阶段的优化 2.结合spark的shshuffle来说&#xff1a;&#xff08;1&#xff09;Spark任务调度&#xff1a;&am…

spark 数据倾斜

一. 数据倾斜的现象 1、spark中一个stage的执行时间受限于最后那个执行完的task&#xff0c;因此运行缓慢的任务会拖累整个程序的运行速度&#xff08;分布式程序运行的速度是由最慢的那个task决定的&#xff09;比如&#xff0c;总共有1000个task&#xff0c;997个task都在1分…

Spark处理数据倾斜问题

写在前面&#xff1a;有博主的文章写的很好&#xff0c;很详细&#xff0c;推荐&#xff01; 参考&#xff1a;Spark如何处理数据倾斜&#xff08;甚好&#xff0c;甚详细&#xff0c;很有逻辑&#xff0c;强推&#xff01;&#xff09; spark数据倾斜解决方案汇总 1、什么是数…

如何处理Spark数据倾斜

一、什么是数据倾斜 在分布式集群计算中&#xff0c;数据计算时候数据在各个节点分布不均衡&#xff0c;某一个或几个节点集中80%数据&#xff0c;而其它节点集中20%甚至更少数据&#xff0c;出现了数据计算负载不均衡的现象。 数据倾斜在MR编程模型中是十分常见的&#xff0…

数据倾斜

数据倾斜 转载声明 本文大量内容系转载自以下文章&#xff0c;有删改&#xff0c;并参考其他文档资料加入了一些内容&#xff1a; Spark性能优化指南——高级篇 作者&#xff1a;李雪蕤 出处&#xff1a;美团技术团队博客漫谈千亿级数据优化实践&#xff1a;数据倾斜&#x…

大数据常见问题:数据倾斜的原理及处理方案

什么是数据倾斜 Hadoop能够进行对海量数据进行批处理的核心&#xff0c;在于它的分布式思想&#xff0c;通过多台服务器&#xff08;节点&#xff09;组成集群&#xff0c;共同完成任务&#xff0c;进行分布式的数据处理。 理想状态下&#xff0c;一个任务是由集群下所有机器…

数据倾斜问题

一、什么是数据倾斜 简单来说&#xff0c;就是在数据计算的时候&#xff0c;数据会分配到不同的task上执行&#xff0c;当数据分配不均匀导致某些大批量数据分配到某几个task上就会造成计算不动或者异常的情况。 二、数据倾斜表现形式 1、大部分的task在计算的时候计算的特别…

数据倾斜常见原因和解决办法

数据倾斜在MapReduce编程模型中十分常见&#xff0c;多个节点并行计算&#xff0c;如果分配的不均&#xff0c;就会导致长尾问题&#xff08;大部分节点都完成了任务&#xff0c;一直等待剩下的节点完成任务&#xff09;&#xff0c;本文梳理了常见的发生倾斜的原因以及相应的解…

数据倾斜产生,原因及其解决方案

目录 第七章 数据倾斜 7.1 数据倾斜的产生&#xff0c;表现与原因 7.1.1 数据倾斜的定义 7.1.2 数据倾斜的危害 7.1.3 数据倾斜发生的现象 7.2 数据倾斜倾斜造成的原因 7.3 几种常见的数据倾斜及其解决方案 7.3.1 空值引发的数据倾斜 7.3.2 不同数据类型引发的数据倾斜…

数据倾斜原理与解决方法

数据倾斜的概念 数据倾斜这四个字经常会在学习MapReduce中遇到。所谓数据分区&#xff0c;就是数据分区分布因为数据本身或者分区方法的原因变得极为不一致&#xff0c;大量的数据被划分到了同一个区。由于Reducer Task每次处理一个区的数据&#xff0c;这导致Reducer Task处理…

什么是缓存穿透、缓存雪崩、缓存击穿

缓存穿透 缓存穿透 &#xff1a;缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在&#xff0c;这样缓存永远不会生效&#xff0c;这些请求都会打到数据库&#xff0c;失去了缓存保护后端存储的意义。 解决方案 缓存空值 如果访问数据库后还未命中&#xff0c;则把一…