「Flink实时数据分析系列」6.基于时间和窗口的算子

article/2025/9/12 18:26:57

来源 | 「Stream Processing with Apache Flink」

作者 | Fabian Hueske and Vasiliki Kalavri

翻译 | 吴邪 大数据4年从业经验,目前就职于广州一家互联网公司,负责大数据基础平台自研、离线计算&实时计算研究

校对 | gongyouliu

编辑 | auroral-L

全文共21006字,预计阅读时间90分钟。

目录

一、配置时间特性

       1.分配时间戳和生成水位线

       2.水位线、延迟及完整性问题

二、处理函数

       1.处理服务和计时器

       2.向侧输出流发送数据

       3.CoProcessFunction

三、窗口算子

       1.定义窗口算子

       2.内置窗口分配器

       3.在窗口上应用函数

       4.自定义窗口算子

四、基于时间的多流关联

       1.基于间隔的关联(Interval Join)

       2.基于窗口的关联

五、处理迟到数据

       1.丢弃迟到事件

       2.重定向迟到事件

       3.基于迟到事件更新结果

❤总结

在本章中,我们将介绍用于时间处理的DataStream API方法和基于时间的算子(如windows)。正如你在“时间语义”一节中学到的,Flink基于时间的算子可以应用于不同的时间语义。

首先,我们将学习如何定义时间特性、时间戳和水位线。然后,我们将涵盖处理函数以及低级算子的转换操作,从而对时间戳和水位线进行处理和注册定时器。接下来,我们将使用Flink的窗口API,它提供了最常见窗口类型的内置实现。你还将了解关于自定义、用户定义的窗口操作和核心窗口结构等内容,如assigners, triggers和evictors。最后,我们将讨论如何定时加入流以及处理延迟事件的策略。

一、配置时间特性 

在定义分布式流处理应用程序中的时间算子操作之前,我们先了解“时间”的含义,当你指定了一个窗口用于收集每一分钟的bucket中产生的事件时,如何确定每个bucket中具体包含了哪些事件呢?在DataStream API,你可以在创建窗口的时候使用时间特性去告知Flink如何定义时间,时间特性是StreamExecutionEnvironment的一个属性,包括了几种时间类型:

处理时间(Processing Time)

处理时间是指执行相应算子操作的机器的系统时间。当流式程序按处理时间运行时,所有基于时间的操作(如时间窗口)将使用运行相应算子的计算机的系统时钟。每小时处理时间窗口将包括系统时钟指示整小时的时间之间到达特定算子的所有记录。例如,如果应用程序在9:15 am开始运行,则第一个每小时处理时间窗口将包括在9:15 am和10:00 am之间处理的事件,下一个窗口将包括在10:00 am和11:00 am之间处理的事件,以此类推。处理时间是最简单的时间概念,不需要流和机器之间的协调,无需依赖水位线,它提供了最佳的性能和最低的延迟。但是,在分布式和异步环境中,处理时间不能提供结果确定性,因为它容易受到记录到达系统(例如从消息队列写入)的速度以及数据在上下游算子之间的处理速度的影响。

事件时间(Event Time)

Event Time指的是数据流中每个元素或者每个事件自带的时间属性,一般是指事件发生的时间,系统的逻辑时间由水位线去定义。正如我们在“时间戳”章节中了解到的,时间戳要么在进入数据处理管道之前就存在于数据中,要么由源函数生成,在事件时间中,时间的进度取决于数据,而不取决于任何时钟。当水位线声明某个时间间隔内的所有时间戳都已接收到了,事件时间窗口将触发计算。理想情况下,事件时间窗口会产生确定性结果,即使事件发生顺序混乱,窗口结果将不依赖于读取或处理流的速度。

注入时间(Ingest Time)

将源算子操作的处理时间指定为每个接入记录的事件时间戳,并自动生成水位线。它是EventTime和ProcessingTime的混合体。事件的接入时间是它进入流处理器的时间。与事件时间相比,接入时间并没有提供太多的实际价值,因为它不能提供确定的结果,并且具有与事件时间相近的性能。

示例6-1展示了如何在“Hello, Flink!”中编写的传感器流应用程序代码来设置时间特性。

object AverageSensorReadings {
// main() defines and executes the DataStream program
def main(args: Array[String]) {
// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// use event time for the application
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// ingest sensor stream
val sensorData: DataStream[SensorReading] = env.addSource(...)
}
}

将时间特性设置为EventTime可以进行时间戳和水位线处理,从而可以进行事件时间操作。当然,就算你选择EventTime时间特性,仍然可以使用处理时间窗口和定时器。

如果使用处理时间,使用TimeCharacteristic.ProcessingTime替换TimeCharacteristic.EventTime。

1.分配时间戳和生成水位线 

正如在“事件时间处理”章节中所讨论的,你的应用程序需要提供两个重要的信息给Flink,以使用事件时间语义进行操作。第一个信息是每个事件必须与一个时间戳相关联,该时间戳通常指示事件实际发生的时间。第二个信息是事件时间流还需要附带水位线,使得算子可以从中推断当前事件时间。

时间戳和水位线单位为毫秒。水位线会通知算子不希望出现时间戳小于或等于水位线的事件。时间戳和水位线可以由SourceFunction分配和生成,也可以使用用户定义的时间戳生成器和水位线生成器。在“源函数、时间戳和水位线”一节中讨论了在SourceFunction中分配时间戳和生成水位线。在这里,我们将解释如何使用用户定义的函数来实现这一点。

如果使用时间戳分配程序,则现有的时间戳和水位线都会被覆盖。

DataStream API提供了TimestampAssigner接口,以便在元素被接入到流应用程序后从元素中提取时间戳。通常,时间戳分配程序是在源函数之后立即调用的,因为大多数分配程序在生成水位线时都对元素的时间戳顺序作了假设性猜想。由于元素通常是并行摄入的,所以任何导致Flink跨并行流分区重新分配元素的操作,都会打乱元素的时间戳顺序,例如并行性更改、KeyBy()或其他引起重新分配的操作。

最好的做法是分配时间戳,并在尽可能靠近源的地方甚至在SourceFunction内生成水位线。根据应用场景,在分配时间戳之前,如果这些操作没有引起元素的重新分配,可以对输入流应用执行过滤或转换操作。

为了确保事件时间操作按预期运行,应该在任何依赖于事件时间的转换之前调用分配器,例如, 在第一个事件时间窗口之前。

时间戳分配程序的行为类似于其他转换算子,它们在一个数据流上调用,并产生一个新的带有时间戳的数据流和水位线。时间戳分配程序不会更改DataStream的数据类型。

示例6-2中的代码展示了如何使用时间戳分配器。在本例中,读取流之后,我们首先使用了一个过滤转换,接着调用assignTimestampsAndWatermarks()方法,在这个方法中我们定义了时间戳分配器MyAssigner()。

val env = StreamExecutionEnvironment.getExecutionEnvironment
// set the event time characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// ingest sensor stream
val readings: DataStream[SensorReading] = env
.addSource(new SensorSource)
// assign timestamps and generate watermarks
.assignTimestampsAndWatermarks(new MyAssigner())

在上面的例子中,MyAssigner的类型可以是AssignerWithPeriodicWatermarks 或 AssignerWithPunctuatedWatermarks类型。这两个接口底层继承了DataStream API提供的TimestampAssigner接口。第一个接口定义周期性发出水位线的分配器,而第二个接口根据输入事件的属性注入水位线。接下来我们将详细描述这两个接口。

 

周期性水位线分配器(Assigner with periodic watermarks

周期性地分配水位线意味着我们指示系统以固定的机器时间间隔检查事件时间的进度。默认时间间隔设置是200毫秒,但是我们可以使用

ExecutionConfig.setAutoWatermarkInterval()方法来动态配置它:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// generate watermarks every 5 seconds
env.getConfig.setAutoWatermarkInterval(5000)

在上面的示例中,程序每5秒发出一次水位线。实际上,每隔5秒,Flink就会调用AssignerWithPeriodicWatermarks的getCurrentWatermark()方法。如果该方法返回非空值的时间戳大于前一个水位线的时间戳,则生成新的水位线。否则,如果方法返回一个空值,或者返回的水位线的时间戳小于最后发出的水位线的时间戳,则不会生成新水位线。

示例6-3显示了一个具有周期性时间戳的assigner程序,它通过获得到当前元素的最大时间戳来生成水位线。当请求新的水位线时,assigner返回一个最大时间戳减去1分钟容忍间隔的水位线。

class PeriodicAssigner
extends AssignerWithPeriodicWatermarks[SensorReading] {
val bound: Long = 60 * 1000 // 1 min in ms
var maxTs: Long = Long.MinValue // the maximum observed timestamp
override def getCurrentWatermark: Watermark = {
// generated watermark with 1 min tolerance
new Watermark(maxTs - bound)
}
override def extractTimestamp(
r: SensorReading,
previousTS: Long): Long = {
// update maximum timestamp
maxTs = maxTs.max(r.timestamp)
// return record timestamp
r.timestamp
}
}

DataStream API为具有周期性水位线的时间戳分配程序(assigners)的两种常见情况提供了实现。如果你的输入元素具有单调递增的时间戳,则可以使用快捷方法assignAscendingTimeStamps。此方法使用当前时间戳来生成水位线,因为不能出现更早的时间戳。下面演示如何为升序时间戳生成水位线:

val stream: DataStream[SensorReading] = ...
val withTimestampsAndWatermarks = stream.assignAscendingTimestamps(e => e.timestamp)

周期水位线生成的另一种常见情况是,当你知道输入流中的最大延迟时——元素的时间戳与所有已导入元素的最大时间戳之间的最大差异。对于这种情况,Flink提供了BoundedOutOfOrdernessTimeStampExtractor,它将最大延迟时间作为参数:

val stream: DataStream[SensorReading] = ...
val output = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(10))(e =>.timestamp)

