pregel 与 spark graphX 的 pregel api

article/2025/9/7 10:46:24
        [原文](https://blog.csdn.net/u013468917/article/details/51199808)

简介

在Hadoop兴起之后,google又发布了三篇研究论文,分别阐述了了Caffeine、Pregel、Dremel三种技术,这三种技术也被成为google的新“三驾马车”,其中的Pregel是google提出的用于大规模分布式图计算框架。主要用于图遍历(BFS)、最短路径(SSSP)、PageRank计算等等计算。
在Pregel计算模式中,输入是一个有向图,该有向图的每一个顶点都有一个相应的独一无二的顶点id (vertex identifier)。每一个顶点都有一些属性,这些属性可以被修改,其初始值由用户定义。每一条有向边都和其源顶点关联,并且也拥有一些用户定义的属性和值,并同时还记录了其目的顶点的ID。
一个典型的Pregel计算过程如下:读取输入,初始化该图,当图被初始化好后,运行一系列的supersteps,每一次superstep都在全局的角度上独立运行,直到整个计算结束,输出结果。

在pregel中顶点有两种状态:活跃状态(active)和不活跃状态(halt)。如果某一个顶点接收到了消息并且需要执行计算那么它就会将自己设置为活跃状态。如果没有接收到消息或者接收到消息,但是发现自己不需要进行计算,那么就会将自己设置为不活跃状态。这种机制的描述如下图:

计算过程

Pregel中的计算分为一个个“superstep”,这些”superstep”中执行流程如下:
1、 首先输入图数据,并进行初始化。
2、 将每个节点均设置为活跃状态。每个节点根据预先定义好的sendmessage函数,以及方向(边的正向、反向或者双向)向周围的节点发送信息。
3、 每个节点接收信息如果发现需要计算则根据预先定义好的计算函数对接收到的信息进行处理,这个过程可能会更新自己的信息。如果接收到消息但是不需要计算则将自己状态设置为不活跃。
4、 每个活跃节点按照sendmessage函数向周围节点发送消息。
5、 下一个superstep开始,像步骤3一样继续计算,直到所有节点都变成不活跃状态,整个计算过程结束。
下面以一个具体例子来说明这个过程:假设一个图中有4个节点,从左到右依次为第1/2/3/4个节点。圈中的数字为节点的属性值,实线代表节点之间的边,虚线是不同超步之间的信息发送,带阴影的圈是不活跃的节点。我们的目的是让图中所有节点的属性值都变成最大的那个属性值。

superstep 0:首先所有节点设置为活跃,并且沿正向边向相邻节点发送自身的属性值。
Superstep 1:所有节点接收到信息,节点1和节点4发现自己接受到的值比自己的大,所以更新自己的节点(这个过程可以看做是计算),并保持活跃。节点2和3没有接收到比自己大的值,所以不计算、不更新。活跃节点继续向相邻节点发送当前自己的属性值。
Superstep 2:节点3接受信息并计算,其它节点没接收到信息或者接收到但是不计算,所以接下来只有节点3活跃并发送消息。
Superstep 3:节点2和4接受到消息但是不计算所以不活跃,所有节点均不活跃,所以计算结束。
在pregel计算框架中有两个核心的函数:sendmessage函数和F(Vertex)节点计算函数。


Spark graphX的pregel API

Spark在其graphX组件中提供了pregel API,让我们可以用pregel的计算框架来处理spark上的图数据。以下操作均在spark-shell上进行,我们建立一个图,然后通过一个求单源最短路径的例子解释pregel的操作。

准备工作

操作之前我们需要导入一些可能用到的包:
Import org.apache.spark._
Import org.apache.spark.graphx._
Import org.apache.spark.rdd.RDD

再根据hdfs上的web-Google.txt文件生成图,这个文件可以在 https://snap.stanford.edu/data/web-Google.html下载。
val graph = GraphLoader.edgeListFile(sc,"/Spark/web-Google.txt")

初次用edgelistfile建立图时,所有vertices、edges、triplets的属性值由于我没有指定所以默认值均为整数 1.

计算

首先设定源点,这里设置源点为0:
val sourceId: VertexId = 0


然后对图进行初始化:

val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)

这段代码的意思是对所有的非源顶点,将顶点的属性值设置为无穷,因为我们打算将所有顶点的属性值用于保存源点到该点之间的最短路径。在正式开始计算之前将源点到自己的路径长度设为0,到其它点的路径长度设为无穷大,如果遇到更短的路径替换当前的长度即可。如果源点到该点不可达,那么路径长度自然为无穷大了。
接下来开始计算最短路径:
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
(id, dist, newDist) => math.min(dist, newDist), // Vertex Program
triplet => { // Send Message
if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else {
Iterator.empty
}
},
(a,b) => math.min(a,b) // Merge Message
)


