Flink自定义生成 Watermark

article/2025/9/20 20:13:24

Watermark 策略简介 #

为了使用事件时间语义,Flink 应用程序需要知道事件时间戳对应的字段,意味着数据流中的每个元素都需要拥有可分配的事件时间戳。其通常通过使用 TimestampAssigner API 从元素中的某个字段去访问/提取时间戳。

时间戳的分配与 watermark 的生成是齐头并进的,其可以告诉 Flink 应用程序事件时间的进度。其可以通过指定 WatermarkGenerator 来配置 watermark 的生成方式。

使用 Flink API 时需要设置一个同时包含 TimestampAssigner 和 WatermarkGenerator 的 WatermarkStrategyWatermarkStrategy 工具类中也提供了许多常用的 watermark 策略,并且用户也可以在某些必要场景下构建自己的 watermark 策略。WatermarkStrategy 接口如下:

public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{/*** 根据策略实例化一个可分配时间戳的 {@link TimestampAssigner}。*/@OverrideTimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);/*** 根据策略实例化一个 watermark 生成器。*/@OverrideWatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

如上所述,通常情况下,你不用实现此接口,而是可以使用 WatermarkStrategy 工具类中通用的 watermark 策略,或者可以使用这个工具类将自定义的 TimestampAssigner 与 WatermarkGenerator 进行绑定。例如,你想要要使用有界无序(bounded-out-of-orderness)watermark 生成器和一个 lambda 表达式作为时间戳分配器,那么可以按照如下方式实现:

WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withTimestampAssigner((event, timestamp) -> event.f0);

其中 TimestampAssigner 的设置与否是可选的,大多数情况下,可以不用去特别指定。例如,当使用 Kafka 或 Kinesis 数据源时,你可以直接从 Kafka/Kinesis 数据源记录中获取到时间戳。

稍后我们将在自定义 WatermarkGenerator 小节学习 WatermarkGenerator 接口。

注意: 时间戳和 watermark 都是从 1970-01-01T00:00:00Z 起的 Java 纪元开始,并以毫秒为单位。

使用 Watermark 策略 #

WatermarkStrategy 可以在 Flink 应用程序中的两处使用,第一种是直接在数据源上使用,第二种是直接在非数据源的操作之后使用。

第一种方式相比会更好,因为数据源可以利用 watermark 生成逻辑中有关分片/分区(shards/partitions/splits)的信息。使用这种方式,数据源通常可以更精准地跟踪 watermark,整体 watermark 生成将更精确。直接在源上指定 WatermarkStrategy 意味着你必须使用特定数据源接口,参阅 Watermark 策略与 Kafka 连接器以了解如何使用 Kafka Connector,以及有关每个分区的 watermark 是如何生成以及工作的。

仅当无法直接在数据源上设置策略时,才应该使用第二种方式(在任意转换操作之后设置 WatermarkStrategy):

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<MyEvent> stream = env.readFile(myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,FilePathFilter.createDefaultFilter(), typeInfo);DataStream<MyEvent> withTimestampsAndWatermarks = stream.filter( event -> event.severity() == WARNING ).assignTimestampsAndWatermarks(<watermark strategy>);withTimestampsAndWatermarks.keyBy( (event) -> event.getGroup() ).window(TumblingEventTimeWindows.of(Time.seconds(10))).reduce( (a, b) -> a.add(b) ).addSink(...);

使用 WatermarkStrategy 去获取流并生成带有时间戳的元素和 watermark 的新流时,如果原始流已经具有时间戳或 watermark,则新指定的时间戳分配器将覆盖原有的时间戳和 watermark。

处理空闲数据源 #

如果数据源中的某一个分区/分片在一段时间内未发送事件数据,则意味着 WatermarkGenerator 也不会获得任何新数据去生成 watermark。我们称这类数据源为空闲输入空闲源。在这种情况下,当某些其他分区仍然发送事件数据的时候就会出现问题。由于下游算子 watermark 的计算方式是取所有不同的上游并行数据源 watermark 的最小值,则其 watermark 将不会发生变化。