在前面的代码中,允许元素延迟10秒。这意味着,如果一个元素的事件时间与之前所有元素的最大时间戳之间的差距大于10秒,那么该元素可能在完成相应的计算并发出结果之后到达进行处理。Flink提供了处理这些延迟事件的不同策略,我们将在“处理迟到数据”中讨论这些策略。

 

PUNCTUATED水位线分配器(Assigner with punctuated watermarks)

有时输入流包含特殊的元组或标记,用于指示流的进度。对于这种情况,或者当可以根据输入元素的其他属性定义水位线时,Flink提供了AssignerWithPunctuatedWatermarks接口。它定义了checkAndGetNextWatermark()方法,该方法在extractTimestamp()之后为每个事件调用。该方法可以决定是否生成新的水位线。如果方法返回的非空水位线大于最新发出的水位线,则发出新水位线。

示例6-4显示了一个punctuated水位线分配程序(assigner),它为从ID为“sensor_1”的传感器接收到的每条测量记录发出水位线。

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading] {val bound: Long = 60 * 1000 // 1 min in msoverride def checkAndGetNextWatermark(r: SensorReading,extractedTS: Long): Watermark = {if (r.id == "sensor_1") {// emit watermark if reading is from sensor_1new Watermark(extractedTS - bound)} else {// do not emit a watermarknull}}override def extractTimestamp(r: SensorReading,previousTS: Long): Long = {// assign record timestampr.timestamp}
}

2.水位线、延迟及完整性问题 

到目前为止,我们已经讨论了如何使用TimestampAssigner生成水位线,还没有讨论水位线对你的流应用程序的影响。

水位线用于权衡时间延迟和结果完整性。它反映了在执行计算之前等待数据到达的时间,例如完成窗口计算并输出结果,基于事件时间的算子使用水位线来确定其读取记录的完整性和操作的进度。根据接收到的水位线,算子计算某一个时间点之前的窗口数据,直到它接收到这个时间点前的输入记录为止。

然而,现实是我们永远不可能有完美的水位线,因为这意味着我们总是可以确定数据按时到来,没有延迟的记录。在实践中,你需要进行有根据的猜测,假设性设定数据整体之间的延迟,从而在应用程序中生成水位线,这个没有固定的方法,更多的是根据经验来确定水位线。你需要使用关于源、网络和分区的等因素来估计处理进度和输入记录延迟的上限,也就是对迟到数据的容忍度。估计就意味着有出错的空间,在这种情况下,生成的水位线可能是不准确的,往往会造成不必要数据延迟或应用程序延迟变大。要记住这一点,你可以使用水位线来平衡结果延迟和结果完整性。

如果生成松散的水位线(水位线远远落后于处理过的记录的时间戳),则会增加生成结果的延迟,但是可以更大程度上保证了结果完整性。此外,状态的大小通常会增加,因为应用程序需要缓冲更多的数据,直到触发计算为止。在执行计算时,我们基本可以确定所有相关的数据都是可用的。

另一方面,如果你生成了非常紧密的水位线,也就是设置了一个很小的迟到时间,这些水位线可能比一些后续记录的时间戳更大,基于时间的计算可能在所有相关数据到达之前执行,这样做可能会产生不完整或不准确的结果,但是好处是可以降低结果的延迟。

与构建的批处理应用程序不同,在基于所有数据都可用的前提条件下,延迟/结果完整性是权衡流处理应用程序的基本特征,流处理应用程序处理的是接收到的无界数据。水位线是一种功能强大的解决方式,可以根据时间控制应用程序的行为。除了水位线之外,Flink还有许多特性来调整基于时间的操作的确切行为,如process函数和窗口触发器,并提供了处理迟到数据的不同方法,这些方法将在“处理迟到数据”中讨论。

二、处理函数 

尽管时间信息和水位线对于许多流处理应用程序非常重要,但是你可能已经注意到,我们无法通过目前为止看到的基本DataStream API转换来使用它们。例如,MapFunction不能访问时间戳或当前事件时间。

DataStream API提供了一系列底层转换,即process 函数,这些函数可以访问记录的时间戳和水位线,并注册将来某个特定时间触发的定时器。此外,process函数还支持将记录发送到多个输出流。process函数通常用于构建事件驱动的应用程序,并实现可能不适用于预定义窗口和转换的自定义逻辑。例如,Flink的SQL支持的大多数算子都是基于process函数实现的。

目前,Flink提供八种不同的process函数:

ProcessFunction、KeyedProcessFunction、CoProcessFunction、ProcessJoinFunction、BroadcastProcessFunction、KeyedBroadcastProcessFunction、ProcessWindowFunction和ProcessAllWindowFunction。

正如函数名表达的含义,这些函数适用于不同的context中。但是,它们具有非常相似的特性。我们将通过详细讨论KeyedProcessFunction来继续讨论这些常见特性。

KeyedProcessFunction是一个非常通用的函数,可以应用于KeyedStream。对流的每个记录调用该函数,并返回零条、一条或多条记录。所有process函数都实现RichFunction接口,提供open()、close()和getRuntimeContext()方法。另外,KeyedProcessFunction[KEY, IN, OUT]还提供了以下两个方法:

  1. processElement(v: IN, ctx: Context, out:Collector[out])为流的每个记录调用。通常,通过将结果记录传递给收集器(out:Collector)的方式来输出结果记录。context对象使process函数变得特殊。它提供了对时间戳和当前记录的key以及对TimerService的访问。此外,context可以将记录发送到侧输出流(side output)。

  2. onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[out])是一个回调函数,当先前注册的计时器触发时将调用该回调函数。timestamp参数给出触发定时器的时间戳,收集器(out:Collector)可以将记录输出。OnTimerContext提供与processElement()方法的context对象相同的服务,并返回触发触发器的时间域(处理时间或事件时间)。

1.时间服务和计时器

Context和OnTimerContext对象的TimerService提供了以下方法:

  • currentProcessingTime(): Long     返回当前处理时间。

  • currentWatermark(): Long     返回当前水位线的时间戳。

  • registerProcessingTimeTimer(timestamp:Long):Unit    为当前key注册一个处理时间定时器。当所在执行服务器的处理时间达到所提供的时间戳时,定时器将触发。

  • registerEventTimeTimer(timestamp: Long):Unit    为当前key注册一个事件时间定时器。当将水位线更新为与定时器的时间戳相等或更大的时间戳时,定时器将触发。

  • deleteProcessingTimeTimer(timestamp:Long): Unit    删除先前为当前key注册的处理时间定时器。如果不存在这样的定时器,则该方法无效。

  • deleteEventTimeTimer(timestamp:Long):Unit   删除先前为当前key注册的事件时间定时器。如果不存在这样的定时器,则该方法无效。

当定时器触发时,将调用onTimer()回调函数。processElement()和onTimer()方法是同步的,以防止对状态的并发访问和操作。

定时器(Timer)只能在key类型流上注册。定时器(Timer)的一个常见用例是在一段时间不活动之后清除key状态,或者实现基于时间的自定义窗口逻辑。要在非key类型流上使用定时器,可以使用带有常量虚拟键的KeySelector来创建key类型流。注意,这会把所有数据移动到单个任务中,这样算子将以并行度1有效地执行。

对于每个key和时间戳,可以只注册一个定时器(Timer),这意味着每个key可以有多个定时器(Timers),但每个时间戳只能有一个定时器。默认情况下,KeyedProcessFunction会保留heap上的优先级队列中的所有定时器(Timers)的时间戳。但是,你可以配置RocksDB状态后端来存储定时器(Timer)。

定时器与函数的任何其他状态一起被存入检查点。如果应用程序需要从故障中恢复,则在应用程序重新启动时过期的所有处理时间定时器将在应用程序恢复时立即触发。对于保存在保存点(savepoint)中的处理时间定时器也是如此。定时器总是异步写入检查点,只有一个例外。如果你使用带有增量检查点的RocksDB状态后端,并将定时器存储在heap上(默认设置),则它们将同步进行存入检查点。在这种情况下,建议不要过度使用定时器,以避免存入检查点的过程耗时太长。

注意

为过去的时间戳注册的定时器不会自动删除,但也会被处理。处理时间定时器在注册方法返回后立即触发。事件时间定时器在处理下一个水位线时触发。

下面的代码展示了如何将KeyedProcessFunction应用到KeyedStream上。该函数用于监控传感器的温度,如果传感器的温度在处理时间内1秒出现了上升,则发出告警:

val warnings = readings
// key by sensor id
.keyBy(_.id)
// apply ProcessFunction to monitor temperatures
.process(new TempIncreaseAlertFunction)

TempIncreaseAlertFunction的实现如例6-5所示。

