欢迎光临散文网 会员登陆 & 注册

学习日志 220104 FLink试用

2022-01-04 17:45 作者:mayoiwill  | 我要投稿

FLink试用

===============


# 背景/需求

- 数据的事务处理和分析处理对数据结构的要求不同

  - 事务处理关注一致性 范式

  - 分析处理 比如搜索/报表 往往需要进行多表聚合 统计 以及机器学习等

- 需要一种可靠的数据处理中间件

  - 大数据量

  - 准实时

  - exactly once 统计一致性

  - 易用

- flink

  - https://flink.apache.org/

- 目标

  - 从db读取数据送入elastic search


# 使用

- 采用 k8s + docker


## Getting start

- docker pull flink

- https://flink.apache.org/2021/02/10/native-k8s-with-ha.html

- Flink使用k8s watch监控configmap里的leader情况, 一旦leader变更, flink会立该响应

- 这个示例提到了复制my-flink-job.jar的过程

- 所以我们需要先制作这个jar


### 基础教程 制作my-flink-job.jar

- 我们需要对flink的镜像进行定制

  - 加入实际的工作内容 例如my-flink-job.jar

  - 加入其它插件 例如mysql插件

- 先按first step的形式操作一遍

  - https://nightlies.apache.org/flink/flink-docs-release-1.14//docs/try-flink/local_installation/

- 下载

  - 1.14.2 2.12

  - 发现没有bat 不能在windows上跑 跳过

- 编写一个job

  - https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/learn-flink/datastream_api/

  - 采用stream api

  - https://github.com/apache/flink-training/tree/release-1.14/

  - 使用gradle 设置GRADLE_USER_HOME 修改gradle下载的jar包本地存储的位置

    - 因为默认在C盘 C盘空间不足

  - 报类找不到的错误

    - 尝试reload gradle project 不行

    - 尝试 rebuild project 不行

  - 使用idea重新open project

    - 触发idea重新import gradle dependencies

    - 成功解决 找不到类的错误

  - 尝试运行一个job

- 总结

  - 一个flink job可以是一个java程序(可执行jar包)

  - 入口是main方法

  - 一般构造一个graph, 再执行execute

  - 打包?

  - 排除flink本身的包(这些包由flink运行集群提供)


### table api

- https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/try-flink/table_api/

- https://github.com/apache/flink-playgrounds


## 创建一个自己的任务

- 用maven创建一个一般java工程

- `mvn -B archetype:generate "-DgroupId=org.kaien.mycluster" "-DartifactId=flink-db-es" "-DarchetypeArtifactId=maven-archetype-quickstart" "-DarchetypeVersion=1.4"`

- 所有-D参数全部要用"括起来


- 引入依赖

  - 参考上述table api项目

- 制作docker镜像

  - 参考 springbootdemo 的docker化方案

  - 采用maven-shade-plugin代替原方案的spring打包插件即可


### 使用mysql cdc connector读取mysql数据

- 参考

  - https://www.sohu.com/a/439533649_411876

  - https://www.bilibili.com/video/BV1zt4y1D7kt/

  - https://github.com/ververica/flink-cdc-connectors

  - 基于 Debezium https://github.com/debezium/debezium

- 引入依赖

  ```

  <!-- https://mvnrepository.com/artifact/com.alibaba.ververica/flink-connector-mysql-cdc -->

  <dependency>

      <groupId>com.alibaba.ververica</groupId>

      <artifactId>flink-connector-mysql-cdc</artifactId>

      <version>1.4.0</version>

  </dependency>

  ```

- 源表

  - 参考文档

  - connector 写 mysql-cdc

  - 其它参数依实际mysql数据库连接信息修改

- 目标表(sink)

  - 采用print

  - 参考 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/print/

- 中间的处理过程

  - 直接用tEnv.executeSql

  - 执行 insert into ... select 语句

- 尝试执行

- 问题1 运行时报找不到类 EnvironmentSettings

  - 这个是因为 idea 运行默认不引入 provided的 jar

  - 参考 https://stackoverflow.com/questions/65135298/how-to-include-provided-dependencies-with-the-new-application-run-configuration

  - 在 run configuration 中 点击 modify options

  - 找到 java 段 里面找 add dependencies "provided" ... 选项 勾上即可

- 问题2

  - Could not find a suitable table factory ...

  - TODO


学习日志 220104 FLink试用的评论 (共 条)

分享到微博请遵守国家法律