Spark 中的 Master-Worker 与 Driver-Executor
(脑子离家出走系列:是用于记录因 间歇性脑子离家出走,而导致忘记的基础知识系列)
Master 节点与 Worker 节点
一个 Spark 集群中拥有多个 Master 节点和 Worker 节点,一台机器既可以是 Master,也可以是 Worker,同时根据部署模式的不同,这两个角色的数量也不同,至于节点名称的解释可以理解为 Master 节点有一个 Master 进程,负责管理 Worker 节点,spark-submit 都是从 Master 节点提交;同理,Worker 节点中有一个 Worker 进程,与 Master 节点通信,管理 Executor。
Driver 进程与 Executor 进程
这俩玩意儿就稍微有点复杂了,我尽量描述得简单一点。
Driver 进程: Driver 进程本身可以运行在 Master 或者 Worker 上(如果部署模式是 On-Yarn,就不存在 Master 和 Worker 了),Driver 会将我们的编写的 Spark 应用拆分为多个 Stage,Stage 又会创建一批 Task,Task分配到 Executor 中执行 ...
Spark 优化笔记
本篇优化笔记全部基于 Scala 语言,不讨论 Java。
函数式编程
Spark 中使用函数式编程时,要尽可能避免在函数中对状态的修改和变更,换句话说,尽量不要在函数式编程中对某个变量,集合进行复杂操作;
一个算子中对一个变量和集合进行修改的开销并不大,性能也不低,但在 Spark 任务中,这个算子可能会被调用许多次,累计下来,开销就会提升,性能也会跟着下降。
这个我就不举例子了,太常见了,但有些时候是无法避免的需要在算子中进行一些操作,难免会用到一些临时变量,所以只能说尽可能避免。
不要在 Driver 中进行计算
这个是在对去年年底(2020年)的一个项目进行优化的时候记录的优化方案,先说一下项目大致背景,就是要对一批对象进行统计,但是每个对象开始统计的时间并不一样,而且每个对象都需要统计一段很长时间内的数据,且统计规则相当复杂,暂时列举为 数据集A,数据集B 和 数据集C。
第一版代码: 是将每个对象的数据全部提取出来,collect 到 driver 得到 数据集A,然后遍历 A 得到统计 B 的条件,再用 SparkSQL 根据条件拉取新的数据并 collect 到 d ...
Spark 内存模型
环境参数
spark 内存模型中会涉及到多个配置,这些配置由一些环境参数及其配置值有关,为防止后面理解混乱,现在这里列举出来,如果忘记了,可以返回来看看:
spark.executor.memory :JVM On-Heap 内存(堆内内存),在使用 spark submit 提交的时候,可以通过配置 --executor-memory 来对这个值进行修改。
spark.yarn.executor.memoryOverhead :这是用于配置 Executor 的额外内存,因为 Executor 在执行的时候,可能会超过 executor memory,所以会为 executor 预留一部分内存。
spark.memory.offHeap.enabled :用于开启堆外内存(PS:这个是系统级别的,不受 JVM 管理)。
spark.memory.offHeap.size : 设置堆外内存大小;
spark.memeory.fraction :用于配置统一内存,这个值在 Spark 2.0+ 为 60%,Spark 1.6 为 75%。
spark.memory.storageFr ...
Flink 初学笔记(角色及组件)
集群架构
JobManager:管理节点,每个集群至少一个,管理整个集群的计算资源,Job 管理与调度执行,以及 Checkpoint 协调。
TaskManager:每个集群有多个 TaskManager,负责计算资源提供。
Client:本地执行应用 main() 方法,解析 JobGraph 对象,并最终将 JobGraph 提交到 JobManager 运行,同时监控 Job 执行的状态。
JobManager
功能及组件:
Checkpoint Coordinator 组件用于协调每个 TaskManager 中的 Checkpoint 的协调和执行;
由 Client 提交的 JobGraph(逻辑图) 生成 Execution Graph(物理执行图) ,Execution Graph 会被拆分不同的执行单元,提交到 TaskManager 中执行;
将一个 Task 拆分为不同的 Task,并部署到不同的 TaskManager 上运行(JobManager 和 TaskManager 通过 Task 进程进行交互);
JobManager 与 Client ...
JVM 方法加载
JVM 寻找对应的方法
JVM 识别方法的关键在于一下三点:
方法所在的类名
方法名称
方法描述符:由方法的参数类型,以及返回类型构成
(PS:在类的链接阶段中,会有一个验证过程,在验证过程中,如果出现多个名字相同,且描述符相同的方法,那么验证阶段就会报错)
JVM 中并不是 Java 代码,而是字节码,而调用方法的字节码携带着方法描述符,因为描述符中存在着详细的数据,所以 JVM 可以准确的找到对应的方法。
(再 PS:如果子类中定义了与父类相同的非私有,非静态的方法,若方法名,参数类型,返回类型相同,则 JVM 将子类中的这种方法判定为父类方法的重写;如果是重复的是静态方法,则是直接覆盖)
重载与重写
重载
JVM 根据传入方法的参数类型对重载方法进行选择,选取过程为:
不考虑自动装箱拆箱,不考虑可变长度参数,选取重载方法;
考虑自动装箱拆箱,不考虑可变长度参数,选取重载方法;
考虑自动装箱拆箱,考虑可变长度参数,选取重载方法;
如果 Java 编译器在同一个阶段中找到了多个重载方法,会选择一个更加确切的,而这个确切程度是由参数类型的继承关系决定的,举个例子:
123 ...
安全失败机制
简介
在 快速失败机制 一文中,大致说明了在 java.util 中集合内实现的快速失败机制,并在最后指出,一旦多个线程操作同一个集合,就有可能触发 ConcurrentModificationException 异常。
安全失败机制与快速失败机制差不多,但快速失败是为了检测并发修改的 bug,而安全失败机制是为了解决这种 bug 提供一种解决方案。
源码
我还是用 HashMap 的线程安全集合 ConcurrentHashMap 的源码介绍:
12345678910111213141516171819202122public final Iterator<V> iterator() { ConcurrentHashMap<K,V> m = map; Node<K,V>[] t; int f = (t = m.table) == null ? 0 : t.length; return new ValueIterator<K,V>(t, f, 0, f, m);}public Iterato ...
快速失效机制
简介
以下解释摘抄自网络:
在系统设计中,快速失效系统一种可以立即报告任何可能表明故障的情况的系统。快速失效系统通常设计用于停止正常操作,而不是试图继续可能存在缺陷的过程。这种设计通常会在操作中的多个点检查系统的状态,因此可以及早检测到任何故障。快速失败模块的职责是检测错误,然后让系统的下一个最高级别处理错误。
简单点来说就是:快熟失效功能是在 java.util 包中集合的一种失败机制,在多个线程中操作同一个集合时会抛出异常,这种异常是需要停止任务并需要解决的。
源码
在我的 HashMap-源码解读 博客中的 putVal() 方法介绍中有提到一个变量 modCount,与这个变量对应的还有一个变量 expectedModCount,这两个变量一同实现了快速失效机制,这里给出 HashMap 中的 HashIterator 源码(HashMap 中还有很多涉及到这两个变量的源码,拿这个只是方便举例):
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647ab ...
HashMap 源码解读
Hash 算法
简单介绍一下 Hash 算法以及解决 Hash 冲突的方案。
优秀 Hash 算法的特点:
从 Hash 值不可以反向推导出原始的数据
输入数据的微小变化会得到完全不同的 Hash 值,相同的数据会得到相同的值
哈希算法的执行效率要高效,长的文本也能快速地计算出哈希值
Hash 算法的冲突概率要小
Hash 冲突解决方案
链地址法
链表地址法是使用一个链表数组,来存储相应数据,当 Hash 遇到冲突的时候依次添加到链表的后面进行处理;这也是 HashMap 在 1.8 之后做的优化之一。
开放地址法
线性探测法,就是比较常用的一种开放地址哈希表的一种实现方式。
线性探测法的核心思想是当冲突发生时,顺序查看表中下一单元,直到找出一个空单元或查遍全表。简单来说就是:一旦发生冲突,就去寻找下 一个空的散列表地址,只要散列表足够大,空的散列地址总能找到。
HashMap 数据结构
数组:通过 Hash 后得到的值可以直接找到对应的数组下标,时间复杂度为 O(1),就算是遍历素组,复杂度也是 O(n)
链表:1.7 版本采用首插法,在 resize 时,会重新 ...
Spark 自定义 UDAF 函数
简介
UDAF(User Defined Aggregate Function),即用户自定义聚合函数,至于啥叫聚合函数,用来干嘛的,熟悉 SQL 的自不必多说,而且 UDAF 面向的是 SparkSQL,熟悉 SQL 是前提条件。
场景
在一次我对 Spark 项目优化过程中,需要将一个复杂的计算从 Driver 端提取出来,重新设计然后放入 SparkSQL 中进行计算,但是已有的聚合函数是完全无法满足需求的,我需要处理的数据包含三个列,一般的聚合函数只能满足一列,这时候就需要使用自定义聚合函数了。
(PS:至于为什么要从 Driver 端提取出来是因为历史原因,这个放到以后的 Spark 优化方案博客中说明)
UDAF 的使用
自定义 UDAF 一共分为三步:
自定义类继承 UserDefinedAggregateFunction 类,并实现对应的方法;
使用 Spark 对定义好的类进行注册,并提供一个可在 SQL 语句中调用的函数名;
在 SQL 中使用;
自定义 UDAF 类
1234567891011121314151617181920212223242526 ...
JVM 中类的加载
简介
从编译后的 class 文件到内存中的类,需要经过加载,链接以及初始化三个步骤。链接需要经过验证,而内存中的类没有初始化,也无法使用。
加载 —— 双亲委派模型
在说明加载前,需要理解双亲委派模型是个啥玩意儿。
子-类加载器 需要加载某个 class 文件时,会将这个任务委派给他的 父-类加载器,然后递归这个操作,如果 父-类加载器 没有加载,则 子-类加载器 才会去加载。
(注: 上面的类加载器的父子关系并不是 java 语言的继承关系,只是一种组合关系,或者说层级关系)
类的唯一性
类加载器名称 + 类全限定名称
类加载器的类别
BootstrapClassLoader(启动类加载器)
启动类加载器是用 C++ 实现的,没有对应的 Java 对象,因此在 Java 中只能用 NULL 来指代。相当于这个加载器是最顶级的加载器,不能直接通过引用进行操作。
该加载器加载 Java 的核心库 java.*,构造 ExtClassLoader 和 AppClassLoader。
ExtClassLoader(标准扩展类加载器)
ext:extension
由 Java 编 ...