Flink DataStream API 编程指南
- 概览
- 前言
- 什么是DataStream
- Flink程序剖析
- 程序样例
- Data Sources
- DataStream Transformations
- 算子
- 数据流转换算子
- 物理分区
- 算子链和资源组
- Data Sinks
- 迭代
- 执行参数
概览
前言
Flink中的DataStream程序是常规程序,可对数据流进行转换(例如,过滤,更新状态,定义窗口,聚合)。数据流最初是从各种来源(例如消息队列,套接字流,文件)创建的。结果通过接收器返回,接收器可以例如将数据写入文件或标准输出(例如命令行终端)。Flink程序可以在各种上下文中运行,独立运行或嵌入其他程序中。执行可以在本地JVM或许多计算机的群集中进行。
什么是DataStream
DataStream API的名称来自特殊的DataStream类,该类用于表示Flink程序中的数据集合。可以将它们视为包含重复项的不可变数据集合。此数据可以是有界的,也可以是无界的,用于处理它们的API是相同的。
在用法方面,DataStream与常规Java Collection相似,但在某些关键方面有很大不同。它们是不可变的,这意味着一旦创建它们就不能添加或删除元素。您还不能仅检查内部元素,而只能使用DataStream API操作(也称为转换)对其进行处理。
您可以通过在Flink程序中添加源来创建初始DataStream。然后从中派生新的流,并通过使用API方法(例如map,filter等)将它们组合在一起。
Flink程序剖析
Flink程序看起来像转换DataStreams的常规程序。每个程序都包含相同的基本部分:
- 获取执行环境;
- 加载/创建初始数据;
- 定此数据的转换;
- 指定将计算结果放在何处;
- 触发程序执行。
StreamExecutionEnvironment是所有Flink程序的基础。您可以在StreamExecutionEnvironment上使用以下静态方法获得一个:
getExecutionEnvironment()createLocalEnvironment()createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
通常,您只需要使用getExecutionEnvironment(),因为这将根据上下文执行正确的操作:如果您是在IDE中执行程序或作为常规Java程序执行,它将创建一个本地环境,该环境将在本地计算机上执行您的程序。如果您是从程序创建的JAR文件,并通过命令行调用它,则Flink集群管理器将执行您的main方法,而getExecutionEnvironment()将返回用于在集群上执行程序的执行环境。
为了指定数据源,执行环境有几种方法可以使用各种方法从文件中读取:您可以逐行,以CSV文件的形式读取它们,或使用任何其他提供的源。要将文本文件读取为一系列行,可以使用:
val env = StreamExecutionEnvironment.getExecutionEnvironment()val text: DataStream[String] = env.readTextFile("file:///path/to/file")
这将为您提供一个DataStream,然后您可以在其上应用转换以创建新的派生DataStream。
您可以通过使用转换函数在DataStream上调用方法来应用转换。例如,地图转换如下所示:
val input: DataSet[String] = ...val mapped = input.map { x => x.toInt }
通过将原始集合中的每个String转换为Integer,将创建一个新的DataStream。
一旦有了包含最终结果的DataStream,就可以通过创建接收器将其写入外部系统。这些只是创建接收器的一些示例方法:
writeAsText(path: String)print()
一旦指定了完整程序,就需要通过在StreamExecutionEnvironment上调用execute()来触发程序执行
。根据ExecutionEnvironment的类型,执行将在本地计算机上触发或提交程序以在群集上执行。
execute()方法将等待作业完成,然后返回JobExecutionResult,其中包含执行时间和累加器结果。
如果您不想等待作业完成,可以通过在StreamExecutionEnvironment上调用executeAysnc()来触发异步作业执行。它将返回一个JobClient,您可以通过该JobClient与刚提交的作业进行通信。例如,这是如何通过使用executeAsync()实现execute()的语义。
inal JobClient jobClient = env.executeAsync();final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();
关于程序执行的最后一部分对于理解何时以及如何执行Flink操作至关重要。所有Flink程序都是延迟执行的:执行程序的main方法时,不会直接进行数据加载和转换。而是,将创建每个操作并将其添加到数据流图中。当在执行环境中由execute()调用显式触发执行时,实际上将执行这些操作。程序是在本地执行还是在群集上执行取决于执行环境的类型.
lazy evaluation使您可以构建复杂的程序,Flink将这些程序作为一个整体计划的单元来执行。
程序样例
以下程序是流式窗口字数统计应用程序的一个完整的工作示例,该程序在5秒的窗口中统计来自Web套接字的字数。
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Timeobject WindowWordCount {def main(args: Array[String]) {val env = StreamExecutionEnvironment.getExecutionEnvironmentval text = env.socketTextStream("localhost", 9999)val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }.map { (_, 1) }.keyBy(_._1).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1)counts.print()env.execute("Window Stream WordCount")}
}
要运行示例程序,请首先从终端使用netcat启动输入流:
nc -lk 9999
只需输入一些单词,然后按回车键即可获得新单词。这些将作为字数统计程序的输入。如果您希望看到计数大于1,在5秒钟内一次又一次键入相同的单词(如果无法快速键入☺,则将窗口大小从5秒钟增加)。
Data Sources
源是程序从中读取其输入的位置。您可以使用StreamExecutionEnvironment.addSource(sourceFunction)将源附加到程序中。Flink附带了许多预先实现的源函数,但是您始终可以通过为非并行源实现SourceFunction来编写自己的自定义源,或通过实现ParallelSourceFunction接口或为并行源扩展RichParallelSourceFunction来实现。
从StreamExecutionEnvironment访问几个预定义的流源:
基于文件:
readTextFile(path)-逐行读取文本文件,即符合TextInputFormat规范的文件,并将其作为字符串返回。
readFile(fileInputFormat,path)-读取(一次)由指定文件输入格式指定的文件。
readFile(fileInputFormat,path,watchType,interval,pathFilter)-这是前两个方法在内部调用的方法。它根据给定的fileInputFormat读取路径中的文件。根据提供的watchType,此来源可能会定期(每间隔ms)监视新数据的路径(FileProcessingMode.PROCESS_CONTINUOUSLY),或处理一次当前路径中的数据并退出(FileProcessingMode.PROCESS_ONCE)。使用pathFilter,用户可以进一步从处理文件中排除文件。
IMPLEMENTATION:
在后台,Flink将文件读取过程分为两个子任务,即目录监视和数据读取。这些子任务中的每一个都是由单独的实体实现的。监视由单个非并行(并行= 1)任务实现,而读取由并行运行的多个任务执行。后者的并行性等于作业并行性。单个监视任务的作用是扫描目录(定期或仅扫描一次,具体取决于watchType),查找要处理的文件,将其拆分为多个拆分,然后将这些拆分分配给下游读取器。读者将是阅读实际数据的人。每个拆分只能由一个阅读器读取,而阅读器可以一对一地阅读多个拆分。IMPORTANT NOTES:
1. 如果watchType设置为FileProcessingMode.PROCESS_CONTINUOUSLY,则在修改文件时,将完全重新处理其内容。如果watchType设置为FileProcessingMode.PROCESS_CONTINUOUSLY,则在修改文件时,将完全重新处理其内容。
2. 如果watchType设置为FileProcessingMode.PROCESS_ONCE,则源将扫描路径一次并退出,而无需等待读取器完成文件内容的读取。当然,读者将继续阅读,直到读取了所有文件内容。关闭源将导致在该点之后没有更多的检查点。这可能导致节点故障后恢复速度变慢,因为作业将从上一个检查点恢复读取。
基于套接字:
- socketTextStream-从套接字读取。元素可以由定界符分隔。
基于集合:
- fromCollection(Seq)-从Java Java.util.Collection创建数据流。集合中的所有元素必须具有相同的类型。
- fromCollection(Iterator)-从迭代器创建数据流。该类指定迭代器返回的元素的数据类型。
- fromElements(elements:_ *)-从给定的对象序列创建数据流。所有对象必须具有相同的类型。
- fromParallelCollection(SplittableIterator)-从迭代器并行创建数据流。该类指定迭代器返回的元素的数据类型。
- generateSequence(from, to) -在给定间隔内并行生成数字序列。
Custom:
- addSource-附加一个新的源函数。例如,要读取Apache Kafka,可以使用addSource(new FlinkKafkaConsumer <>(…))。有关更多详细信息,请参见连接器。
DataStream Transformations
算子
用户通过算子能将一个或多个 DataStream 转换成新的 DataStream,在应用程序中可以将多个数据转换算子合并成一个复杂的数据流拓扑。
这部分内容将描述 Flink DataStream API 中基本的数据转换API,数据转换后各种数据分区方式,以及算子的链接策略。
数据流转换算子
-
Map
DataStream → DataStream
将一个元素变成另一个元素。一个映射函数,将输入流的值乘二:dataStream.map { x => x * 2 }
-
FlatMap
DataStream → DataStream
取一个元素并产生零个,一个或多个元素。FlatMap功能可将句子拆分为单词:dataStream.flatMap { str => str.split(" ") }
-
Filter
DataStream → DataStream
为每个元素评估一个布尔函数,并保留该函数返回true的布尔函数。过滤出零值的过滤器:dataStream.filter { _ != 0 }
-
KeyBy
DataStream → KeyedStream
从逻辑上将流划分为不相交的分区。具有相同键的所有记录都分配给相同的分区。在内部,KeyBy()是通过哈希分区实现的。有多种指定Key的方法。dataStream.keyBy(_.someKey) dataStream.keyBy(_._1)
以下类型不能作为Key:1. 它是POJO类型,但不覆盖hashCode()方法,而是依赖于Object.hashCode()实现。2. 数组类型。
-
Reduce
KeyedStream → DataStream
数据流上Key的“滚动”合并。将当前元素与最后一个元素的值合并,并发出新值。 reduce函数创建部分和流:keyedStream.reduce { _ + _ }
-
Window
KeyedStream → WindowedStream
可以在已经分区的KeyedStreams上定义Windows。 Windows根据某些特征将每个Key中的数据分组(例如,最近5秒钟内到达的数据)dataStream.keyBy(_._1).window(TumblingEventTimeWindows.of(Time.seconds(5)))
-
WindowAll
DataStreamStream → AllWindowedStream
Windows可以在常规DataStreams上定义。 Windows会根据某些特征(例如,最近5秒钟内到达的数据)对所有流事件进行分组。在许多情况下,这是非并行转换。所有记录将被收集在windowAll运算符的一项任务中。
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
-
Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream
将一般功能应用于整个窗口。以下是一个手动汇总窗口元素的函数。如果使用windowAll转换,则需要使用AllWindowFunction。
windowedStream.apply { WindowFunction }// 在非键控窗口流上应用AllWindowFunction allWindowedStream.apply { AllWindowFunction }
-
WindowReduce
WindowedStream → DataStream
将功能化的reduce函数应用于窗口,并返回合并后的值。windowedStream.reduce { _ + _ }
-
Union
DataStream* → DataStream
两个或多个数据流的并集,以创建一个包含所有流中的所有元素的新流。注意:如果将数据流与其自身合并,则在结果流中每个元素将获得两次。dataStream.union(otherStream1, otherStream2, ...);
-
Window Join
DataStream,DataStream → DataStream
在给定Key和一个公共窗口上连接两个数据流。dataStream.join(otherStream).where(<key selector>).equalTo(<key selector>).window(TumblingEventTimeWindows.of(Time.seconds(3))).apply { ... }
-
Interval Join
KeyedStream,KeyedStream → DataStream
在给定的时间间隔内,将两个键控流的两个元素e1和e2与一个公共键连接起来,从而使e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound。// 加入两个流 // key1 == key2 && leftTs - 2 < rightTs < leftTs + 2 keyedStream.intervalJoin(otherKeyedStream).between(Time.milliseconds(-2), Time.milliseconds(2)) // 下限和上限.upperBoundExclusive(true) // optional.lowerBoundExclusive(true) // optional.process(new IntervalJoinFunction() {...})
-
Window CoGroup
DataStream,DataStream → DataStream
在给定的Key和一个公共窗口上将两个数据流组合在一起。dataStream.coGroup(otherStream).where(0).equalTo(1).window(TumblingEventTimeWindows.of(Time.seconds(3))).apply {}
-
Connect
DataStream,DataStream → ConnectedStream
“连接”两个保留其类型的数据流。连接允许两个流之间共享状态。someStream : DataStream[Int] = ... otherStream : DataStream[String] = ...val connectedStreams = someStream.connect(otherStream)
-
CoMap, CoFlatMap
ConnectedStream → DataStream
与连接的数据流上的map和flatMap相似connectedStreams.map((_ : Int) => true,(_ : String) => false ) connectedStreams.flatMap((_ : Int) => true,(_ : String) => false )
-
Iterate
DataStream → IterativeStream → ConnectedStream
通过将一个运算符的输出重定向到某个先前的运算符,在流中创建“反馈”循环。这对于定义不断更新模型的算法特别有用。以下代码从流开始,并连续应用迭代主体。大于0的元素将被发送回反馈通道,其余元素将被转发到下游。initialStream.iterate {iteration => {val iterationBody = iteration.map {/*do something*/}(iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))} }
物理分区
Flink 也提供以下方法让用户根据需要在数据转换完成后对数据分区进行更细粒度的配置。
-
Custom Partitioning
DataStream → DataStream
使用用户定义的分区程序为每个元素选择目标任务。dataStream.partitionCustom(partitioner, "someKey") dataStream.partitionCustom(partitioner, 0)
-
Random Partitioning
DataStream → DataStream
根据均匀分布对元素进行随机划分。dataStream.shuffle()
-
Rescaling
DataStream → DataStream
将元素循环分配到下游操作的子集。如果您希望拥有pipelines,例如,将其从源的每个并行实例散开到几个映射器的子集以分配负载,但又不希望rebalance()引起全部重新平衡,则这很有用。这将仅需要本地数据传输,而不需要通过网络传输数据,具体取决于其他配置值,例如TaskManager的slots数量。上游操作向其发送元素的下游操作的子集取决于上游操作和下游操作两者的并行度。例如,如果上游操作具有并行度2,而下游操作具有并行度6,则一个上游操作将元素分配给三个下游操作,而另一个上游操作将分配给其他三个下游操作。另一方面,如果下游操作具有并行性2,而上游操作具有并行性6,则三个上游操作将分配给一个下游操作,而其他三个上游操作将分配给另一个下游操作。
如果不同的并行度不是彼此的倍数,则一个或几个下游操作将具有与上游操作不同的输入数量。
请参见此图,以查看上面示例中的连接模式:
dataStream.rescale()
-
Broadcasting
DataStream → DataStream
将元素广播到每个分区。dataStream.broadcast()
算子链和资源组
将两个算子链接在一起能使得它们在同一个线程中执行,从而提升性能。Flink 默认会将能链接的算子尽可能地进行链接(例如, 两个 map 转换操作)。此外, Flink 还提供了对链接更细粒度控制的 API 以满足更多需求:
如果想对整个作业禁用算子链,可以调用 StreamExecutionEnvironment.disableOperatorChaining()。下列方法还提供了更细粒度的控制。需要注 意的是, 这些方法只能在 DataStream 转换操作后才能被调用,因为它们只对前一次数据转换生效。例如,可以 someStream.map(…).startNewChain() 这样调用,而不能 someStream.startNewChain()这样。
一个资源组对应着 Flink 中的一个 slot 槽,更多细节请看slots 槽。 你可以根据需要手动地将各个算子隔离到不同的 slot 中。
- Start New Chain
从此运算符开始,开始新的链。这两个映射器将被链接,并且过滤器将不会链接到第一个映射器。someStream.filter(...).map(...).startNewChain().map(...)
- Disable Chaining
不链接Map算子someStream.map(...).disableChaining()
- Set Slot Sharing Group
设置操作的slot共享组。Flink会将具有相同slot共享组的操作放入同一slot,同时将没有slot共享组的操作保留在其他slot中。这可以用来隔离slot。如果所有输入操作都在同一slot共享组中,则slot共享组将从输入操作继承。默认slot共享组的名称为“ default”,可以通过调用slotSharingGroup(“ default”)将操作显式放入该组中。someStream.filter(...).slotSharingGroup("name")
Data Sinks
数据接收器使用DataStreams并将其转发到文件,套接字,外部系统或打印它们。Flink带有各种内置的输出格式,这些格式封装在DataStreams的操作后面:
- writeAsText()/ TextOutputFormat-将元素以行形式写为String。通过调用每个元素的toString()方法获得字符串。
- writeAsCsv(…)/ CsvOutputFormat-将元组写为逗号分隔的值文件。行和字段定界符是可配置的。每个字段的值来自对象的toString()方法。
- print()/ printToErr() -在标准输出/标准错误流上打印每个元素的toString()值。可选地,可以提供前缀(msg),该前缀在输出之前。这可以帮助区分打印的不同调用。如果并行度大于1,则输出之前还将带有产生输出的任务的标识符。
- writeUsingOutputFormat()/ FileOutputFormat-的方法和自定义文件输出基类。支持自定义对象到字节的转换。
- writeToSocket -根据 SerializationSchema
- addSink-调用自定义接收器功能。Flink与其他系统(例如Apache Kafka)的连接器捆绑在一起,这些系统已实现为接收器功能。
请注意,上的write*()方法DataStream主要用于调试目的。他们没有参与Flink的检查点,这意味着这些功能通常具有至少一次的语义。刷新到目标系统的数据取决于OutputFormat的实现。这意味着并非所有发送到OutputFormat的元素都立即显示在目标系统中。同样,在失败的情况下,这些记录可能会丢失。
为了将流可靠,准确地一次传输到文件系统中,请使用StreamingFileSink。同样,通过该.addSink(…)方法的自定义实现可以参与Flink一次精确语义的检查点。
迭代
迭代式流程序实现了step函数并将其嵌入到IterativeStream。由于DataStream程序可能永远不会完成,因此没有最大迭代次数。相反,您需要使用侧面输出 或,指定流的哪一部分反馈给迭代,哪一部分向下游转发filter。在这里,我们显示了一个示例迭代,其中主体(重复计算的一部分)是简单的映射变换,并且通过使用过滤器向下游转发的元素来区分反馈的元素。
val iteratedStream = someDataStream.iterate(iteration => {val iterationBody = iteration.map(/* 这被执行了很多次*/)(iterationBody.filter(/* 流的一部分 */), iterationBody.filter(/*流的其他部分*/))})
例如,下面的程序从一系列整数中连续减去1,直到它们达到零为止:
val someIntegers: DataStream[Long] = env.generateSequence(0, 1000)val iteratedStream = someIntegers.iterate(iteration => {val minusOne = iteration.map( v => v - 1)val stillGreaterThanZero = minusOne.filter (_ > 0)val lessThanZero = minusOne.filter(_ <= 0)(stillGreaterThanZero, lessThanZero)}
)
执行参数
在StreamExecutionEnvironment包含了ExecutionConfig允许用于运行组工作的具体配置值。
- setAutoWatermarkInterval(long milliseconds):设置自动水印发射的间隔。您可以使用获取当前值long getAutoWatermarkInterval()