Flink 水位线(Watermark)

article/2025/9/20 20:10:59

文章目录

  • 什么是水位线
  • 水位线的特性
  • 如何生成水位线
  • Flink 内置水位线生成器
  • 自定义水位线策略
  • 在自定义数据源中发送水位线
  • 水位线的总结

在实际应用中,一般会采用事件时间语义。而水位线,就是基于事件时间提出的概念。一个数据产生的时刻,就是流处理中事件触发的时间点,这就是“事件时间”,一般都会以时间戳的形式作为一个字段记录在数据里。这个时间就像商品的“生产日期”一样,一旦产生就是固定的,印在包装袋上,不会因为运输辗转而变化。如果我们想要统计一段时间内的数据,需要划分时间窗口,这时只要判断一下时间戳就可以知道数据属于哪个窗口了。

在这里插入图片描述

在这个处理过程中,我们其实是基于数据的时间戳,自定义了一个“逻辑时钟”。这个时钟的时间不会自动流逝;它的时间进展,就是靠着新到数据的时间戳来推动的。这样的好处在于,计算的过程可以完全不依赖处理时间(系统时间),不论什么时候进行统计处理,得到的结果都是正确的。比如双十一的时候系统处理压力大,我们可能会把大量数据缓存在 Kafka中;过了高峰时段之后再读取出来,在几秒之内就可以处理完几个小时甚至几天的数据,而且依然可以按照数据产生的时间段进行统计,所有窗口都能收集到正确的数据。而一般实时流处理的场景中,事件时间可以基本与处理时间保持同步,只是略微有一点延迟,同时保证了窗口计算的正确性。

什么是水位线

一种简单的想法是,在数据流中加入一个时钟标记,记录当前的事件时间;这个标记可以直接广播到下游,当下游任务收到这个标记,就可以更新自己的时钟了。由于类似于水流中用来做标志的记号,在 Flink 中,这种用来衡量事件时间(Event Time)进展的标记,就被称作“水位线”(Watermark)。

水位线的特性

水位线就代表了当前的事件时间时钟,而且可以在数据的时间戳基础上加一些延迟来保证不丢数据,这一点对于乱序流的正确处理非常重要。
总结一下水位线的特性:

  • 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据

  • 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展

  • 水位线是基于数据的时间戳生成的

  • 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进

  • 水位线可以通过设置延迟,来保证正确处理乱序数据

  • 一个水位线 Watermark(t),表示在当前流中事件时间已经达到了时间戳 t, 这代表 t 之前的所有数据都到齐了,之后流中不会出现时间戳 t’ ≤ t 的数据

如何生成水位线

在 Flink 的 DataStream API 中 , 有 一 个 单 独 用 于 生 成 水 位 线 的 方法:.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间:

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy)

具体使用时,直接用 DataStream 调用该方法即可,与普通的 transform 方法完全一样。

DataStream<Event> stream = env.addSource(new ClickSource());
DataStream<Event> withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(<watermark strategy>);

这里读者可能有疑惑:不是说数据里已经有时间戳了吗,为什么这里还要“分配”呢?这是因为原始的时间戳只是写入日志数据的一个字段,如果不提取出来并明确把它分配给数据,
Flink 是无法知道数据真正产生的时间的。当然,有些时候数据源本身就提供了时间戳信息,比如读取 Kafka 时,我们就可以从 Kafka 数据中直接获取时间戳,而不需要单独提取字段分配了。
.assignTimestampsAndWatermarks() 方法需要传入一个 WatermarkStrategy 作为参数,这就是 所 谓 的 “ 水 位 线 生 成 策 略 ” 。 WatermarkStrategy 中 包 含 了 一 个 “ 时 间 戳 分 配器” TimestampAssigner 和一个“水位线生成器”WatermarkGenerator

