flink watermark

article/2025/9/20 20:10:33

flink1.12版本开始,事件事件作为默认的时间语义

watermark是flink逻辑时钟,不是真正意义上的表,而是靠着数据去推动它的时间不停的往前走

工厂生产的商品上面印有时间戳,八点到九点的商品要坐一班车运走,商品从生产到运上车中间有一定的时间间隔,班车不能以系统时间作为时间判断标准,而应该以商品上面自带的时间戳作为时间判断标准,八点十分的商品来了,班车认为现在时间到了八点十分,九点钟的商品来了,班车认为现在时间到了九点,那么班车就带着[8,9)的商品出发了
但是这样的话只有当前的班车知道现在时间是多少,后面的班车不知道现在时间(逻辑时间),那么后面的班车就不能进行时间相关的操作 应该有一个标志来指明当前数据流里面时钟到底是怎么样前进的,而且这些标志需要从前边的算子任务传递到后面的算子任务,即使当前窗口数据没有输出,也要把当前时钟的标志传递到下游,下游的任务就不用依赖数据里面的标签了

那么水位线就是用来指明当前逻辑时钟进展的标记 (毫秒数)

问题一:如果数据稀疏,来一个数据判断一下时间戳,插入对应的水位线,没问题
但是如果数据非常稠密,同一毫秒有海量的数据到来,这时候如果还每一条数据都判断时间戳,插入水位线,就做了大量的无用功
解决方法:周期性的生成水位线

flink自带的水位线生成方法中,生成水位线的时间间隔为200毫秒
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 发射水位线,默认 200ms 调用一次
output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
}
我们在 onPeriodicEmit()里调用 output.emitWatermark(),就可以发出水位线了;这个方法由系统框架周期性地调用,默认 200ms 一次。
可以通过以下方法设置
env.getConfig().setAutoWatermarkInterval(1000L);
可以通过以下方法查看系统默认间隔
System.out.println(env.getConfig().getAutoWatermarkInterval());
也可以在源码中查看
在这里插入图片描述

问题二:数据是乱序的,比方说八点二十的数据已经到了,八点十分的数据才到,这时候再按照迟到的八点十分的数据生成水位线就倒退了,但是时间一定是单调递增的,时间是不能倒退的
解决方法:判断当前最新的时间戳是否比之前最大的时间戳要大,如果大的话,时间才朝前进展,如果小,那就是一个迟到数据,对时间的进展没有贡献
周期性生成水位线,保留之前所有数据中最大时间戳,需要插入水位线的时候,直接以它作为时间戳生成新的水位线

在水位线策略中的forBoundedOutOfOrderness方法里面的onEvent方法中,每个事件到来的时候都会做一次判断,maxTimestamp = Math.max(maxTimestamp, eventTimestamp);判断是否为最大的时间戳

问题三:如何处理迟到数据
解决方法:经验性的给一个延迟时间
两种方案:1.比方说[0-9)秒的窗口,给了2秒的延迟,那么等到11秒水位线生成的时候才发车 2.[0-9)秒的窗口,给了2秒的延迟,最大时间戳为9秒的时候,生成的水位线减两秒,最大时间戳为11秒的时候,生成的水位线减两秒,也就是九秒,刚好发车 两种处理方法是等价的,但是后面一种更好理解,所以我们选择后面一种

在处理乱序流的forBoundedOutOfOrderness()的参数中可以传Duration.ofSeconds(2)来标记需要水位线延迟几秒钟生成
在水位线生成方法onPeriodicEmit中可以看到
output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
最后-1毫秒表示包头不包尾

设置水位线离源越近越好

允许延迟
.allowedLateness(Time.minutes(1))

侧输出流
OutputTag<> outputTag = new OutputTag<>(“late”){};

.sideOutputLateData(outputTag)

生成水位线方法.assignTimestampAndWatermarks()中
参数为水位线策略WatermarkStrategy
水位线策略WatermarkStrategy中需要实现两个方法,
分别是createWatermarkGenerator水位线生成和createTimestampAssigner时间戳分配
调用forBoundedOutOfOrderness完成水位线生成
调用withTimestampAssigner完成时间戳分配

  • 示例

