分布式机器学习:PageRank算法的并行化实现(PySpark)

2023-11-11 11:50

本文主要是介绍分布式机器学习:PageRank算法的并行化实现(PySpark),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

🚀 优质资源分享 🚀

学习路线指引(点击解锁)知识定位人群定位
🧡 Python实战微信订餐小程序 🧡进阶级本课程是python flask+微信小程序的完美结合,从项目搭建到腾讯云部署上线,打造一个全栈订餐系统。
💛Python量化交易实战💛入门级手把手带你打造一个易扩展、更安全、效率更高的量化交易系统

1. PageRank的两种串行迭代求解算法

我们在博客《数值分析:幂迭代和PageRank算法(Numpy实现)》算法中提到过用幂法求解PageRank。
给定有向图


我们可以写出其马尔科夫概率转移矩阵MMM(第iii列对应对iii节点的邻居并沿列归一化)

⎛⎝⎜⎜01212001100⎞⎠⎟⎟(00112001210)\left(\begin{array}{lll}
0 & 0 & 1 \
\frac{1}{2} & 0 & 0 \
\frac{1}{2} & 1 & 0
\end{array}\right)
然后我们定义Google矩阵为

G=qnE+(1−q)MG=qnE+(1−q)MG=\frac{q}{n} E+(1-q) M
此处qqq为上网者从一个页面转移到另一个随机页面的概率(一般为0.15),1−q1−q1-q 为点击当前页面上链接的概率,EEE为元素全1的n×nn×nn\times n 矩阵( nnn 为节点个数)。

而PageRank算法可以视为求解Google矩阵占优特征值(对于随机矩阵而言,即1)对应的特征向量。设初始化Rank向量为 xxx( xixix_i 为页面iii的Rank值),则我们可以采用幂法来求解:

xt+1=Gxtxt+1=Gxtx_{t+1}=G x_{t}
(每轮迭代后要归一化)

现实场景下的图大多是稀疏图,即MMM是稀疏矩阵。幂法中计算 (1−q)Mxt(1−q)Mxt(1-q)Mx_t ,对于节点 iii 需使用reduceByKey()(key为节点编号)操作。计算 qnExtqnExt\frac{q}{n}{E}x_t 则需要对所有节点的Rank进行reduce()操作,操作颇为繁复。

PageRank还有一种求解算法(名字就叫“迭代算法”),它的迭代形式如下:

xt+1=qn1+(1−q)Mxtxt+1=qn1+(1−q)Mxtx_{t+1} = \frac{q}{n}\bm{1} + (1-q)Mx_t
可以看到,这种迭代方法就规避了计算 qnExtqnExt\frac{q}{n}Ex_t,通信开销更小。我们接下来就采用这种迭代形式。

2. 图划分的两种方法

目前对图算法进行并行化的主要思想是将大图切分为多个子图,然后将这些子图分布到不同的机器上进行并行计算,在必要时进行跨机器通信同步计算得出结果。学术界和工业界提出了多种将大图切分为子图的划分方法,主要包括两种,边划分(Edge Cut)和点划分(Vertex Cut)。

2.1 边划分

如下图所示,边划分是对图中某些边进行切分。具体在Pregel[1]图计算框架中,每个分区包含一些节点和节点的出边;在GraphLab[2]图计算框架中,每个分区包含一些节点、节点的出边和入边,以及这些节点的邻居节点。边划分的优点是可以保留节点的邻居信息,缺点是容易出现划分不平衡,如对于度很高的节点,其关联的边都被划分到一个分区中,造成其他分区中的边可能很少。另外,如下图最右边的图所示,边划分可能存在边冗余。

2.2 点划分

如下图所示,点划分是对图中某些点进行切分,得到多个图分区,每个分区包含一部分边,以及与边相关联的节点。具体地,PowerGraph[3],GraphX[4]等框架采用点划分,被划分的节点存在多个分区中。点划分的优缺点与边划分的优缺点正好相反,可以将边较为平均地分配到不同机器中,但没有保留节点的邻居关系。


总而言之,边划分将节点分布到不同机器中(可能划分不平衡),而点划分将边分布到不同机器中(划分较为平衡)。接下来我们使用的算法为类似Pregel的划分方式,使用边划分。我们下面的算法是简化版,没有处理悬挂节点的问题。

3. 对迭代算法的并行化

我们将Rank向量用均匀分布初始化(也可以用全1初始化,不过就不再以概率分布的形式呈现),设分区数为3,算法总体迭代流程可以表示如下:

