You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

192 lines
16 KiB
Markdown

<h1><div style="text-align: center; color: #fa4861">Spark核心手册-自整理</div></h1>
## 0. 目录
- Spark 基础
- Spark SQL
- SparkMLlib
- StructuredStreaming
- Spark性能调优
## 1. 基础认知
- 官方QuickStart: https://spark.apache.org/
- 官方文档: https://spark.apache.org/docs/latest/
- 最新版本 【截止2022-7-20】
- 学习版本 【截止2022-7-20】
## 2. 基础
### 2.1 Spark 的 Hello World
- IDEA 开发配置
- 插件 Scala 下载 + [项目结构 -> 库 -> 添加Jar -> 选spark的jars目录下的文件]
#### A. Word Count 代码实现
- 读取内容、分词、分组计数这三步来看看 Word Count具体怎么实现。
- **第一步,读取内容**
- 首先,我们调用 SparkContext 的 textFile 方法,读取源文件,也就是 wikiOfSpark.txt
```scala
import org.apache.spark.rdd.RDD
val rootPath: String = _
val file: String = s"${rootPath}/wikiOfSpark.txt"
// 读取文件内容
val lineRDD: RDD[String] = spark.sparkContext.textFile(file)
```
- 3 个新概念,分别是 spark、sparkContext 和 RDD
- 其中spark 和 sparkContext 分别是两种不同的开发入口实例:
- spark 是开发入口 SparkSession 实例InstanceSparkSession 在 spark-shell 中会由系统自动创建;
- sparkContext 是开发入口 SparkContext 实例。
- 在 Spark 版本演进的过程中,从 2.0 版本开始SparkSession 取代了 SparkContext成为统一的开发入口。换句话说要开发 Spark 应用,你必须先创建 SparkSession。
- RDD 的全称是 Resilient Distributed Dataset意思是“弹性分布式数据集”。RDD 是 Spark 对于分布式数据的统一抽象,它定义了一系列分布式数据的基本属性与处理方法。
- **第二步,分词**
- “分词”就是把“数组”的行元素打散为单词。要实现这一点,我们可以调用 RDD 的flatMap 方法来完成。flatMap 操作在逻辑上可以分成两个步骤:**映射和展平**。
```scala
// 以行为单位做分词
val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))
```
- 要把 lineRDD 的行元素转换为单词我们得先用分隔符对每个行元素进行分割Split这里的分隔符是空格
- 分割之后,每个行元素就都变成了单词数组,元素类型也从 String 变成了 Array[String],像这样以元素为单位进行转换的操作,统一称作“映射”。
- RDD 类型由原来的 RDD[String]变为 RDD[Array[String]]
- RDD[String]看成是“数组”的话,那么 RDD[Array[String]]就是一个“二维数组”,它的每一个元素都是单词。
- ![以行为单位做分词](pic/以行为单位做分词.png)
- 为了后续对单词做分组,我们还需要对这个“二维数组”做展平,也就是去掉内层的嵌套
- 结构,把“二维数组”还原成“一维数组”,如下图所示。
- ![分词后做展平](pic/分词后做展平.png)
- 就这样,在 flatMap 算子的作用下,原来以行为元素的 lineRDD转换成了以单词为元素的 wordRDD。
- 不过,值得注意的是,我们用“空格”去分割句子,有可能会产生空字符串。所以,在完成“映射”和“展平”之后,对于这样的“单词”,我们要把其中的空字符串都过滤掉,这里我们调用 RDD 的 filter 方法来过滤:
```scala
// 过滤掉空字符串
val cleanWordRDD: RDD[String] = wordRDD.filter(word => !word.equals(""))
```
- 这样一来我们在分词阶段就得到了过滤掉空字符串之后的单词“数组”类型是RDD[String]。接下来,我们就可以准备做分组计数了。
- **第三步,分组计数**
- 在 RDD 的开发框架下聚合类操作如计数、求和、求均值需要依赖键值对Key Value Pair类型的数据元素也就是KeyValue形式的“数组”元素。
- 因此,在调用聚合算子做分组计数之前,我们要先把 RDD 元素转换为KeyValue的形式也就是把 RDD[String]映射成 RDD[(String, Int)]。
- 其中,我们统一把所有的 Value 置为 1。这样一来对于同一个的单词在后续的计数运算中我们只要对 Value 做累加即可,就像这样:
- ![把元素转换为Key-Value形式](pic/把元素转换为Key-Value形式.png)
- 下面是对应的代码:
```scala
// 把RDD元素转换为KeyValue的形式
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))
```
- 这样一来RDD 就由原来存储 String 元素的 cleanWordRDD转换为了存储StringInt的 kvRDD。
- 完成了形式的转换之后,我们就该正式做分组计数了。分组计数其实是两个步骤,也就是先“分组”,再“计数”。下面,我们使用聚合算子 reduceByKey 来同时完成分组和计数这两个操作。
- 对于 kvRDD 这个键值对“数组”reduceByKey 先是按照 Key也就是单词来做分组分组之后每个单词都有一个与之对应的 Value 列表。然后根据用户提供的聚合函数,对同一个 Key 的所有 Value 做 reduce 运算。
- 这里的 reduce你可以理解成是一种计算步骤或是一种计算方法。当我们给定聚合函数后它会用折叠的方式把包含多个元素的列表转换为单个元素值从而统计出不同元素的数量。
- 在 Word Count 的示例中,我们调用 reduceByKey 实现分组计算的代码如下:
```scala
// 按照单词做分组计数
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)
```
- reduceByKey 分组聚合
- 可以看到,我们传递给 reduceByKey 算子的聚合函数是 (x, y) => x + y也就是累加函数。因此在每个单词分组之后reduce 会使用累加函数,依次折叠计算 Value 列表中的所有元素,
最终把元素列表转换为单词的频次。对于任意一个单词来说reduce 的计算过程都是一样的,如下图所示。
- ![reduce操作示意图](pic/reduce操作示意图.png)
- reduceByKey 完成计算之后,我们得到的依然是类型为 RDD[(String, Int)]的 RDD。不过与 kvRDD 不同wordCounts 元素的 Value 值,记录的是每个单词的统计词频。
- ![reduceByKey转换示意图](pic/reduceByKey转换示意图.png)
- 在程序的最后,我们还要把 wordCounts 按照词频做排序,并把词频最高的 5 个单词打印到屏幕上,代码如下所示。
```scala
// 打印词频最高的5个词汇
wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5)
```
- 完整代码 : HelloWorldDemo
### 2.2 RDD与编程模型延迟计算是怎么回事
#### A. 什么是 RDD
- RDD 是构建 Spark 分布式内存计算引擎的基石很多Spark 核心概念与核心组件,如 DAG 和调度系统都衍生自 RDD。因此深入理解 RDD有利于你更全面、系统地学习 Spark 的工作原理
- 尽管 RDD API 使用频率越来越低,绝大多数人也都已经习惯于 DataFrame 和Dataset API但是无论采用哪种 API 或是哪种开发语言,你的应用在 Spark 内部最终都会转化为 RDD 之上的分布式计算。
- 换句话说,如果你想要对 Spark 作业有更好的把握,前提是你要对 RDD 足够了解。
- 用一句话来概括,**RDD 是一种抽象是Spark 对于分布式数据集的抽象,它用于囊括所有内存中和磁盘中的分布式数据实体**。
- RDD和数组类似, RDD 和数组对比
- ![RDD与数组的对比](pic/RDD与数组的对比.png)
- 首先,就概念本身来说,数组是实体,它是一种存储同类元素的数据结构,而 RDD 是一种抽象,它所囊括的是分布式计算环境中的分布式数据集。
- 因此,这两者第二方面的不同就是在活动范围,数组的“活动范围”很窄,仅限于单个计算节点的某个进程内,而 RDD 代表的数据集是跨进程、跨节点的,它的“活动范围”是整个集群。
- 至于数组和 RDD 的第三个不同,则是在数据定位方面。在数组中,承载数据的基本单元是元素,而 RDD 中承载数据的基本单元是数据分片。在分布式计算环境中,一份完整的数据集,会按照某种规则切割成多份数据分片。
这些数据分片被均匀地分发给集群内不同的计算节点和执行进程,从而实现分布式并行计算。
- 通过以上对比,不难发现,**数据分片Partitions** 是 RDD 抽象的重要属性之一。
- 接下来咱们换个视角,从 RDD 的重要属性出发去进一步深入理解RDD。要想吃透 RDD我们必须掌握它的 4 大属性:
- partitions数据分片
- partitioner分片切割规则
- dependenciesRDD 依赖
- compute转换函数
#### B. 从薯片的加工流程看 RDD 的 4 大属性
- ![RDD的生活化类比](pic/RDD的生活化类比.png)
- 为了充分利用每一颗土豆、降低生产成本
- 工坊使用 3 条流水线来同时生产 3 种不同尺寸的桶装薯片。3 条流水线可以同时加工 3 颗土豆,每条流水线的作业流程都是一样的
- 分别是清洗、切片、烘焙、分发和装桶
- 其中,分发环节用于区分小、中、大号 3 种薯片3 种不同尺寸的薯片分别被发往第 1、2、3 条流水线。
- 那如果我们把每一条流水线看作是分布式运行环境的计算节点,用薯片生产的流程去类比 Spark 分布式计算,会有哪些有趣的发现呢?
- 这里的每一种食材形态,如“带泥土豆”、“干净土豆”、“土豆片”等,都可以看成是一个个 RDD。
- 而薯片的制作过程,实际上就是不同食材形态的转换过程。
- 从上到下的方向,去观察上图中土豆工坊的制作工艺。
- 我们可以看到对于每一种食材形态来说,流水线上都有多个实物与之对应,比如,“带泥土豆”是一种食材形态,流水线上总共有 3 颗“脏兮兮”的土豆同属于这一形态。
- 如果把“带泥土豆”看成是 RDD 的话,那么 RDD 的 partitions 属性囊括的正是麻袋里那一颗颗脏兮兮的土豆。同理流水线上所有洗净的土豆一同构成了“干净土豆”RDD的 partitions 属性。
- 我们再来看 RDD 的 partitioner 属性这个属性定义了把原始数据集切割成数据分片的切割规则。在土豆工坊的例子中“带泥土豆”RDD 的切割规则是随机拿取,也就是从麻袋中随机拿取一颗脏兮兮的土豆放到流水线上。
后面的食材形态如“干净土豆”、“土豆片”和“即食薯片”则沿用了“带泥土豆”RDD 的切割规则。换句话说后续的这些RDD分别继承了前一个 RDD 的 partitioner 属性。
- 这里面与众不同的是“分发的即食薯片”。显然“分发的即食薯片”是通过对“即食薯片”按照大、中、小号做分发得到的。也就是说对于“分发的即食薯片”来说它的partitioner 属性,重新定义了这个 RDD 数据分片的切割规则,
也就是把先前 RDD 的数据分片打散,按照薯片尺寸重新构建数据分片。
- 由这个例子我们可以看出,数据分片的分布,是由 RDD 的 partitioner 决定的。因此RDD 的 partitions 属性,与它的 partitioner 属性是强相关的。
- 接下来,我们横向地,也就是沿着从左至右的方向,再来观察土豆工坊的制作工艺。
- 流水线上的每一种食材形态,都是上一种食材形态在某种操作下进行转换得到的。比如,“土豆片”依赖的食材形态是“干净土豆”,这中间用于转换的操作是“切片”这个动作。回顾 Word Count 当中 RDD 之间的转换关系,我们也会发现类似的现象。
- ![WordCount中的RDD转换](pic/WordCount中的RDD转换.png)
- 在数据形态的转换过程中,每个 RDD 都会通过 dependencies 属性来记录它所依赖的前一个、或是多个 RDD简称“父 RDD”。与此同时RDD 使用 compute 属性,来记录从父 RDD 到当前 RDD 的转换操作。
- 拿 Word Count 当中的 wordRDD 来举例,它的父 RDD 是 lineRDD因此它的dependencies 属性记录的是 lineRDD。从 lineRDD 到 wordRDD 的转换,其所依赖的操作是 flatMap
因此wordRDD 的 compute 属性,记录的是 flatMap 这个转换函数。
- 总结下来,薯片的加工流程,与 RDD 的概念和 4 大属性是一一对应的:
- 不同的食材形态,如带泥土豆、土豆片、即食薯片等等,对应的就是 RDD 概念;
- 同一种食材形态在不同流水线上的具体实物,就是 RDD 的 partitions 属性;
- 食材按照什么规则被分配到哪条流水线,对应的就是 RDD 的 partitioner 属性;
- 每一种食材形态都会依赖上一种形态,这种依赖关系对应的是 RDD 中的dependencies 属性;
- 不同环节的加工方法对应 RDD 的 compute 属性。
- 编程模型指导我们如何进行代码实现,而延迟计算是 Spark 分布式运行机制的基础。只有搞明白编程模型与延迟计算,你才能流畅地在 Spark 之上做应用开发,在实现业务逻辑的同时,避免埋下性能隐患。
#### C. 编程模型与延迟计算
- map、filter、flatMap 和 reduceByKey 这些算子,有哪些共同点?
- 首先,这 4 个算子都是作用Apply在 RDD 之上、用来做 RDD 之间的转换。比如flatMap 作用在 lineRDD 之上,把 lineRDD 转换为 wordRDD。
- 其次这些算子本身是函数而且它们的参数也是函数。参数是函数、或者返回值是函数的函数我们把这类函数统称为“高阶函数”Higher-order Functions
- 这里,我们先专注在 RDD 算子的第一个共性RDD 转换。
- RDD 是 Spark 对于分布式数据集的抽象,每一个 RDD 都代表着**一种分布式数据形态**。比如 lineRDD它表示数据在集群中以行Line的形式存在而 wordRDD 则意味着数据的形态是单词,分布在计算集群中。
- RDD 代表的是分布式数据形态,因此,**RDD 到 RDD 之间的转换本质上是数据形态上的转换Transformations**。
- 在 RDD 的编程模型中一共有两种算子Transformations 类算子和 Actions 类算子。
- 开发者需要使用 Transformations 类算子定义并描述数据形态的转换过程然后调用Actions 类算子,将计算结果收集起来、或是物化到磁盘。
- 在这样的编程模型下Spark 在运行时的计算被划分为两个环节。
- 基于不用数据形态之间的转换构建计算流图DAGDirected Acyclic Graph
- 通过 Actions 类算子,以回溯的方式去触发执行这个计算流图。
- 换句话说,开发者调用的各类 Transformations 算子,并不立即执行计算,当且仅当开发者调用 Actions 算子时之前调用的转换算子才会付诸执行。在业内这样的计算模式有个专门的术语叫作“延迟计算”Lazy Evaluation
- 为什么 Word Count 在执行的过程中,只有最后一行代码会花费很长时间,而前面的代码都是瞬间执行完毕的呢?
- 答案正是 Spark 的延迟计算。flatMap、filter、map 这些算子,仅用于构建计算流图,因此,当你在 spark-shell 中敲入这些代码时spark-shell 会立即返回。只有在你敲入最后那行包含 take 的代码时,
Spark 才会触发执行从头到尾的计算流程,所以直观地看上去,最后一行代码是最耗时的。
- Spark 程序的整个运行流程如下图所示:
- ![延迟计算](pic/延迟计算.png)
- 常用的 RDD 算子进行了归类,并整理到了下面的表格中,随时查阅
- ![RDD算子归类](pic/RDD算子归类.png)
- 参考: https://spark.apache.org/docs/latest/rdd-programming-guide.html