Spark 开发入门

article/2025/10/9 3:21:58

文章目录

  • Spark是什么
    • DAG有向无环图
  • spark环境搭建
  • Spark开发
    • pyspark使用
    • python
      • Spark初始化
      • 创建RDD
        • 数据的读取和保存
          • 文本文件
          • Json文件
      • RDD的转换操作
      • RDD的行动操作
      • 集合操作
      • mysql读取

Spark是什么

整个Hadoop生态圈分为分布式文件系统HDFS、计算框架MapReduce以及资源调度框架Yarn。但是随着时代的发展,MapReduce其高强度的磁盘IO、网络通信频率以及写死了的使得其严重拖慢了整个生态圈的运行速度。Spark因此而生。

首先我们来讲一下原有的MapReduce是如拖慢节奏的:
对于一个统计词频的任务来说,map任务首先会分散统计在当前节点的词频信息,然后将这个临时信息落地到磁盘之中,之后reduce任务会从磁盘中读取这些数据进行混洗。混洗会把相同的键映射到同一个执行reduce任务的节点之中这个过程就会涉及到大量的网络通信。而Reduce任务执行完之后,会再次把数据落地到磁盘中。
在这里插入图片描述
可以看到一次MapReduce阶段就会经过至少两次网络IO+N次网络通信,这也是两个主要耗时的任务。特别是对于神经网络或者图计算这种需要频繁迭代的计算来说,这会涉及到大量的磁盘IO和网络通信。

Spark对此进行了改进,其通过DAG图和内存计算来改进上面的这些缺点。

DAG有向无环图

提到DAG有向无环图就需要先提及其基本组成元素RDD,又叫弹性分布式数据集。其会把所有中间环节的数据文件以某种统一的方式归纳、抽象出来,那么所有map与reduce就可以更流畅地衔接在一起。在具体的Spark开发过程中来讲,其就是对磁盘数据的描述信息,当遇到action操作的时候,Spark就会根据RDD中的信息对数据进行计算。

在RDD中主要有转换和行动两种操作。转换操作就会在多个RDD之间开始构建DAG有向无环图,其描述了RDD之间的关联关系。假如说,我们会从文本文件README.md中读取了数据创建了readme_rdd,这个就是有向无环图的首节点。之后我们加入要查找这个文件中包含“Python”这个单词的行,我们就会转换readme_rddpython_rdd
在这里插入图片描述
但是我们要记住一点:rdd是惰性求值的,转换只是记录对数据的操作关系的,并不会真正的进行运算。至于为什么这么做则是spark可以将一些操作合并到一起以减少计算数据的步骤。假如说我们启动rdd的行动操作,那么spark才会真正的根据这个DAG中的内容进行执行。


但是MapReduce并没有完全被Spark取代,因为内存容量问题Spark对于某些问题可能不能正常处理

spark环境搭建

使用以下命令下载并搭建spark环境

wget --no-check-certificate https://dlcdn.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
tar zxvf spark-3.2.0-bin-hadoop3.2.tgz

之后就可以直接使用了


spar k需要运行在 jvm环境,所以需要 jdk1.8版本以上

Spark开发

pyspark使用

我们可以直接调用spark提供的pyspark来执行语句,其会默认初始化一个SparkContext对象sc,我们可以直接调用其方法创建rdd。

./bin/pyspark
lines = sc.textFile("README.md")
lines.count()

而我们想要写python脚本操作spark就需要使用spark提供的spark-submit组件:

./bin/spark-submit test.py

python

Spark初始化

要在python中使用Spark就需要先配置好SparkConf,其主要注意的是两个参数一个是Spark集群的Url以及这个驱动服务的名称。

from pyspark import SparkConf, rdd
from pyspark.context import SparkContext# spark配置
conf = SparkConf().setMaster("local").setAppName("app")

之后就可以根据这个SparkConf创建一个SparkContext对象。有了这个rdd我们就可以创建RDD并进行一系列操作了。

sc = SparkContext(conf=conf)

创建RDD

RDD的输入源主要有三种:

  • 本地文件系统
file = sc.textFile("file:[绝对路径]")

或者我们也可以向下面的例子一样,直接使用相对路径创建RDD:

lines = sc.textFile("README.md")
  • Amazon S3
    要在Spark中访问S3数据,首先就得把S3访问凭据设置为AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY环境变量
