spark计算模型

By | 2018年12月20日

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_16365849/article/details/50642681

spark计算模型

与Hadoop 不同,Spark 一开始就瞄准性能,将数据(包括部分中间数据)放在内存,在内存中计算。用户将重复利用的数据缓存到内存,提高下次的计算效率,因此Spark 尤其适合迭代型和交互型任务。Spark 需要大量的内存,但性能可随着机器数目呈多线性增长。本章将介绍Spark 的计算模型。

3.1 Spark 程序模型

下面通过一个经典的示例程序来初步了解Spark 的计算模型,过程如下。

1)SparkContext 中的textFile 函数从HDFS读取日志文件,输出变量file

valfile=sc.textFile(“hdfs://xxx”)

2)RDD 中的filter 函数过滤带“ERROR”的行,输出errors(errors 也是一个RDD)。

valerrors=file.filter(line=>line.contains(“ERROR”)

3)RDD 的count 函数返回“ERROR”的行数:errors.count()。

RDD 操作起来与Scala 集合类型没有太大差别,这就是Spark 追求的目标:像编写单机程序一样编写分布式程序,但它们的数据和运行模型有很大的不同,用户需要具备更强的系统把控能力和分布式系统知识。

从RDD 的转换和存储角度看这个过程,如图3-1 所示。

在图3-1 中,用户程序对RDD 通过多个函数进行操作,将RDD进行转换。Block-Manager
管理RDD 的物理分区,每个Block
就是节点上对应的一个数据块,可以存储在内存或者磁盘。而RDD
中的partition
是一个逻辑数据块,对应相应的物理块Block。本质上一个RDD
在代码中相当于是数据的一个元数据结构,存储着数据分区及其逻辑结构映射关系,存储着RDD之前的依赖转换关系。

3.2 弹性分布式数据集

本节简单介绍RDD,并介绍RDD 与分布式共享内存的异同。

3.2.1 RDD 简介

在集群背后,有一个非常重要的分布式数据架构,即弹性分布式数据集(resilient distributed dataset,RDD),它是逻辑集中的实体,在集群中的多台机器上进行了数据分区。通过对多台机器上不同RDD分区的控制,就能够减少机器之间的数据重排(datashuffling)。Spark 提供了“ partitionBy”运算符,能够通过集群中多台机器之间对原始RDD 进行数据再分配来创建一个新的RDD。RDD 是Spark 的核心数据结构,通过RDD 的依赖关系形成Spark
的调度顺序。通过对RDD 的操作形成整个Spark 程序。

(1)RDD 的两种创建方式

1)从Hadoop文件系统(或与Hadoop 兼容的其他持久化存储系统,如Hive、Cassandra、Hbase)输入(如HDFS)创建。

2)从父RDD 转换得到新的RDD。

 

(2)RDD 的两种操作算子

对于RDD 可以有两种计算操作算子:Transformation(变换)与Action(行动)。

1)Transformation(变换)。

Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个RDD 的转换操作不是马上执行,需要等到有Actions 操作时,才真正触发运算。

2)Action(行动)

Action 算子会触发Spark 提交作业(Job),并将数据输出到Spark 系统。

 

(3)RDD 的重要内部属性

1)分区列表。

2)计算每个分片的函数。

3)对父RDD 的依赖列表。

4)对Key-Value 对数据类型RDD 的分区器,控制分区策略和分区数。

5)每个数据分区的地址列表(如HDFS 上的数据块的地址)。

 

3.2.2 RDD 与分布式共享内存的异同

RDD 是一种分布式的内存抽象,表3-1 列出了RDD 与分布式共享内存(DistributedSharedMemory,DSM)的对比。在DSM 系统中,应用可以向全局地址空间的任意位置进行读写操作。DSM 是一种通用的内存数据抽象,但这种通用性同时也使其在商用集群上实现有效的容错性和一致性更加困难RDD
DSM 主要区别在于,不仅可以通过批量转换创建(即“写”)RDD,还可以对任意内存位置读写。RDD 限制应用执行批量写操作,这样有利于实现有效的容错。特别是,由于RDD
可以使用Lineage(血统)来恢复分区,基本没有检查点开销。失效时只需要重新计算丢失的那些RDD
分区,就可以在不同节点上并行执行,而不需要回滚(Roll Back)整个程序。

