Spark Graphx Pregel(pregel参数详解,pregel调用实现过程的详细解释)

article/2025/9/7 10:54:24

Spark Graphx Pregel

  • 一.Pregel概述
    • 1.什么是pregel?
    • 2.pregel应用场景
  • 二.Pregel源码及参数解释
    • 1.源码
    • 2.参数详细解释
      • (1)initialMsg
      • (2)maxIteration
      • (3)activeDirection
      • (4)vprog
      • (5)sendMsg
      • (6)mergeMsg
  • 三.Pregel计算顶点5 到 其他各顶点的 最短距离
    • 1.图信息
      • (1)顶点信息
      • (2)边信息
    • 2.Pregel原理分析
      • (1)调用pregel方法之前
      • (2)当调用pregel方法开始
      • (3)第一次迭代开始
      • (4)第二次迭代开始
      • (5)第三次迭代开始
      • (6)第四次迭代开始
      • (7)第五次迭代开始
    • 3.代码实现

一.Pregel概述

1.什么是pregel?

Pregel是Google 提出的用于大规模分布式图计算框架。Pregel是个强大的基于图的迭代算法。

2.pregel应用场景

一般pregel可以在图中进行迭代计算,如求最短路径,关键路径,n度关系等。

二.Pregel源码及参数解释

1.源码

 def pregel[A: ClassTag](initialMsg: A,maxIterations: Int = Int.MaxValue,activeDirection: EdgeDirection = EdgeDirection.Either)(vprog: (VertexId, VD, A) => VD,sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],mergeMsg: (A, A) => A): Graph[VD, ED] = {Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)}

2.参数详细解释

(1)initialMsg

初始化消息。这个初始化消息会被用来初始化图中的每个节点的属性,在pregel调用时,会首先在图上使用mapVertices来根据initialMsg的值更新每个几点的值。至于如何更新,则由vprog参数而定,vprog函数就接收了initialMsg消息作为参数来更新对应节点的值。

(2)maxIteration

最大迭代次数

(3)activeDirection

表示边的活跃方向

  • 活跃节点:是指在某一轮迭代中,pregel会以sendMsg和mergeMsg为参数来调用graph的aggregateMessage方法后收到消息的节点

  • 活跃消息:是这轮迭代中所有被成功收到的消息

    则有的边src节点是活跃节点,有的dst节点是活跃节点,有的边两端节点都是活跃节点。如果activeDirection参数被指定为"EdgeDirection.out",则在下一轮迭代中,只接收消息的出边(src—>dst)才会执行sendMsg函数。也就是说,sendMsg回调函数会过滤掉(dst—>src)的edgeTriplet上下文参数

(4)vprog

节点变换函数。

在初始时,以及每轮迭代后,pregel会根据上一轮使用的msg和这里的vprog函数在图上调用joinVertices方法变化每个收到消息的节点。

(5)sendMsg

消息发送函数。该函数的运行参数是一个代表边的上下文,pregel在调用aggregateMessage是,会将EdgeContext转换成EdgeTriplet对象来使用,用户需要通过Iterator[(VertexID,A)]指定发送哪些消息,发送哪些节点,发送哪些内容;因为在一条边上可以发送多个消息,如sendToDst,sendToSrc,所以这里是个Iterator,每个元素是一个tuple,其中的vertexId便是接收此消息的节点id,只能是该边上的srcId或者dstId而A就是要发送的内容

因此,如果要由src发送一条消息A到dst,则有:Iterator((dstId,A)),如果什么消息也不发送,则返回一个空的Iterator:Iterator.empty

(6)mergeMsg

邻居节点收到多条消息时的合并逻辑。

区别与vprog,mergeMsg仅能合并消息内容,但合并后并不会更新到节点中去,而vprog函数可以根据收到的消息(就是mergeMsg产生的结果)更新节点属性

三.Pregel计算顶点5 到 其他各顶点的 最短距离

1.图信息

(1)顶点信息

      (1L, ("Alice", 28)),(2L, ("Bob", 27)),(3L, ("Charlie", 65)),(4L, ("David", 42)),(5L, ("Ed", 55)),(6L, ("Fran", 50))

