欢迎光临散文网 会员登陆 & 注册

OpenRaft 在交易撮合引擎中的应用

2022-08-03 12:00 作者:Databend  | 我要投稿


前言

由于工作需要,一直对原子多播应用有非常浓厚的兴趣。通过一段时间的技术选型。我们非常幸运的得到了 Openraft[1] 实操分享 Databend[2] 社区的热心支持。我也想通过我们的实际工作,对 Openraft 的未来应用尽一些微薄之力。Openraft 是一个 Raft 的改进版(包括优化选举冲突[3], 解决网络抖动对 leadership 的影响), 它在 Databend[4] 中为 db, table 等元数据提供分布式存储和强一致读写, 为databend 的云端数据库集群提供事务性保证.我的实践的上一篇文章反应了我们的选型过程,有兴趣的人可以看一下。Raft in Rust (原子多播+撮合引擎)[5] 这篇文章更多的是想说明我们在使用 OpenRaft 的实际问题,并且通过我们的实现,揭秘 OpenRaft 的一些机制。

代码仓库

大家在使用 OpenRaft 的时候,我相信很多人都查看了手册:Getting Started - openraftThe openraft user guide.[6]

当然,这是一个非常优秀的手册。我们从这个手册里,会学习到如何使用 OpenRaft 实现自己的应用。而且,openraft/example-raft-kv[7] 这个例子确实能够很好的说明如何实现一个简单的应用。但是,这个例子是使用的内存来做持久化实现。当然内存不会真正做持久化,所以很容易在节点退出后,丢失状态。而我们真正需要的示例是一个能够持久化的例子。

另外一个实例就是 databend/metasrv[8] 而这个示例里面,我们可以看到一个完整的 metadata 存储服务的实现。实际上,metasrv 本身是一个类似于 etcd 的kv存储。存储整个 databend 集群的 metadata。这个示例里面,metasrv 使用了 sled 作为底层存储。sled 既存储 log,也存储 statemachine 的状态。这个例子,statemachine 所有的更新都直接在 sled 存储里通过 sled 的事物完成了。所以,对于如何存储 snapshot 这个问题,我们并不太容易看清楚。所以 snapshot 的产生和传递主要是在节点间同步的时候使用。

这里,大家可以看到我们开放的源代码。虽然这个示例是基于 example-raft-kv 示例,没有达到 metasrv 的生产强度。但是我们还是非常全面的表现出了 openraft 对 log, snapshot 处理的行为和能力。

GitHub - raymondshe/matchengine-raft[9]


应用场景

和 metasrv 的场景不同。我们需要我们的 statemachine 尽量在内存里面更新,尽量少落盘。虽然 sled 本地落盘的速度也很快,但是内存操作的速度会更快。所以,我们基本上就是这样进行操作的。


总体设计图[10]

所以在这个图里面,大家可以看到日志是通过 sled 进行存储的。而这些日志由于通过 Raft 协议,实际上他们在每台机器上的顺序是一致的。所以,不同的 matchengine-raft 实例,在相同的日志流情况下,对状态机的操作就是一致的。所以,不管我们从哪一个日志开始写 snapshot,通过加载 snapshot 并且回放后续的日志,我们都可以恢复到最新状态。

按照设计图中显示,当前 StateMachine 的状态是处理了第 9 个日志里的消息。这时候,系统保存了所有的消息到 sled。并且在第 3 个消息的时候落盘了一次 snapshot,并且在低 6 个消息的时候落盘了一次 snapshot。如果这台机器当机,我们是可以从编号为 3 的 snapshot 恢复状态机,并且继续处理 3,4,5,6,7,8,9 这 6 条消息来恢复当前状态。当然,我们也可以从编号为 6 的 snapshot 恢复状态机,并且继续处理 7,8,9 这 3 条消息来恢复当前状态。

当然我们可以选择多少个消息进行一次落盘。当然落盘的次数越多越可靠,但是性能影响比较大。好在 snapshot 的生成和落盘是异步的方式做的。

有兴趣的朋友可以看一下 akka 的 EventSroucing[11]  模式。这种模式和 Raft 单节点非常相像。不同的是 OpenRaft 强调多实例一致性,而 Akka 则提供了非常多的方式来存储 Log(Journal) 和 Snapshot。


实现细节

谈到实现细节。我们还是回到官方文档 geting-started[12] 来。我们也按照这个文档的顺序进行说明。

Raft 对于从应用开发着的角度,我们可以简化到下面的这张图里。Raft 的分布式共识就是要保证驱动状态机的指令能够在 Log 里被一致的复制到各个节点里。


Raft 有两个重要的组成部分:

  • 如何一致的在节点之间复制日志

  • 并且在状态机里面如何消费这些日志