通过备份任务的复制,RDD
还可以处理落后任务(即运行很慢的节点),这点与MapReduce
类似,DSM 则难以实现备份任务,因为任务及其副本均需读写同一个内存位置的数据。与DSM
相比,RDD 模型有两个优势。第一,对于RDD
中的批量操作,运行时将根据数据存放的位置来调度任务,从而提高性能。第二,对于扫描类型操作,如果内存不足以缓存整个RDD,就进行部分缓存,将内存容纳不下的分区存储到磁盘上。另外,RDD
支持粗粒度和细粒度的读操作。RDD 上的很多函数操作(如count 和collect 等)都是批量读操作,即扫描整个数据集,可以将任务分配到距离数据最近的节点上。同时,RDD 也支持细粒度操作,即在哈希或范围分区的RDD 上执行关键字查找。

 

3.2.3 Spark 的数据存储

Spark 数据存储的核心是弹性分布式数据集(RDD。RDD 可以被抽象地理解为一个大的数组(Array),但是这个数组是分布在集群上的。逻辑上RDD 的每个分区叫一个Partition。在Spark 的执行过程中,RDD 经历一个个的Transfomation
算子之后,最后通过Action算子进行触发操作。逻辑上每经历一次变换,就会将RDD
转换为一个新的RDDRDD
之间通过Lineage
产生依赖关系,这个关系在容错中有很重要的作用。变换的输入和输出都是RDD。RDD 会被划分成很多的分区分布到集群的多个节点中。分区是个逻辑概念,变换前后的新旧分区在物理上可能是同一块内存存储。这是很重要的优化,以防止函数式数据不变性(immutable)导致的内存需求无限扩张。有些RDD 是计算的中间结果,其分区并不一定有相应的内存或磁盘数据与之对应,如果要迭代使用数据,可以调cache() 函数缓存数据。图3-2 为RDD 的数据存储模型。

图3-2 中的RDD1 含有5 个分区(p1、p2、p3、p4、p5),分别存储在4 个节点(Node1、node2、Node3、Node4)中。RDD2 含有3 个分区(p1、p2、p3),分布在3 个节点(Node1、Node2、Node3)中。

在物理上,RDD
对象实质上是一个元数据结构,存储着BlockNode
等的映射关系,以及其他的元数据信息。一个RDD 就是一组分区,在物理数据存储上,RDD 的每个分区对应的就是一个Block,Block 可以存储在内存,当内存不够时可以存储到磁盘上。每个Block 中存储着RDD 所有数据项的一个子集,暴露给用户的可以是一个Block 的迭代器(例如,用户可以通过mapPartitions获得分区迭代器进行操作),也可以就是一个数据项(例如,通过map 函数对每个数据项并行计算)。本书会在后面章节具体介绍数据管理的底层实现细节。如果是从HDFS
等外部存储作为输入数据源,数据按照HDFS
中的数据分布策略进行数据分区,HDFS
中的一个Block 对应Spark
的一个分区。同时Spark 支持重分区,数据通过Spark 默认的或者用户自定义的分区器决定数据块分布在哪些节点。例如,支持Hash分区(按照数据项的Key 值取Hash 值,Hash 值相同的元素放入同一个分区之内)和Range分区(将属于同一数据范围的数据放入同一分区)等分区策略。

3.3 Spark 算子分类及功能

本节将主要介绍Spark 算子的作用,以及算子的分类。

1.Saprk 算子的作用

图3-3 描述了Spark 的输入、运行转换、输出。在运行转换中通过算子对RDD 进行转换。算子是RDD 中定义的函数,可以对RDD 中的数据进行转换和操作。

1)输入:在Spark 程序运行中,数据从外部数据空间(如分布式存储:textFile 读取HDFS 等,parallelize 方法输入Scala 集合或数据)输入Spark,数据进入Spark 运行时数据空间,转化为Spark 中的数据块,通过BlockManager 进行管理。

2)运行:在Spark 数据输入形成RDD 后便可以通过变换算子,如fliter 等,对数据进行操作并将RDD 转化为新的RDD,通过Action 算子,触发Spark 提交作业。如果数据需要复用,可以通过Cache 算子,将数据缓存到内存。