file = sc.textFile("s3n:[绝对路径]")
  • HDFS
file = sc.textFile("hdfs://[ip:port]/[绝对路径]")

当然除了从磁盘读取,其也可以转换内存中的数据成为RDD:

memory_rdd = sc.parallelize(["pandas","china","Chinese","chinese food"])

数据的读取和保存

创建RDD必然离不开数据的读取,其主要有以下几种文件格式可以读取:

  • 文本文件
  • JSON
  • SequenceFile
  • protobuf

目前暂且只介绍前面两种常用的文件格式。

文本文件

对于本地的文本文件,我们直接使用刚刚的textFile就可以了。同时pyspark还支持我们对整个文件夹进行操作,其会读取整个文件夹,之后以文件名作为Key,文件内容作为Value进行创建输入RDD。下面是个例子:

读取目录,这样会形成键值对<文件名,文件内容>
dir_rdd = sc.wholeTextFiles("file:///home/ik/software/spark/python_demo")
print(dir_rdd.keys().collect()) # 输出键

输出如下:

['file:/home/ik/software/spark/python_demo/pydemo.py', 
'file:/home/ik/software/spark/python_demo/README.md', 
'file:/home/ik/software/spark/python_demo/pair.py', 
'file:/home/ik/software/spark/python_demo/LocalFile.py', 
'file:/home/ik/software/spark/python_demo/JsonTest.json', 
'file:/home/ik/software/spark/python_demo/JsonTest1.json', 
'file:/home/ik/software/spark/python_demo/SparkPlus.py']

跟我们下面的结果是一致的,只是其不会递归读入文件夹里面的内容。
在这里插入图片描述
而保存文件则是调用saveAsTextFile这个方法,其会在路径下创建一个目录。

dir_rdd.keys().saveAsTextFile("file:///home/ik/software/spark/python_demo")

这个目录下会有四个文件,之后红箭头所指的文件才是文件中的内容。
在这里插入图片描述

Json文件

Json文件的接口本质上可文本文件是一致的,只不过我们需要调用Python的加载器。

json_rdd = sc.textFile("file:///home/ik/software/spark/python_demo/JsonTest1.json")
data = json_rdd.map(lambda x:json.loads(x))
print(data.collect())

RDD的转换操作

RDD是只读的,所以其转换操作其实是从父RDD中抽取一部分数据经过转换形成新的RDD。这其中主要是涉及到两个方法mapfilter

  • map:接收一个函数,把这个函数用于RDD的每个元素,将函数的返回结果作为RDD中元素的值
nums = sc.parallelize([1,2,3,4,5])
for number in nums.map(lambda x:x*x).collect():print(number)
  • filter:接收一个函数,用于筛选RDD中的数据
python_line = lines.filter(lambda line:"Python" in line)
print(python_line.first())
  • flatmap:其也接收一个函数,不过其会把原来的一个元素根据函数规则划分为两个元素
sentence = sc.parallelize(["hello world","hi world"])
words = sentence.flatMap(lambda line:line.split(" "))
print(words.collect())	# ['hello', 'world', 'hi', 'world']

RDD的行动操作

行动操作会真正的触发Spark的运算,主要是以下方法:

  • count:返回计数结果
lines = sc.textFile("README.md")
print(lines.count())
  • take:其接收一个整数操作,这个整数表示从RDD中抽取几个元素。之后我们可以对这些元素进行打印
# take获取10个数据
rdd_set = python_line.take(10)
for line in rdd_set:print(line)
  • collect:其会获取RDD中的所有元素,但是要注意如果对象过大会导致内存爆掉
# 传递函数,主要是filter
def filter_func(lines):return "Python" in linespython_line2 = lines.filter(filter_func)
for line in python_line2.collect():print(line)

集合操作

集合操作主要如下所示,就不多介绍了:

rdd1 = sc.parallelize([1,3,5,7,9,1])
rdd2 = sc.parallelize([2,4,6,8,1])
# distinct 去重
print(rdd1.distinct().collect())
# intersection 运行时只返回重复的元素
print (rdd1.intersection(rdd2).collect())
# subtract 返回只存在第一个rdd而不存在于第二个rdd中的元素
print(rdd1.subtract(rdd2).collect())
# cartesian 计算笛卡尔积
print(rdd1.cartesian(rdd2).collect())

