Flink:watermark

article/2025/9/20 20:03:07

Table of Contents

 

三种时间概念

Processing time

Event Time

Ingestion time

watermark

并行流的Watermarks

迟到的事件

watermark分配器

watermark的两种分配器

 


三种时间概念

在谈watermark之前,首先需要了解flink的三种时间概念。在flink中,有三种时间戳概念:Event Time 、Processing Time 和 Ingestion Time。其中watermark只对Event Time类型的时间戳有用。这三种时间概念分别表示:

Processing time

处理时间,指执行算子操作的机器的当前时间。当基于处理时间运行时,所有关于时间的操作(如时间窗口)都将使用执行算子操作的机器的本地时间。例如,当时间窗口为一小时时,如果应用程序在9:15 am开始运行,则第一个窗口将包括在9:15 am到10:00 am之间被处理的事件,下一个窗口将包含在10:00 am到11:00 am之间被处理的事件,依此类推。

处理时间是最简单的时间概念,不需要流和机器之间的协调。它提供了最佳的性能和最低的延迟。但是,在分布式和异步环境中,处理时间不能提供确定性,因为它容易受到上流系统(例如从消息队列)到达Flink的速度、flink内部operators之间交互的速度,以及中断(调度或其他情况)等因素的影响。

Event Time

事件时间,是每个event在其生产设备上产生的时间,即元素在到达flink之前,本身就自带的时间戳

所以说,Event Time的时间戳取决于数据,而与其他时间无关。使用Event Time,必须在从执行环境中先引入EventTime的时间属性。如:

val env = StreamExecutionEnvironment.getExecutionEnvironment
// 从调用时刻开始给env创建的每一个stream追加时间特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

然后通过Dastream的assignTimestampsAndWatermarks方法指定event time时间戳,具体操作不做赘述。

在理想情况下,事件时间是有序的。但实际上,由于分布式操作,以及网络延迟等原因,event可能不是按照event time的顺序到达的。所以flink对处理乱序数据的方案是提供一个允许延迟时间,在允许延迟时间内到达的元素将会重新触发一次计算。这个延迟时间时相对event time而不是其他时间的,而event time不是由flink决定的,那么如何判断当前的event time到底时多少呢?flink通过一个watermark来确定与维护当前event time的最大值。这也是本文将会在后面重点解释的。

Ingestion time

Ingestion time是event进入Flink的时间,即执行source操作时的时间。

Ingestion time从概念上讲介于Event TimeProcessing time之间

Processing time相比 ,它花费的资源会稍微多一些,但结果却更可预测。由于 Ingestion time使用稳定的时间戳(仅在addSource处分配了一次),因此对记录的不同窗口操作将引用相同的时间戳,而在Processing time中,每个窗口操作都会更新事件的Processing time,所以可能一个上游窗口中的记录会分配给不同的下游窗口(基于本地系统时钟和任何可能的延误)。

Event Time相比,Ingestion time程序无法处理任何乱序事件或迟到的数据,但是程序不必指定如何生成watermarks

 

下图为三种时间语义的图解:

 

watermark

上面说到,支持Event Time需要一种测量时间进度的方法,用于判断当前event time的时间。这个机制就是watermark。

watermark会根据数据流中event的时间戳发生变化。watermark意味着当前流中已经到达的event的最大时间戳,也就是说,往后到达的event的时间戳应该要大于watermark,或者说时间戳小于watermark的数据都已经到达了

如下图例子中,事件是按顺序排列的(相对于其时间戳),理想情况下,watermark周期性维护着当前event的最大时间戳

但是,通常情况下,event都是乱序的,不按时间排序的,通常,watermark用于声明某个时间点,表示某个时间点前的数据都应该到达了(官网上是这么说的,官网描述的有点模糊,实际上对于乱序事件,一般会结合允许延迟机制,触发计算的条件是:watermark = 窗口的endtime + 最大允许延迟时间。所以官网上实际上是说watermark表示某个时间点是指endtime+允许延迟,则endtime前的元素都应该到达),一旦watermark到达触发计算的时间点,那么窗口就会把已经到达的event中,时间戳小于endtime的event进行计算。如下图所示:

那么,watermark到底是以怎样的一种形式存在的呢?实际上,watermark就是一种特殊的event,它被参杂在Dstream中,watermark由flink的某个操作生成后,就在整个程序中随event一同流转,如下图所示:

以下是watermark的代码,可以看出watermark的就是一个流元素,仅包含一个时间戳属性:

public final class Watermark extends StreamElement {/** The watermark that signifies end-of-event-time. */public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);// ------------------------------------------------------------------------/** The timestamp of the watermark in milliseconds. */private final long timestamp;/*** Creates a new watermark with the given timestamp in milliseconds.*/public Watermark(long timestamp) {this.timestamp = timestamp;}

并行流的Watermarks

watermark可以在source处生成(也可以在source后通过其他算子生成,如map、filter等),如果source有多个并行度,那么每个并行度会单独生成一个watermark,这些watermark定义了各分区的event time。
当并行度发生变化(即上游的一个分区可能被下游多个分区使用时),每个分区的watermark是会广播至下游的每个分区的,如一些聚合多个流的操作,如 keyBy(…) 或者partition(…),此类操作的watermark是在所有输入流中取最小的watermark。当带有watermark的流通过此类算子时,会根据每个分区的watermark来更新watermark。

举个例子:当上游并行度数为4,下游的某个分区的窗口中的watermark如下:

1.当已到达的watermark分别为2、4、3、6时,窗口中的watermark为2,触发watermark为2的对应窗口计算,并将watermark=2广播至下游。

2.当第一个窗口的watermark被更新为4时,所有分区中已到达最小的watermark是3,则将窗口的watermark更新为3,触发对应窗口的计算,并将watermark=3广播至下游。

3.当第二个分区的watermark被更新为7,所有分区中已到达最小的watermark还是3,不做处理。

4.当第三个分区的watermark被更新为6,所有分区中已到达最小的watermark是4,则将窗口的watermark更新为4,触发对应窗口的计算,并将watermark=4广播至下游

下图显示了event和watermark在一个并行流的示例,以及算子如何跟踪事件时间的:

 

迟到的事件

在介绍watermark时,提到了现实中往往处理的是乱序事件,即当event处于某些原因而延后到达时,会发生该event time<watermark的情况,显然,这是有违watermark的制定条件的,所以flink对处理乱序事件的watermark有一个允许延迟的机制,允许在一定事件内迟到的时间仍然视为有效事件。

watermark分配器

当watermark完全基于event time时,如果没有元素到达,则watermark不会被更新,这就说明,当一段时间没有元素到达,则在这个时间间隙内,watermark不会增加,那么也不会触发窗口计算。显然,如果这段时间很长的话,那么该窗口中已经到达的元素将会等待很久才会被输出计算结果。

为了避免这种情况,可以使用周期性的watermark分配器(AssignerWithPeriodicWatermarks 下面马上提到),这些分配器不仅仅基于event time进行分配。比如,可以使用一个分配器,当一段时间没有接收到新的event时,则将当前时间作为watermark。

 

watermark的两种分配器

flink生成watermark有两种机制:

  • AssignerWithPeriodicWatermarks :分配时间戳并定期生成watermark(可以取决于event time,或基于处理时间)。
  • AssignerWithPunctuatedWatermarks:分配时间戳并根据每一个元素生成watermark(每来一个元素都进行一次判断,相更消耗性能)

通常情况下会使用第一种机制,原因除了更节省性能外,在上面的分配器中也有提到。

下面分别对两种机制进行介绍。

AssignerWithPeriodicWatermarks

对每个元素都调用extractTimestamp方法获取时间戳,并维护一个最大时间戳。通过ExecutionConfig.setAutoWatermarkInterval(...)定义生成watermark的间隔(每n毫秒) 。根据这个间隔,周期性调用分配器的getCurrentWatermark()方法,为watermark分配值。

在flink自带的BoundedOutOfOrdernessGenerator分配器中, getCurrentWatermark是定期将当前watermark更新为最大时间戳减去允许延迟时间的值。

以下是两个使用AssignerWithPeriodicWatermarks 生成的时间戳分配器的简单示例:

/*** This generator generates watermarks assuming that elements arrive out of order,* but only to a certain degree. The latest elements for a certain timestamp t will arrive* at most n milliseconds after the earliest elements for timestamp t.*/
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {val maxOutOfOrderness = 3500L // 3.5 secondsvar currentMaxTimestamp: Long = _override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {val timestamp = element.getCreationTime()currentMaxTimestamp = max(timestamp, currentMaxTimestamp)timestamp}override def getCurrentWatermark(): Watermark = {// return the watermark as current highest timestamp minus the out-of-orderness boundnew Watermark(currentMaxTimestamp - maxOutOfOrderness)}
}/*** This generator generates watermarks that are lagging behind processing time by a fixed amount.* It assumes that elements arrive in Flink after a bounded delay.*/
class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {val maxTimeLag = 5000L // 5 secondsoverride def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {element.getCreationTime}override def getCurrentWatermark(): Watermark = {// return the watermark as current time minus the maximum time lagnew Watermark(System.currentTimeMillis() - maxTimeLag)}
}

 

