Flink学习:WaterMark

article/2025/9/20 20:06:59

WaterMark

  • 一、什么是水位线?
  • 二、案例分析
  • 三、如何生成水位线?
    • (一)、在SourceFunction中直接定义Timestamps和Watermarks
    • (二)、自定义生成Timstamps和Watermarks

一、什么是水位线?

  • 通常情况下,由于网络或系统等外部因素影响,事件数据往往不能及时传输至Flink系统中,导致数据乱序或者延迟到达等问题,因此,需要有一种机制能够控制数据处理的过程和进度,这种机制就是水位线
  • 水位线本质上是一个时间戳,且是动态变化的,会根据最大事件时间生成
watermark = 进入Flink窗口的最大事件时间(maxEventTime) - 一定的延迟时间(t)
//这个延迟时间t是在程序当中配置的
  • watermark时间戳是与窗口结束时间比较的,当watermark大于窗口结束时间时,意味着窗口结束,需要触发窗口计算
  • 举个例子,某条数据的事件时间为2023:03:16 9:00:00,它的下一条数据的事件时间为2023:03:16 9:06:00,窗口设置为滚动窗口为5分钟,延迟时间t设置为2分钟,此时窗口结束时间为2023:03:16 9:00:00,水位线是2023:03:16 9:06:00 - 2min = 2023:03:16 9:04:00,watermark < window endtime,这两条数据应该在同一个窗口内,下面是具体的例子

二、案例分析

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.api.java.tuple._
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Rowcase class User(id:Int,name:String,age:Int,timestamp:Long)
object SqlTest {def main(args: Array[String]): Unit = {val streamEnv = StreamExecutionEnvironment.getExecutionEnvironmentval tEnv = TableEnvironment.getTableEnvironment(streamEnv)//指定时间类型为事件时间streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream = streamEnv.fromElements(User(1,"nie",22,1511658000),User(2,"hu",20,1511658000),User(2,"xiao",19,1511658000)).assignAscendingTimestamps(_.timestamp * 1000L) //指定水位线tEnv.registerDataStream("testTable",stream,'id, 'name,'age,'event_time.rowtime)val result = tEnv.sqlQuery("select id,sum(age) from testTable group by TUMBLE(event_time,INTERVAL '5' MINUTE),id")result.toRetractStream[Row].print()streamEnv.execute("windowTest")}
}
  • 如上述代码所示,三条数据时间戳一样,也就是事件时间相同,水位线为自带的时间戳*1000L转成毫秒,滚动窗口时间间隔为5min,这时计算结果应该是3条数据都在一个窗口中计算,最终会产生2条数据

在这里插入图片描述

  • 如下所示,把第三条时间戳增加300000,也就是增加了5分钟,下面三条数据的真实日期分别为2017-11-26 9:0:0、2017-11-26
    9:0:0、2017-11-26 9:5:0
    val stream = streamEnv.fromElements(User(1,"nie",22,1511658000000L),User(2,"hu",20,1511658000000L),User(2,"xiao",19,1511658300000L)).assignAscendingTimestamps(_.timestamp) //指定水位线
  • 第三条数据正好晚了5分钟,此时前两条数据在一个窗口,第三条数据在一个窗口,最终应该产生三条数据,如下所示

在这里插入图片描述
我们直到可以利用水位线处理延迟情况,上面assignAscendingTimestamps方法针对的是数据有序,无法设定允许延迟时间,也就无法处理数据延迟的情况,下面介绍几种生成水位线的方式

三、如何生成水位线?

生成水位线分为两步:

  • 第一步需要指定eventTime,可以通过StreamExecutionEnvironment的TimeCharacteristic指定,还需要在Flink程序中指定event
    time时间戳在数据中的字段信息,在Flink程序中会通过指定字段抽取出对应的事件时间,该过程叫做Timestamps Assigning
  • 第二步就是创建相应的Watermarks,需要用户定义根据Timestamps计算出Watermarks的生成策略
  • 目前Flink支持两种方式指定Timestamps和生成WaterMarks,一种方式在DataStream Source算子接口的Source Function定义,另一种方式是通过自定义Timestamp Assigner和Watermark Generator生成

(一)、在SourceFunction中直接定义Timestamps和Watermarks

(二)、自定义生成Timstamps和Watermarks

自定义生成分为两种:

