Spark Parquet使用

article/2025/10/20 22:24:22

Spark SQL下的Parquet使用最佳实践和代码实战


  分类:


  1. 一、Spark SQL下的Parquet使用最佳实践

    1)过去整个业界对大数据的分析的技术栈的Pipeline一般分为以下两种方式:

    a)Data Source -> HDFS -> MR/Hive/Spark(相当于ETL)-> HDFS Parquet -> Spark SQL/Impala -> ResultService(可以放在DB中,也有可能被通过JDBC/ODBC来作为数据服务使用);

    b)Data Source -> Real timeupdate data to HBase/DB -> Export to Parquet -> Spark SQL/Impala -> ResultService(可以放在DB中,也有可能被通过JDBC/ODBC来作为数据服务使用);

    上述的第二种方式完全可以通过Kafka+Spark Streaming+Spark SQL(内部也强烈建议采用Parquet的方式来存储数据)的方式取代

    2)期待的方式:DataSource -> Kafka -> Spark Streaming -> Parquet -> Spark SQL(ML、GraphX等)-> Parquet -> 其它各种Data Mining等。

  2. 二、Parquet的精要介绍

    Parquet是列式存储格式的一种文件类型,列式存储有以下的核心优势:

    a)可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量。

    b)压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如RunLength Encoding和Delta Encoding)进一步节约存储空间。

    c)只读取需要的列,支持向量运算,能够获取更好的扫描性能。

  3. 设计蓝图

    以上分解似乎完美,一起来看看“设计框架”或“蓝图”。

    算了,不解释了,图,自己看。

    Coding Style

    从Kafka Stream获取数据

    // 从Kafka Stream获取数据JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc, String.class, String.class,StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);
    

    写入Parquet

    accessLogsDStream.foreachRDD(rdd -> {// 如果DF不为空,写入(增加模式)到Parquet文件DataFrame df = sqlContext.createDataFrame(rdd, ApacheAccessLog.class);if (df.count() > 0) {df.write().mode(SaveMode.Append).parquet(Flags.getInstance().getParquetFile());}return null;});
    

    创建Hive表

    使用spark-shell,获取Parquet文件, 写入一个临时表;

    scala代码如下:

    import sqlContext.implicits._val parquetFile = sqlContext.read.parquet("/user/spark/apachelog.parquet")parquetFile.registerTempTable("logs")
    

    复制schema到新表链接到Parquet文件。

    在Hive中复制表,这里你会发现,文件LOCATION位置还是原来的路径,目的就是这个,使得新写入的文件还在Hive模型中。

    我总觉得这个方法有问题,是不是哪位Hive高人指点一下,有没有更好的办法来完成这个工作?

    CREATE EXTERNAL TABLE apachelog LIKE logs STORED AS PARQUET LOCATION '/user/spark/apachelog.parquet';
    

    启动你的SparkThriftServer

    当然,在集群中启用ThriftServer是必须的工作,SparkThriftServer其实暴露的是Hive2服务器,用JDBC驱动就可以访问了。

    我们都想要的结果

    本博客中使用的SQL查询工具是SQuirreL SQL,具体JDBC配置方法请参照前面说的向左向右转。

    结果看似简单,但是经历还是很有挑战的。

    至此,本例已完成。完成代码见 GitHub

    转自:https://blog.sectong.com/blog/spark_to_parquet.html

    APPMain.java

    [java]  view plain  copy
    1. package com.sectong.spark_to_parquet;  
    2.   
    3. import java.io.IOException;  
    4. import java.util.ArrayList;  
    5. import java.util.Arrays;  
    6. import java.util.HashMap;  
    7. import java.util.HashSet;  
    8. import java.util.List;  
    9.   
    10. import org.apache.commons.cli.Option;  
    11. import org.apache.commons.cli.Options;  
    12. import org.apache.spark.SparkConf;  
    13. import org.apache.spark.api.java.JavaSparkContext;  
    14. import org.apache.spark.api.java.function.Function;  
    15. import org.apache.spark.sql.DataFrame;  
    16. import org.apache.spark.sql.SQLContext;  
    17. import org.apache.spark.sql.SaveMode;  
    18. import org.apache.spark.streaming.api.java.JavaDStream;  
    19. import org.apache.spark.streaming.api.java.JavaPairInputDStream;  
    20. import org.apache.spark.streaming.api.java.JavaStreamingContext;  
    21. import org.apache.spark.streaming.kafka.KafkaUtils;  
    22.   
    23. import kafka.serializer.StringDecoder;  
    24. import scala.Tuple2;  
    25. import scala.collection.Seq;  
    26.   
    27. /** 
    28.  * 运行程序,spark-submit --class "com.sectong.spark_to_parquet.AppMain" --master 
    29.  * yarn target/park_to_parquet-0.0.1-SNAPSHOT.jar --kafka_broker 
    30.  * hadoop1:6667,hadoop2:6667 --kafka_topic apache --parquet_file /user/spark/ 
    31.  * --slide_interval 30 
    32.  */  
    33. public class AppMain {  
    34.   
    35.     public static final String WINDOW_LENGTH = "window_length";  
    36.     public static final String SLIDE_INTERVAL = "slide_interval";  
    37.     public static final String KAFKA_BROKER = "kafka_broker";  
    38.     public static final String KAFKA_TOPIC = "kafka_topic";  
    39.     public static final String PARQUET_FILE = "parquet_file";  
    40.   
    41.     private static final Options THE_OPTIONS = createOptions();  
    42.   
    43.     private static Options createOptions() {  
    44.         Options options = new Options();  
    45.   
    46.         options.addOption(new Option(WINDOW_LENGTH, true"The window length in seconds"));// 窗口大小  
    47.         options.addOption(new Option(SLIDE_INTERVAL, true"The slide interval in seconds"));// 计算间隔  
    48.         options.addOption(new Option(KAFKA_BROKER, true"The kafka broker list")); // Kafka队列  
    49.         options.addOption(new Option(KAFKA_TOPIC, true"The kafka topic"));// TOPIC  
    50.         options.addOption(new Option(PARQUET_FILE, true"The parquet file"));// 写入Parquet文件位置  
    51.         return options;  
    52.     }  
    53.   
    54.     public static void main(String[] args) throws IOException {  
    55.         Flags.setFromCommandLineArgs(THE_OPTIONS, args);  
    56.   
    57.         // 初始化Spark Conf.  
    58.         SparkConf conf = new SparkConf().setAppName("A SECTONG Application: Apache Log Analysis with Spark");  
    59.         JavaSparkContext sc = new JavaSparkContext(conf);  
    60.         JavaStreamingContext jssc = new JavaStreamingContext(sc, Flags.getInstance().getSlideInterval());  
    61.         SQLContext sqlContext = new SQLContext(sc);  
    62.   
    63.         // 初始化参数  
    64.         HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(Flags.getInstance().getKafka_topic().split(",")));  
    65.         HashMap<String, String> kafkaParams = new HashMap<String, String>();  
    66.         kafkaParams.put("metadata.broker.list", Flags.getInstance().getKafka_broker());  
    67.   
    68.         // 从Kafka Stream获取数据  
    69.         JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc, String.class, String.class,  
    70.                 StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);  
    71.   
    72.         JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {  
    73.             private static final long serialVersionUID = 5266880065425088203L;  
    74.   
    75.             public String call(Tuple2<String, String> tuple2) {  
    76.                 return tuple2._2();  
    77.             }  
    78.         });  
    79.   
    80.         JavaDStream<ApacheAccessLog> accessLogsDStream = lines.flatMap(line -> {  
    81.             List<ApacheAccessLog> list = new ArrayList<>();  
    82.             try {  
    83.                 // 映射每一行  
    84.                 list.add(ApacheAccessLog.parseFromLogLine(line));  
    85.                 return list;  
    86.             } catch (RuntimeException e) {  
    87.                 return list;  
    88.             }  
    89.         }).cache();  
    90.   
    91.         accessLogsDStream.foreachRDD(rdd -> {  
    92.   
    93.             // rdd to DataFrame  
    94.             DataFrame df = sqlContext.createDataFrame(rdd, ApacheAccessLog.class);  
    95.             // 写入Parquet文件  
    96.             df.write().partitionBy("ipAddress""method""responseCode").mode(SaveMode.Append).parquet(Flags.getInstance().getParquetFile());  
    97.   
    98.             return null;  
    99.         });  
    100.   
    101.         // 启动Streaming服务器  
    102.         jssc.start(); // 启动计算  
    103.         jssc.awaitTermination(); // 等待终止  
    104.     }  
    105. }  

    ApacheAccessLog.java

    [java]  view plain  copy
    1. package com.sectong.spark_to_parquet;  
    2.   
    3. import java.io.Serializable;  
    4. import java.util.regex.Matcher;  
    5. import java.util.regex.Pattern;  
    6.   
    7. /** 
    8.  * 解析Apache log 
    9.  */  
    10. public class ApacheAccessLog implements Serializable {  
    11.     /** 
    12.      *  
    13.      */  
    14.     private static final long serialVersionUID = 6681372116317508248L;  
    15.   
    16.     private String ipAddress;  
    17.     private String clientIdentd;  
    18.     private String userID;  
    19.     private String dateTimeString;  
    20.     private String method;  
    21.     private String endpoint;  
    22.     private String protocol;  
    23.     private int responseCode;  
    24.     private long contentSize;  
    25.   
    26.     private ApacheAccessLog(String ipAddress, String clientIdentd, String userID, String dateTime, String method,  
    27.             String endpoint, String protocol, String responseCode, String contentSize) {  
    28.         this.ipAddress = ipAddress;  
    29.         this.clientIdentd = clientIdentd;  
    30.         this.userID = userID;  
    31.         this.dateTimeString = dateTime;  
    32.         this.method = method;  
    33.         this.endpoint = endpoint;  
    34.         this.protocol = protocol;  
    35.         this.responseCode = Integer.parseInt(responseCode);  
    36.         if (contentSize.equals("-")) {  
    37.             this.contentSize = 0;  
    38.         } else {  
    39.             this.contentSize = Long.parseLong(contentSize);  
    40.         }  
    41.     }  
    42.   
    43.     public String getIpAddress() {  
    44.         return ipAddress;  
    45.     }  
    46.   
    47.     public String getClientIdentd() {  
    48.         return clientIdentd;  
    49.     }  
    50.   
    51.     public String getUserID() {  
    52.         return userID;  
    53.     }  
    54.   
    55.     public String getDateTimeString() {  
    56.         return dateTimeString;  
    57.     }  
    58.   
    59.     public String getMethod() {  
    60.         return method;  
    61.     }  
    62.   
    63.     public String getEndpoint() {  
    64.         return endpoint;  
    65.     }  
    66.   
    67.     public String getProtocol() {  
    68.         return protocol;  
    69.     }  
    70.   
    71.     public int getResponseCode() {  
    72.         return responseCode;  
    73.     }  
    74.   
    75.     public long getContentSize() {  
    76.         return contentSize;  
    77.     }  
    78.   
    79.     public void setIpAddress(String ipAddress) {  
    80.         this.ipAddress = ipAddress;  
    81.     }  
    82.   
    83.     public void setClientIdentd(String clientIdentd) {  
    84.         this.clientIdentd = clientIdentd;  
    85.     }  
    86.   
    87.     public void setUserID(String userID) {  
    88.         this.userID = userID;  
    89.     }  
    90.   
    91.     public void setDateTimeString(String dateTimeString) {  
    92.         this.dateTimeString = dateTimeString;  
    93.     }  
    94.   
    95.     public void setMethod(String method) {  
    96.         this.method = method;  
    97.     }  
    98.   
    99.     public void setEndpoint(String endpoint) {  
    100.         this.endpoint = endpoint;  
    101.     }  
    102.   
    103.     public void setProtocol(String protocol) {  
    104.         this.protocol = protocol;  
    105.     }  
    106.   
    107.     public void setResponseCode(int responseCode) {  
    108.         this.responseCode = responseCode;  
    109.     }  
    110.   
    111.     public void setContentSize(long contentSize) {  
    112.         this.contentSize = contentSize;  
    113.     }  
    114.   
    115.     // Example Apache log line:  
    116.     // 127.0.0.1 - - [21/Jul/2014:9:55:27 -0800] "GET /home.html HTTP/1.1" 200  
    117.     // 2048  
    118.     private static final String LOG_ENTRY_PATTERN =  
    119.             // 1:IP 2:client 3:user 4:date time 5:method 6:req 7:proto  
    120.             // 8:respcode 9:size  
    121.             "(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(\\S+) (\\S+) (\\S+)\" (\\d{3}) (\\S+)";  
    122.     private static final Pattern PATTERN = Pattern.compile(LOG_ENTRY_PATTERN);  
    123.   
    124.     public static ApacheAccessLog parseFromLogLine(String logline) {  
    125.         Matcher m = PATTERN.matcher(logline);  
    126.         if (!m.find()) {  
    127.             // logger.log(Level.ALL, "Cannot parse logline" + logline);  
    128.             throw new RuntimeException("Error parsing logline");  
    129.         } else {  
    130.             return new ApacheAccessLog(m.group(1), m.group(2), m.group(3), m.group(4), m.group(5), m.group(6),  
    131.                     m.group(7), m.group(8), m.group(9));  
    132.         }  
    133.   
    134.     }  
    135. }  

    Flags.java

    [java]  view plain  copy
    1. package com.sectong.spark_to_parquet;  
    2.   
    3. import org.apache.commons.cli.CommandLine;  
    4. import org.apache.commons.cli.CommandLineParser;  
    5. import org.apache.commons.cli.Options;  
    6. import org.apache.commons.cli.ParseException;  
    7. import org.apache.commons.cli.PosixParser;  
    8. import org.apache.spark.streaming.Duration;  
    9.   
    10. public class Flags {  
    11.     private static Flags THE_INSTANCE = new Flags();  
    12.   
    13.     private Duration windowLength;  
    14.     private Duration slideInterval;  
    15.     private String kafka_broker;  
    16.     private String kafka_topic;  
    17.     private String parquet_file;  
    18.   
    19.     private boolean initialized = false;  
    20.   
    21.     private Flags() {  
    22.     }  
    23.   
    24.     public Duration getWindowLength() {  
    25.         return windowLength;  
    26.     }  
    27.   
    28.     public Duration getSlideInterval() {  
    29.         return slideInterval;  
    30.     }  
    31.   
    32.     public String getKafka_broker() {  
    33.         return kafka_broker;  
    34.     }  
    35.   
    36.     public String getKafka_topic() {  
    37.         return kafka_topic;  
    38.     }  
    39.   
    40.     public String getParquetFile() {  
    41.         return parquet_file;  
    42.     }  
    43.   
    44.     public static Flags getInstance() {  
    45.         if (!THE_INSTANCE.initialized) {  
    46.             throw new RuntimeException("Flags have not been initalized");  
    47.         }  
    48.         return THE_INSTANCE;  
    49.     }  
    50.   
    51.     public static void setFromCommandLineArgs(Options options, String[] args) {  
    52.         CommandLineParser parser = new PosixParser();  
    53.         try {  
    54.             CommandLine cl = parser.parse(options, args);  
    55.             // 参数默认值  
    56.             THE_INSTANCE.windowLength = new Duration(  
    57.                     Integer.parseInt(cl.getOptionValue(AppMain.WINDOW_LENGTH, "30")) * 1000);  
    58.             THE_INSTANCE.slideInterval = new Duration(  
    59.                     Integer.parseInt(cl.getOptionValue(AppMain.SLIDE_INTERVAL, "5")) * 1000);  
    60.             THE_INSTANCE.kafka_broker = cl.getOptionValue(AppMain.KAFKA_BROKER, "kafka:9092");  
    61.             THE_INSTANCE.kafka_topic = cl.getOptionValue(AppMain.KAFKA_TOPIC, "apache");  
    62.             THE_INSTANCE.parquet_file = cl.getOptionValue(AppMain.PARQUET_FILE, "/user/spark/");  
    63.             THE_INSTANCE.initialized = true;  
    64.         } catch (ParseException e) {  
    65.             THE_INSTANCE.initialized = false;  
    66.             System.err.println("Parsing failed.  Reason: " + e.getMessage());  
    67.         }  
    68.     }  
    69. }  



