Flink WaterMark 详解

article/2025/9/20 20:08:16

摘录仅供学习使用,原文来自: Flink详解系列之五--水位线(watermark) - 简书

1、概念

在Flink中,水位线是一种衡量Event Time进展的机制,用来处理实时数据中的乱序问题的,通常是水位线和窗口结合使用来实现。

从设备生成实时流事件,到Flink的source,再到多个oparator处理数据,过程中会受到网络延迟、背压等多种因素影响造成数据乱序。在进行窗口处理时,不可能无限期的等待延迟数据到达,当到达特定watermark时,认为在watermark之前的数据已经全部达到(即使后面还有延迟的数据), 可以触发窗口计算,这个机制就是 Watermark(水位线),具体如下图所示。

 window的生成时间是基于wall clock(processing time)的,与event time无关。

2、水位线的计算

watermark本质上是一个时间戳,且是动态变化的,会根据当前最大事件时间产生。watermarks具体计算为:

watermark = 进入 Flink 窗口的最大的事件时间(maxEventTime)— 指定的延迟时间(t)

当watermark时间戳大于等于窗口结束时间时,意味着窗口结束,需要触发窗口计算。

 watermark的本质是使用event time 做一个函数映射生成触发window计算的wall clock时间(processing time).

3、水位线生成

3.1 生成的时机

水位线生产的最佳位置是在尽可能靠近数据源的地方,因为水位线生成时会做出一些有关元素顺序相对时间戳的假设。由于数据源读取过程是并行的,一切引起Flink跨行数据流分区进行重新分发的操作(比如:改变并行度,keyby等)都会导致元素时间戳乱序。但是如果是某些初始化的filter、map等不会引起元素重新分发的操作,可以考虑在生成水位线之前使用。

3.2 水位线分配器

  • Periodic Watermarks

周期性分配水位线比较常用,是我们会指示系统以固定的时间间隔发出的水位线。在设置时间为事件时间时,会默认设置这个时间间隔为200ms, 如果需要调整可以自行设置。比如下面的例子是手动设置每隔1s发出水位线。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 手动设置时间间隔为1s
env.getConfig().setAutoWatermarkInterval(1000);

周期水位线需要实现接口:AssignerWithPeriodicWatermarks,下面是示例:

public class TestPeriodWatermark implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {Long currentMaxTimestamp = 0L;final Long maxOutOfOrderness = 1000L;// 延迟时长是1s@Nullable@Overridepublic Watermark getCurrentWatermark() {return new Watermark(currentMaxTimestamp - maxOutOfOrderness);}@Overridepublic long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {long timestamp = element.f1;currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);return timestamp;}
}
  • Punctuated Watermarks

定点水位线不是太常用,主要为输入流中包含一些用于指示系统进度的特殊元组和标记,方便根据输入元素生成水位线的场景使用的。

由于数据流中每一个递增的EventTime都会产生一个Watermark。
在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。

public class TestPunctuateWatermark implements AssignerWithPunctuatedWatermarks<Tuple2<String, Long>> {@Nullable@Overridepublic Watermark checkAndGetNextWatermark(Tuple2<String, Long> lastElement, long extractedTimestamp) {return new Watermark(extractedTimestamp);}@Overridepublic long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {return element.f1;}
}

4、水位线与数据完整性

水位线可以用于平衡延迟和结果的完整性,它控制着执行某些计算需要等待的时间。这个时间是预估的,现实中不存在完美的水位线,因为总会存在延迟的记录。现实处理中,需要我们足够了解从数据生成到数据源的整个过程,来估算延迟的上线,才能更好的设置水位线。

如果水位线设置的过于宽松,好处是计算时能保证近可能多的数据被收集到,但由于此时的水位线远落后于处理记录的时间戳,导致产生的数据结果延迟较大。

如果设置的水位线过于紧迫,数据结果的时效性当然会更好,但由于水位线大于部分记录的时间戳,数据的完整性就会打折扣。

所以,水位线的设置需要更多的去了解数据,并在数据时效性和完整性上有一个权衡。


 

===================================

从Flink 1.12 开始WaterMark接口进行了更新,原理没有变化。

WatermarkStrategy

-- TimestampAssigner
-- WatermarkGenerator

TimestampAssigner


/*** A {@code TimestampAssigner} assigns event time timestamps to elements. These timestamps are used* by all functions that operate on event time, for example event time windows.** <p>Timestamps can be an arbitrary {@code long} value, but all built-in implementations represent* it as the milliseconds since the Epoch (midnight, January 1, 1970 UTC), the same way as {@link* System#currentTimeMillis()} does it.** @param <T> The type of the elements to which this assigner assigns timestamps.*/
@Public
@FunctionalInterface
public interface TimestampAssigner<T> {/*** The value that is passed to {@link #extractTimestamp} when there is no previous timestamp* attached to the record.*/long NO_TIMESTAMP = Long.MIN_VALUE;/*** Assigns a timestamp to an element, in milliseconds since the Epoch. This is independent of* any particular time zone or calendar.** <p>The method is passed the previously assigned timestamp of the element. That previous* timestamp may have been assigned from a previous assigner. If the element did not carry a* timestamp before, this value is {@link #NO_TIMESTAMP} (= {@code Long.MIN_VALUE}: {@value* Long#MIN_VALUE}).** @param element The element that the timestamp will be assigned to.* @param recordTimestamp The current internal timestamp of the element, or a negative value, if*     no timestamp has been assigned yet.* @return The new timestamp.*/long extractTimestamp(T element, long recordTimestamp);
}

