Spark Graphx图计算之二跳邻算法实战!

    xiaoxiao2021-08-16  150

    Spark Graphx图计算之二跳邻算法实战!

    def sendMsgFunc(edge:EdgeTriplet[Int, Int]) = { if(edge.srcAttr <= 0){ if(edge.dstAttr <= 0){ // 如果双方都小于0,则不发送信息 Iterator.empty }else{ // srcAttr小于0,dstAttr大于零,则将dstAttr-1后发送 Iterator((edge.srcId, edge.dstAttr - 1)) } }else{ if(edge.dstAttr <= 0){ // srcAttr大于0,dstAttr<0,则将srcAttr-1后发送 Iterator((edge.dstId, edge.srcAttr - 1)) }else{ // 双方都大于零,则将属性-1后发送 val toSrc = Iterator((edge.srcId, edge.dstAttr - 1)) val toDst = Iterator((edge.dstId, edge.srcAttr - 1)) toDst ++ toSrc } } } val friends = Pregel( graph.mapVertices((vid, value)=> if(vid == 1) 2 else -1), // 发送初始值 -1, // 指定阶数 2, // 双方向发送 EdgeDirection.Either )( // 将值设为大的一方 vprog = (vid, attr, msg) => math.max(attr, msg), // sendMsgFunc, // (a, b) => math.max(a, b) ).subgraph(vpred = (vid, v) => v >= 0) println("\n\n~~~~~~~~~ Confirm Vertices of friends ") friends.vertices.collect.foreach(println(_)) // (4,1) // (8,0) // (2,1) // (1,2) // (3,0) // (5,0) sc.stop

    转载请注明原文地址: https://ju.6miu.com/read-676478.html

    最新回复(0)