SparkCore(10):uv/pv实例

2024-05-24 11:38
文章标签 实例 pv uv sparkcore

本文主要是介绍SparkCore(10):uv/pv实例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.统计样例

2013-05-19 13:00:00	http://www.taobao.com/17/?tracker_u=1624169&type=1	B58W48U4WKZCJ5D1T3Z9ZY88RU7QA7B1	http://hao.360.cn/	1.196.34.243	NULL	-1
2013-05-19 13:00:00	http://www.taobao.com/item/962967_14?ref=1_1_52_search.ctg_1	T82C9WBFB1N8EW14YF2E2GY8AC9K5M5P	http://www.yihaodian.com/ctg/s2/c24566-%E5%B1%B1%E6%A5%82%E5%88%B6%E5%93%81?ref=pms_15_78_258	222.78.246.228	134939954	156
2013-05-19 13:00:00	http://www.taobao.com/1/?tracker_u=1013304189&uid=2687512&type=3	W17C89RU8DZ6NMN7JD2ZCBDMX1CQVZ1W	http://www.yihaodian.com/1/?tracker_u=1013304189&uid=2687512&type=3	118.205.0.18	NULL	-20

2.代码

2.1 SparkUtil 

package SparkUtilimport org.apache.spark.{SparkConf, SparkContext}/*** Created by ibf on 2018/7/18.*/
object SparkUtil {def createSparkContext(isLocal:Boolean,appName:String): SparkContext ={if(isLocal) {val conf = new SparkConf().setAppName(appName).setMaster("local[2]")val  sc = SparkContext.getOrCreate(conf)val ssc=SparkContext.getOrCreate(conf)sc}else{val conf = new SparkConf().setAppName(appName)val sc = SparkContext.getOrCreate(conf)sc}}}

 

2.2 SparkPVAndUV 