mysql读取

  # mysql 配置(需要修改)prop = {'user': 'xxx', 'password': 'xxx', 'driver': 'com.mysql.cj.jdbc.Driver'}# database 地址(需要修改)url = 'jdbc:mysql://host:port/database'# 读取表data = spark.read.jdbc(url=url, table='tb_newCity', properties=prop)# 打印data数据类型print(type(data))# 展示数据data.show()# 关闭spark会话spark.stop()

http://chatgpt.dhexx.cn/article/UfsBJJD4.shtml

相关文章

Spark开发——Spark简介及入门

目录 什么是Spark&#xff1f; Spark有哪些特点和优势 1.计算速度 2.易用性 3.通用性 4.兼容性 Spark架构 Spark基本概念 Spark结构设计 使用Scala语言实现Spark本地词频统计 什么是Spark&#xff1f; Spark它是一个用于大规模数据处理的实时计算引擎。 Spark有哪些…

Spark开发指南

目 录 6 Spark开发指南 6.1 概述 6.2 开发环境准备 6.2.1 Java开发环境准备 6.2.2 Scala开发环境准备 6.3 开发指引 6.4 Java代码样例 6.5 Scala代码样例 6.6 对外接口 6.6.1 Java API 6.6.2 Scala API 6.6.3 Python API 6.6.4 Web UI 6.6.5 JDBC 6 Spark开发指南…

PIX飞控电流计设置

在 测量电池电压 一栏输入用电压表测得的电池电压&#xff0c;保存。

pixhawk飞控接口含义

官方文档&#xff1a;https://docs.px4.io/v1.9.0/en/flight_controller/pixhawk.html 1——spektrum DSM receiver2&#xff0c;3——远程通信口&#xff0c;接数传4——串口5——SPI6——电源口7——飞控的安全开关&#xff0c;长按启动解锁8——蜂鸣器9——串口10——GPS11—…

PIX飞控不能解锁问题总结

摘自&#xff1a;https://baijiahao.baidu.com/s?id1640767431717207814&wfrspider&forpc PIX飞控不能解锁问题总结 给力蹦小勇士 发布时间&#xff1a;19-08-0222:55 一、飞控故障或没校准 在地面站里飞行数据菜单查看报错。假如加速度计和地磁没校准&#xff0c;…

富斯i6航模遥控器配apm(pix)飞控mission planner疑难杂症解决策略(上)

提示&#xff1a;仅适用于新手入门参考。 目录 前言 在missionplanner调试遥控器出现信号异常&#xff0c;飞行调试出现操作异常如何处理&#xff0c;在硬件无损的前提下&#xff0c;如何进行简易调试&#xff0c;下文将介绍入门的处理办法。 一、切换飞行模式时突然出现油门…

Mission Planner初学者安装调试教程指南(APM或PIX飞控)3——连接与烧录

Mission Planner初学者安装调试教程指南&#xff08;APM或PIX飞控&#xff09;3——连接与烧录 目录 1.连接方式 2.烧录固件 1.连接方式 通常可以使用micro USB数据线直接连接APM&#xff08;pixhawk&#xff09;&#xff0c;将数据线一头接入电脑usb口&#xff0c;另一头接…

pixhawk飞控板的硬件构成

具体介绍 pixhawk是一款高性能的飞控板&#xff0c;它能用于固定翼&#xff0c;多旋翼&#xff0c;直升机&#xff0c;小车等多种应用。它能被用于研究用&#xff0c;玩耍用&#xff0c;甚至直接用于做产品。这款飞控其实是将PX4FMU和PX4IO做了一个封装&#xff0c;将两部分PC…

Mission Planner初学者安装调试教程指南(APM或PIX飞控)5——规划航点航线

目录 1.卫星地图上规划普通航点 2.曲线航点 3.规划多边形区域 4.环形航线 5. 曲线航线 6.设置网格 7.特殊航线 学习地面站&#xff0c;不可避免要触及英文指令。通过经常使用日常积累&#xff0c;可以熟练掌握各个指令的含义。serve the people heart and soul&#xff0…

OPENMV结合PIX飞控实现四轴定点 循迹 2017电赛