(2)边信息

      Edge(2L, 1L, 7),Edge(2L, 4L, 2),Edge(3L, 2L, 4),Edge(3L, 6L, 3),Edge(4L, 1L, 1),Edge(2L, 5L, 2),Edge(5L, 3L, 8),Edge(5L, 6L, 3)

2.Pregel原理分析

  • 顶点的两个状态:
    • 钝化态:类似于休眠,不做任何处理
    • 激活态:可以进行数据的接受和发送
  • 顶点能够处于激活状态需要的条件
    • 成功收到消息
    • 成功发送任何一条消息

(1)调用pregel方法之前

先把图的各个顶点的属性初始化,即顶点5到自己的距离为0,所以设为0,其他顶点都设为正无穷大Double.PositiveInifinity

在这里插入图片描述

(2)当调用pregel方法开始

首先,所有顶点都将接收到一条初始消息initialMsg ,使所有顶点都处于激活态(红色标识的节点)

在这里插入图片描述

(3)第一次迭代开始

所有顶点以EdgeDirection.Out的边方向调用sendMsg方法发送消息给目标顶点,如果 源顶点的属性+边的属性<目标顶点的属性,则发送消息。否则不发送。

5—>3(0+8<Double.Infinity,成功),

5—>6(0+3<Double.Infinity,成功),

3—>2(Double.Infinity+4>Double.Infinity,失败),

3—>6(Double.Infinity+3>Double.Infinity,失败),

2—>1(Double.Infinity+7>Double.Infinity,失败),

2—>4(Double.Infinity+2>Double.Infinity,失败),

2—>5(Double.Infinity+2>Double.Infinity,失败),

4—>1(Double.Infinity+1>Double.Infinity,失败)

sendMsg方法执行完成之后,根据顶点处于激活态的条件,顶点5成功地分别给顶点3和顶点6发送消息,顶点3 和 顶点6 也成功地接受到了消息。

所以此时只有5,3,6三个顶点处于激活状态,其他顶点全部钝化。然后收到消息的顶点3和顶点6都调用vprog方法,将收到的消息与自身属性合并。如图所示,至此第一次迭代结束。

在这里插入图片描述

(4)第二次迭代开始

顶点3 给 顶点6 发送消息失败,顶点3 给 顶点2 发送消息成功,此时 顶点3 成功发送消息,顶点2 成功接收消息,所以顶点2 和 顶点3 都成为激活状态,其他顶点都成为钝化状态。然后顶点2 调用vprog方法,将收到的消息 与 自身的属性合并。至此第二次迭代结束

3—>2(8+4<Double.Infinity,成功),

3—>6(8+3>3,失败)

在这里插入图片描述

(5)第三次迭代开始

顶点3分别发送消息给顶点2失败 和 顶点6失败,顶点2 分别发消息给 顶点1成功、顶点4成功、顶点5失败 ,所以 顶点2、顶点1、顶点4 成为激活状态,其他顶点为钝化状态。顶点1 和 顶点4分别调用vprog方法,将收到的消息 与 自身的属性合并。至此第三次迭代结束

3—>2(8+4=12,失败),

3—>6(8+3>3,失败)

2—>1(12+7<Double.Infinity,成功)

2—>4(12+2<Double.Infinity,成功)

在这里插入图片描述

(6)第四次迭代开始

顶点2 分别发送消息给 顶点1失败 和 顶点4失败。顶点4 给 顶点1发送消息成功,顶点1 和 顶点4 进入激活状态,其他顶点进入钝化状态。顶点1 调用vprog方法,将收到的消息 与 自身的属性合并

2—>1(12+7=19,失败)

2—>4(12+2=14,失败)

4—>1(14+1<19,成功)

在这里插入图片描述

(7)第五次迭代开始

顶点4 再给 顶点1发送消息失败,顶点4 和 顶点1 进入钝化状态,此时全图都进入钝化状态。至此结束

4—>1(14+1=15,失败)

在这里插入图片描述

结论:由上述分析过程可知,顶点5到其他各顶点距离全部算出,

5—>1 (15)

5—>2 (12)

5—>3 (8)

5—>4 (14)

5—>6 (3)

