Spark SQL之External DataSource外部数据源

2024-03-21 06:59

本文主要是介绍Spark SQL之External DataSource外部数据源,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

http://blog.csdn.net/oopsoom/article/details/42061077

一、Spark SQL External DataSource简介

  随着Spark1.2的发布,Spark SQL开始正式支持外部数据源。Spark SQL开放了一系列接入外部数据源的接口,来让开发者可以实现。

  这使得Spark SQL支持了更多的类型数据源,如json, parquet, avro, csv格式。只要我们愿意,我们可以开发出任意的外部数据源来连接到Spark SQL。之前大家说的支持HBASE,Cassandra都可以用外部数据源的方式来实现无缝集成。 


二、External DataSource

  拿Spark1.2的json为例,它支持已经改为了实现了外部数据源的接口方式。所以除了先前我们操作json的API,又多了一种DDL创建外部数据源的方式。 

  parquetFile的操作方式也如下类似,就不一一列举了。

2.1 SQL方式 CREATE TEMPORARY TABLE USING OPTIONS

在Spark1.2之后,支持了一种CREATE TEMPORARY TABLE USING OPTIONS的DDL语法来创建外部数据源的表。

[sql]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. CREATE TEMPORARY TABLE jsonTable  
  2. USING org.apache.spark.sql.json  
  3. OPTIONS (  
  4.   path '/path/to/data.json'  
  5. )  

1、操作示例:

我们拿example下people.json文件来做示例。

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. shengli-mac$ cat /Users/shengli/git_repos/spark/examples/src/main/resources/people.json  
  2. {"name":"Michael"}  
  3. {"name":"Andy""age":30}  
  4. {"name":"Justin""age":19}  
2、DDL创建外部数据源表jsonTable:

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. 14/12/21 16:32:14 INFO repl.SparkILoop: Created spark context..  
  2. Spark context available as sc.  
  3.   
  4. scala> import org.apache.spark.sql.SQLContext  
  5. import org.apache.spark.sql.SQLContext  
  6.   
  7. scala> val sqlContext  = new SQLContext(sc)  
  8. sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@7be62956  
  9.   
  10. scala> import sqlContext._  
  11. import sqlContext._  
  12. //创建jsonTable外部数据源表,并且指定其数数据源文件是people.json这个json文件,同时指定使用org.apache.spark.sql.json该类型的隐式转化类(这个后续文章会介绍)  
  13. scala> val jsonDDL = s"""  
  14.      | |CREATE TEMPORARY TABLE jsonTable  
  15.      | |USING org.apache.spark.sql.json  
  16.      | |OPTIONS (  
  17.      | | path  'file:///Users/shengli/git_repos/spark/examples/src/main/resources/people.json'  
  18.      | |)""".stripMargin  
  19. jsonDDL: String =   
  20. "  
  21. CREATE TEMPORARY TABLE jsonTable  
  22. USING org.apache.spark.sql.json  
  23. OPTIONS (  
  24.  path  'file:///Users/shengli/git_repos/spark/examples/src/main/resources/people.json'  
  25. )"  
  26.   
  27. scala> sqlContext.sql(jsonDDL).collect() //创建该外部数据源表jsonTable  
  28. 14/12/21 16:44:27 INFO scheduler.DAGScheduler: Job 0 finished: reduce at JsonRDD.scala:57, took 0.204461 s  
  29. res0: Array[org.apache.spark.sql.Row] = Array()  