http://chatgpt.dhexx.cn/article/p4BJve0t.shtml

相关文章

Arrow 之 Parquet

Parquet-format 左边是文件开头及具体的数据&#xff0c; 右边是文件结尾的 Footer Metadata There are three types of metadata: file metadata, column (chunk) metadata and page header metadata. All thrift structures are serialized using the TCompactProtocol. Co…

parquet存入mysql_解密列存 parquet

在做数据分析的时候,相对于传统关系型数据库,我们更倾向于计算列之间的关系。在使用传统关系型数据库时,基于此的设计,我们会扫描很多我们并不关心的列,这导致了查询效率的低下,大部分数据库 io 比较低效。因此目前出现了列式存储。Apache Parquet 是一个列式存储的文件格…

Parquet原理

在互联网大数据应用场景下&#xff0c;通常数据量很大且字段很多&#xff0c; 但每次查询数据只针对其中的少数几个字段&#xff0c;这时候列式存储是极佳的选择。 列式存储要解决的问题&#xff1a; 把IO只给查询需要用到的数据 只加载需要被计算的列空间节省 列式的压缩效…

parquet--golang使用

github 其实如果不适用一些可视化工具解析parquet文件&#xff0c;不太好看parquet文件内部正常应该是什么样的。但是使用一些可视化工具的话&#xff0c;可以发现&#xff0c;parquet文件会像表格&#xff0c;如excel文件&#xff0c;csv文件那样&#xff0c;排列数据。通过结…

