【Spark | SparkStreaming】

article/2025/10/12 14:11:56

原理

架构

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

实战

RDD 队列

val rddQueue = new mutable.QueueRDD[Int]

自定义数据源

用法及说明
需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。

class CustomerReceiver(host: String, port: Int) extends 
Receiver[String](StorageLevel.MEMORY_ONLY) {//最初启动的时候,调用该方法,作用为:读数据并将数据发送给 Sparkoverride def onStart(): Unit = {new Thread("Socket Receiver") {override def run() {receive()}}.start()}//读数据并将数据发送给 Sparkdef receive(): Unit = {//创建一个 Socketvar socket: Socket = new Socket(host, port)//定义一个变量,用来接收端口传过来的数据var input: String = null//创建一个 BufferedReader 用于读取端口传来的数据val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, 
StandardCharsets.UTF_8))//读取数据input = reader.readLine()//当 receiver 没有关闭并且输入数据不为空,则循环发送数据给 Sparkwhile (!isStopped() && input != null) {store(input)input = reader.readLine()}//跳出循环则关闭资源reader.close()socket.close()//重启任务restart("restart")}override def onStop(): Unit = {}
}

使用自定义的数据源采集数据

object FileStream {def main(args: Array[String]): Unit = {//1.初始化 Spark 配置信息
val sparkConf = new SparkConf().setMaster("local[*]")
.setAppName("StreamWordCount")//2.初始化 SparkStreamingContextval ssc = new StreamingContext(sparkConf, Seconds(5))
//3.创建自定义 receiver 的 Streaming
val lineStream = ssc.receiverStream(new CustomerReceiver("hadoop102", 9999))//4.将每一行数据做切分,形成一个个单词val wordStream = lineStream.flatMap(_.split("\t"))//5.将单词映射成元组(word,1)
val wordAndOneStream = wordStream.map((_, 1))//6.将相同的单词次数做统计val wordAndCountStream = wordAndOneStream.reduceByKey(_ + _)//7.打印wordAndCountStream.print()//8.启动 SparkStreamingContextssc.start()ssc.awaitTermination()}
}

kafka数据源

Kafka 0-10 Direct 模式
1)需求:通过 SparkStreaming 从 Kafka 读取数据,并将读取过来的数据做简单计算,最终打印
到控制台。
2)导入依赖

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.0.0</version>
</dependency>
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.10.1</version></dependency>

3)编写代码


import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, 
LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object DirectAPI {def main(args: Array[String]): Unit = {//1.创建 SparkConfval sparkConf: SparkConf = new 
SparkConf().setAppName("ReceiverWordCount").setMaster("local[*]")//2.创建 StreamingContextval ssc = new StreamingContext(sparkConf, Seconds(3))//3.定义 Kafka 参数val kafkaPara: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> 
"linux1:9092,linux2:9092,linux3:9092",ConsumerConfig.GROUP_ID_CONFIG -> "atguigu","key.deserializer" -> 
"org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" -> 
"org.apache.kafka.common.serialization.StringDeserializer")//4.读取 Kafka 数据创建 DStreamval kafkaDStream: InputDStream[ConsumerRecord[String, String]] = 
KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara))//5.将每条消息的 KV 取出val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())//6.计算 WordCountvalueDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()//7.开启任务ssc.start()ssc.awaitTermination()}
}

查看 Kafka 消费进度

bin/kafka-consumer-groups.sh --describe --bootstrap-server linux1:9092 --group 
atguigu

DStream 转换

无状态转化操作

无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每一个 RDD。
在这里插入图片描述

Transform

Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数。

join

两个流之间的 join 需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是
对当前批次的两个流中各自的 RDD 进行 join,与两个 RDD 的 join 效果相同。

有状态转化操作

UpdateStateByKey

针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。
updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对应的(键,状态)对组成的。
updateStateByKey 操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,需要做下面两步:

  1. 定义状态,状态可以是一个任意的数据类型。
  2. 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更
    新。
    使用 updateStateByKey 需要对检查点目录进行配置,会使用检查点来保存状态

WindowOperations

Window Operations 可以设置窗口的大小和滑动窗口的间隔来动态的获取当前 Steaming 的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。
➢ 窗口时长:计算内容的时间范围;