为了解决这个问题,你可以使用 WatermarkStrategy 来检测空闲输入并将其标记为空闲状态。WatermarkStrategy 为此提供了一个工具接口:

WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withIdleness(Duration.ofMinutes(1));

Watermark alignment Beta #

In the previous paragraph we discussed a situation when splits/partitions/shards or sources are idle and can stall increasing watermarks. On the other side of the spectrum, a split/partition/shard or source may process records very fast and in turn increase its watermark relatively faster than the others. This on its own is not a problem per se. However, for downstream operators that are using watermarks to emit some data it can actually become a problem.

In this case, contrary to idle sources, the watermark of such downstream operator (like windowed joins on aggregations) can progress. However, such operator might need to buffer excessive amount of data coming from the fast inputs, as the minimal watermark from all of its inputs is held back by the lagging input. All records emitted by the fast input will hence have to be buffered in the said downstream operator state, which can lead into uncontrollable growth of the operator’s state.

In order to address the issue, you can enable watermark alignment, which will make sure no sources/splits/shards/partitions increase their watermarks too far ahead of the rest. You can enable alignment for every source separately:

WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)).withWatermarkAlignment("alignment-group-1", Duration.ofSeconds(20), Duration.ofSeconds(1));
Note: You can enable watermark alignment only for  FLIP-27 sources. It does not work for legacy or if applied after the source via  DataStream#assignTimestampsAndWatermarks.

When enabling the alignment, you need to tell Flink, which group should the source belong. You do that by providing a label (e.g. alignment-group-1) which bind together all sources that share it. Moreover, you have to tell the maximal drift from the current minimal watermarks across all sources belonging to that group. The third parameter describes how often the current maximal watermark should be updated. The downside of frequent updates is that there will be more RPC messages travelling between TMs and the JM.

In order to achieve the alignment Flink will pause consuming from the source/task, which generated watermark that is too far into the future. In the meantime it will continue reading records from other sources/tasks which can move the combined watermark forward and that way unblock the faster one.

Note: As of 1.15, Flink supports aligning across tasks of the same source and/or different sources. It does not support aligning splits/partitions/shards in the same task.

In a case where there are e.g. two Kafka partitions that produce watermarks at different pace, that get assigned to the same task watermark might not behave as expected. Fortunately, worst case it should not perform worse than without alignment.

Given the limitation above, we suggest applying watermark alignment in two situations:

  1. You have two different sources (e.g. Kafka and File) that produce watermarks at different speeds
  2. You run your source with parallelism equal to the number of splits/shards/partitions, which results in every subtask being assigned a single unit of work.

自定义 WatermarkGenerator #

TimestampAssigner 是一个可以从事件数据中提取时间戳字段的简单函数,我们无需详细查看其实现。但是 WatermarkGenerator 的编写相对就要复杂一些了,我们将在接下来的两小节中介绍如何实现此接口。WatermarkGenerator 接口代码如下:

/*** {@code WatermarkGenerator} 可以基于事件或者周期性的生成 watermark。** <p><b>注意:</b>  WatermarkGenerator 将以前互相独立的 {@code AssignerWithPunctuatedWatermarks} * 和 {@code AssignerWithPeriodicWatermarks} 一同包含了进来。*/
@Public
public interface WatermarkGenerator<T> {/*** 每来一条事件数据调用一次,可以检查或者记录事件的时间戳,或者也可以基于事件数据本身去生成 watermark。*/void onEvent(T event, long eventTimestamp, WatermarkOutput output);/*** 周期性的调用,也许会生成新的 watermark,也许不会。** <p>调用此方法生成 watermark 的间隔时间由 {@link ExecutionConfig#getAutoWatermarkInterval()} 决定。*/void onPeriodicEmit(WatermarkOutput output);
}

watermark 的生成方式本质上是有两种:周期性生成标记生成

周期性生成器通常通过 onEvent() 观察传入的事件数据,然后在框架调用 onPeriodicEmit() 时发出 watermark。

标记生成器将查看 onEvent() 中的事件数据,并等待检查在流中携带 watermark 的特殊标记事件或打点数据。当获取到这些事件数据时,它将立即发出 watermark。通常情况下,标记生成器不会通过 onPeriodicEmit() 发出 watermark。