@Public
public interface WatermarkStrategy<T>extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {// ------------------------------------------------------------------------//  Methods that implementors need to implement.// ------------------------------------------------------------------------/** Instantiates a WatermarkGenerator that generates watermarks according to this strategy. */@OverrideWatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);/*** Instantiates a {@link TimestampAssigner} for assigning timestamps according to this strategy.*/@Overridedefault TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) {// By default, this is {@link RecordTimestampAssigner},// for cases where records come out of a source with valid timestamps, for example from// Kafka.return new RecordTimestampAssigner<>();}
  • TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。
  • WatermarkGenerator:主要负责按照既定的方式,基于时间戳生成水位线。在WatermarkGenerator 接口中,主要又有两个方法:onEvent()和 onPeriodicEmit()。
  • onEvent:每个事件(数据)到来都会调用的方法,它的参数有当前事件、时间戳,以及允许发出水位线的一个 WatermarkOutput,可以基于事件做各种操作
  • onPeriodicEmit:周期性调用的方法,可以由 WatermarkOutput 发出水位线。周期时间为处理时间,可以调用环境配置的.setAutoWatermarkInterval()方法来设置,默认为200ms。
env.getConfig().setAutoWatermarkInterval(60 * 1000L);

Flink 内置水位线生成器

WatermarkStrategy 这个接口是一个生成水位线策略的抽象,让我们可以灵活地实现自己的需求;但看起来有些复杂,如果想要自己实现应该还是比较麻烦的。好在 Flink 充分考虑到了我们的痛苦,提供了内置的水位线生成(WatermarkGenerator),不仅开箱即用简化了编程,而且也为我们自定义水位线策略提供了模板。这两个生成器可以通过调用 WatermarkStrategy 的静态辅助方法来创建。它们都是周期性生成水位线的,分别对应着处理有序流和乱序流的场景。
(1)有序流
对于有序流,主要特点就是时间戳单调增长(Monotonously Increasing Timestamps),所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps() 方法就可以实现。简单来说,就是直接拿当前最大的时间戳作为水位线就可以了。

                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp; //默认是毫秒}}))

上面代码中我们调用 .withTimestampAssigner() 方法,将数据中的 timestamp 字段提取出来,作为时间戳分配给数据元素;然后用内置的有序流水位线生成器构造出了生成策略。这样,提取出的数据时间戳,就是我们处理计算的事件时间。
这里需要注意的是,时间戳和水位线的单位,必须都是毫秒。
(2)乱序流
由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间(Fixed Amount of Lateness)。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用 WatermarkStrategy. forBoundedOutOfOrderness() 方法就可以实现。这个方法需要传入一个 maxOutOfOrderness 参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。
代码示例如下:

public class WatermarkTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//周期性生成watermarkTestenv.getConfig().setAutoWatermarkInterval(100);// 读取数据源,并行度为 1DataStream<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),new Event("Bob", "./cart", 2000L),new Event("Alice", "./prod?id=100", 3000L),new Event("Alice", "./prod?id=200", 3500L),new Event("Bob", "/prod?id=2", 2500L),new Event("Alice", "./prod?id=300", 3600L),new Event("Bob", "./home", 3000L),new Event("Bob", "./prod?id=1", 2300L),new Event("Bob", "./prod?id=3", 3300L))//乱序的Watermarks,延迟时间设置为 2s.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {// 抽取时间戳的逻辑@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));env.execute();}
}

上面代码中,我们同样提取了 timestamp 字段作为时间戳,并且以 5 秒的延迟时间创建了处理乱序流的水位线生成器。
事实上,有序流的水位线生成器本质上和乱序流是一样的,相当于延迟设为 0 的乱序流水位线生成器,两者完全等同:

WatermarkStrategy.forMonotonousTimestamps()
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0))

这里需要注意的是,乱序流中生成的水位线真正的时间戳,其实是 当前最大时间戳 – 延迟时间 – 1,这里的单位是毫秒。为什么要减 1 毫秒呢?我们可以回想一下水位线的特点:时间戳为 t 的水位线,表示时间戳≤t 的数据全部到齐,不会再来了。如果考虑有序流,也就是延迟时间为 0 的情况,那么时间戳为 7 秒的数据到来时,之后其实是还有可能继续来 7 秒的数据的;所以生成的水位线不是 7 秒,而是 6 秒 999 毫秒,7 秒的数据还可以继续来。这一点可以在 BoundedOutOfOrdernessWatermarks 的源码中明显地看到:

在这里插入图片描述
Flink有序乱序流测试源代码:Flink有序乱序流测试

自定义水位线策略

