spark从入门到放弃五十二:Spark Streaming(12)结合spark Sql

2024-08-22 22:18

本文主要是介绍spark从入门到放弃五十二:Spark Streaming(12)结合spark Sql,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章地址:http://www.haha174.top/article/details/253627
1.简介

Spark Streaming 强大的地方在于,可以于spark core 和spark sql 整合使用,之前已经通过transform foreachRDD 等算子看到了 如何将DStream 种的RDD 使用spark core 执行批处理操作。现在就来看看 如何将spark sql 和spark Streaming 整合起来操作

2.案例

每隔10秒 ,统计最近60秒的,每个种类的每个商品的点击次数,然后统计出每个种类top3 热门的商品
下面给出

public class Top3HotProduct {public static void main(String[] args) throws InterruptedException {SparkConf conf=new SparkConf().setAppName("Top3HotProduct").setMaster("local[2]");JavaStreamingContext jssc=new JavaStreamingContext(conf, Durations.seconds(1));//首先看一下,输入日志的格式//leo  product1 category1//首先获取输入数据JavaReceiverInputDStream<String> lines=jssc.socketTextStream("www.codeguoj.cn",9999);JavaPairDStream<String,Integer> categoryProductDStream=lines.mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String s) throws Exception {String[] prudoctSplited=s.split(" ");return new Tuple2<>(prudoctSplited[2]+"-"+prudoctSplited[1],1);}});//然后执行window//到这里,就可以做到,每隔10秒钟,对最近60秒的数据,执行reduceByKey  操作//计算出来这60秒内,每个种类的每个商品的点击次数JavaPairDStream<String,Integer> categoryProductDStreamed=categoryProductDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1+v2;}},Durations.seconds(60),Durations.seconds(10));//然后针对60秒内的每个种类的每个商品的点击次数categoryProductDStreamed.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {@Overridepublic void call(JavaPairRDD<String, Integer> stringIntegerJavaPairRDD) throws Exception {JavaRDD<Row> rowCategoryCount=    stringIntegerJavaPairRDD.map(new Function<Tuple2<String, Integer>, Row>() {@Overridepublic Row call(Tuple2<String, Integer> v1) throws Exception {String category=v1._1.split("-")[0];String product=v1._1.split("-")[1];int count=v1._2;return RowFactory.create(category,product,count);}});//DataSet  转换List<StructField> structFields=new ArrayList<>();structFields.add(DataTypes.createStructField("category",DataTypes.StringType,true));structFields.add(DataTypes.createStructField("product",DataTypes.StringType,true));structFields.add(DataTypes.createStructField("click_count",DataTypes.IntegerType,true));StructType structType=DataTypes.createStructType(structFields);SQLContext sqlContext=new SQLContext(rowCategoryCount.context());Dataset cataCountDS=sqlContext.createDataFrame(rowCategoryCount,structType);//  将60秒内的数据创建一个零时表cataCountDS.registerTempTable("product_click_log");Dataset cataSearchDS=   sqlContext.sql("SELECT category,product,click_count "+ "FROM ("+ "SELECT "+ "category,"+ "product,"+ "click_count,"+ "row_number() OVER (PARTITION BY category ORDER BY click_count DESC) rank "+ "FROM product_click_log"+ ") tmp "+ "WHERE rank<=3");cataSearchDS.show();}});jssc.start();jssc.awaitTermination();jssc.stop();jssc.close();}
}

欢迎关注,更多惊喜等着你

这里写图片描述

这篇关于spark从入门到放弃五十二:Spark Streaming(12)结合spark Sql的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1097544

相关文章

SQL中redo log 刷⼊磁盘的常见方法

《SQL中redolog刷⼊磁盘的常见方法》本文主要介绍了SQL中redolog刷⼊磁盘的常见方法,将redolog刷入磁盘的方法确保了数据的持久性和一致性,下面就来具体介绍一下,感兴趣的可以了解... 目录Redo Log 刷入磁盘的方法Redo Log 刷入磁盘的过程代码示例(伪代码)在数据库系统中,r

mysql中的group by高级用法

《mysql中的groupby高级用法》MySQL中的GROUPBY是数据聚合分析的核心功能,主要用于将结果集按指定列分组,并结合聚合函数进行统计计算,下面给大家介绍mysql中的groupby用法... 目录一、基本语法与核心功能二、基础用法示例1. 单列分组统计2. 多列组合分组3. 与WHERE结合使

Mysql用户授权(GRANT)语法及示例解读

《Mysql用户授权(GRANT)语法及示例解读》:本文主要介绍Mysql用户授权(GRANT)语法及示例,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录mysql用户授权(GRANT)语法授予用户权限语法GRANT语句中的<权限类型>的使用WITH GRANT

Mysql如何解决死锁问题

《Mysql如何解决死锁问题》:本文主要介绍Mysql如何解决死锁问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录【一】mysql中锁分类和加锁情况【1】按锁的粒度分类全局锁表级锁行级锁【2】按锁的模式分类【二】加锁方式的影响因素【三】Mysql的死锁情况【1

SQL BETWEEN 的常见用法小结

《SQLBETWEEN的常见用法小结》BETWEEN操作符是SQL中非常有用的工具,它允许你快速选取某个范围内的值,本文给大家介绍SQLBETWEEN的常见用法,感兴趣的朋友一起看看吧... 在SQL中,BETWEEN是一个操作符,用于选取介于两个值之间的数据。它包含这两个边界值。BETWEEN操作符常用

MySQL索引的优化之LIKE模糊查询功能实现

《MySQL索引的优化之LIKE模糊查询功能实现》:本文主要介绍MySQL索引的优化之LIKE模糊查询功能实现,本文通过示例代码给大家介绍的非常详细,感兴趣的朋友一起看看吧... 目录一、前缀匹配优化二、后缀匹配优化三、中间匹配优化四、覆盖索引优化五、减少查询范围六、避免通配符开头七、使用外部搜索引擎八、分

MySql match against工具详细用法

《MySqlmatchagainst工具详细用法》在MySQL中,MATCH……AGAINST是全文索引(Full-Textindex)的查询语法,它允许你对文本进行高效的全文搜素,支持自然语言搜... 目录一、全文索引的基本概念二、创建全文索引三、自然语言搜索四、布尔搜索五、相关性排序六、全文索引的限制七

Python结合PyWebView库打造跨平台桌面应用

《Python结合PyWebView库打造跨平台桌面应用》随着Web技术的发展,将HTML/CSS/JavaScript与Python结合构建桌面应用成为可能,本文将系统讲解如何使用PyWebView... 目录一、技术原理与优势分析1.1 架构原理1.2 核心优势二、开发环境搭建2.1 安装依赖2.2 验

数据库面试必备之MySQL中的乐观锁与悲观锁

《数据库面试必备之MySQL中的乐观锁与悲观锁》:本文主要介绍数据库面试必备之MySQL中乐观锁与悲观锁的相关资料,乐观锁适用于读多写少的场景,通过版本号检查避免冲突,而悲观锁适用于写多读少且对数... 目录一、引言二、乐观锁(一)原理(二)应用场景(三)示例代码三、悲观锁(一)原理(二)应用场景(三)示例

SQL表间关联查询实例详解

《SQL表间关联查询实例详解》本文主要讲解SQL语句中常用的表间关联查询方式,包括:左连接(leftjoin)、右连接(rightjoin)、全连接(fulljoin)、内连接(innerjoin)、... 目录简介样例准备左外连接右外连接全外连接内连接交叉连接自然连接简介本文主要讲解SQL语句中常用的表