Flink流计算编程--watermark(水位线)简介

article/2025/9/20 21:21:05

1、watermark的概念

watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性。通常基于Event Time的数据,自身都包含一个timestamp,例如1472693399700(2016-09-01 09:29:59.700),而这条数据的watermark时间则可能是:

watermark(1472693399700) = 1472693396700(2016-09-01 09:29:56.700)

这条数据的watermark时间是什么含义呢?即:timestamp小于1472693396700(2016-09-01 09:29:56.700)的数据,都已经到达了。

这里写图片描述
图中蓝色虚线和实线代表着watermark的时间。

2、watermark有什么用?

watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。

我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)。

但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。

3、watermark如何分配?

通常,在接收到source的数据后,应该立刻生成watermark;但是,也可以在source后,应用简单的map或者filter操作,然后再生成watermark。

生成watermark的方式主要有2大类:

(1):With Periodic Watermarks
(2):With Punctuated Watermarks

第一种可以定义一个最大允许乱序的时间,这种情况应用较多。
我们主要来围绕Periodic Watermarks来说明,下面是生成periodic watermark的方法:

/*** This generator generates watermarks assuming that elements come out of order to a certain degree only.* The latest elements for a certain timestamp t will arrive at most n milliseconds after the earliest* elements for timestamp t.*/
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {val maxOutOfOrderness = 3500L; // 3.5 secondsvar currentMaxTimestamp: Long;override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {val timestamp = element.getCreationTime()currentMaxTimestamp = max(timestamp, currentMaxTimestamp)timestamp;}override def getCurrentWatermark(): Watermark = {// return the watermark as current highest timestamp minus the out-of-orderness boundnew Watermark(currentMaxTimestamp - maxOutOfOrderness);}
}

程序中有一个extractTimestamp方法,就是根据数据本身的Event time来获取;还有一个getCurrentWatermar方法,是用currentMaxTimestamp - maxOutOfOrderness来获取的。

这里的概念有点抽象,下面我们结合数据,在window中来实际演示下每个element的timestamp和watermark是多少,以及何时触发window。

4、生成并跟踪watermark代码

4.1、程序说明
我们从socket接收数据,然后经过map后立刻抽取timetamp并生成watermark,之后应用window来看看watermark和event time如何变化,才导致window被触发的。
4.2、代码如下

