大数据Hadoop之——Flink DataStream API 和 DataSet API

article/2025/9/13 9:45:25

文章目录

    • 一、DataStream API概述
    • 二、什么是DataStream ?
    • 三、DataStream 数据处理过程
      • 1)Data Sources(数据源)
        • 1、Data Sources 原理
        • 2、Data Sources 实现方式
          • 1)基于文件
          • 2)基于套接字
          • 3)基于集合
          • 4)自定义
      • 2)DataStream Transformations(数据流转换//处理/算子)
        • 1、数据流转换
        • 2、物理分区
        • 3、算子链和资源组
      • 3)Data Sinks(数据输出)
        • 旁路输出(分流)
      • 2)Flink 程序剖析(scala)
        • 1、 获取一个执行环境(execution environment)
        • 2、加载/创建初始数据
        • 3、指定数据相关的转换
        • 4、指定计算结果的存储位置
        • 5、触发程序执行
    • 四、什么是DataSet?
    • 五、DataSet 数据处理过程
      • 1)Data Sources (数据源)
        • 1、基于文件
        • 2、基于集合
        • 3、通用型
      • 2)DataSet Transformations(数据集转换//处理/算子)
      • 3)Data Sinks(数据输出)

一、DataStream API概述

Flink 中的 DataStream 程序是对数据流(例如过滤、更新状态、定义窗口、聚合)进行转换的常规程序。数据流的起始是从各种源(例如消息队列、套接字流、文件)创建的。结果通过 sink 返回,例如可以将数据写入文件或标准输出(例如命令行终端)。Flink 程序可以在各种上下文中运行,可以独立运行,也可以嵌入到其它程序中。任务执行可以运行在本地 JVM 中,也可以运行在多台机器的集群上。

二、什么是DataStream ?

  • DataStream API 得名于特殊的 DataStream 类,该类用于表示 Flink 程序中的数据集合。你可以认为 它们是可以包含重复项的不可变数据集合。这些数据可以是有界(有限)的,也可以是无界(无限)的,但用于处理它们的API是相同的。
  • DataStream 在用法上类似于常规的 Java 集合,但在某些关键方面却大不相同。它们是不可变的,这意味着一旦它们被创建,你就不能添加或删除元素。你也不能简单地察看内部元素,而只能使用 DataStream API 操作来处理它们,DataStream API 操作也叫作转换(transformation)。
  • 你可以通过在 Flink 程序中添加 source 创建一个初始的 DataStream。然后,你可以基于 DataStream 派生新的流,并使用 map、filter 等 API 方法把 DataStream 和派生的流连接在一起。

三、DataStream 数据处理过程

在这里插入图片描述

1)Data Sources(数据源)

1、Data Sources 原理

官方文档
一个数据 source 包括三个核心组件:分片(Splits)分片枚举器(SplitEnumerator) 以及 源阅读器(SourceReader)

  • 分片(Split) 是对一部分 source 数据的包装,如一个文件或者日志分区。分片是 source 进行任务分配和数据并行读取的基本粒度。

  • 源阅读器(SourceReader) 会请求分片并进行处理,例如读取分片所表示的文件或日志分区。SourceReader 在 TaskManagers 上的 SourceOperators 并行运行,并产生并行的事件流/记录流。

  • 分片枚举器(SplitEnumerator) 会生成分片并将它们分配给 SourceReader。该组件在 JobManager 上以单并行度运行,负责对未分配的分片进行维护,并以均衡的方式将其分配给 reader。SplitEnumerator 被认为是整个 Source 的“大脑”。

在这里插入图片描述

2、Data Sources 实现方式

1)基于文件

Source 是你的程序从中读取其输入的地方。你可以用 StreamExecutionEnvironment.addSource(sourceFunction)将一个 source 关联到你的程序。Flink 自带了许多预先实现的 source functions,不过你仍然可以通过实现 SourceFunction 接口编写自定义的非并行 source,也可以通过实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 类编写自定义的并行 sources。 通过 StreamExecutionEnvironment 可以访问多种预定义的 stream source,source 连接器,请查看连接器文档。

  • readTextFile(path):读取文本文件。
  • readFile(fileInputFormat, path) - 按照指定的文件输入格式读取(一次)文件。
  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) :这是前两个方法内部调用的方法。它基于给定的 fileInputFormat 读取路径 path 上的文件。根据提供的 watchType 的不同,source 可能定期(每 interval 毫秒)监控路径上的新数据(watchType 为 FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次当前路径中的数据然后退出(watchType 为 FileProcessingMode.PROCESS_ONCE)。使用 pathFilter,用户可以进一步排除正在处理的文件。
2)基于套接字
  • socketTextStream:套接字读取。元素可以由分隔符分隔。