Window 的操作
(1)window(windowLength, slideInterval): 基于对源 DStream 窗化的批次进行计算返回一个新的 Dstream;
(2)countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;
(3)reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流;
(4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 当在一个(K,V)对的 DStream 上调用此函数,会返回一个新(K,V)对的 DStream,此处通过对滑动窗口中批次数据使用 reduce 函数来整合每个 key 的 value 值。
(5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 这个函数是上述函数的变化版本,每个窗口的 reduce 值都是通过用前一个窗的 reduce 值来递增计算。
通过 reduce 进入到滑动窗口数据并”反向 reduce”离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对 keys 的“加”“减”计数。

DStream 输出

输出操作如下:
➢ print():在运行流程序的驱动结点上打印 DStream 中每一批次数据的最开始 10 个元素。这用于开发和调试。在 Python API 中,同样的操作叫 print()。
➢ saveAsTextFiles(prefix, [suffix]):以 text 文件形式存储这个 DStream 的内容。每一批次的存储文件名基于参数中的 prefix 和 suffix。”prefix-Time_IN_MS[.suffix]”。
➢ saveAsObjectFiles(prefix, [suffix]):以 Java 对象序列化的方式将 Stream 中的数据保存为SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]“. Python中目前不可用。
➢ saveAsHadoopFiles(prefix, [suffix]):将 Stream 中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]”。Python API 中目前不可用。
➢ foreachRDD(func):这是最通用的输出操作,即将函数 func 用于产生于 stream 的每一个RDD。其中参数传入的函数 func 应该实现将每一个 RDD 中数据推送到外部系统,如将
RDD 存入文件或者通过网络将其写入数据库。

优雅关闭

使用外部文件系统来控制内部程序关闭。


http://chatgpt.dhexx.cn/article/p9d1pGK3.shtml

相关文章

Spotfire数据分析案例分享——几个国家的人口抚养比趋势及对比(Dependency ratio trends and compare)

从世界银行官网上下载了少数几个国家从1960年至今的人口数据&#xff0c;用Spotfire进行了一些数据分析&#xff0c;对人口学的一些指标有了新的认识&#xff0c;感觉还是挺有意义的。 人口抚养比&#xff08;Dependency ratio&#xff09;的中文解释&#xff08;来自百度&…

Spotfire 的排名函数Rank和DenseRank

Rank、DenseRank 和 RankReal 函数 参考以下表格&#xff1a; 函数说明DenseRank(Arg1, Arg2, Arg3...)将返回选定列中各值的整数值排名。第一个参数是要进行排名的列。 可选参数是字符串&#xff0c;可决定使用升序排名 (默认) 还是降序排名。要检索排名 1 以获得最大值&…

Spotfire 使用IronPython脚本

脚本位置&#xff1a;文件->文档属性&#xff0c;新建即可。 1、使用IronPython检查数据表是链接到源还是嵌入在Spotfire中 from Spotfire.Dxp.Data import *tableDocument.Data.Tables["表名"]found,tableSettingsApplication.Document.Data.SaveSettings.TryGe…

Spotfire经验总结—累积百分比(帕累托图)的绘制方法

在Excel中绘制帕累托图&#xff0c;可使用“数据分析”模块进行绘制。 Spotfire中的图表类型中&#xff0c;并没有“帕累托图”这一选项&#xff0c;那么应该如何绘制呢&#xff1f; 1. 图表类型需选择“Combination Chart” 2. 需添加一列“Cumulative Sum”并修改公式为“S…

【保姆级】-spotfire服务端、客户端安装部署(V7.8)

文章目录 一、Spotfire安装简介架构模式 二、安装配置Spotfire服务器执行数据库初始化脚本运行setup-win64.exe附&#xff1a;配置工具bootstrap文件&#xff08;可跳过&#xff09;运行配置工具 三、部署包四、部署节点安装节点后到控制台进行信任节点 安装spotfire客户端安装…

spotfire中文使用教程含演示数据以及二次开发思路整理

文档下载地址&#xff1a; https://download.csdn.net/download/weixin_41607523/85214206?spm1001.2014.3001.5503 基于Echarts图表的开发思路&#xff1a; 通过html语言、javascript语言做出一个基于需求所开发组件的静态图表&#xff08;此处指手动填充数据&#xff0c;如…

Spotfire10.10.0 windows10安装与配置

Spotfire10.10.0 windows10安装与配置 Spotfire10.10.0官方文档&#xff1a;Spotfire10.10.0官方文档 目录 一 、准备工作 1 下载并解压 2 配置sqlserver数据库的基础信息二、 安装spotfire服务器并初始化配置 安装 初始配置 创建管理员账号 将客户端软件包部署到Spotf…

TIBCO Spotfire 入门指南

数据分析工具 - TIBCO Spotfire 入门指南&#xff08;一&#xff09; 文章目录 数据分析工具 - TIBCO Spotfire 入门指南&#xff08;一&#xff09;前言一、Sportfire 是什么&#xff1f;二、使用Sportfire 创建一个简单的数据分析模型1.设置分析&#xff08;加载数据&#xf…

