03 graphx 从 SSSP 来看 pregel

article/2025/9/7 10:45:17

前言

呵呵 最近刚好有一些需要使用到 图的相关计算 

然后 在其他文章中找到了一篇 关于最短路径的graphx计算的代码 spark graphx 最短路径及中间节点 

呵呵 很久没有用这些东西了, 虽然只是简单的使用, 但是还是要 复习一下, 稍微理解一下 他的执行方式 

pregel 相关论文 : 留一个占位符 

 

本文主要是根据一个 SSSP 的最短路径的测试代码来进行开始, 大致了解一下 pregel 的执行模式, 调试一下 pregel 的执行, 以及 spark 本身提供的最短路径的 api 理解一下 

 

环境如下 : spark2.4.5 + scala2.11 + jdk8

 

 

测试代码 

为了便于调试, 只配置了1个executor节点 

在 vprog, sendMsg, mergeMsg 里面加了一些日志, 是为了查看执行过程 

package com.hx.testimport org.apache.spark.graphx.{Edge, EdgeDirection, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** Test19SSSP** @author Jerry.X.He <970655147@qq.com>* @version 1.0* @date 2020-05-25 15:06*/
object Test19SSSP {def main(args: Array[String]) {val conf = new SparkConf().setAppName("Pregel_SSSP").setMaster("local[1]")val sc = new SparkContext(conf)val sourceId: VertexId = 0 // The ultimate source// 创造一个边的RDD, 包含各种关系val edges: RDD[Edge[Double]] = sc.parallelize(Array(Edge(3L, 7L, 1.0d),Edge(5L, 3L, 1.0d),Edge(2L, 5L, 1.0d),Edge(5L, 7L, 1.0d),Edge(0L, 3L, 1.0d),Edge(3L, 2L, 1.0d),Edge(7L, 9L, 1.0d),Edge(0L, 5L, 1.0d)))// 创造一个点的 RDD// 0L, 2L, 3L, 5L, 7L, 9Lval vertexes: RDD[(VertexId, (Double, List[VertexId]))] = edges.flatMap(edge => Array(edge.srcId, edge.dstId)).distinct().map(id =>if (id == sourceId) (id, (0, List[VertexId](sourceId)))else (id, (Double.PositiveInfinity, List[VertexId]())))val defaultVertex = (-1.0d, List[VertexId]())// A graph with edge attributes containing distancesval initialGraph: Graph[(Double, List[VertexId]), Double] = Graph(vertexes, edges, defaultVertex)println(" edges as follow : ")initialGraph.edges.foreach(println)// initialMsg, 会向每一个 vertex 发送 initialMsg, 然后使用 vprog 来计算, 更新顶点数据, 0, 1, ..., 9// 第一轮消息, 然后 各个顶点向邻近的顶点发送消息, 0, 2, 3, 5, 7[根据边]// 然后 各个收到消息的顶点, 执行 vprog, 3, 5// 第二轮消息, 第一轮收到消息的顶点, 向邻近的顶点发送消息, 3, 5// 如果某个顶点收到多个消息, 进行 merge// 然后 各个收到消息的顶点, 执行 vprog 2, 7, 3// 第三轮消息, ...val sssp = initialGraph.pregel((Double.PositiveInfinity, List[VertexId]()), 2, EdgeDirection.Out)(// Vertex Program(id, dist, newDist) => {println(" vertex : " + id)if (dist._1 < newDist._1) dist else newDist},// Send Messagetriplet => {println(" sendMsg " + triplet.srcId + " -> " + triplet.dstId)if (triplet.srcAttr._1 < triplet.dstAttr._1 - triplet.attr) {Iterator((triplet.dstId, (triplet.srcAttr._1 + triplet.attr, triplet.srcAttr._2 :+ triplet.dstId)))} else {Iterator.empty}},//Merge Message(a, b) => {println(" merge : " + a + ", " + b)if (a._1 < b._1) a else b})println(" result as follow : ")println(sssp.vertices.collect.mkString("\n"))}}

 

执行结果如下 

edges as follow :
Edge(0,3,1.0)
Edge(0,5,1.0)
Edge(2,5,1.0)
Edge(3,2,1.0)
Edge(3,7,1.0)
Edge(5,3,1.0)
Edge(5,7,1.0)
Edge(7,9,1.0)vertex : 0
vertex : 3
vertex : 7
vertex : 9
vertex : 5
vertex : 2sendMsg 0 -> 3
sendMsg 0 -> 5
sendMsg 2 -> 5
sendMsg 3 -> 2
sendMsg 3 -> 7
sendMsg 5 -> 3
sendMsg 5 -> 7
sendMsg 7 -> 9vertex : 3
vertex : 5sendMsg 3 -> 2
sendMsg 3 -> 7
sendMsg 5 -> 3
sendMsg 5 -> 7
merge : (2.0,List(0, 3, 7)), (2.0,List(0, 5, 7))vertex : 7
vertex : 2sendMsg 7 -> 9
sendMsg 2 -> 5result as follow :
(0,(0.0,List(0)))
(3,(1.0,List(0, 3)))
(7,(2.0,List(0, 5, 7)))
(9,(Infinity,List()))
(5,(1.0,List(0, 5)))
(2,(2.0,List(0, 3, 2)))

 

边的信息如下 

edges as follow :
Edge(0,3,1.0)
Edge(0,5,1.0)
Edge(2,5,1.0)
Edge(3,2,1.0)
Edge(3,7,1.0)
Edge(5,3,1.0)
Edge(5,7,1.0)
Edge(7,9,1.0)

构造的图如下 

各个顶点的属性, 除了 0(开始节点) 是 (0, List(0)), 其他的都是 (Double.PositiveInfinity, List[VertexId]())

 

 

Test19SSSP 的执行过程

1. 初始化发送消息个各个节点, 然后执行 vprog 

执行之前如下图 

执行之后如下图 

 

对应于上面的日志 

vertex : 0
vertex : 3
vertex : 7
vertex : 9
vertex : 5
vertex : 2

 

2. 收到消息的顶点执行 sendMsg 

根据 sendMsg 的逻辑 

执行之后  sendMsg 0 -> 3, sendMsg 0 -> 5 发送了消息 

 sendMsg 2 -> 5,  sendMsg 3 -> 2,  sendMsg 3 -> 7,  sendMsg 5 -> 3, sendMsg 5 -> 7,  sendMsg 7 -> 9 因为源节点, 目标节点的属性均是 (PositiveInfinity, List()), 不满足条件 "(triplet.srcAttr._1 < triplet.dstAttr._1 - triplet.attr)"(到当前节点的最短路径 + 路径的权重 < 到目标节点的最短路径, 表示到目标节点的最短路径可以更小, 发消息给目标节点) 

sendMsg 0 -> 3 : 给节点3发送了消息 (1, List(0, 3))

sendMsg 0 -> 5 : 给节点5发送了消息 (1, List(0, 5)) 

 

对应于上面的日志 

sendMsg 0 -> 3
sendMsg 0 -> 5
sendMsg 2 -> 5
sendMsg 3 -> 2
sendMsg 3 -> 7
sendMsg 5 -> 3
sendMsg 5 -> 7
sendMsg 7 -> 9

 

3. 第一轮迭代, 收到消息的节点执行 vprog 

上面 节点 3, 5 收到了消息, 之后执行 vprog 

执行之前如下图

执行之后如下图 

 

对应于上面的日志 

vertex : 3
vertex : 5

 

4. 第一轮迭代, 收到消息的节点执行 sendMsg 

根据 sendMsg 的逻辑 

执行之后  sendMsg 3 -> 2, sendMsg 3 -> 7, sendMsg 5 -> 7 发送了消息 

sendMsg 5 -> 3 因为源节点的最短路径 + 边权重 不小于 到目标节点的最短路径, 不发送消息 

sendMsg 3 -> 2 : 给节点2发送了消息 (2, List(0, 3, 2))

sendMsg 3 -> 7 : 给节点7发送了消息 (2, List(0, 3, 7))

sendMsg 5 -> 7 : 给节点7发送了消息 (2, List(0, 5, 7)) 

 

对应于上面的日志 

sendMsg 3 -> 2
sendMsg 3 -> 7
sendMsg 5 -> 3
sendMsg 5 -> 7

 

5. 第一轮迭代, 两个节点对节点7发送消息 mergeMsg 

然后 由于两个节点同时向 节点 7 发送了消息, 使用 mergeMsg 对消息进行 merge 

对应于上面的日志 

 merge : (2.0,List(0, 3, 7)), (2.0,List(0, 5, 7))

 

根据 mergeMsg 的逻辑 

merge 的结果为 (2.0,List(0, 5, 7) 

所以, 节点2 收到的消息为 (2, List(0, 3, 2)) 

节点7 收到的消息为 (2.0,List(0, 5, 7) 

 

6. 第二轮迭代, 收到消息的节点执行 vprog 

上面 节点 2, 3, 7 收到了消息, 之后执行 vprog 

执行之前如下图

执行之后如下图

 

对应于上面的日志 

vertex : 7
vertex : 2

 

7. 第二轮迭代, 收到消息的节点执行 sendMsg  

根据 sendMsg 的逻辑 

执行之后  sendMsg 7 -> 9 发送了消息 

sendMsg 2 -> 5 因为源节点的最短路径 + 边权重 不小于 到目标节点的最短路径, 不发送消息 

sendMsg 7 -> 9 : 给节点2发送了消息 (3, List(0, 5, 7, 9))

 

对应于上面的日志 

sendMsg 7 -> 9
sendMsg 2 -> 5

 

8. 第三轮迭代 

代码中限定了最多两轮迭代, 因此 整个迭代结束 

最后各个 顶点上面的信息如下, 对应于上面的日志 

result as follow :
(0,(0.0,List(0)))
(3,(1.0,List(0, 3)))
(7,(2.0,List(0, 5, 7)))
(9,(Infinity,List()))
(5,(1.0,List(0, 5)))
(2,(2.0,List(0, 3, 2)))

 

 

从 Pregel 的代码来看 Test19SSSP 的执行过程

pregel 的代码如下, 红框处大致如下 

1. 使用初始化消息 初始化各个顶点 

2. 各个顶点根据边发送初始化消息 

3. 收到消息的顶点执行 vprog 

4. 收到消息的顶点根据边发送消息(一个节点收到多个消息, 使用 mergeMsg 进行消息的合并) 

迭代 3, 4, 直到没有顶点之间消息传递, 或者 迭代次数达到上限 

 

以我们这里 Test19SSSP 为例 

进入 pregrel 的时候 graph 的边信息如下  

各个顶点的信息如下 

 

1. 初始化发送消息个各个节点, 然后执行 vprog

想各个顶点发送初始化消息, 以及各个顶点发送初始化消息 的情况如下 

初始化消息发送到各个顶点, 各个顶点执行 vprog 之后, 各个顶点的数据没有变化(因为初始化消息的 _1 是 PositiveInfinitely)

 

2. 收到消息的顶点执行 sendMsg 

收到消息的顶点开始执行 sendMsg, 发送了两个消息 

sendMsg 0 -> 3 : 给节点3发送了消息 (1, List(0, 3))

sendMsg 0 -> 5 : 给节点5发送了消息 (1, List(0, 5)) 

 

3. 第一轮迭代, 收到消息的节点执行 vprog 

 

4. 第一轮迭代, 收到消息的节点执行 sendMsg 

5. 第一轮迭代, 两个节点对节点7发送消息 mergeMsg 

 

6. 第二轮迭代, 收到消息的节点执行 vprog 

 

7. 第二轮迭代, 收到消息的节点执行 sendMsg  

 

8. 第三轮迭代 

代码中限定了最多两轮迭代, 因此 整个迭代结束 

迭代结束, 然后 走用例程序后面的 打印结果 相关代码 

 

 

spark 官方的 SSSP 测试用例

样例代码如下 

package com.hx.testimport org.apache.spark.graphx.lib.ShortestPaths
import org.apache.spark.graphx.{Edge, EdgeDirection, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** Test21SSSPOfficial** @author Jerry.X.He <970655147@qq.com>* @version 1.0* @date 2020-06-25 10:06*/
object Test21SSSPOfficial {def main(args: Array[String]) {val conf = new SparkConf().setAppName("Pregel_SSSP").setMaster("local[1]")val sc = new SparkContext(conf)val targetId: VertexId = 9 // The ultimate source// 创造一个边的RDD, 包含各种关系val edges: RDD[Edge[Double]] = sc.parallelize(Array(Edge(3L, 7L, 1.0d),Edge(5L, 3L, 1.0d),Edge(2L, 5L, 1.0d),Edge(5L, 7L, 1.0d),Edge(0L, 3L, 1.0d),Edge(3L, 2L, 1.0d),Edge(7L, 9L, 1.0d),Edge(0L, 5L, 1.0d)))// 创造一个点的 RDD// 0L, 2L, 3L, 5L, 7L, 9Lval vertexes: RDD[(VertexId, Long)] = edges.flatMap(edge => Array(edge.srcId, edge.dstId)).distinct().map(id => (id, 1L))val defaultVertex = 1L// A graph with edge attributes containing distancesval graph: Graph[Long, Double] = Graph(vertexes, edges, defaultVertex)val landmarks = Seq(targetId).map(_.toLong)val vertices = ShortestPaths.run(graph, landmarks).vertices.collectval results = vertices.map {case (v, spMap) => (v, spMap.mapValues(i => i))}results.foreach(println)}}

执行结果如下, 0 -> 9 最短路径为 3, 3 -> 9 最短路径为 2 以此类推 

(0,Map(9 -> 3))
(3,Map(9 -> 2))
(7,Map(9 -> 1))
(9,Map(9 -> 0))
(5,Map(9 -> 2))
(2,Map(9 -> 3))

 

 

参考 

spark graphx 最短路径及中间节点 

/spark-2.4.5/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPath.scala 

 

 


http://chatgpt.dhexx.cn/article/a59M7738.shtml

相关文章

ArangoDB(四)Pregel

arango pregel.status()返回值 localhost:8529_system> pregel.status(1099521660554) {"state" : "done","gss" : 7,"totalRuntime" : 8.389497518539429,"aggregators" : {},"sendCount" : 392647,"re…

graphx中的pregel原理详解

优秀参考&#xff1a; graphx教程参考&#xff1a;https://www.jianshu.com/p/ad5cedc30ba4 pergel函数详细讲解&#xff1a;https://blog.csdn.net/hanweileilei/article/details/89764466 迪杰斯特拉原理简介&#xff1a;https://www.jianshu.com/p/ad5cedc30ba4 ps: 以最…

Pregel模型

简介 在Hadoop兴起之后&#xff0c;google又发布了三篇研究论文&#xff0c;分别阐述了了Caffeine、Pregel、Dremel三种技术&#xff0c;这三种技术也被成为google的新“三驾马车”&#xff0c;其中的Pregel是google提出的用于大规模分布式图计算框架。主要用于图遍历&#xf…

Spark Graphx Pregel(pregel参数详解,pregel调用实现过程的详细解释)

Spark Graphx Pregel 一.Pregel概述1.什么是pregel&#xff1f;2.pregel应用场景 二.Pregel源码及参数解释1.源码2.参数详细解释&#xff08;1&#xff09;initialMsg&#xff08;2&#xff09;maxIteration&#xff08;3&#xff09;activeDirection&#xff08;4&#xff09;…

2020.11.26课堂笔记(sparkGraphx算法之pregel)

参考博客&#xff1a;https://blog.csdn.net/hanweileilei/article/details/89764466 大佬博客写的很详细&#xff0c;不用继续看这篇了&#xff0c;随便写一些记录一下。 Pregel框架&#xff1a; Pregel是一种面向图算法的分布式编程框架&#xff0c;采用迭代的计算模型&…

Pregel(图计算)技术原理

图计算简介 图结构数据&#xff1a; 许多大数据都是以大规模图或网络的形式呈现。许多非图结构的大数据&#xff0c;也常常会被转换为图模型后进行分析。图数据结构很好地表达了数据之间的关联性。关联性计算是大数据计算的核心——通过获得数据的关联性&#xff0c;可以从噪…

python bar函数

bar(left, height, width, color, align, yerr)函数&#xff1a;绘制柱形图。left为x轴的位置序列&#xff0c;一般采用arange函数产生一个序列&#xff1b;height为y轴的数值序列&#xff0c;也就是柱形图的高度&#xff0c;一般就是我们需要展示的数据&#xff1b;width为柱形…

C++ 函数模板

函数模板是通用的函数描述&#xff0c;它们使用泛型来定义函数&#xff0c;其中的泛型可用具体的类型替换。通过将类型作为参数传递给模板&#xff0c;可使编译器生成该类型的函数。由于模板允许以泛型&#xff08;而不是具体类型&#xff09;的方式编写程序&#xff0c;因此有…

lead窗口函数

lead函数在Impala中可以配合over使用&#xff0c;lead函数有三个参数 lead(property,num,default) 第一个参数「property」标识想查询的列&#xff0c;「num」标识相对于当前行的第num行&#xff0c;第三个参数是默认值。 举例&#xff1a; -- 建表 CREATE TABLE test(id s…

C++ 仿函数

文章目录 1.由来2.定义3.实例参考文献 1.由来 我们先从一个非常简单的问题入手&#xff0c;来了解为什么要有仿函数。 假设我们现在有一个数组&#xff0c;数组中存有任意数量的数字&#xff0c;我们希望能够统计出这个数组中大于 10 的数字的数量&#xff0c;你的代码很可能…

心形函数的几种表达式

用两个函数表示&#xff1a; f(x)sqrt(1-(abs(x)-1)^2) h(x)-2*sqrt(1-0.5*abs(x)) 也可以根据图中的q(x)画出心形的内部&#xff1a; q(x)(f(x)-h(x))/2*cos(200*x)(f(x)h(x))/2 带入得&#xff1a; 用一个函数表示&#xff0c;我拟合了很久才画出来的&#xff1a; f(x)…

共轭函数

共轭函数在最近火的不行的Gan生成对抗神经网络进阶版本的数学推理中有着神奇的作用&#xff0c;因此在这边记录下。 共轭函数的定义为&#xff1a; f ∗ ( t ) max ⁡ x ∈ dom ⁡ ( f ) { x t − f ( x ) } f ^ { * } ( t ) \max _ { x \in \operatorname { dom } ( f ) }…

高斯函数解析

高斯函数广泛应用于统计学领域&#xff0c;用于表述正态分布&#xff0c;在信号处理领域&#xff0c;用于定义高斯滤波器&#xff0c;在图像处理领域&#xff0c;二维高斯核函数常用于高斯模糊&#xff0c;在数学领域&#xff0c;主要用于解决热力方程和扩散方程。 https://blo…

PostgreSQL 函数

PostgreSQL 函数 函数的定义 使用函数&#xff0c;可以极大的提高用户对数据库的管理效率。函数表示输入参数表示一个具有特定关系的值。 一、数学函数 绝对值函数、三角函数、对数函数、随机函数等&#xff0c;当有错误产生时&#xff0c;数学函数会返回null值。 二、函数…

EXCEL IFS函数简单使用

IFS函数的使用&#xff1a; 在学生成绩以及绩效考核中&#xff0c;我们需要对每个范围的成绩打分。比如【A】,【B】,【C】,【D】。可以使用【IFS()函数】完成操作。 1&#xff1a;选择单元格【C2】&#xff0c;输入【】&#xff0c;点击【fx】&#xff0c;弹出【插入函数】对话…

函数的返回值

1.什么是函数的返回值? print 和 return 的区别,print 仅仅是打印在控制台,而 return 则是将 return 后面的部分作为返回值作为函数的输出 可以用变量接走,继续使用该返回值做其它事 函数需要先定义后调用,函数体中 return 语句的结果就是返回值 如果一个函数没有 reutrn…

反双曲函数

Chapter10&#xff1a;反双曲函数 10.3 反双曲函数10.3.1 反双曲正弦函数【 yarsinh(x) 】反双曲正弦函数图像反双曲正弦函数的指数形式反双曲正弦函数的对数形式推导反双曲正弦函数的导数推导 10.3.2 反双曲余弦函数【 yarcosh(x) 】反双曲余弦函数图像反双曲余弦函数的指数形…

损失函数作用

前言&#xff1a;损失函数是机器学习里最基础也是最为关键的一个要素&#xff0c;通过对损失函数的定义、优化&#xff0c;就可以衍生到我们现在常用的机器学习等算法中 损失函数的作用&#xff1a;衡量模型模型预测的好坏。 正文&#xff1a; 首先我们假设要预测一个公司某商品…

Python自定义函数

一、自定义函数的固定语句 def contrast(a,b) : #使用def来定义一个名称为contrast的方法,a与b的值是两个变量&#xff0c;称为形参if a>b : #使用条件语句进行判定return a #返回a的值elif b>a :return b #返回b的值else:return (ab) …

可测函数

1 定义 可测函数:设是定义在可测集上的实函数,称为上的可测函数,如果满足: a])=b_{a})" class="mathcode" src="https://private.codecogs.com/gif.latex?%5Cforall%20a%5Cin%20R%2C%20%7Ca%7C%20%3C%20&plus;%5Cinfty%2C%5Cexists%20b_%7Ba%7D…