3)基于集合
  • fromCollection(Collection) :从 Java Java.util.Collection 创建数据流。集合中的所有元素必须属于同一类型。

  • fromCollection(Iterator, Class) :从迭代器创建数据流。class 参数指定迭代器返回元素的数据类型。

  • fromElements(T ...) :从给定的对象序列中创建数据流。所有的对象必须属于同一类型。

  • fromParallelCollection(SplittableIterator, Class) :从迭代器并行创建数据流。class 参数指定迭代器返回元素的数据类型。

  • generateSequence(from, to) :基于给定间隔内的数字序列并行生成数据流。

4)自定义
  • addSource:关联一个新的 source function。例如,你可以使用 addSource(new FlinkKafkaConsumer<>(…)) 来从 Apache Kafka 获取数据。更多详细信息见连接器。

2)DataStream Transformations(数据流转换//处理/算子)

【温馨提示】是用户通过算子能将一个或多个 DataStream 转换成新的 DataStream,在应用程序中可以将多个数据转换算子合并成一个复杂的数据流拓扑。这部分内容将描述 Flink DataStream API 中基本的数据转换API,数据转换后各种数据分区方式,以及算子的链接策略。

官方文档

1、数据流转换

算子数据转换解释示例
MapDataStream → DataStream获取一个元素并生成一个元素。将输入流的值加倍的映射函数dataStream.map { x => x * 2 }
FlatMapDataStream → DataStream获取一个元素并生成零个、一个或多个元素。将句子拆分为单词的flatmap函数dataStream.flatMap { str => str.split(" ") }
FilterDataStream → DataStream为每个元素计算布尔函数,并保留该函数返回true的元素。过滤掉零值的过滤器dataStream.filter { _ != 0 }
KeyByDataStream → KeyedStream在逻辑上将流划分为不相交的分区。具有相同密钥的所有记录都被分配到同一分区。在内部,keyBy()是通过哈希分区实现的,类似于mysql里面的group by。有不同的方法来指定键dataStream.keyBy(.someKey)
dataStream.keyBy(
._1)
ReduceKeyedStream → DataStream键控数据流上的“滚动”减少。将当前元素与上次减少的值合并,并发出新值。创建部分和流的reduce函数keyedStream.reduce { _ + _ }
WindowKeyedStream → WindowedStream可以在已分区的KeyedStreams上定义窗口。Windows根据某些特征(例如,在过去5秒内到达的数据)对每个键中的数据进行分组。有关windows的完整说明,请参见windows。dataStream
.keyBy(_._1)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
WindowAllDataStream → AllWindowedStream可以在常规数据流上定义窗口。Windows根据某些特征(例如,过去5秒内到达的数据)对所有流事件进行分组。有关windows的完整说明,请参见windows。dataStream
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
Window ApplyWindowedStream → DataStream ;AllWindowedStream → DataStream将常规功能应用于整个窗口。下面是一个手动求和窗口元素的函数。如果使用的是windowAll转换,则需要使用AllWindowFunctionwindowedStream.apply { WindowFunction }

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }
WindowReduceWindowedStream → DataStream将reduce函数应用于窗口并返回减少的值。windowedStream.reduce { _ + _ }
UnionDataStream* → DataStream两个或多个数据流的合并,创建一个包含所有流中所有元素的新流。注意:如果将一个数据流与其自身合并,则在结果流中会得到两次每个元素。dataStream.union(otherStream1, otherStream2, …);
Window JoinDataStream,DataStream → DataStream在给定的密钥和公共窗口上连接两个数据流。dataStream.join(otherStream)
.where().equalTo()
.window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply { … }
Interval JoinKeyedStream,KeyedStream → DataStream在给定的时间间隔内,将两个密钥流的两个元素e1和e2与一个公共密钥连接,因此 e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
.between(Time.milliseconds(-2), Time.milliseconds(2))
// lower and upper bound
.upperBoundExclusive(true) // optional
.lowerBoundExclusive(true) // optional
.process(new IntervalJoinFunction() {…})
Window CoGroupDataStream,DataStream → DataStream在给定的键和公共窗口上对两个数据流进行协组。dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply {}
ConnectDataStream,DataStream → ConnectedStream“连接”两个保持其类型的数据流。连接允许两个流之间的共享状态。someStream : DataStream[Int] = …
otherStream : DataStream[String] = …
val connectedStreams = someStream.connect(otherStream)
CoMap, CoFlatMapConnectedStream → DataStream类似于连接数据流上的map和flatMapconnectedStreams.map(
(_ : Int) => true,
(_ : String) => false)
)