/** Emits a warning if the temperature of a sensor
* monotonically increases for 1 second (in processing time).
*/
class TempIncreaseAlertFunction extends KeyedProcessFunction[String,SensorReading,String]
{// stores temperature of last sensor readinglazy val lastTemp: ValueState[Double]=getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp",Types.of[Double]))// stores timestamp of currently active timerlazy val currentTimer: ValueState[Long] =getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer", Types.of[Long]))override def processElement(r: SensorReading,ctx: KeyedProcessFunction[String,SensorReading,String]#Context,out: Collector[String]): Unit = {// get previous temperatureval prevTemp = lastTemp.value()// update last temperaturelastTemp.update(r.temperature)val curTimerTimestamp = currentTimer.value();if (prevTemp == 0.0 || r.temperature < prevTemp) {// temperature decreased; delete current timerctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp)currentTimer.clear()} else if (r.temperature > prevTemp && curTimerTimestamp == 0) {// temperature increased and we have not set a timer yet// set processing time timer for now + 1 secondval timerTs = ctx.timerService().currentProcessingTime()+ 1000ctx.timerService().registerProcessingTimeTimer(timerTs)// remember current timercurrentTimer.update(timerTs)}}override def onTimer(ts: Long,ctx: KeyedProcessFunction[String, SensorReading,String]#OnTimerContext,out: Collector[String]): Unit = {out.collect("Temperature of sensor '" + ctx.getCurrentKey +"' monotonically increased for 1 second.")currentTimer.clear()}

2.向侧输出流发送数据 

DataStream API的大多数算子只有一个输出,产生一个具有特定数据类型的结果流。只有split算子允许将一个流拆分为多个相同类型的流。侧输出流是处理函数的一个特性,用于输出多个流,通常用于处理迟到数据。侧输出由OutputTag[X]对象标识,其中X是结果侧输出流的类型。处理函数可以通过context对象将一条记录发送到一个或多个侧输出流。

示例6-6展示了如何通过侧输出流的DataStream的ProcessFunction发送数据。

val monitoredReadings: DataStream[SensorReading] = readings
// monitor stream for readings with freezing temperatures
.process(new FreezingMonitor)
// retrieve and print the freezing alarms side output
monitoredReadings.getSideOutput(new OutputTag[String]("freezing-alarms")).print()
// print the main output
readings.print()

示例6-7显示了FreezingMonitor函数,该函数监控传感器读数流,将温度低于32°F的数据发送到侧输出流产生告警信息。

/** Emits freezing alarms to a side output for readings
* with a temperature below 32F. */
class FreezingMonitor extends ProcessFunction[SensorReading,SensorReading] {// define a side output taglazy val freezingAlarmOutput:OutputTag[String]=new OutputTag[String]("freezing-alarms")override def processElement(r: SensorReading,ctx: ProcessFunction[SensorReading,SensorReading]#Context,out: Collector[SensorReading]): Unit = {// emit freezing alarm if temperature is below 32Fif (r.temperature < 32.0) {ctx.output(freezingAlarmOutput, s"Freezing Alarm for ${r.id}")}// forward all readings to the regular outputout.collect(r)}
}

3.CoProcessFunction 

对于两个输入流的关联操作,DataStream API还提供了CoProcessFunction。与CoFlatMapFunction类似,CoProcessFunction为每个输入processElement1()和processElement2()提供转换方法。与ProcessFunction类似,这两个方法都是使用context对象调用的,该context对象允许访问元素或定时器时间戳、TimerService和侧输出流。CoProcessFunction还提供了一个onTimer()回调方法。示例6-8展示了如何应用CoProcessFunction来合并两个流。

// ingest sensor stream
val sensorData: DataStream[SensorReading] = ...
// filter switches enable forwarding of readings
val filterSwitches: DataStream[(String, Long)] = env.fromCollection(Seq(("sensor_2", 10 * 1000L), // forward sensor_2 for 10seconds("sensor_7", 60 * 1000L)) // forward sensor_7 for 1 minute)
val forwardedReadings = readings// connect readings and switches.connect(filterSwitches)// key by sensor ids.keyBy(_.id, _._1)// apply filtering CoProcessFunction.process(new ReadingFilter)

示例6-9中展示了一个ReadingFilter函数的实现,该函数根据过滤器开关动态过滤传感器读数流。

class ReadingFilter extends CoProcessFunction[SensorReading,(String, Long),SensorReading] {// switch to enable forwardinglazy val forwardingEnabled: ValueState[Boolean] =getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("filterSwitch",Types.of[Boolean]))// hold timestamp of currently active disable timerlazy val disableTimer: ValueState[Long] =getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer", Types.of[Long]))override def processElement1(reading: SensorReading,ctx: CoProcessFunction[SensorReading, (String, Long),SensorReading]#Context,out: Collector[SensorReading]): Unit = {// check if we may forward the readingif (forwardingEnabled.value()) {out.collect(reading)}}override def processElement2(switch: (String, Long),ctx: CoProcessFunction[SensorReading, (String, Long),SensorReading]#Context,out: Collector[SensorReading]): Unit = {// enable reading forwardingforwardingEnabled.update(true)// set disable forward timerval timerTimestamp =ctx.timerService().currentProcessingTime() + switch._2val curTimerTimestamp = disableTimer.value()if (timerTimestamp > curTimerTimestamp) {// remove current timer and register new timerctx.timerService().deleteEventTimeTimer(curTimerTimestamp)ctx.timerService().registerProcessingTimeTimer(timerTimestamp)disableTimer.update(timerTimestamp)}}override def onTimer(ts:Long,ctx:CoProcessFunction[SensorReading,(String, Long),SensorReading]#OnTimerContext,out:Collector[SensorReading]): Unit = {// remove all state; forward switch will be false by defaultforwardingEnabled.clear()disableTimer.clear()}
}

三、窗口算子 

窗口操作是流处理应用程序中的常见操作。它们支持在无界流的有限间隔上进行诸如聚合类的转换。通常,这些间隔是使用基于时间的逻辑定义的。窗口算子提供了一种方法来将事件分组到有限大小的buckets(桶)中,并对这些桶中的事件数据应用计算。例如,窗口算子可以将流的事件分组到5分钟的窗口中,并计算每个窗口已经接收了多少事件记录。

DataStream API为最常见的窗口操作提供了内置方法,并提供了非常灵活的窗口机制来自定义窗口逻辑。在本节中,我们将向你展示如何定义窗口算子,介绍DataStream API的内置窗口类型,以及窗口函数的应用,最后解释如何定义自定义窗口逻辑。

1.定义窗口算子

窗口算子可以应用于key类型流或none-key类型流。key类型窗口上的窗口算子是并行计算的,而非key类型窗口是在单线程处理的。

要创建窗口算子,你需要指定两个窗口组件:

  1. 确定输入流的元素如何分组到窗口中的窗口分配器。窗口分配器生成一个WindowedStream(如果应用于非key类型数据流,则生成AllWindowedStream)。

  2. 应用于WindowedStream(或AllWindowedStream)上,并处理分配给窗口的元素的窗口函数。

下面的代码演示了如何指定一个窗口分配器和一个作用于key类型流或非key类型流的窗口函数:

stream.keyBy(...).window(...) // specify the window assigner
.reduce/aggregate/process(...) // specify the window function// define a nonkeyed window-all operator
stream.windowAll(...) // specify the window assigner
.reduce/aggregate/process(...) // specify the window function

在本章的其余部分,我们重点关注key类型窗口。非key类型窗口(在DataStream API中也称为all-windows)是类似的。

请注意,可以通过提供自定义触发器或收回器并声明处理迟到元素的策略来自定义窗口算子。本节后面将详细讨论自定义窗口算子。

2.内置窗口分配器 

Flink为最常见的窗口场景提供了内置的窗口分配器。我们在这里讨论的所有分配程序都是基于时间的,并已经在“数据流上的操作”中做了介绍。基于时间的窗口分配器根据元素的事件时间戳或当前处理时间向窗口分配元素。时间窗口有一个开始时间戳和一个结束时间戳。

所有内置的窗口分配器都提供一个默认触发器,一旦(处理或事件)时间过了窗口的末端,就会触发窗口计算。需要注意的是,当第一个元素被分配给一个窗口时,窗口就被创建了。Flink永远不会计算空窗口。

基于计数器的窗口

除了基于时间的窗口之外,Flink还支持基于计数器的窗口——将固定数量的元素按它们到达窗口算子的顺序分组的窗口。由于它们依赖于摄入顺序,基于计数的窗口是不确定的。此外,如果在使用时没有自定义触发器(在某些情况下会丢弃不完整和陈旧的窗口),则会导致问题。

Flink的内置窗口分配器创建类型为TimeWindow的窗口。此窗口类型实质上表示两个时间戳之间的时间间隔,左闭右开。此类型窗口包括定义窗口边界、检查窗口是否相交以及合并重叠窗口的方法。

下面,我们将展示DataStream API的不同内置窗口分配器,以及如何使用它们来定义窗口算子。

滚动窗口(Tumbling Window)

滚动窗口(Tumbling window)分配程序将元素放置到不重叠的、固定大小的窗口中,如图6-1所示。

 

Datastream API为滚动事件时间窗口和处理时间窗口分别提供了两个分配器—TumblingEventTimeWindows和TumblingProcessingTimeWindows。Tumbling窗口分配器接收一个参数,窗口大小以时间为单位,我们可以使用分配器的of(Time size)方法来指定。时间间隔可以设置为毫秒、秒、分钟、小时或天。

下面的代码展示了如何定义事件时间和处理时间滚动窗口的传感器数据测量流:

val sensorData: DataStream[SensorReading] = ...
val avgTemp = sensorData.keyBy(_.id)// group readings in 1s event-time windows.window(TumblingEventTimeWindows.of(Time.seconds(1))).process(new TemperatureAverager)
val avgTemp = sensorData.keyBy(_.id)// group readings in 1s processing-time windows.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).process(new TemperatureAverager)

在我们的第一个DataStream API示例,“数据流上的操作”章节中,窗口定义看起来有点不同。在那里,我们使用timeWindow(size)方法定义了一个事件时间滚动窗口,这是window.(TumblingEventTimeWindows.of(size)) 或者 window.(TumblingProcessingTimeWindows.of(size))两个窗口定义的快捷方式,具体取决于配置的时间特性。下面的代码演示了如何使用这个快捷方式:

val avgTemp = sensorData
.keyBy(_.id)
// window.(TumblingEventTimeWindows.of(size))的快捷方式
.timeWindow(Time.seconds(1))
.process(new TemperatureAverager)

默认情况下,tumbling窗口与纪元(epoch)时间对齐,1970-01-01-00:00:00.000。例如,大小为1小时的分配程序将在00:00:00、01:00:00、02:00:00等时间定义窗口。或者,你可以指定偏移量作为分配程序中的第二个参数。下面的代码显示了偏移量为15分钟的窗口,偏移量分别从00:15:00、01:15:00、02:15:00开始,依次类推:

val avgTemp = sensorData
.keyBy(_.id)
// group readings in 1 hour windows with 15 min offset
.window(TumblingEventTimeWindows.of(Time.hours(1),Time.minutes(15)))
.process(new TemperatureAverager)

滑动窗口(SLIDING WINDOWS)

滑动窗口分配器将元素分配给固定大小的窗口,这些窗口按指定的滑动间隔移动,如图6-2所示。

 

对于滑动窗口,必须指定窗口大小和滑动间隔,以定义新窗口的滑动频率。当滑动间隔小于窗口大小时,窗口重叠,可以将元素分配给多个窗口。如果滑动间隔比窗口大小大,一些元素可能不会被分配到任何窗口,因此可能被删除。

下面的代码展示了如何将传感器读数分组到1小时大小的滑动窗口中,滑动间隔为15分钟。每个读数将被添加到四个窗口。DataStream API提供了事件时间和处理时间分配器,以及快捷方法,可以将时间间隔偏移量设置为窗口分配器的第三个参数:

// event-time sliding windows assigner
val slidingAvgTemp = sensorData.keyBy(_.id)// create 1h event-time windows every 15 minutes.window(SlidingEventTimeWindows.of(Time.hours(1),Time.minutes(15))).process(new TemperatureAverager)
// processing-time sliding windows assigner
val slidingAvgTemp = sensorData.keyBy(_.id)// create 1h processing-time windows every 15 minutes.window(SlidingProcessingTimeWindows.of(Time.hours(1),Time.minutes(15))).process(new TemperatureAverager)
// sliding windows assigner using a shortcut method
val slidingAvgTemp = sensorData.keyBy(_.id)// shortcut for window.(SlidingEventTimeWindow.of(size,slide)).timeWindow(Time.hours(1), Time(minutes(15))).process(new TemperatureAverager)

会话窗口(SESSION WINDOWS)

会话窗口分配器将元素放入大小不同的活动的非重叠窗口中。会话窗口的边界由不活动的间隔定义,在这些间隔中没有接收到任何记录。图6-3说明了如何将元素分配给会话窗口。

 

以下示例演示如何将传感器读数分组到会话窗口,其中每个会话都定义为15分钟的不活动时间:

// event-time session windows assigner
val sessionWindows = sensorData.keyBy(_.id)// create event-time session windows with a 15 min gap.window(EventTimeSessionWindows.withGap(Time.minutes(15))).process(...)
// processing-time session windows assigner
val sessionWindows = sensorData.keyBy(_.id)// create processing-time session windows with a 15 min gap.window(ProcessingTimeSessionWindows.withGap(Time.minutes(15))).process(...)

由于会话窗口的开始和结束取决于接收到的元素,所以窗口分配器不能立即将所有元素分配到正确的窗口。相反,SessionWindows assigner最初将每个传入元素映射到它自己的窗口中,以元素的时间戳作为开始时间,会话间隔作为窗口大小。随后,它用重叠的范围合并所有窗口。

3.在窗口上应用函数

窗口函数定义了对窗口中的数据元素执行的计算逻辑。有两种类型的函数可以用于窗口函数:

  1. 增量聚合函数(Incremental aggregation functions):在元素被添加到窗口并保持和更新单个值为窗口状态时直接应用增量聚合函数。这些函数通常非常节省空间,并最终产生聚合值作为结果。ReduceFunction和AggregateFunction都是增量聚合函数。

  2. 全量窗口函数(Full window functions):收集窗口的所有元素,并在对所有收集的元素求值时遍历元素列表。全量窗口函数通常需要更多的内存,可以完成比增量聚合函数更复杂的逻辑。ProcessWindowFunction是一个全量窗口函数。

在本节中,我们将讨论可以应用于窗口的不同类型的函数,以便对窗口的内容执行聚合或任意计算。我们还展示了如何在窗口算子中关联使用增量聚合函数和全量窗口函数。

 

REDUCEFUNCTION

在讨论在key类型流上运行聚合时,在“KeyedStream转换”中引入了ReduceFunction。ReduceFunction接受相同类型的两个值,并将它们组合成相同类型的单个值。当应用于有窗口的流时,ReduceFunction增量地聚合分配给窗口的元素。窗口只存储聚合的当前结果—ReduceFunction的输入(和输出)类型的单个值。当接收到新元素时,使用新元素和从窗口状态读取的当前值调用ReduceFunction。窗口的状态由ReduceFunction的结果代替。

在窗口上应用ReduceFunction的优点是每个窗口的状态大小固定且占用空间小,而且函数接口简单。然而,ReduceFunction的应用程序是有限的,而且通常局限于简单的聚合,且输入和输出类型必须是相同的。

示例6-10显示了一个reduce lambda函数,该函数每15秒计算每个传感器的最小温度。

val minTempPerWindow: DataStream[(String, Double)] = sensorData.map(r => (r.id, r.temperature)).keyBy(_._1).timeWindow(Time.seconds(15)).reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))

AGGREGATEFUNCTION

与ReduceFunction类似,AggregateFunction也增量地应用于应用于窗口的元素。此外,具有AggregateFunction的窗口算子的状态也由一个值组成。

虽然AggregateFunction的接口更加灵活,但是与ReduceFunction的接口相比,它的实现也更加复杂。下面的代码展示了AggregateFunction的接口:

public interface AggregateFunction<IN, ACC, OUT> extends  Function, Serializable {// create a new accumulator to start a new aggregate.ACC createAccumulator();// add an input element to the accumulator and return the accumulator.ACC add(IN value, ACC accumulator);// compute the result from the accumulator and return it.OUT getResult(ACC accumulator);// merge two accumulators and return the result.ACC merge(ACC a, ACC b);
}

该接口定义输入类型IN、一个类型为ACC的累加器和结果类型OUT。与ReduceFunction相反,中间数据类型和输出类型不依赖于输入类型。

示例6-11展示了如何使用聚合函数来计算每个窗口的传感器读数的平均温度。累加器维护前面计数的和,以及计数,getResult()方法计算平均值。

val avgTempPerWindow: DataStream[(String, Double)] = sensorData
.map(r => (r.id, r.temperature))
.keyBy(_._1)
.timeWindow(Time.seconds(15))
.aggregate(new AvgTempFunction)
// An AggregateFunction to compute the average tempeature per sensor.
// The accumulator holds the sum of temperatures and an event count.
class AvgTempFunction extends AggregateFunction
[(String, Double), (String, Double, Int), (String, Double)] {override def createAccumulator() = {("", 0.0, 0)}override def add(in: (String, Double), acc: (String, Double,Int)) = {(in._1, in._2 + acc._2, 1 + acc._3)}override def getResult(acc: (String, Double, Int)) = {(acc._1, acc._2 / acc._3)}override def merge(acc1: (String, Double, Int), acc2:(String, Double, Int)) = {(acc1._1, acc1._2 + acc2._2, acc1._3 + acc2._3)}
}

PROCESSWINDOWFUNCTION

ReduceFunction和AggregateFunction用于窗口增量更新。然而,有时我们需要访问窗口的所有元素来执行更复杂的计算,例如计算窗口中值的中位数或最频繁出现的值。对于这样的应用程序,ReduceFunction和AggregateFunction都不合适。Flink的DataStream API提供了ProcessWindowFunction来对窗口的内容执行任意计算。

注意

Flink1.7的DataStream API具有WindowFunction接口。WindowFunction已经被ProcessWindowFunction所取代,这里不再讨论它。

下面的代码显示了ProcessWindowFunction的接口:

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
extends AbstractRichFunction {// Evaluates the windowvoid process(KEY key,Context ctx,Iterable<IN> vals,Collector<OUT> out)throws Exception;// Deletes any custom per-window state when the window is purgedpublic void clear(Context ctx) throws Exception {}// The context holding window metadatapublic abstract class Context implements Serializable {// Returns the metadata of the windowpublic abstract W window();// Returns the current processing timepublic abstract long currentProcessingTime();// Returns the current event-time watermarkpublic abstract long currentWatermark();// State accessor for per-window statepublic abstract KeyedStateStore windowState();// State accessor for per-key global statepublic abstract KeyedStateStore globalState();// Emits a record to the side output identified by the OutputTag.public abstract <X> void output(OutputTag<X> outputTag, X value);}
}

process()方法中使用了窗口key,使用迭代器访问窗口的元素,使用收集器输出结果。此外,该方法具有与其他处理方法类似的context参数。ProcessWindowFunction的context对象允许访问窗口的元数据、当前处理时间和水位线、用于管理每个窗口和每个key的全局状态的状态存储,以及用于输出记录的侧输出。

在介绍Process函数时,我们已经讨论了context对象的一些特性,比如对当前处理的访问以及事件时间和侧输出流。然而,ProcessWindowFunction的context对象也提供了独特的特性。窗口的元数据通常包含可以用作窗口标识符的信息,例如时间窗口的开始和结束时间戳。

另一个特性是每个窗口和每个key的全局状态。全局状态是指不局限于任何窗口的key状态,而Per-window状态是指当前正在计算的窗口实例。每个窗口的状态有助于维护在同一窗口上的process()方法的多个调用之间共享的信息,这些调用可能由于配置允许的延迟或使用自定义触发器而发生的。利用每个窗口状态的ProcessWindowFunction需要实现它的clear()方法,用于清除当前窗口之前的特定窗口状态。全局状态可用于在同一key上的多个窗口之间共享信息。

示例6-12将传感器读取流分组为5秒的tumbling窗口,并使用ProcessWindowFunction计算窗口内发生的最低和最高温度。它为每个窗口输出一条记录,包括窗口的开始和结束时间戳以及最小和最大温度。

// output the lowest and highest temperature reading every 5 seconds
val minMaxTempPerWindow: DataStream[MinMaxTemp] = sensorData.keyBy(_.id).timeWindow(Time.seconds(5)).process(new HighAndLowTempProcessFunction)
case class MinMaxTemp(id: String, min: Double, max:Double,endTs: Long)
/**
* A ProcessWindowFunction that computes the lowest and highest temperature
* reading per window and emits them together with the end timestamp of the window.
*/
class HighAndLowTempProcessFunction
extends ProcessWindowFunction[SensorReading, MinMaxTemp,String, TimeWindow] {override def process(key: String,ctx: Context,vals: Iterable[SensorReading],out: Collector[MinMaxTemp]): Unit = {val temps = vals.map(_.temperature)val windowEnd = ctx.window.getEndout.collect(MinMaxTemp(key, temps.min, temps.max,windowEnd))}
}

在内部,由ProcessWindowFunction计算的窗口将所有分配的事件存储在一个ListState中。通过收集所有事件并提供对窗口元数据和其他特性的访问,ProcessWindowFunction可以处理比ReduceFunction或AggregateFunction更多的用例。但是,收集所有事件的窗口的状态可能比元素增量聚合窗口的状态大得多。

 

INCREMENTAL AGGREGATION 和 PROCESSWINDOWFUNCTION

ProcessWindowFunction是一个非常强大的窗口函数,但是需要谨慎地使用它,因为它通常比增量聚合函数保存更多的状态数据。通常,需要应用于窗口的逻辑可以表示为增量聚合,但也需要访问窗口元数据或状态。

如果你具有增量聚合逻辑,同时需要访问窗口元数据,则可以将执行增量聚合的ReduceFunction或AggregateFunction与提供对更多功能访问的ProcessWindowFunction组合使用。分配给窗口的元素将被立即聚合,当窗口的触发器触发时,聚合的结果将被传递给ProcessWindowFunction。process()方法的可迭代参数将只提供单个值,即增量聚合的结果。

在DataStream API中,这是通过将ProcessWindowFunction作为reduce()或aggregate()方法的第二个参数来实现的,如下面的代码所示:

input
.keyBy(...)
.timeWindow(...)
.reduce(incrAggregator: ReduceFunction[IN],function: ProcessWindowFunction[IN, OUT, K, W])input
.keyBy(...)
.timeWindow(...)
.aggregate(incrAggregator: AggregateFunction[IN, ACC, V],windowFunction: ProcessWindowFunction[V, OUT, K, W])

例子6-13和6-14中的代码展示了如何解决相同的用例(代码例子6-12),联合ReduceFunction和ProcessWindowFunction,每5秒输出最小和最大温度传感器和每个窗口的结束时间戳。

case class MinMaxTemp(id: String, min: Double, max:Double,endTs: Long)
val minMaxTempPerWindow2: DataStream[MinMaxTemp] = sensorData
.map(r => (r.id, r.temperature, r.temperature))
.keyBy(_._1)
.timeWindow(Time.seconds(5))
.reduce(// incrementally compute min and max temperature(r1: (String, Double, Double), r2: (String, Double,Double)) => {(r1._1, r1._2.min(r2._2), r1._3.max(r2._3))},// finalize result in ProcessWindowFunctionnew AssignWindowEndProcessFunction()
)

如示例6-13所示,ReduceFunction和ProcessWindowFunction都是在reduce()方法调用中定义的。由于聚合是由ReduceFunction执行的,ProcessWindowFunction只需要将窗口结束时间戳附加到增量计算的结果中,如示例6-14所示。

class AssignWindowEndProcessFunction extends ProcessWindowFunction[(String, Double, Double), MinMaxTemp,String, TimeWindow] {override def process(key: String,ctx: Context,minMaxIt: Iterable[(String, Double, Double)],out: Collector[MinMaxTemp]): Unit = {val minMax = minMaxIt.headval windowEnd = ctx.window.getEndout.collect(MinMaxTemp(key, minMax._2, minMax._3,windowEnd))}
}

4.自定义窗口算子 

使用Flink的内置窗口分配器定义的窗口算子可以处理许多常见的用例。然而,当你开始编写更高级的流处理应用程序时,你可能会发现自己需要实现更复杂的窗口逻辑,比如在处理迟到的元素时,输出早期结果并更新其结果的窗口,或者当收到特定记录时开始和结束的窗口。

DataStream API提供了定义自定义窗口算子的接口和方法,允许你实现自己的分配器、触发器和回收器。通过使用前面所学的窗口函数以及这些组件,对窗口中的元素进行分组和处理。

当元素到达窗口算子时,它被传递给WindowAssigner。WindowAssigner确定元素需要路由到哪个窗口。如果该窗口还不存在,则创建新窗口。

如果窗口算子配置了增量聚合函数,例如ReduceFunction或AggregateFunction,那么新添加的元素将立即被聚合,结果将作为窗口的中间状态存储。如果窗口算子没有增量聚合函数,则将新元素附加到包含所有分配元素的ListState。

每当一个元素被添加到一个窗口时,同时会被传递给窗口的触发器。触发器决定何时触发窗口计算,以及什么时候清除窗口及其内容。触发器可以根据分配的元素或注册的定时器(类似于处理函数)来决定在特定时间点计算或清除其窗口的内容。

触发器触发时的处理操作取决于窗口算子使用的函数类型。如果仅使用增量聚合函数使用的算子,则输出当前聚合结果。这种情况如图6-4所示。

 

如果算子只有一个完整的窗口函数,则该函数将应用于窗口的所有元素,其结果将直接输出,如图6-5所示。

 

最后,如果算子具有增量聚合函数和全量窗口函数,则将全量窗口函数应用于聚合值并输出结果。图6-6描述了这种情况。

 

回收器(evictor)是一个可选组件,可以在调用ProcessWindowFunction之前或之后注入。回收器可以从窗口的内容中删除收集到的元素。因为它必须遍历所有元素,所以只能在没有指定增量聚合函数的情况下使用它。

下面的代码展示了如何定义一个窗口算子与自定义触发器和回收器:

stream
.keyBy(...)
.window(...) // specify the window assigner
[.trigger(...)] // optional: specify the trigger
[.evictor(...)] // optional: specify the evictor
.reduce/aggregate/process(...) // specify the window function

虽然回收器是可选的组件,但是每个窗口算子都需要一个触发器来决定何时计算其窗口。为了提供简洁的窗口算子API,每个WindowAssigner都有一个默认触发器,除非使用自定义触发器,否则将使用该触发器。

请注意,自定义的触发器将覆盖现有的触发器,而不是对其进行补充,窗口将仅根据上次定义的触发器进行计算。

在下面的部分中,我们将讨论windows的生命周期,并介绍定义自定义窗口分配器、触发器和回收器的接口。

窗口的生命周期

窗口算子在处理传入流数据时会创建窗口、删除窗口。如前所述,元素由WindowAssigner分配给窗口,触发器决定何时计算窗口,窗口函数执行实际的窗口计算。在本节中,我们将讨论窗口的生命周期—何时创建它、它包含哪些信息以及何时删除它。

当WindowAssigner将第一个元素分配给窗口时,将创建一个窗口。因此,一个窗口至少有一个元素。一个窗口由以下不同的状态组成:

窗口内容

如果窗口算子具有ReduceFunction或AggregateFunction,则窗口内容包含分配给窗口的元素或增量聚合的结果。

窗口对象

WindowAssigner返回零个、一个或多个窗口对象。窗口算子根据返回的对象对元素进行分组。因此,窗口对象保存用来区分窗口的信息。每个窗口对象都有一个结束timestamp,它定义了窗口及其状态可以删除的时间点。

触发器定时器

触发器可以注册定时器,以便在特定时间点调用,例如,计算窗口或清除其内容。这些定时器由窗口算子维护。

触发器中自定义的状态

触发器可以定义和使用自定义每个窗口和每个key的状态。此状态完全由触发器控制,而不是由窗口算子维护。

当窗口的结束时间(由窗口对象的结束时间戳定义)达到时,窗口算子将删除窗口。是在处理时间发生这种情况,还是在事件时间发生这种情况取决于WindowAssigner.isEventTime()方法返回的值。

当窗口被删除时,窗口算子自动清除窗口内容并丢弃窗口对象。不清除自定义触发器状态和注册的触发器定时器,因为此状态对窗口算子不透明。因此,触发器必须在trigger .clear()方法中清除所有状态,以防止状态泄漏。

窗口分配器

WindowAssigner决定将每个到达的元素分配给哪个窗口。元素可以添加到零、一个或多个窗口。下面展示的是windowAssigner接口:

public abstract class WindowAssigner<T, W extends Window> implements Serializable {// Returns a collection of windows to which the element is assignedpublic abstract Collection<W> assignWindows(T element,long timestamp,WindowAssignerContext context);// Returns the default Trigger of the WindowAssignerpublic abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);// Returns the TypeSerializer for the windows of this WindowAssignerpublic abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);// Indicates whether this assigner creates event-time windowspublic abstract boolean isEventTime();// A context that gives access to the current processing timepublic abstract static class WindowAssignerContext {// Returns the current processing timepublic abstract long getCurrentProcessingTime();}
}

“WindowAssigner”的类型取决于输入元素的类型和元素被分配到的窗口的类型。它还需要提供一个默认触发器,如果没有指定其他触发器,则使用该触发器。例6-15中的代码为30秒滚动事件时间窗口创建了一个自定义的分配程序。

/** A custom window that groups events into 30-second tumbling
windows. */
class ThirtySecondsWindows extends WindowAssigner[Object, TimeWindow] {val windowSize: Long = 30 * 1000Loverride def assignWindows(o: Object,ts: Long,ctx: WindowAssigner.WindowAssignerContext):java.util.List[TimeWindow] = {// rounding down by 30 secondsval startTime = ts - (ts % windowSize)val endTime = startTime + windowSize// emitting the corresponding time windowCollections.singletonList(new TimeWindow(startTime,endTime))}override def getDefaultTrigger(env: environment.StreamExecutionEnvironment):Trigger[Object, TimeWindow] = {EventTimeTrigger.create()}override def getWindowSerializer(executionConfig: ExecutionConfig):TypeSerializer[TimeWindow] = {new TimeWindow.Serializer}override def isEventTime = true
}

全局窗口分配器

GlobalWindows分配程序将所有元素映射到同一个全局窗口。它的默认触发器是NeverTrigger,顾名思义,永不触发。因此,GlobalWindows assigner需要一个自定义触发器和一个回收器来有选择地从窗口状态删除元素。GlobalWindows的结束时间戳是Long.MAX_VALUE。因此,GlobalWindows永远不会被彻底清除。当应用于key空间不断变化的KeyedStream时,GlobalWindows将为每个key维护一些状态。所以请谨慎使用。

除了WindowAssigner接口之外,还有扩展了WindowAssigner的MergingWindowAssigner接口。MergingWindowAssigner用于需要合并现有窗口的窗口算子。我们前面讨论过的EventTimeSessionWindows assigner就是这种分配器的一个例子,它的工作方式是为每个到达的元素创建一个新窗口,然后合并重叠的窗口。

在合并窗口时,需要确保所有合并的窗口及其触发器的状态也已适当合并。触发器接口提供了一个回调方法,该方法在合并窗口时,以及合并与窗口关联的状态时调用。下一节将更详细地讨论窗口合并。

TRIGGERS(触发器)

触发器定义何时计算窗口并输出窗口的结果。触发器可以根据特定于时间或特定数据条件(如元素计数或某些接收到的元素值)中的处理情况决定是否触发。例如,当处理时间或水位线超过窗口结束边界的时间戳时,将触发前面讨论的时间窗口的默认触发器。

触发器可以获取时间属性和定时器,并且可以使用状态。因此,它们与Process函数一样强大。例如,你可以实现触发逻辑,当窗口接收到一定数量的元素时触发,当一个具有特定值的元素被添加到窗口时触发,或者在检测添加元素的模式之后触发,比如“5秒内发生两起相同类型的事件”。自定义触发器还可以用于计算和输出事件时间窗口的早期结果,在水位线到达窗口的结束时间戳之前。这是产生低延迟结果的常见策略,尽管使用了保守的水位线策略。

每次调用触发器时,它都会生成一个TriggerResult来确定应该对窗口执行什么操作。TriggerResult可以取以下值之一:

CONTINUE(跳过

不触发任何操作。

FIRE(触发)

如果窗口算子具有ProcessWindowFunction,则调用该函数并输出结果。如果窗口只有一个增量聚合函数(ReduceFunction或AggregateFunction),则输出当前聚合结果。窗口的状态没有改变。

PURGE(清除)

窗口的内容被完全丢弃,包含所有元数据的窗口被删除。另外,调用ProcessWindowFunction.clear()方法来清除每个窗口的所有自定义状态。

FIRE_AND_PURGE(触发并清除

FIRE_AND_PURGE:首先计算窗口(FIRE),然后删除所有状态和元数据(PURGE)。

可能的TriggerResult值使你能够实现复杂的窗口逻辑。自定义触发器可以触发多次,计算新的或更新的结果,或在满足特定条件时清除窗口而不输出结果。下一个代码块显示了触发器API:

public abstract class Trigger<T, W extends Window> implements Serializable {// Called for every element that gets added to a windowTriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx);// Called when a processing-time timer firespublic abstract TriggerResult onProcessingTime(long timestamp,W window,TriggerContext ctx);// Called when an event-time timer firespublic abstract TriggerResult onEventTime(long timestamp, W window, TriggerContext ctx);// Returns true if this trigger supports merging of trigger statepublic boolean canMerge();// Called when several windows have been merged into one window// and the state of the triggers needs to be mergedpublic void onMerge(W window, OnMergeContext ctx);// Clears any state that the trigger might hold for the given window// This method is called when a window is purgedpublic abstract void clear(W window, TriggerContext ctx);
}
// A context object that is given to Trigger methods to allow them
// to register timer callbacks and deal with state
public interface TriggerContext {// Returns the current processing timelong getCurrentProcessingTime();// Returns the current watermark timelong getCurrentWatermark();// Registers a processing-time timervoid registerProcessingTimeTimer(long time);// Registers an event-time timervoid registerEventTimeTimer(long time);// Deletes a processing-time timervoid deleteProcessingTimeTimer(long time);// Deletes an event-time timervoid deleteEventTimeTimer(long time);// Retrieves a state object that is scoped to the window and the key of the trigger<S extends State> S getPartitionedState(StateDescriptor<S,?> stateDescriptor);
}
// Extension of TriggerContext that is given to the Trigger.onMerge() method
public interface OnMergeContext extends TriggerContext {// Merges per-window state of the trigger// The state to be merged must support mergingvoid mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);
}

如你所见,触发器API可以通过提供对时间和状态的访问来实现复杂的逻辑。触发器有两个方面需要特别注意:清理状态以及合并触发器。

在触发器中使用每个窗口的状态时,需要确保在删除窗口时正确删除了该状态。否则,窗口算子会随着时间积累越来越多的状态,你的应用程序可能会在某个时候失败。为了在删除窗口时清除所有状态,触发器的clear()方法需要删除每个窗口的所有自定义状态,并使用TriggerContext对象删除所有处理时间和事件时间计时器。无法在定时器回调方法中清理状态,因为在删除窗口后不会调用这些方法。

如果触发器与MergingWindowAssigner一起应用,则需要能够在合并两个窗口时处理这种情况。在这种情况下,还需要合并触发器的任何自定义状态。canMerge()声明触发器是否支持合并,而onMerge()方法需要实现执行合并的逻辑。如果触发器不支持合并,则不能将其与MergingWindowAssigner组合使用。

在合并触发器时,必须将所有自定义状态的描述符提供给OnMergeContext对象的mergePartitionedState()方法。

注意

请注意,可合并触发器只能使用可以自动合并的状态原语——ListState, ReduceState, 或AggregatingState.

示例6-16显示了一个在到达窗口结束时间之前提前触发的触发器。当第一个事件被分配给一个窗口时,触发器注册一个定时器,比当前水位线早1秒。当定时器触发时,将注册一个新计时器。因此,触发器最多每秒触发一次。

/** A trigger that fires early. The trigger fires at most every second. */
class OneSecondIntervalTriggerextends Trigger[SensorReading, TimeWindow] {
override def onElement(r: SensorReading,timestamp: Long,window: TimeWindow,ctx: Trigger.TriggerContext): TriggerResult = {// firstSeen will be false if not set yetval firstSeen: ValueState[Boolean] =
ctx.getPartitionedState(new ValueStateDescriptor[Boolean]("firstSeen",classOf[Boolean]))// register initial timer only for first elementif (!firstSeen.value()) {// compute time for next early firing by rounding watermark to secondval t = ctx.getCurrentWatermark + (1000 -(ctx.getCurrentWatermark % 1000))ctx.registerEventTimeTimer(t)// register timer for the window endctx.registerEventTimeTimer(window.getEnd)firstSeen.update(true)}// Continue. Do not evaluate per elementTriggerResult.CONTINUE
}
override def onEventTime(timestamp: Long,window: TimeWindow,ctx: Trigger.TriggerContext): TriggerResult = {if (timestamp == window.getEnd) {// final evaluation and purge window stateTriggerResult.FIRE_AND_PURGE} else {// register next early firing timerval t = ctx.getCurrentWatermark + (1000 -(ctx.getCurrentWatermark % 1000))if (t < window.getEnd) {ctx.registerEventTimeTimer(t)}// fire trigger to evaluate windowTriggerResult.FIRE}
}
override def onProcessingTime(timestamp: Long,window: TimeWindow,ctx: Trigger.TriggerContext): TriggerResult = {// Continue. We don't use processing time timersTriggerResult.CONTINUE
}
override def clear(window: TimeWindow,ctx: Trigger.TriggerContext): Unit = {// clear trigger stateval firstSeen: ValueState[Boolean] =
ctx.getPartitionedState(new ValueStateDescriptor[Boolean]("firstSeen",classOf[Boolean]))firstSeen.clear()}
}

请注意,触发器使用自定义状态,该状态使用clear()方法清除。因为我们使用的是一个简单的不可合并的ValueState,所以触发器是不可合并的。

EVICTORS

在Flink的窗口机制中,Evictor(驱逐器)是一个可选组件。它可以在窗口函数执行之前或之后从窗口中删除元素。

示例6-17显示了Evictor接口。

public interface Evictor<T, W extends Window> extends Serializable {// Optionally evicts elements. Called before windowing function.void evictBefore(Iterable<TimestampedValue<T>> elements,int size,W window,EvictorContext evictorContext);// Optionally evicts elements. Called after windowing function.void evictAfter(Iterable<TimestampedValue<T>> elements,int size,W window,EvictorContext evictorContext);
}
// A context object that is given to Evictor methods.
interface EvictorContext {// Returns the current processing time.long getCurrentProcessingTime();// Returns the current event time watermark.long getCurrentWatermark();
}

在将窗口函数应用于窗口内容之前和之后分别调用evictBefore()和evictAfter()方法。这两个方法都有一个Iterable参数(服务于添加到窗口的所有元素)、窗口中的元素数量(大小)参数、窗口对象和一个EvictorContext参数。通过调用可从Iterable获得的Iterator对象上的remove()方法,可以从窗口中删除元素。

PREAGGREGATION和EVICTORS

 Evictors遍历窗口中的元素列表。只有当窗口收集所有添加的事件而不应用ReduceFunction或AggregateFunction以增量聚合窗口内容时,才能使用它们。

Evictors通常应用在GlobalWindow上,用于对窗口进行部分清理而不清除整个窗口状态。

四、基于时间的多流关联

使用流时,一个常见需求是connect或join两个流中的事件。Flink的DataStream API提供了两个内置的算子来join具有时间条件的流:间隔连接和窗口连接(interval join 和 window join)。在本节中,我们将介绍这两种算子。

如果无法使用Flink的内置连接算子来表达所需的连接语义,则可以使用CoProcessFunction、BroadcastProcessFunction或KeyedBroadcastProcessFunction实现自定义连接逻辑。

注意,你应该使用有效的状态访问模式和有效的状态清理策略来设计这样的算子。

1.基于间隔的关联(Interval Join)

interval join连接来自两个具有公共key的流的事件,这两个流之间的时间戳间隔不超过指定的时间间隔。

图6-7显示了两个流的interval join:A和B,如果B事件的时间戳早于B事件的时间戳不少于一个小时,并且晚于B事件的时间戳不超过15分钟,则将A事件和B事件进行Join。join interval是对称的,即来自B的一个事件与来自A的所有事件连接,这些事件都比B事件早不超过15分钟,最多晚一个小时。

interval join目前只支持事件时间并使用内部连接语义进行操作(没有匹配事件的事件将不会被转发)。interval join的定义如示例6-18所示。

input1
.keyBy(…)
.between(<lower-bound>, <upper-bound>) // bounds with respect to input1
.process(ProcessJoinFunction) // process pairs of matched events

join事件双方都被传递到ProcessJoinFunction中。下界和上界定义为负的和正的时间间隔,例如,between(Time.hour(-1), Time.minute(15)).。只要下界小于上界,就可以任意选择上界和下界;可以将所有A事件与所有B事件连接起来,只要B事件的时间戳比A事件多一到两个小时。

interval join需要缓冲来自一个或两个输入的记录。对于第一个输入,所有具有大于当前水位线的时间戳(即上限)的记录都将被缓冲。对于第二个输入,所有时间戳大于当前水位线+下界的记录将被缓冲。注意,两个界限都可能是负的。图6-7存储了流A所有加入时间戳大于当前watermark-15分钟的记录,和流B中所有时间戳大于当前watermark---1小时的记录。你应该意识到,interval join的存储需求可能会大大增加,如果两个输入流的事件时间不同步,因为水位线是由“slower”流确定。

2.基于窗口的关联

顾名思义,window join基于Flink的窗口机制。两个输入流的元素都被分配到公共窗口,并在窗口完成时联接(或分组)。

input1.join(input2)
.where(...) // specify key attributes for input1
.equalTo(...) // specify key attributes for input2
.window(...) // specify the WindowAssigner
[.trigger(...)] // optional: specify a Trigger
[.evictor(...)] // optional: specify an Evictor
.apply(...) // specify the JoinFunction

两个输入流都根据它们的key作为key流类型,公共窗口分配器将这两个流的事件映射到公共窗口,这意味着窗口将存储这两个输入的事件。当一个窗口的定时器触发时,对来自第一个和第二个输入的元素的每个组合调用JoinFunction(向量叉积Cross Product)。还可以指定自定义触发器和收回器。由于这两个流的事件被映射到相同的窗口,因此触发器和回收器的行为与常规窗口算子完全相同。

除了连接两个流之外,还可以使用cogroup()而不是join()来进行算子定义,从而将两个流联合到一个窗口上。总体逻辑是相同的,但不是对两个输入的每一对事件调用一个JoinFunction,而是使用来自两个输入的元素的迭代器在每个窗口调用一次CoGroupFunction。

需要注意的是,连接窗口化的流可能会产生意想不到的语义。例如,假设你使用配置了1小时滚动窗口的连接算子连接两个流。第一个输入的元素不会与第二个输入的元素连接,即使它们之间相差只有1秒,但是它们被分配到两个不同的窗口。

五、处理迟到数据 

如前所述,可以使用水位线来平衡结果的完整性和结果延迟。除非你选择一种非常保守的水位线策略,以确保所有相关记录都以高延迟为代价,否则你的应用程序很可能必须处理迟到的数据元素。

迟到元素:当它到达一个算子时,为其提供的计算已经被执行。在事件时间窗口算子的context中,如果事件到达算子,并且窗口分配器将其映射到已经计算过的窗口,则该事件将延迟,因为算子的水位线传递了窗口的结束时间戳。

DataStream API提供了处理延迟事件的不同选项:

  • 延迟事件可以简单地删除。

  • 延迟事件可以重定向到单独的流。

  • 计算结果可以根据延迟事件进行更新,并且必须输出更新。

在下面,我们将详细讨论这些选项,并展示如何将它们应用于处理函数和窗口算子。

1.丢弃迟到事件 

处理延迟事件的最简单方法是简单地丢弃它们。删除延迟事件是事件时间窗口算子的默认行为。因此,迟到的元素不会创建新窗口。

通过比较事件的时间戳和当前水位线,处理函数可以很容易地过滤掉延迟事件。

2.重定向迟到事件 

延迟事件还可以使用侧输出流特性重定向到另一个DataStream,可以使用常规的接收函数处理或发出延迟事件。根据业务需求,后期数据稍后可以通过定期的回填过程集成到流应用程序的结果中。示例6-20展示了如何为延迟事件指定带有侧输出流的窗口算子。

val readings: DataStream[SensorReading] = ???
val countPer10Secs: DataStream[(String, Long, Int)] = readings
.keyBy(_.id)
.timeWindow(Time.seconds(10))
// emit late readings to a side output
.sideOutputLateData(new OutputTag[SensorReading]("latereadings"))
// count readings per window
.process(new CountFunction())
// retrieve the late events from the side output as a stream
val lateStream: DataStream[SensorReading] = countPer10Secs
.getSideOutput(new OutputTag[SensorReading]("late-readings"))

Process函数可以通过比较事件时间戳和当前水位线并使用常规的侧输出流API进行输出从而识别延迟事件。示例6-21显示了一个ProcessFunction,它过滤来自其输入的延迟传感器读数,并将它们重定向到侧输出流。

val readings: DataStream[SensorReading] = ???
val filteredReadings: DataStream[SensorReading] = readings
.process(new LateReadingsFilter)
// retrieve late readings
val lateReadings: DataStream[SensorReading] = filteredReadings
.getSideOutput(new OutputTag[SensorReading]("late-readings"))
/** A ProcessFunction that filters out late sensor readings and
* re-directs them to a side output */
class LateReadingsFilter extends ProcessFunction[SensorReading, SensorReading] {val lateReadingsOut = new OutputTag[SensorReading]("latereadings")override def processElement(r: SensorReading,ctx: ProcessFunction[SensorReading,SensorReading]#Context,out: Collector[SensorReading]): Unit = {// compare record timestamp with current watermarkif (r.timestamp < ctx.timerService().currentWatermark()) {// this is a late reading => redirect it to the side outputctx.output(lateReadingsOut, r)} else {out.collect(r)}}
}

3.基于迟到事件更新结果 

延迟事件在它们应该完成的计算之后到达算子。因此,算子输出的结果是不完整或不准确的。另一种策略是重新计算不完整的结果并输出更新,而不是删除或重定向延迟事件。但是,为了能够重新计算和更新结果,需要考虑一些问题。

支持重新计算和更新已输出结果的算子需要在发出第一个结果后保留计算所需的所有状态。但是,由于算子通常不可能永远保留所有状态,所以需要在某个时候清除状态。一旦清除了某个结果的状态,就不能再更新该结果,只能删除或重定向延迟事件。

除了保持状态外,下游算子或跟随算子的外部系统还需要能够处理这些更新。例如,将结果和key值窗口算子的更新写入key值存储的接收器算子可以通过使用upsert写操作用最新更新结果覆盖不准确的结果来实现这一点。对于某些用例,可能还需要区分第一个结果和由于延迟事件而导致的更新。

窗口算子API提供了一个方法来显式声明你期望的迟到元素。在使用事件时间窗口时,可以指定允许迟到的时间。允许迟到的窗口算子不会在水位线通过窗口的结束时间戳后删除窗口及其状态。相反,算子将继续维护包括迟到时间段内的完整窗口。当一个迟到元素在允许的迟到周期内到达时,它就像一个正常到达的元素一样被处理并传递给触发器。当水位线通过窗口的结束时间戳和延迟间隔时,窗口最终被删除,随后的所有迟到元素被丢弃。

允许的延迟可以使用allowedLateness()方法指定,如示例6-22所示。

val readings: DataStream[SensorReading] = ???
val countPer10Secs: DataStream[(String, Long, Int, String)] =
readings
.keyBy(_.id)
.timeWindow(Time.seconds(10))
// process late readings for 5 additional seconds
.allowedLateness(Time.seconds(5))
// count readings and update results if late readings arrive
.process(new UpdatingWindowCountFunction)
/** A counting WindowProcessFunction that distinguishes between
* first results and updates. */
class UpdatingWindowCountFunction extends ProcessWindowFunction[
SensorReading, (String, Long, Int, String), String,
TimeWindow] {
override def process(
id: String,
ctx: Context,
elements: Iterable[SensorReading],
out: Collector[(String, Long, Int, String)]): Unit = {// count the number of readingsval cnt = elements.count(_ => true)// state to check if this is the first evaluation of the window or notval isUpdate = ctx.windowState.getState(new ValueStateDescriptor[Boolean]("isUpdate",Types.of[Boolean]))if (!isUpdate.value()) {// first evaluation, emit first resultout.collect((id, ctx.window.getEnd, cnt, "first"))isUpdate.update(true)} else {// not the first evaluation, emit an updateout.collect((id, ctx.window.getEnd, cnt, "update"))}}
}

还可以实现ProcessFunction来支持迟到数据。由于状态管理始终是自定义的,并且是在处理函数中手动完成的,所以Flink不提供支持迟到数据的内置API。相反,你可以使用记录时间戳、水位线和计时器的这些要素来实现必要的逻辑。

❤总结

在本章中,学习了如何实现定时运行的流应用程序。我们解释了如何配置流应用程序的时间特性,以及如何分配时间戳和水位线。了解了基于时间的算子,包括Flink的处理函数、内置窗口和自定义窗口。我们还讨论了水位线的语义、如何权衡结果的完整性和结果的延迟以及处理延迟事件的策略。


http://chatgpt.dhexx.cn/article/onC05tLw.shtml

相关文章

大数据Hadoop之——Flink DataStream API 和 DataSet API

文章目录 一、DataStream API概述二、什么是DataStream &#xff1f;三、DataStream 数据处理过程1&#xff09;Data Sources&#xff08;数据源&#xff09;1、Data Sources 原理2、Data Sources 实现方式1&#xff09;基于文件2&#xff09;基于套接字3&#xff09;基于集合4…

数据挖掘常用算法整理

前言&#xff1a; 找工作时&#xff08;IT行业&#xff09;&#xff0c;除了常见的软件开发以外&#xff0c;机器学习岗位也可以当作是一个选择&#xff0c;不少计算机方向的研究生都会接触这个&#xff0c;如果你的研究方向是机器学习/数据挖掘之类&#xff0c;且又对其非常感…

C++ 26 常用算法

目录 一、概述 1.1 常用遍历算法 1.1.1 算法简介 1.1.2 for_each遍历算法 1.1.3 transform遍历算法 1.2 常用查找算法 1.2.1 算法简介 1.2.2 find 查找算法 1.2.3 find_if 查找算法 1.2.4 adjacent_find 查找算法 1.2.5 binary_search 查找算法 1.2.6 count 查找算法…

干货 | 轻松看懂机器学习十大常用算法

通过本篇文章可以对ML的常用算法有个常识性的认识&#xff0c;没有代码&#xff0c;没有复杂的理论推导&#xff0c;就是图解一下&#xff0c;知道这些算法是什么&#xff0c;它们是怎么应用的&#xff0c;例子主要是分类问题。 每个算法都看了好几个视频&#xff0c;挑出讲的最…

常用十种算法

目录 1、二分查找算法&#xff08;非递归&#xff09;二分查找算法(非递归)介绍二分查找算法(非递归)代码实现 2、分治算法分治算法介绍分治算法的基本步骤分治算法最佳实践汉诺塔 3、动态规划动态规划算法介绍应用场景——背包问题 4、KMP算法*next数组生成KMP 算法介绍应用场…

Java常用算法

Java常用算法 一、二分查找算法&#xff08;非递归&#xff09; 1、介绍 ​ 二分查找法只适用于从有序的数列中进行查找(比如数字和字母等)&#xff0c;将数列排序后再进行查找。 ​ 二分查找法的运行时间为对数时间O(㏒₂n) &#xff0c;即查找到需要的目标位置最多只需要㏒…

五大常用算法学习笔记

一。分治算法&#xff1a;快速排序、归并排序、大整数乘法、二分查找、递归&#xff08;汉诺塔&#xff09; 基本概念&#xff1a;把一个复杂的问题分成若干个相同或相似的子问题&#xff0c;再把子问题分成更小的子问题… &#xff0c; 知道最后子问题可以简单的直接求解&…

常见算法

首页 论坛 新闻 文章 下载 源码 网友作品 合作开发 招聘 刻盘服务 编程爱好者光盘 请登陆或者注册新用户 用户名 密 码 记住密码 注册新用户 忘记密码了 您所在位置&#xff1a;编程爱好者论坛 — C/C语言讨论区 — 我见到过的一些常用算法 C/C语言讨论区&#xff1a;…

最常用算法汇总(一)

一、贪心算法 贪心算法&#xff08;又称贪婪算法&#xff09;是指&#xff0c;在对问题求解时&#xff0c;总是做出在当前看来是最好的选择。也就是说&#xff0c;不从整体最优上加以考虑&#xff0c;他所做出的仅是在某种意义上的局部最优解。贪心算法不是对所有问题都能得到整…

十大常用算法

前言 最近在研究一些经常用到的东西想把它们做一个汇总&#xff0c;想了想用到最多的应该是排序算法&#xff0c;所以对排序算法做了个总结&#xff0c;并自己用C实现了一下。 一、算法概述 0.1 算法分类 十种常见排序算法可以分为两大类&#xff1a; 非线性时间比较类排序…

常用十大算法

这里的讲解图主要使用的是尚硅谷韩顺平老师的图&#xff0c;请周知。 目录 二分查找&#xff08;非递归&#xff09; 分治算法 动态规划算法 KMP算法 贪心算法 普利姆&#xff08;Prim&#xff09;算法 克鲁斯卡尔&#xff08;Kruskal&#xff09;算法 迪杰斯特拉&a…

常见十大算法

优劣术语 - 稳定性 原本a在b前&#xff0c;ab,排序之后位置任然不变。不稳定性则相反 - 内排序 所有排序都在内存中完成。外排序数据放磁盘&#xff0c;排序通过磁盘内存的数据传输 - 事件复杂度 算法执行耗费的时间 - 空间复杂度 算法执行耗费的内存 In/out-place: 不占/占额…

常见的10种算法

常见的10种算法 数据结构研究的内容&#xff1a;就是如何按一定的逻辑结构&#xff0c;把数据组织起来&#xff0c;并选择适当的存储表示方法把逻辑结构组织好的数据存储到计算机的存储器里。 算法研究的目的是为了更有效的处理数据&#xff0c;提高数据运算效率。数据的运算是…

常用的10 种算法

译者&#xff1a;claudio jandan.net/2014/05/31/10-algorithms.html Reddit 有篇帖子介绍了算法对我们现在生活的重要性&#xff0c;以及哪些算法对现代文明所做贡献最大。如果对算法有所了解&#xff0c;读这篇文章时你可能会问 “作者知道算法为何物吗&#xff1f;”&#x…

常用的十种算法

十种算法 1、二分查找算法&#xff08;非递归&#xff09; 1、介绍&#xff1a; 1&#xff09;二分查找只适用于从有序的数列中进行查找&#xff08;比如数字和字母等&#xff09;&#xff0c;将数列排序后再进行查找 2&#xff09;二分查找算法的运行时间为对数时间O&…

基础夯实:基础数据结构与算法(二)

基础夯实&#xff1a;基础数据结构与算法&#xff08;二&#xff09; 常见的10种算法1、递归算法例题1&#xff1a;计算n&#xff01;例题2&#xff1a;斐波那契数列例题3&#xff1a;递归将整形数字转换为字符串例题4&#xff1a;汉诺塔例题5&#xff1a;猴子吃桃例题6&#x…

蓝桥杯知识点汇总:基础知识和常用算法

文章目录 JAVA基础语法&#xff1a;算法竞赛常用的JAVA API&#xff1a;算法和数据结构简单算法简单数据结构图论数学贪心动态规划 补充省赛题解待更&#xff1a; 此系列包含蓝桥杯&#xff08;软件类&#xff09;所考察的绝大部分知识点。一共分为 基础语法&#xff0c; 常用…

HBA的WWN号以及存储区域网络

古驰古驰巴拉巴拉&#xff0c;今天讲一下存储区域网络和wwn号以及查看wwn号的方法 存储区域网络&#xff08;Storage Area Network&#xff0c;简称SAN&#xff09;采用网状通道&#xff08;Fibre Channel &#xff0c;简称FC&#xff0c;区别与Fiber Channel光纤通道&#xf…

nsw hnsw

参考了很多该博客 https://blog.csdn.net/u011233351/article/details/85116719&#xff0c;感谢博主。 参考论文《Approximate nearest neighbor algorithm based on navigable small world graphs》 《Efficient and robust approximate nearest neighbor search using Hie…

思科光交MDS9710绑定WWN并激活新的wwn

第一步、查看所有的wwn号 #命令 #show flogi database 内容示例&#xff1a; 第二步、查看是否有发现新的wwn号 图中为新发现的wwn号 第三步、将该wwn号加入到对应的zone下 #先筋肉config模式 #再进入对应的zone zone name Zone_P11_****——** vsan 1 #新增新存在的wwn号…