优秀参考:
graphx教程参考:https://www.jianshu.com/p/ad5cedc30ba4
pergel函数详细讲解:https://blog.csdn.net/hanweileilei/article/details/89764466
迪杰斯特拉原理简介:https://www.jianshu.com/p/ad5cedc30ba4
ps: 以最短路径为例子讲解
个人理解的pregel函数是通过模拟节点和节点之间的信息传递的过程,计算出想要的结果信息。比如最短距离、出度、入度等等....
可以当作节点和节点之间通过边发送消息来理解。在有向图中,某一个节点可以是源节点src,也可能是目标节点dst,以边的方向确定。
pregel函数参数:
def pregel[A: ClassTag](initialMsg: A,maxIterations: Int = Int.MaxValue,activeDirection: EdgeDirection = EdgeDirection.Either)(vprog: (VertexId, VD, A) => VD,sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],mergeMsg: (A, A) => A): Graph[VD, ED] = {Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)}
initialMsg: 节点的默认初始化信息,更新节点属性要调用vprog函数
maxIrerations:最大迭代次数
activeDiraction:控制sendMsg发送的方向,为out时,只有src->dst方向发送消息。
vprog:更新节点属性的函数。节点收到消息后,通过mergeMsg合并消息,然后通过vprog更新节点信息。
sendMsg:节点和节点之间发送的消息,以Iterator[(VertexID, Msg)]的格式发送,把消息Msg发送给VertexID。不发送则为Iterator.empty
mergeMsg:当接收到多条消息时,用mergeMsg函数来合并多条信息
在迭代过程中一直有活跃节点的概念,只有上一次迭代中活跃的节点才能在当前迭代中执行sendMsg。 使用initialMsg初始化节点的时候,所有节点都是活跃状态。
活跃顶点定义为:在上一次迭代中成功发送消息和成功收到消息的节点,这里指的消息不能为Iterator.empty。
以最短距离为例,代码如下:
//被计算的图中 起始顶点idval srcVertexId = 1L val initialGraph = graph1.mapVertices{case (vid,(name,age)) => if(vid==srcVertexId) 0.0 else Double.PositiveInfinity}//调用pregelval pregelGraph = initialGraph.pregel(Double.PositiveInfinity,Int.MaxValue,EdgeDirection.Out)((vid: VertexId, vd: Double, distMsg: Double) => {val minDist = math.min(vd, distMsg)println(s"顶点${vid},属性${vd},收到消息${distMsg},合并后的属性${minDist}")minDist},(edgeTriplet: EdgeTriplet[Double,PartitionID]) => {if (edgeTriplet.srcAttr + edgeTriplet.attr < edgeTriplet.dstAttr) {println(s"顶点${edgeTriplet.srcId} 给 顶点${edgeTriplet.dstId} 发送消息 ${edgeTriplet.srcAttr + edgeTriplet.attr}")Iterator[(VertexId, Double)]((edgeTriplet.dstId, edgeTriplet.srcAttr + edgeTriplet.attr))} else {Iterator.empty}},(msg1: Double, msg2: Double) => math.min(msg1, msg2))
mapVertex函数初始化节点,源节点为0.0, 其他节点为正无穷。
调用pregel:
- 首先使用Double.PositiveInfinity 初始全部节点,vprog函数:val minDist = math.min(vd, distMsg)更新节点信息,结果是源节点为0.0, 其他节点为正无穷,所有节点为活跃状态。
-
第一轮迭代,所有节点发送消息,满足edgeTriplet.srcAttr + edgeTriplet.attr < edgeTriplet.dstAttr则发送成功(活跃节点),不满足则发送不成功(变为非活跃节点)。成功接受消息的节点变为活跃节点。消息内容为:边的权重+源节点属性值1, 节点1向节点2发送0+7->7 2, 节点1向3发送0+9->9 3, 节点1向6发送0+14->14 4, 调用mergeMsg,合并消息。这里节点仅接收到一条消息,所有可忽略 5, 调用vprog函数,更新节点信息。 6, 活跃节点有:1,2,3,6
-
第二轮迭代,活跃节点调用sendMsg发送消息,重复上一步骤1, 节点1向2,3,6发送消息失败 2, 节点2向3发送消息失败 3, 节点2向4发送消息7+15->22 4, 节点3向4发送消息9+11->20 5, 节点2向6发送消息2+9->11 6, 节点4调用mergeMsg合并消息,选取小值。调用vprog更新节点信息 7, 节点6调用vprog更新节点信息
-
....... 第三轮迭代..... 第四轮迭代......