Flink DataStream API 介绍

article/2025/9/16 19:49:21

Flink DataStream API 介绍

StreamExecutionEnvironment

StreamExecutionEnvironment
StateBackend管理
setStateBackend()
Checkpoint管理
enableCheckpointing()
Serialzer序列化管理
addDefaultKryoSerialize()
类型和序列化注册
registerTypewithKryoSerializer()
registerType()
DataStream数据源创建
addSource()
readTextFile()
fromCollection()
fromElements()
socketTextStream()
TimeCharacteristic管理
setStreamTimeCharacteristic()
Transformation存储与管理
addOperation()
StreamGraph创建和获取
getStreamGraph()
CacheFile注册与管理
registerCacheFile()
任务提交与运行
execute()
executeAsync()
重启策略
setRestartStrategy()

DataStram数据源

StreamExecutionEnvironment 数据源
基本数据源接口(直接使用)
GenerateSequence
Collection集合
Socket
File(HDFS,Local)
数据源连接器(需要依赖第三方依赖)
Kafka Connector
Es Connector
Custom DataSource
根据具体数据源决定
addSource()方法

Datastream 基本数据源

//从给定的数据元素中转换
DatastreamSource<OUT> fromElements(OUT... data)
//从指定的集合中转换成DataStream
DatastreamSource<OUT> flomCollection(Collection<OUT> data)
//读取文件并转换
DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath)
//从Scocket端口中读取
DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter)
//直接通过InputFormat创建
DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat)

最终都是通过 ExecutionEnvironment 创建 fromSource() 方法转换成DataStreamSource

Datastream 数据源连接器

Flink 内 置 Connector:

  • Apache Kafka (source/sink)

  • Apache Cassandra (Sink)

  • Amazon Kinesis Streams (source/sink)

  • Elasticsearch(Sink)

  • Hadoop FileSystem (sink)

  • RabbitMQ (source/sink)

  • Apache NiFi (source/sink)

  • Twitter Streaming API (source)

  • Google PubSub (source/sink)

  • JDBC (sinkJ

Apache Bahir 项 目 :

  • Apache ActiveMQ (source/sink)
  • Apache Flume (sink)
  • Redis (sink)
  • Akka (sink)
  • Netty (source)

以Kafka 连接器为例 :

<dependency><groupld>org.apache.flink</groupld><artifactId>flink-connector-kafka_2.11</artifactId><version>1.11.0</version>
</dependency>

Datastream 数据源连接器 - Source

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092);
properties.setProperty("group.id", "test0");Datastream<String> stream = env.addSource(new FlinkKafkaConsumer<>("topic",new SimpleStringSchema(), properties));

Datastream 数据源连接器

