SparkSQL(6):外部数据源

2024-05-24 11:48
文章标签 外部 数据源 sparksql

本文主要是介绍SparkSQL(6):外部数据源,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、总括

1.参考官网:

http://spark.apache.org/docs/2.1.0/sql-programming-guide.html#data-sources

2.SparkSQL支持多种数据源,这个部分主要讲了使用Spark加载和保存数据源。然后,讲解内置数据源的特定选项。

3.数据源分类:

(1)parquet数据

(2)hive表

(3)jdbc连接其他数据库(以MySQL的数据库为例)

 

二、parquet数据

1.读取数据:直接转换为DataFrame

val userDF=spark.read.format("parquet").load("file:///opt/modules/spark-2.1.0-bin-2.7.3/examples/src/main/resources/users.parquet")

    备注:如果不设定format,默认是parquet

spark.read.load("datas/users.parquet").show()      

2.保存数据:

userDF.select("name","favorite_color").write.format("json").save("file:///opt/datas/jsonout")

3.优化分区数目

默认200,当设为10后,速度提升很快

 spark.sqlContext.setConf("spark.sql.shuffle.partitions","10")

4.实例代码

(1)idea本地代码,spark在服务器端的实现:

package sparkworkingimport org.apache.spark.sql.SparkSession/*** parquet数据源*/
object _08Parquet {def main(args: Array[String]): Unit = {val spark=SparkSession.builder().appName("ParquetApp").master("local[2]").getOrCreate()//数据源从/opt/modules/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet获得/*** 方法一:* spark.read.format("parquet").load 是标准写法*/val userDF=spark.read.format("parquet").load("data/users.parquet")userDF.printSchema()userDF.show()userDF.select("name","favorite_color").show
//    userDF.select("name","favorite_color").write.format("json").save("datas/jsonout")/*** 方法二:* 非标准*/spark.read.load("data/users.parquet").show()/*** 方法三:* 非标准*/spark.read.format("parquet").option("path","data/users.parquet").load().show()/*** 优化:分区数目,默认200*/spark.sqlContext.setConf("spark.sql.shuffle.partitions","10")spark.close()}
}

(2)在服务器的sparkshell上运行

开启spark-shell

bin/spark-shell --master local[2] --jars /opt/datas/mysql-connector-java-5.1.27-bin.jar

测试

val userDF=spark.read.format("parquet").load("file:///opt/modules/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet")
userDF.printSchema()
userDF.show()
userDF.select("name","favorite_color").show
//保存
userDF.select("name","favorite_color").write.format("json").save("file:///opt/datas/jsonout")
查看结果:
cat /opt/datas/jsonout/part-00000-2ea939cc-2031-4772-a426-c66547d8244b.json 
{"name":"Alyssa"}
{"name":"Ben","favorite_color":"red"}

(3)使用sparkSQL读取parquet

开启spark-sql客户端

bin/spark-sql --master local[2] --jars /opt/datas/mysql-conneor-java-5.1.27-bin.jar

读取数据

CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (path "file:opt/modules/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/users.parquet"
);

测试

SELECT * FROM parquetTable;结果
name    favorite_color  favorite_numbers
Alyssa  NULL    [3,9,15,20]
Ben     red     []

三、操作hive表

1.读取

spark.sql("select deptno, count(1) as mount from imooc.emp where group by deptno").filter("deptno is not null").write.saveAsTable("imooc.hive_table_1")

或者

spark.table(tableName)

2.创建临时表

recordsDF.createOrReplaceTempView("records")

3.写

df.write.saveAsTable("hive_table_1")

4.spark-shell实现

(1)启动spark-shell

bin/spark-shell --master local[2] --jars /opt/datas/mysql-connector-java-5.1.27-bin.jar

(2)求解每个部门多少人

spark.sql("select deptno, count(1) as mount from emp where group by deptno").filter("deptno is not null").write.saveAsTable("hive_table_1")
结果:
spark.sql("show tables").show
scala> spark.sql("show tables").show
+--------+--------------+-----------+
|database|     tableName|isTemporary|
+--------+--------------+-----------+
| default|          dept|      false|
| default|           emp|      false|
| default|    hbase2hive|      false|
| default|hive2hbase_emp|      false|
| default|  hive_table_1|      false|
| default|             t|      false|
+--------+--------------+-----------+
scala> spark.table("hive_table_1").show
+------+-----+
|deptno|mount|
+------+-----+
|    20|    5|
|    10|    3|
|    30|    6|
+------+-----+

(3)修改分区数,重新实现速度变快

scala> spark.sqlContext.getConf("spark.sql.shuffle.partitions")
res9: String = 10使用:
spark.sql("select deptno, count(1) as mount from emp where group by deptno").filter("deptno is not null").write.saveAsTable("hive_table_2")
结果比刚才快很多!

四、jdbc连接其他数据库(操作MySQL)

1.读取

spark.read.format("jdbc").option("url", "jdbc:mysql://bigdata.ibeifeng.com:3306/metastore").option("dbtable", "metastore.TBLS").option("user", "root").option("password", "123456").option("driver", "com.mysql.jdbc.Driver").load().show()

备注option解释:

