Learning Spark Notes 3: Passing Functions to Spark

    xiaoxiao  2021-04-17

Passing Functions to Spark

Most of Spark's transformations, and some of its actions, depend on functions that you pass in and that Spark then uses to compute data.

1. Python

In Python there are three ways to pass a function to Spark. For short functions we can pass a lambda expression; otherwise we can pass a top-level function or a locally defined function.

Example 3-18. Passing functions in Python

    word = rdd.filter(lambda s: "error" in s)

    def containsError(s):
        return "error" in s
    word = rdd.filter(containsError)

One thing to watch out for is that Spark serializes the objects a function refers to. When you pass a function that is a member of an object, or that contains a reference to a field of an object (for example, self.field), Spark sends the entire object to the worker nodes, and that object can be much larger than the piece of information you actually need. Sometimes this can also make your program fail, for instance when the object holds something Python cannot pickle.

Example 3-19. Passing a function with field references (don't do this!)

    class SearchFunctions(object):
        def __init__(self, query):
            self.query = query

        def isMatch(self, s):
            return self.query in s

        def getMatchesFunctionReference(self, rdd):
            # Problem: references all of "self" in "self.isMatch"
            return rdd.filter(self.isMatch)

        def getMatchesMemberReference(self, rdd):
            # Problem: references all of "self" in "self.query"
            return rdd.filter(lambda x: self.query in x)

Instead, extract just the field you need from the object into a local variable and pass that in, as in the following class:

    class WordFunctions(object):
        ...
        def getMatchesNoReference(self, rdd):
            # Safe: extract only the field we need into a local variable
            query = self.query
            return rdd.filter(lambda x: query in x)

2. Scala

In Scala we can pass in functions defined inline, references to methods, or static functions.

Example 3-21. Scala function passing

    import org.apache.spark.rdd.RDD

    class SearchFunctions(val query: String) {
      def isMatch(s: String): Boolean = {
        s.contains(query)
      }
      def getMatchesFunctionReference(rdd: RDD[String]): RDD[Boolean] = {
        // Problem: "isMatch" means "this.isMatch", so we pass all of "this"
        rdd.map(isMatch)
      }
      def getMatchesFieldReference(rdd: RDD[String]): RDD[Array[String]] = {
        // Problem: "query" means "this.query", so we pass all of "this"
        rdd.map(x => x.split(query))
      }
      def getMatchesNoReference(rdd: RDD[String]): RDD[Array[String]] = {
        // Safe: extract just the field we need into a local variable
        val query_ = this.query
        rdd.map(x => x.split(query_))
      }
    }

If a NotSerializableException occurs in Scala, the cause is usually a reference to a method or field of a class that is not serializable. Note that passing in local serializable variables, or functions that are members of a top-level object, is safe.

3. Java

Example 3-22. Java function passing with anonymous inner class

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;

    JavaRDD<String> errors = lines.filter(new Function<String, Boolean>() {
      public Boolean call(String x) { return x.contains("error"); }
    });

Example 3-23. Java function passing with named class

    class ContainsError implements Function<String, Boolean> {
      public Boolean call(String x) { return x.contains("error"); }
    }

    JavaRDD<String> errors = lines.filter(new ContainsError());

Which style to use is a matter of personal preference, but we have found that top-level named functions are often cleaner for organizing large programs. Another advantage of top-level functions is that you can give them constructor parameters.

Example 3-24. Java function class with parameters

    class Contains implements Function<String, Boolean> {
      private String query;
      public Contains(String query) { this.query = query; }
      public Boolean call(String x) { return x.contains(query); }
    }

    JavaRDD<String> errors = lines.filter(new Contains("error"));

In Java 8 you can also use lambda expressions.

Example 3-25. Java function passing with lambda expression in Java 8

    JavaRDD<String> errors = lines.filter(s -> s.contains("error"));
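Returning to the Python pitfall in Example 3-19, the difference between the unsafe and the safe version can be checked without a cluster by looking at what each lambda actually closes over. The sketch below is plain Python and does not use Spark: the built-in filter stands in for rdd.filter, and the big_cache attribute and the captured helper are hypothetical names added for illustration. It assumes that Spark's Python serializer ships whatever the function's closure references, which is the behavior described above.

```python
class SearchFunctions:
    def __init__(self, query):
        self.query = query
        # Hypothetical large attribute: state the workers never need.
        self.big_cache = list(range(100_000))

    def matcher_with_self(self):
        # The lambda reads self.query, so it closes over all of self.
        return lambda s: self.query in s

    def matcher_with_local(self):
        # Copy the one field we need; the lambda closes over the string only.
        query = self.query
        return lambda s: query in s


def captured(fn):
    """Map each free variable name of fn to the object its closure holds."""
    cells = fn.__closure__ or ()
    return {
        name: cell.cell_contents
        for name, cell in zip(fn.__code__.co_freevars, cells)
    }


search = SearchFunctions("error")
with_self = captured(search.matcher_with_self())
with_local = captured(search.matcher_with_local())

# The built-in filter stands in for rdd.filter in this sketch.
lines = ["error: disk full", "all good", "fatal error"]
matches = list(filter(search.matcher_with_local(), lines))

print(sorted(with_self), sorted(with_local), matches)
```

The first lambda holds a reference to the whole SearchFunctions instance, big_cache included, while the second holds only the query string. That is why copying the field into a local variable keeps the shipped function small.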
    When reposting, please credit the original address: https://ju.6miu.com/read-673602.html