一般来说,Flink 内置的水位线生成器就可以满足应用需求了。不过有时我们的业务逻辑可能非常复杂,这时对水位线生成的逻辑也有更高的要求,我们就必须自定义实现水位线策略 WatermarkStrategy 了。
在 WatermarkStrategy 中,时间戳分配器 TimestampAssigner 都是大同小异的,指定字段提取时间戳就可以了;而不同策略的关键就在于 WatermarkGenerator 的实现。整体说来,Flink有两种不同的生成水位线的方式:一种是周期性的(Periodic),另一种是断点式的(Punctuated)。还记得 WatermarkGenerator 接口中的两个方法吗?—— onEvent()onPeriodicEmit(),前者是在每个事件到来时调用,而后者由框架周期性调用。周期性调用的方法中发出水位线,自然就是周期性生成水位线;而在事件触发的方法中发出水位线,自然就是断点式生成了。两种方式的不同就集中体现在这两个方法的实现上。

(1)周期性水位线生成器(Periodic Generator)
周期性生成器一般是通过 onEvent()观察判断输入的事件,而在 onPeriodicEmit()里发出水位线。
下面是一段自定义周期性生成水位线的代码:
周期性水位线生成器源代码

onPeriodicEmit()里调用 output.emitWatermark(),就可以发出水位线了;这个方法由系统框架周期性地调用,默认 200ms 一次。所以水位线的时间戳是依赖当前已有数据的最大时间戳的(这里的实现与内置生成器类似,也是减去延迟时间再减 1),但具体什么时候生成与数据无关。

(2)断点式水位线生成器(Punctuated Generator)
断点式生成器会不停地检测 onEvent()中的事件,当发现带有水位线信息的特殊事件时,就立即发出水位线。一般来说,断点式生成器不会通过 onPeriodicEmit()发出水位线。
自定义的断点式水位线生成器代码如下:断点式水位线生成器源代码
在 onEvent()中判断当前事件的 user 字段,只有遇到“Mary”这个特殊的值时,才调用 output.emitWatermark() 发出水位线。这个过程是完全依靠事件来触发的,所以水位线的生成一定在某个数据到来之后。

在自定义数据源中发送水位线

我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks 方法 来 生 成 水 位 线 了 。 在 自 定 义 数 据 源 中 生 成 水 位 线 和 在 程 序 中 使 用assignTimestampsAndWatermarks 方法生成水位线二者只能取其一。
示例程序如下:自定义数据源中发送水位线
在自定义水位线中生成水位线相比 assignTimestampsAndWatermarks 方法更加灵活,可以任意的产生周期性的、非周期性的水位线,以及水位线的大小也完全由我们自定义。所以非常适合用来编写 Flink 的测试程序,测试 Flink 的各种各样的特性。

水位线的总结

水位线在事件时间的世界里面,承担了时钟的角色。也就是说在事件时间的流中,水位线是唯一的时间尺度。
水位线是一种特殊的事件,由程序员通过编程插入的数据流里面,然后跟随数据流向下游流动。

水位线的默认计算公式:水位线 = 观察到的最大事件时间 – 最大延迟时间 – 1 毫秒。

所以这里涉及到一个问题,就是不同的算子看到的水位线的大小可能是不一样的。因为下游的算子可能并未接收到来自上游算子的水位线,导致下游算子的时钟要落后于上游算子的时钟。比如 map->reduce 这样的操作,如果在 map 中编写了非常耗时间的代码,将会阻塞水位线的向下传播,因为水位线也是数据流中的一个事件,位于水位线前面的数据如果没有处理完毕,那么水位线不可能弯道超车绕过前面的数据向下游传播,也就是说会被前面的数据阻塞。
这样就会影响到下游算子的聚合计算,因为下游算子中无论由窗口聚合还是定时器的操作,都需要水位线才能触发执行。这也就告诉了我们,在编写 Flink 程序时,一定要谨慎的编写每一个算子的计算逻辑,尽量避免大量计算或者是大量的 IO 操作,这样才不会阻塞水位线的向下传递。

在数据流开始之前,Flink 会插入一个大小是负无穷大(在 Java 中是-Long.MAX_VALUE)的水位线,而在数据流结束时,Flink 会插入一个正无穷大(Long.MAX_VALUE)的水位线,保证所有的窗口闭合以及所有的定时器都被触发。对于离线数据集,Flink 也会将其作为流读入,也就是一条数据一条数据的读取。在这种情况下,Flink 对于离线数据集,只会插入两次水位线,也就是在最开始处插入负无穷大的水位线,在结束位置插入一个正无穷大的水位线。因为只需要插入两次水位线,就可以保证计算的正确,无需在数据流的中间插入水位线了。