3)输出:程序运行结束数据会输出Spark 运行时空间,存储到分布式存储中(如saveAsTextFile 输出到HDFS),或Scala 数据或集合中(collect 输出到Scala 集合,count 返回Scala int 型数据)。Spark 的核心数据模型是RDD,但RDD 是个抽象类,具体由各子类实现, 如MappedRDD、ShuffledRDD 等子类。Spark 将常用的大数据操作都转化成为RDD 的子类。

2.算子的分类

大致可以分为三大类算子。

1)Value 数据类型的Transformation 算子,这种变换并不触发提交作业,针对处理的数据项是Value 型的数据。

2) Key-Value 数据类型的Transfromation 算子,这种变换并不触发提交作业,针对处理的数据项是Key-Value 型的数据对。

3)Action 算子,这类算子会触发SparkContext 提交Job 作业。

3.3.1 Value 型Transformation 算子

处理数据类型为Value 型的Transformation 算子可以根据RDD 变换算子的输入分区与输出分区关系分为以下几种类型。

1)输入分区与输出分区一对一型。

2)输入分区与输出分区多对一型。

3)输入分区与输出分区多对多型。

4)输出分区为输入分区子集型。

5)还有一种特殊的输入与输出分区一对一的算子类型:Cache 型。Cache 算子对RDD分区进行缓存。

1.输入分区与输出分区一对一型

(1)map

将原来RDD 的每个数据项通过map 中的用户自定义函数f 映射转变为一个新的元素。源码中的map 算子相当于初始化一个RDD,新RDD叫作MappedRDD(this, sc.clean(f))。图3-4 中的每个方框表示一个RDD 分区, 左侧的分区经过用户自定义函数f:T->U 映射为右侧的新的RDD 分区。但是实际只有等到Action 算子触发后,这个f 函数才会和其他函数在一个Stage 中对数据进行运算。V1 输入f 转换输出V’1。

(2)flatMap

将原来RDD 中的每个元素通过函数f 转换为新的元素,并将生成的RDD 的每个集合中的元素合并为一个集合。内部创建FlatMappedRDD(this, sc.clean(f))。图3-5 中小方框表示RDD 的一个分区,对分区进行flatMap 函数操作,flatMap 中传入的函数为f:T->U,T 和U 可以是任意的数据类型。将分区中的数据通过用户自定义函数f转换为新的数据。外部大方框可以认为是一个RDD 分区,小方框代表一个集合。V1、V2、V3 在一个集合作为RDD
的一个数据项,转换为V’1、V’2、V’3 后,将结合拆散,形成为RDD 中的数据项。

(3)mapPartitions

mapPartitions 函数获取到每个分区的迭代器,在函数中通过这个分区整体的迭代器对整个分区的元素进行操作。内部实现是生成MapPartitionsRDD。图3-6 中的方框代表一个RDD 分区。图3-6 中,用户通过函数f (iter)=>iter.filter(_>=3) 对分区中的所有数据进行滤,>=3的数据保留。一个方块代表一个RDD 分区,含有1、2、3 的分区过滤只剩下元素3。

(4)glom

glom 函数将每个分区形成一个数组,内部实现是返回的GlommedRDD。图3-7 中的每个方框代表一个RDD 分区。

图3-7 中的方框代表一个分区。该图表示含有V1、V2、V3 的分区通过函数glom
形成一个数组Array[(V1),(V2),(V3)]

2.输入分区与输出分区多对一型

(1)union

使用union 函数时需要保证两个RDD 元素的数据类型相同,返回的RDD 数据类型和被合并的RDD 元素数据类型相同,并不进行去重操作,保存所有元素。如果想去重,可以使用distinct()。++ 符号相当于union 函数操作。

图3-8 中左侧的大方框代表两个RDD,大方框内的小方框代表RDD 的分区。右侧大方框代表合并后的RDD,大方框内的小方框代表分区。含有V1,V2…U的RDD 和含有V1,V8…U8 的RDD 合并所有元素形成一个RDD。V1、V1、V2、V8 形成一个分区,其他元素同理进行合并。

(2)cartesian

对两个RDD 内的所有元素进行笛卡尔积操作。操作后,内部实现返回CartesianRDD。图3-9 中左侧的大方框代表两个RDD,大方框内的小方框代表RDD 的分区。右侧大方框代表合并后的RDD,大方框内的小方框代表分区。