基于 OpenRaft 实现我们自己的 Raft 应用其实并不复杂,只需要以下三部分:

  • 定义好客户端的请求和应答

  • 实现好存储 RaftStore 来持久化状态

  • 实现一个网络层,来保证 Raft 节点之间能相互传递消息。

好,那我们就开始吧:

1.定义好客户端的请求和应答

请求有可能是客户端发出的驱动 Raft 状态机的数据,而应答则是 Raft 状态机会打给客户端的数据。请求和应答需要实现 AppData 和 AppDataResponse 这里,我们的实现如下:

这两个类型完全是应用相关的,而且和 RaftStrage 的状态机实现相关。

  1. 这里,Set 是 example-raft-kv 原示例就有的命令。

  2. 大家也注意到了,命令都是对状态机有驱动的命令。也就是会对状态机状态造成改变的命令。如果我们需要取状态机内部数据的值返回给客户端。我们大可不必定义到这里。

2.实现 RaftStorage

这是整个项目非常关键的一部分。只要实现好 trait RaftStorage,我们就把数据存储和消费的行为定义好。RaftStoreage 可以包装一个像 RocksDB[13],Sled[14] 的本地 KV 存储或者远端的 SQL DB。RaftStorage 定义了下面一些 API

  • 读写 Raft 状态,比方说 term,vote(term:任期,vote:投票结果)

  • 读写日志

  • 将日志的内容应用到状态机

  • 创建和安装快照(snapshot)

在 ExampleStore[15], 这些内存化存储行为是非常明确简单的。而我们不是要真正落盘了吗?那我们就看一下 matchengine-rust 是怎么实现的。

这里是 matchengine-raft/src/store[16]

我们说明一些设计要点

ExampleStore 的数据:

ExchangeStore 里面主要是包含下面的成员变量。


帮助我们落盘的成员主要是 log, vote。而需要产生 snapshot 进行落盘的所有内容都在 state_machine.

1.last_purged_log_id:这是最后删除的日志 ID。删除日志本身可以节约存储,但是,对我们来讲,我了保证数据存储的安全。在删除日志之前,我们必须有这条日志 index 大的 snapshot 产生。否则,我们就没有办法通过 snapshot 来恢复数据。

2.log:这是一个 sled::Tree,也就是一个 map。如果看着部分代码的话,我们就可以清楚的明白 log 对象的结构。key 是一个 log_id_index 的 Big Endian 的字节片段。value 是通过 serd_json 进行序列化的内容

3.state_machine:这里就是通过日志驱动的所有状态的集合

StateMachine 里面最重要的数据就是 orderbook 这部分就是撮合引擎里面重要的订单表。存放买方和卖方的未成交订单信息。这是主要的业务逻辑。data 这部分是原来例子中的 kv 存储。我们还在这里没有删除。

这里 last_applied_loglast_menbership 这些状态和业务逻辑没有太大关系。所以,如果您要实现自己的 StateMachine。还是尽量和例子保持一致。主要是因为这两个状态是通过 apply_to_state_machine() 这个接口更新。也正好需要持久化。如果需要进一步隐藏 Raft 的细节,我们还是建议 openraft 能将这两个状态进一步进行隐藏封装。

对 state_machine 的落盘操作主要集中在这里:store/store.rs[17]。有兴趣的可以看一下。这里面比较有意思的问题是 orderbook 本身无法被默认的serde_json 序列化/反序列化。所以我们才在 matchengine/mod.rs[18] 加了这段代码:

4.vote:就是对最后一次 vote 的存储。具体请看, 这段代码倒不是因为这段代码有多重要,只是由于代码比较简单,看可以少写一些说明

但是这儿确实有个小坑,之前我没有注意到 vote 需要持久化,开始调试的时候产生了很多问题。直到找到 Openraft 作者 Zhang Yanpo[19] 才解决。也是触发我想开源这个 openraft 文件持久化实现的诱因吧。感谢 Zhang Yanpo,好样的。

5.其他的成员变量其实没什么太好说的了。和原例子一样。

对日志和快照的控制:

日志,快照相互配合,我们可以很好的持久化状态,并且恢复最新状态。多久写一次快照,保存多少日志。在这里我们使用了下面的代码。


强烈建议大家看一下 Config in openraft::config - Rust[20]

重点看 snapshot_policy, 代码里可以清楚的标识,我们需要 500 次 log 写一次快照。也就是 openraft 会调用 build_snapshot() 函数创建 snapshot。原示例里,snapshot 只是在内存里保存在 current_snapshot 变量里。而我们需要真实的落盘。请注意这段代码的 self.write_snapshot()

