大数据开发-Flink-数据流DataStream和DataSet

article/2025/9/16 19:50:41

文章目录

  • 一、DataStream的三种流处理Api
      • 1.1 DataSource
      • 1.2 Transformation
      • 1.3 Sink
  • 二、DataSet的常用Api
      • 2.1 DataSource
      • 2.2 Transformation
      • 2.3 Sink


Flink主要用来处理数据流,所以从抽象上来看就是对数据流的处理,正如前面大数据开发-Flink-体系结构 && 运行架构提到写Flink程序实际上就是在写DataSource、Transformation、Sink.

  • DataSource是程序的数据源输入,可以通过StreamExecutionEnvironment.addSource(sourceFuntion)为程序
    添加一个数据源
  • Transformation是具体的操作,它对一个或多个输入数据源进行计算处理,比如Map、FlatMap和Filter等操作
  • Sink是程序的输出,它可以把Transformation处理之后的数据输出到指定的存储介质中

一、DataStream的三种流处理Api

1.1 DataSource

Flink针对DataStream提供了两种实现方式的数据源,可以归纳为以下四种:

  • 基于文件

readTextFile(path) 读取文本文件,文件遵循TextInputFormat逐行读取规则并返回

  • 基于Socket

socketTextStream 从Socket中读取数据,元素可以通过一个分隔符分开

  • 基于集合

fromCollection(Collection) 通过Java的Collection集合创建一个数据流,集合中的所有元素必须是相同类型的,需要注意的是,如果集合里面的元素要识别为POJO,需要满足下面的条件

  • 该类有共有的无参构造方法
  • 该类是共有且独立的(没有非静态内部类)
  • 类(及父类)中所有的不被static、transient修饰的属性要么有公有的(且不被final修饰),要么是包含公有的getter和setter方法,这些方法遵循java bean命名规范

总结:上面的要求其实就是为了让Flink可以方便地序列化和反序列化这些对象为数据流

  • 自定义Source

使用StreamExecutionEnvironment.addSource(sourceFunction) 将一个流式数据源加到程序中,具体这个sourceFunction 是为非并行源implements SourceFunction,或者为并行源 implements ParallelSourceFunction接口,或者extends RichParallelSourceFunction,对于自定义Source,Sink, Flink内置了下面几种Connector

在这里插入图片描述

对于Source的使用,其实较简单,这里给一个较常用的自定义Source的KafaSource的使用例子。更多相关源码可以查看:

package com.hoult.stream;public class SourceFromKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String topic = "animalN";Properties props = new Properties();props.put("bootstrap.servers", "linux121:9092");FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);DataStreamSource<String> data = env.addSource(consumer);SingleOutputStreamOperator<Tuple2<Long, Long>> maped = data.map(new MapFunction<String, Tuple2<Long, Long>>() {@Overridepublic Tuple2<Long, Long> map(String value) throws Exception {System.out.println(value);Tuple2<Long,Long> t = new Tuple2<Long,Long>(0l,0l);String[] split = value.split(",");try{t = new Tuple2<Long, Long>(Long.valueOf(split[0]), Long.valueOf(split[1]));} catch (Exception e) {e.printStackTrace();}return t;}});KeyedStream<Tuple2<Long,Long>, Long> keyed = maped.keyBy(value -> value.f0);//按照key分组策略,对流式数据调用状态化处理SingleOutputStreamOperator<Tuple2<Long, Long>> flatMaped = keyed.flatMap(new RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {ValueState<Tuple2<Long, Long>> sumState;@Overridepublic void open(Configuration parameters) throws Exception {//在open方法中做出StateValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>("average",TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),Tuple2.of(0L, 0L));sumState = getRuntimeContext().getState(descriptor);
//                super.open(parameters);}@Overridepublic void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {//在flatMap方法中,更新StateTuple2<Long, Long> currentSum = sumState.value();currentSum.f0 += 1;currentSum.f1 += value.f1;sumState.update(currentSum);out.collect(currentSum);/*if (currentSum.f0 == 2) {long avarage = currentSum.f1 / currentSum.f0;out.collect(new Tuple2<>(value.f0, avarage));sumState.clear();}*/}});flatMaped.print();env.execute();}
}

1.2 Transformation

对于Transformation ,Flink提供了很多的算子.

  • map
    DataStream → DataStream Takes one element and produces one element. A map function that doubles the values of the input stream:
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {@Overridepublic Integer map(Integer value) throws Exception {return 2 * value;}
});
  • flatMap
    DataStream → DataStream Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:
dataStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {for(String word: value.split(" ")){out.collect(word);}}
});
  • filter
    DataStream → DataStream Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:
dataStream.filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer value) throws Exception {return value != 0;}
});

更多算子操作可以查看官网,官网写的很好:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/overview/

1.3 Sink

