This article is installment fifty-two of the "Spark from Getting Started to Giving Up" series: Spark Streaming (12), combined with Spark SQL. Hopefully it offers a useful reference for developers solving this kind of problem; if that's you, read on.
Original article: http://www.haha174.top/article/details/253627
1. Introduction
The power of Spark Streaming lies in the fact that it can be used together with Spark Core and Spark SQL. Previous installments showed, through operators such as transform and foreachRDD, how to run Spark Core batch operations on the RDDs inside a DStream (see the refresher sketch below). This article looks at how to integrate Spark SQL with Spark Streaming.
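As a quick refresher on the Spark Core side, here is a minimal sketch of the transform pattern mentioned above: each batch's RDD is handed to ordinary Spark Core code. It assumes the same imports and the lines input stream created in the full example further down; the filter condition and variable names are purely illustrative, not from the original series.

// Minimal sketch of DStream.transform: run arbitrary Spark Core code
// on each batch's RDD. Assumes "lines" is the JavaReceiverInputDStream<String>
// built in the full example below; the filter is illustrative only.
JavaDStream<String> wellFormedLines = lines.transform(
        new Function<JavaRDD<String>, JavaRDD<String>>() {
            @Override
            public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
                // Any Spark Core operation works here: join, sortBy, filter, ...
                return rdd.filter(new Function<String, Boolean>() {
                    @Override
                    public Boolean call(String line) throws Exception {
                        return line.split(" ").length == 3; // keep well-formed logs
                    }
                });
            }
        });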
2. Example
Every 10 seconds, compute, over the most recent 60 seconds of data, the number of clicks on each product in each category, then report the top 3 hottest products in each category.
The full code is given below:
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class Top3HotProduct {

    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf()
                .setAppName("Top3HotProduct")
                .setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // Input log format: <user> <product> <category>
        // e.g. leo product1 category1
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("www.codeguoj.cn", 9999);

        // Map each log line to ("category-product", 1)
        JavaPairDStream<String, Integer> categoryProductDStream = lines.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String line) throws Exception {
                        String[] productSplit = line.split(" ");
                        return new Tuple2<>(productSplit[2] + "-" + productSplit[1], 1);
                    }
                });

        // Window operation: every 10 seconds, run reduceByKey over the last
        // 60 seconds of data, yielding the click count of each product in
        // each category within that window.
        JavaPairDStream<String, Integer> categoryProductCountDStream =
                categoryProductDStream.reduceByKeyAndWindow(
                        new Function2<Integer, Integer, Integer>() {
                            @Override
                            public Integer call(Integer v1, Integer v2) throws Exception {
                                return v1 + v2;
                            }
                        },
                        Durations.seconds(60),
                        Durations.seconds(10));

        // For each window's counts, use Spark SQL to rank products per category
        categoryProductCountDStream.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
            @Override
            public void call(JavaPairRDD<String, Integer> categoryProductCountRDD) throws Exception {
                // Convert each ("category-product", count) pair into a Row
                JavaRDD<Row> categoryProductCountRowRDD = categoryProductCountRDD.map(
                        new Function<Tuple2<String, Integer>, Row>() {
                            @Override
                            public Row call(Tuple2<String, Integer> tuple) throws Exception {
                                String category = tuple._1.split("-")[0];
                                String product = tuple._1.split("-")[1];
                                int count = tuple._2;
                                return RowFactory.create(category, product, count);
                            }
                        });

                // Define the schema and convert the Row RDD into a DataFrame
                List<StructField> structFields = new ArrayList<>();
                structFields.add(DataTypes.createStructField("category", DataTypes.StringType, true));
                structFields.add(DataTypes.createStructField("product", DataTypes.StringType, true));
                structFields.add(DataTypes.createStructField("click_count", DataTypes.IntegerType, true));
                StructType structType = DataTypes.createStructType(structFields);

                SQLContext sqlContext = new SQLContext(categoryProductCountRowRDD.context());
                Dataset<Row> categoryProductCountDF =
                        sqlContext.createDataFrame(categoryProductCountRowRDD, structType);

                // Register the window's data as a temporary table.
                // (registerTempTable is deprecated since Spark 2.0;
                // createOrReplaceTempView is the current equivalent.)
                categoryProductCountDF.registerTempTable("product_click_log");

                // row_number() OVER (PARTITION BY ...) ranks products within each
                // category by click count; keeping rank <= 3 gives the top 3.
                // Note: in Spark 1.x window functions require a HiveContext;
                // from Spark 2.0 they work with a plain SQLContext/SparkSession.
                Dataset<Row> top3DF = sqlContext.sql(
                        "SELECT category,product,click_count "
                        + "FROM ("
                            + "SELECT "
                                + "category,"
                                + "product,"
                                + "click_count,"
                                + "row_number() OVER (PARTITION BY category ORDER BY click_count DESC) rank "
                            + "FROM product_click_log"
                        + ") tmp "
                        + "WHERE rank<=3");

                top3DF.show();
            }
        });

        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }
}
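To try this out locally, you can point socketTextStream at localhost instead of the host above, then feed the port with netcat. The user, product, and category names below are made-up samples following the <user> <product> <category> format:

nc -lk 9999
leo product1 category1
tom product2 category1
marry product1 category1
jack product3 category2

Every 10 seconds, show() then prints a table with columns category, product, and click_count, containing at most three rows per category for the current 60-second window. In Spark 2.x you would typically build a SparkSession instead of a raw SQLContext and call createOrReplaceTempView instead of the deprecated registerTempTable; the logic is otherwise unchanged.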
Follow the author for more content.