spark从入门到放弃五十二:Spark Streaming(12)结合spark Sql

2024-08-22 22:18

本文主要是介绍spark从入门到放弃五十二:Spark Streaming(12)结合spark Sql,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章地址:http://www.haha174.top/article/details/253627
1.简介

Spark Streaming 强大的地方在于,可以于spark core 和spark sql 整合使用,之前已经通过transform foreachRDD 等算子看到了 如何将DStream 种的RDD 使用spark core 执行批处理操作。现在就来看看 如何将spark sql 和spark Streaming 整合起来操作

2.案例

每隔10秒 ,统计最近60秒的,每个种类的每个商品的点击次数,然后统计出每个种类top3 热门的商品
下面给出

public class Top3HotProduct {public static void main(String[] args) throws InterruptedException {SparkConf conf=new SparkConf().setAppName("Top3HotProduct").setMaster("local[2]");JavaStreamingContext jssc=new JavaStreamingContext(conf, Durations.seconds(1));//首先看一下,输入日志的格式//leo  product1 category1//首先获取输入数据JavaReceiverInputDStream<String> lines=jssc.socketTextStream("www.codeguoj.cn",9999);JavaPairDStream<String,Integer> categoryProductDStream=lines.mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String s) throws Exception {String[] prudoctSplited=s.split(" ");return new Tuple2<>(prudoctSplited[2]+"-"+prudoctSplited[1],1);}});//然后执行window//到这里,就可以做到,每隔10秒钟,对最近60秒的数据,执行reduceByKey  操作//计算出来这60秒内,每个种类的每个商品的点击次数JavaPairDStream<String,Integer> categoryProductDStreamed=categoryProductDStream.reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1+v2;}},Durations.seconds(60),Durations.seconds(10));//然后针对60秒内的每个种类的每个商品的点击次数categoryProductDStreamed.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {@Overridepublic void call(JavaPairRDD<String, Integer> stringIntegerJavaPairRDD) throws Exception {JavaRDD<Row> rowCategoryCount=    stringIntegerJavaPairRDD.map(new Function<Tuple2<String, Integer>, Row>() {@Overridepublic Row call(Tuple2<String, Integer> v1) throws Exception {String category=v1._1.split("-")[0];String product=v1._1.split("-")[1];int count=v1._2;return RowFactory.create(category,product,count);}});//DataSet  转换List<StructField> structFields=new ArrayList<>();structFields.add(DataTypes.createStructField("category",DataTypes.StringType,true));structFields.add(DataTypes.createStructField("product",DataTypes.StringType,true));structFields.add(DataTypes.createStructField("click_count",DataTypes.IntegerType,true));StructType structType=DataTypes.createStructType(structFields);SQLContext sqlContext=new SQLContext(rowCategoryCount.context());Dataset cataCountDS=sqlContext.createDataFrame(rowCategoryCount,structType);//  将60秒内的数据创建一个零时表cataCountDS.registerTempTable("product_click_log");Dataset cataSearchDS=   sqlContext.sql("SELECT category,product,click_count "+ "FROM ("+ "SELECT "+ "category,"+ "product,"+ "click_count,"+ "row_number() OVER (PARTITION BY category ORDER BY click_count DESC) rank "+ "FROM product_click_log"+ ") tmp "+ "WHERE rank<=3");cataSearchDS.show();}});jssc.start();jssc.awaitTermination();jssc.stop();jssc.close();}
}

欢迎关注,更多惊喜等着你

这里写图片描述

这篇关于spark从入门到放弃五十二:Spark Streaming(12)结合spark Sql的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1097544

相关文章

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

SQL中的外键约束

外键约束用于表示两张表中的指标连接关系。外键约束的作用主要有以下三点: 1.确保子表中的某个字段(外键)只能引用父表中的有效记录2.主表中的列被删除时,子表中的关联列也会被删除3.主表中的列更新时,子表中的关联元素也会被更新 子表中的元素指向主表 以下是一个外键约束的实例展示

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

如何去写一手好SQL

MySQL性能 最大数据量 抛开数据量和并发数,谈性能都是耍流氓。MySQL没有限制单表最大记录数,它取决于操作系统对文件大小的限制。 《阿里巴巴Java开发手册》提出单表行数超过500万行或者单表容量超过2GB,才推荐分库分表。性能由综合因素决定,抛开业务复杂度,影响程度依次是硬件配置、MySQL配置、数据表设计、索引优化。500万这个值仅供参考,并非铁律。 博主曾经操作过超过4亿行数据

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

MySQL数据库宕机,启动不起来,教你一招搞定!

作者介绍:老苏,10余年DBA工作运维经验,擅长Oracle、MySQL、PG、Mongodb数据库运维(如安装迁移,性能优化、故障应急处理等)公众号:老苏畅谈运维欢迎关注本人公众号,更多精彩与您分享。 MySQL数据库宕机,数据页损坏问题,启动不起来,该如何排查和解决,本文将为你说明具体的排查过程。 查看MySQL error日志 查看 MySQL error日志,排查哪个表(表空间

MySQL高性能优化规范

前言:      笔者最近上班途中突然想丰富下自己的数据库优化技能。于是在查阅了多篇文章后,总结出了这篇! 数据库命令规范 所有数据库对象名称必须使用小写字母并用下划线分割 所有数据库对象名称禁止使用mysql保留关键字(如果表名中包含关键字查询时,需要将其用单引号括起来) 数据库对象的命名要能做到见名识意,并且最后不要超过32个字符 临时库表必须以tmp_为前缀并以日期为后缀,备份

数论入门整理(updating)

一、gcd lcm 基础中的基础,一般用来处理计算第一步什么的,分数化简之类。 LL gcd(LL a, LL b) { return b ? gcd(b, a % b) : a; } <pre name="code" class="cpp">LL lcm(LL a, LL b){LL c = gcd(a, b);return a / c * b;} 例题:

Java 创建图形用户界面(GUI)入门指南(Swing库 JFrame 类)概述

概述 基本概念 Java Swing 的架构 Java Swing 是一个为 Java 设计的 GUI 工具包,是 JAVA 基础类的一部分,基于 Java AWT 构建,提供了一系列轻量级、可定制的图形用户界面(GUI)组件。 与 AWT 相比,Swing 提供了许多比 AWT 更好的屏幕显示元素,更加灵活和可定制,具有更好的跨平台性能。 组件和容器 Java Swing 提供了许多

【IPV6从入门到起飞】5-1 IPV6+Home Assistant(搭建基本环境)

【IPV6从入门到起飞】5-1 IPV6+Home Assistant #搭建基本环境 1 背景2 docker下载 hass3 创建容器4 浏览器访问 hass5 手机APP远程访问hass6 更多玩法 1 背景 既然电脑可以IPV6入站,手机流量可以访问IPV6网络的服务,为什么不在电脑搭建Home Assistant(hass),来控制你的设备呢?@智能家居 @万物互联