  • url:数据库的url,可参考hive-site.xml中获得
  • dbtable:数据表
  • driver:驱动,取com.mysql.jdbc.Driver

 

2.写入

jdbcDF.write.format("jdbc").option("url", "jdbc:postgresql:dbserver").option("dbtable", "schema.tablename").option("user", "username").option("password", "password").save()

备注:基本参数同读取

3.测试读取

(1)scala的spark-shell中实现

 查询metastore数据库中的TBLS表

spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop:3306/metastore").option("dbtable", "metastore.TBLS").option("user", "root").option("password", "123456").option("driver", "com.mysql.jdbc.Driver").load().show()
结果:
+------+-----------+-----+----------------+-----+---------+-----+--------------+--------------+------------------+------------------+
|TBL_ID|CREATE_TIME|DB_ID|LAST_ACCESS_TIME|OWNER|RETENTION|SD_ID|      TBL_NAME|      TBL_TYPE|VIEW_EXPANDED_TEXT|VIEW_ORIGINAL_TEXT|
+------+-----------+-----+----------------+-----+---------+-----+--------------+--------------+------------------+------------------+
|     1| 1543595818|    1|               0| root|        0|    1|hive2hbase_emp| MANAGED_TABLE|              null|              null|
|     3| 1543596120|    1|               0| root|        0|    3|          dept| MANAGED_TABLE|              null|              null|
|     4| 1543596453|    1|               0| root|        0|    4|    hbase2hive|EXTERNAL_TABLE|              null|              null|
|     6| 1547658170|    1|               0| root|        0|    6|           emp| MANAGED_TABLE|              null|              null|
|    11| 1547730231|    1|               0| root|        0|   11|             t| MANAGED_TABLE|              null|              null|
|    16| 1548076280|    1|               0| root|        0|   16|  hive_table_1| MANAGED_TABLE|              null|              null|
|    17| 1548076685|    1|               0| root|        0|   17|  hive_table_2| MANAGED_TABLE|              null|              null|
+------+-----------+-----+----------------+-----+---------+-----+--------------+--------------+------------------+------------------+

(2)scala的idea代码(服务器)

-》添加pom文件mysql依赖

<!-- mysql driver jar -->
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.27</version>
</dependency>

-》代码

spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop:3306/metastore")
.option("dbtable", "metastore.TBLS")
.option("user", "root").option("password", "123456")
.option("driver", "com.mysql.jdbc.Driver").load().show()

-》结果

+------+-----------+-----+----------------+-----+---------+-----+--------------+--------------+------------------+------------------+
|TBL_ID|CREATE_TIME|DB_ID|LAST_ACCESS_TIME|OWNER|RETENTION|SD_ID|      TBL_NAME|      TBL_TYPE|VIEW_EXPANDED_TEXT|VIEW_ORIGINAL_TEXT|
+------+-----------+-----+----------------+-----+---------+-----+--------------+--------------+------------------+------------------+
|     1| 1543595818|    1|               0| root|        0|    1|hive2hbase_emp| MANAGED_TABLE|              null|              null|
|     3| 1543596120|    1|               0| root|        0|    3|          dept| MANAGED_TABLE|              null|              null|
|     4| 1543596453|    1|               0| root|        0|    4|    hbase2hive|EXTERNAL_TABLE|              null|              null|
|     6| 1547658170|    1|               0| root|        0|    6|           emp| MANAGED_TABLE|              null|              null|
|    11| 1547730231|    1|               0| root|        0|   11|             t| MANAGED_TABLE|              null|              null|
|    16| 1548076280|    1|               0| root|        0|   16|  hive_table_1| MANAGED_TABLE|              null|              null|
|    17| 1548076685|    1|               0| root|        0|   17|  hive_table_2| MANAGED_TABLE|              null|              null|
+------+-----------+-----+----------------+-----+---------+-----+--------------+--------------+------------------+------------------+

(3)通过java方式读取通过spark-shell

import java.util.Properties
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "123456")
connectionProperties.put("driver", "com.mysql.jdbc.Driver")val jdbcDF2 = spark.read.jdbc("jdbc:mysql://hadoop:3306/metastore", "metastore.TBLS", connectionProperties)结果:
scala> jdbcDF2.show
+------+-----------+-----+----------------+-----+---------+-----+--------------+--------------+------------------+------------------+
|TBL_ID|CREATE_TIME|DB_ID|LAST_ACCESS_TIME|OWNER|RETENTION|SD_ID|      TBL_NAME|      TBL_TYPE|VIEW_EXPANDED_TEXT|VIEW_ORIGINAL_TEXT|
+------+-----------+-----+----------------+-----+---------+-----+--------------+--------------+------------------+------------------+
|     1| 1543595818|    1|               0| root|        0|    1|hive2hbase_emp| MANAGED_TABLE|              null|              null|
|     3| 1543596120|    1|               0| root|        0|    3|          dept| MANAGED_TABLE|              null|              null|
|     4| 1543596453|    1|               0| root|        0|    4|    hbase2hive|EXTERNAL_TABLE|              null|              null|
|     6| 1547658170|    1|               0| root|        0|    6|           emp| MANAGED_TABLE|              null|              null|
|    11| 1547730231|    1|               0| root|        0|   11|             t| MANAGED_TABLE|              null|              null|
|    16| 1548076280|    1|               0| root|        0|   16|  hive_table_1| MANAGED_TABLE|              null|              null|
|    17| 1548076685|    1|               0| root|        0|   17|  hive_table_2| MANAGED_TABLE|              null|              null|
+------+-----------+-----+----------------+-----+---------+-----+--------------+--------------+------------------+------------------+