图3-9 中的大方框代表RDD,大方框中的小方框代表RDD 分区。例如,V1 和另一个RDD 中的W1、W2、Q5 进行笛卡尔积运算形(V1,W1)、(V1,W2)、(V1,Q5)。

3.输入分区与输出分区多对多型

groupBy:将元素通过函数生成相应的Key,数据就转化为Key-Value 格式,之后将Key 相同的元素分为一组。函数实现如下。

① sc.clean( ) 函数将用户函数预处理:val cleanF = sc.clean(f)

②对数据map 进行函数操作,最后再对groupByKey进行分组操作。

this.map(t => (cleanF(t),t)).groupByKey(p)

其中,p 中确定了分区个数和分区函数,也就决定了并行化的程度。图3-10 中的方框代表RDD 分区。

图3-10 中的方框代表一个RDD 分区,相同key 的元素合并到一个组。例如,V1,V2合并为一个Key-Value 对,其中key 为“V”,Value 为“V1,V2”,形成V,Seq(V1,V2)。

4.输出分区为输入分区子集型

(1)filter

filter 的功能是对元素进行过滤,对每个元素应用f 函数,返回值为true 的元素在RDD中保留,返回为false 的将过滤掉。内部实现相当于生成FilteredRDD(this,sc.clean(f))。

下面代码为函数的本质实现:def filter(f:T=>Boolean):RDD[T]=newFilteredRDD(this,sc.clean(f))

图3-11 中的每个方框代表一个RDD 分区。T 可以是任意的类型。通过用户自定义的过滤函数f,对每个数据项进行操作,将满足条件,返回结果为true 的数据项保留。例如,过滤掉V2、V3 保留了V1,将区分命名为V1’。

(2)distinct

distinct 将RDD 中的元素进行去重操作。图3-12 中的方框代表RDD 分区。图3-12 中的每个方框代表一个分区,通过distinct 函数,将数据去重。例如,重复数据V1、V1 去重后只保留一份V1。

(3)subtract

subtract 相当于进行集合的差操作,RDD 1 去除RDD 1 和RDD 2 交集中的所有元素。图3-13 中左侧的大方框代表两个RDD,大方框内的小方框代表RDD 的分区。右侧大方框代表合并后的RDD,大方框内的小方框代表分区。V1 在两个RDD 中均有,根据差集运算规则,新RDD 不保留,V2 在第一个RDD 有,第二个RDD 没有,则在新RDD 元素中包含V2。

(4)sample

sample 将RDD 这个集合内的元素进行采样,获取所有元素的子集。用户可以设定是否有放回的抽样、百分比、随机种子,进而决定采样方式。内部实现是生成SampledRDD(withReplacement, fraction,seed)。函数参数设置如下:

­ withReplacement=true,表示有放回的抽样;

­ withReplacement=false,表示无放回的抽样。

图3-14 中的每个方框是一个RDD 分区。通过sample 函数,采样50% 的数据。V1、V2、U1、U2、U3、U4 采样出数据V1 和U1、U2,形成新的RDD。图3-14 中的方框代表RDD 分区。

(5)takeSample

takeSample() 函数和上面的sample 函数是一个原理,但是不使用相对比例采样,而是按设定的采样个数进行采样,同时返回结果不再是RDD,而是相当于对采样后的数据进行Collect(),返回结果的集合为单机的数组。

图3-15 中左侧的方框代表分布式的各个节点上的分区,右侧方框代表单机上返回的结果数组。通过takeSample 对数据采样,设置为采样一份数据,返回结果为V1。 

5.Cache 型

(1)cache

cache 将RDD 元素从磁盘缓存到内存,相当于persist(MEMORY_ONLY)函数的功能。

图3-16 中的每个方框代表一个RDD 分区,左侧相当于数据分区都存储在磁盘,通过cache 算子将数据缓存在内存。

(2)persist

persist 函数对RDD 进行缓存操作。数据缓存在哪里由StorageLevel 枚举类型确定。有以下几种类型的组合(见图3-15),DISK 代表磁盘,MEMORY 代表内存,SER 代表数据是否进行序列化存储。

下面为函数定义,StorageLevel 是枚举类型,代表存储模式,用户可以通过图3-17 按需选择。persist(newLevel: StorageLevel)