接下来,我们将学习如何实现上述两类生成器。

自定义周期性 Watermark 生成器 #

周期性生成器会观察流事件数据并定期生成 watermark(其生成可能取决于流数据,或者完全基于处理时间)。

生成 watermark 的时间间隔(每 n 毫秒)可以通过 ExecutionConfig.setAutoWatermarkInterval(...) 指定。每次都会调用生成器的 onPeriodicEmit() 方法,如果返回的 watermark 非空且值大于前一个 watermark,则将发出新的 watermark。

如下是两个使用周期性 watermark 生成器的简单示例。注意:Flink 已经附带了 BoundedOutOfOrdernessWatermarks,它实现了 WatermarkGenerator,其工作原理与下面的 BoundedOutOfOrdernessGenerator 相似。可以在这里参阅如何使用它的内容。

/*** 该 watermark 生成器可以覆盖的场景是:数据源在一定程度上乱序。* 即某个最新到达的时间戳为 t 的元素将在最早到达的时间戳为 t 的元素之后最多 n 毫秒到达。*/
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {private final long maxOutOfOrderness = 3500; // 3.5 秒private long currentMaxTimestamp;@Overridepublic void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发出的 watermark = 当前最大时间戳 - 最大乱序时间output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));}}/*** 该生成器生成的 watermark 滞后于处理时间固定量。它假定元素会在有限延迟后到达 Flink。*/
public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {private final long maxTimeLag = 5000; // 5 秒@Overridepublic void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {// 处理时间场景下不需要实现}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));}
}

自定义标记 Watermark 生成器 #

标记 watermark 生成器观察流事件数据并在获取到带有 watermark 信息的特殊事件元素时发出 watermark。

如下是实现标记生成器的方法,当事件带有某个指定标记时,该生成器就会发出 watermark:

public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {@Overridepublic void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {if (event.hasWatermarkMarker()) {output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));}}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// onEvent 中已经实现}
}
注意: 可以针对每个事件去生成 watermark。但是由于每个 watermark 都会在下游做一些计算,因此过多的 watermark 会降低程序性能。

Watermark 策略与 Kafka 连接器 #

当使用 Apache Kafka 连接器作为数据源时,每个 Kafka 分区可能有一个简单的事件时间模式(递增的时间戳或有界无序)。然而,当使用 Kafka 数据源时,多个分区常常并行使用,因此交错来自各个分区的事件数据就会破坏每个分区的事件时间模式(这是 Kafka 消费客户端所固有的)。

在这种情况下,你可以使用 Flink 中可识别 Kafka 分区的 watermark 生成机制。使用此特性,将在 Kafka 消费端内部针对每个 Kafka 分区生成 watermark,并且不同分区 watermark 的合并方式与在数据流 shuffle 时的合并方式相同。

例如,如果每个 Kafka 分区中的事件时间戳严格递增,则使用单调递增时间戳分配器按分区生成的 watermark 将生成完美的全局 watermark。注意,我们在示例中未使用 TimestampAssigner,而是使用了 Kafka 记录自身的时间戳。

下图展示了如何使用单 kafka 分区 watermark 生成机制,以及在这种情况下 watermark 如何通过 dataflow 传播。

FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)));DataStream<MyType> stream = env.addSource(kafkaSource);

 

算子处理 Watermark 的方式 #

一般情况下,在将 watermark 转发到下游之前,需要算子对其进行触发的事件完全进行处理。例如,WindowOperator 将首先计算该 watermark 触发的所有窗口数据,当且仅当由此 watermark 触发计算进而生成的所有数据被转发到下游之后,其才会被发送到下游。换句话说,由于此 watermark 的出现而产生的所有数据元素都将在此 watermark 之前发出。

相同的规则也适用于 TwoInputStreamOperator。但是,在这种情况下,算子当前的 watermark 会取其两个输入的最小值。

详细内容可查看对应算子的实现:OneInputStreamOperator#processWatermarkTwoInputStreamOperator#processWatermark1 和 TwoInputStreamOperator#processWatermark2

可以弃用 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks 了 #

