欢迎光临散文网 会员登陆 & 注册

面试官:Flink 中水印是什么概念,起到什么作用?

2023-03-30 11:37 作者:程序员四次元ポケット  | 我要投稿

1 为什么要有 Watermark?


当 flink 以 EventTime 模式处理流数据时,它会根据数据里的时间戳来处理基于时间的算子。但是由于网络、分布式等原因,会导致数据乱序的情况。如下图所示:



2 Flink中的WaterMark


只要使用event time,就必须使用watermark,在上游指定,比如:source、map算子后

2.1 基本概念

Watermark的核心本质可以理解成一个延迟触发机制

我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的,虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生,所谓乱序,就是指Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的

我们来设想一下下面这个场景:

使用时间窗口来统计10分钟内的用户流量

有一个时间窗口

  • 开始时间为:2017-03-19 10:00:00

  • 结束时间为:2017-03-19 10:10:00

有一个数据,因为网络延迟

  • 事件发生的时间为:2017-03-19 10:10:00

  • 但进入到窗口的时间为:2017-03-19 10:10:02,延迟了2秒中

时间窗口并没有将59这个数据计算进来,导致数据统计不正确

根据窗口计算时间的不同,这个数据都会被遗漏,只是:

如果按照处理时间来计算,这个窗口在系统时间大于2017-03-19 10:10:00的时候就会关闭,延迟进来的这个59会被忽略

如果按照事件时间来计算,这个窗口当进入一条数据,其事件时间大于2017-03-19 10:10:00的时候,会导致窗口关闭,同样因为这个59延迟了,会因为别的正常顺序的数据进入Flink而导致属于它的窗口被提前关闭

也就是:

处理时间窗口,按照当前系统时间来判断进行窗口关闭

事件时间窗口,按照进入数据的事件时间来判断是否关闭窗口,如果进来的一条新数据是下一个窗口的数据,那么会关闭上一个窗口

总结:

watermark是水印,也称水位线。用来测量事件时间的进度。

watermark作为数据流中的一部分在流动,并且携带一个时间戳t。

watermark(t) 表示这个流里面事件时间已经到了时间t,意味着流中不应该存在时间戳t2<=t的数据。

触发窗口等的计算、关闭

单调递增的(时间不能倒退)

用来处理数据乱序的问题

大数据进阶flink教程

适合零基础自学的Hadoop教程


2.2 使用水印解决网络延迟问题

水印(watermark)就是一个时间戳,Flink可以给数据流添加水印,可以理解为:收到一条消息后,额外给这个消息添加了一个时间字段,这就是添加水印,一般人为添加的消息的水印都会比当前消息的事件时间小一些。

窗口是否关闭按照水印时间来判断,但原有事件时间不会被修改,窗口的边界依旧是事件时间来决定。

  • 水印并不会影响原有Eventtime

  • 当数据流添加水印后,会按照水印时间来触发窗口计算

  • 一般会设置水印时间,比Eventtime小一些(一般几秒钟)

  • 当接收到的水印时间 >= 窗口的endTime且窗口内有数据,则触发计算

水印(水印时间)的计算:事件时间 – 设置的最大允许延迟时间 = 水印时间

比如,事件时间是10分30秒, 最大延迟时间是2秒,那么水印时间就是10分28秒


2.2.1 水印实现延迟等待功能的思路剖析

举例:窗口5秒,延迟(水印)3秒,按照事件时间计算

  • 数据事件时间3, 落入窗口0-5.水印时间0

  • 来一条数据事件时间7, 落入窗口5-10,水印时间4

  • 来一条数据事件时间4,落入窗口0-5,水印时间1

  • 来一条数据事件时间8,落入窗口5-10,水印时间5

  • 这一条数据水印时间大于等于 窗口0-5的窗口结束时间。

满足了对窗口0-5的提交,这个窗口关闭,并触发数据计算

可以看出,第三条数据,其是延迟数据,它的事件时间是4,却来的比事件时间为7的数据还要晚。

但是因为水印的机制,这个数据未错过它的窗口,依旧成功进入属于它的窗口并且被计算

这就是水印的功能:在不影响按照事件时间判断数据属于哪个窗口的前提下,延迟某个窗口的关闭时间,让其等待一会儿延迟数据


2.2.2 多并行度的水印触发

在多并行度下,每个并行有一个水印

比如并行度是6,那么程序中就有6个watermark

分别属于这6个并行度(线程)

那么,触发条件以6个水印中最小的那个为准

比如, 有个窗口是0-5,其中5个并行度的水印都超过了5,但有一个并行度的水印是3,那么,不管另外5个并行度中的水印达到了多大,都不会触发。

因为6个并行度中的6个水印,最小的是3,不满足大于等于窗口结束5的条件

2.2.3 Keyby 分流

一个程序中有多少个水印和并行度有关,和keyby无关