3.代码实现

package suanfaimport org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object PregelDemo {def main(args: Array[String]): Unit = {//TODO:1.创建SparkContext对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("pregeldemo")val sc = new SparkContext(conf)//TODO:2、创建顶点val vertexArray = Array((1L, ("Alice", 28)),(2L, ("Bob", 27)),(3L, ("Charlie", 65)),(4L, ("David", 42)),(5L, ("Ed", 55)),(6L, ("Fran", 50)))val vertexRDD: RDD[(VertexId, (String, Int))] = sc.makeRDD(vertexArray)//TODO:3、创建边,边的属性代表 相邻两个顶点之间的距离val edgeArray = Array(Edge(2L, 1L, 7),Edge(2L, 4L, 2),Edge(3L, 2L, 4),Edge(3L, 6L, 3),Edge(4L, 1L, 1),Edge(2L, 5L, 2),Edge(5L, 3L, 8),Edge(5L, 6L, 3))val edgeRDD: RDD[Edge[Int]] = sc.makeRDD(edgeArray)//TODO:4、创建图(使用apply方式创建)val graph1 = Graph(vertexRDD, edgeRDD)/* ************************** 使用pregle算法计算 ,顶点5 到 各个顶点的最短距离 ************************** *///TODO:5、调用pregel算法//todo:(1)設置頂點信息//被计算的图中 起始顶点idval srcVertexId = 5L//给每个顶点赋属性值val initialGraph: Graph[Double, Int] = graph1.mapVertices { case (vid, (name, age)) => if (vid == srcVertexId) 0.0 else Double.PositiveInfinity }println(" 1.每个顶点的属性值如下")initialGraph.vertices.collect().foreach(println)println("---------------开始调用pregel---------------")//todo:(2)調用pregel,返回的还是一个图val pregelGraph: Graph[Double, PartitionID] = initialGraph.pregel(Double.PositiveInfinity,//每个点的初始值,无穷大Int.MaxValue,               //最大迭代次数EdgeDirection.Out       //发送消息的方向)(//todo:vprog:接受到的消息和自己的消息进行合并//这个顶点sendMsg发送的顶点信息// 三个参数 vprog: (VertexId, VD, A) => VD,//VertexId当前节点的顶点id,VD当前顶点的属性,A接收到的信息//返回值:当前顶点更新后的属性(vid: VertexId, vd: Double, distMsg: Double) => {println(s"----------顶点${vid}调用vprog:接受到的消息和自己的消息进行合并----------------")//即将接收到的信息和顶点属性进行比较,取最小值进行更新该顶点属性val minDist = math.min(vd, distMsg)println(s"顶点${vid},顶点属性${vd},收到消息${distMsg},合并后的属性${minDist}")minDist},//todo: sendMsg:发送消息,如果自己的消息+权重<目的地的消息,则发送//一个参数 : sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)]//即表的信息//返回值:发送成功后的节点id和发送的消息的一个迭代器(edgeTriplet: EdgeTriplet[Double, PartitionID]) => {println(s"----------调用${edgeTriplet.srcId}调用sendMsg发送消息给顶点${edgeTriplet.dstId}------------")if (edgeTriplet.srcAttr + edgeTriplet.attr < edgeTriplet.dstAttr) {println(s"顶点${edgeTriplet.srcId}给顶点${edgeTriplet.dstId} 发送消息 ${edgeTriplet.srcAttr + edgeTriplet.attr}")Iterator[(VertexId, Double)]((edgeTriplet.dstId, edgeTriplet.srcAttr + edgeTriplet.attr))} else {Iterator.empty}},//todo:mergeMsg多条接收消息,mergeMessage,取小合并多条消息// mergeMsg: (A, A) => A)(msg1: Double, msg2: Double) => {println(s"----------${msg1},${msg2}调用mergeMsg:合并多条接收消息------------")println(msg1,msg2)math.min(msg1, msg2)})}
}

输出结果:

//初始化每个节点的属性
1.每个顶点的属性值如下
(1,Infinity)
(2,Infinity)
(3,Infinity)
(4,Infinity)
(5,0.0)
(6,Infinity)
---------------开始调用pregel---------------
//最开始每个节点都会收到初始化的属性Double.PositiveInfinity,会通过调用vprog(接受到的消息和自己的消息进行合并)对节点属性进行合并
----------顶点5调用vprog:接受到的消息和自己的消息进行合并----------------
顶点5,顶点属性0.0,收到消息Infinity,合并后的属性0.0
----------顶点6调用vprog:接受到的消息和自己的消息进行合并----------------
顶点6,顶点属性Infinity,收到消息Infinity,合并后的属性Infinity
----------顶点1调用vprog:接受到的消息和自己的消息进行合并----------------
顶点1,顶点属性Infinity,收到消息Infinity,合并后的属性Infinity
----------顶点3调用vprog:接受到的消息和自己的消息进行合并----------------
顶点3,顶点属性Infinity,收到消息Infinity,合并后的属性Infinity
----------顶点2调用vprog:接受到的消息和自己的消息进行合并----------------
顶点2,顶点属性Infinity,收到消息Infinity,合并后的属性Infinity
----------顶点4调用vprog:接受到的消息和自己的消息进行合并----------------
顶点4,顶点属性Infinity,收到消息Infinity,合并后的属性Infinity
//---------------第一次迭代---------------
//第一次进行迭代,按照发送消息的方向发送消息,可知,第一次迭代只有顶点5发送消息到顶点6和3是满足sendMsg的条件的。即发送成功,
----------调用3调用sendMsg发送消息给顶点6------------
----------调用5调用sendMsg发送消息给顶点6------------
顶点5给顶点6 发送消息 3.0
----------调用2调用sendMsg发送消息给顶点5------------
----------调用3调用sendMsg发送消息给顶点2------------
----------调用4调用sendMsg发送消息给顶点1------------
----------调用5调用sendMsg发送消息给顶点3------------
顶点5给顶点3 发送消息 8.0
----------调用2调用sendMsg发送消息给顶点4------------
----------调用2调用sendMsg发送消息给顶点1------------
//顶点6和顶点3接收到信息后就会调用vprog进行合并属性
----------顶点6调用vprog:接受到的消息和自己的消息进行合并--------------
顶点6,顶点属性Infinity,收到消息3.0,合并后的属性3.0
----------顶点3调用vprog:接受到的消息和自己的消息进行合并----------------
顶点3,顶点属性Infinity,收到消息8.0,合并后的属性8.0
//---------------第二次迭代---------------
//经过第一次迭代后,3,5,6处于激活状态
//顶点3有出边,顶点6没有出边,可知,顶点3给顶点2发送消息成功,给顶点6发送失败
----------调用3调用sendMsg发送消息给顶点2------------
顶点3给顶点2 发送消息 12.0
----------调用3调用sendMsg发送消息给顶点6------------
//收到消息的顶点2调用vprog合并顶点属性
----------顶点2调用vprog:接受到的消息和自己的消息进行合并----------------
顶点2,顶点属性Infinity,收到消息12.0,合并后的属性12.0
//---------------第三次迭代---------------
----------调用2调用sendMsg发送消息给顶点5------------
----------调用2调用sendMsg发送消息给顶点1------------
顶点2给顶点1 发送消息 19.0
----------调用2调用sendMsg发送消息给顶点4------------
顶点2给顶点4 发送消息 14.0
----------顶点1调用vprog:接受到的消息和自己的消息进行合并----------------
顶点1,顶点属性Infinity,收到消息19.0,合并后的属性19.0
----------顶点4调用vprog:接受到的消息和自己的消息进行合并----------------
顶点4,顶点属性Infinity,收到消息14.0,合并后的属性14.0
//---------------第四次迭代---------------
----------调用4调用sendMsg发送消息给顶点1------------
顶点4给顶点1 发送消息 15.0
----------顶点1调用vprog:接受到的消息和自己的消息进行合并----------------
顶点1,顶点属性19.0,收到消息15.0,合并后的属性15.0
//---------- 第五次迭代不用发送消息,所有节点钝化 -----------------

通过对输出结果进行分析,可知,大致流程是首先在未调用pregel方法之前给每个节点一个初始值,然后通过调用pregel给每个顶点收到一条初始消息initialMsg,所有顶点处于激活状态,调用vprog对每个节点进行属性合并。然后每个激活态的顶点开始调用sendMsg根据 EdgeDirection.Out方向进行消息发送,将发送成功的顶点进行激活,其他顶点进行钝化处理,接收消息成功的顶点开始调用vprog进行合并顶点信息。这些被激活的顶点进行再次迭代,直到所有顶点钝化结束完成


http://chatgpt.dhexx.cn/article/bSFyCrf9.shtml

相关文章

2020.11.26课堂笔记(sparkGraphx算法之pregel)

参考博客&#xff1a;https://blog.csdn.net/hanweileilei/article/details/89764466 大佬博客写的很详细&#xff0c;不用继续看这篇了&#xff0c;随便写一些记录一下。 Pregel框架&#xff1a; Pregel是一种面向图算法的分布式编程框架&#xff0c;采用迭代的计算模型&…

Pregel(图计算)技术原理

图计算简介 图结构数据&#xff1a; 许多大数据都是以大规模图或网络的形式呈现。许多非图结构的大数据&#xff0c;也常常会被转换为图模型后进行分析。图数据结构很好地表达了数据之间的关联性。关联性计算是大数据计算的核心——通过获得数据的关联性&#xff0c;可以从噪…

python bar函数

bar(left, height, width, color, align, yerr)函数&#xff1a;绘制柱形图。left为x轴的位置序列&#xff0c;一般采用arange函数产生一个序列&#xff1b;height为y轴的数值序列&#xff0c;也就是柱形图的高度&#xff0c;一般就是我们需要展示的数据&#xff1b;width为柱形…

C++ 函数模板

函数模板是通用的函数描述&#xff0c;它们使用泛型来定义函数&#xff0c;其中的泛型可用具体的类型替换。通过将类型作为参数传递给模板&#xff0c;可使编译器生成该类型的函数。由于模板允许以泛型&#xff08;而不是具体类型&#xff09;的方式编写程序&#xff0c;因此有…

lead窗口函数

lead函数在Impala中可以配合over使用&#xff0c;lead函数有三个参数 lead(property,num,default) 第一个参数「property」标识想查询的列&#xff0c;「num」标识相对于当前行的第num行&#xff0c;第三个参数是默认值。 举例&#xff1a; -- 建表 CREATE TABLE test(id s…

C++ 仿函数

文章目录 1.由来2.定义3.实例参考文献 1.由来 我们先从一个非常简单的问题入手&#xff0c;来了解为什么要有仿函数。 假设我们现在有一个数组&#xff0c;数组中存有任意数量的数字&#xff0c;我们希望能够统计出这个数组中大于 10 的数字的数量&#xff0c;你的代码很可能…

心形函数的几种表达式

用两个函数表示&#xff1a; f(x)sqrt(1-(abs(x)-1)^2) h(x)-2*sqrt(1-0.5*abs(x)) 也可以根据图中的q(x)画出心形的内部&#xff1a; q(x)(f(x)-h(x))/2*cos(200*x)(f(x)h(x))/2 带入得&#xff1a; 用一个函数表示&#xff0c;我拟合了很久才画出来的&#xff1a; f(x)…

共轭函数

共轭函数在最近火的不行的Gan生成对抗神经网络进阶版本的数学推理中有着神奇的作用&#xff0c;因此在这边记录下。 共轭函数的定义为&#xff1a; f ∗ ( t ) max ⁡ x ∈ dom ⁡ ( f ) { x t − f ( x ) } f ^ { * } ( t ) \max _ { x \in \operatorname { dom } ( f ) }…

高斯函数解析

高斯函数广泛应用于统计学领域&#xff0c;用于表述正态分布&#xff0c;在信号处理领域&#xff0c;用于定义高斯滤波器&#xff0c;在图像处理领域&#xff0c;二维高斯核函数常用于高斯模糊&#xff0c;在数学领域&#xff0c;主要用于解决热力方程和扩散方程。 https://blo…

PostgreSQL 函数

PostgreSQL 函数 函数的定义 使用函数&#xff0c;可以极大的提高用户对数据库的管理效率。函数表示输入参数表示一个具有特定关系的值。 一、数学函数 绝对值函数、三角函数、对数函数、随机函数等&#xff0c;当有错误产生时&#xff0c;数学函数会返回null值。 二、函数…

EXCEL IFS函数简单使用

IFS函数的使用&#xff1a; 在学生成绩以及绩效考核中&#xff0c;我们需要对每个范围的成绩打分。比如【A】,【B】,【C】,【D】。可以使用【IFS()函数】完成操作。 1&#xff1a;选择单元格【C2】&#xff0c;输入【】&#xff0c;点击【fx】&#xff0c;弹出【插入函数】对话…

函数的返回值

1.什么是函数的返回值? print 和 return 的区别,print 仅仅是打印在控制台,而 return 则是将 return 后面的部分作为返回值作为函数的输出 可以用变量接走,继续使用该返回值做其它事 函数需要先定义后调用,函数体中 return 语句的结果就是返回值 如果一个函数没有 reutrn…

反双曲函数

Chapter10&#xff1a;反双曲函数 10.3 反双曲函数10.3.1 反双曲正弦函数【 yarsinh(x) 】反双曲正弦函数图像反双曲正弦函数的指数形式反双曲正弦函数的对数形式推导反双曲正弦函数的导数推导 10.3.2 反双曲余弦函数【 yarcosh(x) 】反双曲余弦函数图像反双曲余弦函数的指数形…

损失函数作用

前言&#xff1a;损失函数是机器学习里最基础也是最为关键的一个要素&#xff0c;通过对损失函数的定义、优化&#xff0c;就可以衍生到我们现在常用的机器学习等算法中 损失函数的作用&#xff1a;衡量模型模型预测的好坏。 正文&#xff1a; 首先我们假设要预测一个公司某商品…

Python自定义函数

一、自定义函数的固定语句 def contrast(a,b) : #使用def来定义一个名称为contrast的方法,a与b的值是两个变量&#xff0c;称为形参if a>b : #使用条件语句进行判定return a #返回a的值elif b>a :return b #返回b的值else:return (ab) …

可测函数

1 定义 可测函数:设是定义在可测集上的实函数,称为上的可测函数,如果满足: a])=b_{a})" class="mathcode" src="https://private.codecogs.com/gif.latex?%5Cforall%20a%5Cin%20R%2C%20%7Ca%7C%20%3C%20&plus;%5Cinfty%2C%5Cexists%20b_%7Ba%7D…