注意,图中flatMap()步骤中,节点iii计算其contribution(贡献度):(xt)i/|Ni|(xt)i/|Ni|(x_t)_i/|\mathcal{N}_i|,并将贡献度发送到邻居集合NiNi\mathcal{N}_i中的每一个节点。之后,将所有节点收到的贡献度使用reduceByKey()(节点编号为key)规约后得到向量xx\hat{x},和串行算法中MxtMxtMx_t的对应关系如下图所示:

并按照公式xt+1=qn+(1−q)xxt+1=qn+(1−q)xx_{t+1} = \frac{q}{n} + (1-q)\hat{x}来计算节点的Rank向量。然后继续下一轮的迭代过程。

4. 编程实现

用PySpark对PageRank进行并行化编程实现,代码如下:

import re
import sys
from operator import add
from typing import Iterable, Tuplefrom pyspark.resultiterable import ResultIterable
from pyspark.sql import SparkSessionn_slices = 3  # Number of Slices
n_iterations = 10  # Number of iterations
q = 0.15 #the default value of q is 0.15def computeContribs(neighbors: ResultIterable[int], rank: float) -> Iterable[Tuple[int, float]]:# Calculates the contribution(rank/num\_neighbors) of each vertex, and send it to its neighbours.num_neighbors = len(neighbors)for vertex in neighbors:yield (vertex, rank / num_neighbors)if __name__ == "\_\_main\_\_":# Initialize the spark context.spark = SparkSession\.builder\.appName("PythonPageRank")\.getOrCreate()# link: (source\_id, dest\_id)links = spark.sparkContext.parallelize([(1, 2), (1, 3), (2, 3), (3, 1)],n_slices)                       # drop duplicate links and convert links to an adjacency list.adj_list = links.distinct().groupByKey().cache()# count the number of vertexesn_vertexes = adj_list.count()# init the rank of each vertex, the default is 1.0/n\_vertexesranks = adj_list.map(lambda vertex_neighbors: (vertex_neighbors[0], 1.0/n_vertexes))# Calculates and updates vertex ranks continuously using PageRank algorithm.for t in range(n_iterations):# Calculates the contribution(rank/num\_neighbors) of each vertex, and send it to its neighbours.contribs = adj_list.join(ranks).flatMap(lambda vertex_neighbors_rank: computeContribs(vertex_neighbors_rank[1][0], vertex_neighbors_rank[1][1]  # type: ignore[arg-type]))# Re-calculates rank of each vertex based on the contributions it receivedranks = contribs.reduceByKey(add).mapValues(lambda rank: q/n_vertexes + (1 - q)*rank)# Collects all ranks of vertexs and dump them to console.for (vertex, rank) in ranks.collect():print("%s has rank: %s." % (vertex, rank))spark.stop()

运行结果如下:

1 has rank: 0.38891305880091237.  
2 has rank: 0.214416470596171.
3 has rank: 0.3966704706029163.

该Rank向量与我们采用串行幂法得到的Rank向量 R=(0.38779177,0.21480614,0.39740209)TR=(0.38779177,0.21480614,0.39740209)TR=(0.38779177,0.21480614,0.39740209)^{T} 近似相等,说明我们的并行化算法运行正确。

