SparkSQL修仙学习04

2024-01-08 20:50
文章标签 学习 04 修仙 sparksql

本文主要是介绍SparkSQL修仙学习04,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Spark SQL是Spark用来处理结构化数据的一个模块.

在SparkSQL中Spark为我们提供了两个新的抽象,分别是DataFrame和DataSet。他们和RDD有什么区别呢?首先从版本的产生上来看:
RDD (Spark1.0) —> Dataframe(Spark1.3) —> Dataset(Spark1.6)
效率逐个变高
在这里插入图片描述

sparksql实操

1.SparkSession操作步骤

object Demo1 {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("Demo1").master("local[*]")//            .enableHiveSupport()//支持hive的特定操作.getOrCreate()//读取数据,json格式val pdf = spark.read.json("C:\\Users\\70201\\Desktop\\sql\\people.json")println("------获取表中的元数据信息-----------")pdf.printSchema()println("------获取表中的数据信息-----------")pdf.show()println("------筛选表中的个别字段-----------")pdf.createOrReplaceTempView("people")var sql="""|select|name,age|from|people|""".stripMarginspark.sql(sql).show()println("------条件查询-----------")sql="""|select|name,age|from|people|where name="肖楚轩"|""".stripMarginspark.sql(sql).show()println("------统计-----------")sql="""|select|count(*)|from|people|""".stripMarginspark.sql(sql).show()println("------复杂统计统计-----------")sql="""|select|province,count(name) Count,max(age) maxAge|from|people|group by province|""".stripMarginspark.sql(sql).show()spark.stop()}
}

2. DataFrame的构建

/*** SparkSQL中的编程模型主要有:DataFrame和Dataset* DataFrame的构建分为了两种方式*      基于反射的方式构建*      基于动态编程的方式构建* Dataset的构建方式是和dataframe差不多一样*/
object Demo2 {def main(args: Array[String]): Unit = {val sparkSession = SparkSession.builder().appName("Demo2").master("local[*]").getOrCreate()//List[person]>>List[Row]val rows = List(Person1("韩香彧", 17, 167.5),Person1("石云涛", 88, 147.5),Person1("刘炳文", 20, 170.5),Person1("乔钰芹", 16, 167.5)).map(person => {Row(person.name,person.age,person.height)})//List[Row]>>java的List[Row]val rows1 = JavaConversions.seqAsJavaList(rows)val structType = StructType(Array(StructField("name", DataTypes.StringType, false),StructField("age", DataTypes.IntegerType, false),StructField("height", DataTypes.DoubleType, false)))//java的List[Row]val df = sparkSession.createDataFrame(rows1, structType)df.showcreatByBean(sparkSession)sparkSession.stop()}def creatByBean(spakSession:SparkSession): Unit ={//Applies a schema to a List of Java Beansval persons1 = new util.ArrayList[Person]persons1.add(new Person("韩香彧", 17, 167.5))persons1.add(new Person("石云涛", 88, 147.5))persons1.add(new Person("刘炳文", 20, 170.5))//需要java的listval pdf = spakSession.createDataFrame(persons1, classOf[Person])pdf.show()}
}
case class Person1(name: String, age: Int, height: Double)

3.Dataset的构建

