欢迎光临散文网 会员登陆 & 注册

学习日志 220107 elasticsearch sink

2022-01-10 17:23 作者:mayoiwill  | 我要投稿

elasticsearch sink + 上传到flink集群

================================

上接220105 (6号有事未做学习)


# elasticsearch sink

- 参考 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/elasticsearch/


## 加依赖

- 分为两步

- 1, 加maven依赖 注意使用maven central里的最新版 注意使用provided

- 2, dockerfile里加wget 下载该依赖到系统lib


## 使用table api

- 参考 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/

- 注意之前看的是stream api的, 依赖是一样的

- 用port-forward 把9200端口拉出来

- 走http还是https协议?

  - 见下一节

- 问题 1

  - Could not find any factory for identifier 'json' that implements 'org.apache.flink.table.factories.SerializationFormatFactory' in the classpath

  - 参考 https://blog.csdn.net/pop_xiaohao/article/details/110525357

  - 引入依赖 flink-json

  - 是否 dockerfile是否 需要copy?

- 问题2

  - ssl问题 见下 关闭eck中elasticsearch的tls

- 开发机测试

  - 运行中

  - 从web端搜索test

  - 可以搜到2条数据了

  - 但没有第3条, 因为我们只监听了一个库

  - 修改代码 增加SQL, 把第二个库也监听起来

  - 重新搜索test 现在可以搜到所有3条数据了(参考之前shardingSphere部分)

- 问题3

  - flink metric warning 暂时不管


### elasticsearch特性

- primary key

  - 在create table时指令primary key即可

- ssl

  - org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7DynamicSink.AuthRestClientFactory#configureRestClientBuilder

  - org.apache.http.impl.nio.client.CloseableHttpAsyncClient

  - 参考 https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html

  - https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/data_stream_api/

  - https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/connectors/datastream/elasticsearch/

  - 以上方案都不行 原因是elastic search table api目前没有设置tls的功能

    - 或者我没找到

    - 同时我又不想用datastream级别的api 会有其它问题

  - https://www.elastic.co/guide/en/cloud-on-k8s/1.2/k8s-tls-certificates.html#k8s-disable-tls

  - 参考以上文档关闭eck的tls k8s描述文件增加如下配置

    ```

    spec:

      http:

        tls:

          selfSignedCertificate:

            disabled: true

    ```

  - 以上spec是指根的spec kibana同样

  - 重新布署elasticsearch kibana

    - 重新部署不影响用户密码和已有数据

  - 验证kibana可以工作 注意portforward后 kibana改为http://localhost:5601访问(不是https)

  - java应用springbootdemo中elasticsearch的地址改为http协议

  - 重新打包发布

  - 验证springbootdemo

- 可扩展

  - TODO


# 将flink应用发布到k8s上

- 修改链接 按env选择数据库连接和elasticsearch连接

  - 参考springbootdemo

  - 这也是用代码实现的好处之一, 这些细节可以自定义

- 发布flink集群和任务

  - 有两种方式

  - session方式

    - https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/ 

  - native cluster方式

    - https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/resource-providers/native_kubernetes/

  - 参考第二种方式

- 搭建bash环境

  - 由于native cluster方式需要执行.sh(没有bat选项)

  - 所以需要在win10开发环境下搭建bash

  - win10目前已安装好ubuntu, 但是是基于WSL2的

  - 需要重新安装JAVA和kubectl

- 安装JAVA

  ```

  sudo apt update

  sudo apt install openjdk-11-jdk

  sudo nano /etc/environment

  JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64"

  ```

  - 测试 java --version

  - echo $JAVA_HOME

- 安装kubectl 配置

  - 试了下kubectl命令是有的, 可能之前装过

  - 配置 参考 https://v1-19.docs.kubernetes.io/zh/docs/tasks/tools/install-kubectl/

  - 直接复制windows的配置过来

  - `cp -r /mnt/c/Users/Administrator/.kube/ ~/`

  - 执行 `kubectl cluster-info`

  - 报错 crt不存在

  - 修改 ~/.kube/config

  - 把 E:\minikube\.minikube\ 改为 /mnt/e/minikube/.minikube

  - 另外路径里的所有 \ 改为 /

  - 重新 kubectl cluster-info 成功

- 尝试布署 flink cluster

  - 下载 13.5的flink(和之前的java应用一致)

  - `./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster`

  - 结果 

  - `Create flink session cluster my-first-flink-cluster successfully, JobManager Web Interface: http://192.168.2.15:8081`

  - 测试运行

    ```

    ./bin/flink run \

    --target kubernetes-session \

    -Dkubernetes.cluster-id=my-first-flink-cluster \

    ./examples/streaming/TopSpeedWindowing.jar

    ```

    - 这里挂了 192.168.2.15的8081端口没打开

    - 看了下svc, 外部端口是 30504

    - 先不管了 把deployment删了