参考

  • [1] Malewicz G, Austern M H, Bik A J C, et al. Pregel: a system for large-scale graph processing[C]//Proceedings of the 2010 ACM SIGMOD International Conference on Management of data. 2010: 135-146.

  • [2] Low Y, Gonzalez J, Kyrola A, et al. Distributed graphlab: A framework for machine learning in the cloud[J]. arXiv preprint arXiv:1204.6078, 2012.

  • [3] Gonzalez J E, Low Y, Gu H, et al. {PowerGraph}: Distributed {Graph-Parallel} Computation on Natural Graphs[C]//10th USENIX symposium on operating systems design and implementation (OSDI 12). 2012: 17-30.

  • [4] Spark: GraphX Programming Guide

  • [5] GiHub: Spark官方Python样例

  • [6] 许利杰,方亚芬. 大数据处理框架Apache Spark设计与实现[M]. 电子工业出版社, 2021.

  • [7] Stanford CME 323: Distributed Algorithms and Optimization (Lecture 15)

  • [8] wikipedia: PageRank

  • [9] 李航. 统计学习方法(第2版)[M]. 清华大学出版社, 2019.

  • [10] Timothy sauer. 数值分析(第2版)[M].机械工业出版社, 2018.

    • 1. PageRank的两种串行迭代求解算法
  • 2. 图划分的两种方法

  • 2.1 边划分

  • 2.2 点划分

  • 3. 对迭代算法的并行化

  • 4. 编程实现

  • 参考

    __EOF__

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0ARIwS4w-1654298342659)(https://blog.csdn.net/orion-orion)]猎户座 - 本文链接: https://blog.csdn.net/orion-orion/p/16340839.html

  • 关于博主: 本科CS系蒟蒻,机器学习半吊子,并行计算混子。
  • 版权声明: 欢迎您对我的文章进行转载,但请务必保留原始出处哦()。
  • 声援博主: 如果您觉得文章对您有帮助,可以点击文章右下角**【[推荐](javascript:void(0)😉】**一下。

这篇关于分布式机器学习:PageRank算法的并行化实现(PySpark)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/389852

相关文章

C#实现系统信息监控与获取功能

《C#实现系统信息监控与获取功能》在C#开发的众多应用场景中,获取系统信息以及监控用户操作有着广泛的用途,比如在系统性能优化工具中,需要实时读取CPU、GPU资源信息,本文将详细介绍如何使用C#来实现... 目录前言一、C# 监控键盘1. 原理与实现思路2. 代码实现二、读取 CPU、GPU 资源信息1.

SpringBoot实现动态插拔的AOP的完整案例

《SpringBoot实现动态插拔的AOP的完整案例》在现代软件开发中,面向切面编程(AOP)是一种非常重要的技术,能够有效实现日志记录、安全控制、性能监控等横切关注点的分离,在传统的AOP实现中,切... 目录引言一、AOP 概述1.1 什么是 AOP1.2 AOP 的典型应用场景1.3 为什么需要动态插

Oracle查询优化之高效实现仅查询前10条记录的方法与实践

《Oracle查询优化之高效实现仅查询前10条记录的方法与实践》:本文主要介绍Oracle查询优化之高效实现仅查询前10条记录的相关资料,包括使用ROWNUM、ROW_NUMBER()函数、FET... 目录1. 使用 ROWNUM 查询2. 使用 ROW_NUMBER() 函数3. 使用 FETCH FI

Python脚本实现自动删除C盘临时文件夹

《Python脚本实现自动删除C盘临时文件夹》在日常使用电脑的过程中,临时文件夹往往会积累大量的无用数据,占用宝贵的磁盘空间,下面我们就来看看Python如何通过脚本实现自动删除C盘临时文件夹吧... 目录一、准备工作二、python脚本编写三、脚本解析四、运行脚本五、案例演示六、注意事项七、总结在日常使用

Java实现Excel与HTML互转

《Java实现Excel与HTML互转》Excel是一种电子表格格式,而HTM则是一种用于创建网页的标记语言,虽然两者在用途上存在差异,但有时我们需要将数据从一种格式转换为另一种格式,下面我们就来看看... Excel是一种电子表格格式,广泛用于数据处理和分析,而HTM则是一种用于创建网页的标记语言。虽然两

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

使用Python实现在Word中添加或删除超链接

《使用Python实现在Word中添加或删除超链接》在Word文档中,超链接是一种将文本或图像连接到其他文档、网页或同一文档中不同部分的功能,本文将为大家介绍一下Python如何实现在Word中添加或... 在Word文档中,超链接是一种将文本或图像连接到其他文档、网页或同一文档中不同部分的功能。通过添加超

windos server2022里的DFS配置的实现

《windosserver2022里的DFS配置的实现》DFS是WindowsServer操作系统提供的一种功能,用于在多台服务器上集中管理共享文件夹和文件的分布式存储解决方案,本文就来介绍一下wi... 目录什么是DFS?优势:应用场景:DFS配置步骤什么是DFS?DFS指的是分布式文件系统(Distr

NFS实现多服务器文件的共享的方法步骤

《NFS实现多服务器文件的共享的方法步骤》NFS允许网络中的计算机之间共享资源,客户端可以透明地读写远端NFS服务器上的文件,本文就来介绍一下NFS实现多服务器文件的共享的方法步骤,感兴趣的可以了解一... 目录一、简介二、部署1、准备1、服务端和客户端:安装nfs-utils2、服务端:创建共享目录3、服

C#使用yield关键字实现提升迭代性能与效率

《C#使用yield关键字实现提升迭代性能与效率》yield关键字在C#中简化了数据迭代的方式,实现了按需生成数据,自动维护迭代状态,本文主要来聊聊如何使用yield关键字实现提升迭代性能与效率,感兴... 目录前言传统迭代和yield迭代方式对比yield延迟加载按需获取数据yield break显式示迭