AssignerWithPunctuatedWatermarks

根据每个元素的event time生成watermark,通过extractTimestamp(...)方法为元素分配时间戳,通过checkAndGetNextWatermark(...)检查元素的watermark并更新watermark。

checkAndGetNextWatermark(...)方法的第二个参数是extractTimestamp(...) 返回的时间戳,根据这个时间戳决定是否要生成watermark。每当checkAndGetNextWatermark(...) 方法返回一个非空watermark,并且该watermark大于上一个watermark时,就会更新watermark。

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {element.getCreationTime}override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null}
}

 

 


http://chatgpt.dhexx.cn/article/gLl6JEya.shtml

相关文章

Flink 水位线(Watermark)

文章目录 什么是水位线水位线的特性如何生成水位线Flink 内置水位线生成器自定义水位线策略在自定义数据源中发送水位线水位线的总结 在实际应用中&#xff0c;一般会采用事件时间语义。而水位线&#xff0c;就是基于事件时间提出的概念。一个数据产生的时刻&#xff0c;就是流…

vue -- watermark水印添加方法

作者&#xff1a;蛙哇 原文链接&#xff1a; https://segmentfault.com/a/1190000022055867 来源&#xff1a;segmentfault 前言 项目生成公司水印是很普遍的需求&#xff0c;下面是vue项目生产水印的方法。话不多说&#xff0c;复制粘贴就可以马上解决你的需求。 步骤1 创建…

tp-watermark.js网页添加水印插件

tp-watermark.js网页添加水印插件 作者&#xff1a;鹏仔先生 上周五&#xff0c;出差去改上个前端遗留的小问题&#xff0c;用到了watermark.js这个网站添加水印插件&#xff0c;功能很简单&#xff0c;就是给网页添加个水印&#xff0c;我看了下网上&#xff0c;有很多种&…

实用有效!React项目中使用watermark.js添加水印效果

为了避免公司的内部文档被截图外泄&#xff0c;有必要在系统页面上面增加水印。 第一步&#xff1a; 下载依赖包&#xff1a; npm install watermark-dompackage.json中会添加一个依赖如下&#xff1a; "watermark-dom": "^2.3.0"第二步&#xff1a; 引…

Flink之水位线(Watermark)

在流数据处理应用中&#xff0c;一个很重要、也很常见的操作就是窗口计算。所谓的“窗口”&#xff0c;一般就是划定的一段时间范围&#xff0c;也就是“时间窗”&#xff1b;对在这范围内的数据进行处理&#xff0c;就是所谓的窗口计算。所以窗口和时间往往是分不开的。接下来…

水印watermark

第一步:npm获取水印组件包 npm install watermark-dom 第二步:引入水印模块 import watermark from ‘watermark-dom’ 或者 var watermarkDom require(“watermark-dom”) 根据业务需要&#xff0c;我是登入之后的页面才有水印&#xff0c;前者我是放在验证用户登录状态js文件…

Flink流计算编程--watermark(水位线)简介

1、watermark的概念 watermark是一种衡量Event Time进展的机制&#xff0c;它是数据本身的一个隐藏属性。通常基于Event Time的数据&#xff0c;自身都包含一个timestamp&#xff0c;例如1472693399700&#xff08;2016-09-01 09:29:59.700&#xff09;&#xff0c;而这条数据…

Flink之watermark(水印)讲解

flink中watermark的详细介绍 使用前提&#xff1a; 处理数据开窗&#xff0c;处理数据的时间语义是事件时间&#xff0c;也就是每条数据产生的时间。 使用场景&#xff08;解决问题&#xff09;&#xff1a; 处理乱序数据&#xff1a;flink中是实时处理数据&#xff0c;但是…

WaterMark使用和详解

上篇&#xff1a;基于flink的会话窗口的api实现 WaterMark翻译为水位线&#xff0c;什么时候用到水位线呢&#xff1f; 比如说水控在顺水的时候达到紧梯就会触发&#xff0c;若不放水就可以发现危险的现状 在spark程序划分成窗口的时候&#xff0c;主要是衡量什么时候触发&am…

【大数据】带你理解并使用flink中的WaterMark机制