connectedStreams.flatMap(
(_ : Int) => true,
(_ : String) => false
)
IterateDataStream → IterativeStream → ConnectedStream通过将一个操作符的输出重定向到前一个操作符,在流中创建一个“反馈”循环。这对于定义不断更新模型的算法特别有用。下面的代码从一个流开始,并连续地应用迭代体。大于0的元素被发送回反馈通道,其余的元素被下游转发。initialStream.iterate {
iteration => {
val iterationBody = iteration.map {/do something/}
(iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
}
}

2、物理分区

Flink 也提供以下方法让用户根据需要在数据转换完成后对数据分区进行更细粒度的配置。

分区数据转换解释示例
Custom PartitioningDataStream → DataStream使用用户定义的Partitioner为每个元素选择目标任务。dataStream.partitionCustom(partitioner, “someKey”)
dataStream.partitionCustom(partitioner, 0)
Random PartitioningDataStream → DataStream根据均匀分布随机划分元素。dataStream.shuffle()
RescalingDataStream → DataStream循环地将元素分区到下游操作的一个子集。dataStream.rescale()
BroadcastingDataStream → DataStream将元素广播到每个分区。

3、算子链和资源组

将两个算子链接在一起能使得它们在同一个线程中执行,从而提升性能。Flink 默认会将能链接的算子尽可能地进行链接(例如, 两个 map 转换操作)。此外, Flink 还提供了对链接更细粒度控制的 API 以满足更多需求。

如果想对整个作业禁用算子链,可以调用 StreamExecutionEnvironment.disableOperatorChaining()。下列方法还提供了更细粒度的控制。需要注 意的是, 这些方法只能在 DataStream 转换操作后才能被调用,因为它们只对前一次数据转换生效。例如,可以 someStream.map(…).startNewChain() 这样调用,而不能 someStream.startNewChain()这样。

算子链操作解释示例
Start New Chain开始一个新的链,从这个操作符开始。这两个映射器将被链接,过滤器将不会链接到第一个映射器。someStream.filter(…).map(…).startNewChain().map(…)
Disable Chaining不要链接map操作符。someStream.map(…).disableChaining()
Set Slot Sharing Group设置操作的槽位共享组。Flink将把具有相同槽共享组的操作放在相同槽中,而将没有槽共享组的操作放在其他槽中。这可以用来隔离槽。如果所有的输入操作都在同一个槽位共享组中,则从输入操作继承槽位共享组。默认槽位共享组的名称为“default”,可以通过调用slotSharingGroup(“default”)显式地将操作放入该组。someStream.filter(…).slotSharingGroup(“name”)

3)Data Sinks(数据输出)

sink 连接器,请查看连接器文档。

Data sinks 使用 DataStream 并将它们转发到文件、套接字、外部系统或打印它们。Flink 自带了多种内置的输出格式,这些格式相关的实现封装在 DataStreams 的算子里:

  • writeAsText() / TextOutputFormat : 将元素按行写成字符串。通过调用每个元素的 toString() 方法获得字符串。

  • writeAsCsv(...) / CsvOutputFormat :将元组写成逗号分隔值文件。行和字段的分隔符是可配置的。每个字段的值来自对象的 toString() 方法。

  • print() / printToErr():在标准输出/标准错误流上打印每个元素的 toString() 值。 可选地,可以提供一个前缀(msg)附加到输出。这有助于区分不同的 print 调用。如果并行度大于1,输出结果将附带输出任务标识符的前缀。

  • writeUsingOutputFormat() / FileOutputFormat :自定义文件输出的方法和基类。支持自定义 object 到 byte 的转换。

  • writeToSocket :根据 SerializationSchema 将元素写入套接字。

  • addSink : 调用自定义 sink function。Flink 捆绑了连接到其他系统(例如 Apache Kafka)的连接器,这些连接器被实现为 sink functions。

【温馨提示】DataStream 的 write*() 方法主要用于调试目的。它们不参与 Flink 的 checkpointing,这意味着这些函数通常具有至少有一次语义。刷新到目标系统的数据取决于 OutputFormat 的实现。这意味着并非所有发送到 OutputFormat 的元素都会立即显示在目标系统中。此外,在失败的情况下,这些记录可能会丢失。

为了将流可靠地、精准一次地传输到文件系统中,请使用 StreamingFileSink。此外,通过 .addSink(…) 方法调用的自定义实现也可以参与 Flink 的 checkpointing,以实现精准一次的语义。

旁路输出(分流)

旁路输出在Flink中叫作SideOutput,用途类似于DataStream#split,本质上是一个数据流的切分行为,按照条件将DataStream切分为多个子数据流,子数据流叫作旁路输出数据流,每个旁路输出数据流可以有自己的下游处理逻辑。

使用旁路输出时,首先需要定义用于标识旁路输出流的 OutputTag:

val outputTag = OutputTag[String]("side-output")

可以通过以下方法将数据发送到旁路输出:

  • ProcessFunction
  • KeyedProcessFunction
  • CoProcessFunction
  • KeyedCoProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction

【示例】

