DataStream API

article/2025/9/16 19:42:33

目录

原算子

准备工作,环境搭建

读取数据 

从文件中读取数据

从集合中读取数据

从元素中读取数据

从source文件中读取数据

 从kafka中读取数据

自定义source类型输出

转换算子

map转换

Filter转换

FlatMap转换


原算子

准备工作,环境搭建

为了更好地理解,我们先构建一个实际应用场景。比如网站的访问操作,可以抽象成一个三元组(用户名,用户访问的 urrl,用户访问 url 的时间戳),所以在这里,我们可以创建一个类 Event,将用户行为包装成它的一个对象。

import java.sql.Timestamp;
/*
应用场景*/
public class Event {public String user;public String ur1;//用户访问的urlpublic Long timestape;//用户访问url的时间public Event(){};public Event(String user,String ur1,Long timestape){this.timestape=timestape;this.ur1=ur1;this.user=user;}@Overridepublic String toString() {return "Event{" +"user='" + user + '\'' +", ur1='" + ur1 + '\'' +", timestape=" + new Timestamp(timestape)  +'}';}
}

读取数据 

要先创建读取数据的环境

//创建执行环境StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);

从文件中读取数据

运用readTextFile的方法

1.创建一个文件clicks.txt

Mary, ./home, 1000
DOUDOU, ./cart, 2000
Bob, ./porp?id=100, 3000
DOUDOU, ./home, 4000

2.读取文件中的数据

 //从文件中读取数据DataStreamSource<String> stream1=env.readTextFile("input/clicks.txt");stream1.print("1");env.execute();

从集合中读取数据

 //从集合里读取数据ArrayList<Event> events=new ArrayList<>();events.add(new Event("DOUDOU","./home",1000L));DataStreamSource<Event> stream2=env.fromCollection(events);stream2.print("2");env.execute();

从元素中读取数据

//从元素中读取数据DataStreamSource<Event> stream3=env.fromElements(new Event("DOUDOU","./home",1000L));

从source文件中读取数据

首先要打开hadoop102的端口

  //从socket文本流中读取数据DataStreamSource<String> stream4=env.socketTextStream("hadoop102",7777);stream4.print("4");

 从kafka中读取数据

//从kafka中读取数据Properties properties=new Properties();properties.setProperty("bootstrap.servers","hadoop102:9092");properties.setProperty("group.id", "consumer-group");properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("auto.offset.reset", "latest");DataStreamSource<String> kafkaStream=env.addSource(new FlinkKafkaConsumer<String>("clicks",new SimpleStringSchema(),properties));kafkaStream.print();env.execute();

开启zookeeper和kafka

zk.sh start
kf.sh start

创建用户

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic clicks

自定义source类型输出

 1.创建一个实现SourceFunction<Event>的类,创造数据

import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Calendar;
import java.util.Random;public class ClicksSource implements SourceFunction<Event> {//声明一个标志位private Boolean running=true;@Overridepublic void run(SourceContext<Event> sourceContext) throws Exception {//生成随机数据Random random=new Random();//自定义选取的数据集String[] users = {"Mary", "Alice", "Bob", "Cary"};String[] urls = {"./home", "./cart", "./fav", "./prod?id=1","./prod?id=2"};//循环生成的数据while (running){String user=users[random.nextInt(users.length)];String ur1=urls[random.nextInt(urls.length)];Long timestap= Calendar.getInstance().getTimeInMillis();sourceContext.collect(new Event(user,ur1,timestap));Thread.sleep(1000L);}}@Overridepublic void cancel() {running=false;}
}

2.实现自定义source输出

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*
用户自定义source测试*/
public class SourceCustomTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Event> customStream=env.addSource(new ClicksSource());customStream.print();env.execute();}
}

转换算子

map转换

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class TransformMapTest {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//从元素中读取数据DataStreamSource<Event> stream3=env.fromElements(new Event("DOUDOU","./home",1000L));//进行转换计算,提取user字段//1.使用自定义类实现MapFunction接口SingleOutputStreamOperator<String> result=stream3.map(new MyMapper());//2.使用匿名类实现SingleOutputStreamOperator<String> result2=stream3.map(new MapFunction<Event, String>() {@Overridepublic String map(Event event) throws Exception {return event.user;}});//3.传入Lambda表达式SingleOutputStreamOperator<String> relult3=stream3.map(data -> data.user);result2.print();result.print();relult3.print();env.execute();}//自定义MapFunctionpublic static class MyMapper implements MapFunction<Event,String>{@Overridepublic String map(Event event) throws Exception {return event.user;}}
}

