Hands-on with Spark GraphX's outerJoinVertices!

    xiaoxiao  2021-09-13


    1. What is outerJoinVertices?

    To join a graph's vertices with another RDD, you use outerJoinVertices.

    The Scaladoc for outerJoinVertices reads:

    /**
     * Joins the vertices with entries in the `table` RDD and merges the results using `mapFunc`.
     * The input table should contain at most one entry for each vertex. If no entry in `other` is
     * provided for a particular vertex in the graph, the map function receives `None`.
     */

    Here mapFunc is the function used to merge the two vertex values.

    If the second RDD has no entry for a vertex, the map function receives None. In other words, outerJoinVertices behaves like a LeftOuterJoin: every vertex of the left graph is kept.
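To see the left-outer-join semantics in isolation, here is a minimal plain-Scala sketch (no Spark needed) that mimics what outerJoinVertices does per vertex; the maps `left` and `right` are illustrative stand-ins for the two vertex collections:

```scala
// Plain-Scala sketch of the per-vertex left-outer-join semantics:
// every key of the left map survives; the right map contributes
// Some(value) when a match exists, None otherwise.
val left  = Map(1L -> "Spark", 2L -> "Hadoop", 4L -> "Zookeeper")
val right = Map(1L -> 30L, 2L -> 15L, 5L -> 40L)

// The "mapFunc" role: pair the attribute with the salary when present,
// default the salary to 0 when the right side has no entry.
val joined = left.map { case (id, attr) =>
  id -> (right.get(id) match {
    case Some(salary) => (attr, salary)
    case None         => (attr, 0L)
  })
}
// joined == Map(1L -> ("Spark",30L), 2L -> ("Hadoop",15L), 4L -> ("Zookeeper",0L))
// Note that key 5L from the right map is dropped, just as vertex 5 is below.
```

Because the iteration runs over the left map only, unmatched keys on the right side simply disappear, which is exactly the behavior the real example below exhibits for vertex 5.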

    2. Hands-on example

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD

    // Set up the runtime environment
    val conf = new SparkConf().setAppName("SNSAnalysisGraphX").setMaster("local[4]")
    val sc = new SparkContext(conf)

    // Create the vertex RDD for the first (users) graph
    val usersVertices: RDD[(VertexId, (String, String))] = sc.parallelize(Array(
      (1L, ("Spark", "scala")),
      (2L, ("Hadoop", "java")),
      (3L, ("Kafka", "scala")),
      (4L, ("Zookeeper", "java"))))

    // Create the edge RDD for the first graph
    val usersEdges: RDD[Edge[String]] = sc.parallelize(Array(
      Edge(2L, 1L, "study"),
      Edge(3L, 2L, "train"),
      Edge(1L, 2L, "exercise"),
      Edge(4L, 1L, "None")))

    // Vertex and edge RDDs for the second (salary) graph
    val salaryVertices: RDD[(VertexId, (String, Long))] = sc.parallelize(Array(
      (1L, ("Spark", 30L)),
      (2L, ("Hadoop", 15L)),
      (3L, ("Kafka", 10L)),
      (5L, ("parameter server", 40L))))

    val salaryEdges: RDD[Edge[String]] = sc.parallelize(Array(
      Edge(2L, 1L, "study"),
      Edge(3L, 2L, "train"),
      Edge(1L, 2L, "exercise"),
      Edge(5L, 1L, "None")))

    // Build the two graphs
    val graph = Graph(usersVertices, usersEdges)
    val graph1 = Graph(salaryVertices, salaryEdges)

    // outerJoinVertices: every vertex of `graph` is kept; a vertex with a
    // match in graph1.vertices takes the salary tuple, the rest default to 0
    val joinGraph = graph.outerJoinVertices(graph1.vertices) {
      (id, attr, deps) => deps match {
        case Some(salary) => salary
        case None         => 0
      }
    }
    joinGraph.vertices.collect.foreach(println)
    sc.stop()

    3. Output

    (4,0)
    (1,(Spark,30))
    (2,(Hadoop,15))
    (3,(Kafka,10))

    As you can see, all four vertices of the first graph survive. Vertices 1, 2 and 3 have had their attributes replaced by the salary tuples from the second graph, while vertex 4, which has no match in the second graph, defaults to 0. Vertex 5 of the second graph is filtered out entirely. Done!
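Defaulting the unmatched vertex to 0 mixes tuple and integer attributes in one graph. A common alternative, sketched here under the same setup as above, is to fall back to the vertex's original attribute with Option.getOrElse, so every vertex keeps a meaningful value:

```scala
// Hedged sketch: instead of collapsing unmatched vertices to 0, keep the
// original (name, language) tuple when graph1 has no salary entry.
val joinGraph2 = graph.outerJoinVertices(graph1.vertices) {
  (id, attr, deps) => deps.getOrElse(attr)
}
// Vertex 4 then stays (Zookeeper,java) rather than becoming 0.
```

Note that the result type is the least upper bound of the two attribute types, so in practice you would usually map both sides into a common case class first.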
    Original article: https://ju.6miu.com/read-677586.html
