要对图的 vertices 进行 join 操作,就要用到 outerJoinVertices。
/**
 * Joins the vertices with entries in the `table` RDD and merges the results using `mapFunc`.
 * The input table should contain at most one entry for each vertex. If no entry in `other` is
 * provided for a particular vertex in the graph, the map function receives `None`.
 **/
其中 mapFunc 用来计算 join 之后每个顶点的新属性值。如果某个顶点在第二个 RDD 中没有对应的条目,mapFunc 收到的第三个参数就是 None;也就是说,图中原有的顶点都会保留,语义与 leftOuterJoin 操作一样。
// Demo: join the vertices of one graph with the vertices of another via outerJoinVertices.

// Set up the runtime environment (local master "local[4]").
val conf = new SparkConf().setAppName("SNSAnalysisGraphX").setMaster("local[4]")
val sc = new SparkContext(conf)

// Run everything inside try/finally so the SparkContext is stopped even when a job fails.
try {
  // Vertex RDD of the first graph: id -> (name, language).
  val usersVertices: RDD[(VertexId, (String, String))] = sc.parallelize(Array(
    (1L, ("Spark", "scala")),
    (2L, ("Hadoop", "java")),
    (3L, ("Kafka", "scala")),
    (4L, ("Zookeeper", "Java "))))

  // Edge RDD of the first graph.
  val usersEdges: RDD[Edge[String]] = sc.parallelize(Array(
    Edge(2L, 1L, "study"),
    Edge(3L, 2L, "train"),
    Edge(1L, 2L, "exercise"),
    Edge(4L, 1L, "None")))

  // Vertex RDD of the second graph: id -> (name, salary).
  // Vertex 4 is absent here and vertex 5 exists only here.
  val salaryVertices: RDD[(VertexId, (String, Long))] = sc.parallelize(Array(
    (1L, ("Spark", 30L)),
    (2L, ("Hadoop", 15L)),
    (3L, ("Kafka", 10L)),
    (5L, ("parameter server", 40L))))

  // Edge RDD of the second graph.
  val salaryEdges: RDD[Edge[String]] = sc.parallelize(Array(
    Edge(2L, 1L, "study"),
    Edge(3L, 2L, "train"),
    Edge(1L, 2L, "exercise"),
    Edge(5L, 1L, "None")))

  // Build both graphs.
  val graph = Graph(usersVertices, usersEdges)
  val graph1 = Graph(salaryVertices, salaryEdges)

  // outerJoinVertices: every vertex of `graph` gets a new attribute computed from the
  // matching entry of `graph1.vertices`, if any. A matched vertex takes the whole
  // (name, salary) tuple; an unmatched one (vertex 4 in this data) takes the Int 0.
  // The two results have unrelated types, so the joined vertex attribute type is Any;
  // it is written out explicitly so the widening is visible instead of silently inferred.
  val joinGraph: Graph[Any, String] =
    graph.outerJoinVertices[(String, Long), Any](graph1.vertices) { (_, _, salaryEntry) =>
      salaryEntry.getOrElse(0)
    }

  // Print every (vertexId, joinedAttribute) pair on the driver.
  joinGraph.vertices.collect.foreach(println)
} finally {
  sc.stop()
}