欢迎光临散文网 会员登陆 & 注册

如何将Kafka中的二进制数据转换为Java或者Scala对象?

2023-07-11 10:07 作者:ingemar-  | 我要投稿

Flink Kafka Consumer 需要知道如何将 Kafka 中的二进制数据转换为 Java 或者 Scala 对象。KafkaDeserializationSchema 允许用户指定这样的 schema,每条 Kafka 中的消息会调用 T deserialize(ConsumerRecord<byte[], byte[]> record) 反序列化。

为了方便使用,Flink 提供了以下几种 schemas:

  • SimpleStringSchema:按照字符串方式序列化、反序列化

  • TypeInformationSerializationSchema(和 TypeInformationKeyValueSerializationSchema) 基于 Flink 的 TypeInformation 创建 schema。如果该数据的读和写都发生在 Flink 中,那么这将是非常有用的。此 schema 是其他通用序列化方法的高性能 Flink 替代方案。

  • JsonDeserializationSchema(和 JSONKeyValueDeserializationSchema)将序列化的 JSON 转化为 ObjectNode 对象,可以使用 objectNode.get("field").as(Int/String/...)() 来访问某个字段。KeyValue objectNode 包含一个含所有字段的 key 和 values 字段,以及一个可选的"metadata"字段,可以访问到消息的 offset、partition、topic 等信息。

  • AvroDeserializationSchema 使用静态提供的 schema 读取 Avro 格式的序列化数据。它能够从 Avro 生成的类(AvroDeserializationSchema.forSpecific(...))中推断出 schema,或者可以与 GenericRecords 一起使用手动提供的 schema(用 AvroDeserializationSchema.forGeneric(...))。此反序列化 schema 要求序列化记录不能包含嵌入式架构!

要使用此反序列化 schema 必须添加以下依赖:


如何将Kafka中的二进制数据转换为Java或者Scala对象?的评论 (共 条)

分享到微博请遵守国家法律