Spark Streaming实时流式大数据处理实战》 ——3.7 共 享 变 量

网友投稿 528 2022-05-29

3.7  共 享 变 量

通过前面的介绍,我们知道Spark是多机器集群部署的,分为Driver、Master和Worker。Master负责资源调度,Worker是不同的运算节点,由Master统一调度,而Driver是我们提交Spark程序的节点,并且所有的reduce类型的操作都会汇总到Driver节点进行整合。

节点之间会给每个节点传递一个map、reduce等操作函数的独立副本,这些变量也会被复制到每台机器上,而节点之间的运算是相互独立的。当我们利用RDD操作(如map、reduce)在远程节点执行一个功能函数时,其会在该节点开辟一块单独的变量空间供函数使用。

这些变量会被复制到每一台机器上,并且当变量发生改变时,并不会同步传播回Driver程序。如果进行通用支持,任务间的读写共享变量需要大量的同步操作,这会导致低效。所以,Spark提供了两种受限类型的共享变量用于两种常见的使用模式:广播变量和累加器。

3.7.1  累加器(Accumulator)

顾名思义,累加器是一种只能通过关联操作进行“加”操作的变量,因此它能够高效地应用于并行操作中。累加器能够用来实现对数据的统计和求和操作。Spark原生支持数值类型的累加器,开发者可以自己添加支持的类型,在2.0.0之前的版本中,通过继承AccumulatorParam来实现,而2.0.0之后的版本需要继承AccumulatorV2来实现自定义类型的累加器。

如果创建了一个具体名称的累加器,它可以在Spark的UI中显示。这对于理解运行阶段(running stages)的过程有很重要的作用,如图3.8所示。

图3.8  累加器展示图

《Spark Streaming实时流式大数据处理实战》 ——3.7 共 享 变 量

Spark内置了数值型累加器,一个数值累加器可以由SparkContext.longAccumulator()或者SparkContext.doubleAccumulator()函数来创建,分别累加Long型或Double型数据。

之后节点上的任务可以利用add方法进行累加操作,但是它们并不能读取累加器的值。只有Driver程序能够通过value方法读取累加器的值,其具体使用方式如下:

scala> val accum = sc.longAccumulator("My Accumulator")

accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0,

name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))

...

10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value

res2: Long = 10

上面所述的代码是使用了累加器的内建支持类型Long,当然也可以通过集成AccumulatorV2的方式来创建支持我们自定义类型的累加器。

AccumulatorV2是一个包含一些方法的抽象类,其中一些方法必须被覆写:reset方法使得累加器能够被重置为0,add方法即添加另一个值到累加器中,merge方法能够将另一个同类型累加器整合到当前累加器中。

另外,其他必须覆写的方法可以参考API文档。这里我们参考官网的一个例子。假设有一个MyVector类型,表示数学中的向量,可以用以下方式来声明MyVector累加器:

class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {

// 创建全0向量

private val myVector: MyVector = MyVector.createZeroVector

// 重置操作

def reset(): Unit = {

myVector.reset()

}

// 向量相加

def add(v: MyVector): Unit = {

myVector.add(v)

}

...

}

// 创建向量类型的累加器

val myVectorAcc = new VectorAccumulatorV2

// 将累加器注册到Spark上下文中

sc.register(myVectorAcc, "MyVectorAcc1")

值得一提的是,对于自定义类型的累加器,我们可以设置不同于相加元素的输出元素。

累加器只有在Action操作中才会被更新,Spark保证每个任务对于累加器的更新只会执行一次,如重新启动任务并不会更新累加器的值。在Transformation操作中,如果Task、Job或Stages被重新执行(根据计算图重新计算结果),那么累加器的更新有可能被执行多次。

我们知道,Transformation会建立计算图,只有Action操作才会触发真正的计算,累加器也同样遵循这个懒惰(lazy)原则,即如果只在Transformation操作中调用累加器,其结果并不会改变,示例如下:

// 创建long型累加器

val accum = sc.longAccumulator

// 在map操作内累加器进行累加

data.map { x => accum.add(x); x }

// 由于没有任何Action操作,所以map操作并没有被执行,accum值还是0

?特别注意:上文中关于累加器的使用只适合于Spark 2.0.0之后的版本,在此之前的版本中,累加器的声明方式如下:

scala> val accum = sc.accumulator(0, "My Accumulator")

accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

...

10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value

res2: Int = 10

这点在使用不同版本的Spark时要特别注意,因为在Spark 2.0.0之后的版本API接口有了很大变化。

Spark spark 大数据 大数据

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:豆丁的文档分享与盈利模式
下一篇:K8s 原理架构介绍(一)
相关文章