水位线的重要性在于它的逻辑时钟特性,而逻辑时钟这个概念可以说是分布式系统里面最为重要的概念之一了,理解透彻了对理解各种分布式系统非常有帮助。具体可以参考 Leslie Lamport 的论文。


http://chatgpt.dhexx.cn/article/vVcEeE0I.shtml

相关文章

vue -- watermark水印添加方法

作者&#xff1a;蛙哇 原文链接&#xff1a; https://segmentfault.com/a/1190000022055867 来源&#xff1a;segmentfault 前言 项目生成公司水印是很普遍的需求&#xff0c;下面是vue项目生产水印的方法。话不多说&#xff0c;复制粘贴就可以马上解决你的需求。 步骤1 创建…

tp-watermark.js网页添加水印插件

tp-watermark.js网页添加水印插件 作者&#xff1a;鹏仔先生 上周五&#xff0c;出差去改上个前端遗留的小问题&#xff0c;用到了watermark.js这个网站添加水印插件&#xff0c;功能很简单&#xff0c;就是给网页添加个水印&#xff0c;我看了下网上&#xff0c;有很多种&…

实用有效!React项目中使用watermark.js添加水印效果

为了避免公司的内部文档被截图外泄&#xff0c;有必要在系统页面上面增加水印。 第一步&#xff1a; 下载依赖包&#xff1a; npm install watermark-dompackage.json中会添加一个依赖如下&#xff1a; "watermark-dom": "^2.3.0"第二步&#xff1a; 引…

Flink之水位线(Watermark)

在流数据处理应用中&#xff0c;一个很重要、也很常见的操作就是窗口计算。所谓的“窗口”&#xff0c;一般就是划定的一段时间范围&#xff0c;也就是“时间窗”&#xff1b;对在这范围内的数据进行处理&#xff0c;就是所谓的窗口计算。所以窗口和时间往往是分不开的。接下来…

水印watermark

第一步:npm获取水印组件包 npm install watermark-dom 第二步:引入水印模块 import watermark from ‘watermark-dom’ 或者 var watermarkDom require(“watermark-dom”) 根据业务需要&#xff0c;我是登入之后的页面才有水印&#xff0c;前者我是放在验证用户登录状态js文件…

Flink流计算编程--watermark(水位线)简介

1、watermark的概念 watermark是一种衡量Event Time进展的机制&#xff0c;它是数据本身的一个隐藏属性。通常基于Event Time的数据&#xff0c;自身都包含一个timestamp&#xff0c;例如1472693399700&#xff08;2016-09-01 09:29:59.700&#xff09;&#xff0c;而这条数据…

Flink之watermark(水印)讲解

flink中watermark的详细介绍 使用前提&#xff1a; 处理数据开窗&#xff0c;处理数据的时间语义是事件时间&#xff0c;也就是每条数据产生的时间。 使用场景&#xff08;解决问题&#xff09;&#xff1a; 处理乱序数据&#xff1a;flink中是实时处理数据&#xff0c;但是…

WaterMark使用和详解

上篇&#xff1a;基于flink的会话窗口的api实现 WaterMark翻译为水位线&#xff0c;什么时候用到水位线呢&#xff1f; 比如说水控在顺水的时候达到紧梯就会触发&#xff0c;若不放水就可以发现危险的现状 在spark程序划分成窗口的时候&#xff0c;主要是衡量什么时候触发&am…

【大数据】带你理解并使用flink中的WaterMark机制

文章目录 一、引导二、WaterMark1、Watermark的原理2、Watermark 的使用2.1、顺序数据流中的watermark示例 2.2、乱序数据流中的WaterMark2.2.1、With Periodic&#xff08;周期性的&#xff09; Watermark示例一&#xff1a;使用周期性的WaterMark2.2.2、With Punctuated&…

JavaEE是什么?JavaSE又是什么?两者的区别有哪些?

Java作为最流行的编程语言受到了许多人的喜爱&#xff0c;其在编程中的地位自不必多说。对于许多才刚刚入门Java的朋友来讲&#xff0c;常常会产生这样的困惑&#xff0c;JavaEE是什么&#xff1f;JavaSE又是什么&#xff1f;两者的区别有哪些&#xff1f;学哪个比较好&#xf…