这下我们有了 snapshot,当然 snapshot 一方面可以用来在节点之间同步状态。另一方面就是在启动的时候恢复状态。而 openraft 的实现非常好。实际上恢复状态只需要回复到最新的 snapshot 就行。只要本地日志完备,openraft 会帮助你调用 apply_to_statemachine() 来恢复到最新状态。所以我们就有了 restore() 函数。#[async_trait]

大家注意一下 snapshot 的操作。当然,在这里,我们也恢复了 last_purged_log_id。当然 store 这个函数会在 ExampleStore 刚刚构建的时候调用。

如何确定 RaftStorage 是对的:

请查阅 Test suite for RaftStorage[21] 如果通过这个测试,一般来讲, OpenRaft 就可以使用他了。

RaftStorage 的竞争状态:

在我们的设计里,在一个时刻,最多有一个线程会写状态,但是,会有多个线程来进行读取。比方说,可能有多个复制任务在同时度日志和存储。

实现必须保证数据持久性:

调用者会假设所有的写操作都被持久化了。而且 Raft 的纠错机制也是依赖于可靠的存储。

实现 RaftNetwork

为了节点之间对日志能够有共识,我们需要能够让节点之间进行通讯。trait RaftNetwork 就定义了数据传输的需求。RaftNetwork 的实现可以是考虑调用远端的 Raft 节点的服务

ExampleNetwork[22]  显示了如何调用传输消息。每一个 Raft 节点都应该提供有这样一个 RPC 服务。当节点收到 raft rpc,服务会把请求传递给 raft 实例,并且通过 raft-server-endpoint[23] 返回应答。

在实际情况下可能使用  Tonic gRPC[24] 是一个更好的选择。 Databend-meta[25] 里有一个非常好的参考实现。

在我们的 matchengen-raft 实现里,我们解决了原示例中大量重连的问题。

1.维护一个可服用量的 client

这段代码在:network/raft_network_impl.rs

2.在服务器端引入keep_alive

这段代码在:lib.rs[26]


这样的改动确实是对性能有一些提升。但是真的需要更快的话,我们使用 grpc,甚至使用 reliable multicast,比方说 pgm。启动集群

启动集群

由于我们保留了之前的 key/value 实现。所以之前的脚本应该还是能够工作的。而且之前的 key/value 有了真正的存储。为了能够运行集群:

  • 启动三个没有初始化的 raft 节点;

  • 初始化其中一台 raft 节点;

  • 把其他的 raft 节点加入到这个集群里;

  • 更新 raft 成员配置。 example-raft-kv[27] 的 readme 文档里面把这些步骤都介绍的比较清楚了。

  • 下面两个测试脚本是非常有用的:test-cluster.sh[28] 这个脚本可以简练的掩饰如何用 curl 和 raft 集群进行交互。在脚本里,您可以看到所有 http 请求和应答。

test_cluster.rs[29] 这个 rust 程序显示了怎么使用 ExampleClient 操作集群,发送请求和接受应答。这里我们要强调的是,在初始化 raft 集群的时候。我们需要上述的过程。如果集群已经被初始化,并且我们已经持久化了相应的状态 (menbership, vote, log) 以后,再某些节点退出并且重新加入,我们就不需要再过多干预了。在使用 metasrv 启动 meta service 的时候,我也遇到了相同的情况。所以还是要先启动一个single node 以保证这个节点作为种子节点被合理初始化了。Deploy a Databend Meta Service Cluster | Databend[30]为了更好的启动管理集群,我们在项目里添加了 test.sh[31]。用法如下:


我们可以在不同阶段调用不同的命令。大家有兴趣的话可以看一下代码。这部分是主程序部分,包含了我们实现的所有命令。


未来的工作

当前我们实现的 matchengine-raft 只是为了示例怎么通过 raft 应用到撮合引擎这样一个对性能,稳定性,高可用要求都非常苛刻的应用场景。通过 raft 集群来完成撮合引擎的分布式管理。我们相信真正把这个玩具撮合引擎推向产品环境,我们还是需要进行很多工作:

1.优化序列化方案,serd_json 固然好,但是通过字符串进行编解码还是差点儿意思。至少用到 bson 或者更好的用 protobuf, avro 等,提高编解码速度,传输和存储的开销。

2.优化 RaftNetwork, 在可以使用 multi-cast 的情况下使用 pgm,如果不行,可以使用 grpc。

3.撮合结果的分发。这部分在很多情况下依赖消息队列中间件比较好。

4.增加更多的撮合算法。这部分完全是业务需求,和 openraft 无关。我们就不在这个文章里讨论了。

5.完善测试和客户端的调用。

6.完善压测程序,准备进一步调优。

结论

通过这个简单的小项目,我们:

1.实现了一个简单的玩具撮合引擎。

