Spark 实战 - 3.一文搞懂 parquet

article/2025/10/20 22:24:21

一.引用

parquet 文件常见于 Spark、Hive、Streamin、MapReduce 等大数据场景,通过列式存储和元数据存储的方式实现了高效的数据存储与检索,下面主要讲 parquet 文件在 spark 场景下的存储,读取与使用中可能遇到的坑。

二.Parquet 加载方式

1.SparkSession.read.parquet

SparkSession 位于 org.apache.spark.sql.SparkSession 类下,除了支持读取 parquet 的列式文件外,SparkSession 也支持读取 ORC 列式存储文件,可以参考: Spark 读取 ORC FIle

    val conf = new SparkConf().setAppName("ParquetInfo").setMaster("local")val spark = SparkSession.builder.config(conf).getOrCreate()spark.read.parquet(path).foreach(row => {val head = row.getString(0)println(head)})

读取后会获取一个 Sql.DataFrame,支持常见的 sql 语法操作,如果不想使用 sql 才做也可以通过 .rdd 的方法得到 RDD[Row],随后遍历每个 partition 下的 Iterator[Row] 即可。

Tips:

后续可以执行 sql 操作,当然也支持初始化 SqlContext 调用 sql 方法,不过用 SparkSession 也可以搞定。

    val parquetFileDF = spark.read.parquet("path")parquetFileDF.createOrReplaceTempView("tableName")val resultDf = spark.sql("SELECT * FROM tableName")val sqlContext = new SQLContext(sc)sqkContext.sql("xxx")

2.SparkContext.HadoopFile

使用 hadoopFile 读取时需要指定对应的 K-V 以及 InputFormat 的格式,Parquet  文件对应的 K-V 为 Void-ArrayWritable,其 InputFormat 为: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat,获取 ArrayWritable 后通过索引可以获得 Writable。

    val sc = spark.sparkContextsc.setLogLevel("error")val parquetInfo = sc.hadoopFile(path, classOf[MapredParquetInputFormat], classOf[Void], classOf[ArrayWritable])parquetInfo.take(5).foreach(info => {val writable = info._2.get()val head = writable(0)println(writable.length + "\t" + head)})

 Tips:

需要在 SparkConf 中加入序列化的配置,否则 hadoopFile 方法会报错:

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

writable 需要通过反序列化的方式才能再获取具体内容,所以这里推荐使用 SparkSession 的官方 api 读取,不过可以 RcFile SparkSession 暂不支持直接读取,所以可以用 sc.hadoopRdd 的方法读取同样列式存储的 RcFile 格式文件,可以参考: Spark 读取 RcFile

三.Parquet 存储方式

1.静态转换

Parquet -> Parquet,读取 parquet 生成 Sql.DataFrame 再转存,类似 RDD 的 transform:

    spark.read.parquet(path).write.mode(SaveMode.Overwrite).option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").format("parquet").save("/split")

2.RDD[T*] 转换

常规数据 RDD 可以通过加入 import sqlContext.implicits._ 隐式转换的方式由 RDD 转换为 sql.Dataframe,随后完成 parquet 的存储,下面掩饰一个 PairRDD 转换为 df 并存储的方法:

    import sqlContext.implicits._val commonStringRdd = sc.emptyRDD[(String, String)].toDF()commonStringRdd.write.mode(SaveMode.Overwrite).format("parquet").save("")

Tips:

SaveModel 分为 Append 追加、Overwrite 覆盖、ErrorIfExists 报错、Ignore 忽略四种模式,前两个比较好理解,后面两个前者代表如果地址已存在则报错,后者如果地址已存在则忽略且不影响原始数据。SaveModel 通过枚举 Enum 的方式实现:

详细的 RDD 转换 Sql.DataFrame 可以参考:Spark - RDD / ROW / sql.DataFrame 互转 。

3.RDD[Row] 转换