我们打印一下sssp中的一些值看一下:

我们可以看到0点到354796的最短路径为11,到291526不可达。

过程详解

接下来详解这个过程:
在调用pregel方法时,initialGraph会被隐式转换成GraphOps类,这个类中pregel方法的源码如下:
  1. def pregel[A: ClassTag](
  2. initialMsg: A,
  3. maxIterations: Int = Int.MaxValue,
  4. activeDirection: EdgeDirection = EdgeDirection.Either)(
  5. vprog: (VertexId, VD, A) => VD,
  6. sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
  7. mergeMsg: (A, A) => A)
  8. : Graph[VD, ED] = {
  9. Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
  10. }

这个方法采用的是典型的柯里化定义方式,第一个括号中的参数序列分别为initialMsg、maxIterations、activeDirection。第一个参数initialMsg表示第一次迭代时即superstep 0,每个节点接收到的消息。maxIterations表示迭代的最大次数,activeDirection表示消息发送的方向,该值为EdgeDirection类型,这是一个枚举类型,有三个可能值:EdgeDirection.In/ EdgeDirection.Out/ EdgeDirection.Either.可以看到,第二和第三个参数都有默认值。
第二个括号中参数序列为三个函数,分别为vprog、sendMsg和mergeMsg。
vprog是节点上的用户定义的计算函数,运行在单个节点之上,在superstep 0,这个函数会在每个节点上以初始的initialMsg为参数运行并生成新的节点值。在随后的超步中只有当节点收到信息,该函数才会运行。
sendMsg在当前超步中收到信息的节点用于向相邻节点发送消息,这个消息用于下一个超步的计算。
mergeMsg用于聚合发送到同一节点的消息,这个函数的参数为两个A类型的消息,返回值为一个A类型的消息。
最后调用Pregel对象的apply方法返回一个graph对象。
Apply方法的源码如下,我们可以看到graph和计算的参数都被传过来了:
  1. def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
  2. (graph: Graph[VD, ED],
  3. initialMsg: A,
  4. maxIterations: Int = Int.MaxValue,
  5. activeDirection: EdgeDirection = EdgeDirection.Either)
  6. (vprog: (VertexId, VD, A) => VD,
  7. sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
  8. mergeMsg: (A, A) => A)
  9. : Graph[VD, ED] =
  10. {
  11. //要求最大迭代数大于0,不然报错。
  12. require(maxIterations > 0, s"Maximum number of iterations must be greater than 0," +
  13. s" but got ${maxIterations}")
  14. //第一次迭代,对每个节点用vprog函数计算。
  15. var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
  16. // 根据发送、聚合信息的函数计算下次迭代用的信息。
  17. var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
  18. //数一下还有多少节点活跃
  19. var activeMessages = messages.count()
  20. // 下面进入循环迭代
  21. var prevG: Graph[VD, ED] = null
  22. var i = 0
  23. while (activeMessages > 0 && i < maxIterations) {
  24. // 接受消息并更新节点信息
  25. prevG = g
  26. g = g.joinVertices(messages)(vprog).cache()
  27. val oldMessages = messages
  28. // Send new messages, skipping edges where neither side received a message. We must cache
  29. // messages so it can be materialized on the next line, allowing us to uncache the previous
  30. /*iteration这里用mapReduceTriplets实现消息的发送和聚合。mapReduceTriplets的*参数中有一个map方法和一个reduce方法,这里的*sendMsg就是map方法,*mergeMsg就是reduce方法
  31. */
  32. messages = GraphXUtils.mapReduceTriplets(
  33. g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
  34. // The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
  35. // (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
  36. // and the vertices of g).
  37. activeMessages = messages.count()
  38. logInfo("Pregel finished iteration " + i)
  39. // Unpersist the RDDs hidden by newly-materialized RDDs
  40. oldMessages.unpersist(blocking = false)
  41. prevG.unpersistVertices(blocking = false)
  42. prevG.edges.unpersist(blocking = false)
  43. // count the iteration
  44. i += 1
  45. }
  46. messages.unpersist(blocking = false)
  47. g
  48. } // end of apply


接下来再看一下我们刚开始的求单源最短路径的算法:
首先将所有除了源顶点的其它顶点的属性值设置为无穷大,源顶点的属性值设置为0.
Superstep 0:然后对所有顶点用initialmsg进行初始化,实际上这次初始化并没有改变什么。
Superstep 1 :对于每个triplet:计算triplet.srcAttr + triplet.attr 和 triplet.dstAttr比较,以第一次为例:假设有一条边从0到a,这时就满足triplet.srcAttr + triplet.attr < triplet.dstAttr,这个triplet.attr的值实际上为1(没有自己指定,默认值都是1),而0的attr值我们早已初始化为0,0+1<无穷,所以发出的消息就是(a,1)这个在每个triplet中是从src发放dst的。如果某个边是从3到5,那么triplet.srcAttr + triplet.attr < triplet.dstAttr就不成立,因为无穷大加1等于无穷大,这时消息就是空的。Superstep 1就是这样,这一步执行完后图中所有的与0直接相连的点的attr都成了1而且成为获跃节点,其它点的attr不变同时变成不活跃节点。活结点根据triplet.srcAttr + triplet.attr < triplet.dstAttr继续发消息,mergeMsg函数会对发送到同一节点的多个消息进行聚合,聚合的结果就是最小的那个值。
Superstep 2:所有收到消息的节点比较自己的attr和发过来的attr,将较小的值作为自己的attr。然后自己成为活节点继续向周围的节点发送attr+1这个消息,然后再聚合。
直到没有节点的attr被更新,不再满足activeMessages > 0 && i < maxIterations (活跃节点数为大于0且没有达到最大允许迭代次数)。这时就得到节点0到其它节点的最短路径了。这个路径值保存在其它节点的attr中。


http://chatgpt.dhexx.cn/article/wsgSq20M.shtml

相关文章

Pregel体系结构

在Pregel计算框架中&#xff0c;一个大型图会被划分成许多个分区&#xff0c;每个分区都包含了一部分顶点以及以其为起点的边 一个顶点应该被分配到哪个分区上&#xff0c;是由一个函数决定的&#xff0c;系统默认函数为hash(ID) mod N&#xff0c;其中&#xff0c;N为所有分区…

Spark GraphX 中的 pregel函数(转载)

文章目录 pregel函数源码 与 各个参数介绍&#xff1a;案例&#xff1a; 求顶点5 到 其他各顶点的 最短距离pregel原理分析 一篇关于 Spark GraphX 中 pregel函数 的笔记&#xff0c;通过一个小案例将pregel函数理解透彻。 pregel函数源码 与 各个参数介绍&#xff1a; def…

03 graphx 从 SSSP 来看 pregel

前言 呵呵 最近刚好有一些需要使用到 图的相关计算 然后 在其他文章中找到了一篇 关于最短路径的graphx计算的代码 spark graphx 最短路径及中间节点 呵呵 很久没有用这些东西了, 虽然只是简单的使用, 但是还是要 复习一下, 稍微理解一下 他的执行方式 pregel 相关论文 …

ArangoDB(四)Pregel

arango pregel.status()返回值 localhost:8529_system> pregel.status(1099521660554) {"state" : "done","gss" : 7,"totalRuntime" : 8.389497518539429,"aggregators" : {},"sendCount" : 392647,"re…

graphx中的pregel原理详解

优秀参考&#xff1a; graphx教程参考&#xff1a;https://www.jianshu.com/p/ad5cedc30ba4 pergel函数详细讲解&#xff1a;https://blog.csdn.net/hanweileilei/article/details/89764466 迪杰斯特拉原理简介&#xff1a;https://www.jianshu.com/p/ad5cedc30ba4 ps: 以最…

Pregel模型

简介 在Hadoop兴起之后&#xff0c;google又发布了三篇研究论文&#xff0c;分别阐述了了Caffeine、Pregel、Dremel三种技术&#xff0c;这三种技术也被成为google的新“三驾马车”&#xff0c;其中的Pregel是google提出的用于大规模分布式图计算框架。主要用于图遍历&#xf…

Spark Graphx Pregel(pregel参数详解,pregel调用实现过程的详细解释)

Spark Graphx Pregel 一.Pregel概述1.什么是pregel&#xff1f;2.pregel应用场景 二.Pregel源码及参数解释1.源码2.参数详细解释&#xff08;1&#xff09;initialMsg&#xff08;2&#xff09;maxIteration&#xff08;3&#xff09;activeDirection&#xff08;4&#xff09;…

2020.11.26课堂笔记(sparkGraphx算法之pregel)

参考博客&#xff1a;https://blog.csdn.net/hanweileilei/article/details/89764466 大佬博客写的很详细&#xff0c;不用继续看这篇了&#xff0c;随便写一些记录一下。 Pregel框架&#xff1a; Pregel是一种面向图算法的分布式编程框架&#xff0c;采用迭代的计算模型&…

Pregel(图计算)技术原理

图计算简介 图结构数据&#xff1a; 许多大数据都是以大规模图或网络的形式呈现。许多非图结构的大数据&#xff0c;也常常会被转换为图模型后进行分析。图数据结构很好地表达了数据之间的关联性。关联性计算是大数据计算的核心——通过获得数据的关联性&#xff0c;可以从噪…

python bar函数

bar(left, height, width, color, align, yerr)函数&#xff1a;绘制柱形图。left为x轴的位置序列&#xff0c;一般采用arange函数产生一个序列&#xff1b;height为y轴的数值序列&#xff0c;也就是柱形图的高度&#xff0c;一般就是我们需要展示的数据&#xff1b;width为柱形…

C++ 函数模板

函数模板是通用的函数描述&#xff0c;它们使用泛型来定义函数&#xff0c;其中的泛型可用具体的类型替换。通过将类型作为参数传递给模板&#xff0c;可使编译器生成该类型的函数。由于模板允许以泛型&#xff08;而不是具体类型&#xff09;的方式编写程序&#xff0c;因此有…

lead窗口函数

lead函数在Impala中可以配合over使用&#xff0c;lead函数有三个参数 lead(property,num,default) 第一个参数「property」标识想查询的列&#xff0c;「num」标识相对于当前行的第num行&#xff0c;第三个参数是默认值。 举例&#xff1a; -- 建表 CREATE TABLE test(id s…

C++ 仿函数

文章目录 1.由来2.定义3.实例参考文献 1.由来 我们先从一个非常简单的问题入手&#xff0c;来了解为什么要有仿函数。 假设我们现在有一个数组&#xff0c;数组中存有任意数量的数字&#xff0c;我们希望能够统计出这个数组中大于 10 的数字的数量&#xff0c;你的代码很可能…

心形函数的几种表达式

用两个函数表示&#xff1a; f(x)sqrt(1-(abs(x)-1)^2) h(x)-2*sqrt(1-0.5*abs(x)) 也可以根据图中的q(x)画出心形的内部&#xff1a; q(x)(f(x)-h(x))/2*cos(200*x)(f(x)h(x))/2 带入得&#xff1a; 用一个函数表示&#xff0c;我拟合了很久才画出来的&#xff1a; f(x)…

共轭函数

共轭函数在最近火的不行的Gan生成对抗神经网络进阶版本的数学推理中有着神奇的作用&#xff0c;因此在这边记录下。 共轭函数的定义为&#xff1a; f ∗ ( t ) max ⁡ x ∈ dom ⁡ ( f ) { x t − f ( x ) } f ^ { * } ( t ) \max _ { x \in \operatorname { dom } ( f ) }…

高斯函数解析

高斯函数广泛应用于统计学领域&#xff0c;用于表述正态分布&#xff0c;在信号处理领域&#xff0c;用于定义高斯滤波器&#xff0c;在图像处理领域&#xff0c;二维高斯核函数常用于高斯模糊&#xff0c;在数学领域&#xff0c;主要用于解决热力方程和扩散方程。 https://blo…

PostgreSQL 函数

PostgreSQL 函数 函数的定义 使用函数&#xff0c;可以极大的提高用户对数据库的管理效率。函数表示输入参数表示一个具有特定关系的值。 一、数学函数 绝对值函数、三角函数、对数函数、随机函数等&#xff0c;当有错误产生时&#xff0c;数学函数会返回null值。 二、函数…

EXCEL IFS函数简单使用

IFS函数的使用&#xff1a; 在学生成绩以及绩效考核中&#xff0c;我们需要对每个范围的成绩打分。比如【A】,【B】,【C】,【D】。可以使用【IFS()函数】完成操作。 1&#xff1a;选择单元格【C2】&#xff0c;输入【】&#xff0c;点击【fx】&#xff0c;弹出【插入函数】对话…

函数的返回值

1.什么是函数的返回值? print 和 return 的区别,print 仅仅是打印在控制台,而 return 则是将 return 后面的部分作为返回值作为函数的输出 可以用变量接走,继续使用该返回值做其它事 函数需要先定义后调用,函数体中 return 语句的结果就是返回值 如果一个函数没有 reutrn…

反双曲函数

Chapter10&#xff1a;反双曲函数 10.3 反双曲函数10.3.1 反双曲正弦函数【 yarsinh(x) 】反双曲正弦函数图像反双曲正弦函数的指数形式反双曲正弦函数的对数形式推导反双曲正弦函数的导数推导 10.3.2 反双曲余弦函数【 yarcosh(x) 】反双曲余弦函数图像反双曲余弦函数的指数形…