Flink-DataStream执行环境和数据读取

article/2025/9/16 19:33:40

​编辑执行环境

        创建执行环境

        执行模式

        触发程序执行 

源算子(Source)

        读取有界数据流

        读取无界数据

        读取自定义数据源(源算子)


DataStream是一个 Flink 程序,其实就是对 DataStream 的各种转换。具体来说,代码基本上都由以下几 部分构成:

  • 获取执行环境(Execution Environment)
  • 读取数据源(Source)
  • 定义基于数据的转换操作(Transformations)
  • 定义计算结果的输出位置(Sink)
  • 触发程序执行(Execute)

 执行环境

        创建执行环境

        编写Flink 程 序 的 第 一 步 , 就 是 创 建 执 行 环 境 。 我 们 要 获 取 的 执 行 环 境 , 是 StreamExecutionEnvironment 类的对象,这是所有 Flink 程序的基础。在代码中创建执行环境的 方式,就是调用这个类的静态方法,具体有以下三种:

1、getExecutionEnvironment(智能)

        就是直接调用 getExecutionEnvironment 方法。它会根据当前运行的上下文 直接得到正确的结果;也就是说,这个方法会根据当前运行的方式,自行决定该返回什么样的 运行环境。

val env = StreamExecutionEnvironment.getExecutionEnvironment

        这种“智能”的方式不需要我们额外做判断,用起来简单高效,是最常用的一种创建执行 环境的方式。 

 2、createLocalEnvironment(本地)

        这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果 不传入,则默认并行度就是本地的 CPU 核心数。

val localEnvironment = StreamExecutionEnvironment.createLocalEnvironment()

3.、createRemoteEnvironment(远程)

        这个方法返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号,并指定 要在集群中运行的 Jar 包。

val remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("host", // JobManager 主机名1234, // JobManager 进程端口号"path/to/jarFile.jar" // 提交给 JobManager 的 JAR 包
)

        执行模式

        在之前的 Flink 版本中,批处理的执行环境与流处理类似,是调用类 ExecutionEnvironment 的静态方法,并返回它的对象:

// 批处理环境
val batchEnv = ExecutionEnvironment.getExecutionEnvironment
// 流处理环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
  1. 流执行模式(STREAMING) 这是 DataStream API 最经典的模式,一般用于需要持续实时处理的无界数据流。默认情 况下,程序使用的就是 STREAMING 执行模式。
  2. 批执行模式(BATCH) 专门用于批处理的执行模式, 这种模式下,Flink 处理作业的方式类似于 MapReduce 框架。 对于不会持续计算的有界数据,我们用这种模式处理会更方便。
  3. 自动模式(AUTOMATIC) 在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。 由于 Flink 程序默认是 STREAMING 模式,我们这里重点介绍一下 BATCH 模式的配置。 

        BATCH (批处理)模式的配置,主要有两种方式:

        1、通过命令行配置

        在提交作业时,增加 execution.runtime-mode 参数,指定值为 BATCH。

bin/flink run -Dexecution.runtime-mode=BATCH ...

        2、通过代码配置(不推荐)

        在代码中,直接基于执行环境调用 setRuntimeMode 方法,传入 BATCH 模式。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.BATCH)

        触发程序执行 

        我们需要显式地调用执行环境的 execute()方法,来触发程序执行。execute()方法将一直等 待作业完成,然后返回一个执行结果(JobExecutionResult)。 

env.execute()

源算子(Source)

        Flink 可以从各种来源获取数据,然后构建 DataStream 进行转换处理。一般将数据的输入 来源称为数据源,而读取数据的算子就是源算子(Source)。所以,Source 就是我们整个处理 程序的输入端。

        Flink 代码中通用的添加 Source 的方式,是调用执行环境的 addSource()方法,方法传入一个对象参数,需要实现 SourceFunction 接口,返回一个 DataStream。