package _0722rddimport SparkUtil.SparkUtil
import org.apache.spark.rdd.RDD/*** */
object SparkPVAndUV {def main(args: Array[String]) {val sc = SparkUtil.createSparkContext(true,"SparkPVAndUV")
//    val path = "hdfs://192.168.244.101:8020/page_views.data"val path = "hdfs://192.168.31.3:8020/page_views.data"val originalRdd: RDD[String] = sc.textFile(path)//因为缓存不是立即操作的api,只有当调用了这块缓存的数据才会cacheoriginalRdd.cache()//originalRdd.count()//某些固定的值,应该要写在配置中,然后通过读取配置来获取val arrLen = 7val timeLen = 16//处理过后的rddval mappedRdd: RDD[(String, String, String)] = originalRdd.map(_.split("\t")).filter(arr =>{arr.length == arrLen && arr(0).trim.length > timeLen && arr(1).length > 0}).map(arr =>{//每分钟的pvval date = arr(0).substring(0,16)val url = arr(1).trimval guid = arr(2).trim(date,url,guid)})mappedRdd.cache()mappedRdd.count()//一、计算PV/*** 其实计算pv只要维度(date)和url*///XXXByKey的操作是针对于PairRdd(二元组rdd)才能实现的,val resultRdd = mappedRdd.map(t => (t._1,t._2)).groupByKey().map {//date就是日期,itr是迭代器,里面把相同日期的value全部放到一起case (date, itr) => {(date, itr.size)}}//resultRdd结果:Array[(String, Int)] = Array((2013-05-19 13:35,3504))//    resultRdd.foreach(println)//思考:groupByKey这样的API,有没有什么其他API可以实现这个功能,他们之间的性能比较//这段代码有哪些地方是可以优化的/*** 优化groupByKey: grouByKey 这个api性能不是特别好*      会把相同key的所有数据全部放到同一个迭代器中,数据倾斜*      API可以替换,*      是否可以不保留url的值,直接写1,然后用于后面的count*///def reduceByKey(func: (V, V) => V, numPartitions: Int)/*** 这里有一个numPartitions可以指定,分区数量* executor 5 个core  就可以并行计算5个分区的数据* 当数量大的时候,甚至出现数据倾斜的时候,可以通过增加分区数量来缓解每个task的计算压力*/val pvRdd = mappedRdd.map(t => (t._1,1))    //mappedRdd.map(t => (t._1,1))为Array[(String, Int)] = Array((2013-05-19 13:00,1), (2013-05-19 13:00,1)).reduceByKey(_ + _,5)pvRdd.foreach(println)    //结果是Array((2013-05-19 13:07,3486), (2013-05-19 13:16,3395))Thread.sleep(100000l)originalRdd.unpersist()mappedRdd.unpersist()//=================================================================================================//二、计算uv/*** uv应该如何计算?count  distinct   groupby(XXX,xxx)* select count(distinct XXX) as uv from XX group by XXX* select count(1) from (select XXX from group by xxx ) tb* 在什么场景下应该用哪一种呢* key较为分散的情况下使用groupByKey, key较为集中的情况下使用reduceByKey*//**方法一**//* val uvRdd = mappedRdd.filter(t => t._3.nonEmpty).map(t => {//把什么作为key然后进行聚合,每分钟的uv(t._1,t._3)}).groupByKey().map({case (date,itr) =>{(date,itr.toSet.size)}})*//**方法二:是否可以使用reduceByKey来做去重呢?* 我只想知道在同一个时间段内出现了多少key,key出现的次数,并不不关注* spark rdd的api的时候,要关注,你的key是什么?*///(date,url,uid)val uvRdd = mappedRdd.filter(t => t._3.nonEmpty)//((date,uid),1) 下面这个是去重操作.map(t => ((t._1,t._3),1)).reduceByKey({case (a,b) => a})//进行第二次聚合  ((13:01,uid1),1),((13:01,uid2),1),((13:01,uid3),1)//想要得到(13:01,3).map({case ((date,uid),int) =>{(date,1)}}).reduceByKey(_ + _)/*** 方法三:spark常用去重API*//*    val uvRdd = mappedRdd.filter(t => t._3.nonEmpty).map(t => (t._1,t._3)).distinct(10).map(t =>(t._1,1)).reduceByKey(_ + _)*///uvRdd.foreach(println)//使用外联,计算出值的就保留值,没计算出来的就给定默认值-1/*** select date,* (case when pv is not null*  then pv*  else*  -1) as pv,*  (case when uv is not null*  then uv*  else*  -1) as uv from (select date,pv from A full join B on A.date = B.date) tb*/
//    val resultRdd: RDD[(String, Int, Int)] = pvRdd.fullOuterJoin(uvRdd)
//        .map({
//      case(date,(optpv,optuv)) =>{
//        (date,optpv.getOrElse(-1),optuv.getOrElse(-1))
//      }
//    }).coalesce(1)
//    resultRdd.foreach(println)//==================================================================================
//================输出==============================================================
//    resultRdd.foreach(println)
//    resultRdd.saveAsTextFile(s"hdfs://192.168.244.101:8020/" +
//      s"spark/sparkPVUV_${System.currentTimeMillis()}")//    Thread.sleep(100000l)//    originalRdd.unpersist()
//    mappedRdd.unpersist()Thread.sleep(100000000l)}
}

 

这篇关于SparkCore(10):uv/pv实例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/998301

相关文章

【机器学习】高斯过程的基本概念和应用领域以及在python中的实例

引言 高斯过程(Gaussian Process,简称GP)是一种概率模型,用于描述一组随机变量的联合概率分布,其中任何一个有限维度的子集都具有高斯分布 文章目录 引言一、高斯过程1.1 基本定义1.1.1 随机过程1.1.2 高斯分布 1.2 高斯过程的特性1.2.1 联合高斯性1.2.2 均值函数1.2.3 协方差函数(或核函数) 1.3 核函数1.4 高斯过程回归(Gauss

C++操作符重载实例(独立函数)

C++操作符重载实例,我们把坐标值CVector的加法进行重载,计算c3=c1+c2时,也就是计算x3=x1+x2,y3=y1+y2,今天我们以独立函数的方式重载操作符+(加号),以下是C++代码: c1802.cpp源代码: D:\YcjWork\CppTour>vim c1802.cpp #include <iostream>using namespace std;/*** 以独立函数

实例:如何统计当前主机的连接状态和连接数

统计当前主机的连接状态和连接数 在 Linux 中,可使用 ss 命令来查看主机的网络连接状态。以下是统计当前主机连接状态和连接主机数量的具体操作。 1. 统计当前主机的连接状态 使用 ss 命令结合 grep、cut、sort 和 uniq 命令来统计当前主机的 TCP 连接状态。 ss -nta | grep -v '^State' | cut -d " " -f 1 | sort |

Java Websocket实例【服务端与客户端实现全双工通讯】

Java Websocket实例【服务端与客户端实现全双工通讯】 现很多网站为了实现即时通讯,所用的技术都是轮询(polling)。轮询是在特定的的时间间隔(如每1秒),由浏览器对服务器发 出HTTP request,然后由服务器返回最新的数据给客服端的浏览器。这种传统的HTTP request 的模式带来很明显的缺点 – 浏 览器需要不断的向服务器发出请求,然而HTTP

828华为云征文|华为云Flexus X实例docker部署rancher并构建k8s集群

828华为云征文|华为云Flexus X实例docker部署rancher并构建k8s集群 华为云最近正在举办828 B2B企业节,Flexus X实例的促销力度非常大,特别适合那些对算力性能有高要求的小伙伴。如果你有自建MySQL、Redis、Nginx等服务的需求,一定不要错过这个机会。赶紧去看看吧! 什么是华为云Flexus X实例 华为云Flexus X实例云服务是新一代开箱即用、体

LLVM入门2:如何基于自己的代码生成IR-LLVM IR code generation实例介绍

概述 本节将通过一个简单的例子来介绍如何生成llvm IR,以Kaleidoscope IR中的例子为例,我们基于LLVM接口构建一个简单的编译器,实现简单的语句解析并转化为LLVM IR,生成对应的LLVM IR部分,代码如下,文件名为toy.cpp,先给出代码,后面会详细介绍每一步分代码: #include "llvm/ADT/APFloat.h"#include "llvm/ADT/S

OpenStack离线Train版安装系列—11.5实例使用-Cinder存储服务组件

本系列文章包含从OpenStack离线源制作到完成OpenStack安装的全部过程。 在本系列教程中使用的OpenStack的安装版本为第20个版本Train(简称T版本),2020年5月13日,OpenStack社区发布了第21个版本Ussuri(简称U版本)。 OpenStack部署系列文章 OpenStack Victoria版 安装部署系列教程 OpenStack Ussuri版

OpenStack实例操作选项解释:启动和停止instance实例

关于启动和停止OpenStack实例 如果你想要启动和停止OpenStack实例时,有四种方法可以考虑。 管理员可以暂停、挂起、搁置、停止OpenStack 的计算实例。但是这些方法之间有什么不同之处? 目录 关于启动和停止OpenStack实例1.暂停和取消暂停实例2.挂起和恢复实例3.搁置(废弃)实例和取消废弃实例4.停止(删除)实例 1.暂停和取消暂停实例

Cmake之3.0版本重要特性及用法实例(十三)

简介: CSDN博客专家、《Android系统多媒体进阶实战》一书作者 新书发布:《Android系统多媒体进阶实战》🚀 优质专栏: Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏: 多媒体系统工程师系列【原创干货持续更新中……】🚀 优质视频课程:AAOS车载系统+AOSP14系统攻城狮入门视频实战课 🚀 人生格言: 人生从来没有捷径,只有行动才是治疗恐惧

实例demo理解面向接口思想

浅显的理解面向接口编程 Android开发的语言是java,至少目前是,所以理解面向接口的思想是有必要的。下面通过一个简单的例子来理解。具体的概括我也不知道怎么说。 例子: 现在我们要开发一个应用,模拟移动存储设备的读写,即计算机与U盘、MP3、移动硬盘等设备进行数据交换。已知要实现U盘、MP3播放器、移动硬盘三种移动存储设备,要求计算机能同这三种设备进行数据交换,并且以后可能会有新的第三方的