Spark_Hive 累加统计函数 sum() over()

2024-05-03 06:08
文章标签 统计 函数 累加 hive sum spark

本文主要是介绍Spark_Hive 累加统计函数 sum() over(),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

spark 累加历史主要用到了窗口函数,而进行全部统计,则需要用到rollup函数

1 应用场景:

1、我们需要统计用户的总使用时长(累加历史)

2、前台展现页面需要对多个维度进行查询,如:产品、地区等等

3、需要展现的表格头如: 产品、2015-04、2015-05、2015-06

2 原始数据:

product_codeevent_dateduration
14382016-05-13165
14382016-05-14595
14382016-05-15105
16292016-05-1312340
16292016-05-1413850
16292016-05-15227

3 业务场景实现

3.1 业务场景1:累加历史:

如数据源所示:我们已经有当天用户的使用时长,我们期望在进行统计的时候,14号能累加13号的,15号能累加14、13号的,以此类推

3.1.1 spark-sql实现

//spark sql 使用窗口函数累加历史数据

sqlContext.sql(
"""select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_durationfrom userlogs_date
""").show
+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
+-----+----------+------------+

3.1.2 dataframe实现

//使用Column提供的over 函数,传入窗口操作
import org.apache.spark.sql.expressions._
val first_2_now_window = Window.partitionBy("pcode").orderBy("event_date")
df_userlogs_date.select($"pcode",$"event_date",sum($"duration").over(first_2_now_window).as("sum_duration")
).show+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
+-----+----------+------------+

3.1.3 扩展 累加一段时间范围内

实际业务中的累加逻辑远比上面复杂,比如,累加之前N天,累加前N天到后N天等等。以下我们来实现:

3.1.3.1 累加历史所有:

select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_duration from userlogs_date
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and current row) as sum_duration from userlogs_date
Window.partitionBy("pcode").orderBy("event_date").rowsBetween(Long.MinValue,0)
Window.partitionBy("pcode").orderBy("event_date")

上边四种写法完全相等

3.1.3.2 累加N天之前,假设N=3

//如果,不想要分区,想从每月的第一天累加的当前天 可以去掉partition
select pcode,event_date,sum(duration) over (partition by pcode order by 
event_date asc rows between 3 preceding and current row) as sum_durationfrom userlogs_date
Window.partitionBy("pcode").orderBy("event_date").rowsBetween(-3,0) 

3.1.3.3 累加前N天,后M天: 假设N=3 M=5

select pcode,event_date,sum(duration) over (partition by pcode order byevent_date asc rows between 3 preceding and 5 following ) as sum_durationfrom userlogs_date
Window.partitionBy("pcode").orderBy("event_date").rowsBetween(-3,5)

3.1.3.4 累加该分区内所有行

select pcode,event_date,sum(duration) over (partition by pcode order by 
event_date asc rows between unbounded preceding and unbounded following ) 
as sum_duration from userlogs_date
Window.partitionBy("pcode").orderBy("event_date").rowsBetween
(Long.MinValue,Long.MaxValue)

总结如下:
preceding:用于累加前N行(分区之内)。若是从分区第一行头开始,则为 unbounded。 N为:相对当前行向前的偏移量
following :与preceding相反,累加后N行(分区之内)。若是累加到该分区结束,则为 unbounded。N为:相对当前行向后的偏移量
current row:顾名思义,当前行,偏移量为0
说明:上边的前N,后M,以及current row均会累加该偏移量所在行

3.1.3.4 实测结果

累加历史:分区内当天及之前所有 写法
1:select pcode,event_date,sum(duration) over (partition by pcode order by 
event_date asc) as sum_duration from userlogs_date+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
+-----+----------+------------+
累加历史:分区内当天及之前所有 写法2:
select pcode,event_date,sum(duration) over (partition by pcode order by 
event_date asc rows between unbounded preceding and current row) as 
sum_duration from userlogs_date+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
+-----+----------+------------+
累加当日和昨天:
select pcode,event_date,sum(duration) over (partition by pcode order by 
event_date asc rows between 1 preceding and current row) as sum_durationfrom userlogs_date+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         700|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       14077|
+-----+----------+------------+
累加当日、昨日、明日:
select pcode,event_date,sum(duration) over (partition by pcode order by 
event_date asc rows between 1 preceding and 1 following ) as sum_durationfrom userlogs_date+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         760|
| 1438|2016-05-14|         865|
| 1438|2016-05-15|         700|
| 1629|2016-05-13|       26190|
| 1629|2016-05-14|       26417|
| 1629|2016-05-15|       14077|
+-----+----------+------------+