php mysql Javaee_javaee与php的区别是什么

javaee与php的区别&#xff1a;1、JavaEE是门面向对象的程序设计语言&#xff0c;而PHP是解释执行的服务器脚本语言&#xff1b;2、用JavaEE开发的Web应用从MySQL数据库转到Oracle数据库只需要做很少的修改&#xff0c;而PHP则需要做大量的修改工作。 javaee与php的区别&#x…

解读JAVAEE是什么样的Java

javaEE是指java enterprise edition&#xff0c;java企业版&#xff0c;多用于企业级开发&#xff0c;包括web开发等等很多组建。javaEE开发会设计java的高级特性以及一些spring等架构&#xff0c;需要掌握的内容相对多。 JAVA&#xff0c;所有的Java平台都由一个JVM和一组应用…

java ee 下载什么意思_JavaEE到底是什么?

慕运维8079593 JavaEE只是一个规范吗&#xff1f;我的意思是&#xff1a;EJBJavaEE是吗&#xff1f;JavaEE确实是一个摘要规格说明。任何人都愿意开发和提供规范的工作实现。这个混凝土实现是所谓的应用服务器&#xff0c;如野弗利, 托梅, 玻璃鱼, 自由, WebLogic&#xff0c;等…

java ee api是什么意思_JavaEE的整体概述

标签&#xff1a; JavaEE整体概述 知识点&#xff1a; 1、整体概述JavaEE的知识体系 2、JavaEE是什么? 能干什么? 为什么需要JavaEE? 3、JavaEE有什么? JavaEE的技术体系? JavaEE的本质是什么? 4、JavaEE的零散基础知识 5、JavaEE的组件体系结构 -----------------------…

矩阵求逆方法

1.待定系数法 矩阵A 1, 2 -1,-3 假设所求的逆矩阵为 a,b c,d 则 从而可以得出方程组 a 2c 1 b 2d 0 -a - 3c 0 -b - 3d 1 解得 a3; b2; c -1; d -1 2.伴随矩阵求逆矩阵 伴随矩阵是矩阵元素所对应的代数余子式&#xff0c;所构成的矩阵&#xff0c;转置后得到的新矩阵…

复数矩阵求逆的 C 语言程序

关于复数矩阵求逆&#xff0c;如果使用 MATLAB&#xff0c;就非常简单。我们先用一个 MATLAB 的例子来说明&#xff0c;等会要将 C 语言的程序和 MATLAB 的程序进行对比。 close all; clear all; clc;%定义矩阵a为复数矩阵 a [[42*i,31*i,43*i,55*i];[17*i,82*i,22*i,93*i];[…

科学计算器如何求矩阵的逆

大学本科买了四年的计算器不会求逆&#xff0c;到了研究生了好好研究下这个功能&#xff0c;终于终于会用了&#xff0c;以往 对着那个矩阵功能都发懵&#x1f602;&#xff0c;记录一下这个史诗无敌隐藏功能 1、进入菜单&#xff0c;点击4进入矩阵菜单 2、这里选择1定义矩阵A…

matlab求一个矩阵的逆矩阵的命令,如何用MATLAB求逆矩阵

如何用MATLAB求逆矩阵以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧! 如何用MATLAB求逆矩阵 如果英文好呢,自己看目录 不好还是先看中文的教材,对matlab的框架和功能有了一定的了解后,自己也就看的懂帮助里面…

matlab矩阵求逆的模块,matlab矩阵求逆矩阵

matlab矩阵求逆矩阵 因为 所以该矩阵可逆&#xff0c;根据 &#xff0c;其中 得到 计算矩阵A每个元素的代数余子式&#xff1a; 所以 可得&#xff1a; matlab计算如下&#xff1a; >> A1[1 2 2;2 1 -2;2 -2 1] A1 1 2 2 2 1 -2 2 -2 1 >> >> >> A2in…

求矩阵的逆的三种方法

我们知道求矩阵的逆具有非常重要的意义&#xff0c;本文分享给大家如何针对3阶以内的方阵&#xff0c;求出逆矩阵的3种手算方法&#xff1a;待定系数法、伴随矩阵法、初等变换法&#xff08;只介绍初等行变换&#xff09; 待定系数法求逆矩阵 1 首先&#xff0c;我们来看如何使…