学习日志 220107 elasticsearch sink
elasticsearch sink + 上传到flink集群
================================
上接220105 (6号有事未做学习)
# elasticsearch sink
- 参考 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/elasticsearch/
## 加依赖
- 分为两步
- 1, 加maven依赖 注意使用maven central里的最新版 注意使用provided
- 2, dockerfile里加wget 下载该依赖到系统lib
## 使用table api
- 参考 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/
- 注意之前看的是stream api的, 依赖是一样的
- 用port-forward 把9200端口拉出来
- 走http还是https协议?
- 见下一节
- 问题 1
- Could not find any factory for identifier 'json' that implements 'org.apache.flink.table.factories.SerializationFormatFactory' in the classpath
- 参考 https://blog.csdn.net/pop_xiaohao/article/details/110525357
- 引入依赖 flink-json
- 是否 dockerfile是否 需要copy?
- 问题2
- ssl问题 见下 关闭eck中elasticsearch的tls
- 开发机测试
- 运行中
- 从web端搜索test
- 可以搜到2条数据了
- 但没有第3条, 因为我们只监听了一个库
- 修改代码 增加SQL, 把第二个库也监听起来
- 重新搜索test 现在可以搜到所有3条数据了(参考之前shardingSphere部分)
- 问题3
- flink metric warning 暂时不管
### elasticsearch特性
- primary key
- 在create table时指令primary key即可
- ssl
- org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7DynamicSink.AuthRestClientFactory#configureRestClientBuilder
- org.apache.http.impl.nio.client.CloseableHttpAsyncClient
- 参考 https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html
- https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/data_stream_api/
- https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/connectors/datastream/elasticsearch/
- 以上方案都不行 原因是elastic search table api目前没有设置tls的功能
- 或者我没找到
- 同时我又不想用datastream级别的api 会有其它问题
- https://www.elastic.co/guide/en/cloud-on-k8s/1.2/k8s-tls-certificates.html#k8s-disable-tls
- 参考以上文档关闭eck的tls k8s描述文件增加如下配置
```
spec:
http:
tls:
selfSignedCertificate:
disabled: true
```
- 以上spec是指根的spec kibana同样
- 重新布署elasticsearch kibana
- 重新部署不影响用户密码和已有数据
- 验证kibana可以工作 注意portforward后 kibana改为http://localhost:5601访问(不是https)
- java应用springbootdemo中elasticsearch的地址改为http协议
- 重新打包发布
- 验证springbootdemo
- 可扩展
- TODO
# 将flink应用发布到k8s上
- 修改链接 按env选择数据库连接和elasticsearch连接
- 参考springbootdemo
- 这也是用代码实现的好处之一, 这些细节可以自定义
- 发布flink集群和任务
- 有两种方式
- session方式
- https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/
- native cluster方式
- https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/resource-providers/native_kubernetes/
- 参考第二种方式
- 搭建bash环境
- 由于native cluster方式需要执行.sh(没有bat选项)
- 所以需要在win10开发环境下搭建bash
- win10目前已安装好ubuntu, 但是是基于WSL2的
- 需要重新安装JAVA和kubectl
- 安装JAVA
```
sudo apt update
sudo apt install openjdk-11-jdk
sudo nano /etc/environment
JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64"
```
- 测试 java --version
- echo $JAVA_HOME
- 安装kubectl 配置
- 试了下kubectl命令是有的, 可能之前装过
- 配置 参考 https://v1-19.docs.kubernetes.io/zh/docs/tasks/tools/install-kubectl/
- 直接复制windows的配置过来
- `cp -r /mnt/c/Users/Administrator/.kube/ ~/`
- 执行 `kubectl cluster-info`
- 报错 crt不存在
- 修改 ~/.kube/config
- 把 E:\minikube\.minikube\ 改为 /mnt/e/minikube/.minikube
- 另外路径里的所有 \ 改为 /
- 重新 kubectl cluster-info 成功
- 尝试布署 flink cluster
- 下载 13.5的flink(和之前的java应用一致)
- `./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster`
- 结果
- `Create flink session cluster my-first-flink-cluster successfully, JobManager Web Interface: http://192.168.2.15:8081`
- 测试运行
```
./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=my-first-flink-cluster \
./examples/streaming/TopSpeedWindowing.jar
```
- 这里挂了 192.168.2.15的8081端口没打开
- 看了下svc, 外部端口是 30504
- 先不管了 把deployment删了
- 以application cluster方式布署
- 问题 cpu不够了
- 用 kubectl get pods 列出所有pods
- 发现新的这个在pending
- 通过 kubectl describe 查看pod部署日志
- 报如下问题
- `0/1 nodes are available: 1 Insufficient cpu.`
- 查看上面的资源要求 发现需要1 CPU + 1600M 内存
- 先撤销部署 flink的指令用不了
- 用kubectl get deployment 查出对应的deployment再delete掉
- 验证所有影响已删除 pod svc
- 查文档 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#pod-template
- 提到 cpu和内存设置无法在pod-template里设置 会被参数覆盖
- 用-D设置kubernetes.jobmanager.cpu和kubernetes.taskmanager.cpu
- 问题 log里说jar包不存在
- 还是先删了deployment 消除影响
- 换docker内的路径再试下
- 问题 找不到入口类
- Neither a 'Main-Class', nor a 'program-class' entry was found in the jar file.
- 找到 pom中用于打包的 maven-shade-plugin
- 里面有注释 <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
- 修改入口类
- 还要 加 -Dkubernetes.container.image.pull-policy=Always
- 默认是 不存在才拉取 我们要改成 总是拉取
- 问题 缺少类
- Caused by: java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase
- 间接依赖没wget
- 还是别用wget了 把自己加的几个依赖的provided删除
- dockerfile里的wget也不要了
- 检查打出来的包还是没有依赖的这些库 包大小看着就不对 才6k
- 再检查pom 发现是放在build pluginManagements下面了
- 去掉pluginManagements这一级
- 问题 flink需要k8s权限
```
io.fabric8.kubernetes.client.KubernetesClientException: pods is forbidden: User "system:serviceaccount:default:default" cannot watch resource "pods" in API group "" in the namespace "default"
```
- 参考 https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/#rbac
- `kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=default:default`
- 对默认的service account进行授权
- 重新deployment
- `kubectl rollout restart deployment/my-first-application-cluster`
- 问题 flink第二个pod创建时pending
- 查看第二个pod的describe 发现内存不足
- 增加如下配置 降低每个pod的内存占用
```
-Djobmanager.memory.process.size=1g \
-Dtaskmanager.memory.process.size=1g \
```
- 默认是1600m 改为 1g
- 注意单位 flink的单位和k8s不一样
- 问题 mysql-cdc 驱动类找不到
- `Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.`
- 检查运行中的任务 http://192.168.2.15:32234/#/overview
- 端口是随机的 看kubectl get svc
- 时灵时不灵的 可能后台一直在重启
- 看日志 env是生效的, 连接hostname用的是集群的
- 问题原因
- https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html
- 其中SQL Client JAR 一节 有要求我们下载一个jar包打进docker镜像
- `wget -P /opt/flink/lib/ https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1.1/flink-sql-connector-mysql-cdc-2.1.1.jar;`
- 还可以参考 https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)
- 有提到带-sql的是胖包 大家自己看一下
- 另外 kubectl delete deployment my-first-application-cluster
- 和 kubectl delete deployment/my-first-application-cluster
- 可以混用
- kubectl logs -f 可以一直盯着日志看 直到pods死掉被重启
- pod重启后日志会中断 不过反正重启后的错误往往和重启前是一样的
- 问题 elasticsearch 驱动类找不到
- 根据上个问题的解决经验 我们知道驱动类必须放在lib下
- 先把mysql的驱动类从usrlib里拿掉(改为provided)
- 再找一下elasticsearch驱动类的胖包
- 翻看 https://repo1.maven.org/maven2/org/apache/flink/
- 搜elasticsearch 果然有带-sql的版本 进去找到1.13.5(匹配flink版本)
- https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.12/1.13.5/flink-sql-connector-elasticsearch7_2.12-1.13.5.jar
- 在dockerfile里加到wget里
- 重新打包发布
- 问题 java.lang.IllegalStateException: Unable to instantiate java compiler
- org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile
- flink-table-blink_2.12-1.13.5.jar:1.13.5
- java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot be cast to org.codehaus.commons.compiler.ICompilerFactory
- org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory
- flink-table-blink_2.12-1.13.5.jar:1.13.5
- 这个包改为provided
- 居然可以。。
- 问题 flink native 的master(jobmanager)想要启动多个worker(taskmanager) 导致内存不足了
- 看能不能降到1个worker?