虚函数详解

文章目录 一、多态与重载1、多态的概念2、重载---编译期多态的体现3、虚函数---运行期多态的体现 二、虚函数实例三、虚函数的实现&#xff08;内存布局&#xff09;1、无继承情况2、单继承情况&#xff08;无虚函数覆盖&#xff09;3、单继承情况&#xff08;有虚函数覆盖&…

Java教程之NIO的基本用法

NIO的基本用法 NIO是New I/O的简称&#xff0c;与旧式基于流的I/O相对&#xff0c;从名字上来看&#xff0c;它表示新的一套I/O标准。它是从JDK1.4中被纳入到JDK中的。 与旧式的IO流相比&#xff0c;NIO是基于Block的&#xff0c;它以块为单位来处理数据&#xff0c;最为重要…

关于vp8,vp8与264比较总结

1 Other Codecs l MSN 使用的video codec “x-rtvc1”,09之前的版本使用的ML20.参考网址&#xff1a; http://www.amsn-project.net/forums/index.php?topic6612.0 l Yahoo messenger 使用GIPS的LSVX codec. l 这两个codecs技术保密性强&#xff0c;找不到有用的信息&#xff…

PCM(脉冲编码调制)、iLBC编解码、opus(声音编码格式)、VP8视频压缩格式、H.264数字视频压缩格式

目录 PCM&#xff08;脉冲编码调制&#xff09; 发展史 工作原理 iLBC编解码 基本介绍 技术优势 Opus&#xff08;声音编码格式&#xff09; 特性 播放 技术细节 VP8视频压缩格式 简介 突破创新 技术分析 H.264数字视频压缩格式 背景介绍 优势 特点 PCM&…