在 Flink 新的 WatermarkStrategyTimestampAssigner 和 WatermarkGenerator 的抽象接口之前,Flink 使用的是 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks。你仍可以在 API 中看到它们,但建议使用新接口,因为其对时间戳和 watermark 等重点的抽象和分离很清晰,并且还统一了周期性和标记形式的 watermark 生成方式。


http://chatgpt.dhexx.cn/article/rciElT2w.shtml

相关文章

Flink学习:WaterMark

WaterMark 一、什么是水位线?二、案例分析三、如何生成水位线?(一)、在SourceFunction中直接定义Timestamps和Watermarks(二)、自定义生成Timstamps和Watermarks 一、什么是水位线? 通常情况下,由于网络或系统等外部因素影响,事件数据往往不能及时传输至Flink系统中,导致数…

flink watermark

flink1.12版本开始&#xff0c;事件事件作为默认的时间语义 watermark是flink逻辑时钟&#xff0c;不是真正意义上的表&#xff0c;而是靠着数据去推动它的时间不停的往前走 工厂生产的商品上面印有时间戳&#xff0c;八点到九点的商品要坐一班车运走&#xff0c;商品从生产到…

Flink WaterMark 详解

摘录仅供学习使用&#xff0c;原文来自&#xff1a; Flink详解系列之五--水位线&#xff08;watermark&#xff09; - 简书 1、概念 在Flink中&#xff0c;水位线是一种衡量Event Time进展的机制&#xff0c;用来处理实时数据中的乱序问题的&#xff0c;通常是水位线和窗口结合…

Flink:watermark

Table of Contents 三种时间概念 Processing time Event Time Ingestion time watermark 并行流的Watermarks 迟到的事件 watermark分配器 watermark的两种分配器 三种时间概念 在谈watermark之前&#xff0c;首先需要了解flink的三种时间概念。在flink中&#xff0c;…

Flink 水位线(Watermark)

文章目录 什么是水位线水位线的特性如何生成水位线Flink 内置水位线生成器自定义水位线策略在自定义数据源中发送水位线水位线的总结 在实际应用中&#xff0c;一般会采用事件时间语义。而水位线&#xff0c;就是基于事件时间提出的概念。一个数据产生的时刻&#xff0c;就是流…

vue -- watermark水印添加方法

作者&#xff1a;蛙哇 原文链接&#xff1a; https://segmentfault.com/a/1190000022055867 来源&#xff1a;segmentfault 前言 项目生成公司水印是很普遍的需求&#xff0c;下面是vue项目生产水印的方法。话不多说&#xff0c;复制粘贴就可以马上解决你的需求。 步骤1 创建…

tp-watermark.js网页添加水印插件

tp-watermark.js网页添加水印插件 作者&#xff1a;鹏仔先生 上周五&#xff0c;出差去改上个前端遗留的小问题&#xff0c;用到了watermark.js这个网站添加水印插件&#xff0c;功能很简单&#xff0c;就是给网页添加个水印&#xff0c;我看了下网上&#xff0c;有很多种&…

实用有效!React项目中使用watermark.js添加水印效果

为了避免公司的内部文档被截图外泄&#xff0c;有必要在系统页面上面增加水印。 第一步&#xff1a; 下载依赖包&#xff1a; npm install watermark-dompackage.json中会添加一个依赖如下&#xff1a; "watermark-dom": "^2.3.0"第二步&#xff1a; 引…

Flink之水位线(Watermark)

在流数据处理应用中&#xff0c;一个很重要、也很常见的操作就是窗口计算。所谓的“窗口”&#xff0c;一般就是划定的一段时间范围&#xff0c;也就是“时间窗”&#xff1b;对在这范围内的数据进行处理&#xff0c;就是所谓的窗口计算。所以窗口和时间往往是分不开的。接下来…

水印watermark

第一步:npm获取水印组件包 npm install watermark-dom 第二步:引入水印模块 import watermark from ‘watermark-dom’ 或者 var watermarkDom require(“watermark-dom”) 根据业务需要&#xff0c;我是登入之后的页面才有水印&#xff0c;前者我是放在验证用户登录状态js文件…

Flink流计算编程--watermark(水位线)简介