我们来看下该schemaRDD:

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. scala> val jsonSchema = sqlContext.sql(jsonDDL)  
  2. jsonSchema: org.apache.spark.sql.SchemaRDD =   
  3. SchemaRDD[7] at RDD at SchemaRDD.scala:108  
  4. == Query Plan ==  
  5. == Physical Plan ==  
  6. ExecutedCommand (CreateTableUsing jsonTable, org.apache.spark.sql.json, Map(path -> file:///Users/shengli/git_repos/spark/examples/src/main/resources/people.json))  

ExecutedCommand来取把数据用spark.sql.json的方式从path加载到jsonTable中。涉及到得类是CreateTableUsing,后续源码分析会讲到。

各阶段执行计划情况:

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. scala> sqlContext.sql("select * from jsonTable").queryExecution  
  2. res6: org.apache.spark.sql.SQLContext#QueryExecution =   
  3. == Parsed Logical Plan ==  
  4. 'Project [*]  
  5.  'UnresolvedRelation None, jsonTable, None  
  6.   
  7. == Analyzed Logical Plan ==  
  8. Project [age#0,name#1]  
  9.  Relation[age#0,name#1] JSONRelation(file:///Users/shengli/git_repos/spark/examples/src/main/resources/people.json,1.0)  
  10.   
  11. == Optimized Logical Plan ==  
  12. Relation[age#0,name#1] JSONRelation(file:///Users/shengli/git_repos/spark/examples/src/main/resources/people.json,1.0)  
  13.   
  14. == Physical Plan ==  
  15. PhysicalRDD [age#0,name#1], MapPartitionsRDD[27] at map at JsonRDD.scala:47  
  16.   
  17. Code Generation: false  
  18. == RDD ==  

至此,创建加载外部数据源到Spark SQL已经完成。

我们可以使用任何我们希望的方式来查询:

3、SQL查询方式:

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. scala> sqlContext.sql("select * from jsonTable")  
  2. 21 16:52:13 INFO spark.SparkContext: Created broadcast 6 from textFile at JSONRelation.scala:39  
  3. res2: org.apache.spark.sql.SchemaRDD =   
  4. SchemaRDD[20] at RDD at SchemaRDD.scala:108  
  5. == Query Plan ==  
  6. == Physical Plan ==  
  7. PhysicalRDD [age#2,name#3], MapPartitionsRDD[24] at map at JsonRDD.scala:47  

执行查询:

scala> sqlContext.sql("select * from jsonTable").collect()
res1: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])
 

2.2 API方式

sqlContext.jsonFile

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. scala> val json = sqlContext.jsonFile("file:///Users/shengli/git_repos/spark/examples/src/main/resources/people.json")  
  2. scala> json.registerTempTable("jsonFile")  
  3.   
  4. scala> sql("select * from jsonFile").collect()  
  5. res2: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])  

  总的来说,Spark SQL 在努力的向各种数据源靠拢,希望让Spark SQL能和其它许多类型的数据源的集成。

  Spark SQL提供的了一种创建加载外部数据源表的DDL语法:CREATE TEMPORARY TABLE USING OPTIONS

  Spark SQL对外开放了一系列的扩展接口,能够通过实现这些接口,来实现对不同的数据源接入,如avro, csv, parquet,json, etc


三、Sources包核心

    Spark SQL在Spark1.2中提供了External DataSource API,开发者可以根据接口来实现自己的外部数据源,如avro, csv, json, parquet等等。

    在Spark SQL源代码的org/spark/sql/sources目录下,我们会看到关于External DataSource的相关代码。这里特别介绍几个:

    1、DDLParser 

    专门负责解析外部数据源SQL的SqlParser,解析create temporary table xxx using options (key 'value', key 'value') 创建加载外部数据源表的语句。

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. protected lazy val createTable: Parser[LogicalPlan] =  
  2.    CREATE ~ TEMPORARY ~ TABLE ~> ident ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {  
  3.      case tableName ~ provider ~ opts =>  
  4.        CreateTableUsing(tableName, provider, opts)  
  5.    }  

    2、CreateTableUsing

   一个RunnableCommand,通过反射从外部数据源lib中实例化Relation,然后注册到为temp table。

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. private[sql] case class CreateTableUsing(  
  2.     tableName: String,  
  3.     provider: String,  // org.apache.spark.sql.json   
  4.     options: Map[String, String]) extends RunnableCommand {  
  5.   
  6.   def run(sqlContext: SQLContext) = {  
  7.     val loader = Utils.getContextOrSparkClassLoader  
  8.     val clazz: Class[_] = try loader.loadClass(provider) catch { //do reflection  
  9.       case cnf: java.lang.ClassNotFoundException =>  
  10.         try loader.loadClass(provider + ".DefaultSource"catch {  
  11.           case cnf: java.lang.ClassNotFoundException =>  
  12.             sys.error(s"Failed to load class for data source: $provider")  
  13.         }  
  14.     }  
  15.     val dataSource = clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.RelationProvider] //json包DefaultDataSource  
  16.     val relation = dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))//创建JsonRelation  
  17.   
  18.     sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName)//注册  
  19.     Seq.empty  
  20.   }  
  21. }  

    2、DataSourcesStrategy

    在 Strategy 一文中,我已讲过Streategy的作用,用来Plan生成物理计划的。这里提供了一种专门为了解析外部数据源的策略。

    最后会根据不同的BaseRelation生产不同的PhysicalRDD。不同的BaseRelation的scan策略下文会介绍。

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. private[sql] object DataSourceStrategy extends Strategy {  
  2.   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {  
  3.     case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: CatalystScan)) =>  
  4.       pruneFilterProjectRaw(  
  5.         l,  
  6.         projectList,  
  7.         filters,  
  8.         (a, f) => t.buildScan(a, f)) :: Nil  
  9.     ......  
  10.     case l @ LogicalRelation(t: TableScan) =>  
  11.       execution.PhysicalRDD(l.output, t.buildScan()) :: Nil  
  12.   
  13.     case _ => Nil  
  14.   }  
    3、interfaces.scala  

    该文件定义了一系列可扩展的外部数据源接口,对于想要接入的外部数据源,我们只需实现该接口即可。里面比较重要的trait RelationProvider 和 BaseRelation,下文会详细介绍。

    4、filters.scala

    该Filter定义了如何在加载外部数据源的时候,就进行过滤。注意哦,是加载外部数据源到Table里的时候,而不是Spark里进行filter。这个有点像hbase的coprocessor,查询过滤在Server上就做了,不在Client端做过滤。

   5、LogicalRelation

   封装了baseRelation,继承了catalyst的LeafNode,实现MultiInstanceRelation。

        

