4.4 水印
水印(Watermark)用于处理乱序事件,而正确地处理乱序事件,通常用Watermark机制结合窗口来实现。
从流处理原始设备产生事件,到Flink读取到数据,再到Flink多个算子处理数据,在这个过程中,会受到网络延迟、数据乱序、背压、Failover等多种情况的影响,导致数据是乱序的。虽然大部分情况下没有问题,但是不得不在设计上考虑此类异常情况,为了保证计算结果的正确性,需要等待数据,这带来了计算的延迟。对于延迟太久的数据,不能无限期地等下去,所以必须有一个机制,来保证特定的时间后一定会触发窗口进行计算,这个触发机制就是Watermark。
在DataStream和Flink Table & SQL模块中,使用了各自的Watermark生成体系。
4.4.1 DataStream Watermark生成
通常Watermark在Source Function中生成,如果是并行计算的任务,在多个并行执行的Source Function中,相互独立产生各自的Watermark。而Flink提供了额外的机制,允许在调用DataStream API操作(如map、filter等)之后,根据业务逻辑的需要,使用时间戳和Watermark生成器修改数据记录的时间戳和Watermark。
1. Source Function中生成Watermark
Source Function可以直接为数据元素分配时间戳,同时也会向下游发送Watermark。在Source Function中为数据分配了时间戳和Watermark就不必在DataStream API中使用了。需要注意的是:如果一个timestamp分配器被使用的话,由源提供的任何Timestamp和Watermark都会被重写。
为了通过SourceFunction直接为一个元素分配一个时间戳,SourceFunction需要调用SourceContext中的collectWithTimestamp(...)方法。为了生成Watermark,源需要调用emitWatermark(Watermark)方法,如代码清单4-3所示。
代码清单4-3 SourceFunction中为数据元素分配时间戳和生成Watermark示例
2. DataStream API中生成Watermark
DataStream API中使用的TimestampAssigner接口定义了时间戳的提取行为,其有两个不同接口AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks,分别代表了不同的Watermark生成策略。TimestampAssigner接口体系如图4-9所示。
图4-9 DataStream中TimestampAssigner接口体系
AssignerWithPeriodicWatermarks是周期性生成Watermark策略的顶层抽象接口,该接口的实现类周期性地生成Watermark,而不会针对每一个事件都生成。
AssignerWithPunctuatedWatermarks对每一个事件都会尝试进行Watermark的生成,但是如果生成的Watermark是null或者Watermark小于之前的Watermark,则该Watermark不会发往下游,因为发往下游也不会有任何效果,不会触发任何窗口的执行。
4.4.2 Flink SQL Watermark生成
Flink SQL没有DataStram API开发那么灵活,其Watermark的生成主要是在TableSource中完成的,其定义了3类Watermark生成策略。其Watermark生成策略体系如图4-10所示。
图4-10 Flink SQL的Watermark生成策略体系
Watermark的生成机制分为如下3类。
(1)周期性Watermark策略
周期性Watermark策略在Flink中叫作PeriodicWatermarkAssigner,周期性(一定时间间隔或者达到一定的记录条数)地产生一个Watermark。在实际的生产中使用周期性Watermark策略的时候,必须注意时间和数据量,结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时。
1) AscendingTimestamps:递增Watermark,作用在Flink SQL中的Rowtime属性上,Watermark=当前收到的数据元素的最大时间戳-1,此处减1的目的是确保有最大时间戳的事件不会被当做迟到数据丢弃。
2)BoundedOutOfOrderTimestamps:固定延迟Watermark,作用在Flink SQL的Rowtime属性上,Watermark=当前收到的数据元素的最大时间戳-固定延迟。
(2)每事件Watermark策略
每事件Watermark策略在Flink中叫作PuntuatedWatamarkAssigner,数据流中每一个递增的EventTime都会产生一个Watermark。在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark,在一定程度上会对下游算子造成压力,所以只有在实时性要求非常高的场景下才会选择Punctuated的方式进行Watermark的生成。
(3)无为策略
无为策略在Flink中叫作PreserveWatermark。在Flink中可以使用DataStream API和Table & SQL混合编程,所以Flink SQL中不设定Watermark策略,使用底层DataStream中的Watermark策略也是可以的,这时Flink SQL的Table Source中不做处理。
4.4.3 多流的Watermark
在实际的流计算中一个作业中往往会处理多个Source的数据,对Source的数据进行GroupBy分组,那么来自不同Source的相同key值会shuffle到同一个处理节点,并携带各自的Watermark,Apache Flink内部要保证Watermark保持单调递增,多个Source的Watermark汇聚到一起时可能不是单调自增的,对于这样的情况,ApacheFlink的内部处理如图4-11所示。
图4-11 Watermark处理逻辑
Apache Flink内部实现每一个边上只能有一个递增的Watermark,当出现多流携带EventTime汇聚到一起(GroupBy或Union)时,Apache Flink会选择所有流入的EventTime中最小的一个向下游流出,从而保证Watermark的单调递增和数据的完整性。
Watermark是在Source Function中生成或者在后续的DataStream API中生成的。Flink作业一般是并行执行的,作业包含多个Task,每个Task运行一个或一组算子(OperatorChain)实例,Task在生成Watermark的时候是相互独立的,也就是说在作业中存在多个并行的Watermark。
Watermark在作业的DAG从上游向下游传递,算子收到上游Watermark后会更新其Watermark。如果新的Watermark大于算子的当前Watermark,则更新算子的Watermark为新Watermark,并发送给下游算子。
某些算子会有多个上游输入,如Union或keyBy、partition之后的算子。在Flink的底层执行模型上,多流输入会被分解为多个双流输入,所以对于多流Watermark的处理也就是双流Watermark的处理,无论是哪一个流的Watermark进入算子,都需要跟另一个流的当前算子进行比较,选择较小的Watermark,即Min(input1Watermark,intput2Watermark),与算子当前的Watermark比较,如果大于算子当前的Watermark,则更新算子的Watermark为新的Watermark,并发送给下游,如代码清单4-4所示。
代码清单4-4 双流输入的StreamOperator Watermark处理
如图4-12所示,多流Watermark中使用了事件时间。
图4-12 多流Watermark示例
在图4-12中,Source算子产生各自的Watermark,并随着数据流流向下游的map算子,map算子是无状态计算,所以会将Watermark向下透传。window算子收到上游两个输入的Watermark后,选择其中较小的一个发送给下游,window(1)算子比较Watermark 29和Watermark 14,选择Watermark 14作为算子当前Watermark,并将Watermark 14发往下游,window(2)算子也采用相同的逻辑。