  • Periodic Watermarks:根据设定时间间隔周期性地生成Watermarks
  • Punctuated Watermarks:根据接入数据的数量生成

1、Periodic Watermarks

  • Periodic Watermark又分为两种:升序模式和固定时延间隔

1)、升序模式

  • 会将数据中的Timestamp根据指定字段提取,并用当前的Timestamp作为最新的watermarks,适用于事件按顺序生成
  • 调用DataStream API中的assignAscendingTimestamps来指定Timestamp字段

eg:

    //指定时间类型为事件时间streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream = streamEnv.fromElements(User(1,"nie",22,1511658000),User(2,"hu",20,1511658000),User(2,"xiao",19,1511658300)).assignAscendingTimestamps(_.timestamp * 1000L) //指定水位线

2)、使用固定时延间隔的Timestamp Assigner

  • 通过设定固定的时间间隔来指定Watermark落后于Timestamp的区间长度,也就是最长容忍到多长时间内的数据到达系统

如下代码所示,通过创建BoundedOutOfOrdernessTimestampExtractor实现类来定义Timestamp Assigner,其中第一个参数Time.seconds(10)代表了最长的时延为10s,第二个为extractTimestamp抽取逻辑,选择样例类User的第三个元素作为Timestamps
eg:

	case class User(id:Int,name:String,age:Int,timestamp:Long)//指定时间类型为事件时间streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream = streamEnv.fromElements(User(1, "nie", 22, 1511658000),User(2, "hu", 20, 1511658000),User(2, "xiao", 19, 1511658000)).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[User](Time.seconds(10)) {override def extractTimestamp(t: User): Long = t.timestamp})

eg:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.api.java.tuple._
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Rowcase class User(id:Int,name:String,age:Int,timestamp:Long)
object SqlTest {def main(args: Array[String]): Unit = {val streamEnv = StreamExecutionEnvironment.getExecutionEnvironmentval tEnv = TableEnvironment.getTableEnvironment(streamEnv)//指定时间类型为事件时间streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream = streamEnv.fromElements(User(1, "nie", 22, 1511658000000L),User(2, "hu", 20, 1511658000000L),User(2, "xiao", 19, 1511658003000L),User(2, "feng", 31, 1511658002000L)).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[User](Time.seconds(1)) {override def extractTimestamp(t: User): Long = t.timestamp})tEnv.registerDataStream("testTable",stream,'id, 'name,'age,'event_time.rowtime)val result = tEnv.sqlQuery("select id,sum(age) from testTable group by TUMBLE(event_time,INTERVAL '2' SECOND),id")result.toRetractStream[Row].print()streamEnv.execute("windowTest")}
}

如上述代码所示,设置滚动窗口,窗口大小为2s,允许延迟时间为1s,四条数据的日期分别为2017-11-26 9:0:0、2017-11-26 9:0:0、2017-11-26 9:0:3、2017-11-26 9:0:2,可以看到第1、2、4条数据应该属于同一个窗口,只不过第四条数据延迟了,当第三条数据到达后,水位线应该为1511658003000L - 1000 = 1511658002000L,没有超过窗口结束时间1511658002000L,所以不触发窗口计算,第1、2、4条数据应该还是在一个窗口中计算的
在这里插入图片描述
这时候我们修改下代码,如下所示,修改了第三条数据的时间戳

    val stream = streamEnv.fromElements(User(1, "nie", 22, 1511658000000L),User(2, "hu", 20, 1511658000000L),User(2, "xiao", 19, 1511658004000L),User(2, "feng", 31, 1511658002000L)).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[User](Time.seconds(1)) {override def extractTimestamp(t: User): Long = t.timestamp})

这时候当第三条数据到达的时候,水位线为1511658004000L - 1000 = 1511658003000L,超过了窗口结束时间1511658002000L,前两条数据触发计算,这时候第四条数据就没有加入计算
在这里插入图片描述
2、Punctuated Watermarks

  • 上述两种是根据时间周期生成Periodic Watermark,用户也可以根据某些特殊条件生成Punctuated Watermarks
  • 如判断数据流中某特殊元素的数量满足条件后生成Watermarks
  • 生成Punctuated Watermarks的逻辑需要通过实现AssignerWithPunctuatedWatermarks接口定义,然后分别复写extractTimestamp方法和checkAndGetNextWatermark方法