本文章代码已上传Github&#xff1a; https://github.com/Kevincoooool/2017_Follow 有兴趣的可以加个STAR 自从17年国赛之后&#xff0c;自己做了openmv&#xff0c;加了很多群&#xff0c;也了解到很多人都在想着这个题。 第一版 第二版 第三版 我们做国赛的时候实现了全…

飞行控制器Pixhawk简介

作者&#xff1a;华清远见讲师 Pixhawk是一款由PX4开源项目设计并由3DR公司制造生产的高级自动驾驶仪系统。其前身是APM&#xff0c;由于APM的处理器已经接近满负荷&#xff0c;没有办法满足更复杂的运算处理&#xff0c;所以硬件厂商采用了目前最新标准的32位ARM处理器&#x…

pixhawk飞控解锁方法

1. pixhawk飞控解锁方法是&#xff1a;油门(throttl)拉到最低,偏航角&#xff08;yaw&#xff09;拉到最右边。

如何用开源飞控PIXHAWK进行二次开发?

著作权归作者所有。 商业转载请联系作者获得授权&#xff0c;非商业转载请注明出处。 作者&#xff1a;我是肉包子 链接&#xff1a;http://www.zhihu.com/question/38874663/answer/84239995 来源&#xff1a;知乎 以下所描述的都是针对px4原生固件&#xff0c;此外&#xff0…

APM和PIX飞控日志分析入门贴

我们在飞行中&#xff0c;经常会碰到各种各样的问题&#xff0c;经常有模友很纳闷&#xff0c;为什么我的飞机会这样那样的问题&#xff0c;为什么我的飞机会炸机&#xff0c;各种问题得不到答案是一件非常不爽的问题&#xff0c;在APM和PIX飞控中&#xff0c;都有记录我们整个…

开源飞控APM与PIXHAWK

一 APM 官网地址&#xff1a;http://ardupilot.org/ APM&#xff08;ArduPilotMega&#xff09; 是在2007年由DIY无人机社区&#xff08;DIY Drones&#xff09;推出的飞控产品&#xff0c;是当今最为成熟的开源硬件项目。APM基于Arduino的开源平台&#xff0c;对多处硬件做出了…

Mission Planner初学者安装调试教程指南(APM或PIX飞控)1——下载与版本

目录 1.概述 2.下载与版本 3.关于 ArduPilot wquav 1.概述 Misson Planner简称MP&#xff0c;图标为黑底大写白色字体MP加一个绿色固定翼飞机&#xff0c;是可以调试APM或者PIX飞控的地面站软件&#xff0c;可以运行在windows系统和Linux系统&#xff08;非直接安装&#x…

Pix4飞控常见问题解决方法(二)

一、无法解锁&#xff08;黄灯闪烁&#xff09; 无法解锁的原因会有多种&#xff0c;请按照如下步骤进行检查&#xff1a; 1、初始设置是否全部完成 a、机架类型选择是否正确&#xff0c;或者你根本就没有选择&#xff1f; 注意&#xff0c;新版本的飞控固件在默认参数情况下&…

Pix4飞控硬件平台框架(一)

硬件平台简介 本文只是为了让大家简单入门为主&#xff0c;所以我选择的硬件学习平台是Pixhawk系列的mRoPixhawk&#xff0c;兼容原始版本Pixhawk1&#xff0c;基于Pixhawk-project FMUv3开源硬件设计&#xff0c;修正了将原始版本flash限制在1MB这个bug&#xff0c;需要深入学…

Mission Planner中级应用(APM或PIX飞控)3——APM飞控安装双GPS测试 APM双GPS

目录 1.未得到答案和技术指导 2.第一次实验失败 3.完全废掉了解锁功能 4.调整RX/TX位置 5.成功解锁 6.广阔室外的探索 山重水复疑无路&#xff0c;柳暗花明又一村 ——Mission Planner中级应用&#xff08;APM或PIX飞控&#xff09;3——APM飞控安装双GPS测试 APM双GPS。…

Mission Planner初学者安装调试教程指南(APM或PIX飞控)4——校准加速度计、指南针、遥控器、设置飞行模式

目录 1.加速度计校准。 2.指南针校准。 3.遥控器校准。 安装完固件后&#xff0c;无人机并不能马上解锁起飞&#xff0c;必须进行校准加速度计、指南针、遥控器&#xff0c;下面就逐一进行校准&#xff0c;该环境使用的是APM2.8、Mission Planner1.3.70地面站软件。 1.加速…