Parquet

动机 创建Parquet是利用压缩性,高效的列式存储来在Haddop生态圈任何项目中应用. 记住Parquet是构建在复杂嵌套的数据结构, 并且使用记录分解和集成的算法在Dremely论文中描述.我们相信这种方法是更强大的的可以非常简单的使嵌套命令空间的扁平化. Parquet构建可以非常高效的…

Parquet 存储格式

1.介绍 Apache Parquet 是 Hadoop 生态圈中一种新型列式存储格式&#xff0c;它可以兼容 Hadoop 生态圈中大多数计算框架(Mapreduce、Spark 等)&#xff0c;被多种查询引擎支持&#xff08;Hive、Impala、Drill 等&#xff09;&#xff0c;并且它是语言和平台无关的。 2.特点…

parquet 简介

参考文章&#xff1a;parquet 简介 Parquet原理 【2019-05-29】Parquet 简介 Apache Parquet是一种能够有效存储嵌套数据的列式存储格式。 面向分析型业务的列式存储格式 由 Twitter 和 Cloudera 合作开发&#xff0c;2015 年 5 月从 Apache 的孵化器里毕业成为 Apache 顶…

Parquet文件详解

1、parquet文件简介 Apache Parquet是Apache Hadoop生态系统的一种免费的开源面向列的数据存储格式。 它类似于Hadoop中可用的其他列存储文件格式&#xff0c;如RCFile格式和ORC格式。 Apache Parquet 是由 Twitter 和 Cloudera 最先发起并合作开发的列存项目&#xff0c;也是…

