Apache Kafka 真的保留了消息排序吗?
Apache Kafka最著名的口头禅之一是“它保留了每个主题分区的消息排序”,但这总是正确的吗?在这篇博文中,我们将分析一些真实的场景,在这些场景中,接受教条而不质疑它可能会导致意想不到的、错误的消息序列。
基本方案:单个生产者
我们可以从一个基本场景开始我们的旅程:单个生产者通过单个分区依次向具有单个分区的 Apache Kafka 主题发送消息。

在这种基本情况下,根据已知的咒语,我们应该期待始终正确的排序。但是,这是真的吗?……这要看情况!
网络不平等
在理想情况下,单个生产者方案应始终导致正确的排序。但是我们的世界并不完美!不同的网络路径、错误和延迟可能意味着消息延迟或丢失。
让我们想象一下下面的情况:一个生产者,向一个主题发送三条消息:
消息【1】,由于某种原因,找到到Apache Kafka的长网络路由
消息【2】查找到 Apache Kafka 的最快网络路由
消息【3】在网络中丢失

即使在这种基本场景中,只有一个生产者,我们也可能会在主题中获得一系列意想不到的消息。 Kafka 主题的最终结果将仅显示两个正在存储的事件,意外顺序为【2】【1】。
如果你仔细想想,从Apache Kafka的角度来看,这是正确的顺序:主题只是一个信息的日志,Apache Kafka会根据它何时“感知”到新事件的到来将消息写入日志。它基于 Kafka 引入时间,而不是消息的创建时间(事件时间)。
确认和重试
但是,并非一切都丢失了!如果我们研究生产库(aiokafka 就是一个例子),我们有办法确保消息正确传递。
首先,为了避免上述场景中的消息【3】问题,我们可以定义一个适当的确认机制。acks生产者参数允许我们定义我们希望从 Apache Kafka 获得的消息接收确认。
将此参数设置为【1】将确保我们收到来自负责主题(和分区)的主代理的确认。将其设置为【all】 将确保我们仅在主副本和副本都正确存储消息时才收到 ack,从而避免在只有主节点收到消息然后在将其传播到副本之前失败时出现问题。
一旦我们设置了一个合理的ack,我们应该设置如果我们没有收到正确的确认,则重试发送消息的可能性。与其他库(kafka-python 是其中之一)不同,aiokafka 将自动重试发送消息,直到超过超时(由request_timeout_ms参数设置)。
通过确认和自动重试,我们应该解决消息【3】的问题。第一次发送时,生产者不会收到ack ,因此,在retry_backoff_ms间隔之后,它将再次发送消息【3】。

最大飞行请求数
但是,如果您仔细观察 Apache Kafka 主题中的最终结果,则结果排序不正确:我们发送了1,2,3,在主题中得到了2,1,3……如何解决这个问题?
旧方法(在kafka-python中可用)是设置每个连接的最大飞行请求:我们允许同时“空中”而不确认的消息数量。我们同时允许在空中传播的消息越多,获得无序消息的风险就越大。
使用 kafka-python 时,如果我们绝对需要在主题中有一个特定的排序,我们被迫将max_in_flight_requests_per_connection限制为1 。基本上,假设我们将ack参数设置为至少1,那么在发送下一条消息之前,我们等待每一条消息的确认(如果消息大小小于批大小,则等待每批消息的确认)。

排序、确认和重试的绝对正确性是以吞吐量为代价的。我们允许同时“空中”的消息量越少,我们需要接收的确认就越多,我们可以在定义的时间范围内传递给 Kafka 的总体消息就越少。
幂等生产者
为了克服一次发送一条消息并等待确认的严格序列化,我们可以定义幂等生产者。使用幂等生产者时,每条消息都标有生产者 ID 和序列号(为每个分区维护的序列)。然后,此组合 ID 将与消息一起发送到代理。
代理跟踪每个生产者和主题/分区的序列号。每当有新消息到达时,代理都会检查组合 ID,如果在同一生产者中,该值等于前一个数字 + 1,则确认新消息,否则将被拒绝。这保证了消息的全局排序,允许每个连接有更多的动态请求(Java 客户端最多 5 个)。
增加多个生产商的复杂性
到目前为止,我们设想了一个只有一个生产者的基本场景,但 Apache Kafka 的现实是,生产者通常是多个。如果我们想确定最终订购结果,需要注意哪些小细节?
不同的位置,不同的延迟
同样,网络是不平等的,并且由于多个生产者可能位于非常偏远的位置,不同的延迟意味着 Kafka 排序可能与基于事件时间的排序不同。

不幸的是,地球上不同位置之间的不同延迟无法修复,因此我们需要接受这种情况。
批处理,一个附加变量
为了实现更高的吞吐量,我们可能需要批处理消息。通过批处理,我们以“组”的形式发送消息,最大限度地减少调用总数,并提高有效负载与整体消息大小的比率。但是,通过这样做,我们可以再次改变事件的顺序。Apache Kafka 中的消息将按批次存储,具体取决于批处理引入时间。因此,每批的消息顺序是正确的,但不同的批次中可能具有不同的排序消息。

现在,由于不同的延迟和批处理都到位,我们的全球订购前提似乎将完全丢失......那么,为什么我们声称我们可以按顺序管理事件呢?
救世主:事件时间
我们知道,关于 Kafka 保持消息排序的原始前提并不是 100% 正确的,消息的顺序取决于 Kafka 摄取时间,而不是事件生成时间。但是,如果基于事件时间的排序很重要呢?
好吧,我们不能在生产方面解决问题,但我们可以在消费者方面解决问题。所有与Apache Kafka一起使用的最常见工具都能够定义将哪个字段用作事件时间,包括Kafka Streams,Kafka Connect与专用的时间戳提取器单消息转换(SMT)和Apache Flink®。
如果定义得当,消费者将能够重新调整来自特定 Apache Kafka 主题的消息的顺序。下面我们来分析一下 Apache Flink 的例子:

在上面的 Apache Flink 表定义中,我们可以注意到:
occurred_at
:该字段在 Unix 时间的源 Apache Kafka 主题中定义(数据类型为 BIGINT)。time_ltz AS TO_TIMESTAMP_LTZ(occurred_at, 3)
:将 unix 时间转换为 Flink 时间戳。WATERMARK FOR time_ltz AS time_ltz - INTERVAL '10' SECOND
将新字段(计算自 occurred_at)定义为事件时间,并定义事件延迟最多为 10 秒的事件延迟的阈值。
定义上表后,可以使用time_ltz字段对事件进行正确排序并定义聚合窗口,从而确保计算中包含可接受的延迟内的所有事件。
- INTERVAL '10' SECOND定义了数据管道的延迟,并且是我们需要包含的惩罚值,以允许正确引入延迟到达的事件。但请注意,吞吐量不受影响。我们可以根据需要在管道中流动任意数量的消息,但是在计算任何最终 KPI 之前,我们需要“等待 10 秒”,以确保我们将特定时间范围内的所有事件都包含在图中。
另一种仅在事件包含完整状态时才有效的方法是为某个键(在上面的示例中是主机名和cpu)保留到目前为止达到的最大事件时间,并且只接受新事件时间大于最大值的更改。
总结
在 Kafka 中排序的概念可能很棘手,即使我们只包含具有单个分区的单个主题。这篇文章分享了一些可能导致一系列意外事件的常见情况。幸运的是,限制传输中的消息数量或使用幂等生产者等选项可以帮助实现符合预期的排序。在多个生产者以及网络延迟不可预测的情况下,可用的选项是通过正确处理需要在有效负载中指定的事件时间来修复使用者端的整体排序。