一、main
- 子图匹配程序run.cpp中主要使用到worker.h和comper.h分别对应线程和进程单位,接下来我们从main函数入手解析源码
从主函数可以看到,子图匹配程序中GMatchWorker继承了worker,主函数声明了workerparams并且传入了路径和线程参数,设置了继承了Trimmer作为节点过滤的GMatchTrimmer和继承了作为结果汇总的Aggregator的GMatchAgg,然后运行worker中的run函数,可以看出work类似进程,在此处开启了worker进程。
二、worker
-
进入Worker.h中,查看worker的构造函数,我们传入了线程数,每个线程对应一个comper,req_counter中记录本worker到其他的worker的请求数量,同时声明一个新的CTable作为vertex的缓存,利用idle_set记录当前worker中线程comper是够空闲,初始化为不空闲。
-
接下来我们看worker中的run函数,开始如果为master worker需要对用户传入的输入路径进行一次检查。
-
接下来进行文件的读取,其中arrangement为记录分配到各节点进行读取的二维vector,每个vector对应一个worker,记录其该进行读取的文件,利用作者提供的put程序上传时可以将整个大文件进行分片,然后通过这里进行分配。master worker需要通过调用dispatchRan函数进行文件的分配。
-
dispatchRan函数首先对路径文件夹进行遍历,同时对得到的文件名和大小进行存储,接下来根据文件大小对文件进行升序排列,然后进行文件分配,分配原则为最大空闲空间有限策略,及对读取文件大小总量最小的worker有限分配,函数返回assignment指针。
-
返回之后的到assignment指针,得到分配策略,接下来调用masterscatter函数将得到的分配策略通过mpi通信将结果发送至其余slave节点
masterscatter函数主要利用ibinstream输入流将结果通过MPI通信接口发送至其他节点。 -
完成通信之后,master从arrangement中取出自己需要读取文件部分,调用load_graph函数对部分文件进行图加载。
load_graph函数中首先生成调用在run.cpp中重写的toVertex函数对读入的文件进行vertex转化,得到相应的vertex集合,接下来调用之前run.cpp重写的顶点剪枝器Trimmer对不满足条件的节点进行剪枝,留下满足条件的vertex集合。 -
接下来进行同步图的操作
sync_graph函数对本地读取起来的vertex集合进行hash分配,其中hash策略在vertex.h中进行了重写,将节点分配到了所有work中,其中调用了比较重要的all_to_all函数进行worker之间的通信,刚白分配都是所有worker将自己读到的节点进行了分配,这时候需要将所有分配到其他worker的vertex传送到其他节点,同时需要接收其他节点的传送过来的vertex。 -
vertex分配到各个worker之后则设置vertex位置初始化为0,同时将得到的vertexes结果设置为local_table
-
更新本地表后进行下一步,启动reqserver,作为请求服务器,接收其他worker对它的请求,接收到请求后从localtable得到vertex并返回给请求worker;然后开启计算线程,具体下一步说明;接着开启响应服务器,当从其他worker接收到vertex消息时,进行处理,并加到cachetable中并更新对应的task
三、Comper
-
从worker中调用了create_compers函数,comper类似线程的作用,一个进程对应多个线程,一个worker对应多个comper。
首先根据设定的线程数初始化comper数组,生成对应的taskmap数组,每个comper对应一个taskmap,然后设置为全局变量,同时global_tasknum_vec记录每个线程中的任务数量,并开启相应comper。start函数中开启了调用了run函数线程。
-
在run函数中主要实现了任务的执行过程,要满足cache容量充足和task数量不超过设置的task阈值的条件下进行task的执行
-
在pop_task中,当任务小于batch_num,需要进行任务的读取,task的读取中主要遵循以下三个优先级,首先调用file2queue函数从本地文件中读取任务到q_task中,该文件记录了超出任务最大数量之外的任务;如果本地磁盘不存在时则需要调用push_task_from_taskmap函数从taskmap中获取任务并且获取到task_buf为空,如果taskmap中为空,则需要调用locTable2queue将本地图中生成种子任务加到q_task中。当q_task为空时,无论是哪种情况,都会读任务到q_task中,然后通过读取q_task执行。
从本地获取任务时会通过global_vertex_pos类似访问指针的作用进行获取,每个task可以通过线程锁保证同步,每次对本地vertex进行以TASK_BATCH_NUM数量的访问,如果满足条件则生成相应的种子任务
push_task_from_taskmap函数中主要是从task_buf获取task,调用我们之前重写的compute函数,每次调用只会执行一次,对没有执行完的任务会加入q_task,执行完的task则进行删除,在run中重写的compute分为两步,在这里会在第一次执行完之后加入q_task中继续执行。 -
任务添加到q_task,当任务添加到队列时,判读是否达到3 * TASK_BATCH_NUM,如果达到则将2* TASK_BATCH_NUM到3 * TASK_BATCH_NUM的vertex输出到本地。
-
q_task中,task执行过程中需要进行vertex的拉取时,通过调用pull_all函数进行节点的拉取操作,拉取完之后task会加入到taskmap中的taskbuf通过compute函数继续进行计算,任务完成之后则删除相应的任务,这样循环执行完之后确定task队列中没有未完成的task时则完成任务。
当要拉取的点则先从本地进行检查,如果在远处即其他worker中则需要调用相应的函数获取远处的节点,主要通过之前实现的ReqServer和RespServer机制进行vertex的拉取。
拉取完之后则将task加入到taskmap的task_buf中,然后再通过push_task_from_taskmap进行task的执行。 -
所有task完成之后再次对taskmap进行检查,确定没有task之后则将该comper设为空闲。
四、 Worker续
-
查看程序,在steal_planning函数中实现了worker之间task的负载均衡,将繁忙worker中的task转移到task较少的worker中。
对于slave worker主要负责收master的消息,确定分配策略;master负责任务分配,分配原理主要是使用优先队列实现最大堆和最小堆,将worker根据task数量加到堆中。
分配策略为取出最大堆最多task中取出TASK_BATCH_NUM数量的vertex转移到最小堆的最小task中,然后将计划存到plans中,并将这些计划保存和发送到其他slave中。 -
分配计划完成之后则需要执行分配计划,接收task主要将收到的task输出到磁盘文件中,后面的task读取可以获取。发送task首先确定数量是否依然很大,然后先从磁盘生成种子task,生成不了再从磁盘文件中读取,如果都读不到则发送空消息。
-
当最后任务全部完成时,则需要通过master同步消息将全部空闲进行发送,然后slave和master一同关闭。