上篇:基于flink的会话窗口的api实现
WaterMark翻译为水位线,什么时候用到水位线呢?
比如说水控在顺水的时候达到紧梯就会触发,若不放水就可以发现危险的现状
在spark程序划分成窗口的时候,主要是衡量什么时候触发,这也是需要用到的水位线,其实它是来判断水位窗口触发的机制,在这个窗口水位线会不停的增加。
其实水位线有两种方式获取,一种是根据数据时间来提取,另一种是定期生成水位线。
当我们输入的数据有大也有小的,它就会用这个分区最大的Eventime作为它的水位线。
那么这个水位线是怎么计算出来的?
实水位线还有一个作用,让窗口延迟发,举一个例子,我们在生产环境中,拉数据是从中间件拉取出来的,如kafka。
在kakfa下有多个分区,由生产者写入进入,在生产者有2个或多个写,当一对一写完,它还会切换写,在kafka里如果只有一个分区它是有序的,但是多个分区就无法保证它是有序的。
一开始写一条数据,在另一个消费者会有延迟, 比如:第一个生产者由于网络的问题就会发生延迟效果,如图所示:
接下来,采用flink从kakfa里面的数据拉过来 ,但是如何拉取呢?其实flink的并行再大,它的solt只有2个state ,通过直连拉数据有可能是延迟,但是如何容忍数据的延续时间,这一点就是需要解决数据乱序问题。其实,在窗口迟到的数据是不会被触发的
带着这个问题,可以去设计吗?可以的
1、Watermarks的设计主要从 它们定义何时停止等待早期事件
Flink中的事件时间处理取决于特殊的带时间戳的元素,称为watermarks,由数据源或watermarks生成器插入到流中。 具有时间戳t的watermarks可以被理解为断言(assertion )所有具有时间戳<t的事件已经(具有合理的概率)已经到达
2、我们可以设想不同的策略来决定如何生成watermarks
我们知道每个事件都会在延迟一段时间后到达并且这些延迟会有所不同,因此有些事件会比其他事件延迟更多。 一种简单的方法是假设这些延迟受到一些最大延迟的限制。 Flink将此策略称为有界无序watermarks。 很容易想象出更复杂的watermarks方法,但对于许多应用来说,固定延迟效果还不错。
如果要构建像流分类器这样的应用程序,Flink的ProcessFunction是正确的构建块。 它提供对事件时间(event-time )计时器的访问(即,基于watermarks到达而触发的回调),并具有用于管理缓冲事件所需状态的挂钩,直到轮到它们被发送到下游
代码实现:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;/*** 先keyBy,再进行EventTime划分滚动窗口--[无界流]* 设置延迟时间为2秒**/
public class EventTimeTumblingWindowAllDemo4 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());//1000,spark,3//1200,spark,5//2000,hadoop,2//socketTextStream返回的并行度为1DataStreamSource<String> lines = env.socketTextStream("Master", 8888);//TODO 当前分区中的数据的数据携带的最大EventTime - 乱序延迟时间 >= 窗口的结束时间 就会触发该窗口SingleOutputStreamOperator<String> dataWithWaterMark = lines.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) { //设置为0,这里表示数据的延迟时间@Overridepublic long extractTimestamp(String element) {//提取数据中的时间return Long.parseLong(element.split(",")[0]);}});SingleOutputStreamOperator<Tuple2<String,Integer>> workAndCount = dataWithWaterMark.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String s) throws Exception {String[] fields = s.split(",");return Tuple2.of(fields[1],Integer.parseInt(fields[2]));}});env.execute();}
}
查看job:http://localhost:8081/#/job/01840ac7b18e65bbec107848545fe68e/overview
使用场景(解决问题)
处理乱序数据:flink中是实时处理数据,但是在处理数据的时候会出现因为网络传输的问题,所以数据先产生的反而到后面才到达,在被处理时候就会出现数据混乱,而且因为开窗,窗口关闭但是本窗口的数据来迟,导致数据丢失;
多并行度下的watermark
一个子任务中watermark会发往所有下一算子中的子任务,也就是一发多,
同样一个子任务会接收上一个算子中所有子任务的watermark,这时起作用的就是最小的哪一个watermark。
watermark可以理解为一个特殊的数据,这个数据不参与计算,仅仅是对窗口的触发关闭起作用;