WaterMark使用和详解

article/2025/9/20 21:19:24

上篇:基于flink的会话窗口的api实现

WaterMark翻译为水位线,什么时候用到水位线呢?

比如说水控在顺水的时候达到紧梯就会触发,若不放水就可以发现危险的现状

在spark程序划分成窗口的时候,主要是衡量什么时候触发,这也是需要用到的水位线,其实它是来判断水位窗口触发的机制,在这个窗口水位线会不停的增加

其实水位线有两种方式获取,一种是根据数据时间来提取,另一种是定期生成水位线

当我们输入的数据有大也有小的,它就会用这个分区最大的Eventime作为它的水位线

那么这个水位线是怎么计算出来的?

实水位线还有一个作用,让窗口延迟发,举一个例子,我们在生产环境中,拉数据是从中间件拉取出来的,如kafka。

在kakfa下有多个分区,由生产者写入进入,在生产者有2个或多个写,当一对一写完,它还会切换写,在kafka里如果只有一个分区它是有序的,但是多个分区就无法保证它是有序的

一开始写一条数据,在另一个消费者会有延迟, 比如:第一个生产者由于网络的问题就会发生延迟效果,如图所示:

 接下来,采用flink从kakfa里面的数据拉过来 ,但是如何拉取呢?其实flink的并行再大,它的solt只有2个state ,通过直连拉数据有可能是延迟,但是如何容忍数据的延续时间,这一点就是需要解决数据乱序问题。其实,在窗口迟到的数据是不会被触发的

带着这个问题,可以去设计吗?可以的

1、Watermarks的设计主要从 它们定义何时停止等待早期事件

Flink中的事件时间处理取决于特殊的带时间戳的元素,称为watermarks,由数据源或watermarks生成器插入到流中。 具有时间戳t的watermarks可以被理解为断言(assertion )所有具有时间戳<t的事件已经(具有合理的概率)已经到达

2、我们可以设想不同的策略来决定如何生成watermarks

我们知道每个事件都会在延迟一段时间后到达并且这些延迟会有所不同,因此有些事件会比其他事件延迟更多。 一种简单的方法是假设这些延迟受到一些最大延迟的限制。 Flink将此策略称为有界无序watermarks。 很容易想象出更复杂的watermarks方法,但对于许多应用来说,固定延迟效果还不错。

如果要构建像流分类器这样的应用程序,Flink的ProcessFunction是正确的构建块。 它提供对事件时间(event-time )计时器的访问(即,基于watermarks到达而触发的回调),并具有用于管理缓冲事件所需状态的挂钩,直到轮到它们被发送到下游

代码实现:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;/*** 先keyBy,再进行EventTime划分滚动窗口--[无界流]*  设置延迟时间为2秒**/
public class EventTimeTumblingWindowAllDemo4 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());//1000,spark,3//1200,spark,5//2000,hadoop,2//socketTextStream返回的并行度为1DataStreamSource<String> lines = env.socketTextStream("Master", 8888);//TODO 当前分区中的数据的数据携带的最大EventTime - 乱序延迟时间 >= 窗口的结束时间 就会触发该窗口SingleOutputStreamOperator<String> dataWithWaterMark = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {  //设置为0,这里表示数据的延迟时间@Overridepublic long extractTimestamp(String element) {//提取数据中的时间return Long.parseLong(element.split(",")[0]);}});SingleOutputStreamOperator<Tuple2<String,Integer>> workAndCount = dataWithWaterMark.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String s) throws Exception {String[] fields = s.split(",");return Tuple2.of(fields[1],Integer.parseInt(fields[2]));}});env.execute();}
}

查看job:http://localhost:8081/#/job/01840ac7b18e65bbec107848545fe68e/overview

使用场景(解决问题) 

处理乱序数据:flink中是实时处理数据,但是在处理数据的时候会出现因为网络传输的问题,所以数据先产生的反而到后面才到达,在被处理时候就会出现数据混乱,而且因为开窗,窗口关闭但是本窗口的数据来迟,导致数据丢失;

多并行度下的watermark

一个子任务中watermark会发往所有下一算子中的子任务,也就是一发多,
同样一个子任务会接收上一个算子中所有子任务的watermark,这时起作用的就是最小的哪一个watermark。
watermark可以理解为一个特殊的数据,这个数据不参与计算,仅仅是对窗口的触发关闭起作用;


