Spark序列化简介

article/2025/8/29 23:22:03

参考文章:Spark序列化

spark之kryo 序列化

Spark序列化入门

1. 什么是序列化和序列化?

  • 序列化是什么
  1. 序列化的作用就是可以将对象的内容变成二进制, 存入文件中保存
  2. 反序列化指的是将保存下来的二进制对象数据恢复成对象
  • 序列化对对象的要求
  1. 对象必须实现 Serializable 接口
  2. 对象中的所有属性必须都要可以被序列化, 如果出现无法被序列化的属性, 则序列化失败
  • 限制
  1. 对象被序列化后, 生成的二进制文件中, 包含了很多环境信息, 如对象头, 对象中的属性字段等, 所以内容相对较大
  2. 因为数据量大, 所以序列化和反序列化的过程比较慢
  • 序列化的应用场景
  1. 持久化对象数据
  2. 网络中不能传输 Java 对象, 只能将其序列化后传输二进制数据

2. Java序列化

Java序列化算法

  • Serialization(序列化)是一种将对象以一连串的字节描述的过程;
  • 反序列化deserialization是一种将这些字节重建成一个对象的过程。
  • Java序列化API提供一种处理对象序列化的标准机制。

为什么要进行序列化?

Java中,一切都是对象,在分布式环境中经常需要将Object从这一端网络或设备传递到另一端。
这就需要有一种可以在两端传输数据的协议。
Java序列化机制就是为了解决这个问题而产生。

如何进行序列化?

一个对象能够序列化的前提是实现Serializable接口,Serializable接口没有方法,更像是个标记。
有了这个标记的Class就能被序列化机制处理。

序列化过程:

a、实现Serializable接口

b、写个程序将对象序列化并输出(ObjectOutputStream能把Object输出成Byte流。将Byte流暂时存储到serial.out文件里)

c、使用ObjectInputStream从持久的文件中读取Bytes重建对象

3. Spark序列化

大部分Spark程序都具有“内存计算”的天性,所以集群中的所有资源:CPU、网络带宽或者是内存都有可能成为Spark程序的瓶颈。
通常情况下,如果数据完全加载到内存那么网络带宽就会成为瓶颈,但是你仍然需要对程序进行优化,
例如采用序列化的方式保存RDD数据以便减少内存使用。
数据序列化不但能提高网络性能还能减少内存使用。

几乎所有的资料都显示kryo 序列化方式优于java自带的序列化方式,而且在spark2.*版本中都是默认采用kryo 序列化

在spark2.0+版本的官方文档中提到:spark默认提供了两个序列化库:Java自身的序列化和Kryo序列化
官网的解释是:java序列化灵活,但是速度缓慢。Kryo序列化速度更快且更紧凑,但是支持的类型较少。

而且spark现在已经默认RDD在shuffle的时候对简单类型使用了Kryo序列化

3.1 Spark通过两种方式来创建序列化器

a. Java序列化

在默认情况下,Spark采用Java的ObjectOutputStream序列化一个对象。
该方式适用于所有实现了java.io.Serializable的类。
通过继承java.io.Externalizable,你能进一步控制序列化的性能。
Java序列化非常灵活,但是速度较慢,在某些情况下序列化的结果也比较大

b. Kryo序列化

Spark也能使用Kryo(版本2)序列化对象。
Kryo 是 Spark 引入的一个外部的序列化工具, 可以增快 RDD 的运行速度,
因为 Kryo 序列化后的对象更小, 序列化和反序列化的速度非常快
Kryo不但速度极快, 而且产生的结果更为紧凑(通常能提高10倍)。

Kryo的缺点是不支持所有类型, 为了更好的性能, 需要提前注册程序中所使用的类(class)。

3.2 如何使用kryo序列化

可以在创建SparkContext之前,通过调用System.setProperty("spark.serializer", "spark.KryoSerializer"),
将序列化方式切换成Kryo。
但是Kryo需要用户进行注册,这也是为什么Kryo不能成为Spark序列化默认方式的唯一原因,
但是建议对于任何“网络密集型”(network-intensive)的应用,
都采用这种方式进行序列化方式。
Kryo文档描述了很多便于注册的高级选项,例如添加用户自定义的序列化代码。