eg:判断某个元素的当前状态,如果状态为0则触发生成Watermarks,如果状态不为0,则不触发生成Watermarks。

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[(String,Long,Int)]{//复写extractTimestamps方法,定义抽取Timestamp逻辑override def extractTimestamp(element:(String,Long,Int),
previousElementTimestamp:Long):Long = {element._2}//复写checkAndGetNextWatermark方法,定义Watermark生成逻辑override def checkAndGetNextWatermark(lastElement:(String,Long,Int),
extractedTimestamp:Long):Watermark = {//根据元素中第三位字段状态是否为0生成Watermarkif (lastElement._3 == 0) new Watermark(extractedTimestamp) else null}
}

http://chatgpt.dhexx.cn/article/RqGib28i.shtml

相关文章

flink watermark

flink1.12版本开始&#xff0c;事件事件作为默认的时间语义 watermark是flink逻辑时钟&#xff0c;不是真正意义上的表&#xff0c;而是靠着数据去推动它的时间不停的往前走 工厂生产的商品上面印有时间戳&#xff0c;八点到九点的商品要坐一班车运走&#xff0c;商品从生产到…

Flink WaterMark 详解

摘录仅供学习使用&#xff0c;原文来自&#xff1a; Flink详解系列之五--水位线&#xff08;watermark&#xff09; - 简书 1、概念 在Flink中&#xff0c;水位线是一种衡量Event Time进展的机制&#xff0c;用来处理实时数据中的乱序问题的&#xff0c;通常是水位线和窗口结合…

Flink:watermark

Table of Contents 三种时间概念 Processing time Event Time Ingestion time watermark 并行流的Watermarks 迟到的事件 watermark分配器 watermark的两种分配器 三种时间概念 在谈watermark之前&#xff0c;首先需要了解flink的三种时间概念。在flink中&#xff0c;…

Flink 水位线(Watermark)

文章目录 什么是水位线水位线的特性如何生成水位线Flink 内置水位线生成器自定义水位线策略在自定义数据源中发送水位线水位线的总结 在实际应用中&#xff0c;一般会采用事件时间语义。而水位线&#xff0c;就是基于事件时间提出的概念。一个数据产生的时刻&#xff0c;就是流…

vue -- watermark水印添加方法

作者&#xff1a;蛙哇 原文链接&#xff1a; https://segmentfault.com/a/1190000022055867 来源&#xff1a;segmentfault 前言 项目生成公司水印是很普遍的需求&#xff0c;下面是vue项目生产水印的方法。话不多说&#xff0c;复制粘贴就可以马上解决你的需求。 步骤1 创建…

tp-watermark.js网页添加水印插件

tp-watermark.js网页添加水印插件 作者&#xff1a;鹏仔先生 上周五&#xff0c;出差去改上个前端遗留的小问题&#xff0c;用到了watermark.js这个网站添加水印插件&#xff0c;功能很简单&#xff0c;就是给网页添加个水印&#xff0c;我看了下网上&#xff0c;有很多种&…

实用有效!React项目中使用watermark.js添加水印效果

为了避免公司的内部文档被截图外泄&#xff0c;有必要在系统页面上面增加水印。 第一步&#xff1a; 下载依赖包&#xff1a; npm install watermark-dompackage.json中会添加一个依赖如下&#xff1a; "watermark-dom": "^2.3.0"第二步&#xff1a; 引…

Flink之水位线(Watermark)

在流数据处理应用中&#xff0c;一个很重要、也很常见的操作就是窗口计算。所谓的“窗口”&#xff0c;一般就是划定的一段时间范围&#xff0c;也就是“时间窗”&#xff1b;对在这范围内的数据进行处理&#xff0c;就是所谓的窗口计算。所以窗口和时间往往是分不开的。接下来…

水印watermark

第一步:npm获取水印组件包 npm install watermark-dom 第二步:引入水印模块 import watermark from ‘watermark-dom’ 或者 var watermarkDom require(“watermark-dom”) 根据业务需要&#xff0c;我是登入之后的页面才有水印&#xff0c;前者我是放在验证用户登录状态js文件…

Flink流计算编程--watermark(水位线)简介

1、watermark的概念 watermark是一种衡量Event Time进展的机制&#xff0c;它是数据本身的一个隐藏属性。通常基于Event Time的数据&#xff0c;自身都包含一个timestamp&#xff0c;例如1472693399700&#xff08;2016-09-01 09:29:59.700&#xff09;&#xff0c;而这条数据…

