欢迎光临散文网 会员登陆 & 注册

学习日志 220105 flink mysql connector 问题解决

2022-01-05 19:31 作者:mayoiwill  | 我要投稿

# 220105

- 问题3

  - SQL错误 不能是( 

  - 数据类型错误 没有DATETIME 改用 TIMESTAMP(3)

  - 目前使用数据类型有 BIGINT TIMESTAMP(原DATETIME) STRING(原TEXT)

- 问题4

  - The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled'

  - 加上PRIMARY KEY声明

- 问题5

  - Flink doesn't support ENFORCED mode

  - https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API

  - Primary key validity checks

  - 在PRIMARY KEY声明后加 NOT ENFORCED

  - 类似 `PRIMARY KEY(id) NOT ENFORCED,`

- 问题6

  - 主进程退出 退出码 -17

  - 修改依赖

    - 示例中依赖是 org.apache.flink:flink-table-planner-blink_${scala.binary.version}

    - 改为 org.apache.flink:flink-table-planner_2.12

    - 版本号也升级为 1.14.2(当前最新)

- 问题7

  - Illegal reflective access by org.apache.flink.api.java.ClosureCleaner

  - https://stackoverflow.com/questions/60754821/illegal-reflective-access-by-org-apache-flink-api-java-closurecleaner

  - 降低运行环境到 java8

  - 暂时无视? 只是警告

- 问题8

  - log4j:WARN No appenders could be found for logger

  - 加log4j.properties 从 playgrounds里copy一个出来

  - 这个打包进jar会不会影响在flink集群上运行?

  - 参考 https://www.cnblogs.com/upupfeng/p/14489664.html

  - 应该不影响

- 问题9

  - Currently Flink MySql CDC connector only supports MySql whose version is larger or equal to 5.7, but actual is 5.6.

  - 方案一 重装mysql 太麻烦了 不用

    - 1, 删除mysql相关的service和sts

    - 2, 删除mysql相关的pvc

    - 3, 重新编写k8s描述文件 升级mysql版本

    - 4, 重新执行DDL 重新插入数据

    - 5, 重启依赖mysql的服务 springbootdemo

  - 方案二 升级

    - 参考 https://dev.mysql.com/doc/refman/5.7/en/docker-mysql-getting-started.html#docker-upgrading

    - 1, 关闭mysql集群? 重新apply -f应该自动关闭旧pod

    - 2, 编写k8s描述文件 升级 重新 apply -f

    - 3, 用exec执行mysql_upgrade

    - 4, 重启集群

      - `kubectl rollout restart sts/mycluster-mysql`

      - `kubectl rollout status sts/mycluster-mysql`

    - 5, 检查java应用使用数据的情况

      - springbootdemo的搜索功能正常

  - 开发环境 重新运行flink

- 问题10

  ```

  Source: TableSourceScan(table=[[default_catalog, default_database, test_doc]], fields=[id, gmt_create, gmt_modify, doc]) -> WatermarkAssigner(rowtime=[gmt_modify], watermark=[(gmt_modify - 5000:INTERVAL SECOND)]) -> Sink: Sink(table=[default_catalog.default_database.print_table], fields=[id, gmt_create, gmt_modify, doc]) (8/16)#0 (5a897800a4d82dbaa5072840701c1041) switched from INITIALIZING to FAILED with failure cause: java.lang.NoSuchMethodError: org.apache.flink.api.connector.source.SourceReaderContext.metricGroup()Lorg/apache/flink/metrics/MetricGroup;

  ```

  - switched from INITIALIZING to FAILED with failure cause

  - java.lang.NoSuchMethodError: org.apache.flink.api.connector.source.SourceReaderContext.metricGroup()

  - 参考 https://ververica.github.io/flink-cdc-connectors/master/content/quickstart/mysql-postgres-tutorial.html

  - 只支持flink 1.13

  - flink 降到 1.13.5

- 问题11

  - 又出现了问题6的现象

  - 之前问题6的修改 把 table-planner 从blink版本改到了不是blink的版本

  - 这个实际上是不行的, 还是要改回去

  - 问题6的正确解决方法是问题10的方法, 也就是把flink降到13.5

- 问题12

  - Cannot read the binlog filename and position via 'SHOW MASTER STATUS'

  - mysql数据库没开binlog 或者binlog格式不是row

  - 参考 https://www.jianshu.com/p/0a47e387de51

  - 查找my.cnf位置

    - 参考 https://stackoverflow.com/questions/38490785/where-is-mysql-5-7-my-cnf-file

    - mysqld --help --verbose|grep cnf

    - 有三个位置 挨个查一下 发现用的是第二个 即 /etc/mysql/my.cnf

  - 查看my.cnf中是否已有binlog相关配置

    - 没有

  - 修改k8s描述文件

    - `cat << EOF >> /etc/mysql/my.cnf`

    - 支持多行命令 在yaml里换行接着写就行

    - 使用变量 ${HOSTNAME##*-} 来设置server_id

    - 设置initContainers的image为 busybox:1.28

    - 目前的段落如下

      ```

      initContainers:

      - name: enable-binlog

        image: busybox:1.28

        command:

          - sh

          - "-c"

          - |

            cat << EOF >> /etc/mysql/my.cnf

            server_id=${HOSTNAME##*-}

            log_bin=mysql-bin

            binlog_format=ROW

            expire_logs_days=30

            binlog_do_db=test_db

            EOF

      ```

  - 失败 检查日志

    - `kubectl logs mycluster-mysql-1 -c enable-binlog`

    - 指定initContainer 查看日志

    - sh: can't create /etc/mysql/my.cnf: nonexistent directory

  - 解决 /etc/mysql/my.cnf不存在的问题

    - 参考 https://stackoverflow.com/questions/68646958/my-cnf-in-kubernetes-configmap-is-not-recognized-by-mysql-pod

    - 参考 https://www.jianshu.com/p/eec2fd2d7080

    - 参考 https://kubernetes.io/zh/docs/tasks/run-application/run-replicated-stateful-application/

    - 这里需要详细说明 见下一节

  

### 正确的配置mysql的方案

- 首先我们需要排除错误的选项

  - 使用initContainers直接修改/etc/mysql/my.cnf

  - 原因 initContainers和实际的container并不是同一个 改了也互不影响

- 正确方案

  - mysql 的my.cnf会加载 /etc/mysql/conf.d目录下的所有文件

    - `cat /etc/mysql/my.cnf|grep include`

    - 利用这个特性 我们可以mount一个volume到指定位置

  - k8s支持emptyDir 形式的volume 可以在initContainers和真正的container之间共享

    ```

      volumes:

        - name: conf

          emptyDir: { }

    ```

    - 也就是说, 我们按如下顺序处理

    - 先把这个volume mount给initContainer

    - 从initContainer里往这个volume写一些文件

    - 再把这个volume mount到实际container的/etc/mysql/conf.d 目录

  - 技术细节如下

  - mount volume conf给initContainer

    ```

        volumeMounts:

          - name: conf

            mountPath: /mnt/conf.d

    ```

  - 通过initContainer的command 多行指令写入配置文件 这里利用了yaml的|(竖线)特性

    ```

        command:

          - sh

          - "-c"

          - |

            cat << EOF > /mnt/conf.d/my.cnf

            [mysqld]

            server_id=${HOSTNAME##*-}

            log_bin=mysql-bin

            binlog_format=ROW

            expire_logs_days=30

            binlog_do_db=test_db

            EOF


    ```

    - 注意几点

    - yaml的|竖线之下 保持缩进 实际指行时这些缩进会被移除

    - `<< EOF` 和 最后一行的 `EOF` 配对 表示中间的内容从使用cat 指令输出

    - 第一行 直接跟上 `>` 重定向输出到文件 这里注意这段并不是放在最后

    - 根据mysql配置文件的格式, 必须指定 `[mysqld]` 区块

      - 否则mysql会报错 登录时也会报一行提示说部分配置未能解析

    - 可以通过之前说的查看initContainers日志的方法检查

  - 在真正的container中, 把上述volume conf , mount 到 /etc/mysql/conf.d

    ```

        volumeMounts:

        - name: conf

          mountPath: /etc/mysql/conf.d

    ``` 

  - 重新apply -f

- 校验

  - 登录pod 的 bash, 再用mysql登录数据库

  - use test_db;

  - show master status;

  - 有输出binlog文件和位点表示成功

- flink运行校验

  - 进程会持续运行 不会退出

  - 日志中有类似如下的输出

    ```

    7> +I[681163714816618496, 2021-12-24T07:40:57.483, 2021-12-24T07:40:57.483, test content 002]

    7> +I[681097728465809408, 2021-12-24T03:18:46.044, 2021-12-24T03:18:46.044, test content 001]

    ```

  - 表示相应的数据变更已经被CDC捕获了 并输出到了print的table上

- TODO

  - 接入elasticsearch的sink


学习日志 220105 flink mysql connector 问题解决的评论 (共 条)

分享到微博请遵守国家法律