图3-17 中列出persist 函数可以缓存的模式。例如,MEMORY_AND_DISK_SER 代表数据可以存储在内存和磁盘,并且以序列化的方式存储。其他同理。

图3-18 中的方框代表RDD 分区。disk 代表存储在磁盘,mem 代表存储在内存。数据最初全部存储在磁盘,通过persist(MEMORY_AND_DISK) 将数据缓存到内存,但是有的分区无法容纳在内存,例如:图3-18 中将含有V1,V2,V3 的RDD 存储到磁盘,将含有U1,U2 的RDD 仍旧存储在内存。

3.3.2 Key-Value 型Transformation 算子

Transformation 处理的数据为Key-Value 形式的算子,大致可以分为3 种类型:输入分区与输出分区一对一、聚集、连接操作。

1.输入分区与输出分区一对一

mapValues :针对(Key, Value)型数据中的Value 进行Map 操作,而不对Key 进行处理。

图3-19 中的方框代表RDD 分区。a=>a+2 代表只对(V1,1) 数据中的1 进行加2 操作,返回结果为3。

2.对单个RDD 或两个RDD 聚集

(1)单个RDD 聚集

1)combineByKey。

定义combineByKey 算子的代码如下。

combineByKey[C](createCombiner:(V) ⇒ C,mergeValue:(C, V) ⇒ C,mergeCombiners:(C, C) ⇒ C,partitioner: Partitioner, mapSideCombine:Boolean = true,serializer: Serializer =null): RDD[(K, C)]

说明:

­ createCombiner: V => C,在C不存在的情况下,如通过V创建seq C。

­ mergeValue : (C, V) =>C,当 C 已经存在的情况下,需要 merge,如把 item V 加到seq C 中,或者叠加。

­ mergeCombiners: (C, C) =>C,合并两个 C。

­ partitioner:Partitioner(分区器),Shuffle时需要通过Partitioner的分区策略进行分区。

­ mapSideCombine : Boolean =true,为了减小传输量,很多 combine 可以在 map 端先做。例如,叠加可以先在一个partition 中把所有相同的Key 的Value 叠加,再shuffle。

­ serializerClass: String = null,传输需要序列化,用户可以自定义序列化类。

例如,相当于将元素为(Int,Int) 的RDD 转变为了(Int, Seq[Int])类型元素的RDD。

图3-20 中的方框代表RDD 分区。通过combineByKey,将(V1, 2)、(V1, 1)数据合并为(V1, Seq(2, 1)

2)reduceByKey。

reduceByKey 是更简单的一种情况,只是两个值合并成一个值,所以createCombiner很简单,就是直接返回v,而mergeValue 和mergeCombiners 的逻辑相同,没有区别。函数实现代码如下。

def reduceByKey(partitioner: Partitioner, func: (V, V) => V):RDD[(K, V)] ={combineByKey[V]((v: V) => v, func, func, partitioner)}

图3-21 中的方框代表RDD 分区。通过用户自定义函数(A, B)=>(A + B),将相同Key 的数据(V1, 2)、(V1, 1)的value 相加,结果为(V1, 3)。

3)partitionBy。

partitionBy 函数对RDD 进行分区操作。函数定义如下:partitionBy(partitioner: Partitioner)

如果原有RDD 的分区器和现有分区器(partitioner)一致,则不重分区,如果不一致,则相当于根据分区器生成一个新的ShuffledRDD。

图3-22 中的方框代表RDD 分区。通过新的分区策略将原来在不同分区的V1、V2 数据都合并到了一个分区。

(2)对两个RDD 进行聚集

cogroup 函数将两个RDD 进行协同划分,cogroup 函数的定义如下。

cogroup[W](other: RDD[(K, W)],numPartitions: Int): RDD[(K, (Iterable[V],Iterable[W]))]

对在两个RDD 中的Key-Value 类型的元素,每个RDD 相同Key 的元素分别聚合为一个集合,并且返回两个RDD 中对应Key 的元素集合的迭代器(K, (Iterable[V], Iterable[W]))

其中,Key和Value,Value 是两个RDD下相同Key 的两个数据集合的迭代器所构成的元组。

图3-23 中的大方框代表RDD,大方框内的小方框代表RDD 中的分区。将RDD1 中的数据(U1, 1)、(U1, 2)和 RDD2 中的数据(U1, 2)合并为(U1,( (1, 2),( 2)))。