以Kafka 连接器为例 :

Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();specificStartOffsets.put(new KafkaTopicPartition(“myTopic“, 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition(“myTopic“, 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition(“myTopic“, 2), 43L);myConsumer.setStartFromSpecificOffsets(specificstartOffsets);

Datastream 数据源连接器 - Sink

Datastream<string> stream =Properties properties = new Properties();
properties.setpProperty("bootstrap.servers", "localhost:9092");FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>("my-topic",//target topic
new SimpleStringSchema(), // serialization schema
properties, // producer config
FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
stream.addsSink(myProducer);

Datastream 主要转换操作

在这里插入图片描述

DataStream转换操作
基于单数据处理
map
一对一转换
filter
过滤
flatmap
一对多转换
Window操作
NonKeyed DataStream
Keyed DataStream
timeWindowAll
时间窗口
countWindowAll
计数窗口
windowAll
自定义窗口
timeWindowAll
时间窗口
countWindowAll
计数窗口
windowAll
自定义窗口
多流合并
NonKeyed DataStream
join
关联操作
connect
连接操作
coGroup
关联操作
union
合并操作
Keyed DataStream
interval join
间隔join操作
单流切分
split
切分操作
sideOutput
旁路输出

理解Keyedstream

在这里插入图片描述

Datastream 之间的转换

在这里插入图片描述

物理分组操作

类型描述
dataStream.global();全部发往第1个task
dataStream.broadcast();广播
dataStream.forward();上下游并发度一样时一对一发送
dataStream.shuffle();随机均匀分配
dataStream.rebalance();Round-Robin(轮流分配)
dataStream.recale();Local Round-Robin(本地轮流分配)
dataStream.partitionCustom();自定义单播

public DataStream<T> shuffle(){return setConnectionType(new ShufflePartitioner<T>());
}

DataStream Kafka 实例

public class KafkaExample{public static void main(String[] args) throws Exception {// parse inputarg umenltsfinal ParameterTool parameterTool = ParameterTooLfromArgs(args);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));env.enableCheckpointing(5000); // create a checkpoint every 5 secondsenv.getConfig().setGlobalobParameters(parameterTool); // make parameters available in the web interfaceenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStream<kafkaEvent> input = env.addSource(new FlinkKafKaConsumer<>(parameterTool.getRedquired("input-topic"),new KafkaEventSchema(),parameterTool.getPropelties()).assignTimestampsAndWatermarks(new CustomWatermarkExtractor())).keyBy("word").map(new RollingAdditionMapper()).shuffle();input.addSink(new FlinKafkaProduCer<>(parameterTool.getRequired("output-topic"),new KeyedSerializationSchemaWrapper<>(new KafKaEventSChema()),parameterTool.getProperties(),FlinkKafkaProducer.Semantic.EXACTLY_ONCE));env.execute("Modern Kafka ExamPle");}
}

http://chatgpt.dhexx.cn/article/AFlL2p3N.shtml

相关文章

DataStream API介绍与使用(一)

详细API参考官网 DataStream编程模型 在Flink整个系统架构中&#xff0c;对流计算的支持是其最重要的功能之一&#xff0c;Flink基于Google提出的DataFlow模型&#xff0c;实现了支持原生数据流处理的计算引擎。Flink中定义了DataStream API让用户灵活且高效地编写Flink流式应…

DataStream API(一)

Flink 有非常灵活的分层 API 设计&#xff0c;其中的核心层就是 DataStream/DataSet API。由于新版 本已经实现了流批一体&#xff0c; DataSet API 将被弃用&#xff0c;官方推荐统一使用 DataStream API 处理流数 据和批数据。由于内容较多&#xff0c;我们将会用几章的篇幅来…

DataStream(二)

目录 5.3.2 聚合算子&#xff08;Aggregation&#xff09; 5.3.3 用户自定义函数&#xff08;UDF&#xff09; 3. 扁平映射&#xff08;flatMap&#xff09; flatMap 操作又称为扁平映射&#xff0c;主要是将数据流中的整体&#xff08;一般是集合类型&#xff09;拆分成一个 …

Flink DataStream API

Flink DataStream API 编程指南 概览前言什么是DataStreamFlink程序剖析程序样例 Data SourcesDataStream Transformations算子数据流转换算子物理分区算子链和资源组 Data Sinks迭代执行参数 概览 前言 Flink中的DataStream程序是常规程序&#xff0c;可对数据流进行转换&am…

DataStream API(三)

目录 5.3.4 物理分区&#xff08;Physical Partitioning&#xff09; 5.4 输出算子&#xff08;Sink&#xff09; 5.4.1 连接到外部系统 5.4.2 输出到文件 5.4.3 输出到 Kafka 5.4.4 输出到 MySQL&#xff08;JDBC&#xff09; 5.4.5 自定义 Sink 输出 5.5 本章总结 5.3.…

流式数据采集和计算(十):Flink的DataStream学习笔记

Flink的DataStream学习笔记.. 1 Flink 基础.. 3 Flink特性.. 3 Flink和Spark对比.. 3 设计思路.. 3 状态管理.. 3 Flink 初探.. 4 设计架构.. 4 Flink on yarn. 5 流程分析.. 6 DataStream. 7 API程序结构.. 7 DataSource 8 Transformation. 9 Sink. 13 Time 14…

DataStream API(基础篇) 完整使用 (第五章)

DataStream API基础篇 一、执行环境&#xff08;Execution Environment&#xff09;1、创建执行环境1. getExecutionEnvironment2. createLocalEnvironment3. createRemoteEnvironment 二、执行模式(Execution Mode)1. BATCH模式的配置方法&#xff08;1&#xff09;通过命令行…

DataStream API 四 之 Flink DataStream编程

DataStream API 四 之 Flink DataStream编程 1.分布式流处理基本模型2.流应用开发步骤3.数据类型4. Connector5. Execution environment6. 参数传递7.配置并⾏度8.Watermark9.Checkpoint10.State11. Data Source11.111.2 自定义Source 12.Transformations13.Window13.1窗⼝处理…

Flink的DataStream介绍

1|0一&#xff1a;流式处理基本概念 流处理系统本身有很多自己的特点。一般来说&#xff0c;由于需要支持无限数据集的处理&#xff0c;流处理系统一般采用一种数据驱动的处理方式。它会提前设置一些算子&#xff0c;然后等到数据到达后对数据进行处理。 为了表达复杂的逻辑&am…

Flink DataStream API(基础版)

概述 DataStream&#xff08;数据流&#xff09;本身是 Flink 中一个用来表示数据集合的类&#xff08;Class&#xff09;&#xff0c;我们编写的 Flink 代码其实就是基于这种数据类型的处理&#xff0c;所以这套核心API 就以DataStream 命名。对于批处理和流处理&#xff0c;我…

node.js上开启服务,在同一局域网下的另一客户端访问

选择的服务是我之前做的案例&#xff1a;链接 1.在本机上开启服务&#xff1a; 2.本机上用浏览器访问验证无误&#xff1a; 3.运行cmd使用命令ipconfig查看本机ip地址 4.在另一台局域网下的机子&#xff0c;要求可以ping到。 浏览器访问ip地址&#xff1a;3000即可。&#…

前端 面试题

介绍项目 安全性 token 验证 处理令牌续期问题&#xff0c;在header中获取到新令牌时&#xff0c;替换老令牌&#xff0c;以达到用户无感刷新令牌 1、第一次登录的时候&#xff0c;前端调后端的登陆接口&#xff0c;发送用户名和密码 2、后端收到请求&#xff0c;验证用户名和…

基础知识---cmd命令行篇

1、echo&#xff08;输出&#xff09; > 覆盖 >>追加 2、dir&#xff08;展示当前目录的文件 .当前的目录 ..表示上一层目录&#xff09; 3、d&#xff1a;和cd&#xff1a;d为驱动器 cd展示当前目录的文件&#xff08; .为当前文件 ..为上一层目录的文件&#xf…

虚拟机NAT+静态IP+DNS

NAT模式下 虚拟机联网是通过物理机的VMware Nat服务&#xff08;电脑网络切换也无碍&#xff09;&#xff0c;禁用状态下ping不通物理机&#xff0c;也连不了网 物理机连接虚拟机的通过VMnet8虚拟网络适配器&#xff0c;禁用情况下ping不通虚拟机&#xff0c;Xshell工具也没法…

ACP考前错题总结(精华,已过ACP)

前言 证书和战绩镇楼&#xff0c;希望大家都可以拿到自己想要的Certificate。无论ACA-ACP-ACE亦或者GCP、AWS等等。 错题总结 镜像、本地磁盘部分 不建议基于本地服务器制作镜像上传到阿里云ECS并提供服务。 不支持写在座位数据盘使用的本地盘 镜像和快照&#xff1a;镜像可…

React+Native Unable to download JS Bundle解决办法

在配置ReactNative开发环境中&#xff0c;会遇到很多坑。 这个会困扰很多很多人。 在前序工作中&#xff0c;我们开启了8081端口&#xff0c;以供手机通过该端口下载相应的js。 而在命令行执行adb reverse tcp:8081 tcp:8081命令能解决一定的问题&#xff0c;但也有可能失效。…

网络协议 一 OSI参考模型、计算机通信基础 (集线器、网桥、交换机、路由器)

萌宅鹿网络系列 的基础上增强 目录 互联网&#xff08;internet&#xff09;为什么要学习网络协议客户端-服务器跨平台的原理&#xff08;Java、C&#xff09;网络互连模型&#xff08;OSI参考模型&#xff09;计算机之间的通信基础 计算机之间的连接方式 - 网线直连计算机之间…

物联网安全实践二

正文 一 实验目的及要求 物联网智能设备一般都提供WiFi接入&#xff0c;本实验是在WiFi密码破解基础上进一步对物联网智能设备配置服务开展安全性分析实验。比如智能物联网家居网关、智能家居中的智能插座等&#xff0c;一般都内置Web服务&#xff0c;方便本地登录Web网页开展…

ARP协议个人总结

一&#xff1a;引入 当网络设备要发送数据给另一台设备时&#xff0c;必须要知道对方的网络层地址&#xff08;即IP地址&#xff09;。IP地址由网络层来提供&#xff0c;但是仅有IP地址是不够的&#xff0c;IP数据报文必须封装成帧才能通过数据链路进行发送。数据帧必须包含目…

计算机网络知识点总结(ICMP、PING、OSPF、TIMEWAIT、CLOSEWAIT、HTTPS、HTTP2.0)

概述五层模型物理层数据链路层CSMA/CD协议PPP协议MAC地址局域网交换机 网络层&#xff08;IP层&#xff09;IP地址分类IP地址与物理地址的区别ARP协议&#xff08;重点&#xff09;ICMP协议&#xff08;重点&#xff09;Ping原理&#xff08;重点&#xff09;Traceroute原理&am…