Extracting information from inside an RDD
Some people may think that extracting information from inside an RDD is trivial: "just do a foreach over it, right?" If RDDs were not designed for distributed computation, that logic would work perfectly well: define a mutable variable A outside, traverse the RDD, update A inside the traversal logic, and after the traversal you have the RDD's internal information.
But RDDs are designed for distributed execution, which means this kind of logic no longer works in a distributed setting. Let's look at an example.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
  * Created by legotime on 3/7/17.
  */
object RddInfo {
  def getInfo(sc: SparkContext, rdd: RDD[Int]) = {
    val br = new StringBuffer()
    rdd.foreach { part =>
      br.append(part)
      println("inside the RDD operation: " + br)
    }
    println("outside the RDD operation: " + br)
    br
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("saveAsTextFile").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.makeRDD(List(2, 3, 4, 5), 2)
    print(getInfo(sc, rdd))
  }
}
Result:
inside the RDD operation: 2
inside the RDD operation: 23
inside the RDD operation: 4
inside the RDD operation: 45
outside the RDD operation:
You will find that br looks fine inside rdd.foreach, but once the foreach finishes it is back to the same state as before anything was appended.
Reason: br is a variable defined on the driver side, while the RDD traversal is logic that runs on the executor side.
Analysis
Let's take another look at the foreach operator:
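For reference, RDD.foreach in RDD.scala looks roughly like this (taken from the Spark 1.x/2.x line; the exact form may differ slightly between versions):

def foreach(f: T => Unit): Unit = withScope {
  val cleanF = sc.clean(f)                                      // the closure is cleaned first
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))  // only the RDD and the cleaned closure are submitted
}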
You can see that when runJob is called, only the RDD and the cleaned closure are submitted. In other words, even though you dropped that variable into the foreach, the actual execution does not carry the driver-side object along with it.
That is, br stays in driver-side memory and never appears on the executors as the same object (only a serialized copy of the closure can be sent to an executor), so as far as the driver's br is concerned, the logic inside foreach might as well never have run.
So how do we actually get at the data inside an RDD?
Spark provides accumulators for exactly this kind of need.
A custom accumulator:
import org.apache.spark.AccumulatorParam

object StringAccumulatorParam extends AccumulatorParam[String] {
  override def addInPlace(r1: String, r2: String): String = add(r1, r2)

  /**
    * Initialization
    * @param initialValue the initial value
    * @return the zero value for this accumulator
    */
  override def zero(initialValue: String): String = ""

  def add(v1: String, v2: String) = {
    assert((!v1.isEmpty) || (!v2.isEmpty))
    v1 + v2 + " "
  }
}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import txtMIning.StringAccumulatorParam

/**
  * Created by legotime on 3/7/17.
  */
object RddInfo {
  def getInfo(sc: SparkContext, rdd: RDD[Int]) = {
    val br = new StringBuffer()
    rdd.foreach { part =>
      br.append(part)
      println("inside the RDD operation: " + br)
    }
    println("outside the RDD operation: " + br)
    br
  }

  def getInfo2(sc: SparkContext, rdd: RDD[Int]) = {
    val myAccumulator = sc.accumulator[String](" ")(StringAccumulatorParam)
    rdd.foreach { word =>
      myAccumulator.add(word.toString)
      // println("inside the RDD operation: " + myAccumulator.value)
      // calling .value here, on the executor side, throws an error
    }
    println("outside the RDD operation: " + myAccumulator.value)
    myAccumulator.value
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("saveAsTextFile").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd = sc.makeRDD(List(2, 3, 4, 5), 2)
    // print(getInfo(sc, rdd))
    print(getInfo2(sc, rdd))
  }
}

Result:
2 3 4 5
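As a side note, the AccumulatorParam API used above was deprecated in Spark 2.0; on newer versions the same string-collecting idea can be expressed with AccumulatorV2. The sketch below is only an illustrative alternative, not part of the original code (the class name StringAccumulator is made up here):

import org.apache.spark.util.AccumulatorV2

// A string-concatenating accumulator built on the Spark 2.x AccumulatorV2 API
class StringAccumulator extends AccumulatorV2[String, String] {
  private var result = ""
  override def isZero: Boolean = result.isEmpty
  override def copy(): AccumulatorV2[String, String] = {
    val acc = new StringAccumulator
    acc.result = result
    acc
  }
  override def reset(): Unit = { result = "" }
  override def add(v: String): Unit = { result = result + v + " " }
  override def merge(other: AccumulatorV2[String, String]): Unit = { result = result + other.value }
  override def value: String = result
}

// Usage: register it with the SparkContext, then update it inside the action
// val acc = new StringAccumulator
// sc.register(acc, "stringAcc")
// rdd.foreach(word => acc.add(word.toString))
// println(acc.value)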
Putting information into an RDD
Let's go straight to an example:
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

/**
  * Created by root on 3/7/17.
  */
class myConstants {
  val a = 2

  // static closure
  // for example
  def closures2RDD(sc: SparkContext, rdd: RDD[Int]) = {
    rdd.map(part => part + a).reduce(_ + _)
  }
}
Running:
println((new myConstants).closures2RDD(sc, rdd))
fails with Spark's "Task not serializable" error: the closure part => part + a references the field a, so it captures the enclosing myConstants instance, which is not serializable.
Solutions
1. Copy the class member variable into a local value inside the closure, changing the code as follows:
class myConstants {
  val a = 2

  // static closure
  // for example
  def closures2RDD(sc: SparkContext, rdd: RDD[Int]) = {
    val b = a  // copy the field into a local val, so the closure captures only b rather than the whole instance
    rdd.map(part => part + b).reduce(_ + _)
  }
}
2. Make the class serializable, changing the code as follows:
class myConstants extends Serializable {
  val a = 2

  // static closure
  // for example
  def closures2RDD(sc: SparkContext, rdd: RDD[Int]) = {
    rdd.map(part => part + a).reduce(_ + _)
  }
}

Reason: look again at the foreach or map functions (you will find that most operators include this closure-cleaning step):
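For reference, RDD.map looks roughly like this in RDD.scala (from the Spark 1.x/2.x line; note the sc.clean call before the closure is handed to the new RDD):

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)  // clean the closure before it is serialized and shipped
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}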
Now step into the clean function itself:
/**
* Clean a closure to make it ready to serialized and send to tasks
* (removes unreferenced variables in $outer's, updates REPL variables)
* If <tt>checkSerializable</tt> is set, <tt>clean</tt> will also proactively
* check to see if <tt>f</tt> is serializable and throw a <tt>SparkException</tt>
* if not.
*
* @param f the closure to clean
* @param checkSerializable whether or not to immediately check <tt>f</tt> for serializability
* @throws SparkException if <tt>checkSerializable</tt> is set but <tt>f</tt> is not
* serializable
*/
private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
  ClosureCleaner.clean(f, checkSerializable)
  f
}

A Spark committer gives the following explanation [1]:
When Scala constructs a closure, it determines which outer variables the closure will use and stores references to them in the closure object. This allows the closure to work properly even when it's called from a different scope than it was created in.
Scala sometimes errs on the side of capturing too many outer variables (see SI-1419). That's harmless in most cases, because the extra captured variables simply don't get used (though this prevents them from getting GC'd). But it poses a problem for Spark, which has to send closures across the network so they can be run on slaves. When a closure contains unnecessary references, it wastes network bandwidth. More importantly, some of the references may point to non-serializable objects, and Spark will fail to serialize the closure.
To work around this bug in Scala, the ClosureCleaner traverses the object at runtime and prunes the unnecessary references. Since it does this at runtime, it can be more accurate than the Scala compiler can. Spark can then safely serialize the cleaned closure.
[1] https://www.quora.com/What-does-Closure-cleaner-func-mean-in-Spark