Spark GraphX 中的 pregel函数(转载)

article/2025/9/7 10:48:47

文章目录

    • pregel函数源码 与 各个参数介绍:
    • 案例: 求顶点5 到 其他各顶点的 最短距离
    • pregel原理分析


一篇关于 Spark GraphX 中 pregel函数 的笔记,通过一个小案例将pregel函数理解透彻。


  • pregel函数源码 与 各个参数介绍:


  def pregel[A: ClassTag](initialMsg: A,maxIterations: Int = Int.MaxValue,activeDirection: EdgeDirection = EdgeDirection.Either)(vprog: (VertexId, VD, A) => VD,sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],mergeMsg: (A, A) => A): Graph[VD, ED] = {Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)}
  • 参数说明
    initialMsg图初始化的时候,开始模型计算的时候,所有节点都会先收到一个消息
    maxIterations最大迭代次数
    activeDirection规定了发送消息的方向
    vprog节点调用该消息将聚合后的数据和本节点进行属性的合并
    sendMsg激活态的节点调用该方法发送消息
    mergeMsg如果一个节点接收到多条消息,先用mergeMsg 来将多条消息聚合成为一条消息,如果节点只收到一条消息,则不调用该函数

    • 案例: 求顶点5 到 其他各顶点的 最短距离


    在理解案例之前,首先要清楚关于 顶点 的两点知识:

    1. 顶点 的状态有两种:
      (1)、钝化态【类似于休眠,不做任何事】
      (2)、激活态【干活】

    2. 顶点 能够处于激活态需要有条件:
      (1)、成功收到消息 或者
      (2)、成功发送了任何一条消息

    案例源码如下:

package com.hanwei.sparkgraphx01.helloworld
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDDobject Graphx06_Pregel extends App{//1、创建SparkContextval sparkConf = new SparkConf().setAppName("GraphxHelloWorld").setMaster("local[*]")val sparkContext = new SparkContext(sparkConf)//2、创建顶点val vertexArray = Array((1L, ("Alice", 28)),(2L, ("Bob", 27)),(3L, ("Charlie", 65)),(4L, ("David", 42)),(5L, ("Ed", 55)),(6L, ("Fran", 50)))val vertexRDD: RDD[(VertexId, (String,Int))] = sparkContext.makeRDD(vertexArray)//3、创建边,边的属性代表 相邻两个顶点之间的距离val edgeArray = Array(Edge(2L, 1L, 7),Edge(2L, 4L, 2),Edge(3L, 2L, 4),Edge(3L, 6L, 3),Edge(4L, 1L, 1),Edge(2L, 5L, 2),Edge(5L, 3L, 8),Edge(5L, 6L, 3))val edgeRDD: RDD[Edge[Int]] = sparkContext.makeRDD(edgeArray)//4、创建图(使用aply方式创建)val graph1 = Graph(vertexRDD, edgeRDD)/* ************************** 使用pregle算法计算 ,顶点5 到 各个顶点的最短距离 ************************** *///被计算的图中 起始顶点idval srcVertexId = 5L val initialGraph = graph1.mapVertices{case (vid,(name,age)) =>//如果为起始顶点,则值为0,否则为无穷大 if(vid==srcVertexId) 0.0 else Double.PositiveInfinity}//5、调用pregelval pregelGraph = initialGraph.pregel(//initialMsgDouble.PositiveInfinity,//maxIterationsInt.MaxValue,//activeDirectionEdgeDirection.Out)(//vprog(vid: VertexId, vd: Double, distMsg: Double) => {val minDist = math.min(vd, distMsg)println(s"顶点${vid},属性${vd},收到消息${distMsg},合并后的属性${minDist}")minDist},//sendMsg(edgeTriplet: EdgeTriplet[Double,PartitionID]) => {if (edgeTriplet.srcAttr + edgeTriplet.attr < edgeTriplet.dstAttr) {println(s"顶点${edgeTriplet.srcId} 给 顶点${edgeTriplet.dstId} 发送消息 ${edgeTriplet.srcAttr + edgeTriplet.attr}")Iterator[(VertexId, Double)]((edgeTriplet.dstId, edgeTriplet.srcAttr + edgeTriplet.attr))} else {Iterator.empty}},//mergeMsg(msg1: Double, msg2: Double) => math.min(msg1, msg2))//6、输出结果
//  pregelGraph.triplets.collect().foreach(println)
//  println(pregelGraph.vertices.collect.mkString("\n"))//7、关闭SparkContextsparkContext.stop()
}

  • 输出结果:

    //------------------------------------------ 各个顶点接受初始消息initialMsg ------------------------------------------
    顶点3,属性Infinity,收到消息Infinity,合并后的属性Infinity
    顶点2,属性Infinity,收到消息Infinity,合并后的属性Infinity
    顶点4,属性Infinity,收到消息Infinity,合并后的属性Infinity
    顶点6,属性Infinity,收到消息Infinity,合并后的属性Infinity
    顶点1,属性Infinity,收到消息Infinity,合并后的属性Infinity
    顶点5,属性0.0,收到消息Infinity,合并后的属性0.0
    //------------------------------------------ 第一次迭代 ------------------------------------------
    顶点5 给 顶点6 发送消息 3.0
    顶点5 给 顶点3 发送消息 8.0
    顶点3,属性Infinity,收到消息8.0,合并后的属性8.0
    顶点6,属性Infinity,收到消息3.0,合并后的属性3.0
    //------------------------------------------ 第二次迭代 ------------------------------------------
    顶点3 给 顶点2 发送消息 12.0
    顶点2,属性Infinity,收到消息12.0,合并后的属性12.0
    //------------------------------------------ 第三次迭代 ------------------------------------------
    顶点2 给 顶点4 发送消息 14.0
    顶点2 给 顶点1 发送消息 19.0
    顶点1,属性Infinity,收到消息19.0,合并后的属性19.0
    顶点4,属性Infinity,收到消息14.0,合并后的属性14.0
    //------------------------------------------ 第四次迭代 ------------------------------------------
    顶点4 给 顶点1 发送消息 15.0
    顶点1,属性19.0,收到消息15.0,合并后的属性15.0
    //------------------------------------------ 第五次迭代不用发送消息 ------------------------------------------


    • pregel原理分析


    调用pregel方法之前,先把图的各个顶点的属性初始化为如下图所示:顶点5到自己的距离为0,所以设为0,其他顶点都设为 正无穷大Double.PositiveInfinity。见代码44行

    当调用pregel方法开始:
    首先,所有顶点都将接收到一条初始消息initialMsg ,使所有顶点都处于激活态(红色标识的节点)。
    图1

    第一次迭代开始:

    所有顶点以EdgeDirection.Out的边方向调用sendMsg方法发送消息给目标顶点,如果 源顶点的属性+边的属性<目标顶点的属性,则发送消息。否则不发送。
    发送成功的只有两条边:
    5—>3(0+8<Double.Infinity , 成功),
    5—>6(0+3<Double.Infinity , 成功)
    3—>2(Double.Infinity+4>Double.Infinity , 失败)
    3—>6(Double.Infinity+3>Double.Infinity , 失败)
    2—>1(Double.Infinity+7>Double.Infinity , 失败)
    2—>4(Double.Infinity+2>Double.Infinity , 失败)
    2—>5(Double.Infinity+2>Double.Infinity , 失败)
    4—>1(Double.Infinity+1>Double.Infinity , 失败)。

    sendMsg方法执行完成之后,根据顶点处于激活态的条件,顶点5 成功地分别给顶点3 和 顶点6 发送了消息,顶点3 和 顶点6 也成功地接受到了消息。所以 此时只有5,3,6 三个顶点处于激活态,其他顶点全部钝化。然后收到消息的顶点3和顶点6都调用vprog方法,将收到的消息 与 自身的属性合并。如下图2所示。到此第一次迭代结束。
    在这里插入图片描述

    第二次迭代开始:
    顶点3 给 顶点6 发送消息失败,顶点3 给 顶点2 发送消息成功,此时 顶点3 成功发送消息,顶点2 成功接收消息,所以顶点2 和 顶点3 都成为激活状态,其他顶点都成为钝化状态。然后顶点2 调用vprog方法,将收到的消息 与 自身的属性合并。 见图3. 至此第二次迭代结束。
    在这里插入图片描述

    第三次迭代开始:
    顶点3分别发送消息给顶点2失败 和 顶点6失败,顶点2 分别发消息给 顶点1成功、顶点4成功、顶点5失败 ,所以 顶点2、顶点1、顶点4 成为激活状态,其他顶点为钝化状态。顶点1 和 顶点4分别调用vprog方法,将收到的消息 与 自身的属性合并。见图4。至此第三次迭代结束
    在这里插入图片描述

    第四次迭代开始:
    顶点2 分别发送消息给 顶点1失败 和 顶点4失败。顶点4 给 顶点1发送消息成功,顶点1 和 顶点4 进入激活状态,其他顶点进入钝化状态。顶点1 调用vprog方法,将收到的消息 与 自身的属性合并 。见图5
    在这里插入图片描述

    第五次迭代开始:
    顶点4 再给 顶点1发送消息失败,顶点4 和 顶点1 进入钝化状态,此时全图都进入钝化状态。至此结束,见图6.
    在这里插入图片描述

