欢迎光临散文网 会员登陆 & 注册

MapReduce,Hive,Spark 的两个示例——Word Count 和 JOIN

2022-04-13 23:13 作者:友纪V-入OP  | 我要投稿

原文见:https://v-yop.fun/2022/03-18MapReduce%EF%BC%8CHive%EF%BC%8CSpark%E7%9A%84%E4%B8%A4%E4%B8%AA%E7%A4%BA%E4%BE%8B%E2%80%94%E2%80%94Word-Count%E5%92%8CJOIN.html

本文章采用 CC BY-NC-SA 4.0 协议 ,转载请注明出处!

来进行一些超有趣的事情——分别使用 MapReduce,Hive,Spark 来实现同一个程序,感受一下三者代码的差异,这里选择去实现一下 WordCount,以及一个表连接的示例。

Word Count

Word Count,不从它开始从谁开始呢?首先是 MapReduce,使用 Java 语言,代码是庞然大物。

可以看到,Mapper 和 Reducer 的代码是很显然的,在 Mapper 中,我们将每一行字符串按空格分割,并构造`(WORD, 1)`的键值对,然后我们通过 Combiner 进行本地聚集,再发送给各个 Reducer,每个 Reducer 对每个 KEY 对应的所有值,进行 sum 操作,得到结果。但这里却需要定义一堆东西,更别说 Driver 里的许多东西实际上都是重复的。

我们再来看看 Scala 的 Spark 的代码,Spark 实现 WordCount 其实是对我们这些开发人员最舒服的,但我还没系统学习过 Spark,所以不知道自己的描述是否正确。

不要看着 flatMap 就想入非非,RDD 不是 Monad!但虽然 RDD 不是 Monad,它仍旧可以使用 for,但这时它的上下文是列表的上下文——flatMap 函数的函数参数不能返回 RDD,因此我们在 for 里所能做的只有列表能做的。

另外,我不知道为何 Spark 最后得到的结果的 KEY 为何是无序的……按理说经过 shuffle,这里应当是有序的才对,我只能猜测,Spark 利用单进程的方便之处,在折叠时是并行进行的,并输出到同一个文件中,放到 MapReduce 的语境下,就是多个 Reducer 的输出文件为同一个,这样无论如何也不可能得出有序的结果。但这也是符合需求的——我确实没有指定排序。

然后是 Hive,这里展示了 Hive 从建模,读取数据到写出数据的全流程。

(SELECT explode(split(line, ' ')) AS word FROM docs)需要特别解释一下,这里使用了所谓的表生成函数(UDTF) explode,即通过一行数据生成一个表(该函数处理一个 Array,生成一个表,而 split 得到的是 Array),这里是 docs 表中每行数据按空格进行切割,并将每个结果展平,最后得到一张表,表中每一个记录都是一个单词,可以认为这是一种 flatMap,反过来说,这种在 Mapper 阶段干 flatMap 的操作,是显然需要使用 UDTF 的。

在得到这个单词表后,我们将其按照单词进行分组,并对每个分组进行 COUNT 聚集,得到结果并用该查询结果创建一张新表。通过SELECT *可以发现其生成的结果和 MapReduce 的版本一致。

JOIN

现在考虑一个传统的案例——现在有一张部门表和一张雇员表,其中雇员属于特定部门,且有自己的工资,现在要求获取每个部门的工资最大的雇员的信息,相关表的定义如下。

这个需求和 SQL 非常契合,所以我们先使用 Hive 进行描述,我们显然需要一个内连接来进行此工作。(可能有更简单的方式,但我还没学到)

这里使用了 RANK 窗口函数,其中根据部门分区,根据工资降序,获取 rank 为 1 的就是工资最高的,这里耗时 28 秒。一个更加清晰的表述使用子查询,但 Hive 不支持子查询,所以这里使用 JOIN 来表述。

这个更加麻烦一些,而且性能更加差——两次 JOIN,要执行两次 MapReduce 任务才行,它耗时 64 秒(Hive 不是会进行 MapJoin 优化吗??)。下面的 MapReduce 只描述第一种。

关于 MapReduce 的编写,考虑到部门和雇员是一对多的关系,且部门的数量一定是较少的,我们本可以使用 map 端 join,但为了通用性,我们使用 reduce 端 join,为此,用于 join 的键必须要在键值对的键中,因此,Mapper 的输出的键中应当包含 deptno,然后在 reduce 的值的集合里,我们要让部门作为第一个元素,因此我们需要为此创建一个虚拟键。

再考虑分区,分组,排序,关于分区,肯定是按 deptno 进行分区;关于分组,我们也是按 deptno 进行分组;关于排序,为了能保证部门在最前面,我们要按 deptno 和虚拟键进行排序,这里也可以把工资也加进来进行排序,以保证第二个元素就能获取到最高的工资,但这其实并无必要。

根据上面的分析,我们规定,Mapper 的输出类型为<(deptno, virtualKey), Text>(懒得定义一个泛类型了,机械操作太多了),按 deptno 进行分区,按 deptno 进行分组,按 deptno、virtualKey 进行排序;同时也可以发现,这里可以有一个 Combiner;Reducer 的输出类型就随意了,人能看就行。

MapReduce 编写这种业务代码真的是非常……非常痛苦,底层细节太多了,远不如 SQL 那样直接,更不用说有三个以上的表进行 reduce 端 join 的时候需要使用多个 MR 任务,且需要注意的细节更多,需要定义更多 Bean……

然后是 Spark 版本。Spark 版本怎么写其实我还没有学过,但跟随着直觉,还是挺容易的。

Spark 的代码真是优雅!但一个遗憾的地方是函数参数没法像 Haskell 那样解构,不太痛快!

总结

总结个啥,我只能说 Spark 太香了。上面的 JOIN 代码,如果在工程实践中应当定义 Bean,MapReduce 进行连接操作时应该使用 GenericWritable 作为 VALUE。实际上本还想写个分布式排序示例,但我对 Hive 和 Spark 都还没学到那里,先跳过了。


MapReduce,Hive,Spark 的两个示例——Word Count 和 JOIN的评论 (共 条)

分享到微博请遵守国家法律