文件上传漏洞总结(全)

文件上传漏洞 凡是存在上传文件的地方&#xff0c;都有可能存在文件上传漏洞&#xff0c;并不是说有文件上传就一定有文件上传漏洞。 原理 这主要看一些文件上传的代码有没有 严格限制用户上传的文件类型&#xff0c;比如&#xff0c;只可以上传.jpg|.png|.gif文件&#xff…

文件上传漏洞详解(CTF篇)

需要了解的前置知识&#xff1a; 1.什么是文件上传&#xff1a; 文件上传就是通过流的方式将文件写到服务器上文件上传必须以POST提交表单表单中需要 <input type"file" name"upload"> 2.一句话木马 <?php eval($_POST[a]) ?>其中eval就…

web漏洞——文件上传漏洞(upload-labs)

一、文件上传漏洞简介、原理、高危触发点、防御 文件上传漏洞简介&#xff1a; 由于程序员在对用户文件上传部分的控制不足或者处理缺陷&#xff0c;而导致的用户可以越过其本身权限向 服务器 上上传可执行的动态脚本文件。这里上传的文件可以是木马&#xff0c;病毒&#xff0…

Python中OpenCV+Numpy安装配置

1.安装OpenCV Numpy 安装OpenCV 下载地址&#xff1a;http://www.lfd.uci.edu/~gohlke/pythonlibs/ 安装Numpy 下载地址&#xff1a;https://pypi.python.org/pypi/numpy 根据python版本选择相应的opencv版本&#xff0c;例如在Windows64操作系统下python3.7.1…

python中numpy模块安装_numpy安装,python中怎样安装numpy模块

python中怎样安装numpy模块 1.在python官网https://pypi.python.org/pypi/numpy中找到安装的python版本对应的numpy版本。 例如: python版本是 下载的对应numpy版本是 2.将numpy下载到 python的安装目录下的scripts文件夹中 3.然后在cmd中执行以下命令 ip3.6 install D:j…

python之Numpy 安装

1、在windows下&#xff0c;ctrlR&#xff0c;进入运行&#xff1b; 2、进入到Python所在的目录&#xff0c;默认为c&#xff1a;\\Python27 3、运行命令 python -m pip install numpy 4、系统显示如下&#xff1a;collecting numpy,开始自动进行安装&#xff1b; 5、显示进…

numpy的安装与使用

文章目录 前言 一、numpy是什么&#xff1f; 1、Numpy的数据结构 1.1 元数据&#xff08;metadata&#xff09; 1.2 实际数据 2、ndarray数组对象的特点 3、Numpy的优点 二、安装numpy 三、开始使用 3.1 引入库 3.2 查看numpy安装版本 3.3 牛刀小试 3.4 numpy VS list 总结 前言…

【Numpy学习笔记1】numpy安装、维度、基本运算、改变数据形态

1.Numpy使用场景 需要批量处理数据的时候机器学习&#xff0c;人工智能这些需要进行海量数据运算处理的地方写游戏里面的物体运行逻辑时&#xff0c;经常涉及到矩阵、向量运算机器人模拟环境&#xff0c;背后的环境反馈信息&#xff0c;全是靠批量数据算出来的任何需要做统计的…

pythonnumpy官网_Numpy安装

问题引入&#xff1a;电脑里安装了从官网下载的python3.8.0,。先使用了菜鸟教程的方法2安装。https://www.runoob.com/numpy/numpy-install.html 发现产生错误。先是提示我的pip工具没有更新到最新版本&#xff0c;于是根据提示更新后&#xff0c;发现仍有错误&#xff0c;于是…

numpy的安装

安装 1.以管理员身份打开cmd 2.输入命令安装numpy插件 pip insatll numpy 3.使用以下命令查看是否安装成功 pip list 出现numpy就安装成功

如何在matlab坐标轴上输入希腊字符和开根号符号

1.以输入\mu为例&#xff0c;要将解释器选择为 tex&#xff0c;不过matlab默认的就是tex. xlabel(\mum,FontName,Arial,fontsize,24,Interpreter,tex); 效果如下图所示&#xff1a; 2.输入开根号&#xff0c;需要把解释器改为latex. clear,clc ylabel($Z_{0} \sqrt {\epsilo…

matlab之方程式求根

一.利用syms和solve() 利用syms和sym定义一个标志变量 syms x sym(‘x’) 2.求方程根&#xff1a; 求x-2y5与xy6的联立方程组的根&#xff1a; 3.对函数求微分 4.对函数求积分 5. fsolve()