FlinkSql系列6之 Interval Join

article/2025/10/14 19:19:35

FlinkSql系列6之 Interval Join


前言

本次我们来学习flinksql的Interval Join,这个方式主要是针对两个流在一定时间区间内的join,支持事件时间和处理时间,而且这个流每次只会产生一条数据,是一个完全的追加流。


一、Interval Join是什么?

Interval Join 可以让⼀条流去 Join 另⼀条流中前后⼀段时间内的数据。

1.1 Inner Interval Join

只有两条流中的满足时间区间符合,并且满足其他的join条件,这时候才会匹配的上并且输出对应结果 +[L,R]

1.2 Left Interval Join

这里以左流为主,在指定的时间区间内,如果右流还没到,那么会先存入state中,当右流到了之后,并且在时间区间内那么就输出结果+ [L,R],否则如果右流在时间区间内没有到,并且时间到了,那么就会删除state,输出+[L,null].

1.3 Right Interval Join

如上,反着来

1.4 Full Interval Join

左流或者右流的数据到达之后,如果没有 Join 到另外⼀条流的数据,就 会等待(左流放在左流对应的 State 中等,右流放在右流对应的 State 中等),如果之后另⼀条流数据到 达之后,发现能和刚刚那条数据 Join 到,则会输出 +[L, R] 。事件时间中随着 Watermark 的推进(也⽀ 持处理时间),发现 State 中的数据能够过期了,就将这些数据从 State 中删除并且输出(左流过期输出
+[L, null] ,右流过期输出 -[null, R] )

二、实际测试效果

准备步骤

CREATE TABLE show_log_table(
--曝光日志id
`log_id` BIGINT,
--曝光日志参数
`show_params` STRING,
--事件时间
`row_time` as CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
--watermark设置
WATERMARK FOR row_time AS row_time - INTERVAL '2' SECOND
) WITH ('connector' = 'datagen','rows-per-second' = '2','fields.show_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '100'
)CREATE TABLE click_log_table(
--点击日志id
`log_id` BIGINT,
--点击日志参数
`click_params` STRING,
--事件时间
`row_time` as CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
--watermark设置
WATERMARK FOR row_time AS row_time - INTERVAL '2' SECOND
) WITH ('connector' = 'datagen','rows-per-second' = '2','fields.click_params.length' = '1','fields.log_id.min' = '1','fields.log_id.max' = '10'
)CREATE TABLE sink_table6(
--曝光日志id
`s_id` BIGINT,
--点击日志id
`c_id` BIGINT,
--曝光日志参数
`show_params` STRING,
--点击日志参数
`click_params` STRING
) WITH (
'connector' = 'print'
)

2.1 Inner Interval Join

INSERT INTO sink_table6
SELECT 
show_log_table.log_id as s_id,
click_log_table.log_id as c_id,
show_log_table.show_params,
click_log_table.click_params
FROM show_log_table INNER JOIN click_log_table on show_log_table.log_id = click_log_table.log_id
AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '3' SECOND AND click_log_table.row_time + INTERVAL '2' SECOND;

在这里插入图片描述

2.2 Left Interval Join

INSERT INTO sink_table6
SELECT 
show_log_table.log_id as s_id,
click_log_table.log_id as c_id,
show_log_table.show_params,
click_log_table.click_params
FROM show_log_table LEFT JOIN click_log_table on show_log_table.log_id = click_log_table.log_id
AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '3' SECOND AND click_log_table.row_time + INTERVAL '2' SECOND;

在这里插入图片描述

2.3 Right Interval Join

INSERT INTO sink_table6
SELECT 
show_log_table.log_id as s_id,
click_log_table.log_id as c_id,
show_log_table.show_params,
click_log_table.click_params
FROM show_log_table RIGHT JOIN click_log_table on show_log_table.log_id = click_log_table.log_id
AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '3' SECOND AND click_log_table.row_time + INTERVAL '2' SECOND;

在这里插入图片描述

2.4 Full Interval Join