//通过调用 addSource()方法可以获取 DataStream 对象
val stream = env.addSource(...)

        读取有界数据流

        案例需求:

        为了更好地理解,我们先构建一个实际应用场景。比如网站的访问操作,可以抽象成一个 三元组(用户名,用户访问的 url,用户访问 url 的时间戳),所以在这里,我们可以创建一个 类 Event,将用户行为包装成它的一个对象。Event 包含了以下一些字段

        定义样例类 

case class Event(user: String, url: String, timestamp: Long)

1、从元素中读取数据 

//从元素中读取数据val stream: DataStream[Int] = env.fromElements(1, 2, 3, 4, 5, 6) //指定元素数据//创建当前样例类Event的对象val value1: DataStream[Event] = env.fromElements(Event("张三", "01", 1000L),Event("李四", "04", 2000L),Event("王五", "01", 6000L),Event("赵六", "03", 1000L))

 2、从集合中读取数据

 //从集合中读取数据//定义一个集合val e = List(Event("张三", "01", 1000L),Event("李四", "04", 2000L),Event("王五", "01", 6000L),Event("赵六", "03", 1000L))val value2: DataStream[Event] = env.fromCollection(e)

3、从文件读取数据

        真正的实际应用中,自然不会直接将数据写在代码中。通常情况下,我们会从存储介质中 获取数据,一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。 

 

val value3: DataStream[String] = env.readTextFile("datas\\wc.txt")

        读取无界数据

1、从 Socket 读取数据 

        不论从集合还是文件,我们读取的其实都是有界数据。在流处理的场景中,数据往往是无 界的。一个简单的例子,就是我们之前用到的读取 socket 文本流。这种方式由于吞吐量小、 稳定性较差,一般也是用于测试。

val stream = env.socketTextStream("localhost", 7777)

2、从 Kafka 读取数据

        Flink 官方提供了连接工具 flink-connector-kafka,直接帮我们实现了一个消费者 FlinkKafkaConsumer,它就是用来读取 Kafka 数据的 SourceFunction。

        添加连接器依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>

        调用 env.addSource(),传入 FlinkKafkaConsumer 的对象实例即可:

def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//设置全局并行度env.setParallelism(1)//使用Properties对象保存kafa连接的相关配置val properties: Properties = new Properties()properties.setProperty("bootstrap.servers","master:9000")properties.setProperty("group.id","consumer-group")val stream: DataStream[String] = env.addSource( new FlinkKafkaConsumer[String]("clicks", new SimpleStringSchema(),properties))stream.print()env.execute()}

        读取自定义数据源(源算子)

        接下来我们创建一个自定义的数据源,实现 SourceFunction 接口。主要重写两个关键方法: run()和 cancel()。

  • run()方法:使用运行时上下文对象(SourceContext)向下游发送数据;
  • cancel()方法:通过标识位控制退出循环,来达到中断数据源的效果。

创建自定义读取数据源类 

class f4 extends SourceFunction[ Event ]{ //实现SourceFunction接口 泛型为之前定义好的样例类Event//标志位var running = true//重写抽象方法override def run(sourceContext: SourceFunction.SourceContext[Event]): Unit = {//随机数生成器val random = new Random ()//定义数据随机选择的范围val user = Array ("张三", "李四", "王五")val url = Array ("02", "01", "03", "04")//用标志位作为循环判断条件,不停的发出数据while (running) {val event = Event (user (random.nextInt (user.length) ), url (random.nextInt (url.length) ), Calendar.getInstance.getTimeInMillis)//调用ctx的方法向下游发送数据sourceContext.collect (event)//每隔1秒发送一条数据Thread.sleep (1000)}}override def cancel(): Unit = running = false
}

测试类运行 

def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//全局并行度设置为1env.setParallelism(1)//读取自定义的数据源val ste: DataStream[Event] = env.addSource(new f4)//输出ste.print()//执行env.execute()}

执行效果:每隔1秒自动输出一行数据


http://chatgpt.dhexx.cn/article/Wy6rF9k6.shtml

