本篇优化笔记全部基于 Scala 语言,不讨论 Java。

函数式编程

Spark 中使用函数式编程时,要尽可能避免在函数中对状态的修改和变更,换句话说,尽量不要在函数式编程中对某个变量,集合进行复杂操作;

一个算子中对一个变量和集合进行修改的开销并不大,性能也不低,但在 Spark 任务中,这个算子可能会被调用许多次,累计下来,开销就会提升,性能也会跟着下降。

这个我就不举例子了,太常见了,但有些时候是无法避免的需要在算子中进行一些操作,难免会用到一些临时变量,所以只能说尽可能避免。

不要在 Driver 中进行计算

这个是在对去年年底(2020年)的一个项目进行优化的时候记录的优化方案,先说一下项目大致背景,就是要对一批对象进行统计,但是每个对象开始统计的时间并不一样,而且每个对象都需要统计一段很长时间内的数据,且统计规则相当复杂,暂时列举为 数据集A数据集B数据集C

第一版代码: 是将每个对象的数据全部提取出来,collectdriver 得到 数据集A,然后遍历 A 得到统计 B 的条件,再用 SparkSQL 根据条件拉取新的数据并 collect 到 driver 中得到数据集 B,然后在 driver 端进行一些复杂操作计算得到数据集 C,最后汇总三个数据集输出到 HDFS,又有一个 collect

由以上流程可以看到,整个离线计算中使用了三个 collect() Action 算子(实际上使用的 collect 不止三个),将大量的数据聚集到 driver 中进行计算,导致 executor 空跑浪费资源,对 driver 造成极大的压力。这版代码跑一天的数据需要 16 分钟,而这只是统计接近 60 个对象的数据(计算历史数据需要跑2到3天),这已经是极慢的速度了,大量的数据聚集到 driver 对网络的 IO 压力也是成倍提升,将一个分布式计算的离线任务活生生写成了单机单线程的本地任务。

第二版代码: 这版代码并没有对 collect 操作进行简化,只是对于数据源的获取进行了优化,比如使用 SparkSQL 拉取指定字段,一次性拉取一天的数据,根据时间阈值选择不同的数据源(Elasticsearch 或者 ClickHouse)。

这版改进并没有显著提升该离线任务的性能,只是在原有基础上缩减了 1 分钟左右。

第三版代码: 在痛定思痛后,我决定重写整个业务逻辑,定下了只有最终把数据存入 HDFS 时需要把数据聚合时使用聚合操作,计算过程中不允许出现任何一个聚合操作,新的代码中大量使用 SparkSQL 进行业务设计,重构计算规则,对于数据集C,使用自定义的 UDAF(这个可以查看 Spark 自定义 UDAF 函数 博客)进行计算;将这个任务全部分发到 executor 中,让每个 executor 充分发挥性能。

这一版代码在编写完成测试通过上线后,有了质的飞越,每日统计数据所消耗的时间从 16 分钟缩减为 40 秒左右,历史数据统计时间从 2 ~ 3 天缩减为 16 ~ 20 分钟,单日统计提升 20 多倍,历史统计效率提升 180 倍以上,同时使用 SparkSQL 还修正了前两版代中统计的错误数据(这个是在测试过程中发现的,会有极个别的错误数据出现)

第三版代码优化的主要方向就是将计算任务分发到 executor 中,积极使用集群资源和性能,以及使用自定义 HDAF 将复杂的计算规则进行封装,禁止 driver 成为离线计算的性能瓶颈。