#####带时间衰减因子#####应用实战: 如何利用Spark集群计算物品相似度

本文主要是介绍#####带时间衰减因子#####应用实战: 如何利用Spark集群计算物品相似度,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本文是Spark调研笔记的最后一篇,以代码实例说明如何借助Spark平台高效地实现推荐系统CF算法中的物品相似度计算。

在推荐系统中,最经典的推荐算法无疑是协同过滤(Collaborative Filtering, CF),而item-cf又是CF算法中一个实现简单且效果不错的算法。
在item-cf算法中,最关键的步骤是计算物品之间的相似度。本文以代码实例来说明如何利用Spark平台快速计算物品间的余弦相似度。
Cosine Similarity是相似度的一种常用度量,根据《推荐系统实践》一书第2.4.2节关于Item-CF算法部分的说明,其计算公式如下:

举个例子,若对item1有过行为的用户集合为{u1, u2, u3},对item2有过行为的用户集合为{u1, u3, u4, u5},则根据上面的式子,item1和item2间的相似度为2/(3*4),其中分子的2是因为item1的user_list与item2的user_list的交集长度为2,即item1和item2的共现(co-occurence)次数是2。

在工程实现上,根据论文"Empirical Analysis of Predictive Algorithms for Collaborative Filtering"的分析,为对活跃用户做惩罚,引入了IUF (Inverse User Frequency)的概念(与TF-IDF算法引入IDF的思路类似:活跃用户对物品相似度的贡献应该小于不活跃的用户),因此,对余弦相似度做改进后相似度计算公式如下:

可以看到,上式分子部分的1/log(1 + N(u))体现了对活跃用户的惩罚。

此外,通常认为用户在相隔很短的时间内喜欢的物品具有更高相似度。因此,工程实现上,还会考虑时间衰减效应。一种典型的时间衰减函数如下所示:

最终,时间上下文相关的Item-CF算法中的相似度计算公式如下:

上式中,分母部分与标准的相似度公式分母保持一致;分子部分参与运算的是item_i和item_j的共现用户集合,其中,f(t)是时间衰减效应的体现,N(u)对活跃用户做了惩罚。

下面的Python代码是计算物品相似度的Spark任务的代码片段(从HDFS加载用户历史行为日志,计算物品相似度,相似列表取TopN,将相似度计算结果写会HDFS),供大家参考:

[python]  view plain  copy
  1. #!/bin/env/python  
  2.   
  3.   
  4. import pyspark as ps  
  5. import math  
  6. import datetime as dt  
  7. import util  
  8.   
  9.   
  10. def generate_item_pair(x):  
  11.     """ 
  12.         Find co-occurence items of every given user  
  13.         Return a tuple in the format of ((item_0, item_1), cooccurrence_factor). 
  14.     """  
  15.     items = x[1]  
  16.     item_cnt = len(items)  
  17.     alpha = 1  
  18.     for i in items:  
  19.         item1 = i[0]  
  20.         ts1   = i[1]  
  21.         for j in items:  
  22.             item2 = j[0]  
  23.             ts2   = j[1]  
  24.             if item1 != item2:  
  25.                 ## introduce time decay and penalize active users  
  26.                 ft = 1.0 / (1 + alpha * abs(ts1 - ts2))  
  27.                 yield ((item1, item2), (ft / math.log(1 + item_cnt)))  
  28.   
  29.   
  30. def compute_item_similarity(x):  
  31.     items = x[0]  
  32.     cooccurrence = float(x[1])  
  33.     item_dict = g_item_freq_d   
  34.     norm_factor = 5  
  35.     if items[0in item_dict and items[1in item_dict:  
  36.         freq_0 = item_dict[items[0]]  
  37.         freq_1 = item_dict[items[1]]  
  38.         ## calculate similarity between the item pair  
  39.         sim = cooccurrence / math.sqrt(freq_0 * freq_1)  
  40.         ## normalize similarity  
  41.         norm_sim = (cooccurrence / (cooccurrence + norm_factor)) * sim  
  42.         yield (items[0], (items[1], norm_sim))  
  43.   
  44.   
  45. def sort_items(x):  
  46.     """ 
  47.         For a given item, sort all items similar to it as descent (using similarity scores), take topN similar items, and return as the following format: 
  48.         given_item \t sorted_item_0$sorted_score_0,sorted_item_1$sorted_score_1,... 
  49.     """  
  50.     similar_items = list(x[1])  
  51.     if len(similar_items) > 0:  
  52.         ## sort list of (item, score) tuple by score from high to low  
  53.         similar_items.sort(key=lambda x: x[1], reverse=True)  
  54.         ## format the list of sorted items as a string  
  55.         similar_items_str = ",".join(["$".join(map(str,item)) for item in similar_items[0:50]])  
  56.         yield "\t".join([str(x[0]), similar_items_str])  
  57.   
  58.   
  59. def main():  
  60.     base_hdfs_uri = "hdfs://to/user/behavior/log"  
  61.     today = dt.date.today()  
  62.     knn_similarity_file = '%s/%s/knn_sim' % (base_hdfs_uri, today.strftime('%Y%m%d'))  
  63.   
  64.     sc = ps.SparkContext()  
  65.   
  66.     ## load user behavior from hdfs log  
  67.     ## each element in user_item is a tuple: (user, (item, timestamp))  
  68.     history_s = (today - dt.timedelta(8)).strftime('%Y%m%d')  
  69.     history_e = (today - dt.timedelta(2)).strftime('%Y%m%d')  
  70.     input_files = util.get_input_files(action='play', start=history_s, end=history_e)  
  71.     user_item = sc.textFile(",".join(input_files))\  
  72.         .mapPartitions(util.parse_user_item) \  
  73.         .map(lambda x: (x[0], (x[1], x[2]))) \  
  74.         .distinct() \  
  75.         .cache()  
  76.   
  77.     ## compute item frequency and store as a global dict  
  78.     item_freq = user_item.map(lambda x: (x[1][0], 1)) \  
  79.         .reduceByKey(lambda x, y: x + y) \  
  80.         .collect()  
  81.     global g_item_freq_d  
  82.     g_item_freq_d = dict()  
  83.     for x in item_freq:  
  84.         g_item_freq_d[x[0]] = x[1]  
  85.      
  86.     ## compute item similarity and find top n most similar items    
  87.     item_pair_sim = user_item.groupByKey() \  
  88.         .flatMap(generate_item_pair) \  
  89.         .reduceByKey(lambda x, y: x + y) \  
  90.         .flatMap(compute_item_similarity) \  
  91.         .groupByKey() \  
  92.         .flatMap(sort_items) \  
  93.         .cache()  
  94.   
  95.     ## dump to hdfs  
  96.     item_pair_sim.repartition(1).saveAsTextFile(knn_similarity_file)  
  97.   
  98.   
  99. if __name__ == '__main__':  
  100.     main()  

上面的代码中,import util中引入的util只是负责从HDFS获取用户历史日志的文件名列表,非常简单,实现细节这里不赘述。

【参考资料】
1. wikipedia: Collaborative filtering
2. 推荐系统实践(项亮著)第2.4.2节: 基于物品的协同过滤算法
3. Paper: Empirical Analysis of Predictive Algorithms for Collaborative Filtering
4. 推荐系统实践(项亮著)第5.1.6节: 时间上下文相关的ItemCF算法
5.  Spark Programming Guide

========================== EOF ===========================

这篇关于#####带时间衰减因子#####应用实战: 如何利用Spark集群计算物品相似度的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/967700

相关文章

网页解析 lxml 库--实战

lxml库使用流程 lxml 是 Python 的第三方解析库,完全使用 Python 语言编写,它对 XPath表达式提供了良好的支 持,因此能够了高效地解析 HTML/XML 文档。本节讲解如何通过 lxml 库解析 HTML 文档。 pip install lxml lxm| 库提供了一个 etree 模块,该模块专门用来解析 HTML/XML 文档,下面来介绍一下 lxml 库

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

中文分词jieba库的使用与实景应用(一)

知识星球:https://articles.zsxq.com/id_fxvgc803qmr2.html 目录 一.定义: 精确模式(默认模式): 全模式: 搜索引擎模式: paddle 模式(基于深度学习的分词模式): 二 自定义词典 三.文本解析   调整词出现的频率 四. 关键词提取 A. 基于TF-IDF算法的关键词提取 B. 基于TextRank算法的关键词提取

水位雨量在线监测系统概述及应用介绍

在当今社会,随着科技的飞速发展,各种智能监测系统已成为保障公共安全、促进资源管理和环境保护的重要工具。其中,水位雨量在线监测系统作为自然灾害预警、水资源管理及水利工程运行的关键技术,其重要性不言而喻。 一、水位雨量在线监测系统的基本原理 水位雨量在线监测系统主要由数据采集单元、数据传输网络、数据处理中心及用户终端四大部分构成,形成了一个完整的闭环系统。 数据采集单元:这是系统的“眼睛”,

HDFS—集群扩容及缩容

白名单:表示在白名单的主机IP地址可以,用来存储数据。 配置白名单步骤如下: 1)在NameNode节点的/opt/module/hadoop-3.1.4/etc/hadoop目录下分别创建whitelist 和blacklist文件 (1)创建白名单 [lytfly@hadoop102 hadoop]$ vim whitelist 在whitelist中添加如下主机名称,假如集群正常工作的节

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

csu 1446 Problem J Modified LCS (扩展欧几里得算法的简单应用)

这是一道扩展欧几里得算法的简单应用题,这题是在湖南多校训练赛中队友ac的一道题,在比赛之后请教了队友,然后自己把它a掉 这也是自己独自做扩展欧几里得算法的题目 题意:把题意转变下就变成了:求d1*x - d2*y = f2 - f1的解,很明显用exgcd来解 下面介绍一下exgcd的一些知识点:求ax + by = c的解 一、首先求ax + by = gcd(a,b)的解 这个

hdu1394(线段树点更新的应用)

题意:求一个序列经过一定的操作得到的序列的最小逆序数 这题会用到逆序数的一个性质,在0到n-1这些数字组成的乱序排列,将第一个数字A移到最后一位,得到的逆序数为res-a+(n-a-1) 知道上面的知识点后,可以用暴力来解 代码如下: #include<iostream>#include<algorithm>#include<cstring>#include<stack>#in

zoj3820(树的直径的应用)

题意:在一颗树上找两个点,使得所有点到选择与其更近的一个点的距离的最大值最小。 思路:如果是选择一个点的话,那么点就是直径的中点。现在考虑两个点的情况,先求树的直径,再把直径最中间的边去掉,再求剩下的两个子树中直径的中点。 代码如下: #include <stdio.h>#include <string.h>#include <algorithm>#include <map>#