如果有生成的 RDD[Row] 就可以直接调用 sqlContext 将该 RDD 转换为 DataFrame。这里 TABLE_SCHEMA 可以看作是每一列数据的描述,类似 Hive 的 column 的信息,主要是字段名和类型,也可以添加额外的信息,sqlContext 将对应的列属性与 Row 一一匹配,如果 Schema 长度没有达到 Row 的总列数,则后续字段都只能读为 Null。

    val sqlContext = new SQLContext(sc)final val TABLE_SCHEME = StructType(Array(StructField("A", StringType),StructField("B", StringType),StructField("C", StringType),StructField("D", StringType),StructField("E", StringType),StructField("F", StringType),StructField("G", StringType),StructField("H", StringType)))val commonRowRdd = sc.emptyRDD[Row]sqlContext.createDataFrame(commonRowRdd, TABLE_SCHEME).write.mode(SaveMode.Overwrite).format("parquet").save("/split")

Tips:

使用上述语法读取时可能会报错: Illegal pattern component: XXX ,这是因为内部 DataFormat 解析的问题,在代码中加入 .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ") 即可。

spark.read.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").parquet(path)

四.Parquet 浅析

Parquet 由于其开源,支持多平台多系统以及高效的存储和编码方案,使得其非常适合大数据场景下的任务开发,下面简单看下他的两个特性,列式存储和元数据存储:

1.列式存储 - 更小的 IO

CSV 是最常见的行式存储,对于一些需要单独特征或列的场景,如果是 CSV 文件需要遍历整行并分割,最终获取目标元素,而 Parquet 方式通过列式存储,对于单独的特征可以直接访问,从而提高了执行的效率,减少了数据 IO。

CSV: A,B,C,D,E -> Split(",")[col]
Parquet: A B C D E -> getString(col)

2.元数据存储 - 更高的压缩比

Parquet 采用多种编码 encoding 方式,保证数据的高效存储和低空间

A.Run Length encoding

游程编码,当一行的多列数据有很多重复数据时,可以通过 "X重复了N次" 的记录方法,缩小记录的成本,虽然 N 可能很大,但存储成本很小:

[1,2,1,1,1,1,2] -> 1-1,2-1,1-4,2-1

B.Dictionary encoding

字典编码,顾名思义就是通过映射,保存重复过多的数据,例如 "0" -> "LongString":

[LongString, LongString, LongString] -> [0, 0, 0]

C.Delta encoding

增量编码,适用于 unix 时间戳,时间戳记录为 1970年1月1日以来的秒数,存储时间戳时可以直接减去初始时间戳,减少存储量,比如 1577808000 作为基准,则可以减少很多存储空间:

[1577808000, 1577808004, 1577808008] -> [0, 4, 8]

 3.存储-压缩对比

    val st = System.currentTimeMillis()val pairInfo = (0 to 1000000).zipWithIndex.toArrayval format = "csv" // csv、json、parquetsc.parallelize(pairInfo).toDF("A", "B").write.mode(SaveMode.Overwrite).option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").save(s"./output/$format")val saveType = "gzip" // text、gzipsc.parallelize(pairInfo).saveAsTextFile(s"./output/$saveType", classOf[GzipCodec])val cost = System.currentTimeMillis() - stprintln(s"耗时: $cost")

使用上述两种方法分别将 0 到 1000000 的数组存到对应文件,看一下存储的大小:

类型TextGzipParquetCSVJSON
大小(MB)15.8 4.6813.823.8

相比于表格数据 CSV 和 JSON 存储,parquet 提供了更高的压缩比,Amazon S3 集群曾经对比过 CSV 与 parquet 的效率对比,使用 Parquet 可以缩减 87% 的大小,查询的速度快 34 倍 同时可以节省 99.7 的成本,所以在大数据量加经常需要个别列操作的场景下,Parquet 非常适合。

4.读取-效率对比

再分别读取上述文件:

    val csv = spark.read.csv(path + "/output/csv").rdd.count()val parquet = spark.read.parquet(path + "/output/parquet").rdd.count()val json = spark.read.json(path + "/output/json").count()val common = sc.textFile(path + "/output/common").count()val gz = sc.textFile(path + "/output/gzip").count()
类型TextGzipParquetCSVJSON
耗时(ms)14171448495268706766