package comimport org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collectorobject myOutputTag {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval input: DataStream[String] = env.readTextFile("flink/data/hello.txt")val outputTag = OutputTag[String]("side-output")val mainDataStream = input.process(new ProcessFunction[String, String] {override def processElement(value: String,ctx: ProcessFunction[String, String]#Context,out: Collector[String]): Unit = {// 发送数据到主要的输出out.collect(value)// 发送数据到旁路输出ctx.output(outputTag, "sideout-" + value)}})// 获取outputTag并输出mainDataStream.getSideOutput(outputTag).print()// 必须调用execute或者executeAsync(),下面会讲env.execute("test OutputTag")}}

在这里插入图片描述


【问题】Caused by: java.lang.ClassNotFoundException: org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream

【解决】在pom.xml添加下面依赖

<dependency><groupId>org.apache.commons</groupId><artifactId>commons-compress</artifactId><version>1.21</version>
</dependency>

2)Flink 程序剖析(scala)

Flink 程序看起来像一个转换 DataStream 的常规程序。每个程序由相同的基本部分组成:

  1. 获取一个执行环境(execution environment);
  2. 加载/创建初始数据;
  3. 指定数据相关的转换;
  4. 指定计算结果的存储位置;
  5. 触发程序执行。

1、 获取一个执行环境(execution environment)

val env = StreamExecutionEnvironment.getExecutionEnvironment

2、加载/创建初始数据

为了指定 data sources,执行环境提供了一些方法,支持使用各种方法从文件中读取数据:你可以直接逐行读取数据,像读 CSV 文件一样,或使用任何第三方提供的 source。下面是将一个文本文件作为一个行的序列来读。

val env = StreamExecutionEnvironment.getExecutionEnvironment// 加载数据源
val input: DataStream[String] = env.readTextFile("file:///path/to/file")

3、指定数据相关的转换

val env = StreamExecutionEnvironment.getExecutionEnvironment
val input: DataStream[String] = env.readTextFile("file:///path/to/file")// 例如一个 map 的转换如下:
val mapped = input.map { x => x.toInt }

4、指定计算结果的存储位置

一旦你有了包含最终结果的 DataStream,你就可以通过创建 sink 把它写到外部系统。下面是一些用于创建 sink 的示例方法:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val input: DataStream[String] = env.readTextFile("flink/data/source")// 例如一个 map 的转换如下:
val mapped = input.map { x => x.toInt }// 存储到文件,当然还可以执行更多的sink
// writeAsText第二个参数来定义输出模式,它有以下两个可选值:
// WriteMode.NO_OVERWRITE:当指定路径上不存在任何文件时,才执行写出操作;
// WriteMode.OVERWRITE:不论指定路径上是否存在文件,都执行写出操作;如果原来已有文件,则进行覆盖。
mapped.writeAsText("flink/data/sink", FileSystem.WriteMode.OVERWRITE)

5、触发程序执行

  • 一旦指定了完整的程序,需要调用 StreamExecutionEnvironmentexecute()方法来触发程序执行。根据 ExecutionEnvironment 的类型,执行会在你的本地机器上触发,或将你的程序提交到某个集群上执行。execute() 方法将等待作业完成,然后返回一个 JobExecutionResult,其中包含执行时间和累加器结果。

  • 如果不想等待作业完成,可以通过调用 StreamExecutionEnvironment 的 executeAsync() 方法来触发作业异步执行。它会返回一个 JobClient,你可以通过它与刚刚提交的作业进行通信。如下是使用 executeAsync() 实现 execute() 语义的示例。

final JobClient jobClient = env.executeAsync();final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();

完整示例程序(官网示例)

【问题一】

【温馨提示】如果出现这种报错,一般就是IDEA 对scope为provided,这是IDEA的bug:Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/api/scala/typeutils/CaseClassTypeInfo

【解决】

  • 【第一种方式】把依赖范围调大或者直接去掉都行,不清楚的可以看我之前的Java-Maven详解,但是记住在打包的时候得加上。
  • 【第二种方式】Run->Edit Configurations,设置如下:
    在这里插入图片描述
    【问题二】

【问题】Caused by: java.util.concurrent.CompletionException: java.lang.NoSuchMethodError: org.apache.commons.math3.stat.descriptive.rank.Percentile.withNaNStrategy(Lorg/apache/commons/math3/stat/ranking/NaNStrategy;)Lorg/apache/commons/math3/stat/descriptive/rank/Percentile;hadoop-common中的commons-math3冲突导致。

【解决】排除hadoop-common中的commons-math3,设置如此:

<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.3.1</version><scope>provided</scope><exclusions><exclusion><groupId>org.apache.commons</groupId><artifactId>commons-math3</artifactId></exclusion></exclusions>
</dependency>

先启动服务

$ nc -lk 9999

WindowWordCount源码如下:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Timeobject WindowWordCount {def main(args: Array[String]) {val env = StreamExecutionEnvironment.getExecutionEnvironmentval text = env.socketTextStream("localhost", 9999)val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }.map { (_, 1) }.keyBy(_._1).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1)counts.print()env.execute("Window Stream WordCount")}
}

在这里插入图片描述

四、什么是DataSet?

Flink用DataStream 表示无界数据集,用DataSet表示有界数据集,前者用于流处理应用程序,后者用于批处理应用程序。从操作形式上看,DataStream 和 DataSet 与集合 Collection 有些相似,但两者有着本质的区别:

  • DataStream 和 DataSet 是不可变的数据集合,因此不可以想操作集合那样增加或者删除 DataStream 和 DataSet 中的元素,也不可以通过诸如下标等方式访问某个元素。
  • Flink 应用程序通过 Source 创建 DataStream 对象和 DataSet 对象,通过转换操作产生新的 DataStream 对象和 DataSet 对象。
  • 运行时是应用程序被调度执行时的上下文环境,通过StreamExecutionEnvironmentExecutionEnvironment方法会根据当前环境自动选择本地或者集群运行时环境。

五、DataSet 数据处理过程

在这里插入图片描述

1)Data Sources (数据源)