Flink针对DataStream提供了大量的已经实现的数据目的地(Sink),具体如下所示

  • writeAsText():讲元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取

  • print()/printToErr():打印每个元素的toString()方法的值到标准输出或者标准错误输出流中

  • 自定义输出:addSink可以实现把数据输出到第三方存储介质中, Flink提供了一批内置的Connector,其中有的Connector会提供对应的Sink支持

这里举一个常见的例子,下层到Kafka

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
public class StreamToKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> data = env.socketTextStream("teacher2", 7777);String brokerList = "teacher2:9092";String topic = "mytopic2";FlinkKafkaProducer producer = new FlinkKafkaProducer(brokerList, topic, new SimpleStringSchema());data.addSink(producer);env.execute();}
}

二、DataSet的常用Api

2.1 DataSource

对DataSet批处理而言,较为频繁的操作是读取HDFS中的文件数据,因为这里主要介绍两个DataSource组件

  • 基于集合 ,用来测试和DataStream类似

  • 基于文件 readTextFile…

2.2 Transformation

在这里插入图片描述

更多算子可以查看官网:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/dataset/overview/

2.3 Sink

Flink针对DataStream提供了大量的已经实现的数据目的地(Sink),具体如下所示

  • writeAsText():将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取

  • writeAsCsv():将元组以逗号分隔写入文件中,行及字段之间的分隔是可配置的,每个字段的值来自对象的

  • toString()方法

  • print()/pringToErr():打印每个元素的toString()方法的值到标准输出或者标准错误输出流中
    Flink提供了一批内置的Connector,其中有的Connector会提供对应的Sink支持,如1.1节中表所示


http://chatgpt.dhexx.cn/article/PmUe2EdJ.shtml

相关文章

Flink DataStream API 介绍

Flink DataStream API 介绍 StreamExecutionEnvironment #mermaid-svg-JKeWa22W2vWA4zBS {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-JKeWa22W2vWA4zBS .error-icon{fill:#552222;}#mermaid-svg-JKeWa22W2vWA4z…

DataStream API介绍与使用(一)

详细API参考官网 DataStream编程模型 在Flink整个系统架构中&#xff0c;对流计算的支持是其最重要的功能之一&#xff0c;Flink基于Google提出的DataFlow模型&#xff0c;实现了支持原生数据流处理的计算引擎。Flink中定义了DataStream API让用户灵活且高效地编写Flink流式应…

DataStream API(一)

Flink 有非常灵活的分层 API 设计&#xff0c;其中的核心层就是 DataStream/DataSet API。由于新版 本已经实现了流批一体&#xff0c; DataSet API 将被弃用&#xff0c;官方推荐统一使用 DataStream API 处理流数 据和批数据。由于内容较多&#xff0c;我们将会用几章的篇幅来…

DataStream(二)

目录 5.3.2 聚合算子&#xff08;Aggregation&#xff09; 5.3.3 用户自定义函数&#xff08;UDF&#xff09; 3. 扁平映射&#xff08;flatMap&#xff09; flatMap 操作又称为扁平映射&#xff0c;主要是将数据流中的整体&#xff08;一般是集合类型&#xff09;拆分成一个 …

Flink DataStream API

Flink DataStream API 编程指南 概览前言什么是DataStreamFlink程序剖析程序样例 Data SourcesDataStream Transformations算子数据流转换算子物理分区算子链和资源组 Data Sinks迭代执行参数 概览 前言 Flink中的DataStream程序是常规程序&#xff0c;可对数据流进行转换&am…

DataStream API(三)

目录 5.3.4 物理分区&#xff08;Physical Partitioning&#xff09; 5.4 输出算子&#xff08;Sink&#xff09; 5.4.1 连接到外部系统 5.4.2 输出到文件 5.4.3 输出到 Kafka 5.4.4 输出到 MySQL&#xff08;JDBC&#xff09; 5.4.5 自定义 Sink 输出 5.5 本章总结 5.3.…

流式数据采集和计算(十):Flink的DataStream学习笔记

Flink的DataStream学习笔记.. 1 Flink 基础.. 3 Flink特性.. 3 Flink和Spark对比.. 3 设计思路.. 3 状态管理.. 3 Flink 初探.. 4 设计架构.. 4 Flink on yarn. 5 流程分析.. 6 DataStream. 7 API程序结构.. 7 DataSource 8 Transformation. 9 Sink. 13 Time 14…

DataStream API(基础篇) 完整使用 (第五章)

DataStream API基础篇 一、执行环境&#xff08;Execution Environment&#xff09;1、创建执行环境1. getExecutionEnvironment2. createLocalEnvironment3. createRemoteEnvironment 二、执行模式(Execution Mode)1. BATCH模式的配置方法&#xff08;1&#xff09;通过命令行…

