使用轻量级 CDC debezium-server-databend 构建实时数据同步
作者:韩山杰Databend Cloud 研发工程师https://github.com/hantmac

Debezium Server Databend[1] 是一个基于 Debezium Engine 自研的轻量级 CDC 项目,用于实时捕获数据库更改并将其作为事件流传递最终将数据写入目标数据库 Databend。它提供了一种简单的方式来监视和捕获关系型数据库的变化,并支持将这些变化转换为可消费事件。
使用 Debezium server databend 实现 CDC 无须依赖大型的 Data Infra 比如 Flink, Kafka, Spark 等,只需一个启动脚本即可开启实时数据同步。
这篇教程将展示如何基于 Debezium server databend 快速构建 MySQL 到 Databend 的实时数据同步。
假设我们有电子商务业务,商品的数据存储在 MySQL ,我们需要实时把它同步到 Databend 中。
接下来的内容将介绍如何使用 Debezium server databend CDC 来实现这个需求,系统的整体架构如下图所示:

准备阶段
准备一台已经安装了 Docker ,docker-compose 以及 Java 11 环境 的 Linux 或者 MacOS 。
🙋 准备教程所需要的组件
接下来的教程将以 docker-compose
的方式准备所需要的组件。
🙋 debezium-MySQL
docker-compose.yaml
🙋 Debezium Server Databend
Clone 项目:
git clone ``https://github.com/databendcloud/debezium-server-databend.git
从项目根目录开始:
构建和打包 debezium server:
mvn -Passembly -Dmaven.test.skip package
构建完成后,解压服务器分发包:
unzip debezium-server-databend-dist/target/debezium-server-databend-dist*.zip -d databendDist
进入解压后的文件夹:
cd databendDist
创建
application.properties
文件并修改:nano conf/application.properties
,将下面的 application.properties 拷贝进去,根据用户实际情况修改相应的配置。使用提供的脚本运行服务:
bash run.sh
Debezium Server with Databend 将会启动
同时我们也提供了相应的 Docker image,可以在容器中一键启动:
NOTE: 在容器中启动注意所连接数据库的网络。
🙋 Debezium Server Databend Application Properties
本文章使用下面提供的配置,更多的参数说明以及配置可以参考文档[2]。
🙋 准备数据
1️⃣ 在 MySQL 数据库中准备数据
进入 MySQL 容器
创建数据库 mydb 和表 products
,并插入数据:
2️⃣ 在 Databend 中创建 Database

NOTE: 用户可以不必先在 Databend 中创建表,系统检测到后会自动为用户建表。
🙋 启动 Debezium Server Databend

首次启动会进入 init snapshot 模式,通过配置的 Batch Size 全量将 MySQL 中的数据同步到 Databend,所以在 Databend 中可以看到 MySQL 中的数据已经同步过来了:

🙋 同步 Insert 数据
我们继续往 MySQL 中插入 5 条数据:
Debezium server databend 日志:

同时在 Databend 中可以查到 5 条数据已经同步过来了:

🙋 同步 Update 数据
配置文件中 debezium.sink.databend.upsert=true
,所以我们也可以处理 Update/Delete 的事件。
在 MySQL 中更新 id=10 的数据:
在 Databend 中可以查到 id 为 10 的数据已经被更新:

🙋 同步 Delete 数据
在配置文件中,有以下的配置,既可开启处理 Delete 事件的能力:
Debezim Server 对 Delete 的处理比较复杂,在 DELETE 操作下会生成两条事件记录:
一个包含 "op": "d",其他的行数据以及字段;
一个tombstones记录,它具有与被删除行相同的键,但值为null。
这两条事件会同时发出,在 Debezium Server Databend 中我们选择对 Delete 数据实行软删除,这就要求我们在 target table 中拥有 __deleted
字段,当 Delete 事件过来的时候我们将该字段置为 TRUE 后插入到目标表。
这样设计的好处是,有些用户想要保留这些数据,但可能未来会想到将其删除,这样就为用户提供了可选的方案,未来想要删除这些数据的时候,只需要 delete from table where __deleted=true
即可。
关于 Debezium 对删除事件的说明以及处理方式,详情可参考文档[3] 。
在 MySQL 中删除 id=12 的数据:
delete from products where id=12;
在 Databend 中可以观察到 id=12 的值的 __deleted
字段已经被置为 true
。
环境清理
操作结束后,在 docker-compose.yml
文件所在的目录下执行如下命令停止所有容器:
结论
以上就是基于轻量级 CDC debezium server databend 构建 MySQL 到 Databend 的 实时数据同步的全部过程,这种方式不需要依赖 Flink, Kafka 等大型组件,启动和管理非常方便。
引用链接
[1]
Debezium Server Databend: https://github.com/databendcloud/debezium-server-databend[2
]
文档: https://github.com/databendcloud/debezium-server-databend/blob/main/docs/docs.md[3]
文档: https://debezium.io/documentation/reference/stable/transformations/event-flattening.html