Filter转换

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class TransformFilterTest {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//从元素中读取数据DataStreamSource<Event> stream3=env.fromElements(new Event("DOUDOU","./home",1000L));//1.自定义类对象SingleOutputStreamOperator<Event> result1= stream3.filter(new MyFilter());//2.传入匿名类SingleOutputStreamOperator<Event> result2=stream3.filter(new FilterFunction<Event>() {@Overridepublic boolean filter(Event event) throws Exception {return event.user.equals("DOUDOU");}});//输入Lambda表达式SingleOutputStreamOperator<Event> result3=stream3.filter(data ->data.user.equals("DOUDOU"));result3.print();result1.print();result2.print();env.execute();}//自定义对象public static class MyFilter implements FilterFunction<Event>{@Overridepublic boolean filter(Event event) throws Exception {return event.user.equals("DOUDOU");}}
}

FlatMap转换

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class TransformFlatMapTest {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//从元素中读取数据DataStreamSource<Event> stream3=env.fromElements(new Event("DOUDOU","./home",1000L));//1.自定义stream3.flatMap(new MyFlatMap()).print();//2.传入Lamba表达式stream3.flatMap((Event value ,Collector<String> out) -> {if (value.user.equals("DOUDOU"))out.collect(value.ur1);else if (value.user.equals("DOUDOU")){out.collect(value.user);out.collect(value.ur1);out.collect(value.timestape.toString());}}) .returns(new TypeHint<String>() {}).print("2");env.execute();}//自定义类public static class MyFlatMap implements FlatMapFunction<Event,String>{@Overridepublic void flatMap(Event event, Collector<String> collector) throws Exception {collector.collect(event.user);collector.collect(event.ur1);collector.collect(event.timestape.toString());}}
}


http://chatgpt.dhexx.cn/article/oqDCJoIJ.shtml

相关文章

Flink学习——DataStream API

一个flink程序&#xff0c;其实就是对DataStream的各种转换。具体可以分成以下几个部分&#xff1a; 获取执行环境&#xff08;Execution Environment&#xff09;读取数据源&#xff08;Source&#xff09;定义基于数据的转换操作&#xff08;Transformations&#xff09;定义…

大数据开发-Flink-数据流DataStream和DataSet

文章目录 一、DataStream的三种流处理Api1.1 DataSource1.2 Transformation1.3 Sink 二、DataSet的常用Api2.1 DataSource2.2 Transformation2.3 Sink Flink主要用来处理数据流&#xff0c;所以从抽象上来看就是对数据流的处理&#xff0c;正如前面大数据开发-Flink-体系结构 &…

Flink DataStream API 介绍

Flink DataStream API 介绍 StreamExecutionEnvironment #mermaid-svg-JKeWa22W2vWA4zBS {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-JKeWa22W2vWA4zBS .error-icon{fill:#552222;}#mermaid-svg-JKeWa22W2vWA4z…

DataStream API介绍与使用(一)

详细API参考官网 DataStream编程模型 在Flink整个系统架构中&#xff0c;对流计算的支持是其最重要的功能之一&#xff0c;Flink基于Google提出的DataFlow模型&#xff0c;实现了支持原生数据流处理的计算引擎。Flink中定义了DataStream API让用户灵活且高效地编写Flink流式应…

DataStream API(一)

Flink 有非常灵活的分层 API 设计&#xff0c;其中的核心层就是 DataStream/DataSet API。由于新版 本已经实现了流批一体&#xff0c; DataSet API 将被弃用&#xff0c;官方推荐统一使用 DataStream API 处理流数 据和批数据。由于内容较多&#xff0c;我们将会用几章的篇幅来…

DataStream(二)

目录 5.3.2 聚合算子&#xff08;Aggregation&#xff09; 5.3.3 用户自定义函数&#xff08;UDF&#xff09; 3. 扁平映射&#xff08;flatMap&#xff09; flatMap 操作又称为扁平映射&#xff0c;主要是将数据流中的整体&#xff08;一般是集合类型&#xff09;拆分成一个 …

Flink DataStream API

Flink DataStream API 编程指南 概览前言什么是DataStreamFlink程序剖析程序样例 Data SourcesDataStream Transformations算子数据流转换算子物理分区算子链和资源组 Data Sinks迭代执行参数 概览 前言 Flink中的DataStream程序是常规程序&#xff0c;可对数据流进行转换&am…

DataStream API(三)

目录 5.3.4 物理分区&#xff08;Physical Partitioning&#xff09; 5.4 输出算子&#xff08;Sink&#xff09; 5.4.1 连接到外部系统 5.4.2 输出到文件 5.4.3 输出到 Kafka 5.4.4 输出到 MySQL&#xff08;JDBC&#xff09; 5.4.5 自定义 Sink 输出 5.5 本章总结 5.3.…

流式数据采集和计算(十):Flink的DataStream学习笔记