也就是:比如有单词hadoop spark,按照keyby,会分成hadoop组 和spark组,但是这两个组是共用1个水印的,hadoop来的数据满足了触发条件,会将spark组的数据也触发



3 Watermark 的设定


在Flink1.11中就已经发现assignTimestampsAndWatermarks的有些实现过期了,从Flink1.12版本开始,官网推荐用WatermarkStrategy

3.1 背景

在flink 1.11之前的版本中,提供了两种生成水印(Watermark)的策略,分别是AssignerWithPunctuatedWatermarks和AssignerWithPeriodicWatermarks,这两个接口都继承自TimestampAssigner接口。

用户想使用不同的水印生成方式,则需要实现不同的接口,但是这样引发了一个问题,对于想给水印添加一些通用的、公共的功能则变得复杂,因为我们需要给这两个接口都同时添加新的功能,这样还造成了代码的重复。

所以为了避免代码的重复,在flink 1.11 中对flink的水印生成接口(WatermarkStrategy)进行了重构

3.2 Watermark应用代码结构

WatermarkStrategy在Flink中有两种使用方式:

  • 一种是直接在数据源上使用

  • 另一种是直接在非数据源的操作之后使用

推荐使用第一种方式,因为数据源可以利用watermark生成逻辑中有关分片/分区的信息。使用这种方式可以更加精准的跟踪watermark,整体的watermark生成将更加精确,直接在数据源指定watermarkStrategy必须使用特定的数据源接口,例如与kafka链接,使用kafka Connerctor,仅当无法直接在数据源上设置策略是时才使用第二种方式

在数据源直接使用时如果因为数据源中的某一个分区/分片在一段时间内未发送事件数据,则意味着watermarkStrategy也不会获得任何数据去生成watermark,在这种情况下可以通过设置有一个空闲时间,当超过这个时间则将这个分片或分区标记为空闲状态。


大数据进阶flink教程

适合零基础自学的Hadoop教程


3.3 水印的生成策略

3.3.1 内置水印生成策略

为了方便开发,flink提供了一些内置的水印生成方法供我们使用。

固定延迟生成水印

通过静态方法forBoundedOutOfOrderness提供,入参接收一个Duration类型的时间间隔,也就是我们可以接受的最大的延迟时间。使用这种延迟策略的时候需要我们对数据的延迟时间有一个大概的预估判断。

我们实现一个延迟3秒的固定延迟水印,可以这样做:

他的底层使用的WatermarkGenerator接口的一个实现类BoundedOutOfOrdernessWatermarks。我们看下源码中的这两个方法:

单调递增生成水印

周期性 watermark 生成方式的一个最简单特例就是你给定的数据源中数据的时间戳升序出现。在这种情况下,当前时间戳就可以充当 watermark,因为后续到达数据的时间戳不会比当前的小。

注意:在 Flink 应用程序中,如果是并行数据源,则只要求并行数据源中的每个单分区数据源任务时间戳递增。例如,设置每一个并行数据源实例都只读取一个 Kafka 分区,则时间戳只需在每个 Kafka 分区内递增即可。Flink 的 watermark 合并机制会在并行数据流进行分发(shuffle)、联合(union)、连接(connect)或合并(merge)时生成正确的 watermark

通过静态方法forMonotonousTimestamps来提供.

这个也就是相当于上述的延迟策略去掉了延迟时间,以event中的时间戳充当了水印。

在程序中可以这样使用:

它的底层实现是AscendingTimestampsWatermarks,其实它就是BoundedOutOfOrdernessWatermarks类的一个子类,没有了延迟时间,我们来看看具体源码的实现。

3.3.2 自定义水印生成策略


watermark 的生成方式本质上是有两种:周期性生成和标记生成

  • 周期性生成器通常通过 onEvent() 观察传入的事件数据,然后在框架调用 onPeriodicEmit() 时发出 watermark

  • 标记生成器将查看 onEvent() 中的事件数据,并等待检查在流中携带 watermark 的特殊标记事件或打点数据。当获取到这些事件数据时,它将立即发出 watermark。通常情况下,标记生成器不会通过 onPeriodicEmit() 发出 watermark

周期性watermark策略

官方提供的 watermark生成器有两种

  • 升序watermark:单调递增生成水印

  • 乱序watermark:固定延迟生成水印

都是基于周期性生成,默认的周期是 200ms,一般不去改,保持在ms级别 onPeriodicEmit()

每间隔200ms一个周期,就会生成一个watermark

间歇性watermark策略

每一个事件时间都会产生一个watermark


3.4 在非数据源的操作之后使用 Watermark [重点]

3.4.1 Watermark的三种使用情况

  • 本来有序的Stream中的 Watermark