4.通过jdbc写入mysql方式

CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (url "jdbc:mysql://hadoop:3306",dbtable "hive.TBLS",user 'root',password '123456',driver 'com.mysql.jdbc.Driver');结果:(1)show tables;database        tableName       isTemporarydefault dept    falsedefault emp     falsedefault hbase2hive      falsedefault hive2hbase_emp  falsedefault hive_table_1    falsedefault hive_table_2    falsedefault t       falsejdbctable       trueparquettable    trueTime taken: 0.054 seconds, Fetched 9 row(s)19/01/21 21:51:23 INFO CliDriver: Time taken: 0.054 seconds, Fetched 9 row(s)(2)select * from jdbctable;TBL_ID  CREATE_TIME     DB_ID   LAST_ACCESS_TIME        OWNER   RETENTION       SD_ID   TBL_NAME        TBL_TYPE     VIEW_EXPANDED_TEXT      VIEW_ORIGINAL_TEXT      LINK_TARGET_ID199     1546153570      1       0       hue     0       585     sample_07       MANAGED_TABLE   NULL    NULLNULL200     1546153571      1       0       hue     0       586     sample_08       MANAGED_TABLE   NULL    NULLNULL201     1546153572      1       0       hue     0       587     customers       MANAGED_TABLE   NULL    NULLNULL202     1546153572      1       0       hue     0       588     web_logs        MANAGED_TABLE   NULL    NULLNULLTime taken: 0.108 seconds, Fetched 4 row(s)

五、csv

1.读取

val pathCSV_2="file:///E:\\taxi.csv"val schemaS=StructType(Array(StructField("tid",LongType),StructField("lat",StringType,nullable = true),StructField("lon",StringType,nullable = true),StructField("time",StringType)))sqlContext.read.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").option("header","false").schema(schemaS).load(pathCSV_2).show(5)

2.写出

resultDataFrame.coalesce(1).write.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").option("header","true").partitionBy("hour").mode(SaveMode.Overwrite).save("file:///E:\\out")

3.代码scala