相比 CSV,JSON 是有优势的,但是相对于行数存储的 Text 和 Gzip,执行 count 类的行统计操作显然不是列式存储文件的强项,所以相差很多,如果是大数据下针对某个或几个字段统计,Parquet 会提供相比于行式存储文件更高的性能。 

5.selectExpr

读取 Parquet 文件除了获取原始的字段内容外,也可以通过 selectExpr 操作获取更多额外的信息,方法位于 org.apache.spark.sql.functions 中,内部包含 collect_list 类似的聚合操作,也包含 count 类似的统计操作,还有 max、min、isnull 等等。

      spark.read.parquet(path).selectExpr("count('_c1')").rdd.foreach(row => {println(row.getLong(0))})

上述操作通过 selectExpr 获取了 count(_c1) 特征的数量,count Result:5383。

其中 _c1 为 Parquet 获取的 sql.DataFrame 的默认 schema,可以通过下述方法获取默认的 schema 信息:

      val schema = spark.read.parquet(path).schemaprintln(schema)

这里截取了一部分,特征名从 _c0 开始依次累加,默认为 _c0,_c1 ,如果自己定义了 schema 的 StructField ,使用 spark.read.schema().parqeut() 读出来的 sql.Dataframe 的 selectExpr 函数内操作使用的列名就要换成自己定义的名称,例如 _c1 我定义为 age,则上述写法要改为 count('age'),再使用 _c1 会报错。更详细的 schema 操作可以参考:Parquet 指定 schema

五.总结

Spark - Parquet 大致常用的内容就这些,SparkSession 集成了读取 parquet、orc 的 API 非常的便捷,有需要建议直接通过 API 读取而不是 HadoopRdd / HadoopFile 。最后想说 parquet 的命名确实很好玩,parquet 翻译为地板,而不定长的列名存储,如果通过平面展示也颇有地板的感觉。


http://chatgpt.dhexx.cn/article/oBwD2jgH.shtml

相关文章

Spark Parquet使用

Spark SQL下的Parquet使用最佳实践和代码实战 分类: spark-sql(1) 一、Spark SQL下的Parquet使用最佳实践 1)过去整个业界对大数据的分析的技术栈的Pipeline一般分为以下两种方式: a)Data Source -> HD…

Arrow 之 Parquet

Parquet-format 左边是文件开头及具体的数据, 右边是文件结尾的 Footer Metadata There are three types of metadata: file metadata, column (chunk) metadata and page header metadata. All thrift structures are serialized using the TCompactProtocol. Co…

parquet存入mysql_解密列存 parquet

在做数据分析的时候,相对于传统关系型数据库,我们更倾向于计算列之间的关系。在使用传统关系型数据库时,基于此的设计,我们会扫描很多我们并不关心的列,这导致了查询效率的低下,大部分数据库 io 比较低效。因此目前出现了列式存储。Apache Parquet 是一个列式存储的文件格…

Parquet原理

在互联网大数据应用场景下,通常数据量很大且字段很多, 但每次查询数据只针对其中的少数几个字段,这时候列式存储是极佳的选择。 列式存储要解决的问题: 把IO只给查询需要用到的数据 只加载需要被计算的列空间节省 列式的压缩效…

parquet--golang使用

github 其实如果不适用一些可视化工具解析parquet文件,不太好看parquet文件内部正常应该是什么样的。但是使用一些可视化工具的话,可以发现,parquet文件会像表格,如excel文件,csv文件那样,排列数据。通过结…

Parquet

动机 创建Parquet是利用压缩性,高效的列式存储来在Haddop生态圈任何项目中应用. 记住Parquet是构建在复杂嵌套的数据结构, 并且使用记录分解和集成的算法在Dremely论文中描述.我们相信这种方法是更强大的的可以非常简单的使嵌套命令空间的扁平化. Parquet构建可以非常高效的…

Parquet 存储格式

1.介绍 Apache Parquet 是 Hadoop 生态圈中一种新型列式存储格式,它可以兼容 Hadoop 生态圈中大多数计算框架(Mapreduce、Spark 等),被多种查询引擎支持(Hive、Impala、Drill 等),并且它是语言和平台无关的。 2.特点…

parquet 简介

