Flink Interval Join使用以及源码解析

article/2025/10/14 20:36:39

1、Interval Join 概述

在之前的Join算子中,一般使用的是coGroup算子,因为一个算子可以提供多种语义,但是也是有一些弊端的。因为coGroup只能实现在同一个窗口的两个数据流之间进行join,在实际的计算过程中,往往会遇到当req发生时,resp迟迟无法响应,这个时候,就会出现一个跨窗口的问题。也就是说经常会出现数据乱序,或者数据延迟的情况,导致两个流的数据是不同步的,也就会导致,join的过程中丢失数据问题。不在同一个窗口中的数据无法join,这个问题flink官方提供了另外一种join的方式,也就是interval join。他的核心思想就是,将两个流通过keyed分区。然后,按照key 在一个相对的时间段内进行Join。

Interval join用一个公共键连接两个流的元素(我们现在称它们为A&B),其中流B的元素的时间戳位于流A中元素的时间戳的相对时间间隔内。

这也可以更正式地表达为 b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]ora.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

这里要表达的意思也就是说,当两个流进行join的时候,会根据左流的时间戳在右流中寻找公共键。

其中a和b是a和b中共享公共密钥的元素。只要下限始终小于或等于上限,下限和上限都可以是负值或正值。间隔联接当前仅执行内部联接。

当一对元素传递给ProcessJoinFunction时,它们将被分配有两个元素的较大时间戳(可以通过ProcessJoinFunction.context访问)。

注意:Interval Join 仅支持event time

image-20220221160443471

在上面的例子中,我们将两个流“橙色”和“绿色”连接起来,下限为-2毫秒,上限为+1毫秒。默认情况下,这些边界是包含的,但是。lowerBoundExclusive()和upperBoundExclusive可用于更改是否包含上下界。

再次使用更正式的符号,这将转化为

orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound

如三角形所示。

2、代码实现

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...orangeStream.keyBy(<KeySelector>).intervalJoin(greenStream.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String(){@Overridepublic void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {out.collect(first + "," + second);}});

3、Interval Join源码实现原理

在Interval Join的实现当中,其中的核心实现为IntervalJoinOperator类,这个类提供了执行IntervalJoin的核心逻辑

构造方法

public IntervalJoinOperator(long lowerBound,long upperBound,boolean lowerBoundInclusive,boolean upperBoundInclusive,TypeSerializer<T1> leftTypeSerializer,TypeSerializer<T2> rightTypeSerializer,ProcessJoinFunction<T1, T2, OUT> udf) {super(Preconditions.checkNotNull(udf));Preconditions.checkArgument(lowerBound <= upperBound,"lowerBound <= upperBound must be fulfilled");// Move buffer by +1 / -1 depending on inclusiveness in order not needing// to check for inclusiveness later on// 这里根据是否包含上下界,来进行判断,是否执行 +1 或者 -1 操作this.lowerBound = (lowerBoundInclusive) ? lowerBound : lowerBound + 1L;this.upperBound = (upperBoundInclusive) ? upperBound : upperBound - 1L;}

初始化状态

	public void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);//构建 左流缓冲区,类型为keyedState的MapState 其中时间戳是key,值为BufferEntry 类型的List ArrayListthis.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(LEFT_BUFFER,LongSerializer.INSTANCE,new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))));//构建右流缓冲区 类型为keyedState的MapState 其中时间戳是key,值为BufferEntry类型的List ArrayListthis.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(RIGHT_BUFFER,LongSerializer.INSTANCE,new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))));}

