大数据营销的特点(大数据分析在营销中的作用)
567
2022-05-29
3.7 共 享 变 量
通过前面的介绍,我们知道Spark是多机器集群部署的,分为Driver、Master和Worker。Master负责资源调度,Worker是不同的运算节点,由Master统一调度,而Driver是我们提交Spark程序的节点,并且所有的reduce类型的操作都会汇总到Driver节点进行整合。
节点之间会给每个节点传递一个map、reduce等操作函数的独立副本,这些变量也会被复制到每台机器上,而节点之间的运算是相互独立的。当我们利用RDD操作(如map、reduce)在远程节点执行一个功能函数时,其会在该节点开辟一块单独的变量空间供函数使用。
这些变量会被复制到每一台机器上,并且当变量发生改变时,并不会同步传播回Driver程序。如果进行通用支持,任务间的读写共享变量需要大量的同步操作,这会导致低效。所以,Spark提供了两种受限类型的共享变量用于两种常见的使用模式:广播变量和累加器。
3.7.1 累加器(Accumulator)
顾名思义,累加器是一种只能通过关联操作进行“加”操作的变量,因此它能够高效地应用于并行操作中。累加器能够用来实现对数据的统计和求和操作。Spark原生支持数值类型的累加器,开发者可以自己添加支持的类型,在2.0.0之前的版本中,通过继承AccumulatorParam来实现,而2.0.0之后的版本需要继承AccumulatorV2来实现自定义类型的累加器。
如果创建了一个具体名称的累加器,它可以在Spark的UI中显示。这对于理解运行阶段(running stages)的过程有很重要的作用,如图3.8所示。
图3.8 累加器展示图
Spark内置了数值型累加器,一个数值累加器可以由SparkContext.longAccumulator()或者SparkContext.doubleAccumulator()函数来创建,分别累加Long型或Double型数据。
之后节点上的任务可以利用add方法进行累加操作,但是它们并不能读取累加器的值。只有Driver程序能够通过value方法读取累加器的值,其具体使用方式如下:
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0,
name: Some(My Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Long = 10
上面所述的代码是使用了累加器的内建支持类型Long,当然也可以通过集成AccumulatorV2的方式来创建支持我们自定义类型的累加器。
AccumulatorV2是一个包含一些方法的抽象类,其中一些方法必须被覆写:reset方法使得累加器能够被重置为0,add方法即添加另一个值到累加器中,merge方法能够将另一个同类型累加器整合到当前累加器中。
另外,其他必须覆写的方法可以参考API文档。这里我们参考官网的一个例子。假设有一个MyVector类型,表示数学中的向量,可以用以下方式来声明MyVector累加器:
class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
// 创建全0向量
private val myVector: MyVector = MyVector.createZeroVector
// 重置操作
def reset(): Unit = {
myVector.reset()
}
// 向量相加
def add(v: MyVector): Unit = {
myVector.add(v)
}
...
}
// 创建向量类型的累加器
val myVectorAcc = new VectorAccumulatorV2
// 将累加器注册到Spark上下文中
sc.register(myVectorAcc, "MyVectorAcc1")
值得一提的是,对于自定义类型的累加器,我们可以设置不同于相加元素的输出元素。
累加器只有在Action操作中才会被更新,Spark保证每个任务对于累加器的更新只会执行一次,如重新启动任务并不会更新累加器的值。在Transformation操作中,如果Task、Job或Stages被重新执行(根据计算图重新计算结果),那么累加器的更新有可能被执行多次。
我们知道,Transformation会建立计算图,只有Action操作才会触发真正的计算,累加器也同样遵循这个懒惰(lazy)原则,即如果只在Transformation操作中调用累加器,其结果并不会改变,示例如下:
// 创建long型累加器
val accum = sc.longAccumulator
// 在map操作内累加器进行累加
data.map { x => accum.add(x); x }
// 由于没有任何Action操作,所以map操作并没有被执行,accum值还是0
?特别注意:上文中关于累加器的使用只适合于Spark 2.0.0之后的版本,在此之前的版本中,累加器的声明方式如下:
scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Int = 10
这点在使用不同版本的Spark时要特别注意,因为在Spark 2.0.0之后的版本API接口有了很大变化。
Spark spark 大数据 大数据
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。