参考文章:parquet 简介 Parquet原理 【2019-05-29】Parquet 简介 Apache Parquet是一种能够有效存储嵌套数据的列式存储格式。 面向分析型业务的列式存储格式 由 Twitter 和 Cloudera 合作开发,2015 年 5 月从 Apache 的孵化器里毕业成为 Apache 顶…

Parquet文件详解

1、parquet文件简介 Apache Parquet是Apache Hadoop生态系统的一种免费的开源面向列的数据存储格式。 它类似于Hadoop中可用的其他列存储文件格式,如RCFile格式和ORC格式。 Apache Parquet 是由 Twitter 和 Cloudera 最先发起并合作开发的列存项目,也是…

Gson解析json数据

gson是谷歌推出的,除此之外还有阿里的FastJson,官方json和jackjson。下面通过一个实例来讲解使用gson来解析json数据: 1.先做好准备工作,在网上下载Gson的jar包,放到工程的libs(没有此目录的话自己建一个)目录下: ht…

Android Gson解析json

前言: 解析json的库有很多,如:JSON-Java、Gson、Jackson、FastJson…而Gson是谷歌的,相信自有它的好处 简介 用于json与java对象之间的转换通过 序列化和反序列化 实现功能强大,稳定性也好 使用 Gson提供了两个方…

Android 使用 Gson 解析 json 数据及生成

1.导入 Gson 包 第一种导入Gson 包的方式 在 app 文件下的 build.gradle 文件 导入 gson:2.9.1 包 implementation com.google.code.gson:gson:2.9.1第二种导入Gson 包的方式 直接去下载最新的 Gson 包 下载链接:gson.jar 选择最新的包进行下载 将下载的 gson…

用Gson解析json

首先我们需要导入gson的jar包,因为gson解析方法不是java官方的而是谷歌提供的。 一.把json数据转成java对象 首先因为已经手动导入了jar包,现在只需创建解析器对象,当然首先得有一个json类型的文件地址,和文件输出流 第二步调用…

Android --Gson解析json数据

Android --Gson解析json数据 private void analyseJson() throws Exception {InputStream isgetAssets().open("dataTest.json");ByteArrayOutputStream baosnew ByteArrayOutputStream();byte[] bytesnew byte[1024];int len;while ((lenis.read(bytes))!-1){baos.…

Gson解析json字符串

Gson 怎样使用gson把一个json字符串解析成一个jsonObject对象 因此我要把上面的fastjson转换成是gson,如下图: JsonObject object new JsonParser().parse(result).getAsJsonObject();怎样从gson中取出键的值 使用gson把json字符串转换成一个list集合 …

使用Gson解析Json数据

目录 一、Gson介绍 二、使用方法 完整代码: MainActivity: 布局: 运行结果: 一、Gson介绍 Gson是Google提供的一个Java库,用于将Java对象转换为JSON格式数据或将JSON格式数据转换为Java对象。 常用方法: 方法名…

用GSON解析Json格式数据

GSON是谷歌提供的开源库,用来解析Json格式的数据,非常好用。如果要使用GSON的话,则要先下载gson-2.2.4.jar这个文件,如果是在Android项目中使用,则在Android项目的libs目录下添加这个文件即可;如果是在Java…

Gson解析JSON

1.介绍 Gson是Google提供的处理JSON数据的Java类库&#xff0c;主要用于转换Java对象和JSON对象。 2.依赖 <!-- https://mvnrepository.com/artifact/com.google.code.gson/gson --> <dependency><groupId>com.google.code.gson</groupId><artifac…

python 学习笔记—— #(井号)的作用

在Python语言中&#xff0c;经常看到#后面跟着一些文字。#的作用就是注释&#xff0c;用于解释代码是怎样的逻辑或者作用&#xff0c;方便自己或者别的程序员阅读代码时能够理解代码的意义。 例如 &#xff1a; 我们可以看到# &#xff08;井号&#xff09;跟着的文字是不会被程…

vue 输入网址后,url中自动出现井号#,如何去除

问题描述&#xff1a; 解决方法&#xff1a; 1.打开 2.找到 3.删除Hash 4.成功