This post is part 04 of my SparkSQL study notes; hopefully it serves as a useful reference for solving related programming problems.
Spark SQL is the Spark module for processing structured data.
In Spark SQL, Spark provides two new abstractions, DataFrame and Dataset. How do they differ from RDD? First, look at when each one was introduced:
RDD (Spark 1.0) -> DataFrame (Spark 1.3) -> Dataset (Spark 1.6)
Each newer abstraction is generally more efficient, because DataFrame and Dataset queries go through the Catalyst optimizer and the Tungsten execution engine.
Spark SQL hands-on
1. Working with SparkSession
import org.apache.spark.sql.SparkSession

object Demo1 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Demo1")
      .master("local[*]")
      // .enableHiveSupport() // enable Hive-specific operations
      .getOrCreate()

    // load JSON data
    val pdf = spark.read.json("C:\\Users\\70201\\Desktop\\sql\\people.json")

    println("------ print the table schema (metadata) ------")
    pdf.printSchema()

    println("------ show the data ------")
    pdf.show()

    println("------ project individual columns ------")
    pdf.createOrReplaceTempView("people")
    var sql =
      """
        |select
        |name,age
        |from
        |people
        |""".stripMargin
    spark.sql(sql).show()

    println("------ conditional query ------")
    sql =
      """
        |select
        |name,age
        |from
        |people
        |where name="肖楚轩"
        |""".stripMargin
    spark.sql(sql).show()

    println("------ count ------")
    sql =
      """
        |select
        |count(*)
        |from
        |people
        |""".stripMargin
    spark.sql(sql).show()

    println("------ grouped aggregation ------")
    sql =
      """
        |select
        |province,count(name) Count,max(age) maxAge
        |from
        |people
        |group by province
        |""".stripMargin
    spark.sql(sql).show()

    spark.stop()
  }
}
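The SQL strings above can also be expressed with the DataFrame DSL. This is not part of the original notes, just a minimal sketch against the pdf DataFrame created above (it assumes the JSON contains the name, age and province fields used in the queries):

import org.apache.spark.sql.functions.{count, max}
import spark.implicits._

pdf.select("name", "age").show()                                  // projection
pdf.filter($"name" === "肖楚轩").select("name", "age").show()      // conditional query
pdf.groupBy("province")
  .agg(count("name").as("Count"), max("age").as("maxAge"))        // grouped aggregation
  .show()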
2. Constructing a DataFrame
/**
 * The main programming models in SparkSQL are DataFrame and Dataset.
 * A DataFrame can be constructed in two ways:
 *   - reflection-based (schema inferred from a case class / bean)
 *   - programmatic (schema built dynamically)
 * A Dataset is constructed in much the same way as a DataFrame.
 */
import java.util

import scala.collection.JavaConversions
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

object Demo2 {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("Demo2")
      .master("local[*]")
      .getOrCreate()

    // List[Person1] => List[Row]
    val rows = List(
      Person1("韩香彧", 17, 167.5),
      Person1("石云涛", 88, 147.5),
      Person1("刘炳文", 20, 170.5),
      Person1("乔钰芹", 16, 167.5)
    ).map(person => Row(person.name, person.age, person.height))

    // Scala List[Row] => java.util.List[Row]
    val rows1 = JavaConversions.seqAsJavaList(rows)

    val structType = StructType(Array(
      StructField("name", DataTypes.StringType, false),
      StructField("age", DataTypes.IntegerType, false),
      StructField("height", DataTypes.DoubleType, false)
    ))

    // createDataFrame takes a java.util.List[Row] plus the schema
    val df = sparkSession.createDataFrame(rows1, structType)
    df.show()

    creatByBean(sparkSession)
    sparkSession.stop()
  }

  def creatByBean(spakSession: SparkSession): Unit = {
    // Applies a schema to a List of Java Beans
    val persons1 = new util.ArrayList[Person]
    persons1.add(new Person("韩香彧", 17, 167.5))
    persons1.add(new Person("石云涛", 88, 147.5))
    persons1.add(new Person("刘炳文", 20, 170.5))
    // createDataFrame requires a java.util.List of beans here
    val pdf = spakSession.createDataFrame(persons1, classOf[Person])
    pdf.show()
  }
}
case class Person1(name: String, age: Int, height: Double)
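Demo2 only shows the programmatic (schema-based) and bean-based constructions. As a complement, here is a minimal sketch of the reflection-based construction mentioned in the comment above, reusing the Person1 case class; the column names and types are inferred from the case class fields (not part of the original notes):

import sparkSession.implicits._

val reflectedDF = List(
  Person1("韩香彧", 17, 167.5),
  Person1("石云涛", 88, 147.5)
).toDF()                 // schema inferred from Person1 by reflection
reflectedDF.printSchema()
reflectedDF.show()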
3. Constructing a Dataset
/**
 * Constructing a Dataset
 *
 * Building a Dataset requires two things:
 *   1. Import the implicit conversions: import spark.implicits._
 *   2. The element type must have an Encoder; case classes (subclasses of Product) are the best choice.
 */
import org.apache.spark.sql.SparkSession

object Demo3 {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("Demo3")
      .getOrCreate()

    // the element type needs an Encoder: simple types (Int, String, ...) work, but a case class is best
    val list = List(
      Student("韩香彧", 17, 167.5),
      Student("石云涛", 88, 147.5),
      Student("刘炳文", 20, 170.5),
      Student("乔钰芹", 16, 167.5)
    )

    // a Scala List; the implicit conversions supply the Encoder
    import sparkSession.implicits._
    val value = sparkSession.createDataset(list)
    value.show()
  }
}
case class Student(name: String, age: Int, height: Double)
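Besides case classes, the implicits also provide encoders for simple types (Int, String, ...), so a Dataset can be built from them directly. A minimal sketch, not from the original notes:

import sparkSession.implicits._

sparkSession.createDataset(List(1, 2, 3)).show()      // Dataset[Int]
List("韩香彧", "石云涛").toDS().show()                  // Dataset[String] via toDS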
4. Converting between the programming models
/**
 * Conversions between the programming models:
 *   rdd       --> dataframe / dataset
 *   dataframe --> rdd / dataset
 *   dataset   --> dataframe / rdd
 */
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

object Demo4 {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("Demo4")
      .getOrCreate()

    val list = List(
      Student("韩香彧", 17, 167.5),
      Student("石云涛", 88, 147.5),
      Student("刘炳文", 20, 170.5),
      Student("乔钰芹", 16, 167.5)
    )
    val value: RDD[Student] = sparkSession.sparkContext.parallelize(list)

    // implicit conversions
    import sparkSession.implicits._

    println("rdd ---> dataframe")
    val df = value.toDF()
    df.show()

    println("rdd ---> dataset")
    val ds = value.toDS()
    ds.show()

    println("dataframe ---> rdd")
    // a DataFrame holds Row objects
    val rdd = df.rdd
    rdd.foreach {
      case Row(name, age, height) => println(s"${name},${age},${height}")
    }

    println("dataframe ---> dataset")
    println(
      """
        | A DataFrame cannot be turned into a typed Dataset directly.
        | Why? As we saw earlier, the element type of a DataFrame is Row, so the "conversion" would just give Dataset[Row].
        | Row is not a subclass of Product and has no implicit Encoder, so it cannot serve as the element type of a typed Dataset.
        | To get a typed Dataset you have to supply an element type that has an Encoder, e.g. df.as[Student].
        |""".stripMargin)

    println("dataset ---> dataframe")
    ds.toDF().show()

    println("dataset ---> rdd")
    val rdd1 = ds.rdd
    rdd1.foreach(stu => println(s"${stu.name},${stu.age},${stu.height}"))

    sparkSession.stop()
  }
}
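As the printed explanation says, a DataFrame is effectively a Dataset[Row]; to obtain a typed Dataset you have to supply an element type that has an Encoder. A minimal sketch using as[T], assuming the DataFrame columns match the Student fields by name and type and that import sparkSession.implicits._ is in scope as in Demo4:

val typedDs = df.as[Student]     // Encoder[Student] comes from the implicits
typedDs.show()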
5. Writing data out (saving)
// saving data to external storage
import java.util.Properties

import org.apache.spark.sql.{SaveMode, SparkSession}

object Demo6 {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("Demo6")
      .getOrCreate()
    import sparkSession.implicits._

    val ds = sparkSession.read.textFile("C:\\Users\\70201\\Desktop\\sql\\people.txt")
      .map(line => {
        val strings = line.split(",")
        val name = strings(0).trim          // trim surrounding whitespace
        val age = strings(1).trim.toInt
        info(name, age)
      })

    /*
     Saving data, controlled by SaveMode:
       ErrorIfExists  the default, fail if the target already exists
       Append         append to the target
       Overwrite      overwrite the target
       Ignore         if the target directory already exists, do nothing; otherwise create it
     */
    ds.write.mode(SaveMode.Ignore).save("C:\\Users\\70201\\Desktop\\test\\save")
    ds.write.mode(SaveMode.Ignore).json("C:\\Users\\70201\\Desktop\\test\\json")

    // CSV output is comma-separated by default, e.g.:
    //   Michael,29
    //   Andy,30
    //   Justin,19
    // the output format can be adjusted with option("header","true").option("delimiter","|")
    ds.write.mode(SaveMode.Overwrite)
      .option("header", "true")
      .option("delimiter", "|")
      .csv("C:\\Users\\70201\\Desktop\\test\\csv")

    val url = "jdbc:mysql://localhost:3306/test"
    val table = "info"
    // better to create the table beforehand: it is auto-created here, but the inferred column types are not ideal (varchar ends up as Text)
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "123456")
    ds.write.jdbc(url, table, properties)

    sparkSession.stop()
  }
}
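The json/csv/jdbc writers used above are shorthands for the unified write.format(...).save(...) form that the next section mentions. A minimal sketch, with an output path that is only an assumption:

ds.write
  .format("parquet")                                // or "json", "csv", "orc", ...
  .mode(SaveMode.Overwrite)
  .save("C:\\Users\\70201\\Desktop\\test\\parquet") // hypothetical output directory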
6. Loading data
/**
 * Unified data loading and saving in SparkSQL
 *
 * Loading: read.load
 *   "not a Parquet file. expected magic number" ==> by default, load expects Parquet,
 *   a binary columnar storage format originally open-sourced to Apache by Twitter.
 *   Format-specific options: https://docs.databricks.com/data/data-sources/aws/amazon-s3-select.html#csv-specific-options
 *
 * Saving: write.save
 */
// loading data
import java.util.Properties

import org.apache.spark.sql.SparkSession

object Demo5 {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("Demo5")
      .getOrCreate()

    // load defaults to the Parquet format
    var df = sparkSession.read.load("C:\\Users\\70201\\Desktop\\sql\\sqldf.parquet")
    df.show()

    // for less regular input, options are needed to tell the reader how to parse or clean the data
    df = sparkSession.read
      .option("header", "true")
      .option("delimiter", "|")   // for a non-standard CSV, specify the delimiter so the reader can split the fields
      .csv("C:\\Users\\70201\\Desktop\\sql\\student.csv")

    // a CSV without a header can be read and then given column names via toDF
    df = sparkSession.read
      .csv("C:\\Users\\70201\\Desktop\\sql\\country.csv")
      .toDF("id", "country", "code")
    df.show()

    // ORC is a columnar storage format, the successor of RCFile, used by Facebook to store data
    df = sparkSession.read.orc("C:\\Users\\70201\\Desktop\\sql\\student.orc")
    df.show()

    df = sparkSession.read.json("C:\\Users\\70201\\Desktop\\sql\\product_info.json")
    df.show()

    // implicit conversions, needed for the Dataset Encoder
    import sparkSession.implicits._
    // a text file is read as a single column; split it ourselves
    val ds = sparkSession.read.textFile("C:\\Users\\70201\\Desktop\\sql\\people.txt")
      .map(line => {
        val strings = line.split(",")
        val name = strings(0).trim          // trim surrounding whitespace
        val age = strings(1).trim.toInt
        info(name, age)
      })
    ds.show()

    // read.jdbc(url: String, table: String, properties: Properties); other overloads also take columnName etc.
    val url = "jdbc:mysql://localhost:3306/test"
    val table = "wordcounts"
    val properties = new Properties()       // needs "user" and "password" properties
    properties.put("user", "root")
    properties.put("password", "123456")
    df = sparkSession.read.jdbc(url, table, properties)
    df.show()

    sparkSession.stop()
  }
}
case class info(name: String, age: Int)
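Symmetrically, the shorthands used in Demo5 sit on top of the unified read.format(...).load(...) form. A minimal sketch reusing the student.csv path from above:

val csvDf = sparkSession.read
  .format("csv")
  .option("header", "true")
  .option("delimiter", "|")
  .load("C:\\Users\\70201\\Desktop\\sql\\student.csv")
csvDf.show()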
7. Operating on Hive tables with Spark SQL
Packaging plugin
/* Things to watch out for when integrating Spark with Hive:
 * 1. For Spark to resolve the location of the Hive warehouse correctly, hive-site.xml must be handed to Spark and loaded onto Spark's classpath (the resources directory):
 *    one option is to put hive-site.xml directly into Spark's conf directory;
 *    the other is to add it to the classpath through the project itself (the approach used here).
 * 2. The most important parameter in hive-site.xml is
 *    <property>
 *      <name>hive.metastore.warehouse.dir</name>
 *      <value>/user/hive/warehouse</value>
 *    </property>
 *    Without it, the Hive warehouse is assumed to live under the program's current working directory, while the real data sits in HDFS,
 *    so the job cannot find the data at run time (it looks at file:/// where hdfs:// is expected).
 *    See the sketch after this comment block for setting the warehouse directory in code.
 * 3. If Hadoop HA is configured, the concrete HDFS path has to be resolvable as well, so hdfs-site.xml and core-site.xml also need to be on the classpath.
 * 4. The MySQL driver jar must be on the classpath too.
 */
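In addition to shipping hive-site.xml, the warehouse location mentioned in point 2 can also be set directly on the SparkSession builder via spark.sql.warehouse.dir. A minimal sketch; the namenode address is an assumption and must match your cluster, and hive-site.xml is still needed for the metastore connection:

val spark = SparkSession.builder()
  .appName("HiveDemo")
  .enableHiveSupport()
  // assumption: replace namenode:8020 with your cluster's HDFS address
  .config("spark.sql.warehouse.dir", "hdfs://namenode:8020/user/hive/warehouse")
  .getOrCreate()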
// 1. Add the packaging plugin and drop (or mark as provided) the spark-hive dependency, since the Spark installation on the Linux cluster already provides it; this keeps the packaged jar from becoming too large.
// 2. Install the parent module into the local repository first: spark_sql depends on spark_common, and packaging spark_sql looks for spark_common in the local repository before falling back to the remote one.
// 3. Upload the jar with dependencies to the Linux machine.
// 4. Write the submit script; remove .master("local[*]") from the code (the master is supplied at submit time) and turn enableHiveSupport() on.
// 5. Run it.
import org.apache.spark.sql.SparkSession

object Demo1 {
  def main(args: Array[String]): Unit = {
    if (args == null || args.length != 2) {
      println("usage: Demo1 <teacher_basic path> <teacher_info path>")
      System.exit(-1)
    }
    val Array(basic, info) = args

    val sparkSession = SparkSession.builder()
      .appName("Demo1")
      .master("local[*]")        // remove this when submitting to the cluster
      .enableHiveSupport()       // enable Hive-specific operations
      .getOrCreate()

    println("1. create the database")
    sparkSession.sql(
      """
        |create database if not exists info
        |""".stripMargin)

    println("2. create table teacher_basic")
    sparkSession.sql(
      """
        |create table if not exists info.teacher_basic(
        |name string,
        |age int,
        |merry boolean,
        |course int
        |)
        |row format delimited fields terminated by ','
        |""".stripMargin)

    println("3. create table teacher_info")
    sparkSession.sql(
      """
        |create table if not exists info.teacher_info(
        |name string,
        |height int
        |)
        |row format delimited fields terminated by ','
        |""".stripMargin)

    println("4. load data into teacher_basic")
    sparkSession.sql(
      s"""
         |load data inpath '${basic}' into table info.teacher_basic
         |""".stripMargin)

    println("5. load data into teacher_info")
    sparkSession.sql(
      s"""
         |load data inpath '${info}' into table info.teacher_info
         |""".stripMargin)

    println("6. join the two tables")
    val df = sparkSession.sql(
      """
        |select
        |b.name,b.age,b.merry,b.course,i.height
        |from info.teacher_basic b
        |join info.teacher_info i
        |on b.name=i.name
        |""".stripMargin)

    println("7. write the joined data into table teacher")
    df.write.saveAsTable("info.teacher")

    println("done!")
    sparkSession.stop()
  }
}
8. Custom UDF
import org.apache.spark.sql.SparkSession

object Demo2 {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("Demo2")
      .master("local[*]")
      .getOrCreate()

    val df = sparkSession.read.json("C:\\Users\\70201\\Desktop\\sql\\people.json")

    // 3. create a view
    /*
     Registers a temporary view:
       with "global" it is visible across the whole application, otherwise only within the current SparkSession
       with "replace" an existing view of the same name is overwritten, otherwise it is created
     */
    df.createOrReplaceTempView("people")

    // 2. register the udf: return type Int, input type String
    sparkSession.udf.register[Int, String]("mylength", str => myle(str))

    // 4. use it; before running, remove the spark-hive dependency and hive-site.xml from the previous section, otherwise the data will be looked up on HDFS
    sparkSession.sql(
      """
        |select
        |name,
        |mylength(name) mylen,
        |length(name) len
        |from
        |people
        |""".stripMargin).show()

    sparkSession.stop()
  }

  // 1. the custom udf function
  def myle(str: String): Int = str.length
}
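The same logic can also be used from the DataFrame API without registering a SQL function, by wrapping it with functions.udf. A minimal sketch, not part of the original notes:

import org.apache.spark.sql.functions.{col, udf}

val myLengthUdf = udf((s: String) => s.length)
df.select(col("name"), myLengthUdf(col("name")).as("mylen")).show()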
9. Custom UDAF
import org.apache.spark.sql.SparkSession

object Demo3 {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("Demo3")
      .master("local[*]")
      .getOrCreate()

    val df = sparkSession.read.json("C:\\Users\\70201\\Desktop\\sql\\people.json")
    df.createOrReplaceTempView("student")

    sparkSession.udf.register("myavg", new AvgHeight)

    sparkSession.sql(
      """
        |select
        |round(avg(height),1) avg,
        |round(myavg(height),1) myavg
        |from
        |student
        |""".stripMargin).show()

    sparkSession.stop()
  }
}
The custom aggregate class
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}

class AvgHeight extends UserDefinedAggregateFunction {

  /* types of the input arguments of this udaf */
  override def inputSchema: StructType = StructType(List(
    StructField("height", DataTypes.DoubleType, false)
  ))

  /*
   types of the intermediate variables needed to compute the aggregate:
   average = sum / count, so two buffer fields are needed: sum and count
   */
  override def bufferSchema: StructType = StructType(List(
    StructField("sum", DataTypes.DoubleType, false),
    StructField("count", DataTypes.IntegerType, false)
  ))

  /* return type of this udaf */
  override def dataType: DataType = DataTypes.DoubleType

  /* deterministic: given the same input, always return the same output, hence true here */
  override def deterministic: Boolean = true

  // initialize the buffer
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer.update(0, 0.0)
    buffer.update(1, 0)
  }

  // partial aggregation: new input data from `input`, called once per input row
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer.update(0, buffer.getDouble(0) + input.getDouble(0))
    buffer.update(1, buffer.getInt(1) + 1)
  }

  // global aggregation across partitions
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1.update(0, buffer1.getDouble(0) + buffer2.getDouble(0))
    buffer1.update(1, buffer1.getInt(1) + buffer2.getInt(1))
  }

  // compute the final result from the buffer
  override def evaluate(buffer: Row): Double = {
    buffer.getDouble(0) / buffer.getInt(1)
  }
}
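UserDefinedAggregateFunction is deprecated from Spark 3.0 onwards in favour of the typed Aggregator API registered through functions.udaf. A minimal sketch of the same average under that API (requires Spark 3.0+; the names are my own, not from the original notes):

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders, functions}

case class AvgBuffer(sum: Double, count: Long)

object AvgHeight2 extends Aggregator[Double, AvgBuffer, Double] {
  def zero: AvgBuffer = AvgBuffer(0.0, 0L)                                           // initial buffer
  def reduce(b: AvgBuffer, h: Double): AvgBuffer = AvgBuffer(b.sum + h, b.count + 1) // per-row update
  def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer =
    AvgBuffer(b1.sum + b2.sum, b1.count + b2.count)                                  // combine partitions
  def finish(b: AvgBuffer): Double = b.sum / b.count                                 // final result
  def bufferEncoder: Encoder[AvgBuffer] = Encoders.product[AvgBuffer]
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// registration for use in SQL:
// sparkSession.udf.register("myavg2", functions.udaf(AvgHeight2))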
10. Window functions: top-N per group
import org.apache.spark.sql.SparkSession

object Demo4 {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("Demo4")
      .master("local[*]")
      .getOrCreate()

    val df = sparkSession.read.json("C:\\Users\\70201\\Desktop\\sql\\people.json")
    df.createOrReplaceTempView("student")

    // top 2 tallest students per province
    sparkSession.sql(
      """
        |select
        |tmp.*
        |from
        |(select
        |name,age,province,height,
        |row_number() over(partition by province order by height desc) rank
        |from
        |student) tmp
        |where tmp.rank<3
        |""".stripMargin).show()

    sparkSession.stop()
  }
}
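The same per-province top-N can be written with the DataFrame API instead of a SQL subquery, using Window with row_number. A minimal sketch over the df defined above:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val w = Window.partitionBy("province").orderBy(col("height").desc)
df.withColumn("rank", row_number().over(w))
  .where(col("rank") < 3)
  .show()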
11. Data skew (important!!!)