Flink之IntervalJoin介绍

article/2025/10/14 20:39:09

InterValJoin算子
间隔流,一条流去join另一条流去过去一段时间内的数据,该算子将keyedStream与keyedStream转化为DataStream;再给定的时间边界内(默认包含边界),相当于一个窗口,按指定的key对俩个KeyedStream进行Join操作,把符合join条件的俩个event拉倒一起,然后咋么处理右用户来决定。
1、key1 == key2 && e1.timestamp +lowerBound <= e2.timestamp +upperBound
2、场景:把一定时间范围内相关的分组数据拉成一个宽表

语法规则:

leftKeyedStream
.intervalJoin(rightKeyedStream)
//时间间隔,设定下界和上界
.between(Time.minutes(-10),Time.seconds(0))
//不包含下界
.lowerBoundExclusive()
//不包含上界
.upperBoundExclusive()
//自定义ProcessJoinFunction 处理join到的元组
.process(ProcessJoinFunction) 

该算子的注意事项:
1、俩条流都缓存在内部state中。leftElement到达,去获取State中rightElement响应时间范围内的数据,然后执行ProcessJoinFunciton进行Join操作;
2、时间间隔:leftElement默认和【leftElementEventTime + lowerBound,leftElementEventTime +upperBound】时间范围内的rightElement join;
3、举例:leftElementEventTime = 2019-11-16 17:30:00,lowerBound=-10minute,upperBound=0,则这条leftElement按Key和【2019-11-16 17:20:00,2019-11-16 17:30:00】时间范围内的rightElementJoin;
4、IntervalJoin目前只支持EventTime;
5、数据量比较大,可能使用RocksDBStateBackend

demo案列:

package Flink_API;import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.table.shaded.org.joda.time.DateTime;
import org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormat;
import org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter;
import org.apache.flink.util.Collector;import java.io.Serializable;
import java.util.Properties;public class TestInterViewJoin {public static void main(String[] args) throws Exception {//创建运行环境StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();//Flink是以数据自带的时间戳字段为准env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//设置并行度env.setParallelism(1);//1、获取第一个流,获取用户的浏览信息DataStream<UserBrowseLog> browseStream = getUserBrowseDataStream(env);//2、获取用户的点击信息DataStream<UserClickLog> clickStream = getUserClickLogDataStream(env);//打印结果browseStream.print();clickStream.print();//核心:双流进行IntervalJoin操作:每个用户的点击信息Join这个用户最近10分钟内的浏览信息//browseStream(左流)关联clickStream(右流)KeyedStream<UserClickLog,String> userClickLogStringKeyedStream = clickStream.keyBy(new KeySelector<UserClickLog,String>(){@Overridepublic String getKey(UserClickLog userClickLog) throws Exception {return userClickLog.userID;}});KeyedStream<UserBrowseLog,String> userBrowseLogStringKeyedStream1=browseStream.keyBy(new KeySelector<UserBrowseLog,String>(){@Overridepublic String getKey(UserBrowseLog userBrowseLog) throws Exception {return userBrowseLog.userID;}});//每个用户的点击Join这个用户最近的10分钟内的浏览DataStream<String> processData = userClickLogStringKeyedStream.intervalJoin(userBrowseLogStringKeyedStream1).between(Time.minutes(-10),Time.seconds(0))//下界:10分钟,上界:当前EventTime时刻(左流去右流10分钟之前去找数据).process(new ProcessJoinFunction<UserClickLog, UserBrowseLog, String>() {//leftElement到达,去获取State中rightElement响应范围内的数据,然后执行ProcessJoinFunction进行Join操作:@Overridepublic void processElement(UserClickLog left, UserBrowseLog right, Context context, Collector<String> collector) throws Exception {collector.collect(left+"<IntevalJoin>"+right);}});processData.print();//程序的入口类env.execute("TestInterViewJoin");}private static DataStream<UserClickLog> getUserClickLogDataStream(StreamExecutionEnvironment env) {Properties consumerProperties = new Properties();consumerProperties.setProperty("bootstrap.severs","page01:9002");consumerProperties.setProperty("grop.id","browsegroup");DataStreamSource<String> dataStreamSource=env.addSource(new FlinkKafkaConsumer010<String>("browse_topic1", (KeyedDeserializationSchema<String>) new SimpleStringSchema(),consumerProperties));DataStream<UserClickLog> processData=dataStreamSource.process(new ProcessFunction<String, UserClickLog>() {@Overridepublic void processElement(String s, Context context, Collector<UserClickLog> collector) throws Exception {try{UserClickLog browseLog = com.alibaba.fastjson.JSON.parseObject(s, UserClickLog.class);if(browseLog !=null){collector.collect(browseLog);}}catch(Exception e){System.out.print("解析Json——UserBrowseLog异常:"+e.getMessage());}}});//设置watermarkreturn processData.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserClickLog>(Time.seconds(0)){@Overridepublic long extractTimestamp(UserClickLog userBrowseLog) {DateTimeFormatter dateTimeFormatter= DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");DateTime dateTime=DateTime.parse(userBrowseLog.getEventTime(),dateTimeFormatter);//用数字表示时间戳,单位是ms,13位return dateTime.getMillis();}});}private static DataStream<UserBrowseLog> getUserBrowseDataStream(StreamExecutionEnvironment env) {Properties consumerProperties = new Properties();consumerProperties.setProperty("bootstrap.severs","page01:9001");consumerProperties.setProperty("grop.id","browsegroup");DataStreamSource<String> dataStreamSource=env.addSource(new FlinkKafkaConsumer010<String>("browse_topic", (KeyedDeserializationSchema<String>) new SimpleStringSchema(),consumerProperties));DataStream<UserBrowseLog> processData=dataStreamSource.process(new ProcessFunction<String, UserBrowseLog>() {@Overridepublic void processElement(String s, Context context, Collector<UserBrowseLog> collector) throws Exception {try{UserBrowseLog browseLog = com.alibaba.fastjson.JSON.parseObject(s, UserBrowseLog.class);if(browseLog !=null){collector.collect(browseLog);}}catch(Exception e){System.out.print("解析Json——UserBrowseLog异常:"+e.getMessage());}}});//设置watermarkreturn processData.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserBrowseLog>(Time.seconds(0)) {@Overridepublic long extractTimestamp(UserBrowseLog userBrowseLog) {DateTimeFormatter dateTimeFormatter= DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");DateTime dateTime=DateTime.parse(userBrowseLog.getEventTime(),dateTimeFormatter);//用数字表示时间戳,单位是ms,13位return dateTime.getMillis();}});}//浏览类public static class UserBrowseLog implements Serializable {private String userID;private String eventTime;private String eventType;private String productID;private Integer productPrice;public String getUserID() {return userID;}public void setUserID(String userID) {this.userID = userID;}public String getEventTime() {return eventTime;}public void setEventTime(String eventTime) {this.eventTime = eventTime;}public String getEventType() {return eventType;}public void setEventType(String eventType) {this.eventType = eventType;}public String getProductID() {return productID;}public void setProductID(String productID) {this.productID = productID;}public Integer getProductPrice() {return productPrice;}public void setProductPrice(Integer productPrice) {this.productPrice = productPrice;}@Overridepublic String toString() {return "UserBrowseLog{" +"userID='" + userID + '\'' +", eventTime='" + eventTime + '\'' +", eventType='" + eventType + '\'' +", productID='" + productID + '\'' +", productPrice=" + productPrice +'}';}}//点击类public static class UserClickLog implements Serializable{private String userID;private String eventTime;private String eventType;private String pageID;public String getUserID() {return userID;}public void setUserID(String userID) {this.userID = userID;}public String getEventTime() {return eventTime;}public void setEventTime(String eventTime) {this.eventTime = eventTime;}public String getEventType() {return eventType;}public void setEventType(String eventType) {this.eventType = eventType;}public String getPageID() {return pageID;}public void setPageID(String pageID) {this.pageID = pageID;}@Overridepublic String toString() {return "UserClickLog{" +"userID='" + userID + '\'' +", eventTime='" + eventTime + '\'' +", eventType='" + eventType + '\'' +", pageID='" + pageID + '\'' +'}';}}}


http://chatgpt.dhexx.cn/article/LlWXBZtn.shtml

相关文章

Mysql的INTERVAL()函数和INTERVAL关键字

一&#xff0c;INTERVAL()函数 INTERVAL()函数可以返回分段后的结果&#xff0c;语法如下&#xff1a; INTERVAL(N,N1,N2,N3,..........) 其中&#xff0c;N是要判断的数值&#xff0c;N1,N2,N3,...是分段的间隔。 这个函数的返回值是段的位置&#xff1a; 如果N<N1&am…

Mysql_interval函数与关键字

目录 一、统计不同区间的数量&#xff1a;INTERVAL()函数 二、时间间隔&#xff1a;INTERVAL关键字 一、统计不同区间的数量&#xff1a;INTERVAL()函数 INTERVAL()函数可以返回分段后的结果&#xff0c;语法如下&#xff1a; INTERVAL(N,N1,N2,N3,......,Nn) 其中&#x…

INTERVAL记录

INTERVAL作为sql中关键字 interval在sql中一般表示为时间间隔的意思 常用在date_add()、**date_sub()**函数中&#xff0c;常用于时间的加减法 上述sql表示为当前时间一年后的时间。 interval后可选择的时间粒度&#xff1a; MICROSECOND&#xff0c; SECOND&#xff0c; MIN…

万能的Attention及其代码实现

最近看到以前的代码&#xff0c;想到了attention&#xff0c;趁着代码还在就来整理一下。 文章目录 [Squeeze-and-Excitation Networks](https://arxiv.org/abs/1709.01507)[Concurrent Spatial and Channel ‘Squeeze &Excitation’ in Fully Convolutional Networks](ht…

NLP中的Attention总结

Attention是模拟人脑的注意力机制。人在看到一些东西&#xff0c;往往只关注重要的信息&#xff0c;而忽略其他信息。自然语言处理中的attention是对文本分配注意力权重&#xff0c;Attention的本质就是从关注全部变为关注重点。 1 Attention的原理 Attention的计算过程&#…

Attention 理解汇总

在 Encoder-Decoder中&#xff0c;预测每个Decoder 的语义编码 c 是一样的&#xff0c;句子 X 中每个词对输出 Y 的每个词的影响都是相同的。 这样有两个弊端&#xff1a; 一是语义向量无法完全表示整个序列的信息&#xff1b; 二是先输入的内容携带的信息会被后输入的信息稀…

[深度学习] 自然语言处理 --- Self-Attention(三) 知识点与源码解析

在当前的 NLP 领域&#xff0c;Transformer / BERT 已然成为基础应用&#xff0c;而 Self-Attention 则是两者的核心部分&#xff0c;下面尝试用 Q&A 和源码的形式深入 Self-Attention 的细节。 一 Q&A 1. Self-Attention 的核心是什么&#xff1f; Self-Attention …

attention介绍

Attention 正在被越来越广泛的得到应用。 Attention 到底有什么特别之处&#xff1f;他的原理和本质是什么&#xff1f;Attention都有哪些类型&#xff1f;本文将详细讲解Attention的方方面面。 Attention 的本质是什么 Attention&#xff08;注意力&#xff09;机制如果浅层…

Attention原理

文章目录 Attention原理HAN原理利用Attention模型进行文本分类参考资料 Attention原理 转载一个Hierarchical Attention神经网络的实现 转载 图解Transformer 转载 Attention原理和源码解析 论文链接 Attention is All You Need HAN原理 论文链接Hierarchical Attention Net…

Transformer详解(二):Attention机制

1.Encoder-Decoder中的attention机制 上一篇文章最后&#xff0c;在Encoder-Decoder框架中&#xff0c;输入数据的全部信息被保存在了C。而这个C很容易受到输入句子长度的影响。当句子过长时&#xff0c;C就有可能存不下这些信息&#xff0c;导致模型后续的精度下降。Attentio…

attention模型

以机器翻译为例说明&#xff1a; 普通的RNN机器翻译模型: 次结构弱点在于当target句子太长时&#xff0c;前面encoder学习到的embedding vector(红边框)可能会被后面的decoder遗忘。因此改进版本如下: 这样&#xff0c;每次在输入target的word的时候&#xff0c;除了可以看到…

【深度学习】Self-Attention 原理与代码实现

1.Self-Attention 结构 在计算的时候需要用到矩阵Q(查询),K(键值),V(值)。在实际中&#xff0c;Self-Attention 接收的是输入(单词的表示向量x组成的矩阵X) 或者上一个 Encoder block 的输出。而Q,K,V正是通过 Self-Attention 的输入进行线性变换得到的。 2. Q, K, V 的计算 S…

Self -Attention、Multi-Head Attention、Cross-Attention

Self -Attention Transformer结构图 上图是论文中 Transformer 的内部结构图&#xff0c;左侧为 Encoder block&#xff0c;右侧为 Decoder block。红色圈中的部分为 Multi-Head Attention&#xff0c;是由多个 Self-Attention组成的&#xff0c;可以看到 Encoder block 包含一…

Attention Rollout

问题陈述 从图1a中的原始attention可以看出&#xff0c;只有在最开始的几层&#xff0c;不同位置的attention模式有一些区别&#xff0c;但是更高层中的attention权重更加一致。这表示随着模型层数的增加&#xff0c;嵌入的内容变得更加情境化&#xff0c;可能都带有类似的信息…

Attention可视化

Attention matrix&#xff1a; https://github.com/rockingdingo/deepnlp/blob/r0.1.6/deepnlp/textsum/eval.py plot_attention(data, X_labelNone, Y_labelNone)函数 #!/usr/bin/python # -*- coding:utf-8 -*-""" Evaluation Method for summarization tas…

Attention机制

文章目录 一、Attention机制是什么&#xff1f;二、推荐论文与链接三、self-attention 一、Attention机制是什么&#xff1f; Attention机制最早在视觉领域提出&#xff0c;九几年就被提出来的思想&#xff0c;真正火起来应该算是2014年Google Mind发表了《Recurrent Models o…

Attention详解

1.背景知识 Seq2Seq模型&#xff1a;使用两个RNN&#xff0c;一个作为编码器&#xff0c;一个作为解码器。 编码器&#xff1a;将输入数据编码成一个特征向量。 解码器&#xff1a;将特征向量解码成预测结果。 缺点&#xff1a;只将编码器的最后一个节点的结果进行了输出&am…

浅析NLP中的Attention技术

Attention&#xff08;注意力机制&#xff09;在NLP、图像领域被广泛采用&#xff0c;其显而易见的优点包括&#xff1a; &#xff08;1&#xff09;从context中捕捉关键信息&#xff1b; &#xff08;2&#xff09;良好的可视性和可解释性。 我们常用QKV模型来理解Attention&…

Attention 机制

文章目录 Attention 的本质是什么Attention 的3大优点Attention 的原理Attention 的 N 种类型 转载来源&#xff1a;https://easyai.tech/ai-definition/attention/ Attention 正在被越来越广泛的得到应用。尤其是 BERT 火爆了之后。 Attention 到底有什么特别之处&#xff1f…

详解Transformer中Self-Attention以及Multi-Head Attention

原文名称&#xff1a;Attention Is All You Need 原文链接&#xff1a;https://arxiv.org/abs/1706.03762 如果不想看文章的可以看下我在b站上录的视频&#xff1a;https://b23.tv/gucpvt 最近Transformer在CV领域很火&#xff0c;Transformer是2017年Google在Computation an…