 /***         * Dataset的构建*         **         * dataset在构造的时候需要两个条件:*         *  第一导入隐式转换:import spark.implicits._*         *  第二要求封装数据类型为Product的子类,最好就是case class**/*/
object Demo3 {def main(args: Array[String]): Unit = {val sparkSession = SparkSession.builder().master("local[*]").appName("Demo3").getOrCreate()//要求封装数据类型为Product的子类(int.string等和case class),最好就是case classval list = List(Student("韩香彧", 17, 167.5),Student("石云涛", 88, 147.5),Student("刘炳文", 20, 170.5),Student("乔钰芹", 16, 167.5))//scala的list,隐式转换import sparkSession.implicits._val value = sparkSession.createDataset(list)value.show()}
}
case class Student(name: String, age: Int, height: Double)

4.编程模型之间的转换

/*** 编程模型之间的互相转换:*   rdd--dataframe/dataset*   dataframe-->rdd/dataset*   dataset=-->dataframe/rdd*/
object Demo4 {def main(args: Array[String]): Unit = {val sparkSession = SparkSession.builder().master("local[*]").appName("Demo4").getOrCreate()val list = List(Student("韩香彧", 17, 167.5),Student("石云涛", 88, 147.5),Student("刘炳文", 20, 170.5),Student("乔钰芹", 16, 167.5))val value:RDD[Student] = sparkSession.sparkContext.parallelize(list)//隐式转换import sparkSession.implicits._println("rdd--->dataframe")val df = value.toDF()df.show()println("rdd--->dataSet")val ds = value.toDS()ds.show()println("dataframe--->rdd")//dataframe的数据是一个一个的Rowval rdd = df.rddrdd.foreach{case Row(name,age,height)=>{println(s"${name},${age},${height}")}}println("dataframe--->dataSet")println("""| dataframe 不能直接转化为Dataset| 为什么?我们前了解到dataframe中的泛型是Row,那么转化为Dataset其实就成了Dataset[Row]| 由于Row并不是Product的子类,并没有提供一个Encoder所以不能作为dataset的数据类型| 故而,不可直接转化为dataset|""".stripMargin)println("dataSet--->dataframe")ds.toDF().show()println("dataSet--->rdd")val rdd1 = ds.rddrdd1.foreach(stu=>{println(s"${stu.name},${stu.age},${stu.height}")})sparkSession.stop()}
}

5.数据写出保存落地

//数据落地
object Demo6 {def main(args: Array[String]): Unit = {val sparkSession = SparkSession.builder().master("local[*]").appName("Demo5").getOrCreate()import sparkSession.implicits._val ds = sparkSession.read.textFile("C:\\Users\\70201\\Desktop\\sql\\people.txt").map(line=>{val strings = line.split(",")val name = strings(0).trim//去左右两边空格val age = strings(1).trim.toIntinfo(name,age)})/*数据的落地SaveMode:ErrorIfExists 默认的Append        追加Overwrite     覆盖Ignore        忽略,如果目录已经存在,则忽略,如果目录不存在,则执行创建*/ds.write.mode(SaveMode.Ignore).save("C:\\Users\\70201\\Desktop\\test\\save")ds.write.mode(SaveMode.Ignore).json("C:\\Users\\70201\\Desktop\\test\\json")
//  csv导出默认是,
//    Michael,29
//    Andy,30
//    Justin,19//可以指定输出格式option("header","true").option("delimiter","|")ds.write.mode(SaveMode.Overwrite).option("header","true").option("delimiter","|").csv("C:\\Users\\70201\\Desktop\\test\\csv")var url="jdbc:mysql://localhost:3306/test"var table="info"//最好先建表,虽然此处会自动生成,但表字段的数据类型给的不是很完美varchar--Textval properties = new Properties()properties.put("user","root")properties.put("password","123456")ds.write.jdbc(url,table,properties)sparkSession.stop()}
}

6.数据加载

/*** SparkSQL对数据的统一加载和落地操作*  加载使用*      read.load*           not a Parquet file. expected magic numbe ==> 默认加载的文件格式要求是parquet,是一个二进制的列式存储格式文件,twitter公司开源到apache的*      option*          https://docs.databricks.com/data/data-sources/aws/amazon-s3-select.html#csv-specific-options*  落地使用*      write.save*/
//数据加载
object Dmeo5 {def main(args: Array[String]): Unit = {val sparkSession = SparkSession.builder().master("local[*]").appName("Demo5").getOrCreate()var df = sparkSession.read.load("C:\\Users\\70201\\Desktop\\sql\\sqldf.parquet")df.show()//对于复杂的操作,需要设置一些option选项来完成过滤或者修正df = sparkSession.read.option("header","true").option("delimiter","|")//对不规范的csv要指定分隔符,让程序怎么切分成对应字段.csv("C:\\Users\\70201\\Desktop\\sql\\student.csv")df = sparkSession.read.csv("C:\\Users\\70201\\Desktop\\sql\\country.csv").toDF("id","country","code")//对没有表头的csv可以转成df并指定字段名df.show()//orc是一种列式存储文件,式rc的升级版本呢,是facebook用来存储数据的文件格式df=sparkSession.read.orc("C:\\Users\\70201\\Desktop\\sql\\student.orc")df.show()df=sparkSession.read.json("C:\\Users\\70201\\Desktop\\sql\\product_info.json")df.show()import sparkSession.implicits._//隐式转换,为了解决ds的encoder//text文件认为只有一列数据,我们可以拆分val ds = sparkSession.read.textFile("C:\\Users\\70201\\Desktop\\sql\\people.txt").map(line=>{val strings = line.split(",")val name = strings(0).trim//去左右两边空格val age = strings(1).trim.toIntinfo(name,age)})ds.show()
//    url: String,
//    table: String,
//    columnName: String,var url="jdbc:mysql://localhost:3306/test"var table="wordcounts"val properties = new Properties()//a "user" and "password" propertyproperties.put("user","root")properties.put("password","123456")df=sparkSession.read.jdbc(url,table,properties)df.show()sparkSession.stop()}
}
case class info(name: String,age:Int)

7.sql操作hive表

打包插件

/*spark和hive整合时需要注意的地方:
*      1、为了能够让spark正常的解析hive的仓库为止,需要将hive-site.xml传递给spark,加载到spark的classpath(resources目录)中
*          一种通过直接将hive-site.xml放到spark的conf目录下面
*          另外一种就是通过程序的方式放到classpath即可(第二种)
*      2、在hive-site.xml中最重要的就是一个参数
*      <property>
*      <name>hive.metastore.warehouse.dir</name>
*      <value>/user/hive/warehouse</value>
*      </property>
*      如果没有配置这个参数,就会在当前程序的当前目录下面指定hive的warehouse,而真正的数据在hdfs里卖弄,执行的时候会找不到数据:
*          except: file:///  hdfs://
*       3、同时如果配置hadoop高可用得需要解析出hdfs的具体路径,所以也需要将hdfs-site.xml和core-site.xml也打到classpath下面
*       4、得需要将mysql的驱动包打入classpath中
*/
//1.导入打包插件,将spark-hive依赖取消,因为linux的spark有支持,省的打包时文件过大
//2.对父module打包到本地install,因为有spark_sql的相关依赖spark_common,spark_sql打包package会先去本地仓库找spark_common,找不到再去远程仓库,
//3.上传linux是带有依赖的jar
//4.编写执行脚本,要去掉local,因为程序有了,要把enableHiveSupport()打开
//5.执行
object Demo1 {def main(args: Array[String]): Unit = {if(args==null||args.length!=2){println("null")System.exit(-1)}val Array(basic,info)=argsval sparkSession = SparkSession.builder().appName("Demo1").master("local[*]").enableHiveSupport()//支持hive的特定操作.getOrCreate()println("1.创建数据库")sparkSession.sql("""|create database if not exists info|""".stripMargin)println("2.创建表teacher_basic")sparkSession.sql("""|create table if not exists info.teacher_basic(|name string,|age int,|merry boolean,|course int|)|row format delimited fields terminated by ','|""".stripMargin)println("3.创建表teacher_info")sparkSession.sql("""|create table if not exists info.teacher_info(|name string,|height int|)|row format delimited fields terminated by ','|""".stripMargin)println("4.加载数据到表teacher_basic")sparkSession.sql(s"""|load data inpath '${basic}' into table info.teacher_basic|""".stripMargin)println("5.加载数据到表teacher_info")sparkSession.sql(s"""|load data inpath '${info}' into table info.teacher_info|""".stripMargin)println("6.关联两张表")val df = sparkSession.sql("""|select|b.name,b.age,b.merry,b.course,i.height|from info.teacher_basic b|join info.teacher_info i|on b.name=i.name|""".stripMargin)println("7.将关联数据写入到表teacher")df.write.saveAsTable("info.teacher")println("结束!")sparkSession.stop()}
}

8.自定义UDF函数

object Demo2 {def main(args: Array[String]): Unit = {val sparkSession = SparkSession.builder().appName("Demo2").master("local[*]").getOrCreate()val df = sparkSession.read.json("C:\\Users\\70201\\Desktop\\sql\\people.json")//3.建视图/*注册一张临时表global在整个应用范围内有效,不带的话只在当前sparkSession内有效replace如果该视图存在,则会覆盖,否则新建*/df.createOrReplaceTempView("people")//2.注册udf函数  返回值类型int,输入类型stringsparkSession.udf.register[Int,String]("mylength",str=>myle(str))//4.使用,执行之前要把之前hive.sql依赖解开,把hive-site.mxl去掉,否则会加载到hdfs找数据sparkSession.sql("""|select|name,|mylength(name) mylen,|length(name) len|from|people|""".stripMargin).show()sparkSession.stop()}//1.自定义udf函数def myle(str:String):Int={str.length}
}

9.自定义UDAF函数

object Demo3 {def main(args: Array[String]): Unit = {val sparkSession = SparkSession.builder().appName("Demo3").master("local[*]").getOrCreate()val df = sparkSession.read.json("C:\\Users\\70201\\Desktop\\sql\\people.json")df.createOrReplaceTempView("student")sparkSession.udf.register("myavg",new AvgHeight)sparkSession.sql("""|select|round(avg(height),1) avg,|round(myavg(height),1) myavg|from|student|""".stripMargin).show()sparkSession.stop()}
}

自定义类

class AvgHeight extends UserDefinedAggregateFunction{/*该udaf输入参数的类型说明*/override def inputSchema: StructType = StructType(List(StructField("height", DataTypes.DoubleType, false)))/*为了计算聚合结果所需要的涉及到的临时变量的类型平均数=总数/个数,这里面涉及到了2个临时变量,总数,个数*/override def bufferSchema: StructType =  StructType(List(StructField("sum", DataTypes.DoubleType, false),StructField("count", DataTypes.IntegerType, false)))/*该udaf返回值的数据类型*/override def dataType: DataType = DataTypes.DoubleType/*确定性,相同的输入,其返回值是确定,不会有其他可能,称之为确定性,即返回为truegiven the same input,always return the same output.*/override def deterministic: Boolean = true//初始化override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer.update(0, 0.0)buffer.update(1, 0)}//局部聚合,new input data from `input`,This is called once per input row.override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {buffer.update(0, buffer.getDouble(0) + input.getDouble(0))buffer.update(1, buffer.getInt(1) + 1)}//分区间的全局聚合override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1.update(0, buffer1.getDouble(0) + buffer2.getDouble(0))buffer1.update(1, buffer1.getInt(1) + buffer2.getInt(1))}override def evaluate(buffer: Row): Double = {buffer.getDouble(0) / buffer.getInt(1)}
}

10.开窗函数,分组求TOPN

object Demo4 {def main(args: Array[String]): Unit = {val sparkSession = SparkSession.builder().appName("Demo4").master("local[*]").getOrCreate()val df = sparkSession.read.json("C:\\Users\\70201\\Desktop\\sql\\people.json")df.createOrReplaceTempView("student")sparkSession.sql("""|select|tmp.*|from|(select|name,age,province,height,|row_number() over(partition by province order by height desc) rank|from|student) tmp|where tmp.rank<3|""".stripMargin).show()sparkSession.stop()}
}

11.数据倾斜(重点!!!)

这篇关于SparkSQL修仙学习04的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/584783

相关文章

HarmonyOS学习(七)——UI(五)常用布局总结

自适应布局 1.1、线性布局(LinearLayout) 通过线性容器Row和Column实现线性布局。Column容器内的子组件按照垂直方向排列,Row组件中的子组件按照水平方向排列。 属性说明space通过space参数设置主轴上子组件的间距,达到各子组件在排列上的等间距效果alignItems设置子组件在交叉轴上的对齐方式,且在各类尺寸屏幕上表现一致,其中交叉轴为垂直时,取值为Vert

Ilya-AI分享的他在OpenAI学习到的15个提示工程技巧

Ilya(不是本人,claude AI)在社交媒体上分享了他在OpenAI学习到的15个Prompt撰写技巧。 以下是详细的内容: 提示精确化:在编写提示时,力求表达清晰准确。清楚地阐述任务需求和概念定义至关重要。例:不用"分析文本",而用"判断这段话的情感倾向:积极、消极还是中性"。 快速迭代:善于快速连续调整提示。熟练的提示工程师能够灵活地进行多轮优化。例:从"总结文章"到"用

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06

学习hash总结

2014/1/29/   最近刚开始学hash,名字很陌生,但是hash的思想却很熟悉,以前早就做过此类的题,但是不知道这就是hash思想而已,说白了hash就是一个映射,往往灵活利用数组的下标来实现算法,hash的作用:1、判重;2、统计次数;

零基础学习Redis(10) -- zset类型命令使用

zset是有序集合,内部除了存储元素外,还会存储一个score,存储在zset中的元素会按照score的大小升序排列,不同元素的score可以重复,score相同的元素会按照元素的字典序排列。 1. zset常用命令 1.1 zadd  zadd key [NX | XX] [GT | LT]   [CH] [INCR] score member [score member ...]

【机器学习】高斯过程的基本概念和应用领域以及在python中的实例

引言 高斯过程(Gaussian Process,简称GP)是一种概率模型,用于描述一组随机变量的联合概率分布,其中任何一个有限维度的子集都具有高斯分布 文章目录 引言一、高斯过程1.1 基本定义1.1.1 随机过程1.1.2 高斯分布 1.2 高斯过程的特性1.2.1 联合高斯性1.2.2 均值函数1.2.3 协方差函数(或核函数) 1.3 核函数1.4 高斯过程回归(Gauss

【学习笔记】 陈强-机器学习-Python-Ch15 人工神经网络(1)sklearn

系列文章目录 监督学习:参数方法 【学习笔记】 陈强-机器学习-Python-Ch4 线性回归 【学习笔记】 陈强-机器学习-Python-Ch5 逻辑回归 【课后题练习】 陈强-机器学习-Python-Ch5 逻辑回归(SAheart.csv) 【学习笔记】 陈强-机器学习-Python-Ch6 多项逻辑回归 【学习笔记 及 课后题练习】 陈强-机器学习-Python-Ch7 判别分析 【学

系统架构师考试学习笔记第三篇——架构设计高级知识(20)通信系统架构设计理论与实践

本章知识考点:         第20课时主要学习通信系统架构设计的理论和工作中的实践。根据新版考试大纲,本课时知识点会涉及案例分析题(25分),而在历年考试中,案例题对该部分内容的考查并不多,虽在综合知识选择题目中经常考查,但分值也不高。本课时内容侧重于对知识点的记忆和理解,按照以往的出题规律,通信系统架构设计基础知识点多来源于教材内的基础网络设备、网络架构和教材外最新时事热点技术。本课时知识

线性代数|机器学习-P36在图中找聚类

文章目录 1. 常见图结构2. 谱聚类 感觉后面几节课的内容跨越太大,需要补充太多的知识点,教授讲得内容跨越较大,一般一节课的内容是书本上的一章节内容,所以看视频比较吃力,需要先预习课本内容后才能够很好的理解教授讲解的知识点。 1. 常见图结构 假设我们有如下图结构: Adjacency Matrix:行和列表示的是节点的位置,A[i,j]表示的第 i 个节点和第 j 个

Node.js学习记录(二)

目录 一、express 1、初识express 2、安装express 3、创建并启动web服务器 4、监听 GET&POST 请求、响应内容给客户端 5、获取URL中携带的查询参数 6、获取URL中动态参数 7、静态资源托管 二、工具nodemon 三、express路由 1、express中路由 2、路由的匹配 3、路由模块化 4、路由模块添加前缀 四、中间件