Gson解析json数据

gson是谷歌推出的&#xff0c;除此之外还有阿里的FastJson&#xff0c;官方json和jackjson。下面通过一个实例来讲解使用gson来解析json数据&#xff1a; 1.先做好准备工作&#xff0c;在网上下载Gson的jar包&#xff0c;放到工程的libs(没有此目录的话自己建一个)目录下: ht…

Android Gson解析json

前言&#xff1a; 解析json的库有很多&#xff0c;如&#xff1a;JSON-Java、Gson、Jackson、FastJson…而Gson是谷歌的&#xff0c;相信自有它的好处 简介 用于json与java对象之间的转换通过 序列化和反序列化 实现功能强大&#xff0c;稳定性也好 使用 Gson提供了两个方…

Android 使用 Gson 解析 json 数据及生成

1.导入 Gson 包 第一种导入Gson 包的方式 在 app 文件下的 build.gradle 文件 导入 gson:2.9.1 包 implementation com.google.code.gson:gson:2.9.1第二种导入Gson 包的方式 直接去下载最新的 Gson 包 下载链接&#xff1a;gson.jar 选择最新的包进行下载 将下载的 gson…

用Gson解析json

首先我们需要导入gson的jar包&#xff0c;因为gson解析方法不是java官方的而是谷歌提供的。 一.把json数据转成java对象 首先因为已经手动导入了jar包&#xff0c;现在只需创建解析器对象&#xff0c;当然首先得有一个json类型的文件地址&#xff0c;和文件输出流 第二步调用…