如果对象非常大,还需要增加属性spark.kryoserializer.buffer.mb的值。
该属性的默认值是32,但是该属性需要足够大以便能够容纳需要序列化的最大对象。

如果不注册相应的类,Kryo仍然可以工作,
但是需要为了每一个对象保存其对应的全类名(full class name),这非常浪费。

spark中已经包含了kryo库,使用kryo只需要注册即可。官网只提供了scala版本的,java版本的如下:

或者:System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

两者都可以用,但是测试好像没起到什么效果。于是需要手动注册:

上图是关于kryo 的一些配置,可以单独注册自己的一个类,如紫色框线部分;
也可以像红色框线部分一样,自定义一个接口实现类MyKryoRegistrator,在这个类里面将所需的类全部注册。具体操作如下图:
如果需要序列化的类太多,就在这里逐一列举即可,当然被注册的类要实现java.io.Serializable,即:class TestKryo implements Serializable

序列化的效果:

test程序验证效果:

3.5 测试结果:

三种不同情况下的的RDD大小:
默认不序列化:2017.0 KB
在MyKryoRegistrator中序列化: 960.2 KB
只序列化demo.TestKryo:1053.0 KB

4. Spark 序列化和反序列化的应用场景

a.Task 分发

在这里插入图片描述

Task 是一个对象, 想在网络中传输对象就必须要先序列化

b.RDD 缓存

val rdd1 = rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
rdd1.cache
rdd1.collect
  • RDD 中处理的是对象, 例如说字符串, Person 对象等
  • 如果缓存 RDD 中的数据, 就需要缓存这些对象
  • 对象是不能存在文件中的, 必须要将对象序列化后, 将二进制数据存入文件

c.广播变量

在这里插入图片描述

广播变量会分发到不同的机器上, 这个过程中需要使用网络, 对象在网络中传输就必须先被序列化

d. Shuffle 过程

在这里插入图片描述

  • Shuffle 过程是由 Reducer Mapper 中拉取数据, 这里面涉及到两个需要序列化对象的原因
  • RDD 中的数据对象需要在 Mapper 端落盘缓存, 等待拉取
  • Mapper Reducer 要传输数据对象
  • Spark Streaming 的 Receiver

Spark Streaming 中获取数据的组件叫做 Receiver, 获取到的数据也是对象形式, 在获取到以后需要落盘暂存, 就需要对数据对象进行序列化

在这里插入图片描述

d. 算子引用外部对象

class userserializable(i: Int)rdd.map(i => new Unserializable(i)).collect.foreach(println)
  • Map算子的函数中, 传入了一个 Unserializable 的对象
  • Map 算子的函数是会在整个集群中运行的, 那 Unserializable 对象就需要跟随 Map 算子的函数被传输到不同的节点上
  • 如果 Unserializable 不能被序列化, 则会报错

e. RDD 的序列化

在这里插入图片描述

 

  • RDD 的序列化

RDD 的序列化只能使用 Java 序列化器, 或者 Kryo 序列化器

  • 为什么?

RDD 中存放的是数据对象, 要保留所有的数据就必须要对对象的元信息进行保存, 例如对象头之类的 保存一整个对象, 内存占用和效率会比较低一些

  • Kryo序列化示例
  val conf = new SparkConf().setMaster("local[2]").setAppName("KyroTest")conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")conf.registerKryoClasses(Array(classOf[Person]))val sc = new SparkContext(conf)rdd.map(arr => Person(arr(0), arr(1), arr(2)))

f.DataFrame Dataset 中的序列化

历史的问题

RDD 中无法感知数据的组成, 无法感知数据结构, 只能以对象的形式处理数据

DataFrame Dataset 的特点:

  • DataFrame Dataset 是为结构化数据优化的
  • DataFrame Dataset 中, 数据和数据的 Schema 是分开存储的