附:

ConnectedComponents

算法链接:
GraphX之Connected Components


http://chatgpt.dhexx.cn/article/UIDmSOKm.shtml

相关文章

03 graphx 从 SSSP 来看 pregel

前言 呵呵 最近刚好有一些需要使用到 图的相关计算 然后 在其他文章中找到了一篇 关于最短路径的graphx计算的代码 spark graphx 最短路径及中间节点 呵呵 很久没有用这些东西了, 虽然只是简单的使用, 但是还是要 复习一下, 稍微理解一下 他的执行方式 pregel 相关论文 …

ArangoDB(四)Pregel

arango pregel.status()返回值 localhost:8529_system> pregel.status(1099521660554) {"state" : "done","gss" : 7,"totalRuntime" : 8.389497518539429,"aggregators" : {},"sendCount" : 392647,"re…

graphx中的pregel原理详解

优秀参考&#xff1a; graphx教程参考&#xff1a;https://www.jianshu.com/p/ad5cedc30ba4 pergel函数详细讲解&#xff1a;https://blog.csdn.net/hanweileilei/article/details/89764466 迪杰斯特拉原理简介&#xff1a;https://www.jianshu.com/p/ad5cedc30ba4 ps: 以最…

Pregel模型

简介 在Hadoop兴起之后&#xff0c;google又发布了三篇研究论文&#xff0c;分别阐述了了Caffeine、Pregel、Dremel三种技术&#xff0c;这三种技术也被成为google的新“三驾马车”&#xff0c;其中的Pregel是google提出的用于大规模分布式图计算框架。主要用于图遍历&#xf…

Spark Graphx Pregel(pregel参数详解,pregel调用实现过程的详细解释)

Spark Graphx Pregel 一.Pregel概述1.什么是pregel&#xff1f;2.pregel应用场景 二.Pregel源码及参数解释1.源码2.参数详细解释&#xff08;1&#xff09;initialMsg&#xff08;2&#xff09;maxIteration&#xff08;3&#xff09;activeDirection&#xff08;4&#xff09;…

2020.11.26课堂笔记(sparkGraphx算法之pregel)

参考博客&#xff1a;https://blog.csdn.net/hanweileilei/article/details/89764466 大佬博客写的很详细&#xff0c;不用继续看这篇了&#xff0c;随便写一些记录一下。 Pregel框架&#xff1a; Pregel是一种面向图算法的分布式编程框架&#xff0c;采用迭代的计算模型&…

Pregel(图计算)技术原理

图计算简介 图结构数据&#xff1a; 许多大数据都是以大规模图或网络的形式呈现。许多非图结构的大数据&#xff0c;也常常会被转换为图模型后进行分析。图数据结构很好地表达了数据之间的关联性。关联性计算是大数据计算的核心——通过获得数据的关联性&#xff0c;可以从噪…

python bar函数

bar(left, height, width, color, align, yerr)函数&#xff1a;绘制柱形图。left为x轴的位置序列&#xff0c;一般采用arange函数产生一个序列&#xff1b;height为y轴的数值序列&#xff0c;也就是柱形图的高度&#xff0c;一般就是我们需要展示的数据&#xff1b;width为柱形…

C++ 函数模板

函数模板是通用的函数描述&#xff0c;它们使用泛型来定义函数&#xff0c;其中的泛型可用具体的类型替换。通过将类型作为参数传递给模板&#xff0c;可使编译器生成该类型的函数。由于模板允许以泛型&#xff08;而不是具体类型&#xff09;的方式编写程序&#xff0c;因此有…

lead窗口函数

lead函数在Impala中可以配合over使用&#xff0c;lead函数有三个参数 lead(property,num,default) 第一个参数「property」标识想查询的列&#xff0c;「num」标识相对于当前行的第num行&#xff0c;第三个参数是默认值。 举例&#xff1a; -- 建表 CREATE TABLE test(id s…

C++ 仿函数

文章目录 1.由来2.定义3.实例参考文献 1.由来 我们先从一个非常简单的问题入手&#xff0c;来了解为什么要有仿函数。 假设我们现在有一个数组&#xff0c;数组中存有任意数量的数字&#xff0c;我们希望能够统计出这个数组中大于 10 的数字的数量&#xff0c;你的代码很可能…

心形函数的几种表达式

用两个函数表示&#xff1a; f(x)sqrt(1-(abs(x)-1)^2) h(x)-2*sqrt(1-0.5*abs(x)) 也可以根据图中的q(x)画出心形的内部&#xff1a; q(x)(f(x)-h(x))/2*cos(200*x)(f(x)h(x))/2 带入得&#xff1a; 用一个函数表示&#xff0c;我拟合了很久才画出来的&#xff1a; f(x)…

共轭函数

共轭函数在最近火的不行的Gan生成对抗神经网络进阶版本的数学推理中有着神奇的作用&#xff0c;因此在这边记录下。 共轭函数的定义为&#xff1a; f ∗ ( t ) max ⁡ x ∈ dom ⁡ ( f ) { x t − f ( x ) } f ^ { * } ( t ) \max _ { x \in \operatorname { dom } ( f ) }…

高斯函数解析

高斯函数广泛应用于统计学领域&#xff0c;用于表述正态分布&#xff0c;在信号处理领域&#xff0c;用于定义高斯滤波器&#xff0c;在图像处理领域&#xff0c;二维高斯核函数常用于高斯模糊&#xff0c;在数学领域&#xff0c;主要用于解决热力方程和扩散方程。 https://blo…

PostgreSQL 函数

PostgreSQL 函数 函数的定义 使用函数&#xff0c;可以极大的提高用户对数据库的管理效率。函数表示输入参数表示一个具有特定关系的值。 一、数学函数 绝对值函数、三角函数、对数函数、随机函数等&#xff0c;当有错误产生时&#xff0c;数学函数会返回null值。 二、函数…

EXCEL IFS函数简单使用

IFS函数的使用&#xff1a; 在学生成绩以及绩效考核中&#xff0c;我们需要对每个范围的成绩打分。比如【A】,【B】,【C】,【D】。可以使用【IFS()函数】完成操作。 1&#xff1a;选择单元格【C2】&#xff0c;输入【】&#xff0c;点击【fx】&#xff0c;弹出【插入函数】对话…

函数的返回值

1.什么是函数的返回值? print 和 return 的区别,print 仅仅是打印在控制台,而 return 则是将 return 后面的部分作为返回值作为函数的输出 可以用变量接走,继续使用该返回值做其它事 函数需要先定义后调用,函数体中 return 语句的结果就是返回值 如果一个函数没有 reutrn…

反双曲函数

Chapter10&#xff1a;反双曲函数 10.3 反双曲函数10.3.1 反双曲正弦函数【 yarsinh(x) 】反双曲正弦函数图像反双曲正弦函数的指数形式反双曲正弦函数的对数形式推导反双曲正弦函数的导数推导 10.3.2 反双曲余弦函数【 yarcosh(x) 】反双曲余弦函数图像反双曲余弦函数的指数形…

损失函数作用

前言&#xff1a;损失函数是机器学习里最基础也是最为关键的一个要素&#xff0c;通过对损失函数的定义、优化&#xff0c;就可以衍生到我们现在常用的机器学习等算法中 损失函数的作用&#xff1a;衡量模型模型预测的好坏。 正文&#xff1a; 首先我们假设要预测一个公司某商品…

Python自定义函数

一、自定义函数的固定语句 def contrast(a,b) : #使用def来定义一个名称为contrast的方法,a与b的值是两个变量&#xff0c;称为形参if a>b : #使用条件语句进行判定return a #返回a的值elif b>a :return b #返回b的值else:return (ab) …