http://chatgpt.dhexx.cn/article/yVHmrqky.shtml

相关文章

【大数据】带你理解并使用flink中的WaterMark机制

文章目录 一、引导二、WaterMark1、Watermark的原理2、Watermark 的使用2.1、顺序数据流中的watermark示例 2.2、乱序数据流中的WaterMark2.2.1、With Periodic&#xff08;周期性的&#xff09; Watermark示例一&#xff1a;使用周期性的WaterMark2.2.2、With Punctuated&…

JavaEE是什么?JavaSE又是什么?两者的区别有哪些?

Java作为最流行的编程语言受到了许多人的喜爱&#xff0c;其在编程中的地位自不必多说。对于许多才刚刚入门Java的朋友来讲&#xff0c;常常会产生这样的困惑&#xff0c;JavaEE是什么&#xff1f;JavaSE又是什么&#xff1f;两者的区别有哪些&#xff1f;学哪个比较好&#xf…

php mysql Javaee_javaee与php的区别是什么

javaee与php的区别&#xff1a;1、JavaEE是门面向对象的程序设计语言&#xff0c;而PHP是解释执行的服务器脚本语言&#xff1b;2、用JavaEE开发的Web应用从MySQL数据库转到Oracle数据库只需要做很少的修改&#xff0c;而PHP则需要做大量的修改工作。 javaee与php的区别&#x…

解读JAVAEE是什么样的Java

javaEE是指java enterprise edition&#xff0c;java企业版&#xff0c;多用于企业级开发&#xff0c;包括web开发等等很多组建。javaEE开发会设计java的高级特性以及一些spring等架构&#xff0c;需要掌握的内容相对多。 JAVA&#xff0c;所有的Java平台都由一个JVM和一组应用…

java ee 下载什么意思_JavaEE到底是什么?

慕运维8079593 JavaEE只是一个规范吗&#xff1f;我的意思是&#xff1a;EJBJavaEE是吗&#xff1f;JavaEE确实是一个摘要规格说明。任何人都愿意开发和提供规范的工作实现。这个混凝土实现是所谓的应用服务器&#xff0c;如野弗利, 托梅, 玻璃鱼, 自由, WebLogic&#xff0c;等…

java ee api是什么意思_JavaEE的整体概述

标签&#xff1a; JavaEE整体概述 知识点&#xff1a; 1、整体概述JavaEE的知识体系 2、JavaEE是什么? 能干什么? 为什么需要JavaEE? 3、JavaEE有什么? JavaEE的技术体系? JavaEE的本质是什么? 4、JavaEE的零散基础知识 5、JavaEE的组件体系结构 -----------------------…

矩阵求逆方法

1.待定系数法 矩阵A 1, 2 -1,-3 假设所求的逆矩阵为 a,b c,d 则 从而可以得出方程组 a 2c 1 b 2d 0 -a - 3c 0 -b - 3d 1 解得 a3; b2; c -1; d -1 2.伴随矩阵求逆矩阵 伴随矩阵是矩阵元素所对应的代数余子式&#xff0c;所构成的矩阵&#xff0c;转置后得到的新矩阵…

复数矩阵求逆的 C 语言程序

关于复数矩阵求逆&#xff0c;如果使用 MATLAB&#xff0c;就非常简单。我们先用一个 MATLAB 的例子来说明&#xff0c;等会要将 C 语言的程序和 MATLAB 的程序进行对比。 close all; clear all; clc;%定义矩阵a为复数矩阵 a [[42*i,31*i,43*i,55*i];[17*i,82*i,22*i,93*i];[…

科学计算器如何求矩阵的逆

大学本科买了四年的计算器不会求逆&#xff0c;到了研究生了好好研究下这个功能&#xff0c;终于终于会用了&#xff0c;以往 对着那个矩阵功能都发懵&#x1f602;&#xff0c;记录一下这个史诗无敌隐藏功能 1、进入菜单&#xff0c;点击4进入矩阵菜单 2、这里选择1定义矩阵A…

matlab求一个矩阵的逆矩阵的命令,如何用MATLAB求逆矩阵

