关于Kafka的Topic和分区发现,这回终于讲清楚了!
分区发现
Flink Kafka Consumer 支持发现动态创建的 Kafka 分区,并使用精准一次的语义保证去消耗它们。在初始检索分区元数据之后(即,当 Job 开始运行时)发现的所有分区将从最早可能的 offset 中消费。
默认情况下,是禁用了分区发现的。若要启用它,请在提供的属性配置中为 flink.partition-discovery.interval-millis 设置大于 0 的值,表示发现分区的间隔是以毫秒为单位的。
Topic 发现
在更高的级别上,Flink Kafka Consumer 还能够使用正则表达式基于 Topic 名称的模式匹配来发现 Topic。请看下面的例子:
在上面的例子中,当 Job 开始运行时,Consumer 将订阅名称与指定正则表达式匹配的所有主题(以 test-topic 开头并以单个数字结尾)。
要允许 consumer 在作业开始运行后发现动态创建的主题,那么请为 flink.partition-discovery.interval-millis 设置非负值。这允许 consumer 发现名称与指定模式匹配的新主题的分区。
实际的生产环境中可能有这样一些需求,比如:
场景一:有一个 Flink 作业需要将五份数据聚合到一起,五份数据对应五个 kafka topic,随着业务增长,新增一类数据,同时新增了一个 kafka topic,如何在不重启作业的情况下作业自动感知新的 topic。
场景二:作业从一个固定的 kafka topic 读数据,开始该 topic 有 10 个 partition,但随着业务的增长数据量变大,需要对 kafka partition 个数进行扩容,由 10 个扩容到 20。该情况下如何在不重启作业情况下动态感知新扩容的 partition?
针对上面的两种场景,首先需要在构建 FlinkKafkaConsumer 时的 properties 中设置 flink.partition-discovery.interval-millis 参数为非负值,表示开启动态发现的开关,以及设置的时间间隔。此时 FlinkKafkaConsumer 内部会启动一个单独的线程定期去 kafka 获取最新的 meta 信息。
针对场景一,还需在构建 FlinkKafkaConsumer 时,topic 的描述可以传一个正则表达式描述的 pattern。每次获取最新 kafka meta 时获取正则匹配的最新 topic 列表。
针对场景二,设置前面的动态发现参数,在定期获取 kafka 最新 meta 信息时会匹配新的 partition。为了保证数据的正确性,新发现的 partition 从最早的位置开始读取。
Kafka Consumer 提交 Offset 的行为配置
Flink Kafka Consumer 允许有配置如何将 offset 提交回 Kafka broker 的行为。请注意:Flink Kafka Consumer 不依赖于提交的 offset 来实现容错保证。提交的 offset 只是一种方法,用于公开 consumer 的进度以便进行监控。
配置 offset 提交行为的方法是否相同,取决于是否为 job 启用了 checkpointing。
禁用 Checkpointing:如果禁用了 checkpointing,则 Flink Kafka Consumer 依赖于内部使用的 Kafka client 自动定期 offset 提交功能。因此,要禁用或启用 offset 的提交,只需将 enable.auto.commit 或者 auto.commit.interval.ms 的Key 值设置为提供的 Properties 配置中的适当值。
启用 Checkpointing:如果启用了 checkpointing,那么当 checkpointing 完成时,Flink Kafka Consumer 将提交的 offset 存储在 checkpoint 状态中。这确保 Kafka broker 中提交的 offset 与 checkpoint 状态中的 offset 一致。用户可以通过调用 consumer 上的 setCommitOffsetsOnCheckpoints(boolean) 方法来禁用或启用 offset 的提交(默认情况下,这个值是 true )。注意,在这个场景中,Properties 中的自动定期 offset 提交设置会被完全忽略。