- 以application cluster方式布署

- 问题 cpu不够了

  - 用 kubectl get pods 列出所有pods

  - 发现新的这个在pending

  - 通过 kubectl describe 查看pod部署日志

  - 报如下问题

  - `0/1 nodes are available: 1 Insufficient cpu.`

  - 查看上面的资源要求 发现需要1 CPU + 1600M 内存

  - 先撤销部署 flink的指令用不了

  - 用kubectl get deployment 查出对应的deployment再delete掉

  - 验证所有影响已删除 pod svc

  - 查文档 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#pod-template

  - 提到 cpu和内存设置无法在pod-template里设置 会被参数覆盖

  - 用-D设置kubernetes.jobmanager.cpu和kubernetes.taskmanager.cpu

- 问题 log里说jar包不存在

  - 还是先删了deployment 消除影响

  - 换docker内的路径再试下

- 问题 找不到入口类

  - Neither a 'Main-Class', nor a 'program-class' entry was found in the jar file.

  - 找到 pom中用于打包的 maven-shade-plugin

  - 里面有注释 <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->

  - 修改入口类

  - 还要 加 -Dkubernetes.container.image.pull-policy=Always

  - 默认是 不存在才拉取 我们要改成 总是拉取

- 问题 缺少类

  - Caused by: java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase

  - 间接依赖没wget

  - 还是别用wget了 把自己加的几个依赖的provided删除

  - dockerfile里的wget也不要了

  - 检查打出来的包还是没有依赖的这些库 包大小看着就不对 才6k

  - 再检查pom 发现是放在build pluginManagements下面了

  - 去掉pluginManagements这一级

- 问题 flink需要k8s权限

  ```

  io.fabric8.kubernetes.client.KubernetesClientException: pods is forbidden: User "system:serviceaccount:default:default" cannot watch resource "pods" in API group "" in the namespace "default"

  ```

  - 参考 https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/#rbac

  - `kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=default:default`

  - 对默认的service account进行授权

  - 重新deployment

  - `kubectl rollout restart deployment/my-first-application-cluster`

- 问题 flink第二个pod创建时pending

  - 查看第二个pod的describe 发现内存不足

  - 增加如下配置 降低每个pod的内存占用

    ```

    -Djobmanager.memory.process.size=1g \

    -Dtaskmanager.memory.process.size=1g \

    ```

  - 默认是1600m 改为 1g

  - 注意单位 flink的单位和k8s不一样

- 问题 mysql-cdc 驱动类找不到

  - `Could not find any factory for identifier 'mysql-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.`

  - 检查运行中的任务 http://192.168.2.15:32234/#/overview

  - 端口是随机的 看kubectl get svc

  - 时灵时不灵的 可能后台一直在重启

  - 看日志 env是生效的, 连接hostname用的是集群的

  - 问题原因

  - https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html

  - 其中SQL Client JAR 一节 有要求我们下载一个jar包打进docker镜像

  - `wget -P /opt/flink/lib/ https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.1.1/flink-sql-connector-mysql-cdc-2.1.1.jar;`

  - 还可以参考 https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)

  - 有提到带-sql的是胖包 大家自己看一下

  - 另外 kubectl delete deployment my-first-application-cluster

  - 和 kubectl delete deployment/my-first-application-cluster

  - 可以混用

  - kubectl logs -f 可以一直盯着日志看 直到pods死掉被重启

  - pod重启后日志会中断 不过反正重启后的错误往往和重启前是一样的

- 问题 elasticsearch 驱动类找不到

  - 根据上个问题的解决经验 我们知道驱动类必须放在lib下

  - 先把mysql的驱动类从usrlib里拿掉(改为provided)

  - 再找一下elasticsearch驱动类的胖包

  - 翻看 https://repo1.maven.org/maven2/org/apache/flink/

  - 搜elasticsearch 果然有带-sql的版本 进去找到1.13.5(匹配flink版本)

  - https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.12/1.13.5/flink-sql-connector-elasticsearch7_2.12-1.13.5.jar

  - 在dockerfile里加到wget里

  - 重新打包发布

- 问题  java.lang.IllegalStateException: Unable to instantiate java compiler

  - org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile

  - flink-table-blink_2.12-1.13.5.jar:1.13.5

  - java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot be cast to org.codehaus.commons.compiler.ICompilerFactory

  - org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory

  - flink-table-blink_2.12-1.13.5.jar:1.13.5

  - 这个包改为provided

  - 居然可以。。

- 问题 flink native 的master(jobmanager)想要启动多个worker(taskmanager) 导致内存不足了

  - 看能不能降到1个worker?


学习日志 220107 elasticsearch sink的评论 (共 条)

分享到微博请遵守国家法律