package _0729DFimport SparkUtil.SparkUtil
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}/*** Created by Administrator on 2018/7/29.*/
object Readcsv extends App{
//  //构建上下文val conf = new SparkConf().setMaster("local[*]").setAppName("Readcsv")//.set("spark.sql.shuffle.partition","10") 这个是为了设置分区数目,默认是200//即只要触发shuffle,就会将数据分为200个分区计算。所以当数据量小的时候,没有必要那么多分区。//当数据量大的时候,200分区并不能满足需求。.set("spark.sql.shuffle.partitions","10")//这个方法是一个锁的机制,通过这个方法可以保证只有一个上下文val sc = SparkContext.getOrCreate(conf)//如果不需要用hive就不要用hivecontext,使用sqlcontext就可以了//Hivecontext可能需要配置VM options: -XX:PermSize=128M -XX:MaxPermSize=256Mval sqlContext = new SQLContext(sc)val pathCSV_2="file:///E:\\taxi.csv"val schemaS=StructType(Array(StructField("tid",LongType),StructField("lat",StringType,nullable = true),StructField("lon",StringType,nullable = true),StructField("time",StringType)))//自己:2.1.0sqlContext.read
//    .format("csv").format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").option("header","false").schema(schemaS).load(pathCSV_2).show(5)//自己:2.1.0sqlContext.read.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").option("header","false").schema(schemaS).load(pathCSV_2).registerTempTable("tmp_taxi")//  获取id和hoursqlContext.sql("""|SELECT tid,|SUBSTRING(time,0,2) as hour|FROM tmp_taxi|""".stripMargin).registerTempTable("tmp_id_hour")//计算各个小时的出租车的载客次数sqlContext.sql("""|SELECT tid,hour,count(1) as count|FROM tmp_id_hour|GROUP BY tid,hour""".stripMargin).registerTempTable("tmp_id_hour_count")//    sqlContext.sql(//      """//        |SELECT tid,hour,count(1) as count//        |FROM tmp_id_hour//        |GROUP BY tid,hour//      """.stripMargin//    ).show()//  //排序sqlContext.sql("""|SELECT tid,hour,count,|ROW_NUMBER() OVER (PARTITION BY hour ORDER BY count DESC ) AS rnk|FROM tmp_id_hour_count""".stripMargin).registerTempTable("tmp_id_hour_count_rnk")
//
//  sqlContext.sql(
//    """
//      |SELECT tid,hour,count,
//      |ROW_NUMBER() OVER (PARTITION BY hour ORDER BY count DESC ) AS rnk
//      |FROM tmp_id_hour_count
//
//    """.stripMargin
//  ).show()sqlContext.sql("""|SELECT tid,hour,count,rnk|FROM tmp_id_hour_count_rnk|where rnk <=5""".stripMargin).registerTempTable("tmp_id_hour_count_rnk_top5")//  sqlContext.sql(
//    """
//      |SELECT tid,hour,count,rnk
//      |FROM tmp_id_hour_count_rnk
//      |where rnk <=5
//
//    """.stripMargin
//  ).show()
////
//
////保存val resultDataFrame=sqlContext.sql("""|SELECT tid,hour,count,rnk|FROM tmp_id_hour_count_rnk|where rnk <=5""".stripMargin)resultDataFrame.show()resultDataFrame.coalesce(1).write.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").option("header","true").partitionBy("hour").mode(SaveMode.Overwrite).save("file:///E:\\out")}

六、服务器外部数据源综合实例

1.目的

hive和mysql分别有一张表,然后进行关联操作

2.操作

(1)mysql中创建新表并且插入数据

create database spark;
use spark;CREATE TABLE DEPT(
DEPTNO int(2) PRIMARY KEY,
DNAME VARCHAR(14) ,
LOC VARCHAR(13) ) ;INSERT INTO DEPT VALUES(10,'ACCOUNTING','NEW YORK');
INSERT INTO DEPT VALUES(20,'RESEARCH','DALLAS');
INSERT INTO DEPT VALUES(30,'SALES','CHICAGO');
INSERT INTO DEPT VALUES(40,'OPERATIONS','BOSTON');

(2)把hive中emp的表数据,根据deptno和mysql的deptno进行join

(3)注意

-》编译器idea中运行需要添加enableHiveSupport()这个功能

-》如果在服务器局域网外运行,需要添加否则会报错找不到数据

.config("dfs.client.use.datanode.hostname","true")

(4)scala代码

package sparkworkingimport org.apache.spark.sql.SparkSession/*** Created by Administrator on 2019/1/21.* 使用外部数据源综合查询Hive和Mysql的表数据*/
object _09HiveMysqlApp {def main(args: Array[String]): Unit = {val spark=SparkSession.builder().appName("HiveMySQLApp").master("local[2]").enableHiveSupport()  //sparksql连接Hive必须要填写.config("dfs.client.use.datanode.hostname","true") //服务器必须添加.getOrCreate()//加载hive表val hiveDF=spark.table("emp")hiveDF.show()//加载mysql表val mysqlDF=spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop:3306/").option("dbtable", "spark.DEPT").option("user", "root").option("password", "123456").option("driver", "com.mysql.jdbc.Driver").load()mysqlDF.show()//Joinval resultDF=hiveDF.join(mysqlDF,hiveDF.col("deptno")===mysqlDF.col("DEPTNO"))resultDF.show()resultDF.select(hiveDF.col("empno"),hiveDF.col("ename"),mysqlDF.col("dname")).show()spark.stop()}
}

 

这篇关于SparkSQL(6):外部数据源的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/998325

相关文章

Spring Boot实现多数据源连接和切换的解决方案

《SpringBoot实现多数据源连接和切换的解决方案》文章介绍了在SpringBoot中实现多数据源连接和切换的几种方案,并详细描述了一个使用AbstractRoutingDataSource的实... 目录前言一、多数据源配置与切换方案二、实现步骤总结前言在 Spring Boot 中实现多数据源连接

多数据源的事务处理总是打印很多无用的log日志

之前做了一个项目,需要用到多数据源以及事务处理,在使用事务处理,服务器总是打印很多关于事务处理的log日志(com.atomikos.logging.Slf4jLogger),但是我们根本不会用到这些log日志,反而使得查询一些有用的log日志变得困难。那要如何屏蔽这些log日志呢? 之前的项目是提高项目打印log日志的级别,后来觉得这样治标不治本。 现在有一个更好的方法: 我使用的是log

【AI大模型应用开发】2.1 Function Calling连接外部世界 - 入门与实战(1)

Function Calling是大模型连接外部世界的通道,目前出现的插件(Plugins )、OpenAI的Actions、各个大模型平台中出现的tools工具集,其实都是Function Calling的范畴。时下大火的OpenAI的GPTs,原理就是使用了Function Calling,例如联网检索、code interpreter。 本文带大家了解下Function calling,看

外部中断的边缘触发和电平触发

MCS-51单片机中的边缘触发是指当输入引脚电平由高到低发生跳变时,才引起中断。而电平触发是指只要外部引脚为低电平就引起中断。         在电平触发方式下,当外部引脚的低电平在中断服务返回前没有被拉高时(即撤除中断请求状态),会引起反复的不需要的中断,造成程序执行的错误。这类中断方式下,需要在中断服务程序中设置指令,清除外部中断的低电平状态,使之变为高电平。

从应用内跳转至外部浏览器 - 鸿蒙 HarmonyOS Next

从应用内跳转至外部浏览器,基于 Want 来实现,同时也可以通过其方式尝试跳转至其它系统模块,具体可参考如下 code : 方法调用 // 调用pushOutsideWeb(controller, url) 方法实现 import { common, contextConstant, Want } from '@kit.AbilityKit';import { HintMessage

兼容Trino Connector,扩展Apache Doris数据源接入能力|Lakehouse 使用手册(四)

Apache Doris 内置支持包括 Hive、Iceberg、Hudi、Paimon、LakeSoul、JDBC 在内的多种 Catalog,并为其提供原生高性能且稳定的访问能力,以满足与数据湖的集成需求。而随着 Apache Doris 用户的增加,新的数据源连接需求也随之增加。因此,从 3.0 版本开始,Apache Doris 引入了 Trino Connector 兼容框架。 Tri

【spark 读写数据】数据源的读写操作

通用的 Load/Save 函数 在最简单的方式下,默认的数据源(parquet 除非另外配置通过spark.sql.sources.default)将会用于所有的操作。 Parquet 是一个列式存储格式的文件,被许多其他数据处理系统所支持。Spark SQL 支持对 Parquet 文件的读写还可以自动的保存源数据的模式 val usersDF = spark.read.load("e

江协科技STM32学习- P11 中断系统,EXTI外部中断

🚀write in front🚀   🔎大家好,我是黄桃罐头,希望你看完之后,能对你有所帮助,不足请指正!共同学习交流 🎁欢迎各位→点赞👍 + 收藏⭐️ + 留言📝​  💬本系列哔哩哔哩江科大STM32的视频为主以及自己的总结梳理📚  🚀Projeet source code🚀    💾工程代码放在了本人的Gitee仓库:iPickCan (iPickCan

Java在windows和linux上调用外部程序

在用java开发时,有时候会遇到需要调用系统命令或者外部脚本,当前文章给出调用方法。代码如下: /*** 转换脚本路径为在win、linux中可执行的命令* * @param scriptPath* 脚本路径* @return 在linux或window中可执行的命令*/public static String[] convertExecuteCommand(Str

C++ 在外部访问对象的protected成员的方法

起因 起因在于,今天在写UE4插件时,有一个对象的protected成员我想要访问。这个类没有提供接口来访问那个成员,并且这个类是定义在引擎代码中的而我不想“污染”引擎代码。 不过,我想到这么做或许可以在不改变原有类的定义下访问其中的protected成员: 定义一个原有类的子类,在其中定义可以访问protected成员的接口。想要访问对象的成员时,将原有类类型的指针转变为子类类型的指针,然