如果数据元素的事件时间是有序的,Watermark 时间戳会随着数据元素的事件时间按顺序生成,此时水位线的变化和事件时间保持一致(因为既然是有序的时间,就不需要设置延迟了,那么t就是 0。所以 watermark=maxtime-0 = maxtime),也就是理想状态下的水位线。当 Watermark 时间大于 Windows 结束时间就会触发对 Windows 的数据计算,以此类推, 下一个 Window 也是一样。这种情况其实是乱序数据的一种特殊情况。

  • 乱序事件中的Watermark

现实情况下数据元素往往并不是按照其产生顺序接入到 Flink 系统中进行处理,而频繁出现乱序或迟到的情况,这种情况就需要使用 Watermarks 来应对。

3.4.2 有序流中的水印(升序)

升序流水印概念:事件是有序的(按照他们自己的时间戳来看),watermark是流中简单的周期性的标记。

升序的底层实现

  • 底层调用的也是 乱序的 watermark生成器,只是 乱序程度 传了一个 0ms

  • watermark = maxTimestamp - outOfOrdernessMillis - 1

                  = maxTimestamp - 0ms -1 ms

                 => 事件时间 - 1ms

需求-从socket获取数据,来计算传感器水位信息

开发步骤

  • 1.定义类 WaterSensor  String id; Long ts; Integer vc; 

  • 2.创建流执行环境

  • 3.获取socket文本数据

  • 4.将字符串数据切分成 WaterSensor 对象数据

  • 5.分配水印机制,单调递增

  • 6.分配后的数据根据id进行分组

  • 7.设置滚动事件时间窗口,时间为10秒

  • 8.对开窗数据进行process

  • 9.打印输出

  • 10.执行流环境

参考代码


3.4.3 无序流中的水印

按照数据的时间戳来看,事件是乱序的,则watermark就非常重要了:

乱序怎样产生的呢?

  • 采集过程中导致的乱序

  • 网络传输过程导致的乱序

乱序将会导致数据丢失?

如何解决乱序的数据丢失问题呢?- watermark

等待时间(乱序程度)如何设置?

等待时间 = 最大的乱序程度。

  • 经验值 => 对自身集群和数据的了解,大概估算

  • 对数据进行抽样

  • 肯定不会设置为几个小时,一般设为 秒 或者 分钟

底层实现

  • watermark = maxTimestamp - outOfOrdernessMillis - 1 

  • = 最大乱序事件时间 - 乱序程度(等待时间) - 1ms

需求-根据socket文本计算当前乱序3秒的数据统计

分配水印机制-forBoundedOutOfOrderness

参考代码


大数据进阶flink教程

适合零基础自学的Hadoop教程


3.5 直接在数据源上使用Watermark(Kafka 连接器)

当使用 Apache Kafka 连接器作为数据源时,每个 Kafka 分区可能有一个简单的事件时间模式(递增的时间戳或有界无序)。然而,当使用 Kafka 数据源时,多个分区常常并行使用,因此交错来自各个分区的事件数据就会破坏每个分区的事件时间模式(这是 Kafka 消费客户端所固有的)。

在这种情况下,你可以使用 Flink 中可识别 Kafka 分区的 watermark 生成机制。使用此特性,将在 Kafka 消费端内部针对每个 Kafka 分区生成 watermark,并且不同分区 watermark 的合并方式与在数据流 shuffle 时的合并方式相同。

例如,如果每个 Kafka 分区中的事件时间戳严格递增,则使用时间戳单调递增按分区生成的 watermark 将生成完美的全局 watermark。

注意,我们在示例中未使用 TimestampAssigner,而是使用了 Kafka 记录自身的时间戳。

下图展示了如何使用单 kafka 分区 watermark 生成机制,以及在这种情况下 watermark 如何通过 dataflow 传播。

在实际的计算中,往往会出现一个作业中会处理多个source的数据,对source的数据进行groupBy分组,那么来自不同source的相同的key会shuffle到同一个处理节点。并且携带各自的Watermark。

Flink内部要保证Watermark的单调递增,多个source的Watermark汇聚到一起是不可能单调递增的。

Flink内部实现每一个边上只能有一个递增的Watermark,当出现多个流携带EventTime汇聚到一起(groupBy或者Union)。Flink会选择所有输入流中EventTime中最小的一个向下游流出。从而保证Watermark的单调递增和数据的完整性。

需求-读取kafkaconsumer并设置水印机制,根据时间窗口为

操作步骤

  • 1.获取流执行环境,设置并行度1

  • 2.创建FlinkKafkaConsumer和相应的配置

  • 3.设置consumer的水印机制为20

  • 4.通过consumer添加数据源

  • 5.根据滚动处理时间窗口5s做wordcount,先flatMap,keyBy,window,sum

  • 6.打印输出

  • 7.执行流环境

参考代码


面试官:Flink 中水印是什么概念,起到什么作用?的评论 (共 条)

分享到微博请遵守国家法律