Android --Gson解析json数据

Android --Gson解析json数据 private void analyseJson() throws Exception {InputStream isgetAssets().open("dataTest.json");ByteArrayOutputStream baosnew ByteArrayOutputStream();byte[] bytesnew byte[1024];int len;while ((lenis.read(bytes))!-1){baos.…

Gson解析json字符串

Gson 怎样使用gson把一个json字符串解析成一个jsonObject对象 因此我要把上面的fastjson转换成是gson&#xff0c;如下图&#xff1a; JsonObject object new JsonParser().parse(result).getAsJsonObject();怎样从gson中取出键的值 使用gson把json字符串转换成一个list集合 …

使用Gson解析Json数据

目录 一、Gson介绍 二、使用方法 完整代码&#xff1a; MainActivity: 布局&#xff1a; 运行结果&#xff1a; 一、Gson介绍 Gson是Google提供的一个Java库&#xff0c;用于将Java对象转换为JSON格式数据或将JSON格式数据转换为Java对象。 常用方法&#xff1a; 方法名…

用GSON解析Json格式数据

GSON是谷歌提供的开源库&#xff0c;用来解析Json格式的数据&#xff0c;非常好用。如果要使用GSON的话&#xff0c;则要先下载gson-2.2.4.jar这个文件&#xff0c;如果是在Android项目中使用&#xff0c;则在Android项目的libs目录下添加这个文件即可&#xff1b;如果是在Java…

Gson解析JSON

1.介绍 Gson是Google提供的处理JSON数据的Java类库&#xff0c;主要用于转换Java对象和JSON对象。 2.依赖 <!-- https://mvnrepository.com/artifact/com.google.code.gson/gson --> <dependency><groupId>com.google.code.gson</groupId><artifac…

python 学习笔记—— #(井号)的作用

在Python语言中&#xff0c;经常看到#后面跟着一些文字。#的作用就是注释&#xff0c;用于解释代码是怎样的逻辑或者作用&#xff0c;方便自己或者别的程序员阅读代码时能够理解代码的意义。 例如 &#xff1a; 我们可以看到# &#xff08;井号&#xff09;跟着的文字是不会被程…

vue 输入网址后,url中自动出现井号#,如何去除

问题描述&#xff1a; 解决方法&#xff1a; 1.打开 2.找到 3.删除Hash 4.成功

URL中#号(井号)的作用

今天又看到了一篇非常好的来自HTTPWatch的文章&#xff0c;不得不推荐给大家。 1. 井号在URL中指定的是页面中的一个位置 井号作为页面定位符出现在URL中&#xff0c;比如&#xff1a;http://www.httpwatch.com/features.htm#print &#xff0c;此URL表示在页面features.htm中p…