默认实现的类: RecordTimestampAssigner


/*** A {@link TimestampAssigner} that forwards the already-assigned timestamp. This is for use when* records come out of a source with valid timestamps, for example from the Kafka Metadata.*/
@Public
public final class RecordTimestampAssigner<E> implements TimestampAssigner<E> {@Overridepublic long extractTimestamp(E element, long recordTimestamp) {return recordTimestamp;}
}

WatermarkGenerator


/*** The {@code WatermarkGenerator} generates watermarks either based on events or periodically (in a* fixed interval).** <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the {@code* AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.*/
@Public
public interface WatermarkGenerator<T> {/*** Called for every event, allows the watermark generator to examine and remember the event* timestamps, or to emit a watermark based on the event itself.*/void onEvent(T event, long eventTimestamp, WatermarkOutput output);/*** Called periodically, and might emit a new watermark, or not.** <p>The interval in which this method is called and Watermarks are generated depends on {@link* ExecutionConfig#getAutoWatermarkInterval()}.*/void onPeriodicEmit(WatermarkOutput output);
}

实现的类主要有:

BoundedOutOfOrdernessWatermarks
WatermarksWithIdleness

参考:

Flink详解系列之五--水位线(watermark) - 简书

(一)Flink WaterMark 详解及实例 - 掘金

https://mp.csdn.net/mp_blog/creation/editor/129186757


http://chatgpt.dhexx.cn/article/ciS7mdDZ.shtml

相关文章

Flink:watermark

Table of Contents 三种时间概念 Processing time Event Time Ingestion time watermark 并行流的Watermarks 迟到的事件 watermark分配器 watermark的两种分配器 三种时间概念 在谈watermark之前&#xff0c;首先需要了解flink的三种时间概念。在flink中&#xff0c;…

Flink 水位线(Watermark)

文章目录 什么是水位线水位线的特性如何生成水位线Flink 内置水位线生成器自定义水位线策略在自定义数据源中发送水位线水位线的总结 在实际应用中&#xff0c;一般会采用事件时间语义。而水位线&#xff0c;就是基于事件时间提出的概念。一个数据产生的时刻&#xff0c;就是流…

vue -- watermark水印添加方法

作者&#xff1a;蛙哇 原文链接&#xff1a; https://segmentfault.com/a/1190000022055867 来源&#xff1a;segmentfault 前言 项目生成公司水印是很普遍的需求&#xff0c;下面是vue项目生产水印的方法。话不多说&#xff0c;复制粘贴就可以马上解决你的需求。 步骤1 创建…

tp-watermark.js网页添加水印插件

tp-watermark.js网页添加水印插件 作者&#xff1a;鹏仔先生 上周五&#xff0c;出差去改上个前端遗留的小问题&#xff0c;用到了watermark.js这个网站添加水印插件&#xff0c;功能很简单&#xff0c;就是给网页添加个水印&#xff0c;我看了下网上&#xff0c;有很多种&…

实用有效!React项目中使用watermark.js添加水印效果

为了避免公司的内部文档被截图外泄&#xff0c;有必要在系统页面上面增加水印。 第一步&#xff1a; 下载依赖包&#xff1a; npm install watermark-dompackage.json中会添加一个依赖如下&#xff1a; "watermark-dom": "^2.3.0"第二步&#xff1a; 引…

Flink之水位线(Watermark)

在流数据处理应用中&#xff0c;一个很重要、也很常见的操作就是窗口计算。所谓的“窗口”&#xff0c;一般就是划定的一段时间范围&#xff0c;也就是“时间窗”&#xff1b;对在这范围内的数据进行处理&#xff0c;就是所谓的窗口计算。所以窗口和时间往往是分不开的。接下来…

水印watermark

第一步:npm获取水印组件包 npm install watermark-dom 第二步:引入水印模块 import watermark from ‘watermark-dom’ 或者 var watermarkDom require(“watermark-dom”) 根据业务需要&#xff0c;我是登入之后的页面才有水印&#xff0c;前者我是放在验证用户登录状态js文件…

Flink流计算编程--watermark(水位线)简介

1、watermark的概念 watermark是一种衡量Event Time进展的机制&#xff0c;它是数据本身的一个隐藏属性。通常基于Event Time的数据&#xff0c;自身都包含一个timestamp&#xff0c;例如1472693399700&#xff08;2016-09-01 09:29:59.700&#xff09;&#xff0c;而这条数据…

