学习日志 220105 flink mysql connector 问题解决
# 220105
- 问题3
- SQL错误 不能是(
- 数据类型错误 没有DATETIME 改用 TIMESTAMP(3)
- 目前使用数据类型有 BIGINT TIMESTAMP(原DATETIME) STRING(原TEXT)
- 问题4
- The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled'
- 加上PRIMARY KEY声明
- 问题5
- Flink doesn't support ENFORCED mode
- https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API
- Primary key validity checks
- 在PRIMARY KEY声明后加 NOT ENFORCED
- 类似 `PRIMARY KEY(id) NOT ENFORCED,`
- 问题6
- 主进程退出 退出码 -17
- 修改依赖
- 示例中依赖是 org.apache.flink:flink-table-planner-blink_${scala.binary.version}
- 改为 org.apache.flink:flink-table-planner_2.12
- 版本号也升级为 1.14.2(当前最新)
- 问题7
- Illegal reflective access by org.apache.flink.api.java.ClosureCleaner
- https://stackoverflow.com/questions/60754821/illegal-reflective-access-by-org-apache-flink-api-java-closurecleaner
- 降低运行环境到 java8
- 暂时无视? 只是警告
- 问题8
- log4j:WARN No appenders could be found for logger
- 加log4j.properties 从 playgrounds里copy一个出来
- 这个打包进jar会不会影响在flink集群上运行?
- 参考 https://www.cnblogs.com/upupfeng/p/14489664.html
- 应该不影响
- 问题9
- Currently Flink MySql CDC connector only supports MySql whose version is larger or equal to 5.7, but actual is 5.6.
- 方案一 重装mysql 太麻烦了 不用
- 1, 删除mysql相关的service和sts
- 2, 删除mysql相关的pvc
- 3, 重新编写k8s描述文件 升级mysql版本
- 4, 重新执行DDL 重新插入数据
- 5, 重启依赖mysql的服务 springbootdemo
- 方案二 升级
- 参考 https://dev.mysql.com/doc/refman/5.7/en/docker-mysql-getting-started.html#docker-upgrading
- 1, 关闭mysql集群? 重新apply -f应该自动关闭旧pod
- 2, 编写k8s描述文件 升级 重新 apply -f
- 3, 用exec执行mysql_upgrade
- 4, 重启集群
- `kubectl rollout restart sts/mycluster-mysql`
- `kubectl rollout status sts/mycluster-mysql`
- 5, 检查java应用使用数据的情况
- springbootdemo的搜索功能正常
- 开发环境 重新运行flink
- 问题10
```
Source: TableSourceScan(table=[[default_catalog, default_database, test_doc]], fields=[id, gmt_create, gmt_modify, doc]) -> WatermarkAssigner(rowtime=[gmt_modify], watermark=[(gmt_modify - 5000:INTERVAL SECOND)]) -> Sink: Sink(table=[default_catalog.default_database.print_table], fields=[id, gmt_create, gmt_modify, doc]) (8/16)#0 (5a897800a4d82dbaa5072840701c1041) switched from INITIALIZING to FAILED with failure cause: java.lang.NoSuchMethodError: org.apache.flink.api.connector.source.SourceReaderContext.metricGroup()Lorg/apache/flink/metrics/MetricGroup;
```
- switched from INITIALIZING to FAILED with failure cause
- java.lang.NoSuchMethodError: org.apache.flink.api.connector.source.SourceReaderContext.metricGroup()
- 参考 https://ververica.github.io/flink-cdc-connectors/master/content/quickstart/mysql-postgres-tutorial.html
- 只支持flink 1.13
- flink 降到 1.13.5
- 问题11
- 又出现了问题6的现象
- 之前问题6的修改 把 table-planner 从blink版本改到了不是blink的版本
- 这个实际上是不行的, 还是要改回去
- 问题6的正确解决方法是问题10的方法, 也就是把flink降到13.5
- 问题12
- Cannot read the binlog filename and position via 'SHOW MASTER STATUS'
- mysql数据库没开binlog 或者binlog格式不是row
- 参考 https://www.jianshu.com/p/0a47e387de51
- 查找my.cnf位置
- 参考 https://stackoverflow.com/questions/38490785/where-is-mysql-5-7-my-cnf-file
- mysqld --help --verbose|grep cnf
- 有三个位置 挨个查一下 发现用的是第二个 即 /etc/mysql/my.cnf
- 查看my.cnf中是否已有binlog相关配置
- 没有
- 修改k8s描述文件
- `cat << EOF >> /etc/mysql/my.cnf`
- 支持多行命令 在yaml里换行接着写就行
- 使用变量 ${HOSTNAME##*-} 来设置server_id
- 设置initContainers的image为 busybox:1.28
- 目前的段落如下
```
initContainers:
- name: enable-binlog
image: busybox:1.28
command:
- sh
- "-c"
- |
cat << EOF >> /etc/mysql/my.cnf
server_id=${HOSTNAME##*-}
log_bin=mysql-bin
binlog_format=ROW
expire_logs_days=30
binlog_do_db=test_db
EOF
```
- 失败 检查日志
- `kubectl logs mycluster-mysql-1 -c enable-binlog`
- 指定initContainer 查看日志
- sh: can't create /etc/mysql/my.cnf: nonexistent directory
- 解决 /etc/mysql/my.cnf不存在的问题
- 参考 https://stackoverflow.com/questions/68646958/my-cnf-in-kubernetes-configmap-is-not-recognized-by-mysql-pod
- 参考 https://www.jianshu.com/p/eec2fd2d7080
- 参考 https://kubernetes.io/zh/docs/tasks/run-application/run-replicated-stateful-application/
- 这里需要详细说明 见下一节
### 正确的配置mysql的方案
- 首先我们需要排除错误的选项
- 使用initContainers直接修改/etc/mysql/my.cnf
- 原因 initContainers和实际的container并不是同一个 改了也互不影响
- 正确方案
- mysql 的my.cnf会加载 /etc/mysql/conf.d目录下的所有文件
- `cat /etc/mysql/my.cnf|grep include`
- 利用这个特性 我们可以mount一个volume到指定位置
- k8s支持emptyDir 形式的volume 可以在initContainers和真正的container之间共享
```
volumes:
- name: conf
emptyDir: { }
```
- 也就是说, 我们按如下顺序处理
- 先把这个volume mount给initContainer
- 从initContainer里往这个volume写一些文件
- 再把这个volume mount到实际container的/etc/mysql/conf.d 目录
- 技术细节如下
- mount volume conf给initContainer
```
volumeMounts:
- name: conf
mountPath: /mnt/conf.d
```
- 通过initContainer的command 多行指令写入配置文件 这里利用了yaml的|(竖线)特性
```
command:
- sh
- "-c"
- |
cat << EOF > /mnt/conf.d/my.cnf
[mysqld]
server_id=${HOSTNAME##*-}
log_bin=mysql-bin
binlog_format=ROW
expire_logs_days=30
binlog_do_db=test_db
EOF
```
- 注意几点
- yaml的|竖线之下 保持缩进 实际指行时这些缩进会被移除
- `<< EOF` 和 最后一行的 `EOF` 配对 表示中间的内容从使用cat 指令输出
- 第一行 直接跟上 `>` 重定向输出到文件 这里注意这段并不是放在最后
- 根据mysql配置文件的格式, 必须指定 `[mysqld]` 区块
- 否则mysql会报错 登录时也会报一行提示说部分配置未能解析
- 可以通过之前说的查看initContainers日志的方法检查
- 在真正的container中, 把上述volume conf , mount 到 /etc/mysql/conf.d
```
volumeMounts:
- name: conf
mountPath: /etc/mysql/conf.d
```
- 重新apply -f
- 校验
- 登录pod 的 bash, 再用mysql登录数据库
- use test_db;
- show master status;
- 有输出binlog文件和位点表示成功
- flink运行校验
- 进程会持续运行 不会退出
- 日志中有类似如下的输出
```
7> +I[681163714816618496, 2021-12-24T07:40:57.483, 2021-12-24T07:40:57.483, test content 002]
7> +I[681097728465809408, 2021-12-24T03:18:46.044, 2021-12-24T03:18:46.044, test content 001]
```
- 表示相应的数据变更已经被CDC捕获了 并输出到了print的table上
- TODO
- 接入elasticsearch的sink