数据源创建初始数据集,比如从文件或Java集合创建数据集。创建数据集的一般机制抽象在InputFormat后面。Flink提供了几种内置格式,可以从常见的文件格式创建数据集。它们中的许多在ExecutionEnvironment上都有快捷方法。

官方文档

1、基于文件

  • readTextFile(path) / TextInputFormat :读取文本文件。
  • readTextFileWithValue(path) / TextValueInputFormat: 读取文件,并将它们作为StringValues返回。StringValues是可变字符串。
  • readCsvFile(path) / CsvInputFormat:解析带有逗号(或其他字符)分隔字段的文件。返回由元组或pojo组成的数据集。支持基本java类型及其对应值作为字段类型。
  • readFileOfPrimitives(path, Class) / PrimitiveInputFormat:解析以新行(或另一个字符序列)分隔的原始数据类型(如String或Integer)的文件。
  • readFileOfPrimitives(path, delimiter, Class) / PrimitiveInputFormat:使用给定的分隔符解析以新行(或另一个字符序列)分隔的原始数据类型(如String或Integer)的文件。

2、基于集合

  • fromCollection(Collection):从Java.util.Collection创建一个数据集。集合中的所有元素必须具有相同的类型。
  • fromCollection(Iterator, Class):从迭代器创建数据集。该类指定迭代器返回的元素的数据类型。
  • fromElements(T …):根据给定的对象序列创建一个数据集。所有对象必须是相同的类型。
  • fromParallelCollection(SplittableIterator, Class):并行地从迭代器创建数据集。该类指定迭代器返回的元素的数据类型。
  • generateSequence(from, to):并行生成给定区间内的数字序列。

3、通用型

  • readFile(inputFormat, path) / FileInputFormat :接受文件输入格式。
  • createInput(inputFormat) / InputFormat:接受通用输入格式。

