This article introduces SparkSQL (6): external data sources. Hopefully it serves as a useful reference for developers working on these problems.
I. Overview
1. Official documentation:
http://spark.apache.org/docs/2.1.0/sql-programming-guide.html#data-sources
2. Spark SQL supports a variety of data sources. This part first covers loading and saving data through the generic data source API, and then the options specific to the built-in sources.
3. Data sources covered (a generic read/write sketch follows this list):
(1) Parquet files
(2) Hive tables
(3) JDBC connections to other databases (MySQL is used as the example)
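All of the sources below go through the same generic read/write API, so it helps to see it once in isolation. A minimal sketch, with illustrative paths and formats:

// Generic data source API: read with one format, write with a (possibly different) format.
val df = spark.read
  .format("parquet")            // or "json", "csv", "jdbc", ...
  .load("data/users.parquet")

df.write
  .format("json")               // the output format is independent of the input format
  .mode("overwrite")            // SaveMode: overwrite / append / ignore / errorIfExists
  .save("datas/users_json")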
II. Parquet data
1. Reading data: loaded directly into a DataFrame
val userDF=spark.read.format("parquet").load("file:///opt/modules/spark-2.1.0-bin-2.7.3/examples/src/main/resources/users.parquet")
Note: if no format is specified, parquet is the default:
spark.read.load("datas/users.parquet").show()
2. Saving data:
userDF.select("name","favorite_color").write.format("json").save("file:///opt/datas/jsonout")
3. Tuning the number of shuffle partitions
The default is 200; dropping it to 10 for a small data set makes the job noticeably faster:
spark.sqlContext.setConf("spark.sql.shuffle.partitions","10")
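In Spark 2.x the same setting can also be read and written through the session's runtime config; a minimal sketch:

// Equivalent calls on the SparkSession runtime config (Spark 2.x):
spark.conf.set("spark.sql.shuffle.partitions", "10")
spark.conf.get("spark.sql.shuffle.partitions")   // verify the new value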
4. Example code
(1) Local code in IDEA (Spark deployed on the server):
package sparkworking

import org.apache.spark.sql.SparkSession

/**
 * Parquet data source
 */
object _08Parquet {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ParquetApp").master("local[2]").getOrCreate()

    // The source file comes from /opt/modules/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet

    /**
     * Option 1 (standard):
     * spark.read.format("parquet").load is the standard way to read
     */
    val userDF = spark.read.format("parquet").load("data/users.parquet")
    userDF.printSchema()
    userDF.show()
    userDF.select("name", "favorite_color").show
    // userDF.select("name", "favorite_color").write.format("json").save("datas/jsonout")

    /**
     * Option 2 (non-standard): relies on parquet being the default format
     */
    spark.read.load("data/users.parquet").show()

    /**
     * Option 3 (non-standard): pass the path as an option
     */
    spark.read.format("parquet").option("path", "data/users.parquet").load().show()

    /**
     * Tuning: the number of shuffle partitions, 200 by default
     */
    spark.sqlContext.setConf("spark.sql.shuffle.partitions", "10")

    spark.close()
  }
}
(2) Running in spark-shell on the server
Start spark-shell:
bin/spark-shell --master local[2] --jars /opt/datas/mysql-connector-java-5.1.27-bin.jar
Test:
val userDF=spark.read.format("parquet").load("file:///opt/modules/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet")
userDF.printSchema()
userDF.show()
userDF.select("name","favorite_color").show
// Save as JSON
userDF.select("name","favorite_color").write.format("json").save("file:///opt/datas/jsonout")
Check the result:
cat /opt/datas/jsonout/part-00000-2ea939cc-2031-4772-a426-c66547d8244b.json
{"name":"Alyssa"}
{"name":"Ben","favorite_color":"red"}
(3) Reading parquet from the spark-sql client
Start the spark-sql client:
bin/spark-sql --master local[2] --jars /opt/datas/mysql-connector-java-5.1.27-bin.jar
Register the parquet file as a temporary view:
CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
  path "file:///opt/modules/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet"
);
Test:
SELECT * FROM parquetTable;
Result:
name favorite_color favorite_numbers
Alyssa NULL [3,9,15,20]
Ben red []
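The same temporary view can also be registered from Scala instead of SQL; a minimal sketch using the same file:

// Scala equivalent of the CREATE TEMPORARY VIEW statement above:
spark.read.parquet("file:///opt/modules/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet")
  .createOrReplaceTempView("parquetTable")
spark.sql("SELECT * FROM parquetTable").show()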
III. Working with Hive tables
1. Reading
Query a Hive table with spark.sql (this example also writes the aggregated result back to Hive):
spark.sql("select deptno, count(1) as mount from imooc.emp group by deptno").filter("deptno is not null").write.saveAsTable("imooc.hive_table_1")
or read an entire table:
spark.table(tableName)
2. Creating a temporary view
recordsDF.createOrReplaceTempView("records")
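Here recordsDF can be any DataFrame; a minimal spark-shell sketch (the Record case class and its values are illustrative, along the lines of the official guide):

// Build a small DataFrame, expose it as a temporary view, and query it with SQL.
case class Record(key: Int, value: String)
val recordsDF = spark.createDataFrame((1 to 10).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")
spark.sql("SELECT key, value FROM records WHERE key < 5").show()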
3. Writing
df.write.saveAsTable("hive_table_1")
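The snippets above assume a Hive-enabled session (in spark-shell this is already the case when Spark is built with Hive support). A minimal sketch of how such a session is created in application code, reusing table names from this article:

import org.apache.spark.sql.SparkSession

// Hive-enabled SparkSession.
val spark = SparkSession.builder()
  .appName("HiveTableApp")
  .master("local[2]")
  .enableHiveSupport()
  .getOrCreate()

val empDF = spark.table("emp")                              // read a Hive table
empDF.write.mode("overwrite").saveAsTable("hive_table_1")   // write a Hive table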
4. spark-shell walkthrough
(1) Start spark-shell
bin/spark-shell --master local[2] --jars /opt/datas/mysql-connector-java-5.1.27-bin.jar
(2) Count how many people each department has
spark.sql("select deptno, count(1) as mount from emp group by deptno").filter("deptno is not null").write.saveAsTable("hive_table_1")
Result:
scala> spark.sql("show tables").show
+--------+--------------+-----------+
|database| tableName|isTemporary|
+--------+--------------+-----------+
| default| dept| false|
| default| emp| false|
| default| hbase2hive| false|
| default|hive2hbase_emp| false|
| default| hive_table_1| false|
| default| t| false|
+--------+--------------+-----------+
scala> spark.table("hive_table_1").show
+------+-----+
|deptno|mount|
+------+-----+
| 20| 5|
| 10| 3|
| 30| 6|
+------+-----+
(3) Set spark.sql.shuffle.partitions to 10 (as in section II.3) and re-run; the job is noticeably faster
scala> spark.sqlContext.getConf("spark.sql.shuffle.partitions")
res9: String = 10
Then run:
spark.sql("select deptno, count(1) as mount from emp group by deptno").filter("deptno is not null").write.saveAsTable("hive_table_2")
The job finishes much faster than before.
IV. Connecting to other databases via JDBC (MySQL)
1. Reading
spark.read.format("jdbc")
  .option("url", "jdbc:mysql://bigdata.ibeifeng.com:3306/metastore")
  .option("dbtable", "metastore.TBLS")
  .option("user", "root")
  .option("password", "123456")
  .option("driver", "com.mysql.jdbc.Driver")
  .load().show()
Notes on the options (a parallel-read sketch using the JDBC partitioning options follows this list):
- url: the JDBC URL of the database (for the Hive metastore it can be copied from hive-site.xml)
- dbtable: the table to read
- user / password: the database credentials
- driver: the JDBC driver class; for MySQL this is com.mysql.jdbc.Driver
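For larger tables the JDBC source can read in parallel. A sketch that assumes TBL_ID is a numeric column; the bounds and partition count here are only illustrative:

// Parallel JDBC read: the table is split into numPartitions ranges of partitionColumn.
spark.read.format("jdbc")
  .option("url", "jdbc:mysql://hadoop:3306/metastore")
  .option("dbtable", "metastore.TBLS")
  .option("user", "root")
  .option("password", "123456")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("partitionColumn", "TBL_ID")   // numeric column used to split the read
  .option("lowerBound", "1")             // illustrative range of TBL_ID values
  .option("upperBound", "100")
  .option("numPartitions", "4")          // number of parallel JDBC connections
  .load().show()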
2. Writing
jdbcDF.write.format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()
Note: the options are essentially the same as for reading.
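The same write can also be expressed through the jdbc() shortcut with a java.util.Properties object; a sketch against the MySQL instance used in this article (the target table name RESULT is illustrative):

import java.util.Properties
import org.apache.spark.sql.SaveMode

val props = new Properties()
props.put("user", "root")
props.put("password", "123456")
props.put("driver", "com.mysql.jdbc.Driver")

// Append the rows of jdbcDF to the spark.RESULT table.
jdbcDF.write.mode(SaveMode.Append).jdbc("jdbc:mysql://hadoop:3306/spark", "RESULT", props)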
3. Testing a read
(1) In spark-shell (Scala)
Query the TBLS table of the metastore database:
spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop:3306/metastore").option("dbtable", "metastore.TBLS").option("user", "root").option("password", "123456").option("driver", "com.mysql.jdbc.Driver").load().show()
Result:
+------+-----------+-----+----------------+-----+---------+-----+--------------+--------------+------------------+------------------+
|TBL_ID|CREATE_TIME|DB_ID|LAST_ACCESS_TIME|OWNER|RETENTION|SD_ID| TBL_NAME| TBL_TYPE|VIEW_EXPANDED_TEXT|VIEW_ORIGINAL_TEXT|
+------+-----------+-----+----------------+-----+---------+-----+--------------+--------------+------------------+------------------+
| 1| 1543595818| 1| 0| root| 0| 1|hive2hbase_emp| MANAGED_TABLE| null| null|
| 3| 1543596120| 1| 0| root| 0| 3| dept| MANAGED_TABLE| null| null|
| 4| 1543596453| 1| 0| root| 0| 4| hbase2hive|EXTERNAL_TABLE| null| null|
| 6| 1547658170| 1| 0| root| 0| 6| emp| MANAGED_TABLE| null| null|
| 11| 1547730231| 1| 0| root| 0| 11| t| MANAGED_TABLE| null| null|
| 16| 1548076280| 1| 0| root| 0| 16| hive_table_1| MANAGED_TABLE| null| null|
| 17| 1548076685| 1| 0| root| 0| 17| hive_table_2| MANAGED_TABLE| null| null|
+------+-----------+-----+----------------+-----+---------+-----+--------------+--------------+------------------+------------------+
(2) Scala code in IDEA (running against the server)
-> Add the MySQL dependency to pom.xml
<!-- mysql driver jar -->
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.27</version>
</dependency>
-> Code
spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop:3306/metastore")
.option("dbtable", "metastore.TBLS")
.option("user", "root").option("password", "123456")
.option("driver", "com.mysql.jdbc.Driver").load().show()
-> Result
+------+-----------+-----+----------------+-----+---------+-----+--------------+--------------+------------------+------------------+
|TBL_ID|CREATE_TIME|DB_ID|LAST_ACCESS_TIME|OWNER|RETENTION|SD_ID| TBL_NAME| TBL_TYPE|VIEW_EXPANDED_TEXT|VIEW_ORIGINAL_TEXT|
+------+-----------+-----+----------------+-----+---------+-----+--------------+--------------+------------------+------------------+
| 1| 1543595818| 1| 0| root| 0| 1|hive2hbase_emp| MANAGED_TABLE| null| null|
| 3| 1543596120| 1| 0| root| 0| 3| dept| MANAGED_TABLE| null| null|
| 4| 1543596453| 1| 0| root| 0| 4| hbase2hive|EXTERNAL_TABLE| null| null|
| 6| 1547658170| 1| 0| root| 0| 6| emp| MANAGED_TABLE| null| null|
| 11| 1547730231| 1| 0| root| 0| 11| t| MANAGED_TABLE| null| null|
| 16| 1548076280| 1| 0| root| 0| 16| hive_table_1| MANAGED_TABLE| null| null|
| 17| 1548076685| 1| 0| root| 0| 17| hive_table_2| MANAGED_TABLE| null| null|
+------+-----------+-----+----------------+-----+---------+-----+--------------+--------------+------------------+------------------+
(3) Reading via spark.read.jdbc() with java.util.Properties (in spark-shell)
import java.util.Properties
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "123456")
connectionProperties.put("driver", "com.mysql.jdbc.Driver")
val jdbcDF2 = spark.read.jdbc("jdbc:mysql://hadoop:3306/metastore", "metastore.TBLS", connectionProperties)
Result:
scala> jdbcDF2.show
+------+-----------+-----+----------------+-----+---------+-----+--------------+--------------+------------------+------------------+
|TBL_ID|CREATE_TIME|DB_ID|LAST_ACCESS_TIME|OWNER|RETENTION|SD_ID| TBL_NAME| TBL_TYPE|VIEW_EXPANDED_TEXT|VIEW_ORIGINAL_TEXT|
+------+-----------+-----+----------------+-----+---------+-----+--------------+--------------+------------------+------------------+
| 1| 1543595818| 1| 0| root| 0| 1|hive2hbase_emp| MANAGED_TABLE| null| null|
| 3| 1543596120| 1| 0| root| 0| 3| dept| MANAGED_TABLE| null| null|
| 4| 1543596453| 1| 0| root| 0| 4| hbase2hive|EXTERNAL_TABLE| null| null|
| 6| 1547658170| 1| 0| root| 0| 6| emp| MANAGED_TABLE| null| null|
| 11| 1547730231| 1| 0| root| 0| 11| t| MANAGED_TABLE| null| null|
| 16| 1548076280| 1| 0| root| 0| 16| hive_table_1| MANAGED_TABLE| null| null|
| 17| 1548076685| 1| 0| root| 0| 17| hive_table_2| MANAGED_TABLE| null| null|
+------+-----------+-----+----------------+-----+---------+-----+--------------+--------------+------------------+------------------+
4. Accessing MySQL through a JDBC temporary view in spark-sql
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://hadoop:3306",
  dbtable "hive.TBLS",
  user 'root',
  password '123456',
  driver 'com.mysql.jdbc.Driver'
);
Result:
(1) show tables;
database  tableName       isTemporary
default   dept            false
default   emp             false
default   hbase2hive      false
default   hive2hbase_emp  false
default   hive_table_1    false
default   hive_table_2    false
default   t               false
          jdbctable       true
          parquettable    true
Time taken: 0.054 seconds, Fetched 9 row(s)
19/01/21 21:51:23 INFO CliDriver: Time taken: 0.054 seconds, Fetched 9 row(s)
(2) select * from jdbctable;
TBL_ID  CREATE_TIME  DB_ID  LAST_ACCESS_TIME  OWNER  RETENTION  SD_ID  TBL_NAME   TBL_TYPE       VIEW_EXPANDED_TEXT  VIEW_ORIGINAL_TEXT  LINK_TARGET_ID
199     1546153570   1      0                 hue    0          585    sample_07  MANAGED_TABLE  NULL                NULL                NULL
200     1546153571   1      0                 hue    0          586    sample_08  MANAGED_TABLE  NULL                NULL                NULL
201     1546153572   1      0                 hue    0          587    customers  MANAGED_TABLE  NULL                NULL                NULL
202     1546153572   1      0                 hue    0          588    web_logs   MANAGED_TABLE  NULL                NULL                NULL
Time taken: 0.108 seconds, Fetched 4 row(s)
V. CSV
1. Reading
val pathCSV_2 = "file:///E:\\taxi.csv"
val schemaS = StructType(Array(
  StructField("tid", LongType),
  StructField("lat", StringType, nullable = true),
  StructField("lon", StringType, nullable = true),
  StructField("time", StringType)))
sqlContext.read
  .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
  .option("header", "false")
  .schema(schemaS)
  .load(pathCSV_2)
  .show(5)
2. Writing
resultDataFrame.coalesce(1).write
  .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
  .option("header", "true")
  .partitionBy("hour")
  .mode(SaveMode.Overwrite)
  .save("file:///E:\\out")
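Since Spark 2.0 the CSV source is built in, so the short format name should work as well. A minimal sketch reusing pathCSV_2 and schemaS from above (the output path is illustrative):

// Short form of the built-in CSV source (Spark 2.x).
val taxiDF = sqlContext.read
  .format("csv")
  .option("header", "false")
  .schema(schemaS)
  .load(pathCSV_2)

taxiDF.write
  .format("csv")
  .option("header", "true")
  .mode(SaveMode.Overwrite)
  .save("file:///E:\\out_csv")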
3. Full Scala example
package _0729DF

// import SparkUtil.SparkUtil  // project-specific helper, unused below
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

/**
 * Created by Administrator on 2018/7/29.
 */
object Readcsv extends App {
  // Build the context.
  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("Readcsv")
    // spark.sql.shuffle.partitions controls how many partitions a shuffle produces
    // (default 200). For small data sets 200 partitions is unnecessary overhead;
    // for very large data sets 200 may not be enough.
    .set("spark.sql.shuffle.partitions", "10")

  // getOrCreate guarantees that only one SparkContext exists.
  val sc = SparkContext.getOrCreate(conf)

  // If Hive is not needed, SQLContext is enough (HiveContext may require
  // VM options: -XX:PermSize=128M -XX:MaxPermSize=256M).
  val sqlContext = new SQLContext(sc)

  val pathCSV_2 = "file:///E:\\taxi.csv"
  val schemaS = StructType(Array(
    StructField("tid", LongType),
    StructField("lat", StringType, nullable = true),
    StructField("lon", StringType, nullable = true),
    StructField("time", StringType)))

  // Spark 2.1.0: the full CSV format class name (format("csv") also works).
  sqlContext.read
    .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
    .option("header", "false")
    .schema(schemaS)
    .load(pathCSV_2)
    .show(5)

  sqlContext.read
    .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
    .option("header", "false")
    .schema(schemaS)
    .load(pathCSV_2)
    .registerTempTable("tmp_taxi")

  // Extract the taxi id and the hour from the time column.
  sqlContext.sql(
    """
      |SELECT tid,
      |SUBSTRING(time,0,2) as hour
      |FROM tmp_taxi
      |""".stripMargin).registerTempTable("tmp_id_hour")

  // Count the number of pickups per taxi per hour.
  sqlContext.sql(
    """
      |SELECT tid,hour,count(1) as count
      |FROM tmp_id_hour
      |GROUP BY tid,hour
    """.stripMargin).registerTempTable("tmp_id_hour_count")

  // Rank the taxis within each hour by pickup count.
  sqlContext.sql(
    """
      |SELECT tid,hour,count,
      |ROW_NUMBER() OVER (PARTITION BY hour ORDER BY count DESC ) AS rnk
      |FROM tmp_id_hour_count
    """.stripMargin).registerTempTable("tmp_id_hour_count_rnk")

  // Keep the top 5 taxis per hour.
  sqlContext.sql(
    """
      |SELECT tid,hour,count,rnk
      |FROM tmp_id_hour_count_rnk
      |where rnk <=5
    """.stripMargin).registerTempTable("tmp_id_hour_count_rnk_top5")

  // (While developing, each intermediate query can be checked with .show().)

  // Save the result.
  val resultDataFrame = sqlContext.sql(
    """
      |SELECT tid,hour,count,rnk
      |FROM tmp_id_hour_count_rnk
      |where rnk <=5
    """.stripMargin)
  resultDataFrame.show()
  resultDataFrame.coalesce(1).write
    .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
    .option("header", "true")
    .partitionBy("hour")
    .mode(SaveMode.Overwrite)
    .save("file:///E:\\out")
}
VI. A combined external data source example (on the server)
1. Goal
One table lives in Hive and another in MySQL; join the two.
2. Steps
(1) Create a new table in MySQL and insert data
create database spark;
use spark;

CREATE TABLE DEPT(
  DEPTNO int(2) PRIMARY KEY,
  DNAME VARCHAR(14),
  LOC VARCHAR(13)
);

INSERT INTO DEPT VALUES(10,'ACCOUNTING','NEW YORK');
INSERT INTO DEPT VALUES(20,'RESEARCH','DALLAS');
INSERT INTO DEPT VALUES(30,'SALES','CHICAGO');
INSERT INTO DEPT VALUES(40,'OPERATIONS','BOSTON');
(2) Join the emp table in Hive with the MySQL DEPT table on deptno
(3) Notes
-> When running from IDEA, enableHiveSupport() must be added to the session builder
-> When running from outside the server's local network, the following config must be added, otherwise the data cannot be found:
.config("dfs.client.use.datanode.hostname","true")
(4) Scala code
package sparkworking

import org.apache.spark.sql.SparkSession

/**
 * Created by Administrator on 2019/1/21.
 * Query Hive and MySQL tables together through external data sources.
 */
object _09HiveMysqlApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("HiveMySQLApp")
      .master("local[2]")
      .enableHiveSupport()                                // required for Spark SQL to talk to Hive
      .config("dfs.client.use.datanode.hostname", "true") // required when running against the remote server
      .getOrCreate()

    // Load the Hive table.
    val hiveDF = spark.table("emp")
    hiveDF.show()

    // Load the MySQL table.
    val mysqlDF = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://hadoop:3306/")
      .option("dbtable", "spark.DEPT")
      .option("user", "root")
      .option("password", "123456")
      .option("driver", "com.mysql.jdbc.Driver")
      .load()
    mysqlDF.show()

    // Join.
    val resultDF = hiveDF.join(mysqlDF, hiveDF.col("deptno") === mysqlDF.col("DEPTNO"))
    resultDF.show()
    resultDF.select(hiveDF.col("empno"), hiveDF.col("ename"), mysqlDF.col("dname")).show()

    spark.stop()
  }
}
That concludes this article on SparkSQL (6): external data sources. Hopefully the material above is a useful reference.