import java.text.SimpleDateFormatimport org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collectorobject WatermarkTest {def main(args: Array[String]): Unit = {if (args.length != 2) {System.err.println("USAGE:\nSocketWatermarkTest <hostname> <port>")return}val hostName = args(0)val port = args(1).toIntval env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val input = env.socketTextStream(hostName,port)val inputMap = input.map(f=> {val arr = f.split("\\W+")val code = arr(0)val time = arr(1).toLong(code,time)})val watermark = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String,Long)] {var currentMaxTimestamp = 0Lval maxOutOfOrderness = 10000L//最大允许的乱序时间是10svar a : Watermark = nullval format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")override def getCurrentWatermark: Watermark = {a = new Watermark(currentMaxTimestamp - maxOutOfOrderness)a}override def extractTimestamp(t: (String,Long), l: Long): Long = {val timestamp = t._2currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)println("timestamp:" + t._1 +","+ t._2 + "|" +format.format(t._2) +","+  currentMaxTimestamp + "|"+ format.format(currentMaxTimestamp) + ","+ a.toString)timestamp}})val window = watermark.keyBy(_._1).window(TumblingEventTimeWindows.of(Time.seconds(3))).apply(new WindowFunctionTest)window.print()env.execute()}class WindowFunctionTest extends WindowFunction[(String,Long),(String, Int,String,String,String,String),String,TimeWindow]{override def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[(String, Int,String,String,String,String)]): Unit = {val list = input.toList.sortBy(_._2)val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")out.collect(key,input.size,format.format(list.head._2),format.format(list.last._2),format.format(window.getStart),format.format(window.getEnd))}}}

4.3、程序详解
(1)接收socket数据
(2)将每行数据按照字符分隔,每行map成一个tuple类型(code,time)
(3)抽取timestamp生成watermark。并打印(code,time,格式化的time,currentMaxTimestamp,currentMaxTimestamp的格式化时间,watermark时间)。
(4)event time每隔3秒触发一次窗口,输出(code,窗口内元素个数,窗口内最早元素的时间,窗口内最晚元素的时间,窗口自身开始时间,窗口自身结束时间)

注意:new AssignerWithPeriodicWatermarks[(String,Long)中有抽取timestamp和生成watermark2个方法,在执行时,它是先抽取timestamp,后生成watermark,因此我们在这里print的watermark时间,其实是上一条的watermark时间,我们到数据输出时再解释。

这里写图片描述
生成的Job Graph

5、通过数据跟踪watermark的时间

我们重点看看watermark与timestamp的时间,并通过数据来看看window的触发时机。

首先,我们开启socket,输入第一条数据:

000001,1461756862000

输出的out文件如下:

timestamp:000001,1461756862000|2016-04-27 19:34:22.000,1461756862000|2016-04-27 19:34:22.000,Watermark @ -10000

这里,看下watermark的值,-10000,即0-10000得到的。这就说明程序先执行timestamp,后执行watermark。所以,每条记录打印出的watermark,都应该是上一条的watermark。为了观察方便,我汇总了输出如下:
这里写图片描述

此时,wartermark的时间按照逻辑,已经落后于currentMaxTimestamp10秒了。我们继续输入:

这里写图片描述
此时,输出内容如下:
这里写图片描述
我们再次汇总,见下表:
这里写图片描述

我们继续输入,这时我们再次输入:
这里写图片描述
输出如下:
这里写图片描述
汇总如下:
这里写图片描述

到这里,window仍然没有被触发,此时watermark的时间已经等于了第一条数据的Event Time了。那么window到底什么时候被触发呢?我们再次输入:
这里写图片描述
输出:
这里写图片描述
汇总:
这里写图片描述

OK,window仍然没有触发,此时,我们的数据已经发到2016-04-27 19:34:33.000了,最早的数据已经过去了11秒了,还没有开始计算。那是不是要等到13(10+3)秒过去了,才开始触发window呢?答案是否定的。

我们再次增加1秒,输入:
这里写图片描述
输出:
这里写图片描述
汇总:
这里写图片描述

到这里,我们做一个说明:
window的触发机制,是先按照自然时间将window划分,如果window大小是3秒,那么1分钟内会把window划分为如下的形式:

[00:00:00,00:00:03)
[00:00:03,00:00:06)
...
[00:00:57,00:01:00)

如果window大小是10秒,则window会被分为如下的形式:

[00:00:00,00:00:10)
[00:00:10,00:00:20)
...
[00:00:50,00:01:00)

window的设定无关数据本身,而是系统定义好了的。

输入的数据中,根据自身的Event Time,将数据划分到不同的window中,如果window中有数据,则当watermark时间>=Event Time时,就符合了window触发的条件了,最终决定window触发,还是由数据本身的Event Time所属的window中的window_end_time决定。

上面的测试中,最后一条数据到达后,其水位线已经升至19:34:24秒,正好是最早的一条记录所在window的window_end_time,所以window就被触发了。

为了验证window的触发机制,我们继续输入数据:
这里写图片描述
输出:
这里写图片描述
汇总:
这里写图片描述

此时,watermark时间虽然已经达到了第二条数据的时间,但是由于其没有达到第二条数据所在window的结束时间,所以window并没有被触发。那么,第二条数据所在的window时间是:

[19:34:24,19:34:27)

也就是说,我们必须输入一个19:34:27秒的数据,第二条数据所在的window才会被触发。我们继续输入:
这里写图片描述
输出:
这里写图片描述
汇总:
这里写图片描述

此时,我们已经看到,window的触发要符合以下几个条件:

1、watermark时间 >= window_end_time
2、在[window_start_time,window_end_time)中有数据存在

同时满足了以上2个条件,window才会触发。

而且,这里要强调一点,watermark是一个全局的值,不是某一个key下的值,所以即使不是同一个key的数据,其warmark也会增加,例如:
输入:

000002,1461756879000

输出:

timestamp:000002,1461756879000|2016-04-27 19:34:39.000,1461756879000|2016-04-27 19:34:39.000,Watermark @ 1461756867000

我们看到,currentMaxTimestamp也增加了。

6、watermark+window处理乱序

我们上面的测试,数据都是按照时间顺序递增的,现在,我们输入一些乱序的(late)数据,看看watermark结合window机制,是如何处理乱序的。
输入:
这里写图片描述
输出:
这里写图片描述
汇总:
这里写图片描述
可以看到,虽然我们输入了一个19:34:31的数据,但是currentMaxTimestamp和watermark都没变。此时,按照我们上面提到的公式:

1、watermark时间 >= window_end_time
2、在[window_start_time,window_end_time)中有数据存在

watermark时间(19:34:29) < window_end_time(19:34:33),因此不能触发window。

那如果我们再次输入一条19:34:43的数据,此时watermark时间会升高到19:34:33,这时的window一定就会触发了,我们试一试:
输入:
这里写图片描述
输出:
这里写图片描述

这里,我么看到,窗口中有2个数据,19:34:31和19:34:32的,但是没有19:34:33的数据,原因是窗口是一个前闭后开的区间,19:34:33的数据是属于[19:34:33,19:34:36)的窗口的。

上边的结果,已经表明,对于out-of-order的数据,Flink可以通过watermark机制结合window的操作,来处理一定范围内的乱序数据。那么对于“迟到”太多的数据,Flink是怎么处理的呢?

7、late element的处理

我们输入一个乱序很多的(其实只要Event Time < watermark时间)数据来测试下:
输入:
这里写图片描述
输出:
这里写图片描述

我们看到,我们输入的数据是19:34:32的,而当前watermark时间已经来到了19:34:33,Event Time < watermark时间,所以来一条就触发一个window。

8、总结

8.1、Flink如何处理乱序?

watermark+window机制

window中可以对input进行按照Event Time排序,使得完全按照Event Time发生的顺序去处理数据,以达到处理乱序数据的目的。

8.2、Flink何时触发window?

1、Event Time < watermark时间(对于late element太多的数据而言)

或者

1、watermark时间 >= window_end_time(对于out-of-order以及正常的数据而言)
2、在[window_start_time,window_end_time)中有数据存在

8.3、Flink应该如何设置最大乱序时间?

这个要结合自己的业务以及数据情况去设置。如果maxOutOfOrderness设置的太小,而自身数据发送时由于网络等原因导致乱序或者late太多,那么最终的结果就是会有很多单条的数据在window中被触发,数据的正确性影响太大。

最后,我们通过一张图来看看watermark、Event Time和window的关系:
这里写图片描述

9、参考

http://data-artisans.com/how-apache-flink-enables-new-streaming-applications-part-1/
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/event_time.html
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/event_timestamps_watermarks.html
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101
https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/43864.pdf
http://www.cnblogs.com/fxjwind/p/5627187.html


http://chatgpt.dhexx.cn/article/IbLsbyMB.shtml

相关文章

Flink之watermark(水印)讲解

flink中watermark的详细介绍 使用前提&#xff1a; 处理数据开窗&#xff0c;处理数据的时间语义是事件时间&#xff0c;也就是每条数据产生的时间。 使用场景&#xff08;解决问题&#xff09;&#xff1a; 处理乱序数据&#xff1a;flink中是实时处理数据&#xff0c;但是…

WaterMark使用和详解

上篇&#xff1a;基于flink的会话窗口的api实现 WaterMark翻译为水位线&#xff0c;什么时候用到水位线呢&#xff1f; 比如说水控在顺水的时候达到紧梯就会触发&#xff0c;若不放水就可以发现危险的现状 在spark程序划分成窗口的时候&#xff0c;主要是衡量什么时候触发&am…

【大数据】带你理解并使用flink中的WaterMark机制

文章目录 一、引导二、WaterMark1、Watermark的原理2、Watermark 的使用2.1、顺序数据流中的watermark示例 2.2、乱序数据流中的WaterMark2.2.1、With Periodic&#xff08;周期性的&#xff09; Watermark示例一&#xff1a;使用周期性的WaterMark2.2.2、With Punctuated&…

JavaEE是什么?JavaSE又是什么?两者的区别有哪些?

Java作为最流行的编程语言受到了许多人的喜爱&#xff0c;其在编程中的地位自不必多说。对于许多才刚刚入门Java的朋友来讲&#xff0c;常常会产生这样的困惑&#xff0c;JavaEE是什么&#xff1f;JavaSE又是什么&#xff1f;两者的区别有哪些&#xff1f;学哪个比较好&#xf…

php mysql Javaee_javaee与php的区别是什么

javaee与php的区别&#xff1a;1、JavaEE是门面向对象的程序设计语言&#xff0c;而PHP是解释执行的服务器脚本语言&#xff1b;2、用JavaEE开发的Web应用从MySQL数据库转到Oracle数据库只需要做很少的修改&#xff0c;而PHP则需要做大量的修改工作。 javaee与php的区别&#x…

解读JAVAEE是什么样的Java

javaEE是指java enterprise edition&#xff0c;java企业版&#xff0c;多用于企业级开发&#xff0c;包括web开发等等很多组建。javaEE开发会设计java的高级特性以及一些spring等架构&#xff0c;需要掌握的内容相对多。 JAVA&#xff0c;所有的Java平台都由一个JVM和一组应用…

java ee 下载什么意思_JavaEE到底是什么?

慕运维8079593 JavaEE只是一个规范吗&#xff1f;我的意思是&#xff1a;EJBJavaEE是吗&#xff1f;JavaEE确实是一个摘要规格说明。任何人都愿意开发和提供规范的工作实现。这个混凝土实现是所谓的应用服务器&#xff0c;如野弗利, 托梅, 玻璃鱼, 自由, WebLogic&#xff0c;等…

java ee api是什么意思_JavaEE的整体概述

标签&#xff1a; JavaEE整体概述 知识点&#xff1a; 1、整体概述JavaEE的知识体系 2、JavaEE是什么? 能干什么? 为什么需要JavaEE? 3、JavaEE有什么? JavaEE的技术体系? JavaEE的本质是什么? 4、JavaEE的零散基础知识 5、JavaEE的组件体系结构 -----------------------…

矩阵求逆方法

1.待定系数法 矩阵A 1, 2 -1,-3 假设所求的逆矩阵为 a,b c,d 则 从而可以得出方程组 a 2c 1 b 2d 0 -a - 3c 0 -b - 3d 1 解得 a3; b2; c -1; d -1 2.伴随矩阵求逆矩阵 伴随矩阵是矩阵元素所对应的代数余子式&#xff0c;所构成的矩阵&#xff0c;转置后得到的新矩阵…

复数矩阵求逆的 C 语言程序

关于复数矩阵求逆&#xff0c;如果使用 MATLAB&#xff0c;就非常简单。我们先用一个 MATLAB 的例子来说明&#xff0c;等会要将 C 语言的程序和 MATLAB 的程序进行对比。 close all; clear all; clc;%定义矩阵a为复数矩阵 a [[42*i,31*i,43*i,55*i];[17*i,82*i,22*i,93*i];[…

科学计算器如何求矩阵的逆

大学本科买了四年的计算器不会求逆&#xff0c;到了研究生了好好研究下这个功能&#xff0c;终于终于会用了&#xff0c;以往 对着那个矩阵功能都发懵&#x1f602;&#xff0c;记录一下这个史诗无敌隐藏功能 1、进入菜单&#xff0c;点击4进入矩阵菜单 2、这里选择1定义矩阵A…

matlab求一个矩阵的逆矩阵的命令,如何用MATLAB求逆矩阵

如何用MATLAB求逆矩阵以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧! 如何用MATLAB求逆矩阵 如果英文好呢,自己看目录 不好还是先看中文的教材,对matlab的框架和功能有了一定的了解后,自己也就看的懂帮助里面…

matlab矩阵求逆的模块,matlab矩阵求逆矩阵

matlab矩阵求逆矩阵 因为 所以该矩阵可逆&#xff0c;根据 &#xff0c;其中 得到 计算矩阵A每个元素的代数余子式&#xff1a; 所以 可得&#xff1a; matlab计算如下&#xff1a; >> A1[1 2 2;2 1 -2;2 -2 1] A1 1 2 2 2 1 -2 2 -2 1 >> >> >> A2in…

求矩阵的逆的三种方法

我们知道求矩阵的逆具有非常重要的意义&#xff0c;本文分享给大家如何针对3阶以内的方阵&#xff0c;求出逆矩阵的3种手算方法&#xff1a;待定系数法、伴随矩阵法、初等变换法&#xff08;只介绍初等行变换&#xff09; 待定系数法求逆矩阵 1 首先&#xff0c;我们来看如何使…

千万不能错过的Android NDK下载安装及配置

Java 语言是一个跨平台的语言&#xff0c;有着“Write Once&#xff0c;Run Anywhere”的美誉。但是却导致了它和本地交互的能力不够强&#xff0c;无法完成一些和操作系统相关的特性。 而 JNI 就是 Java Native Interface&#xff08;Java 本地接口&#xff09;&#xff0c;用…

NDK在Linux下载配置以及C、C++编译配置(交叉编译)

NDK在Linux下载配置以及C、C编译配置&#xff08;交叉编译&#xff09; 前言&#xff1a; 我们搭建好Ubutu虚拟机之后&#xff0c;通过xShell远程登录Ubutu上SSH服务器&#xff0c;在xShell上可以进行相关的编译操作了。但是我们在xShell上gcc、g编译的可执行文件只能在Linux…

Android——NDK下载提示缺少toolchains问题解决

更新下载了最新的NDK&#xff0c;发现Android SDK报错。 No toolchains found in the NDK toolchains folder for ABI with prefix: mips64el-linux-android。 解决办法&#xff1a; 1.找到并进入下载安装的ndk目录 lydeMacBook-Pro:~ imac$ cd /Users/ly/Library/Android/sd…

android ndk 下载安装(ubuntu)

1. 下载并解压安装包 官网下载&#xff1a;https://developer.android.com/studio ndk各个版本下载地址&#xff1a;https://blog.csdn.net/u011077027/article/details/102706283 官网下载需要梯子&#xff0c;百度云盘下载&#xff1a;链接: https://pan.baidu.com/s/1Ge8fQu…

Window NDK下载以及环境变量配置

作者介绍&#xff1a;铸梦xy。IT公司技术合伙人&#xff0c;IT高级讲师&#xff0c;资深Unity架构师&#xff0c;铸梦之路系列课程创始人。 第一种 NDK下载安装步骤一 NDK环境变量配置步骤一 测试NDK配置 第二种 支持编译c ninja 前言 NDK是开发者必不可少的一部分&#xff0c…

3、NDK下载、安装

文章目录 一、下载NDK二、配置环境变量三、测试 一、下载NDK 官网下载地址:https://developer.android.google.cn/ndk/downloads/,选择自己相应的版本&#xff0c;下载解压。(我安装在D:\utils\android-ndk-r21d) 特别注意&#xff1a;安装路径不要有中文和空格 二、配置环境…