.assignTimestampsAndWatermarks(//分配时间戳和水位线
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2))//延迟两秒的乱序流水位线生成器
.withTimestampAssigner( (SerializableTimestampAssigner) (adData,recordTimestamp) -> adData.getTimestamp() )//序列化的时间戳分配器
);
在这里插入图片描述


http://chatgpt.dhexx.cn/article/JxaJdj2w.shtml

相关文章

Flink WaterMark 详解

摘录仅供学习使用&#xff0c;原文来自&#xff1a; Flink详解系列之五--水位线&#xff08;watermark&#xff09; - 简书 1、概念 在Flink中&#xff0c;水位线是一种衡量Event Time进展的机制&#xff0c;用来处理实时数据中的乱序问题的&#xff0c;通常是水位线和窗口结合…

Flink:watermark

Table of Contents 三种时间概念 Processing time Event Time Ingestion time watermark 并行流的Watermarks 迟到的事件 watermark分配器 watermark的两种分配器 三种时间概念 在谈watermark之前&#xff0c;首先需要了解flink的三种时间概念。在flink中&#xff0c;…

Flink 水位线(Watermark)

文章目录 什么是水位线水位线的特性如何生成水位线Flink 内置水位线生成器自定义水位线策略在自定义数据源中发送水位线水位线的总结 在实际应用中&#xff0c;一般会采用事件时间语义。而水位线&#xff0c;就是基于事件时间提出的概念。一个数据产生的时刻&#xff0c;就是流…

vue -- watermark水印添加方法

作者&#xff1a;蛙哇 原文链接&#xff1a; https://segmentfault.com/a/1190000022055867 来源&#xff1a;segmentfault 前言 项目生成公司水印是很普遍的需求&#xff0c;下面是vue项目生产水印的方法。话不多说&#xff0c;复制粘贴就可以马上解决你的需求。 步骤1 创建…

tp-watermark.js网页添加水印插件

tp-watermark.js网页添加水印插件 作者&#xff1a;鹏仔先生 上周五&#xff0c;出差去改上个前端遗留的小问题&#xff0c;用到了watermark.js这个网站添加水印插件&#xff0c;功能很简单&#xff0c;就是给网页添加个水印&#xff0c;我看了下网上&#xff0c;有很多种&…

实用有效!React项目中使用watermark.js添加水印效果

为了避免公司的内部文档被截图外泄&#xff0c;有必要在系统页面上面增加水印。 第一步&#xff1a; 下载依赖包&#xff1a; npm install watermark-dompackage.json中会添加一个依赖如下&#xff1a; "watermark-dom": "^2.3.0"第二步&#xff1a; 引…

Flink之水位线(Watermark)

在流数据处理应用中&#xff0c;一个很重要、也很常见的操作就是窗口计算。所谓的“窗口”&#xff0c;一般就是划定的一段时间范围&#xff0c;也就是“时间窗”&#xff1b;对在这范围内的数据进行处理&#xff0c;就是所谓的窗口计算。所以窗口和时间往往是分不开的。接下来…

水印watermark

第一步:npm获取水印组件包 npm install watermark-dom 第二步:引入水印模块 import watermark from ‘watermark-dom’ 或者 var watermarkDom require(“watermark-dom”) 根据业务需要&#xff0c;我是登入之后的页面才有水印&#xff0c;前者我是放在验证用户登录状态js文件…

Flink流计算编程--watermark(水位线)简介

1、watermark的概念 watermark是一种衡量Event Time进展的机制&#xff0c;它是数据本身的一个隐藏属性。通常基于Event Time的数据&#xff0c;自身都包含一个timestamp&#xff0c;例如1472693399700&#xff08;2016-09-01 09:29:59.700&#xff09;&#xff0c;而这条数据…

Flink之watermark(水印)讲解

flink中watermark的详细介绍 使用前提&#xff1a; 处理数据开窗&#xff0c;处理数据的时间语义是事件时间&#xff0c;也就是每条数据产生的时间。 使用场景&#xff08;解决问题&#xff09;&#xff1a; 处理乱序数据&#xff1a;flink中是实时处理数据&#xff0c;但是…

WaterMark使用和详解