Flink的DataStream学习笔记.. 1 Flink 基础.. 3 Flink特性.. 3 Flink和Spark对比.. 3 设计思路.. 3 状态管理.. 3 Flink 初探.. 4 设计架构.. 4 Flink on yarn. 5 流程分析.. 6 DataStream. 7 API程序结构.. 7 DataSource 8 Transformation. 9 Sink. 13 Time 14…

DataStream API(基础篇) 完整使用 (第五章)

DataStream API基础篇 一、执行环境&#xff08;Execution Environment&#xff09;1、创建执行环境1. getExecutionEnvironment2. createLocalEnvironment3. createRemoteEnvironment 二、执行模式(Execution Mode)1. BATCH模式的配置方法&#xff08;1&#xff09;通过命令行…

DataStream API 四 之 Flink DataStream编程

DataStream API 四 之 Flink DataStream编程 1.分布式流处理基本模型2.流应用开发步骤3.数据类型4. Connector5. Execution environment6. 参数传递7.配置并⾏度8.Watermark9.Checkpoint10.State11. Data Source11.111.2 自定义Source 12.Transformations13.Window13.1窗⼝处理…

Flink的DataStream介绍

1|0一&#xff1a;流式处理基本概念 流处理系统本身有很多自己的特点。一般来说&#xff0c;由于需要支持无限数据集的处理&#xff0c;流处理系统一般采用一种数据驱动的处理方式。它会提前设置一些算子&#xff0c;然后等到数据到达后对数据进行处理。 为了表达复杂的逻辑&am…

Flink DataStream API(基础版)

概述 DataStream&#xff08;数据流&#xff09;本身是 Flink 中一个用来表示数据集合的类&#xff08;Class&#xff09;&#xff0c;我们编写的 Flink 代码其实就是基于这种数据类型的处理&#xff0c;所以这套核心API 就以DataStream 命名。对于批处理和流处理&#xff0c;我…

node.js上开启服务,在同一局域网下的另一客户端访问

选择的服务是我之前做的案例&#xff1a;链接 1.在本机上开启服务&#xff1a; 2.本机上用浏览器访问验证无误&#xff1a; 3.运行cmd使用命令ipconfig查看本机ip地址 4.在另一台局域网下的机子&#xff0c;要求可以ping到。 浏览器访问ip地址&#xff1a;3000即可。&#…

前端 面试题

介绍项目 安全性 token 验证 处理令牌续期问题&#xff0c;在header中获取到新令牌时&#xff0c;替换老令牌&#xff0c;以达到用户无感刷新令牌 1、第一次登录的时候&#xff0c;前端调后端的登陆接口&#xff0c;发送用户名和密码 2、后端收到请求&#xff0c;验证用户名和…

基础知识---cmd命令行篇

1、echo&#xff08;输出&#xff09; > 覆盖 >>追加 2、dir&#xff08;展示当前目录的文件 .当前的目录 ..表示上一层目录&#xff09; 3、d&#xff1a;和cd&#xff1a;d为驱动器 cd展示当前目录的文件&#xff08; .为当前文件 ..为上一层目录的文件&#xf…

虚拟机NAT+静态IP+DNS

NAT模式下 虚拟机联网是通过物理机的VMware Nat服务&#xff08;电脑网络切换也无碍&#xff09;&#xff0c;禁用状态下ping不通物理机&#xff0c;也连不了网 物理机连接虚拟机的通过VMnet8虚拟网络适配器&#xff0c;禁用情况下ping不通虚拟机&#xff0c;Xshell工具也没法…

ACP考前错题总结(精华,已过ACP)

前言 证书和战绩镇楼&#xff0c;希望大家都可以拿到自己想要的Certificate。无论ACA-ACP-ACE亦或者GCP、AWS等等。 错题总结 镜像、本地磁盘部分 不建议基于本地服务器制作镜像上传到阿里云ECS并提供服务。 不支持写在座位数据盘使用的本地盘 镜像和快照&#xff1a;镜像可…

React+Native Unable to download JS Bundle解决办法

在配置ReactNative开发环境中&#xff0c;会遇到很多坑。 这个会困扰很多很多人。 在前序工作中&#xff0c;我们开启了8081端口&#xff0c;以供手机通过该端口下载相应的js。 而在命令行执行adb reverse tcp:8081 tcp:8081命令能解决一定的问题&#xff0c;但也有可能失效。…

网络协议 一 OSI参考模型、计算机通信基础 (集线器、网桥、交换机、路由器)

萌宅鹿网络系列 的基础上增强 目录 互联网&#xff08;internet&#xff09;为什么要学习网络协议客户端-服务器跨平台的原理&#xff08;Java、C&#xff09;网络互连模型&#xff08;OSI参考模型&#xff09;计算机之间的通信基础 计算机之间的连接方式 - 网线直连计算机之间…