处理左流和右流的数据

	@Overridepublic void processElement1(StreamRecord<T1> record) throws Exception {//左流processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);}@Overridepublic void processElement2(StreamRecord<T2> record) throws Exception {//右流processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);}

我们可以明显的看出来,左流和右流的处理,都是依靠于processElement方法。

private <THIS, OTHER> void processElement(final StreamRecord<THIS> record,final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,final long relativeLowerBound,final long relativeUpperBound,final boolean isLeft) throws Exception {//获取当前流的值,可以是左流也可以是右流final THIS ourValue = record.getValue();//获取当前元素的时间戳,左流 or 右流final long ourTimestamp = record.getTimestamp();//是否迟到,是否小于 当前的 watermarkif (isLate(ourTimestamp)) {return;}//将该方法的实现写到下方了,这里的意思是将当前的元素写入,当前key 所属的 state中,也就是左流keyedstate 或者右流keyed state //addToBuffer(ourBuffer, ourValue, ourTimestamp);List<BufferEntry<THIS>> elemsInBucket = ourBuffer.get(ourTimestamp);if (elemsInBucket == null) {elemsInBucket = new ArrayList<>();}elemsInBucket.add(new BufferEntry<>(ourValue, false));ourBuffer.put(ourTimestamp, elemsInBucket);//遍历 其他流的state 。for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {final long timestamp  = bucket.getKey();//如果时间不在范围内 则看一下保存的元素if (timestamp < ourTimestamp + relativeLowerBound ||timestamp > ourTimestamp + relativeUpperBound) {continue;}//如果在说明有值啊,当前值对应 other 多个元素的时候,会执行for循环,也就是 1 x n 的输出for (BufferEntry<OTHER> entry: bucket.getValue()) {if (isLeft) {collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);} else {collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);}}}//计算清理时间是否到了,到了的话就清理long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;if (isLeft) {internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);} else {internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);}}

经过我们上述的分析,可以分析出来,Interval Join 是一种依赖于EventTime的一种join方式,它将左流和右流相同的key的数据按照时间戳来进行存储在不同的缓存里面,leftBuffer 和rightBuffer。

运行思路是这样的。

首先进来一个元素,这个元素可能是左流也可能是右流的元素,然后校验是否过期,过期就丢弃,不过期继续处理。

然后将他放入到左流或右流单独所属的cache中,时间戳为key,然后判断时间段,和他所对应的缓存 里面是否有值,如果有则返回,没有则等待右流将他唤醒。

然后会判断时间戳到达什么位置了。是否到了该清理的时候,如果到了,则会按照时间戳来进行清理。

	public void onEventTime(InternalTimer<K, String> timer) throws Exception {long timerTimestamp = timer.getTimestamp();String namespace = timer.getNamespace();logger.trace("onEventTime @ {}", timerTimestamp);switch (namespace) {case CLEANUP_NAMESPACE_LEFT: {long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;logger.trace("Removing from left buffer @ {}", timestamp);leftBuffer.remove(timestamp);break;}case CLEANUP_NAMESPACE_RIGHT: {long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;logger.trace("Removing from right buffer @ {}", timestamp);rightBuffer.remove(timestamp);break;}default:throw new RuntimeException("Invalid namespace " + namespace);}}

到这里,我想我们对Interval Join 有了一些深入的理解了。

1、根据时间段来进行join,可以处于边界,也可以不处于边界

2、根据双cache来进行存储数据,以及根据keyed来进行join逻辑的实现

3、他是内连接的,目前不支持左外链接,想做的话,可以手动指定清理策略(改源码重新打包,或者基于双亲委派机制的在项目中添加对应的类,来进行改造)。


http://chatgpt.dhexx.cn/article/TKNJJOs6.shtml

相关文章

mysql datesub interval_Mysql之INTERVAL与DATE_SUB与EXTRACT函数的使用

1. INTERVAL INTERVAL代表的是时间间隔 MySQL中的时间间隔类型有如下几种: 1.1 利用INTERVAL做时间的加减法 示例: 加法:SQL>SELECT DATE 2018-11-01 + INTERVAL 10 11 DAY_HOUR; 结果:2018-11-11 11:00:00 减法:SQL> select date 2018-11-11 11:00:00 -INTERVAL 10 11…

ORACLE中的INTERVAL

ORACLE中的INTERVAL 关于INTERVALINTERVAL类型INTERVAL YEAR TO MONTHINTERVAL DAY TO SECOND 关于INTERVAL oracle中提供了两种日期时间类型&#xff0c;分别是DATE和TIMESTAMP用来存储时间点的数据&#xff0c;同时还提供了INTERVAL来存储一段时间的数据。例如你2022年11月1…

Flink SQL之Interval Joins

1.Interval Joins&#xff08;区间Join&#xff09; 区间是双流join的优化&#xff0c;基于处理时间或事件时间&#xff0c;在一定时间区间内数据&#xff0c;相同的key进行join&#xff08;支持 Batch\Streaming&#xff09;。Interval Join 可以让一条流去 Join 另一条流中前…

MySQL INTERVAL 关键字指南

在本教程中&#xff0c;我们将了解 MySQL INTERVAL 关键字及其值以对日期和时间算术执行操作。那么&#xff0c;让我们开始吧&#xff01; 另请阅读&#xff1a;MySQL 中的 While 循环 – 完整参考 MySQL INTERVAL 简介 在 MySQL 中&#xff0c;INTERVAL 关键字用于添加或减…

Flink之IntervalJoin介绍

InterValJoin算子 间隔流&#xff0c;一条流去join另一条流去过去一段时间内的数据&#xff0c;该算子将keyedStream与keyedStream转化为DataStream&#xff1b;再给定的时间边界内&#xff08;默认包含边界&#xff09;&#xff0c;相当于一个窗口&#xff0c;按指定的key对俩…

Mysql的INTERVAL()函数和INTERVAL关键字

一&#xff0c;INTERVAL()函数 INTERVAL()函数可以返回分段后的结果&#xff0c;语法如下&#xff1a; INTERVAL(N,N1,N2,N3,..........) 其中&#xff0c;N是要判断的数值&#xff0c;N1,N2,N3,...是分段的间隔。 这个函数的返回值是段的位置&#xff1a; 如果N<N1&am…

Mysql_interval函数与关键字

目录 一、统计不同区间的数量&#xff1a;INTERVAL()函数 二、时间间隔&#xff1a;INTERVAL关键字 一、统计不同区间的数量&#xff1a;INTERVAL()函数 INTERVAL()函数可以返回分段后的结果&#xff0c;语法如下&#xff1a; INTERVAL(N,N1,N2,N3,......,Nn) 其中&#x…

INTERVAL记录

INTERVAL作为sql中关键字 interval在sql中一般表示为时间间隔的意思 常用在date_add()、**date_sub()**函数中&#xff0c;常用于时间的加减法 上述sql表示为当前时间一年后的时间。 interval后可选择的时间粒度&#xff1a; MICROSECOND&#xff0c; SECOND&#xff0c; MIN…

万能的Attention及其代码实现

最近看到以前的代码&#xff0c;想到了attention&#xff0c;趁着代码还在就来整理一下。 文章目录 [Squeeze-and-Excitation Networks](https://arxiv.org/abs/1709.01507)[Concurrent Spatial and Channel ‘Squeeze &Excitation’ in Fully Convolutional Networks](ht…

NLP中的Attention总结

Attention是模拟人脑的注意力机制。人在看到一些东西&#xff0c;往往只关注重要的信息&#xff0c;而忽略其他信息。自然语言处理中的attention是对文本分配注意力权重&#xff0c;Attention的本质就是从关注全部变为关注重点。 1 Attention的原理 Attention的计算过程&#…

Attention 理解汇总

在 Encoder-Decoder中&#xff0c;预测每个Decoder 的语义编码 c 是一样的&#xff0c;句子 X 中每个词对输出 Y 的每个词的影响都是相同的。 这样有两个弊端&#xff1a; 一是语义向量无法完全表示整个序列的信息&#xff1b; 二是先输入的内容携带的信息会被后输入的信息稀…

[深度学习] 自然语言处理 --- Self-Attention(三) 知识点与源码解析

在当前的 NLP 领域&#xff0c;Transformer / BERT 已然成为基础应用&#xff0c;而 Self-Attention 则是两者的核心部分&#xff0c;下面尝试用 Q&A 和源码的形式深入 Self-Attention 的细节。 一 Q&A 1. Self-Attention 的核心是什么&#xff1f; Self-Attention …

attention介绍

Attention 正在被越来越广泛的得到应用。 Attention 到底有什么特别之处&#xff1f;他的原理和本质是什么&#xff1f;Attention都有哪些类型&#xff1f;本文将详细讲解Attention的方方面面。 Attention 的本质是什么 Attention&#xff08;注意力&#xff09;机制如果浅层…

Attention原理

文章目录 Attention原理HAN原理利用Attention模型进行文本分类参考资料 Attention原理 转载一个Hierarchical Attention神经网络的实现 转载 图解Transformer 转载 Attention原理和源码解析 论文链接 Attention is All You Need HAN原理 论文链接Hierarchical Attention Net…

Transformer详解(二):Attention机制

1.Encoder-Decoder中的attention机制 上一篇文章最后&#xff0c;在Encoder-Decoder框架中&#xff0c;输入数据的全部信息被保存在了C。而这个C很容易受到输入句子长度的影响。当句子过长时&#xff0c;C就有可能存不下这些信息&#xff0c;导致模型后续的精度下降。Attentio…

attention模型

以机器翻译为例说明&#xff1a; 普通的RNN机器翻译模型: 次结构弱点在于当target句子太长时&#xff0c;前面encoder学习到的embedding vector(红边框)可能会被后面的decoder遗忘。因此改进版本如下: 这样&#xff0c;每次在输入target的word的时候&#xff0c;除了可以看到…

【深度学习】Self-Attention 原理与代码实现

1.Self-Attention 结构 在计算的时候需要用到矩阵Q(查询),K(键值),V(值)。在实际中&#xff0c;Self-Attention 接收的是输入(单词的表示向量x组成的矩阵X) 或者上一个 Encoder block 的输出。而Q,K,V正是通过 Self-Attention 的输入进行线性变换得到的。 2. Q, K, V 的计算 S…

Self -Attention、Multi-Head Attention、Cross-Attention

Self -Attention Transformer结构图 上图是论文中 Transformer 的内部结构图&#xff0c;左侧为 Encoder block&#xff0c;右侧为 Decoder block。红色圈中的部分为 Multi-Head Attention&#xff0c;是由多个 Self-Attention组成的&#xff0c;可以看到 Encoder block 包含一…

Attention Rollout

问题陈述 从图1a中的原始attention可以看出&#xff0c;只有在最开始的几层&#xff0c;不同位置的attention模式有一些区别&#xff0c;但是更高层中的attention权重更加一致。这表示随着模型层数的增加&#xff0c;嵌入的内容变得更加情境化&#xff0c;可能都带有类似的信息…

Attention可视化

Attention matrix&#xff1a; https://github.com/rockingdingo/deepnlp/blob/r0.1.6/deepnlp/textsum/eval.py plot_attention(data, X_labelNone, Y_labelNone)函数 #!/usr/bin/python # -*- coding:utf-8 -*-""" Evaluation Method for summarization tas…