spark开发教程

article/2025/10/9 3:17:04

spark开发教程

目录

  • spark开发教程
  • 前言
  • 一、初始化spark
  • 二、获取数据源
    • 1.创建数据结构
    • 2.连接外部数据
        • textfile
        • jdbc
        • hive
    • 3. 数据处理
        • rdd算子
            • transform算子
            • action算子
        • dataframe操作
        • dataset操作
    • 4. 共享变量
    • 5.写入数据
  • 总结


前言

spark开发主要的基于RDD、Datasets、DataFrame、sql 。其中rdd是最核心的底层,Datasets、DataFrame、sql都是基于rdd封装的高级api,dataframe是datasets的一种(类型为row)。


一、初始化spark

一个spark脚本的提交,会产生一个driver,如何通过把driver的运行逻辑传递给各个executor,就是通过sparkcontext。
在这里插入图片描述
SparkContext是与ClusterManager打交道的,clusterManager类似yarn的resourceManager负责资源的分配。

初始化脚本

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

Spark2.0中只要创建一个SparkSession就够了,SparkConf、SparkContext和SQLContext都已经被封装在SparkSession当中。
参考:spark原理

二、获取数据源

1.创建数据结构

  • RDD创建
val lines=sc.parallelize(List("pandas", "apple"))
  • DataFrame创建
 val df=spark.createDataFrame(Seq(("ming", 20, 15552211521L),("hong", 19, 13287994007L),("zhi", 21, 15552211523L))) toDF("name", "age", "phone")
  • DataSet创建
val person1 = new Person("Andy", 32);
val person2 = new Person("katy", 33);
import spark.implicits._
val javaBeanDS= spark.createDataset(List(person1,person2))

SparkSession内部封装了SparkContext,创建sparksession就可以了。

  • spark-sql
    spark-sql的使用,一般是直接使用sql,通过dataframe 转化为临时表

2.连接外部数据

textfile

可以从本地文件系统或者hdfs文件系统读取数据

spark.sparkContext.textFile(path,1)
  • 如果textFile指定分区数量为0或者1的话,defaultMinPartitions值为1,则有多少个文件,就会有多少个分区。

  • 如果不指定默认分区数量,则默认分区数量为2,则会根据所有文件字节大小totalSize除以分区数量partitons的值goalSize,然后比较goalSize和hdfs指定分块大小(这里是32M)作比较,以较小的最为goalSize作为切分大小,对每个文件进行切分,若文件大于大于goalSize,则会生成该文件大小/goalSize + 1个分区。

  • 如果指定分区数量大于等于2,则默认分区数量为指定值,生成分区数量规则同2中的规则。
    参考textFile解读

jdbc

val jdbcDF = spark.read.format("jdbc").options(Map("url" ->  "jdbc:mysql://localhost:3306/ontime?user=root&password=mysql","dbtable" -> "ontime.ontime_sm","fetchSize" -> "10000","partitionColumn" -> "yeard", "lowerBound" -> "1988", "upperBound" -> "2015", "numPartitions" -> "48")).load()

hive

    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")val session = SparkSession.builder().config(conf)// 指定hive的metastore的端口  默认为9083 在hive-site.xml中查看.config("hive.metastore.uris", "thrift://hadoop-01:9083,thrift://hadoop-02:9083")

3. 数据处理

rdd算子

transform算子
  1. map
  2. filter
  3. flatmap
  4. mapPartition和mapPartitionWithIndex
  5. sortBy和sortByKey
  6. groupBy和groupByKey
  7. reduceByKey、aggregateByKey、foldByKey、combineByKey
  8. distinct
  9. union
  10. intersection
  11. join、leftJoin 、rightJoin
  12. cogroup 类似fulloutJoin
  13. zip
action算子
  1. collect
  2. reduce
  3. fold
  4. aggregate
  5. count
  6. take top first
  7. foreach foreachPartition
  8. saveAsTextFile
  • map
    Return a new distributed dataset formed by passing each element of the source through a function func.

  • filter(func)
    Return a new dataset formed by selecting those elements of the source on which func returns true.

  • flatMap(func)
    Similar to map,but each input item can be mapped to 0 or more output items(so func should return a Seq rather than a single item).

  • union(otherDataset)
    Return a new dataset that contain the union of the elements in the source dataset and the argument.

  • join(otherDataset,[numTasks])
    When called on datasets of type(K,V) and (K,W),returns a dataset of (K,(V,W)) pairs with all pairs of elements for each key.Outer joins are supported leftOutJoin, rightOuterJoin,and fullOuterJoin.

  • intersection(otherDataset)
    Return a new RDD that contains the intersection of elements in the source dataset and the argument.

  • distinct([numTasks])
    Return a new dataset that contains the distinct elements of the source dataset.

  • groupByKey([numTasks])
    When called on a dataset of (K,V) pairs,returns a dataset of (K,Iterable) pairs.
    Note:If you are grouping in order to perform an aggregation(such as a sum or average) over each key,using reduceByKey or combineByKey will yield much better performance.

  • reduceByKey(func,[numTasks])
    When called on a dataset of (K,V) pairs,returns a dataset of (K,V) pairs where the values for each key are aggregated using the given reduce function func,which must be of type(V,V)=>V. Like in groupByKey,the number of reduce tasks is configurable through an optional second argument.

  • sortByKey([ascending],[numTasks])
    When called on a dataset of (K,V) pairs where K implements Ordered,return a dataset of (K,V) pairs sorted by keys in ascending or descending order,as specified in the boolean ascending argument.

  • cogroup
    For each key k in this or other, return a resulting RDD that contains a tuple with the
    list of values for that key in this as well as other.