四、External DataSource注册流程

用spark sql下sql/json来做示例, 画了一张流程图,如下:



注册外部数据源的表的流程:
1、提供一个外部数据源文件,比如json文件。
2、提供一个实现了外部数据源所需要的interfaces的类库,比如sql下得json包,在1.2版本后改为了External Datasource实现。
3、引入SQLContext,使用DDL创建表,如create temporary table xxx using options (key 'value', key 'value') 
4、External Datasource的DDLParser将对该SQL进行Parse
5、Parse后封装成为一个CreateTableUsing类的对象。该类是一个RunnableCommand,其run方法会直接执行创建表语句。
6、该类会通过反射来创建一个org.apache.spark.sql.sources.RelationProvider,该trait定义要createRelation,如json,则创建JSONRelation,若avro,则创建AvroRelation。
7、得到external releation后,直接调用SQLContext的baseRelationToSchemaRDD转换为SchemaRDD
8、最后registerTempTable(tableName) 来注册为Table,可以用SQL来查询了。

五、External DataSource解析流程

先看图,图如下:


Spark SQL解析SQL流程如下:
1、Analyzer通过Rule解析,将UnresolvedRelation解析为JsonRelation。
2、通过Parse,Analyzer,Optimizer最后得到JSONRelation(file:///path/to/shengli.json,1.0)  
3、通过sources下得DataSourceStrategy将LogicalPlan映射到物理计划PhysicalRDD。
4、PhysicalRDD里包含了如何查询外部数据的规则,可以调用execute()方法来执行Spark查询。

六、External Datasource Interfaces

在第一节我已经介绍过,主要的interfaces,主要看一下BaseRelation和RelationProvider。
如果我们要实现一个外部数据源,比如avro数据源,支持spark sql操作avro file。那么久必须定义AvroRelation来继承BaseRelation。同时也要实现一个RelationProvider。


BaseRelation:
是外部数据源的抽象,里面存放了 schema的映射,和 如何scan数据的规则
[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. abstract class BaseRelation {  
  2.   def sqlContext: SQLContext  
  3.   def schema: StructType  
[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. abstract class PrunedFilteredScan extends BaseRelation {  
  2.   def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]  
  3. }  

1、schema我们如果自定义Relation,必须重写schema,就是我们必须描述对于外部数据源的Schema。
2、buildScan我们定义如何查询外部数据源,提供了4种Scan的策略,对应4种BaseRelation。


我们支持4种BaseRelation,分为TableScan, PrunedScan,PrunedFilterScan,CatalystScan。
1、 TableScan
默认的Scan策略。
2、 PrunedScan
这里可以传入指定的列,requiredColumns,列裁剪,不需要的列不会从外部数据源加载。
3、 PrunedFilterScan
在列裁剪的基础上,并且加入Filter机制,在加载数据也的时候就进行过滤,而不是在客户端请求返回时做Filter。
4、 CatalystScan
Catalyst的支持传入expressions来进行Scan。支持列裁剪和Filter。

RelationProvider:
我们要实现这个,接受Parse后传入的参数,来生成对应的External Relation,就是一个反射生产外部数据源Relation的接口。
[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. trait RelationProvider {  
  2.   /** 
  3.    * Returns a new base relation with the given parameters. 
  4.    * Note: the parameters' keywords are case insensitive and this insensitivity is enforced 
  5.    * by the Map that is passed to the function. 
  6.    */  
  7.   def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation  
  8. }  

七、External Datasource定义示例

在Spark1.2之后,json和parquet也改为通过实现External API来进行外部数据源查询的。
下面以json的外部数据源定义为示例,说明是如何实现的:


1、JsonRelation

定义处理对于json文件的,schema和Scan策略,均基于JsonRDD,细节可以自行阅读JsonRDD。

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. private[sql] case class JSONRelation(fileName: String, samplingRatio: Double)(  
  2.     @transient val sqlContext: SQLContext)  
  3.   extends TableScan {  
  4.   
  5.   private def baseRDD = sqlContext.sparkContext.textFile(fileName) //读取json file  
  6.   
  7.   override val schema =  
  8.     JsonRDD.inferSchema(  // jsonRDD的inferSchema方法,能自动识别json的schema,和类型type。  
  9.       baseRDD,  
  10.       samplingRatio,  
  11.       sqlContext.columnNameOfCorruptRecord)  
  12.   
  13.   override def buildScan() =  
  14.     JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.columnNameOfCorruptRecord) //这里还是JsonRDD,调用jsonStringToRow查询返回Row  
  15. }  

2、DefaultSource
parameters中可以获取到options中传入的path等自定义参数。
这里接受传入的参数,来狗仔JsonRelation。
[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. private[sql] class DefaultSource extends RelationProvider {  
  2.   /** Returns a new base relation with the given parameters. */  
  3.   override def createRelation(  
  4.       sqlContext: SQLContext,  
  5.       parameters: Map[String, String]): BaseRelation = {  
  6.     val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified"))  
  7.     val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)  
  8.   
  9.     JSONRelation(fileName, samplingRatio)(sqlContext)  
  10.   }  
  11. }  

八、总结
External DataSource源码分析下来,可以总结为3部分。
1、外部数据源的注册流程
2、外部数据源Table查询的计划解析流程
3、如何自定义一个外部数据源,重写BaseRelation定义外部数据源的schema和scan的规则。定义RelationProvider,如何生成外部数据源Relation。
External Datasource此部分API还有可能在后续的build中改动,目前只是涉及到了查询,关于其它的操作还未涉及。
——EOF——

原创文章,转载请注明:

转载自:OopsOutOfMemory盛利的Blog,作者: OopsOutOfMemory

本文链接地址:http://blog.csdn.net/oopsoom/article/details/42064075  

注:本文基于署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN)协议,欢迎转载、转发和评论,但是请保留本文作者署名和文章链接。如若需要用于商业目的或者与授权方面的协商,请联系我。

image



这篇关于Spark SQL之External DataSource外部数据源的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/832040

相关文章

SQL中的外键约束

外键约束用于表示两张表中的指标连接关系。外键约束的作用主要有以下三点: 1.确保子表中的某个字段(外键)只能引用父表中的有效记录2.主表中的列被删除时,子表中的关联列也会被删除3.主表中的列更新时,子表中的关联元素也会被更新 子表中的元素指向主表 以下是一个外键约束的实例展示

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

如何去写一手好SQL

MySQL性能 最大数据量 抛开数据量和并发数,谈性能都是耍流氓。MySQL没有限制单表最大记录数,它取决于操作系统对文件大小的限制。 《阿里巴巴Java开发手册》提出单表行数超过500万行或者单表容量超过2GB,才推荐分库分表。性能由综合因素决定,抛开业务复杂度,影响程度依次是硬件配置、MySQL配置、数据表设计、索引优化。500万这个值仅供参考,并非铁律。 博主曾经操作过超过4亿行数据

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

MySQL数据库宕机,启动不起来,教你一招搞定!

作者介绍:老苏,10余年DBA工作运维经验,擅长Oracle、MySQL、PG、Mongodb数据库运维(如安装迁移,性能优化、故障应急处理等)公众号:老苏畅谈运维欢迎关注本人公众号,更多精彩与您分享。 MySQL数据库宕机,数据页损坏问题,启动不起来,该如何排查和解决,本文将为你说明具体的排查过程。 查看MySQL error日志 查看 MySQL error日志,排查哪个表(表空间

MySQL高性能优化规范

前言:      笔者最近上班途中突然想丰富下自己的数据库优化技能。于是在查阅了多篇文章后,总结出了这篇! 数据库命令规范 所有数据库对象名称必须使用小写字母并用下划线分割 所有数据库对象名称禁止使用mysql保留关键字(如果表名中包含关键字查询时,需要将其用单引号括起来) 数据库对象的命名要能做到见名识意,并且最后不要超过32个字符 临时库表必须以tmp_为前缀并以日期为后缀,备份

[MySQL表的增删改查-进阶]

🌈个人主页:努力学编程’ ⛅个人推荐: c语言从初阶到进阶 JavaEE详解 数据结构 ⚡学好数据结构,刷题刻不容缓:点击一起刷题 🌙心灵鸡汤:总有人要赢,为什么不能是我呢 💻💻💻数据库约束 🔭🔭🔭约束类型 not null: 指示某列不能存储 NULL 值unique: 保证某列的每行必须有唯一的值default: 规定没有给列赋值时的默认值.primary key:

MySQL-CRUD入门1

文章目录 认识配置文件client节点mysql节点mysqld节点 数据的添加(Create)添加一行数据添加多行数据两种添加数据的效率对比 数据的查询(Retrieve)全列查询指定列查询查询中带有表达式关于字面量关于as重命名 临时表引入distinct去重order by 排序关于NULL 认识配置文件 在我们的MySQL服务安装好了之后, 会有一个配置文件, 也就

Java 连接Sql sever 2008

Java 连接Sql sever 2008 /Sql sever 2008 R2 import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.Statement; public class TestJDBC

Mysql BLOB类型介绍

BLOB类型的字段用于存储二进制数据 在MySQL中,BLOB类型,包括:TinyBlob、Blob、MediumBlob、LongBlob,这几个类型之间的唯一区别是在存储的大小不同。 TinyBlob 最大 255 Blob 最大 65K MediumBlob 最大 16M LongBlob 最大 4G