Flink之watermark(水印)讲解

flink中watermark的详细介绍 使用前提&#xff1a; 处理数据开窗&#xff0c;处理数据的时间语义是事件时间&#xff0c;也就是每条数据产生的时间。 使用场景&#xff08;解决问题&#xff09;&#xff1a; 处理乱序数据&#xff1a;flink中是实时处理数据&#xff0c;但是…

WaterMark使用和详解

上篇&#xff1a;基于flink的会话窗口的api实现 WaterMark翻译为水位线&#xff0c;什么时候用到水位线呢&#xff1f; 比如说水控在顺水的时候达到紧梯就会触发&#xff0c;若不放水就可以发现危险的现状 在spark程序划分成窗口的时候&#xff0c;主要是衡量什么时候触发&am…

【大数据】带你理解并使用flink中的WaterMark机制

文章目录 一、引导二、WaterMark1、Watermark的原理2、Watermark 的使用2.1、顺序数据流中的watermark示例 2.2、乱序数据流中的WaterMark2.2.1、With Periodic&#xff08;周期性的&#xff09; Watermark示例一&#xff1a;使用周期性的WaterMark2.2.2、With Punctuated&…

JavaEE是什么?JavaSE又是什么?两者的区别有哪些?

Java作为最流行的编程语言受到了许多人的喜爱&#xff0c;其在编程中的地位自不必多说。对于许多才刚刚入门Java的朋友来讲&#xff0c;常常会产生这样的困惑&#xff0c;JavaEE是什么&#xff1f;JavaSE又是什么&#xff1f;两者的区别有哪些&#xff1f;学哪个比较好&#xf…

php mysql Javaee_javaee与php的区别是什么

javaee与php的区别&#xff1a;1、JavaEE是门面向对象的程序设计语言&#xff0c;而PHP是解释执行的服务器脚本语言&#xff1b;2、用JavaEE开发的Web应用从MySQL数据库转到Oracle数据库只需要做很少的修改&#xff0c;而PHP则需要做大量的修改工作。 javaee与php的区别&#x…

解读JAVAEE是什么样的Java

javaEE是指java enterprise edition&#xff0c;java企业版&#xff0c;多用于企业级开发&#xff0c;包括web开发等等很多组建。javaEE开发会设计java的高级特性以及一些spring等架构&#xff0c;需要掌握的内容相对多。 JAVA&#xff0c;所有的Java平台都由一个JVM和一组应用…

java ee 下载什么意思_JavaEE到底是什么?

慕运维8079593 JavaEE只是一个规范吗&#xff1f;我的意思是&#xff1a;EJBJavaEE是吗&#xff1f;JavaEE确实是一个摘要规格说明。任何人都愿意开发和提供规范的工作实现。这个混凝土实现是所谓的应用服务器&#xff0c;如野弗利, 托梅, 玻璃鱼, 自由, WebLogic&#xff0c;等…

java ee api是什么意思_JavaEE的整体概述

标签&#xff1a; JavaEE整体概述 知识点&#xff1a; 1、整体概述JavaEE的知识体系 2、JavaEE是什么? 能干什么? 为什么需要JavaEE? 3、JavaEE有什么? JavaEE的技术体系? JavaEE的本质是什么? 4、JavaEE的零散基础知识 5、JavaEE的组件体系结构 -----------------------…

矩阵求逆方法

1.待定系数法 矩阵A 1, 2 -1,-3 假设所求的逆矩阵为 a,b c,d 则 从而可以得出方程组 a 2c 1 b 2d 0 -a - 3c 0 -b - 3d 1 解得 a3; b2; c -1; d -1 2.伴随矩阵求逆矩阵 伴随矩阵是矩阵元素所对应的代数余子式&#xff0c;所构成的矩阵&#xff0c;转置后得到的新矩阵…

复数矩阵求逆的 C 语言程序

关于复数矩阵求逆&#xff0c;如果使用 MATLAB&#xff0c;就非常简单。我们先用一个 MATLAB 的例子来说明&#xff0c;等会要将 C 语言的程序和 MATLAB 的程序进行对比。 close all; clear all; clc;%定义矩阵a为复数矩阵 a [[42*i,31*i,43*i,55*i];[17*i,82*i,22*i,93*i];[…