Flink之watermark(水印)讲解

flink中watermark的详细介绍 使用前提&#xff1a; 处理数据开窗&#xff0c;处理数据的时间语义是事件时间&#xff0c;也就是每条数据产生的时间。 使用场景&#xff08;解决问题&#xff09;&#xff1a; 处理乱序数据&#xff1a;flink中是实时处理数据&#xff0c;但是…

WaterMark使用和详解

上篇&#xff1a;基于flink的会话窗口的api实现 WaterMark翻译为水位线&#xff0c;什么时候用到水位线呢&#xff1f; 比如说水控在顺水的时候达到紧梯就会触发&#xff0c;若不放水就可以发现危险的现状 在spark程序划分成窗口的时候&#xff0c;主要是衡量什么时候触发&am…

【大数据】带你理解并使用flink中的WaterMark机制

文章目录 一、引导二、WaterMark1、Watermark的原理2、Watermark 的使用2.1、顺序数据流中的watermark示例 2.2、乱序数据流中的WaterMark2.2.1、With Periodic&#xff08;周期性的&#xff09; Watermark示例一&#xff1a;使用周期性的WaterMark2.2.2、With Punctuated&…

JavaEE是什么?JavaSE又是什么?两者的区别有哪些?

Java作为最流行的编程语言受到了许多人的喜爱&#xff0c;其在编程中的地位自不必多说。对于许多才刚刚入门Java的朋友来讲&#xff0c;常常会产生这样的困惑&#xff0c;JavaEE是什么&#xff1f;JavaSE又是什么&#xff1f;两者的区别有哪些&#xff1f;学哪个比较好&#xf…

php mysql Javaee_javaee与php的区别是什么

javaee与php的区别&#xff1a;1、JavaEE是门面向对象的程序设计语言&#xff0c;而PHP是解释执行的服务器脚本语言&#xff1b;2、用JavaEE开发的Web应用从MySQL数据库转到Oracle数据库只需要做很少的修改&#xff0c;而PHP则需要做大量的修改工作。 javaee与php的区别&#x…

解读JAVAEE是什么样的Java

javaEE是指java enterprise edition&#xff0c;java企业版&#xff0c;多用于企业级开发&#xff0c;包括web开发等等很多组建。javaEE开发会设计java的高级特性以及一些spring等架构&#xff0c;需要掌握的内容相对多。 JAVA&#xff0c;所有的Java平台都由一个JVM和一组应用…

java ee 下载什么意思_JavaEE到底是什么?

慕运维8079593 JavaEE只是一个规范吗&#xff1f;我的意思是&#xff1a;EJBJavaEE是吗&#xff1f;JavaEE确实是一个摘要规格说明。任何人都愿意开发和提供规范的工作实现。这个混凝土实现是所谓的应用服务器&#xff0c;如野弗利, 托梅, 玻璃鱼, 自由, WebLogic&#xff0c;等…

java ee api是什么意思_JavaEE的整体概述

标签&#xff1a; JavaEE整体概述 知识点&#xff1a; 1、整体概述JavaEE的知识体系 2、JavaEE是什么? 能干什么? 为什么需要JavaEE? 3、JavaEE有什么? JavaEE的技术体系? JavaEE的本质是什么? 4、JavaEE的零散基础知识 5、JavaEE的组件体系结构 -----------------------…

矩阵求逆方法

1.待定系数法 矩阵A 1, 2 -1,-3 假设所求的逆矩阵为 a,b c,d 则 从而可以得出方程组 a 2c 1 b 2d 0 -a - 3c 0 -b - 3d 1 解得 a3; b2; c -1; d -1 2.伴随矩阵求逆矩阵 伴随矩阵是矩阵元素所对应的代数余子式&#xff0c;所构成的矩阵&#xff0c;转置后得到的新矩阵…

复数矩阵求逆的 C 语言程序

关于复数矩阵求逆&#xff0c;如果使用 MATLAB&#xff0c;就非常简单。我们先用一个 MATLAB 的例子来说明&#xff0c;等会要将 C 语言的程序和 MATLAB 的程序进行对比。 close all; clear all; clc;%定义矩阵a为复数矩阵 a [[42*i,31*i,43*i,55*i];[17*i,82*i,22*i,93*i];[…

科学计算器如何求矩阵的逆

大学本科买了四年的计算器不会求逆&#xff0c;到了研究生了好好研究下这个功能&#xff0c;终于终于会用了&#xff0c;以往 对着那个矩阵功能都发懵&#x1f602;&#xff0c;记录一下这个史诗无敌隐藏功能 1、进入菜单&#xff0c;点击4进入矩阵菜单 2、这里选择1定义矩阵A…

matlab求一个矩阵的逆矩阵的命令,如何用MATLAB求逆矩阵

如何用MATLAB求逆矩阵以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧! 如何用MATLAB求逆矩阵 如果英文好呢,自己看目录 不好还是先看中文的教材,对matlab的框架和功能有了一定的了解后,自己也就看的懂帮助里面…