Intellj IDEA +SBT + Scala + Spark Sql读取HDFS数据

2024-03-18 20:48

本文主要是介绍Intellj IDEA +SBT + Scala + Spark Sql读取HDFS数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前提Spark集群已经搭建完毕,如果不知道怎么搭建,请参考这个链接:
http://qindongliang.iteye.com/blog/2224797

注意提交作业,需要使用sbt打包成一个jar,然后在主任务里面添加jar包的路径远程提交即可,无须到远程集群上执行测试,本次测试使用的是Spark的Standalone方式

sbt依赖如下:


Java代码 复制代码  收藏代码
  1. name := "spark-hello"  
  2.   
  3. version := "1.0"  
  4.   
  5. scalaVersion := "2.11.7"  
  6. //使用公司的私服  
  7. resolvers += "Local Maven Repository" at "http://dev.bizbook-inc.com:8083/nexus/content/groups/public/"  
  8. //使用内部仓储  
  9. externalResolvers := Resolver.withDefaultResolvers(resolvers.value, mavenCentral = false)  
  10. //Hadoop的依赖  
  11. libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.7.1"  
  12. //Spark的依赖  
  13. libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.4.1"  
  14. //Spark SQL 依赖  
  15. libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "1.4.1"  
  16. //java servlet 依赖  
  17. libraryDependencies += "javax.servlet" % "javax.servlet-api" % "3.0.1"  
  18.       
name := "spark-hello"version := "1.0"scalaVersion := "2.11.7"
//使用公司的私服
resolvers += "Local Maven Repository" at "http://dev.bizbook-inc.com:8083/nexus/content/groups/public/"
//使用内部仓储
externalResolvers := Resolver.withDefaultResolvers(resolvers.value, mavenCentral = false)
//Hadoop的依赖
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.7.1"
//Spark的依赖
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.4.1"
//Spark SQL 依赖
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "1.4.1"
//java servlet 依赖
libraryDependencies += "javax.servlet" % "javax.servlet-api" % "3.0.1"


demo1:使用Scala读取HDFS的数据:

Java代码 复制代码  收藏代码
  1. /** * 
  2.    * Spark读取来自HDFS的数据 
  3.    */  
  4. ef readDataFromHDFS(): Unit ={  
  5.    //以standalone方式运行,提交到远程的spark集群上面  
  6.    val conf = new SparkConf().setMaster("spark://h1:7077").setAppName("load hdfs data")  
  7.    conf.setJars(Seq(jarPaths));  
  8.    //得到一个Sprak上下文  
  9.    val sc = new SparkContext(conf)  
  10.    val textFile=sc.textFile("hdfs://h1:8020/user/webmaster/crawldb/etl_monitor/part-m-00000")  
  11.    //获取第一条数据  
  12.    //val data=textFile.first()  
  13.   // println(data)  
  14.    //遍历打印  
  15.      /** 
  16.       * collect() 方法 游标方式迭代收集每行数据 
  17.       * take(5)   取前topN条数据 
  18.       * foreach() 迭代打印 
  19.       * stop()    关闭链接 
  20.       */  
  21.   textFile.collect().take(5).foreach( line => println(line) )  
  22.    //关闭资源  
  23.    sc.stop()  
 /** ** Spark读取来自HDFS的数据*/
def readDataFromHDFS(): Unit ={//以standalone方式运行,提交到远程的spark集群上面val conf = new SparkConf().setMaster("spark://h1:7077").setAppName("load hdfs data")conf.setJars(Seq(jarPaths));//得到一个Sprak上下文val sc = new SparkContext(conf)val textFile=sc.textFile("hdfs://h1:8020/user/webmaster/crawldb/etl_monitor/part-m-00000")//获取第一条数据//val data=textFile.first()// println(data)//遍历打印/*** collect() 方法 游标方式迭代收集每行数据* take(5)   取前topN条数据* foreach() 迭代打印* stop()    关闭链接*/textFile.collect().take(5).foreach( line => println(line) )//关闭资源sc.stop()
}


demo2:使用Scala 在客户端造数据,测试Spark Sql:

Java代码 复制代码  收藏代码
  1. def mappingLocalSQL1() {  
  2.    val conf = new SparkConf().setMaster("spark://h1:7077").setAppName("hdfs data count")  
  3.    conf.setJars(Seq(jarPaths));  
  4.    val sc = new SparkContext(conf)  
  5.    val sqlContext=new SQLContext(sc);  
  6.    //导入隐式sql的schema转换  
  7.    import sqlContext.implicits._  
  8.    val df = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))).toDF()  
  9.    df.registerTempTable("records")  
  10.    println("Result of SELECT *:")  
  11.    sqlContext.sql("SELECT * FROM records").collect().foreach(println)  
  12.    //聚合查询  
  13.    val count = sqlContext.sql("SELECT COUNT(*) FROM records").collect().head.getLong(0)  
  14.    println(s"COUNT(*): $count")  
  15.    sc.stop()  
  16.  }  
 def mappingLocalSQL1() {val conf = new SparkConf().setMaster("spark://h1:7077").setAppName("hdfs data count")conf.setJars(Seq(jarPaths));val sc = new SparkContext(conf)val sqlContext=new SQLContext(sc);//导入隐式sql的schema转换import sqlContext.implicits._val df = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))).toDF()df.registerTempTable("records")println("Result of SELECT *:")sqlContext.sql("SELECT * FROM records").collect().foreach(println)//聚合查询val count = sqlContext.sql("SELECT COUNT(*) FROM records").collect().head.getLong(0)println(s"COUNT(*): $count")sc.stop()}




Spark SQL 映射实体类的方式读取HDFS方式和字段,注意在Scala的Objcet最上面有个case 类定义,一定要放在
这里,不然会出问题:





demo2:使用Scala 远程读取HDFS文件,并映射成Spark表,以Spark Sql方式,读取top10:

Java代码 复制代码  收藏代码
  1.  val jarPaths="target/scala-2.11/spark-hello_2.11-1.0.jar"  
  2.   /**Spark SQL映射的到实体类的方式**/  
  3.   def mapSQL2(): Unit ={  
  4.     //使用一个类,参数都是可选类型,如果没有值,就默认为NULL  
  5.     //SparkConf指定master和任务名  
  6.     val conf = new SparkConf().setMaster("spark://h1:7077").setAppName("spark sql query hdfs file")  
  7.     //设置上传需要jar包  
  8.     conf.setJars(Seq(jarPaths));  
  9.     //获取Spark上下文  
  10.     val sc = new SparkContext(conf)  
  11.     //得到SQL上下文  
  12.     val sqlContext=new SQLContext(sc);  
  13.     //必须导入此行代码,才能隐式转换成表格  
  14.     import sqlContext.implicits._  
  15.     //读取一个hdfs上的文件,并根据某个分隔符split成数组  
  16.     //然后根据长度映射成对应字段值,并处理数组越界问题  
  17.     val model=sc.textFile("hdfs://h1:8020/user/webmaster/crawldb/etl_monitor/part-m-00000").map(_.split("\1"))  
  18.       .map( p =>  ( if (p.length==4) Model(Some(p(0)), Some(p(1)), Some(p(2)), Some(p(3).toLong))  
  19.     else if (p.length==3) Model(Some(p(0)), Some(p(1)), Some(p(2)),None)  
  20.     else if (p.length==2) Model(Some(p(0)), Some(p(1)),None,None)  
  21.     else   Model( Some(p(0)),None,None,None )  
  22.       )).toDF()//转换成DF  
  23.     //注册临时表  
  24.     model.registerTempTable("monitor")  
  25.     //执行sql查询  
  26.     val it = sqlContext.sql("SELECT rowkey,title,dtime FROM monitor  limit 10 ")  
  27. //    val it = sqlContext.sql("SELECT rowkey,title,dtime FROM monitor WHERE title IS  NULL AND dtime IS NOT NULL      ")  
  28.       println("开始")  
  29.       it.collect().take(8).foreach(line => println(line))  
  30.       println("结束")  
  31.     sc.stop();  
  32.   }  
 val jarPaths="target/scala-2.11/spark-hello_2.11-1.0.jar"/**Spark SQL映射的到实体类的方式**/def mapSQL2(): Unit ={//使用一个类,参数都是可选类型,如果没有值,就默认为NULL//SparkConf指定master和任务名val conf = new SparkConf().setMaster("spark://h1:7077").setAppName("spark sql query hdfs file")//设置上传需要jar包conf.setJars(Seq(jarPaths));//获取Spark上下文val sc = new SparkContext(conf)//得到SQL上下文val sqlContext=new SQLContext(sc);//必须导入此行代码,才能隐式转换成表格import sqlContext.implicits._//读取一个hdfs上的文件,并根据某个分隔符split成数组//然后根据长度映射成对应字段值,并处理数组越界问题val model=sc.textFile("hdfs://h1:8020/user/webmaster/crawldb/etl_monitor/part-m-00000").map(_.split("\1")).map( p =>  ( if (p.length==4) Model(Some(p(0)), Some(p(1)), Some(p(2)), Some(p(3).toLong))else if (p.length==3) Model(Some(p(0)), Some(p(1)), Some(p(2)),None)else if (p.length==2) Model(Some(p(0)), Some(p(1)),None,None)else   Model( Some(p(0)),None,None,None ))).toDF()//转换成DF//注册临时表model.registerTempTable("monitor")//执行sql查询val it = sqlContext.sql("SELECT rowkey,title,dtime FROM monitor  limit 10 ")
//    val it = sqlContext.sql("SELECT rowkey,title,dtime FROM monitor WHERE title IS  NULL AND dtime IS NOT NULL      ")println("开始")it.collect().take(8).foreach(line => println(line))println("结束")sc.stop();}


在IDEA的控制台,可以输出如下结果:

 

这篇关于Intellj IDEA +SBT + Scala + Spark Sql读取HDFS数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/823597

相关文章

mysql_mcp_server部署及应用实践案例

《mysql_mcp_server部署及应用实践案例》文章介绍了在CentOS7.5环境下部署MySQL_mcp_server的步骤,包括服务安装、配置和启动,还提供了一个基于Dify工作流的应用案例... 目录mysql_mcp_server部署及应用案例1. 服务安装1.1. 下载源码1.2. 创建独立

Mysql中RelayLog中继日志的使用

《Mysql中RelayLog中继日志的使用》MySQLRelayLog中继日志是主从复制架构中的核心组件,负责将从主库获取的Binlog事件暂存并应用到从库,本文就来详细的介绍一下RelayLog中... 目录一、什么是 Relay Log(中继日志)二、Relay Log 的工作流程三、Relay Lo

MySQL日志UndoLog的作用

《MySQL日志UndoLog的作用》UndoLog是InnoDB用于事务回滚和MVCC的重要机制,本文主要介绍了MySQL日志UndoLog的作用,文中介绍的非常详细,对大家的学习或者工作具有一定的... 目录一、Undo Log 的作用二、Undo Log 的分类三、Undo Log 的存储四、Undo

MySQL游标和触发器的操作流程

《MySQL游标和触发器的操作流程》本文介绍了MySQL中的游标和触发器的使用方法,游标可以对查询结果集进行逐行处理,而触发器则可以在数据表发生更改时自动执行预定义的操作,感兴趣的朋友跟随小编一起看看... 目录游标游标的操作流程1. 定义游标2.打开游标3.利用游标检索数据4.关闭游标例题触发器触发器的基

Qt实现对Word网页的读取功能

《Qt实现对Word网页的读取功能》文章介绍了几种在Qt中实现Word文档(.docx/.doc)读写功能的方法,包括基于QAxObject的COM接口调用、DOCX模板替换及跨平台解决方案,重点讨论... 目录1. 核心实现方式2. 基于QAxObject的COM接口调用(Windows专用)2.1 环境

MySQL查看表的历史SQL的几种实现方法

《MySQL查看表的历史SQL的几种实现方法》:本文主要介绍多种查看MySQL表历史SQL的方法,包括通用查询日志、慢查询日志、performance_schema、binlog、第三方工具等,并... 目录mysql 查看某张表的历史SQL1.查看MySQL通用查询日志(需提前开启)2.查看慢查询日志3.

MySQL底层文件的查看和修改方法

《MySQL底层文件的查看和修改方法》MySQL底层文件分为文本类(可安全查看/修改)和二进制类(禁止手动操作),以下按「查看方法、修改方法、风险管控三部分详细说明,所有操作均以Linux环境为例,需... 目录引言一、mysql 底层文件的查看方法1. 先定位核心文件路径(基础前提)2. 文本类文件(可直

MySQL数据目录迁移的完整过程

《MySQL数据目录迁移的完整过程》文章详细介绍了将MySQL数据目录迁移到新硬盘的整个过程,包括新硬盘挂载、创建新的数据目录、迁移数据(推荐使用两遍rsync方案)、修改MySQL配置文件和重启验证... 目录1,新硬盘挂载(如果有的话)2,创建新的 mysql 数据目录3,迁移 MySQL 数据(推荐两

Python数据验证神器Pydantic库的使用和实践中的避坑指南

《Python数据验证神器Pydantic库的使用和实践中的避坑指南》Pydantic是一个用于数据验证和设置的库,可以显著简化API接口开发,文章通过一个实际案例,展示了Pydantic如何在生产环... 目录1️⃣ 崩溃时刻:当你的API接口又双叒崩了!2️⃣ 神兵天降:3行代码解决验证难题3️⃣ 深度

idea设置快捷键风格方式

《idea设置快捷键风格方式》在IntelliJIDEA中设置快捷键风格,打开IDEA,进入设置页面,选择Keymap,从Keymaps下拉列表中选择或复制想要的快捷键风格,点击Apply和OK即可使... 目录idea设www.chinasem.cn置快捷键风格按照以下步骤进行总结idea设置快捷键pyth