累加分区内所有:当天和之前之后所有:
select pcode,event_date,sum(duration) over (partition by pcode order by 
event_date asc rows between unbounded preceding and unbounded following )as sum_duration from userlogs_date+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         865|
| 1438|2016-05-14|         865|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       26417|
| 1629|2016-05-14|       26417|
| 1629|2016-05-15|       26417|
+-----+----------+------------+

3.2 业务场景2:统计全部3.2.1 spark sql实现//spark sql 使用rollup添加all统计
sqlContext.sql(
"""select pcode,event_date,sum(duration) as sum_durationfrom userlogs_date_1group by pcode,event_date with rolluporder by pcode,event_date
""").show()+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| null|      null|       27282|
| 1438|      null|         865|
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         595|
| 1438|2016-05-15|         105|
| 1629|      null|       26417|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       13850|
| 1629|2016-05-15|         227|
+-----+----------+------------

这篇关于Spark_Hive 累加统计函数 sum() over()的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/955992

相关文章

hdu1496(用hash思想统计数目)

作为一个刚学hash的孩子,感觉这道题目很不错,灵活的运用的数组的下标。 解题步骤:如果用常规方法解,那么时间复杂度为O(n^4),肯定会超时,然后参考了网上的解题方法,将等式分成两个部分,a*x1^2+b*x2^2和c*x3^2+d*x4^2, 各自作为数组的下标,如果两部分相加为0,则满足等式; 代码如下: #include<iostream>#include<algorithm

hdu1171(母函数或多重背包)

题意:把物品分成两份,使得价值最接近 可以用背包,或者是母函数来解,母函数(1 + x^v+x^2v+.....+x^num*v)(1 + x^v+x^2v+.....+x^num*v)(1 + x^v+x^2v+.....+x^num*v) 其中指数为价值,每一项的数目为(该物品数+1)个 代码如下: #include<iostream>#include<algorithm>

最大流=最小割=最小点权覆盖集=sum-最大点权独立集

二分图最小点覆盖和最大独立集都可以转化为最大匹配求解。 在这个基础上,把每个点赋予一个非负的权值,这两个问题就转化为:二分图最小点权覆盖和二分图最大点权独立集。   二分图最小点权覆盖     从x或者y集合中选取一些点,使这些点覆盖所有的边,并且选出来的点的权值尽可能小。 建模:     原二分图中的边(u,v)替换为容量为INF的有向边(u,v),设立源点s和汇点t

flume系列之:查看flume系统日志、查看统计flume日志类型、查看flume日志

遍历指定目录下多个文件查找指定内容 服务器系统日志会记录flume相关日志 cat /var/log/messages |grep -i oom 查找系统日志中关于flume的指定日志 import osdef search_string_in_files(directory, search_string):count = 0

C++操作符重载实例(独立函数)

C++操作符重载实例,我们把坐标值CVector的加法进行重载,计算c3=c1+c2时,也就是计算x3=x1+x2,y3=y1+y2,今天我们以独立函数的方式重载操作符+(加号),以下是C++代码: c1802.cpp源代码: D:\YcjWork\CppTour>vim c1802.cpp #include <iostream>using namespace std;/*** 以独立函数

hdu4267区间统计

题意:给一些数,有两种操作,一种是在[a,b] 区间内,对(i - a)% k == 0 的加value,另一种操作是询问某个位置的值。 import java.io.BufferedInputStream;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStream;import

hdu4417区间统计

给你一个数列{An},然后有m次查询,每次查询一段区间 [l,r] <= h 的值的个数。 import java.io.BufferedInputStream;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStream;import java.io.InputStreamRead

hdu3333区间统计

题目大意:求一个区间内不重复数字的和,例如1 1 1 3,区间[1,4]的和为4。 import java.io.BufferedInputStream;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStream;import java.io.InputStreamReader;

实例:如何统计当前主机的连接状态和连接数

统计当前主机的连接状态和连接数 在 Linux 中,可使用 ss 命令来查看主机的网络连接状态。以下是统计当前主机连接状态和连接主机数量的具体操作。 1. 统计当前主机的连接状态 使用 ss 命令结合 grep、cut、sort 和 uniq 命令来统计当前主机的 TCP 连接状态。 ss -nta | grep -v '^State' | cut -d " " -f 1 | sort |

函数式编程思想

我们经常会用到各种各样的编程思想,例如面向过程、面向对象。不过笔者在该博客简单介绍一下函数式编程思想. 如果对函数式编程思想进行概括,就是f(x) = na(x) , y=uf(x)…至于其他的编程思想,可能是y=a(x)+b(x)+c(x)…,也有可能是y=f(x)=f(x)/a + f(x)/b+f(x)/c… 面向过程的指令式编程 面向过程,简单理解就是y=a(x)+b(x)+c(x)