上篇&#xff1a;基于flink的会话窗口的api实现 WaterMark翻译为水位线&#xff0c;什么时候用到水位线呢&#xff1f; 比如说水控在顺水的时候达到紧梯就会触发&#xff0c;若不放水就可以发现危险的现状 在spark程序划分成窗口的时候&#xff0c;主要是衡量什么时候触发&am…

【大数据】带你理解并使用flink中的WaterMark机制

文章目录 一、引导二、WaterMark1、Watermark的原理2、Watermark 的使用2.1、顺序数据流中的watermark示例 2.2、乱序数据流中的WaterMark2.2.1、With Periodic&#xff08;周期性的&#xff09; Watermark示例一&#xff1a;使用周期性的WaterMark2.2.2、With Punctuated&…

JavaEE是什么?JavaSE又是什么?两者的区别有哪些?

Java作为最流行的编程语言受到了许多人的喜爱&#xff0c;其在编程中的地位自不必多说。对于许多才刚刚入门Java的朋友来讲&#xff0c;常常会产生这样的困惑&#xff0c;JavaEE是什么&#xff1f;JavaSE又是什么&#xff1f;两者的区别有哪些&#xff1f;学哪个比较好&#xf…

php mysql Javaee_javaee与php的区别是什么

javaee与php的区别&#xff1a;1、JavaEE是门面向对象的程序设计语言&#xff0c;而PHP是解释执行的服务器脚本语言&#xff1b;2、用JavaEE开发的Web应用从MySQL数据库转到Oracle数据库只需要做很少的修改&#xff0c;而PHP则需要做大量的修改工作。 javaee与php的区别&#x…

解读JAVAEE是什么样的Java

javaEE是指java enterprise edition&#xff0c;java企业版&#xff0c;多用于企业级开发&#xff0c;包括web开发等等很多组建。javaEE开发会设计java的高级特性以及一些spring等架构&#xff0c;需要掌握的内容相对多。 JAVA&#xff0c;所有的Java平台都由一个JVM和一组应用…

java ee 下载什么意思_JavaEE到底是什么?

慕运维8079593 JavaEE只是一个规范吗&#xff1f;我的意思是&#xff1a;EJBJavaEE是吗&#xff1f;JavaEE确实是一个摘要规格说明。任何人都愿意开发和提供规范的工作实现。这个混凝土实现是所谓的应用服务器&#xff0c;如野弗利, 托梅, 玻璃鱼, 自由, WebLogic&#xff0c;等…

java ee api是什么意思_JavaEE的整体概述

标签&#xff1a; JavaEE整体概述 知识点&#xff1a; 1、整体概述JavaEE的知识体系 2、JavaEE是什么? 能干什么? 为什么需要JavaEE? 3、JavaEE有什么? JavaEE的技术体系? JavaEE的本质是什么? 4、JavaEE的零散基础知识 5、JavaEE的组件体系结构 -----------------------…

矩阵求逆方法

1.待定系数法 矩阵A 1, 2 -1,-3 假设所求的逆矩阵为 a,b c,d 则 从而可以得出方程组 a 2c 1 b 2d 0 -a - 3c 0 -b - 3d 1 解得 a3; b2; c -1; d -1 2.伴随矩阵求逆矩阵 伴随矩阵是矩阵元素所对应的代数余子式&#xff0c;所构成的矩阵&#xff0c;转置后得到的新矩阵…

复数矩阵求逆的 C 语言程序

关于复数矩阵求逆&#xff0c;如果使用 MATLAB&#xff0c;就非常简单。我们先用一个 MATLAB 的例子来说明&#xff0c;等会要将 C 语言的程序和 MATLAB 的程序进行对比。 close all; clear all; clc;%定义矩阵a为复数矩阵 a [[42*i,31*i,43*i,55*i];[17*i,82*i,22*i,93*i];[…

科学计算器如何求矩阵的逆

大学本科买了四年的计算器不会求逆&#xff0c;到了研究生了好好研究下这个功能&#xff0c;终于终于会用了&#xff0c;以往 对着那个矩阵功能都发懵&#x1f602;&#xff0c;记录一下这个史诗无敌隐藏功能 1、进入菜单&#xff0c;点击4进入矩阵菜单 2、这里选择1定义矩阵A…