3.连接

(1)join

join 对两个需要连接的RDD 进行cogroup 函数操作,cogroup 原理请见上文。cogroup操作之后形成的新RDD,对每个key 下的元素进行笛卡尔积操作,返回的结果再展平,对应Key 下的所有元组形成一个集合,最后返回RDD[(K, (V,W))]下面代码为join 的函数实现, 本质是通过cogroup 算子先进行协同划分,再通过flatMapValues将合并的数据打散。

this.cogroup(other,partitioner).flatMapValues { case (vs, ws) =>

for (v <- vs; w <- ws) yield (v, w)}

图3-24 是对两个RDD 的join 操作示意图。大方框代表RDD,小方框代表RDD 中的分区。函数对拥有相同Key 的元素(例如V1) 为Key, 以做连接后的数据结果为(V1,(1,1)) 和(V1,(1,2))。

(2)leftOutJoin 和rightOutJoin

LeftOutJoin(左外连接)和RightOutJoin(右外连接)相当于在join 的基础上先判断一侧的RDD 元素是否为空,如果为空,则填充为空。如果不为空,则将数据进行连接运算,并返回结果。下面代码是leftOutJoin 的实现。

if (ws.isEmpty) {

vs.map(v => (v, None))

} else {

for (v <- vs; w <- ws) yield (v,Some(w))

}

3.3.3 Actions 算子

本质上在Actions 算子中通过SparkContext 执行提交作业的runJob 操作,触发了RDDDAG 的执行。

例如,Actions 算子collect 函数的代码如下,感兴趣的读者可以顺着这个入口进行源码剖析。

/* 返回这个RDD 的所有数据,结果以数组形式存储*/

def collect(): Array[T] = {

/* 提交Job*/

val results = sc.runJob(this, (iter:Iterator[T]) => iter.toArray)

Array.concat(results: _*)

}

下面根据Action 算子的输出空间将Action 算子进行分类:无输出、HDFS、Scala 集合和数据类型。

1.无输出

(1)foreach

对RDD 中的每个元素都应用f 函数操作,不返回RDD 和Array,而是返回Uint。

图3-25 表示foreach 算子通过用户自定义函数对每个数据项进行操作。本例中自定义函数为println(),控制台打印所有数据项。

2.HDFS

(1)saveAsTextFile

函数将数据输出,存储到HDFS 的指定目录。下面为函数的内部实现:

this.map(x => (NullWritable.get(), newText(x.toString)))

.saveAsHadoopFile[TextOutputFormat[NullWritable,Text]](path)

将RDD 中的每个元素映射转变为(Null, x.toString),然后再将其写入HDFS。

图3-26 中左侧的方框代表RDD 分区,右侧方框代表HDFS 的Block。通过函数将RDD 的每个分区存储为HDFS 中的一个Block。

(2)saveAsObjectFile

saveAsObjectFile 将分区中的每10 个元素组成一个Array,然后将这个Array 序列化,映射为(Null,BytesWritable(Y)) 的元素,写入HDFS 为SequenceFile 的格式。下面代码为函数内部实现:map(x=>(NullWritable.get(), newBytesWritable(Utils.serialize(x))))

图3-27 中的左侧方框代表RDD 分区,右侧方框代表HDFS 的Block。通过函数将RDD的每个分区存储为HDFS 上的一个Block。

3.Scala 集合和数据类型

(1)collect

collect 相当于toArray,toArray 已经过时不推荐使用,collect 将分布式的RDD 返回为一个单机的scala Array 数组。在这个数组上运用scala 的函数式操作。图3-28 中的左侧方框代表RDD 分区,右侧方框代表单机内存中的数组。通过函数操作,将结果返回到Driver 程序所在的节点,以数组形式存储。

(2)collectAsMap

collectAsMap 对(K, V) 型的RDD 数据返回一个单机HashMap。对于重复K 的RDD 元素,后面的元素覆盖前面的元素。

图3-29 中的左侧方框代表RDD 分区, 右侧方框代表单机数组。数据通过collectAsMap 函数返回给Driver 程序计算结果,结果以HashMap 形式存储。

(3)reduceByKeyLocally