文章目录 一、引导二、WaterMark1、Watermark的原理2、Watermark 的使用2.1、顺序数据流中的watermark示例 2.2、乱序数据流中的WaterMark2.2.1、With Periodic&#xff08;周期性的&#xff09; Watermark示例一&#xff1a;使用周期性的WaterMark2.2.2、With Punctuated&…

JavaEE是什么?JavaSE又是什么?两者的区别有哪些?

Java作为最流行的编程语言受到了许多人的喜爱&#xff0c;其在编程中的地位自不必多说。对于许多才刚刚入门Java的朋友来讲&#xff0c;常常会产生这样的困惑&#xff0c;JavaEE是什么&#xff1f;JavaSE又是什么&#xff1f;两者的区别有哪些&#xff1f;学哪个比较好&#xf…

php mysql Javaee_javaee与php的区别是什么

javaee与php的区别&#xff1a;1、JavaEE是门面向对象的程序设计语言&#xff0c;而PHP是解释执行的服务器脚本语言&#xff1b;2、用JavaEE开发的Web应用从MySQL数据库转到Oracle数据库只需要做很少的修改&#xff0c;而PHP则需要做大量的修改工作。 javaee与php的区别&#x…

解读JAVAEE是什么样的Java

javaEE是指java enterprise edition&#xff0c;java企业版&#xff0c;多用于企业级开发&#xff0c;包括web开发等等很多组建。javaEE开发会设计java的高级特性以及一些spring等架构&#xff0c;需要掌握的内容相对多。 JAVA&#xff0c;所有的Java平台都由一个JVM和一组应用…

java ee 下载什么意思_JavaEE到底是什么?

慕运维8079593 JavaEE只是一个规范吗&#xff1f;我的意思是&#xff1a;EJBJavaEE是吗&#xff1f;JavaEE确实是一个摘要规格说明。任何人都愿意开发和提供规范的工作实现。这个混凝土实现是所谓的应用服务器&#xff0c;如野弗利, 托梅, 玻璃鱼, 自由, WebLogic&#xff0c;等…

java ee api是什么意思_JavaEE的整体概述

标签&#xff1a; JavaEE整体概述 知识点&#xff1a; 1、整体概述JavaEE的知识体系 2、JavaEE是什么? 能干什么? 为什么需要JavaEE? 3、JavaEE有什么? JavaEE的技术体系? JavaEE的本质是什么? 4、JavaEE的零散基础知识 5、JavaEE的组件体系结构 -----------------------…

矩阵求逆方法

1.待定系数法 矩阵A 1, 2 -1,-3 假设所求的逆矩阵为 a,b c,d 则 从而可以得出方程组 a 2c 1 b 2d 0 -a - 3c 0 -b - 3d 1 解得 a3; b2; c -1; d -1 2.伴随矩阵求逆矩阵 伴随矩阵是矩阵元素所对应的代数余子式&#xff0c;所构成的矩阵&#xff0c;转置后得到的新矩阵…

复数矩阵求逆的 C 语言程序

关于复数矩阵求逆&#xff0c;如果使用 MATLAB&#xff0c;就非常简单。我们先用一个 MATLAB 的例子来说明&#xff0c;等会要将 C 语言的程序和 MATLAB 的程序进行对比。 close all; clear all; clc;%定义矩阵a为复数矩阵 a [[42*i,31*i,43*i,55*i];[17*i,82*i,22*i,93*i];[…

科学计算器如何求矩阵的逆

大学本科买了四年的计算器不会求逆&#xff0c;到了研究生了好好研究下这个功能&#xff0c;终于终于会用了&#xff0c;以往 对着那个矩阵功能都发懵&#x1f602;&#xff0c;记录一下这个史诗无敌隐藏功能 1、进入菜单&#xff0c;点击4进入矩阵菜单 2、这里选择1定义矩阵A…

matlab求一个矩阵的逆矩阵的命令,如何用MATLAB求逆矩阵

如何用MATLAB求逆矩阵以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧! 如何用MATLAB求逆矩阵 如果英文好呢,自己看目录 不好还是先看中文的教材,对matlab的框架和功能有了一定的了解后,自己也就看的懂帮助里面…

matlab矩阵求逆的模块,matlab矩阵求逆矩阵

matlab矩阵求逆矩阵 因为 所以该矩阵可逆&#xff0c;根据 &#xff0c;其中 得到 计算矩阵A每个元素的代数余子式&#xff1a; 所以 可得&#xff1a; matlab计算如下&#xff1a; >> A1[1 2 2;2 1 -2;2 -2 1] A1 1 2 2 2 1 -2 2 -2 1 >> >> >> A2in…