关于Graphx中的pregel的API理解

    xiaoxiao2021-03-25  133

    由于在做一些图论相关的工作,平时工作中使用Spark比较多,所以决定学习一下Graphx,对以后挖掘数据会比较有帮助。

    Graphx的入门之SSSP(单源点最短路径)

    val inintialGraph: Graph[Double, PartitionID] = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity) 在上面的图中,我们假设graph为已经定义好的图结构,包括定义了vertex和edge,在这一步初始化计算最短路径所需要的图

    在这里,对每一个Vertex做了一个map,即吧所有的点map为源点到这个点的距离,初始化为本点为0,其他的都初始化为无穷大。

    val sssp: Graph[Double, PartitionID] = inintialGraph.pregel(Double.PositiveInfinity)( (id, dist, newDist) => math.min(dist, newDist), triplet => { if (triplet.srcAttr + triplet.attr < triplet.dstAttr) { Iterator((triplet.dstId, triplet.srcAttr + triplet.attr)) } else { Iterator.empty } }, (a, b) => math.min(a, b) ) println(sssp.vertices.collect().mkString("\n")) sc.stop() 在这一步计算sssp。

    这里需要看一下pregel的接口,通过看Spark的Graphx的Pregel源码可以看到:

    def apply[VD: ClassTag, ED: ClassTag, A: ClassTag] (graph: Graph[VD, ED], initialMsg: A, maxIterations: Int = Int.MaxValue, activeDirection: EdgeDirection = EdgeDirection.Either) (vprog: (VertexId, VD, A) => VD, sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], mergeMsg: (A, A) => A) : Graph[VD, ED] =

    这里的pregel是个科里函数,第一个括号内为初始化需要的参数比如初始化的message,迭代次数等,第二个括号内为pregel在运行过程中所需要的三个处理的函数。

    vprog

    sendMsg

    mergeMsg

    其中第一个vprog函数,为Vertex Program 意思就是对节点的操作函数,看源码可以看到

    var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache() 在第一次迭代的时候通过之前的initialMsg来初始化每个节点的信息。

    接下来源码为:

    var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg) 可以看出这里是一个类似于mapReduce的操作,其中sendMsg充当map函数,mergeMsg充当reduce函数,结合sssp计算过程,在每一次迭代过程中,map函数的作用是如果本次的路径计算结果小于这个vertex上已有的距离的值,就发送这个距离,否则发送一个空置。而在reduce阶段,对于每一个vertex来说,可能一个vertex接受到多个map的结果,需要做的是对所有的值去取一个最小的距离。也就是上面代码中实际的math.min(a,b)

    完整的源码可以看文章的末尾。所以根据这里的分析,我们就可以得出这三个函数的实际作用了。

    1、vprog用于更新此时active的vertex的值。

    2、sendMsg用于操作每一个active的Vertex的出度方向的Edge,决定发送message是否为空。

    3、mergeMsg为对下一个进入active状态的Vertex来说,可能接收到多条message,如何处理这多条message,最终只留下一个message。

    这里可能比较容易混淆vprog和mergeMsg的作用,两者的区别在于处理对象的不同,mergeMsg处理的是一个Vertex上接收到的多个message,而vprog则是对于本次迭代开始的时候,根据接收到的message和此时Vertex上的attr属性来说,如何更新Vertex的attr。

    由此,便可以明白pregel的整个处理流程。此处是计算sssp,可以根据这样的一个架构计算PageRank了。

    【参考文章】

    http://blog.csdn.net/u013468917/article/details/51199808

    http://www.cakesolutions.net/teamblogs/graphx-pregel-api-an-example

    【Graphx的pregel的源码】

    def apply[VD: ClassTag, ED: ClassTag, A: ClassTag] (graph: Graph[VD, ED], initialMsg: A, maxIterations: Int = Int.MaxValue, activeDirection: EdgeDirection = EdgeDirection.Either) (vprog: (VertexId, VD, A) => VD, sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], mergeMsg: (A, A) => A) : Graph[VD, ED] = { require(maxIterations > 0, s"Maximum number of iterations must be greater than 0," + s" but got ${maxIterations}") var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache() // compute the messages var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg) var activeMessages = messages.count() // Loop var prevG: Graph[VD, ED] = null var i = 0 while (activeMessages > 0 && i < maxIterations) { // Receive the messages and update the vertices. prevG = g g = g.joinVertices(messages)(vprog).cache() val oldMessages = messages // Send new messages, skipping edges where neither side received a message. We must cache // messages so it can be materialized on the next line, allowing us to uncache the previous // iteration. messages = GraphXUtils.mapReduceTriplets( g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache() // The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages // (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages // and the vertices of g). activeMessages = messages.count() logInfo("Pregel finished iteration " + i) // Unpersist the RDDs hidden by newly-materialized RDDs oldMessages.unpersist(blocking = false) prevG.unpersistVertices(blocking = false) prevG.edges.unpersist(blocking = false) // count the iteration i += 1 } messages.unpersist(blocking = false) g } // end of apply

    转载请注明原文地址: https://ju.6miu.com/read-7175.html

    最新回复(0)