一文详解 Flink 的分布式缓存使用步骤!
Flink提供了一个类似于Hadoop的分布式缓存,让并行运行实例的函数可以在本地访问。这个功能可以被使用来分享外部静态的数据,例如:机器学习的逻辑回归模型等!
缓存的使用流程:
使用ExecutionEnvironment实例对本地的或者远程的文件(例如:HDFS上的文件),为缓存文件指定一个名字注册该缓存文件!当程序执行时候,Flink会自动将复制文件或者目录到所有worker节点的本地文件系统中,函数可以根据名字去该节点的本地文件系统中检索该文件!
和广播变量的区别:
-广播变量广播的是程序中的变量(DataSet)数据,分布式缓存广播的是文件
-广播变量将数据广播到各个TaskManager的内存中,分布式缓存广播到各个TaskManager的本地文件系统
用法
使用Flink运行时环境的 registerCachedFile 注册一个分布式缓存
在操作中,使用 getRuntimeContext.getDistributedCache.getFile ( 文件名 )获取分布式缓存
示例
创建一个 成绩 数据集
请通过分布式缓存获取到学生姓名,将数据转换为
注:资料\测试数据源\distribute_cache_student 文件保存了学生ID以及学生姓名
操作步骤
1. 将 distribute_cache_student 文件上传到HDFS / 目录下
2. 获取批处理运行环境
3. 创建成绩数据集
4. 对 成绩 数据集进行map转换,将(学生ID, 学科, 分数)转换为(学生姓名,学科,分数)
RichMapFunction 的 open 方法中,获取分布式缓存数据
在 map 方法中进行转换
5. 实现 open 方法
使用 getRuntimeContext.getDistributedCache.getFile 获取分布式缓存文件
使用 Scala.fromFile 读取文件,并获取行
将文本转换为元组(学生ID,学生姓名),再转换为List
6. 实现 map 方法
从分布式缓存中根据学生ID过滤出来学生
获取学生姓名
构建最终结果元组
7. 打印测试
参考代码