1、watermark的概念 watermark是一种衡量Event Time进展的机制&#xff0c;它是数据本身的一个隐藏属性。通常基于Event Time的数据&#xff0c;自身都包含一个timestamp&#xff0c;例如1472693399700&#xff08;2016-09-01 09:29:59.700&#xff09;&#xff0c;而这条数据…

Flink之watermark(水印)讲解

flink中watermark的详细介绍 使用前提&#xff1a; 处理数据开窗&#xff0c;处理数据的时间语义是事件时间&#xff0c;也就是每条数据产生的时间。 使用场景&#xff08;解决问题&#xff09;&#xff1a; 处理乱序数据&#xff1a;flink中是实时处理数据&#xff0c;但是…

WaterMark使用和详解

上篇&#xff1a;基于flink的会话窗口的api实现 WaterMark翻译为水位线&#xff0c;什么时候用到水位线呢&#xff1f; 比如说水控在顺水的时候达到紧梯就会触发&#xff0c;若不放水就可以发现危险的现状 在spark程序划分成窗口的时候&#xff0c;主要是衡量什么时候触发&am…

【大数据】带你理解并使用flink中的WaterMark机制

文章目录 一、引导二、WaterMark1、Watermark的原理2、Watermark 的使用2.1、顺序数据流中的watermark示例 2.2、乱序数据流中的WaterMark2.2.1、With Periodic&#xff08;周期性的&#xff09; Watermark示例一&#xff1a;使用周期性的WaterMark2.2.2、With Punctuated&…

JavaEE是什么?JavaSE又是什么?两者的区别有哪些?

Java作为最流行的编程语言受到了许多人的喜爱&#xff0c;其在编程中的地位自不必多说。对于许多才刚刚入门Java的朋友来讲&#xff0c;常常会产生这样的困惑&#xff0c;JavaEE是什么&#xff1f;JavaSE又是什么&#xff1f;两者的区别有哪些&#xff1f;学哪个比较好&#xf…

php mysql Javaee_javaee与php的区别是什么

javaee与php的区别&#xff1a;1、JavaEE是门面向对象的程序设计语言&#xff0c;而PHP是解释执行的服务器脚本语言&#xff1b;2、用JavaEE开发的Web应用从MySQL数据库转到Oracle数据库只需要做很少的修改&#xff0c;而PHP则需要做大量的修改工作。 javaee与php的区别&#x…

解读JAVAEE是什么样的Java

javaEE是指java enterprise edition&#xff0c;java企业版&#xff0c;多用于企业级开发&#xff0c;包括web开发等等很多组建。javaEE开发会设计java的高级特性以及一些spring等架构&#xff0c;需要掌握的内容相对多。 JAVA&#xff0c;所有的Java平台都由一个JVM和一组应用…

java ee 下载什么意思_JavaEE到底是什么?

慕运维8079593 JavaEE只是一个规范吗&#xff1f;我的意思是&#xff1a;EJBJavaEE是吗&#xff1f;JavaEE确实是一个摘要规格说明。任何人都愿意开发和提供规范的工作实现。这个混凝土实现是所谓的应用服务器&#xff0c;如野弗利, 托梅, 玻璃鱼, 自由, WebLogic&#xff0c;等…

java ee api是什么意思_JavaEE的整体概述

标签&#xff1a; JavaEE整体概述 知识点&#xff1a; 1、整体概述JavaEE的知识体系 2、JavaEE是什么? 能干什么? 为什么需要JavaEE? 3、JavaEE有什么? JavaEE的技术体系? JavaEE的本质是什么? 4、JavaEE的零散基础知识 5、JavaEE的组件体系结构 -----------------------…

矩阵求逆方法

1.待定系数法 矩阵A 1, 2 -1,-3 假设所求的逆矩阵为 a,b c,d 则 从而可以得出方程组 a 2c 1 b 2d 0 -a - 3c 0 -b - 3d 1 解得 a3; b2; c -1; d -1 2.伴随矩阵求逆矩阵 伴随矩阵是矩阵元素所对应的代数余子式&#xff0c;所构成的矩阵&#xff0c;转置后得到的新矩阵…