Streaming 读取Kafka 保存OFFSET到kafka
Streaming 读取Kafka 实现断点续读功能
老版本的kafka比较麻烦,streaming提供的只有checkpoint方法实现断点续读功能,但是当修改程序之后就没法平滑部署。
因为checkpoint存储的是整个streaming启动类的序列化文件,当文件改动之后没法反序列化了。所以需要更好的方法来实现读取Kafka 实现断点续读功能。
本文主要讲解的就是通过zookeeper保存offset信息来实现的功能。
这里我们解释如何配置Spark Streaming已从Kafka接收数据。有两种方法 - 使用Receivers和Kafka的高级API的旧方法,以及不使用Receiver的新方法(在Spark 1.3中引入)。它们具有不同的编程模型,性能特征和语义保证,因此请继续阅读以获取更多详细信息。从当前版本的Spark开始,这两种方法都被认为是稳定的API。
方法1: Receiver-based Approach
此方法使用Receiver接收数据。Receiver是使用Kafka高级消费者API实现的。与所有接收器一样,从Kafka通过Receiver接收的数据存储在Spark执行器中,然后由Spark Streaming启动的作业处理数据。
但是,在默认配置下,此方法可能会在失败时丢失数据(请参阅接收器可靠性。为确保零数据丢失,您必须在Spark Streaming中另外启用预写日志(在Spark 1.2中引入)。这将同步保存所有收到的Kafka将数据转换为分布式文件系统(例如HDFS)上的预写日志,以便在发生故障时可以恢复所有数据。有关预写日志的更多详细信息,请参阅流式编程指南中的“ 部署”部分。
方法2: Direct Approach (No Receivers)
Spark 1.3中引入了这种新的无接收器“直接”方法,以确保更强大的端到端保证。这种方法不是使用接收器来接收数据,而是定期向Kafka查询每个主题+分区中的最新偏移量,并相应地定义要在每个批次中处理的偏移量范围。当启动处理数据的作业时,Kafka的简单消费者API用于从Kafka读取定义的偏移范围(类似于从文件系统读取的文件)。请注意,此功能是在Spark 1.3中为Scala和Java API引入的,在Python 1.4中为Python API引入。
与基于接收器的方法(即方法1)相比,该方法具有以下优点。
简化的并行性: 无需创建多个输入Kafka流并将它们联合起来。使用directStream,Spark Streaming将创建与要使用的Kafka分区一样多的RDD分区,这些分区将并行地从Kafka读取数据。因此,Kafka和RDD分区之间存在一对一的映射,这更容易理解和调整。
效率: 在第一种方法中实现零数据丢失需要将数据存储在预写日志中,这会进一步复制数据。这实际上是低效的,因为数据有效地被复制两次 - 一次由Kafka复制,第二次由Write-Ahead Log复制。第二种方法消除了问题,因为没有接收器,因此不需要预写日志。只要您有足够的Kafka保留,就可以从Kafka恢复消息。
完全一次的语义: 第一种方法使用Kafka的高级API在Zookeeper中存储消耗的偏移量。传统上,这是从Kafka使用数据的方式。虽然这种方法(与预写日志结合使用)可以确保零数据丢失(即至少一次语义),但某些记录在某些故障下可能会被消耗两次。这是因为Spark Streaming可靠接收的数据与Zookeeper跟踪的偏移之间存在不一致。因此,在第二种方法中,我们使用不使用Zookeeper的简单Kafka API。Spark Streaming在其检查点内跟踪偏移量。这消除了Spark Streaming和Zookeeper / Kafka之间的不一致,因此尽管出现故障,Spark Streaming也会有效地接收每条记录一次。为了获得结果输出的精确一次语义,主编程指南中输出操作的语义以获取更多信息)。
请注意,此方法的一个缺点是它不会更新Zookeeper中的偏移量,因此基于Zookeeper的Kafka监视工具将不会显示进度。但是,您可以在每个批次中访问此方法处理的偏移量,并自行更新Zookeeper。
目前使用较多的是方法二,因为比较方便。但是方法二中存在一个问题,就是需要自己来维护offset信息,才能实现服务重新部署之后能从之前读取的位置继续读取kafka的数据。
保存kafka的offset信息有两种方案
方案一:通过一个文件路劲来保存,存储的是checkpoint的信息。实际上就是把类序列化之后保存到文件系统里面,再次启动的时候去反序列化回来,从反序列化之后的类里面获取offset信息。这种方案的弊端是,当修改streaming程序之后,类的反序列化会失败,因为类里面的代码改变导致反序列化失败。这种方案解决这个问题的策略是同时启动两个流程,新老流程共存一段时间,来保证数据的不丢失,但是如果没有在后面做去重流程,处理完毕的数据会有很多重复数据,需要清洗。
方案二:通过zookeeper来保存offset信息,这样每次启动流程只需要再次去读取zookeeper上的offset信息就好。
首先写一个通用的父类实现基本的streaming流程,然后通过子类继承父类,然后重写父类处理业务的方法,实现个性化需求(主要是实现不同的转换和解析)
通用的父类如下:
然后写一个带main函数的类来启动steam流程。
具体的需求的时候只需要个性化继承KafkaSparkStreaming,然后重写doTask方法就能实现自己的个性化需求了。