学习日志 220104 FLink试用
FLink试用
===============
# 背景/需求
- 数据的事务处理和分析处理对数据结构的要求不同
- 事务处理关注一致性 范式
- 分析处理 比如搜索/报表 往往需要进行多表聚合 统计 以及机器学习等
- 需要一种可靠的数据处理中间件
- 大数据量
- 准实时
- exactly once 统计一致性
- 易用
- flink
- https://flink.apache.org/
- 目标
- 从db读取数据送入elastic search
# 使用
- 采用 k8s + docker
## Getting start
- docker pull flink
- https://flink.apache.org/2021/02/10/native-k8s-with-ha.html
- Flink使用k8s watch监控configmap里的leader情况, 一旦leader变更, flink会立该响应
- 这个示例提到了复制my-flink-job.jar的过程
- 所以我们需要先制作这个jar
### 基础教程 制作my-flink-job.jar
- 我们需要对flink的镜像进行定制
- 加入实际的工作内容 例如my-flink-job.jar
- 加入其它插件 例如mysql插件
- 先按first step的形式操作一遍
- https://nightlies.apache.org/flink/flink-docs-release-1.14//docs/try-flink/local_installation/
- 下载
- 1.14.2 2.12
- 发现没有bat 不能在windows上跑 跳过
- 编写一个job
- https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/learn-flink/datastream_api/
- 采用stream api
- https://github.com/apache/flink-training/tree/release-1.14/
- 使用gradle 设置GRADLE_USER_HOME 修改gradle下载的jar包本地存储的位置
- 因为默认在C盘 C盘空间不足
- 报类找不到的错误
- 尝试reload gradle project 不行
- 尝试 rebuild project 不行
- 使用idea重新open project
- 触发idea重新import gradle dependencies
- 成功解决 找不到类的错误
- 尝试运行一个job
- 总结
- 一个flink job可以是一个java程序(可执行jar包)
- 入口是main方法
- 一般构造一个graph, 再执行execute
- 打包?
- 排除flink本身的包(这些包由flink运行集群提供)
### table api
- https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/try-flink/table_api/
- https://github.com/apache/flink-playgrounds
## 创建一个自己的任务
- 用maven创建一个一般java工程
- `mvn -B archetype:generate "-DgroupId=org.kaien.mycluster" "-DartifactId=flink-db-es" "-DarchetypeArtifactId=maven-archetype-quickstart" "-DarchetypeVersion=1.4"`
- 所有-D参数全部要用"括起来
- 引入依赖
- 参考上述table api项目
- 制作docker镜像
- 参考 springbootdemo 的docker化方案
- 采用maven-shade-plugin代替原方案的spring打包插件即可
### 使用mysql cdc connector读取mysql数据
- 参考
- https://www.sohu.com/a/439533649_411876
- https://www.bilibili.com/video/BV1zt4y1D7kt/
- https://github.com/ververica/flink-cdc-connectors
- 基于 Debezium https://github.com/debezium/debezium
- 引入依赖
```
<!-- https://mvnrepository.com/artifact/com.alibaba.ververica/flink-connector-mysql-cdc -->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.4.0</version>
</dependency>
```
- 源表
- 参考文档
- connector 写 mysql-cdc
- 其它参数依实际mysql数据库连接信息修改
- 目标表(sink)
- 采用print
- 参考 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/print/
- 中间的处理过程
- 直接用tEnv.executeSql
- 执行 insert into ... select 语句
- 尝试执行
- 问题1 运行时报找不到类 EnvironmentSettings
- 这个是因为 idea 运行默认不引入 provided的 jar
- 参考 https://stackoverflow.com/questions/65135298/how-to-include-provided-dependencies-with-the-new-application-run-configuration
- 在 run configuration 中 点击 modify options
- 找到 java 段 里面找 add dependencies "provided" ... 选项 勾上即可
- 问题2
- Could not find a suitable table factory ...
- TODO