相关文章

Flink数据流类型之间的转换(WindowedStream、DataStream、KeyedStream、AllWindowStream之间的转换)

Flink提供了一些流API&#xff0c;其中包括WindowedStream、DataStream、KeyedStream和AllWindowStream。 &#x1f34a;WindowedStream是一种特殊的流&#xff0c;其中数据已按时间或数据元素的键进行分组&#xff0c;并且每个分组的数据都在窗口中按时间划分。这意味着&…

DataStream API

目录 原算子 准备工作&#xff0c;环境搭建 读取数据 从文件中读取数据 从集合中读取数据 从元素中读取数据 从source文件中读取数据 从kafka中读取数据 自定义source类型输出 转换算子 map转换 Filter转换 FlatMap转换 原算子 准备工作&#xff0c;环境搭建 为…

Flink学习——DataStream API

一个flink程序&#xff0c;其实就是对DataStream的各种转换。具体可以分成以下几个部分&#xff1a; 获取执行环境&#xff08;Execution Environment&#xff09;读取数据源&#xff08;Source&#xff09;定义基于数据的转换操作&#xff08;Transformations&#xff09;定义…

大数据开发-Flink-数据流DataStream和DataSet

文章目录 一、DataStream的三种流处理Api1.1 DataSource1.2 Transformation1.3 Sink 二、DataSet的常用Api2.1 DataSource2.2 Transformation2.3 Sink Flink主要用来处理数据流&#xff0c;所以从抽象上来看就是对数据流的处理&#xff0c;正如前面大数据开发-Flink-体系结构 &…

Flink DataStream API 介绍

Flink DataStream API 介绍 StreamExecutionEnvironment #mermaid-svg-JKeWa22W2vWA4zBS {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-JKeWa22W2vWA4zBS .error-icon{fill:#552222;}#mermaid-svg-JKeWa22W2vWA4z…

DataStream API介绍与使用(一)

详细API参考官网 DataStream编程模型 在Flink整个系统架构中&#xff0c;对流计算的支持是其最重要的功能之一&#xff0c;Flink基于Google提出的DataFlow模型&#xff0c;实现了支持原生数据流处理的计算引擎。Flink中定义了DataStream API让用户灵活且高效地编写Flink流式应…

DataStream API(一)

Flink 有非常灵活的分层 API 设计&#xff0c;其中的核心层就是 DataStream/DataSet API。由于新版 本已经实现了流批一体&#xff0c; DataSet API 将被弃用&#xff0c;官方推荐统一使用 DataStream API 处理流数 据和批数据。由于内容较多&#xff0c;我们将会用几章的篇幅来…

DataStream(二)

目录 5.3.2 聚合算子&#xff08;Aggregation&#xff09; 5.3.3 用户自定义函数&#xff08;UDF&#xff09; 3. 扁平映射&#xff08;flatMap&#xff09; flatMap 操作又称为扁平映射&#xff0c;主要是将数据流中的整体&#xff08;一般是集合类型&#xff09;拆分成一个 …

Flink DataStream API

Flink DataStream API 编程指南 概览前言什么是DataStreamFlink程序剖析程序样例 Data SourcesDataStream Transformations算子数据流转换算子物理分区算子链和资源组 Data Sinks迭代执行参数 概览 前言 Flink中的DataStream程序是常规程序&#xff0c;可对数据流进行转换&am…

DataStream API(三)

目录 5.3.4 物理分区&#xff08;Physical Partitioning&#xff09; 5.4 输出算子&#xff08;Sink&#xff09; 5.4.1 连接到外部系统 5.4.2 输出到文件 5.4.3 输出到 Kafka 5.4.4 输出到 MySQL&#xff08;JDBC&#xff09; 5.4.5 自定义 Sink 输出 5.5 本章总结 5.3.…

流式数据采集和计算(十):Flink的DataStream学习笔记