2)DataSet Transformations(数据集转换//处理/算子)

数据转换将一个或多个数据集转换为新的数据集。程序可以将多个转换组合成复杂的程序集。

算子解释示例
Map获取一个元素并生成一个元素。将输入流的值加倍的映射函数。data.map { x => x.toInt }
FlatMap获取一个元素并生成零个、一个或多个元素。将句子拆分为单词的flatmap函数。data.flatMap { str => str.split(" ") }
MapPartition在单个函数调用中转换并行分区。该函数以Iterable流的形式获取分区,并可以生成任意数量的结果值。每个分区中的元素数量取决于并行度和之前的操作。data.mapPartition { in => in map { (_, 1) } }
Filter为每个元素计算布尔函数,并保留该函数返回true的元素。过滤掉零值的过滤器。data.filter { _ > 1000 }
Reduce通过重复地将两个元素组合成一个元素,将一组元素组合成一个元素。Reduce可以应用于完整的数据集或分组的数据集。data.reduce { _ + _ }
ReduceGroup将一组元素组合成一个或多个元素。ReduceGroup可以应用于完整的数据集,也可以应用于分组的数据集。data.reduceGroup { elements => elements.sum }
Aggregate将一组值聚合为一个值。聚合函数可以看作是内置的reduce函数。聚合可以应用于完整的数据集,也可以应用于分组的数据集。val input: DataSet[(Int, String, Double)] = // […]
val output: DataSet[(Int, String, Double)] = input.aggregate(SUM, 0).aggregate(MIN, 2)
Distinct返回数据集的不同元素。对于元素的所有字段或字段的子集,它将从输入数据集中删除重复的条目。data.distinct()
Join通过创建键值相等的所有元素对来连接两个数据集。可选地使用JoinFunction将这对元素转换为单个元素,或使用FlatJoinFunction将这对元素转换为任意多个(包括没有)元素。参见键部分了解如何定义连接键。val result = input1.join(input2).where(0).equalTo(1)
OuterJoin对两个数据集执行左、右或完全外部连接。外部连接类似于常规(内部)连接,它创建的所有元素对的键值相等。此外,如果在另一侧没有找到匹配的键,则保存外部的记录(如果是完整的,则为左、右或两者)。匹配的元素对(或一个元素和另一个输入的空值)被赋给一个JoinFunction以将这对元素转换为单个元素,或者赋给一个FlatJoinFunction以将这对元素转换为任意多个(包括没有)元素。参见键部分了解如何定义连接键。val joined = left.leftOuterJoin(right).where(0).equalTo(1) {
(left, right) =>
val a = if (left == null) “none” else left._1
(a, right)
}
CoGroup简化运算的二维变体。对一个或多个字段上的每个输入进行分组,然后合并组。每对组调用一个变换函数。请参阅键部分以了解如何定义coGroup键。data1.coGroup(data2).where(0).equalTo(1)
Cross构建两个输入的笛卡尔积(叉积),创建所有的元素对。可选地使用CrossFunction将这对元素转换为单个元素。val data1: DataSet[Int] = // […]
val data2: DataSet[String] = // […]
val result: DataSet[(Int, String)] = data1.cross(data2)
Union生成两个数据集的并集。data.union(data2)
Rebalance均匀地重新平衡数据集的并行分区,以消除数据倾斜。只有类似map的转换可以遵循rebalance转换。val data1: DataSet[Int] = // […]
val result: DataSet[(Int, String)] = data1.rebalance().map(…)
Hash-Partition哈希分区一个给定键的数据集。键可以指定为位置键、表达式键和键选择器函数。val in: DataSet[(Int, String)] = // […]
val result = in.partitionByHash(0).mapPartition { … }
Range-Partition根据给定的键对数据集进行范围分区。键可以指定为位置键、表达式键和键选择器函数。val in: DataSet[(Int, String)] = // […]
val result = in.partitionByRange(0).mapPartition { … }
Custom Partitioning使用自定义Partitioner函数,根据键将记录分配到特定的分区。该键可以指定为位置键、表达式键和选择键函数。注意:此方法只适用于单个字段键。val in: DataSet[(Int, String)] = // […]
val result = in
.partitionCustom(partitioner, key).mapPartition { … }
Sort Partitioning按照指定的顺序在本地对指定字段上的数据集的所有分区进行排序。字段可以指定为元组位置或字段表达式。对多个字段进行排序是通过链接sortPartition()调用来完成的。val in: DataSet[(Int, String)] = // […]
val result = in.sortPartition(1, Order.ASCENDING).mapPartition { … }
First-N返回数据集的前n个(任意的)元素。First-n可以应用于常规数据集、分组数据集或分组排序数据集。分组键可以指定为键选择器函数或字段位置键。val in: DataSet[(Int, String)] = // […]
// regular data set
val result1 = in.first(3)
// grouped data set
val result2 = in.groupBy(0).first(3)
// grouped-sorted data set
val result3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3)
MinBy / MaxBy从一个或多个字段值为最小(最大值)的元组中选择一个元组。用于比较的字段必须是有效的关键字段,即可比性。如果多个元组具有最小(最大)字段值,则返回这些元组中的任意一个元组。MinBy (MaxBy)可以应用于完整的数据集或分组的数据集。val in: DataSet[(Int, Double, String)] = // […]
// a data set with a single tuple with minimum values for the Int and String fields.
val out: DataSet[(Int, Double, String)] = in.minBy(0, 2)
// a data set with one tuple for each group with the minimum value for the Double field.
val out2: DataSet[(Int, Double, String)] = in.groupBy(2).minBy(1)
Specifying Keys一些转换(join、coGroup、groupBy)要求在元素集合上定义键。其他转换(Reduce、groureduce、Aggregate)允许在应用数据之前对数据进行分组。DataSet<…> input = // […]
DataSet<…> reduced = input
.groupBy(/define key here/)
.reduceGroup(/do something/);
Define keys for Tuples最简单的情况是在元组的一个或多个字段上分组元组。val input: DataSet[(Int, String, Long)] = // […]
val keyed = input.groupBy(0)

//val input: DataSet[(Int, String, Long)] = // […]
val grouped = input.groupBy(0,1)

3)Data Sinks(数据输出)

数据接收器使用数据集,并用于存储或返回它们。使用OutputFormat描述数据接收器操作。Flink提供了多种内置的输出格式,这些格式封装在DataSet上的操作后面:

  • writeAsText() / TextOutputFormat:按行方式将元素写入字符串。字符串是通过调用每个元素的toString()方法获得的。
  • writeAsFormattedText() / TextOutputFormat:将元素按行编写为字符串。字符串是通过为每个元素调用用户定义的format()方法获得的。
  • writeAsCsv(…) / CsvOutputFormat:将元组写入逗号分隔的值文件。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。print() / printToErr() / print(String msg) / printToErr(String msg) -打印出标准输出/标准错误流中每个元素的toString()值。可选地,可以提供一个前缀(msg),作为输出的前缀。这有助于区分不同的打印调用。如果并行度大于1,输出也会被添加产生输出的任务的标识符。
  • write() / FileOutputFormat:方法和基类用于自定义文件输出。支持自定义对象到字节的转换。
  • output()/ OutputFormat:大多数通用输出方法,用于非基于文件的数据接收器(例如将结果存储在数据库中)。