DataStream API 四 之 Flink DataStream编程

DataStream API 四 之 Flink DataStream编程 1.分布式流处理基本模型2.流应用开发步骤3.数据类型4. Connector5. Execution environment6. 参数传递7.配置并⾏度8.Watermark9.Checkpoint10.State11. Data Source11.111.2 自定义Source 12.Transformations13.Window13.1窗⼝处理…

Flink的DataStream介绍

1|0一&#xff1a;流式处理基本概念 流处理系统本身有很多自己的特点。一般来说&#xff0c;由于需要支持无限数据集的处理&#xff0c;流处理系统一般采用一种数据驱动的处理方式。它会提前设置一些算子&#xff0c;然后等到数据到达后对数据进行处理。 为了表达复杂的逻辑&am…

Flink DataStream API(基础版)

概述 DataStream&#xff08;数据流&#xff09;本身是 Flink 中一个用来表示数据集合的类&#xff08;Class&#xff09;&#xff0c;我们编写的 Flink 代码其实就是基于这种数据类型的处理&#xff0c;所以这套核心API 就以DataStream 命名。对于批处理和流处理&#xff0c;我…

node.js上开启服务,在同一局域网下的另一客户端访问

选择的服务是我之前做的案例&#xff1a;链接 1.在本机上开启服务&#xff1a; 2.本机上用浏览器访问验证无误&#xff1a; 3.运行cmd使用命令ipconfig查看本机ip地址 4.在另一台局域网下的机子&#xff0c;要求可以ping到。 浏览器访问ip地址&#xff1a;3000即可。&#…

前端 面试题

介绍项目 安全性 token 验证 处理令牌续期问题&#xff0c;在header中获取到新令牌时&#xff0c;替换老令牌&#xff0c;以达到用户无感刷新令牌 1、第一次登录的时候&#xff0c;前端调后端的登陆接口&#xff0c;发送用户名和密码 2、后端收到请求&#xff0c;验证用户名和…

基础知识---cmd命令行篇

1、echo&#xff08;输出&#xff09; > 覆盖 >>追加 2、dir&#xff08;展示当前目录的文件 .当前的目录 ..表示上一层目录&#xff09; 3、d&#xff1a;和cd&#xff1a;d为驱动器 cd展示当前目录的文件&#xff08; .为当前文件 ..为上一层目录的文件&#xf…

虚拟机NAT+静态IP+DNS

NAT模式下 虚拟机联网是通过物理机的VMware Nat服务&#xff08;电脑网络切换也无碍&#xff09;&#xff0c;禁用状态下ping不通物理机&#xff0c;也连不了网 物理机连接虚拟机的通过VMnet8虚拟网络适配器&#xff0c;禁用情况下ping不通虚拟机&#xff0c;Xshell工具也没法…

ACP考前错题总结(精华,已过ACP)

前言 证书和战绩镇楼&#xff0c;希望大家都可以拿到自己想要的Certificate。无论ACA-ACP-ACE亦或者GCP、AWS等等。 错题总结 镜像、本地磁盘部分 不建议基于本地服务器制作镜像上传到阿里云ECS并提供服务。 不支持写在座位数据盘使用的本地盘 镜像和快照&#xff1a;镜像可…

React+Native Unable to download JS Bundle解决办法

在配置ReactNative开发环境中&#xff0c;会遇到很多坑。 这个会困扰很多很多人。 在前序工作中&#xff0c;我们开启了8081端口&#xff0c;以供手机通过该端口下载相应的js。 而在命令行执行adb reverse tcp:8081 tcp:8081命令能解决一定的问题&#xff0c;但也有可能失效。…

网络协议 一 OSI参考模型、计算机通信基础 (集线器、网桥、交换机、路由器)

萌宅鹿网络系列 的基础上增强 目录 互联网&#xff08;internet&#xff09;为什么要学习网络协议客户端-服务器跨平台的原理&#xff08;Java、C&#xff09;网络互连模型&#xff08;OSI参考模型&#xff09;计算机之间的通信基础 计算机之间的连接方式 - 网线直连计算机之间…

物联网安全实践二

正文 一 实验目的及要求 物联网智能设备一般都提供WiFi接入&#xff0c;本实验是在WiFi密码破解基础上进一步对物联网智能设备配置服务开展安全性分析实验。比如智能物联网家居网关、智能家居中的智能插座等&#xff0c;一般都内置Web服务&#xff0c;方便本地登录Web网页开展…

ARP协议个人总结

一&#xff1a;引入 当网络设备要发送数据给另一台设备时&#xff0c;必须要知道对方的网络层地址&#xff08;即IP地址&#xff09;。IP地址由网络层来提供&#xff0c;但是仅有IP地址是不够的&#xff0c;IP数据报文必须封装成帧才能通过数据链路进行发送。数据帧必须包含目…