实现的是先reduce 再collectAsMap 的功能,先对RDD 的整体进行reduce 操作,然后再收集所有结果返回为一个HashMap。

(4)lookup

下面代码为lookup 的声明:lookup(key: K): Seq[V]

Lookup 函数对(Key, Value) 型的RDD 操作,返回指定Key 对应的元素形成的Seq。这个函数处理优化的部分在于,如果这个RDD包含分区器,则只会对应处理K所在的分区,然后返回由(K, V) 形成的Seq。如果RDD 不包含分区器,则需要对全部RDD元素进行暴力扫描处理,搜索指定K 对应的元素。

图3-30 中的左侧方框代表RDD 分区,右侧方框代表Seq,最后结果返回到Driver 所在节点的应用中。

(5)count

count 返回整个RDD 的元素个数。内部函数实现如下:Defcount():Long=sc.runJob(this,Utils.getIteratorSize_).sum

在图3-31 中,返回数据的个数为5。一个方块代表一个RDD 分区。

(6)top

top 可返回最大的k 个元素。函数定义如下top(num:Int)(implicit ord: Ordering[T]): Array[T]

相近函数说明如下。

­ top 返回最大的 k 个元素。

­ take 返回最小的 k 个元素。

­ takeOrdered 返回最小的 k 个元素,并且在返回的数组中保持元素的顺序。

­ first 相当于top(1) 返回整个RDD中的前k 个元素,可以定义排序的方式 Ordering[T]。返回的是一个含前k 个元素的数组。

(7)reduce

reduce 函数相当于对RDD 中的元素进行reduceLeft 函数的操作。函数实现如下Some(iter.reduceLeft(cleanF))

reduceLeft 先对两个元素<K,V> 进行reduce 函数操作,然后将结果和迭代器取出的下一个元素<k,V> 进行reduce 函数操作,直到迭代器遍历完所有元素,得到最后结果。在RDD 中,先对每个分区中的所有元素<K, V> 的集合分别进行reduceLeft。每个分区形成的结果相当于一个元素<K, V>,再对这个结果集合进行reduceleft 操作。例如: 用户自定义函数如下f: (A,B)=>(A._1+”@”+B._1,A._2+B._2)

图3-32 中的方框代表一个RDD 分区,通过用户自定函数f 将数据进行reduce 运算。示例最后的返回结果为V1@ V2U1@U2@U3@U4,12

(8)fold

fold 和reduce 的原理相同,但是与reduce 不同,相当于每个reduce 时,迭代器取的第一个元素是zeroValue。

图3-33 中通过下面的用户自定义函数进行fold 运算,图中的一个方框代表一个RDD分区。读者可以参照(7) reduce 函数理解。

fold((“V0@”,2))( (A,B)=>(A._1+”@”+B._1,A._2+B._2))

(9)aggregate

aggregate 先对每个分区的所有元素进行aggregate 操作,再对分区的结果进行fold 操作。aggreagate 与fold 和reduce 的不同之处在于,aggregate 相当于采用归并的方式进行数据聚集,这种聚集是并行化的。而在fold 和reduce 函数的运算过程中,每个分区中需要进行串行处理,每个分区串行计算完结果,结果再按之前的方式进行聚集,并返回最终聚集结果。函数的定义如下aggregate[B](z: B)(seqop: (B,A) ⇒ B,combop:
(B,B) ⇒ B): B

图3-34 通过用户自定义函数对RDD 进行aggregate 的聚集操作,图中的每个方框代表一个RDD 分区。

rdd.aggregate(“V0@”,2)((A,B)=>(A._1+”@”+B._1,A._2+B._2)),(A,B)=>(A._1+”@”+B_1,A._@+B_.2))

最后,介绍两个计算模型中的两个特殊变量。

广播(broadcast)变量:其广泛用于广播Map Side Join 中的小表,以及广播大变量等场景。这些数据集合在单节点内存能够容纳,不需要像RDD 那样在节点之间打散存储。Spark 运行时把广播变量数据发到各个节点,并保存下来,后续计算可以复用。相比Hadoop 的distributed cache,广播的内容可以跨作业共享。Broadcast 的底层实现采用了BT机制。

accumulator 变量:允许做全局累加操作,如accumulator 变量广泛使用在应用中记录当

前的运行指标的情景。

发表评论