一个数据集可以被输入到多个操作。程序可以写或打印一个数据集,同时在它们上运行额外的转换。

【示例】

package comimport org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.core.fs.FileSystem.WriteModeobject DataSetTest001 {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)// text dataval textData: DataSet[String] = env.readTextFile("flink/data/s1")// write DataSet to a file on the local file system
//    textData.writeAsText("flink/data/sink01")// write DataSet to a file on an HDFS with a namenode running at nnHost:nnPort// 先创建目录:hadoop fs -mkdir -p hdfs://hadoop-node1:8082/flink/DataSet/// 操作添加依赖/*<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>3.3.1</version><scope>provided</scope></dependency>*/textData.writeAsText("hdfs://hadoop-node1:8082/flink/DataSet/sink02")//
//    // write DataSet to a file and overwrite the file if it exists
//    textData.writeAsText("flink/data/sink03", WriteMode.OVERWRITE)
//
//    // tuples as lines with pipe as the separator "a|b|c"
//    val values: DataSet[(String, Int, Double)] = // [...]
//    values.writeAsCsv("file:///path/to/the/result/file", "\n", "|")
//
//    // this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
//    values.writeAsText("file:///path/to/the/result/file")// this writes values as strings using a user-defined formatting
//    values map { tuple => tuple._1 + " - " + tuple._2 }
//      .writeAsText("file:///path/to/the/result/file")env.execute("dataset test")}
}

在这里插入图片描述

【示例】WordCount

package comimport org.apache.flink.api.scala._object WordCount {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentval text = env.fromElements("Who's there?","I think I hear them. Stand, ho! Who's there?")val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }.map { (_, 1) }.groupBy(0).sum(1)counts.print()}
}

在这里插入图片描述
未完待续,更多大数据知识,请耐心等待~


http://chatgpt.dhexx.cn/article/BoQrEHR8.shtml

相关文章

数据挖掘常用算法整理

前言&#xff1a; 找工作时&#xff08;IT行业&#xff09;&#xff0c;除了常见的软件开发以外&#xff0c;机器学习岗位也可以当作是一个选择&#xff0c;不少计算机方向的研究生都会接触这个&#xff0c;如果你的研究方向是机器学习/数据挖掘之类&#xff0c;且又对其非常感…

C++ 26 常用算法

目录 一、概述 1.1 常用遍历算法 1.1.1 算法简介 1.1.2 for_each遍历算法 1.1.3 transform遍历算法 1.2 常用查找算法 1.2.1 算法简介 1.2.2 find 查找算法 1.2.3 find_if 查找算法 1.2.4 adjacent_find 查找算法 1.2.5 binary_search 查找算法 1.2.6 count 查找算法…

干货 | 轻松看懂机器学习十大常用算法

通过本篇文章可以对ML的常用算法有个常识性的认识&#xff0c;没有代码&#xff0c;没有复杂的理论推导&#xff0c;就是图解一下&#xff0c;知道这些算法是什么&#xff0c;它们是怎么应用的&#xff0c;例子主要是分类问题。 每个算法都看了好几个视频&#xff0c;挑出讲的最…

常用十种算法

目录 1、二分查找算法&#xff08;非递归&#xff09;二分查找算法(非递归)介绍二分查找算法(非递归)代码实现 2、分治算法分治算法介绍分治算法的基本步骤分治算法最佳实践汉诺塔 3、动态规划动态规划算法介绍应用场景——背包问题 4、KMP算法*next数组生成KMP 算法介绍应用场…

Java常用算法

Java常用算法 一、二分查找算法&#xff08;非递归&#xff09; 1、介绍 ​ 二分查找法只适用于从有序的数列中进行查找(比如数字和字母等)&#xff0c;将数列排序后再进行查找。 ​ 二分查找法的运行时间为对数时间O(㏒₂n) &#xff0c;即查找到需要的目标位置最多只需要㏒…

五大常用算法学习笔记

一。分治算法&#xff1a;快速排序、归并排序、大整数乘法、二分查找、递归&#xff08;汉诺塔&#xff09; 基本概念&#xff1a;把一个复杂的问题分成若干个相同或相似的子问题&#xff0c;再把子问题分成更小的子问题… &#xff0c; 知道最后子问题可以简单的直接求解&…

常见算法

首页 论坛 新闻 文章 下载 源码 网友作品 合作开发 招聘 刻盘服务 编程爱好者光盘 请登陆或者注册新用户 用户名 密 码 记住密码 注册新用户 忘记密码了 您所在位置&#xff1a;编程爱好者论坛 — C/C语言讨论区 — 我见到过的一些常用算法 C/C语言讨论区&#xff1a;…

最常用算法汇总(一)

