1.8.7 大数据-Spark-SparkStreaming实时流处理(保存到Mysql)

2024-03-16 13:08

本文主要是介绍1.8.7 大数据-Spark-SparkStreaming实时流处理(保存到Mysql),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

演练环境搭建

安装nc 作为输出流

[kfk@bigdata-pro03 softwares]$ sudo rpm -ivh nc-1.84-22.el6.x86_64.rpm Preparing...                                                            (100%########################################### [100%]1:nc                                                                 ( 19%########################################### [100%]
[kfk@bigdata-pro03 softwares]$ which nc
/usr/bin/nc
$ nc -lk 9999

官网演示DEMO

$ ./bin/run-example streaming.NetworkWordCount localhost 9999

spark-shell演示

[kfk@bigdata-pro03 spark-2.2.0-bin]$ bin/spark-shell
20/06/24 00:39:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/06/24 00:39:23 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Spark context Web UI available at http://192.168.0.153:4041
Spark context available as 'sc' (master = local[*], app id = local-1592973563887).
Spark session available as 'spark'.
Welcome to____              __/ __/__  ___ _____/ /___\ \/ _ \/ _ `/ __/  '_//___/ .__/\_,_/_/ /_/\_\   version 2.2.0/_/Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_11)
Type in expressions to have them evaluated.
Type :help for more information.scala> import org.apache.spark._
import org.apache.spark._scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._scala> val ssc = new StreamingContext(sc, Seconds(5))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@1002b06dscala> val lines = ssc.socketTextStream("localhost", 9999)
lines: org.apache.spark.streaming.dstream.ReceiverInputDStream[String] = org.apache.spark.streaming.dstream.SocketInputDStream@514f2020scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@4f5df012scala> val pairs = words.map(word => (word, 1))
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.MappedDStream@39f3285dscala> val wordCounts = pairs.reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@3299e315scala> wordCounts.print()scala> ssc.start()  
//一直运行,除非人为干预再停止 ssc.awaitTermination()

在nc下输入单词 在shell客户端就可以读到了

在IDEA中代码

包含读到MySQL库 、注释部分读到HDFS

package com.spark.streamingimport java.sql.DriverManagerimport org.apache.spark._
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}object TestStreaming {def main(args: Array[String]): Unit = {val spark = SparkSession.builder.master("local[2]").appName("straming").getOrCreate()val sc = spark.sparkContext;sc.setLogLevel("WARN");val ssc = new StreamingContext(sc, Seconds(5))val lines = ssc.socketTextStream("bigdata-pro03.kfk.com", 9999)val words = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)//words.foreachRDD(wd =>wd.saveAsTextFile("hdfs://bigdata-pro01.kfk.com/user/kfk/stream"));words.foreachRDD(rdd=>rdd.foreachPartition(line =>{Class.forName("com.mysql.jdbc.Driver")val conn = DriverManager.getConnection("jdbc:mysql://bigdata-pro01.kfk.com/test","root","123456")try{for (row <- line){val sql = "insert into webCount(titleName,count) values ('"+row._1+"',"+row._2+" )";conn.prepareStatement(sql).executeUpdate()}}finally {conn.close()}}))words.print()ssc.start()ssc.awaitTermination()}
}

这篇关于1.8.7 大数据-Spark-SparkStreaming实时流处理(保存到Mysql)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/815592

相关文章

mysql中的group by高级用法

《mysql中的groupby高级用法》MySQL中的GROUPBY是数据聚合分析的核心功能,主要用于将结果集按指定列分组,并结合聚合函数进行统计计算,下面给大家介绍mysql中的groupby用法... 目录一、基本语法与核心功能二、基础用法示例1. 单列分组统计2. 多列组合分组3. 与WHERE结合使

C++如何通过Qt反射机制实现数据类序列化

《C++如何通过Qt反射机制实现数据类序列化》在C++工程中经常需要使用数据类,并对数据类进行存储、打印、调试等操作,所以本文就来聊聊C++如何通过Qt反射机制实现数据类序列化吧... 目录设计预期设计思路代码实现使用方法在 C++ 工程中经常需要使用数据类,并对数据类进行存储、打印、调试等操作。由于数据类

Java中Switch Case多个条件处理方法举例

《Java中SwitchCase多个条件处理方法举例》Java中switch语句用于根据变量值执行不同代码块,适用于多个条件的处理,:本文主要介绍Java中SwitchCase多个条件处理的相... 目录前言基本语法处理多个条件示例1:合并相同代码的多个case示例2:通过字符串合并多个case进阶用法使用

Mysql用户授权(GRANT)语法及示例解读

《Mysql用户授权(GRANT)语法及示例解读》:本文主要介绍Mysql用户授权(GRANT)语法及示例,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录mysql用户授权(GRANT)语法授予用户权限语法GRANT语句中的<权限类型>的使用WITH GRANT

Mysql如何解决死锁问题

《Mysql如何解决死锁问题》:本文主要介绍Mysql如何解决死锁问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录【一】mysql中锁分类和加锁情况【1】按锁的粒度分类全局锁表级锁行级锁【2】按锁的模式分类【二】加锁方式的影响因素【三】Mysql的死锁情况【1

Java实现优雅日期处理的方案详解

《Java实现优雅日期处理的方案详解》在我们的日常工作中,需要经常处理各种格式,各种类似的的日期或者时间,下面我们就来看看如何使用java处理这样的日期问题吧,感兴趣的小伙伴可以跟随小编一起学习一下... 目录前言一、日期的坑1.1 日期格式化陷阱1.2 时区转换二、优雅方案的进阶之路2.1 线程安全重构2

SpringBoot使用GZIP压缩反回数据问题

《SpringBoot使用GZIP压缩反回数据问题》:本文主要介绍SpringBoot使用GZIP压缩反回数据问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录SpringBoot使用GZIP压缩反回数据1、初识gzip2、gzip是什么,可以干什么?3、Spr

SQL BETWEEN 的常见用法小结

《SQLBETWEEN的常见用法小结》BETWEEN操作符是SQL中非常有用的工具,它允许你快速选取某个范围内的值,本文给大家介绍SQLBETWEEN的常见用法,感兴趣的朋友一起看看吧... 在SQL中,BETWEEN是一个操作符,用于选取介于两个值之间的数据。它包含这两个边界值。BETWEEN操作符常用

MySQL索引的优化之LIKE模糊查询功能实现

《MySQL索引的优化之LIKE模糊查询功能实现》:本文主要介绍MySQL索引的优化之LIKE模糊查询功能实现,本文通过示例代码给大家介绍的非常详细,感兴趣的朋友一起看看吧... 目录一、前缀匹配优化二、后缀匹配优化三、中间匹配优化四、覆盖索引优化五、减少查询范围六、避免通配符开头七、使用外部搜索引擎八、分

MySql match against工具详细用法

《MySqlmatchagainst工具详细用法》在MySQL中,MATCH……AGAINST是全文索引(Full-Textindex)的查询语法,它允许你对文本进行高效的全文搜素,支持自然语言搜... 目录一、全文索引的基本概念二、创建全文索引三、自然语言搜索四、布尔搜索五、相关性排序六、全文索引的限制七