INSERT INTO sink_table6
SELECT 
show_log_table.log_id as s_id,
click_log_table.log_id as c_id,
show_log_table.show_params,
click_log_table.click_params
FROM show_log_table FULL JOIN click_log_table on show_log_table.log_id = click_log_table.log_id
AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '3' SECOND AND click_log_table.row_time + INTERVAL '2' SECOND;

在这里插入图片描述

总结

以上就是大概的学习记录,这里对于Interval Join来说,很好的解决了 之前Regular Join中因为要保存大量的state,并且这里涉及到的都是追加流,对于外部系统的支持来说很明显是相比Regular Join这种需要支持回撤流的更高的适用性。


http://chatgpt.dhexx.cn/article/8wEdKkEg.shtml

相关文章

Flink Interval Join使用以及源码解析

1、Interval Join 概述 在之前的Join算子中,一般使用的是coGroup算子,因为一个算子可以提供多种语义,但是也是有一些弊端的。因为coGroup只能实现在同一个窗口的两个数据流之间进行join,在实际的计算过程中,往往会遇到…

mysql datesub interval_Mysql之INTERVAL与DATE_SUB与EXTRACT函数的使用

1. INTERVAL INTERVAL代表的是时间间隔 MySQL中的时间间隔类型有如下几种: 1.1 利用INTERVAL做时间的加减法 示例: 加法:SQL>SELECT DATE 2018-11-01 + INTERVAL 10 11 DAY_HOUR; 结果:2018-11-11 11:00:00 减法:SQL> select date 2018-11-11 11:00:00 -INTERVAL 10 11…

ORACLE中的INTERVAL

ORACLE中的INTERVAL 关于INTERVALINTERVAL类型INTERVAL YEAR TO MONTHINTERVAL DAY TO SECOND 关于INTERVAL oracle中提供了两种日期时间类型,分别是DATE和TIMESTAMP用来存储时间点的数据,同时还提供了INTERVAL来存储一段时间的数据。例如你2022年11月1…

Flink SQL之Interval Joins

1.Interval Joins(区间Join) 区间是双流join的优化,基于处理时间或事件时间,在一定时间区间内数据,相同的key进行join(支持 Batch\Streaming)。Interval Join 可以让一条流去 Join 另一条流中前…

MySQL INTERVAL 关键字指南

在本教程中,我们将了解 MySQL INTERVAL 关键字及其值以对日期和时间算术执行操作。那么,让我们开始吧! 另请阅读:MySQL 中的 While 循环 – 完整参考 MySQL INTERVAL 简介 在 MySQL 中,INTERVAL 关键字用于添加或减…

Flink之IntervalJoin介绍

InterValJoin算子 间隔流,一条流去join另一条流去过去一段时间内的数据,该算子将keyedStream与keyedStream转化为DataStream;再给定的时间边界内(默认包含边界),相当于一个窗口,按指定的key对俩…

Mysql的INTERVAL()函数和INTERVAL关键字

一&#xff0c;INTERVAL()函数 INTERVAL()函数可以返回分段后的结果&#xff0c;语法如下&#xff1a; INTERVAL(N,N1,N2,N3,..........) 其中&#xff0c;N是要判断的数值&#xff0c;N1,N2,N3,...是分段的间隔。 这个函数的返回值是段的位置&#xff1a; 如果N<N1&am…

Mysql_interval函数与关键字

目录 一、统计不同区间的数量&#xff1a;INTERVAL()函数 二、时间间隔&#xff1a;INTERVAL关键字 一、统计不同区间的数量&#xff1a;INTERVAL()函数 INTERVAL()函数可以返回分段后的结果&#xff0c;语法如下&#xff1a; INTERVAL(N,N1,N2,N3,......,Nn) 其中&#x…

INTERVAL记录

INTERVAL作为sql中关键字 interval在sql中一般表示为时间间隔的意思 常用在date_add()、**date_sub()**函数中&#xff0c;常用于时间的加减法 上述sql表示为当前时间一年后的时间。 interval后可选择的时间粒度&#xff1a; MICROSECOND&#xff0c; SECOND&#xff0c; MIN…

万能的Attention及其代码实现