如何用MATLAB求逆矩阵以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧! 如何用MATLAB求逆矩阵 如果英文好呢,自己看目录 不好还是先看中文的教材,对matlab的框架和功能有了一定的了解后,自己也就看的懂帮助里面…

matlab矩阵求逆的模块,matlab矩阵求逆矩阵

matlab矩阵求逆矩阵 因为 所以该矩阵可逆&#xff0c;根据 &#xff0c;其中 得到 计算矩阵A每个元素的代数余子式&#xff1a; 所以 可得&#xff1a; matlab计算如下&#xff1a; >> A1[1 2 2;2 1 -2;2 -2 1] A1 1 2 2 2 1 -2 2 -2 1 >> >> >> A2in…

求矩阵的逆的三种方法

我们知道求矩阵的逆具有非常重要的意义&#xff0c;本文分享给大家如何针对3阶以内的方阵&#xff0c;求出逆矩阵的3种手算方法&#xff1a;待定系数法、伴随矩阵法、初等变换法&#xff08;只介绍初等行变换&#xff09; 待定系数法求逆矩阵 1 首先&#xff0c;我们来看如何使…

千万不能错过的Android NDK下载安装及配置

Java 语言是一个跨平台的语言&#xff0c;有着“Write Once&#xff0c;Run Anywhere”的美誉。但是却导致了它和本地交互的能力不够强&#xff0c;无法完成一些和操作系统相关的特性。 而 JNI 就是 Java Native Interface&#xff08;Java 本地接口&#xff09;&#xff0c;用…

NDK在Linux下载配置以及C、C++编译配置(交叉编译)

NDK在Linux下载配置以及C、C编译配置&#xff08;交叉编译&#xff09; 前言&#xff1a; 我们搭建好Ubutu虚拟机之后&#xff0c;通过xShell远程登录Ubutu上SSH服务器&#xff0c;在xShell上可以进行相关的编译操作了。但是我们在xShell上gcc、g编译的可执行文件只能在Linux…

Android——NDK下载提示缺少toolchains问题解决

更新下载了最新的NDK&#xff0c;发现Android SDK报错。 No toolchains found in the NDK toolchains folder for ABI with prefix: mips64el-linux-android。 解决办法&#xff1a; 1.找到并进入下载安装的ndk目录 lydeMacBook-Pro:~ imac$ cd /Users/ly/Library/Android/sd…

android ndk 下载安装(ubuntu)

1. 下载并解压安装包 官网下载&#xff1a;https://developer.android.com/studio ndk各个版本下载地址&#xff1a;https://blog.csdn.net/u011077027/article/details/102706283 官网下载需要梯子&#xff0c;百度云盘下载&#xff1a;链接: https://pan.baidu.com/s/1Ge8fQu…

Window NDK下载以及环境变量配置

作者介绍&#xff1a;铸梦xy。IT公司技术合伙人&#xff0c;IT高级讲师&#xff0c;资深Unity架构师&#xff0c;铸梦之路系列课程创始人。 第一种 NDK下载安装步骤一 NDK环境变量配置步骤一 测试NDK配置 第二种 支持编译c ninja 前言 NDK是开发者必不可少的一部分&#xff0c…

3、NDK下载、安装

文章目录 一、下载NDK二、配置环境变量三、测试 一、下载NDK 官网下载地址:https://developer.android.google.cn/ndk/downloads/,选择自己相应的版本&#xff0c;下载解压。(我安装在D:\utils\android-ndk-r21d) 特别注意&#xff1a;安装路径不要有中文和空格 二、配置环境…

【Android NDK 开发】Android NDK 下载 ( 下载指定历史版本 NDK | Android NDK r10e - 2015 年 5 月 )

文章目录 一、下载指定历史版本 NDK 一、下载指定历史版本 NDK 进入到 ndk 下载的 " 修订历史记录 " 页面 https://developer.android.google.cn/ndk/downloads/revision_history ; 在该页面中 , 有 Android NDK r1&#xff08;2009 年 6 月&#xff09;~ Android N…

android ndk官网下载地址,android ndk下载

android ndk&#xff0c;在SDK前加上原生二字就是Native Development Kit&#xff0c;支持32位和64位使用&#xff0c;有需要的朋友可以来本站下载。 【使用方法】 1、打开Eclipse&#xff0c;点Window->Preferences->Android->NDK,设置NDK路径&#xff0c;例如Shamoo…