spark.read.csv("...").where($"name" =!= "").groupBy($"name").map(row: Row => row).show()
  • DataFrame 中没有数据对象这个概念, 所有的数据都以行的形式存在于 Row 对象中, Row 中记录了每行数据的结构, 包括列名, 类型等

在这里插入图片描述

 

  • Dataset 中上层可以提供有类型的 API, 用以操作数据, 但是在内部, 无论是什么类型的数据对象 Dataset 都使用一个叫做 InternalRow 的类型的对象存储数据
val dataset: Dataset[Person] = spark.read.csv(...).as[Person]

总结

  1. 当需要将对象缓存下来的时候, 或者在网络中传输的时候, 要把对象转成二进制, 在使用的时候再将二进制转为对象, 这个过程叫做序列化和反序列化
  2. Spark 中有很多场景需要存储对象, 或者在网络中传输对象
    1. Task 分发的时候, 需要将任务序列化, 分发到不同的 Executor 中执行
    2. 缓存 RDD 的时候, 需要保存 RDD 中的数据
    3. 广播变量的时候, 需要将变量序列化, 在集群中广播
    4. RDD Shuffle 过程中 Map Reducer 之间需要交换数据
    5. 算子中如果引入了外部的变量, 这个外部的变量也需要被序列化
  3. RDD 因为不保留数据的元信息, 所以必须要序列化整个对象, 常见的方式是 Java 的序列化器, 和 Kyro 序列化器
  4. Dataset DataFrame 中保留数据的元信息, 所以可以不再使用 Java 的序列化器和 Kyro 序列化器, 使用 Spark 特有的序列化协议, 生成 UnsafeInternalRow 用以保存数据, 这样不仅能减少数据量, 也能减少序列化和反序列化的开销, 其速度大概能达到 RDD 的序列化的 20 倍左右

http://chatgpt.dhexx.cn/article/QQpeMFDF.shtml

相关文章

谈谈序列化的作用

文章目录 1. 写在前面2. 问题阐述3. 解释3.1 一些不够完整的解释3.2 一种完整的解释3.2.1 去地址3.2.2 节省空间 4. 小节参考链接 1. 写在前面 我们应该都用过各种序列化(serialization)的方法(如Python中的pickle.dumps)&#x…

Java 之 Serializable 序列化和反序列化的概念,作用的通俗易懂的解释

遇到这个 Java Serializable 序列化这个接口,我们可能会有如下的问题 a,什么叫序列化和反序列化 b,作用。为啥要实现这个 Serializable 接口,也就是为啥要序列化 c,serialVersionUID 这个的值到底是在怎么设置的&#…

cas 原理分析

CAS 原理分析 1、了解java中锁的类型 1.1 悲观锁(Pessimistic Lock) 顾名思义,就是很悲观,假定会发生并发冲突,屏蔽一切可能违反数据完整性的操作,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上…

JAVA中的CAS算法

java 中的线程之间的栈空间是相互独立,堆空间是共享的 V:内存值就是主内存中i值 A:预估值(期望值)就是子线程拿到主内存的值(读取到高速缓存中的值) B:更新值是子线程拿到i值后,修改i的值 假设有两个线程…

面试:CAS算法原理

1、什么是CAS? CAS:Compare and Swap,即比较再交换。 jdk5增加了并发包java.util.concurrent.*,其下面的类使用CAS算法实现了区别于synchronouse同步锁的一种乐观锁。JDK 5之前Java语言是靠synchronized关键字保证同步的&#x…

CAS原理详解

CAS介绍 CAS全称是Compare And Swap,它的实现和它的字面意思一样,先比较后交换,它是CPU硬件层面的一种指令,从CPU层面能保证"比较并更新"这一段操作的原子性。 与synchronized关键字比较不同是synchronized是一种悲观锁…

CAS算法与ABA问题

锁是用来做并发最简单的方式,当然代价也是最高的。 独占锁是一种悲观锁,synchronized就是一种独占锁;它假设最坏的情况,并且只有在确保其它线程不会造成干扰的情况下执行,会导致其它所有需要锁的线程挂起直到持有锁的…

CAS算法-实现原理