2.验证了 OpenRaft 在功能上对撮合引擎场景的支持能力。

3.给 OpenRaft 提供了一个基于 sled KV 存储的日志存储的参考实现。

4.给 OpenRaft 提供了一个基于本地文件的快照存储的参考实现。

给大家透露一个小秘密,SAP也在使用 OpenRaft[32] 来构建关键应用。大家想想,都用到 Raft 协议了,一定是非常重要的应用。

对于 Databend 社区的帮助,我表示由衷的感谢。 作为一个长期工作在软件行业一线的老程序猿,看到中国开源软件开始在基础构建发力,由衷的感到欣慰。也希望中国开源社群越来越好,越来越强大,走向软件行业的顶端。


作者信息:沈勇 Decisive Density  CTO引用链接

引用链接

[1] Openraft: https://github.com/datafuselabs/openraft
[2] Databend: https://github.com/datafuselabs/databend
[3] 优化选举冲突: https://datafuselabs.github.io/openraft/vote.html
[4] Databend: https://github.com/datafuselabs/databend
[5] Raft in Rust (原子多播+撮合引擎): http://t.csdn.cn/jcOnv
[6] Getting Started - openraftThe openraft user guide.: https://datafuselabs.github.io/openraft/getting-started.html
[7] openraft/example-raft-kv: https://github.com/datafuselabs/openraft/tree/main/examples/raft-kv-memstore
[8] databend/metasrv: https://github.com/datafuselabs/databend/tree/main/metasrv
[9] GitHub - raymondshe/matchengine-raft: https://github.com/raymondshe/matchengine-raft
[10] 总体设计图: https://excalidraw.com/#json=kSzwFGNGr_WNjytPO65RN,CjvsM4m_3efHnSIGK37Sow
[11] EventSroucing: https://doc.akka.io/docs/akka/current/typed/index-persistence.html
[12] geting-started: https://datafuselabs.github.io/openraft/getting-started.html
[13] RocksDB: https://docs.rs/rocksdb/latest/rocksdb/
[14] Sled: https://github.com/spacejam/sled
[15] ExampleStore: https://github.com/datafuselabs/openraft/blob/main/examples/raft-kv-memstore/src/store/mod.rs
[16] matchengine-raft/src/store: https://github.com/raymondshe/matchengine-raft/tree/master/src/store
[17] store/store.rs: https://github.com/raymondshe/matchengine-raft/blob/master/src/store/store.rs
[18] matchengine/mod.rs: https://github.com/raymondshe/matchengine-raft/blob/master/src/matchengine/mod.rs
[19] Zhang Yanpo: https://github.com/drmingdrmer
[20] Config in openraft::config - Rust: https://docs.rs/openraft/latest/openraft/config/struct.Config.html
[21] Test suite for RaftStorage: https://github.com/datafuselabs/openraft/blob/main/memstore/src/test.rs
[22] ExampleNetwork: https://github.com/datafuselabs/openraft/blob/main/examples/raft-kv-memstore/src/network/raft_network_impl.rs
[23] raft-server-endpoint: https://github.com/datafuselabs/openraft/blob/main/example-raft-kv/src/network/raft.rs
[24] Tonic gRPC: https://github.com/hyperium/tonic
[25] databend-meta: #L89
[26] lib.rs: https://github.com/raymondshe/matchengine-raft/blob/master/src/lib.rs
[27] example-raft-kv: https://github.com/datafuselabs/openraft/tree/main/examples/raft-kv-memstore
[28] test-cluster.sh: https://github.com/datafuselabs/openraft/blob/main/examples/raft-kv-memstore/test-cluster.sh
[29] test_cluster.rs: https://github.com/datafuselabs/openraft/blob/main/examples/raft-kv-memstore/tests/cluster/test_cluster.rs
[30] Deploy a Databend Meta Service Cluster | Databend: https://databend.rs/doc/manage/metasrv/metasrv-deploy
[31] test.sh: https://github.com/raymondshe/matchengine-raft/blob/master/test.sh
[32] OpenRaft: https://github.com/datafuselabs/openraft

关于 Databend

关于 DatabendDatabend 是一款开源、弹性、低成本,基于对象存储也可以做实时分析的新式数仓。期待您的关注,一起探索云原生数仓解决方案,打造新一代开源 Data Cloud。

  • Databend 文档:https://databend.rs/

  • Twitter:https://twitter.com/Datafuse_Labs

  • Slack:https://datafusecloud.slack.com/

  • Wechat:Databend

  • GitHub :https://github.com/datafuselabs/databend


文章首发于公众号:Databend


OpenRaft 在交易撮合引擎中的应用的评论 (共 条)

分享到微博请遵守国家法律