一、贪心算法 贪心算法&#xff08;又称贪婪算法&#xff09;是指&#xff0c;在对问题求解时&#xff0c;总是做出在当前看来是最好的选择。也就是说&#xff0c;不从整体最优上加以考虑&#xff0c;他所做出的仅是在某种意义上的局部最优解。贪心算法不是对所有问题都能得到整…

十大常用算法

前言 最近在研究一些经常用到的东西想把它们做一个汇总&#xff0c;想了想用到最多的应该是排序算法&#xff0c;所以对排序算法做了个总结&#xff0c;并自己用C实现了一下。 一、算法概述 0.1 算法分类 十种常见排序算法可以分为两大类&#xff1a; 非线性时间比较类排序…

常用十大算法

这里的讲解图主要使用的是尚硅谷韩顺平老师的图&#xff0c;请周知。 目录 二分查找&#xff08;非递归&#xff09; 分治算法 动态规划算法 KMP算法 贪心算法 普利姆&#xff08;Prim&#xff09;算法 克鲁斯卡尔&#xff08;Kruskal&#xff09;算法 迪杰斯特拉&a…

常见十大算法

优劣术语 - 稳定性 原本a在b前&#xff0c;ab,排序之后位置任然不变。不稳定性则相反 - 内排序 所有排序都在内存中完成。外排序数据放磁盘&#xff0c;排序通过磁盘内存的数据传输 - 事件复杂度 算法执行耗费的时间 - 空间复杂度 算法执行耗费的内存 In/out-place: 不占/占额…

常见的10种算法

常见的10种算法 数据结构研究的内容&#xff1a;就是如何按一定的逻辑结构&#xff0c;把数据组织起来&#xff0c;并选择适当的存储表示方法把逻辑结构组织好的数据存储到计算机的存储器里。 算法研究的目的是为了更有效的处理数据&#xff0c;提高数据运算效率。数据的运算是…

常用的10 种算法

译者&#xff1a;claudio jandan.net/2014/05/31/10-algorithms.html Reddit 有篇帖子介绍了算法对我们现在生活的重要性&#xff0c;以及哪些算法对现代文明所做贡献最大。如果对算法有所了解&#xff0c;读这篇文章时你可能会问 “作者知道算法为何物吗&#xff1f;”&#x…

常用的十种算法

十种算法 1、二分查找算法&#xff08;非递归&#xff09; 1、介绍&#xff1a; 1&#xff09;二分查找只适用于从有序的数列中进行查找&#xff08;比如数字和字母等&#xff09;&#xff0c;将数列排序后再进行查找 2&#xff09;二分查找算法的运行时间为对数时间O&…

基础夯实:基础数据结构与算法(二)

基础夯实&#xff1a;基础数据结构与算法&#xff08;二&#xff09; 常见的10种算法1、递归算法例题1&#xff1a;计算n&#xff01;例题2&#xff1a;斐波那契数列例题3&#xff1a;递归将整形数字转换为字符串例题4&#xff1a;汉诺塔例题5&#xff1a;猴子吃桃例题6&#x…

蓝桥杯知识点汇总:基础知识和常用算法

文章目录 JAVA基础语法&#xff1a;算法竞赛常用的JAVA API&#xff1a;算法和数据结构简单算法简单数据结构图论数学贪心动态规划 补充省赛题解待更&#xff1a; 此系列包含蓝桥杯&#xff08;软件类&#xff09;所考察的绝大部分知识点。一共分为 基础语法&#xff0c; 常用…

HBA的WWN号以及存储区域网络

古驰古驰巴拉巴拉&#xff0c;今天讲一下存储区域网络和wwn号以及查看wwn号的方法 存储区域网络&#xff08;Storage Area Network&#xff0c;简称SAN&#xff09;采用网状通道&#xff08;Fibre Channel &#xff0c;简称FC&#xff0c;区别与Fiber Channel光纤通道&#xf…

nsw hnsw

参考了很多该博客 https://blog.csdn.net/u011233351/article/details/85116719&#xff0c;感谢博主。 参考论文《Approximate nearest neighbor algorithm based on navigable small world graphs》 《Efficient and robust approximate nearest neighbor search using Hie…

思科光交MDS9710绑定WWN并激活新的wwn

第一步、查看所有的wwn号 #命令 #show flogi database 内容示例&#xff1a; 第二步、查看是否有发现新的wwn号 图中为新发现的wwn号 第三步、将该wwn号加入到对应的zone下 #先筋肉config模式 #再进入对应的zone zone name Zone_P11_****——** vsan 1 #新增新存在的wwn号…

www.wwwwwwwwww

复习题 一、问答题 1.Anaconda的优点有哪些&#xff1f; &#xff08;1&#xff09;开源。 &#xff08;2&#xff09;安装过程简单。 &#xff08;3&#xff09;⾼性能使⽤Python和R语⾔。 &#xff08;4&#xff09;免费的社区⽀持。 &#xff08;5&#xff09; Conda包…