目录 CAS是什么? CAS解决了什么问题? CAS存在什么问题? CAS有哪些应用场景? cas的实现 最后 CAS是什么? CAS的全称为Compare and swap 比较并交换。CAS又经常被称为乐观锁,主要的三个变量,内存值…

并发策略-CAS算法

对于并发控制而言,我们平时用的锁(synchronized,Lock)是一种悲观的策略。它总是假设每一次临界区操作会产生冲突,因此,必须对每次操作都小心翼翼。如果多个线程同时访问临界区资源,就宁可牺牲性…

深入理解CAS算法原理

转载自 深入理解CAS算法原理 1、什么是CAS? CAS:Compare and Swap,即比较再交换。 jdk5增加了并发包java.util.concurrent.*,其下面的类使用CAS算法实现了区别于synchronouse同步锁的一种乐观锁。JDK 5之前Java语言是靠synchronized关键字保证…

CAS操作原理

1、什么是CAS? CAS:Compare and Swap,即比较再交换。 jdk5增加了并发包java.util.concurrent.*,其下面的类使用CAS算法实现了区别于synchronouse同步锁的一种乐观锁。JDK 5之前Java语言是靠synchronized关键字保证同步的,这是一…

CAS原理

一、CAS 1.1 CAS概述和作用 CAS的全称是: Compare And Swap(比较相同再交换)。是现代CPU广泛支持的一种对内存中的共享数据进行操作的一种特殊指令。 CAS的作用:CAS可以将比较和交换转换为原子操作,这个原子操作直接由CPU保证。 CAS可以保证…

CAS算法详解

CAS算法 1、CAS概念: CAS是CompareAndSwap的缩写,中文意思是:比较并替换。当要进行CAS操作时,先比较内存地址和原来预期的 地址比较,如果相同,表示这块内存地址没有被修改,可以用新地址替换&…

CAS的原理和使用

CAS 文章目录 CAS一、学习CAS首先了解原子类?1. 何为原子类 二、 CAS是什么1. CAS是什么2. CAS原理3. 使用CAS实例代码4. CAS属于硬件级别保证5. 源码分析 三、CAS底层原理?如果知道,谈谈你对UnSafe的理解1. UnSafe2. 我们知道i线程不安全的&…

对cas算法的理解

cas算法主要关心3个值:内存值V,预期值A,要更新的新值B 如下图所示: 注:t1,t2线程是同时更新同一变量56的值 因为t1和t2线程都同时去访问同一变量56,所以他们会把主内存的值完全拷贝一份到自己…

CAS原理分析

CAS的英文为Compare and Swap 翻译为比较并交换。 CAS加volatile关键字是实现并发包的基石。没有CAS就不会有并发包,synchronized是一种独占锁、悲观锁,java.util.concurrent中借助了CAS指令实现了一种区别于synchronized的一种乐观锁。 什么是乐观锁与…

CAS详解

一、CAS概念 1.1 CAS是什么 Compare And Swap 比较并交换 1. 如果线程的期望值跟物理内存的真实值一样,就更新值到物理内存当中,并返回true 2. 如果线程的期望值跟物理内存的真实值不一样,返回false,那么本次修改失败&#xf…

CAS算法的理解及应用

应用 原子操作类,例如AtomicInteger,AtomicBoolean …适用于并发量较小,多cpu情况下; Java中有许多线程安全类,比如线程安全的集合类。从Java5开始,在java.util.concurrent包下提供了大量支持高效并发访问…

解析CAS算法原理

解析CAS算法原理 什么是CAS?CAS原理概念实现形式底层原理 案例CAS的缺点ABA问题ABA问题如何产生的?原子的引用时间戳原子的引用利用AtomicStampedReference解决ABA问题案例 什么是CAS? CAS,全称Compare And Swap,顾名…

深入解析CAS算法原理

目录 一、CAS的基本概念二、CAS算法理解三、CAS开销四、CAS算法在JDK中的应用 一、CAS的基本概念 CAS:Compare and Swap,即比较再交换,是一种硬件对并发的支持,针对多处理器操作而设计的处理器中的一种特殊指令,用于管…