dataframe操作

  • selectExpr
    Selects a set of SQL expressions. This is a variant of select that accepts
    The following are equivalent:
    ds.selectExpr(“colA”, “colB as newName”, “abs(colC)”)
    ds.select(expr(“colA”), expr(“colB as newName”), expr(“abs(colC)”))
    val df1=spark.createDataFrame(List(( "a" , 1) ,( "a" , 2) ,( "b" , 3),("a",1) ,( "b" , 4) ,("c" , 4))).toDF("a","b")df1.select("dfd","b").show()df1.selectExpr("concat(a,\"b\") as a","b+10").show()
  • select
    Selects a set of columns. This is a variant of select that can only select
    existing columns using column names (i.e. cannot construct expressions).
    ds.select(“colA”, “colB”)
    ds.select($“colA”, $“colB”)
    val df1=spark.createDataFrame(List(( "a" , 1) ,( "a" , 2) ,( "b" , 3),("a",1) ,( "b" , 4) ,("c" , 4))).toDF("a","b")df1.select("dfd","b").show()df1.selectExpr("concat(a,\"b\") as a","b+10").show()
  • group
    (Scala-specific) Compute aggregates by specifying the column names and
    aggregate methods. The resulting DataFrame will also contain the grouping columns.
    The available aggregate methods are avg, max, min, sum, count.
    // Selects the age of the oldest employee and the aggregate expense for each department
    df.groupBy(“department”).agg(
    “age” -> “max”,
    “expense” -> “sum”
    )
df.groupBy("department").agg("age" -> "max","expense" -> "sum")

其他类似RDD算子

dataset操作

dataset与dataframe的区别

4. 共享变量

  • 累加变量Accumulator
    val list1=spark.sparkContext.parallelize( List(( 'a' , 1) ,( 'a' , 2) ,( 'b' , 3),('a',1) ,( 'b' , 4) ,( 'c' , 4)),4)val accum1=spark.sparkContext.collectionAccumulator[String]("a")list1.foreachPartition(x=> accum1.add("123"))println(accum1.value)
  • 广播变量
	val temp=List(1,2,2,3,4)val broad1=spark.sparkContext.broadcast(temp)list1.foreachPartition(x=> println(broad1.value))

broadcast:通过调用SparkContext的broadcast()方法,来针对某个变量创建广播变量。然后在算子的函数内,使用到广播变量时,每个节点只会拷贝一份副本了。每个节点可以使用广播变量的value()方法获取值。记住,广播变量,是只读的。
Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能。但是确给我们提供了多个task对一个变量并行操作的功能。但是task只能对Accumulator进行累加操作,不能读取它的值。只有Driver程序可以读取Accumulator的值。
参考

5.写入数据

总结

spark离线开发王者


http://chatgpt.dhexx.cn/article/aOqIhi9O.shtml

相关文章

Spark 开发总结

Spark 开发总结 前言spark UISpark API FunctionWindow.partitionBy Spark udfSpark 中禁止使用returnSpark NullPointExceptionSpark Shuffle FetchFailedExceptionspark 数据倾斜 前言 大数据开发过程中把自己积累的关于Spark的使用方法记录下来,便于不断的回顾和…

Spark 开发入门

文章目录 Spark是什么DAG有向无环图 spark环境搭建Spark开发pyspark使用pythonSpark初始化创建RDD数据的读取和保存文本文件Json文件 RDD的转换操作RDD的行动操作集合操作mysql读取 Spark是什么 整个Hadoop生态圈分为分布式文件系统HDFS、计算框架MapReduce以及资源调度框架Ya…

Spark开发——Spark简介及入门

目录 什么是Spark? Spark有哪些特点和优势 1.计算速度 2.易用性 3.通用性 4.兼容性 Spark架构 Spark基本概念 Spark结构设计 使用Scala语言实现Spark本地词频统计 什么是Spark? Spark它是一个用于大规模数据处理的实时计算引擎。 Spark有哪些…

Spark开发指南

目 录 6 Spark开发指南 6.1 概述 6.2 开发环境准备 6.2.1 Java开发环境准备 6.2.2 Scala开发环境准备 6.3 开发指引 6.4 Java代码样例 6.5 Scala代码样例 6.6 对外接口 6.6.1 Java API 6.6.2 Scala API 6.6.3 Python API 6.6.4 Web UI 6.6.5 JDBC 6 Spark开发指南…

PIX飞控电流计设置

在 测量电池电压 一栏输入用电压表测得的电池电压,保存。