最近看到以前的代码&#xff0c;想到了attention&#xff0c;趁着代码还在就来整理一下。 文章目录 [Squeeze-and-Excitation Networks](https://arxiv.org/abs/1709.01507)[Concurrent Spatial and Channel ‘Squeeze &Excitation’ in Fully Convolutional Networks](ht…

NLP中的Attention总结

Attention是模拟人脑的注意力机制。人在看到一些东西&#xff0c;往往只关注重要的信息&#xff0c;而忽略其他信息。自然语言处理中的attention是对文本分配注意力权重&#xff0c;Attention的本质就是从关注全部变为关注重点。 1 Attention的原理 Attention的计算过程&#…

Attention 理解汇总

在 Encoder-Decoder中&#xff0c;预测每个Decoder 的语义编码 c 是一样的&#xff0c;句子 X 中每个词对输出 Y 的每个词的影响都是相同的。 这样有两个弊端&#xff1a; 一是语义向量无法完全表示整个序列的信息&#xff1b; 二是先输入的内容携带的信息会被后输入的信息稀…

[深度学习] 自然语言处理 --- Self-Attention(三) 知识点与源码解析

在当前的 NLP 领域&#xff0c;Transformer / BERT 已然成为基础应用&#xff0c;而 Self-Attention 则是两者的核心部分&#xff0c;下面尝试用 Q&A 和源码的形式深入 Self-Attention 的细节。 一 Q&A 1. Self-Attention 的核心是什么&#xff1f; Self-Attention …

attention介绍

Attention 正在被越来越广泛的得到应用。 Attention 到底有什么特别之处&#xff1f;他的原理和本质是什么&#xff1f;Attention都有哪些类型&#xff1f;本文将详细讲解Attention的方方面面。 Attention 的本质是什么 Attention&#xff08;注意力&#xff09;机制如果浅层…

Attention原理

文章目录 Attention原理HAN原理利用Attention模型进行文本分类参考资料 Attention原理 转载一个Hierarchical Attention神经网络的实现 转载 图解Transformer 转载 Attention原理和源码解析 论文链接 Attention is All You Need HAN原理 论文链接Hierarchical Attention Net…

Transformer详解(二):Attention机制

1.Encoder-Decoder中的attention机制 上一篇文章最后&#xff0c;在Encoder-Decoder框架中&#xff0c;输入数据的全部信息被保存在了C。而这个C很容易受到输入句子长度的影响。当句子过长时&#xff0c;C就有可能存不下这些信息&#xff0c;导致模型后续的精度下降。Attentio…

attention模型

以机器翻译为例说明&#xff1a; 普通的RNN机器翻译模型: 次结构弱点在于当target句子太长时&#xff0c;前面encoder学习到的embedding vector(红边框)可能会被后面的decoder遗忘。因此改进版本如下: 这样&#xff0c;每次在输入target的word的时候&#xff0c;除了可以看到…

【深度学习】Self-Attention 原理与代码实现

1.Self-Attention 结构 在计算的时候需要用到矩阵Q(查询),K(键值),V(值)。在实际中&#xff0c;Self-Attention 接收的是输入(单词的表示向量x组成的矩阵X) 或者上一个 Encoder block 的输出。而Q,K,V正是通过 Self-Attention 的输入进行线性变换得到的。 2. Q, K, V 的计算 S…

Self -Attention、Multi-Head Attention、Cross-Attention

Self -Attention Transformer结构图 上图是论文中 Transformer 的内部结构图&#xff0c;左侧为 Encoder block&#xff0c;右侧为 Decoder block。红色圈中的部分为 Multi-Head Attention&#xff0c;是由多个 Self-Attention组成的&#xff0c;可以看到 Encoder block 包含一…

Attention Rollout

问题陈述 从图1a中的原始attention可以看出&#xff0c;只有在最开始的几层&#xff0c;不同位置的attention模式有一些区别&#xff0c;但是更高层中的attention权重更加一致。这表示随着模型层数的增加&#xff0c;嵌入的内容变得更加情境化&#xff0c;可能都带有类似的信息…