Flink的DataStream学习笔记.. 1 Flink 基础.. 3 Flink特性.. 3 Flink和Spark对比.. 3 设计思路.. 3 状态管理.. 3 Flink 初探.. 4 设计架构.. 4 Flink on yarn. 5 流程分析.. 6 DataStream. 7 API程序结构.. 7 DataSource 8 Transformation. 9 Sink. 13 Time 14…

DataStream API(基础篇) 完整使用 (第五章)

DataStream API基础篇 一、执行环境&#xff08;Execution Environment&#xff09;1、创建执行环境1. getExecutionEnvironment2. createLocalEnvironment3. createRemoteEnvironment 二、执行模式(Execution Mode)1. BATCH模式的配置方法&#xff08;1&#xff09;通过命令行…

DataStream API 四 之 Flink DataStream编程

DataStream API 四 之 Flink DataStream编程 1.分布式流处理基本模型2.流应用开发步骤3.数据类型4. Connector5. Execution environment6. 参数传递7.配置并⾏度8.Watermark9.Checkpoint10.State11. Data Source11.111.2 自定义Source 12.Transformations13.Window13.1窗⼝处理…

Flink的DataStream介绍

1|0一&#xff1a;流式处理基本概念 流处理系统本身有很多自己的特点。一般来说&#xff0c;由于需要支持无限数据集的处理&#xff0c;流处理系统一般采用一种数据驱动的处理方式。它会提前设置一些算子&#xff0c;然后等到数据到达后对数据进行处理。 为了表达复杂的逻辑&am…

Flink DataStream API(基础版)

概述 DataStream&#xff08;数据流&#xff09;本身是 Flink 中一个用来表示数据集合的类&#xff08;Class&#xff09;&#xff0c;我们编写的 Flink 代码其实就是基于这种数据类型的处理&#xff0c;所以这套核心API 就以DataStream 命名。对于批处理和流处理&#xff0c;我…

node.js上开启服务,在同一局域网下的另一客户端访问

选择的服务是我之前做的案例&#xff1a;链接 1.在本机上开启服务&#xff1a; 2.本机上用浏览器访问验证无误&#xff1a; 3.运行cmd使用命令ipconfig查看本机ip地址 4.在另一台局域网下的机子&#xff0c;要求可以ping到。 浏览器访问ip地址&#xff1a;3000即可。&#…

前端 面试题

介绍项目 安全性 token 验证 处理令牌续期问题&#xff0c;在header中获取到新令牌时&#xff0c;替换老令牌&#xff0c;以达到用户无感刷新令牌 1、第一次登录的时候&#xff0c;前端调后端的登陆接口&#xff0c;发送用户名和密码 2、后端收到请求&#xff0c;验证用户名和…

基础知识---cmd命令行篇

1、echo&#xff08;输出&#xff09; > 覆盖 >>追加 2、dir&#xff08;展示当前目录的文件 .当前的目录 ..表示上一层目录&#xff09; 3、d&#xff1a;和cd&#xff1a;d为驱动器 cd展示当前目录的文件&#xff08; .为当前文件 ..为上一层目录的文件&#xf…

虚拟机NAT+静态IP+DNS

NAT模式下 虚拟机联网是通过物理机的VMware Nat服务&#xff08;电脑网络切换也无碍&#xff09;&#xff0c;禁用状态下ping不通物理机&#xff0c;也连不了网 物理机连接虚拟机的通过VMnet8虚拟网络适配器&#xff0c;禁用情况下ping不通虚拟机&#xff0c;Xshell工具也没法…

ACP考前错题总结(精华,已过ACP)

前言 证书和战绩镇楼&#xff0c;希望大家都可以拿到自己想要的Certificate。无论ACA-ACP-ACE亦或者GCP、AWS等等。 错题总结 镜像、本地磁盘部分 不建议基于本地服务器制作镜像上传到阿里云ECS并提供服务。 不支持写在座位数据盘使用的本地盘 镜像和快照&#xff1a;镜像可…