pixhawk飞控接口含义

官方文档:https://docs.px4.io/v1.9.0/en/flight_controller/pixhawk.html 1——spektrum DSM receiver2,3——远程通信口,接数传4——串口5——SPI6——电源口7——飞控的安全开关,长按启动解锁8——蜂鸣器9——串口10——GPS11—…

PIX飞控不能解锁问题总结

摘自:https://baijiahao.baidu.com/s?id1640767431717207814&wfrspider&forpc PIX飞控不能解锁问题总结 给力蹦小勇士 发布时间:19-08-0222:55 一、飞控故障或没校准 在地面站里飞行数据菜单查看报错。假如加速度计和地磁没校准,…

富斯i6航模遥控器配apm(pix)飞控mission planner疑难杂症解决策略(上)

提示:仅适用于新手入门参考。 目录 前言 在missionplanner调试遥控器出现信号异常,飞行调试出现操作异常如何处理,在硬件无损的前提下,如何进行简易调试,下文将介绍入门的处理办法。 一、切换飞行模式时突然出现油门…

Mission Planner初学者安装调试教程指南(APM或PIX飞控)3——连接与烧录

Mission Planner初学者安装调试教程指南(APM或PIX飞控)3——连接与烧录 目录 1.连接方式 2.烧录固件 1.连接方式 通常可以使用micro USB数据线直接连接APM(pixhawk),将数据线一头接入电脑usb口,另一头接…

pixhawk飞控板的硬件构成

具体介绍 pixhawk是一款高性能的飞控板,它能用于固定翼,多旋翼,直升机,小车等多种应用。它能被用于研究用,玩耍用,甚至直接用于做产品。这款飞控其实是将PX4FMU和PX4IO做了一个封装,将两部分PC…

Mission Planner初学者安装调试教程指南(APM或PIX飞控)5——规划航点航线

目录 1.卫星地图上规划普通航点 2.曲线航点 3.规划多边形区域 4.环形航线 5. 曲线航线 6.设置网格 7.特殊航线 学习地面站,不可避免要触及英文指令。通过经常使用日常积累,可以熟练掌握各个指令的含义。serve the people heart and soul&#xff0…

OPENMV结合PIX飞控实现四轴定点 循迹 2017电赛

本文章代码已上传Github: https://github.com/Kevincoooool/2017_Follow 有兴趣的可以加个STAR 自从17年国赛之后,自己做了openmv,加了很多群,也了解到很多人都在想着这个题。 第一版 第二版 第三版 我们做国赛的时候实现了全…

飞行控制器Pixhawk简介

作者:华清远见讲师 Pixhawk是一款由PX4开源项目设计并由3DR公司制造生产的高级自动驾驶仪系统。其前身是APM,由于APM的处理器已经接近满负荷,没有办法满足更复杂的运算处理,所以硬件厂商采用了目前最新标准的32位ARM处理器&#x…

pixhawk飞控解锁方法

1. pixhawk飞控解锁方法是:油门(throttl)拉到最低,偏航角(yaw)拉到最右边。

如何用开源飞控PIXHAWK进行二次开发?

著作权归作者所有。 商业转载请联系作者获得授权,非商业转载请注明出处。 作者:我是肉包子 链接:http://www.zhihu.com/question/38874663/answer/84239995 来源:知乎 以下所描述的都是针对px4原生固件,此外&#xff0…

APM和PIX飞控日志分析入门贴

我们在飞行中,经常会碰到各种各样的问题,经常有模友很纳闷,为什么我的飞机会这样那样的问题,为什么我的飞机会炸机,各种问题得不到答案是一件非常不爽的问题,在APM和PIX飞控中,都有记录我们整个…

开源飞控APM与PIXHAWK

一 APM 官网地址:http://ardupilot.org/ APM(ArduPilotMega) 是在2007年由DIY无人机社区(DIY Drones)推出的飞控产品,是当今最为成熟的开源硬件项目。APM基于Arduino的开源平台,对多处硬件做出了…

Mission Planner初学者安装调试教程指南(APM或PIX飞控)1——下载与版本

目录 1.概述 2.下载与版本 3.关于 ArduPilot wquav 1.概述 Misson Planner简称MP,图标为黑底大写白色字体MP加一个绿色固定翼飞机,是可以调试APM或者PIX飞控的地面站软件,可以运行在windows系统和Linux系统(非直接安装&#x…

Pix4飞控常见问题解决方法(二)

一、无法解锁(黄灯闪烁) 无法解锁的原因会有多种,请按照如下步骤进行检查: 1、初始设置是否全部完成 a、机架类型选择是否正确,或者你根本就没有选择? 注意,新版本的飞控固件在默认参数情况下&…

Pix4飞控硬件平台框架(一)

硬件平台简介 本文只是为了让大家简单入门为主,所以我选择的硬件学习平台是Pixhawk系列的mRoPixhawk,兼容原始版本Pixhawk1,基于Pixhawk-project FMUv3开源硬件设计,修正了将原始版本flash限制在1MB这个bug,需要深入学…