JDBC,CaseClass,JSON,Parquet和Schema五种方式创建DataFrame

2023-12-10 16:58

本文主要是介绍JDBC,CaseClass,JSON,Parquet和Schema五种方式创建DataFrame,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.JDBC的方式创建DataFrame

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;public class JDBC2MySQL {public static void main(String[] args){SparkConf conf=new SparkConf();conf.setAppName("JDBC2MySQL").setMaster("local");JavaSparkContext sc=new JavaSparkContext(conf);SQLContext sqlContext=new SQLContext(sc);/*1.通过format("jdbc")的方式说明SparkSQL操作的数据来源是通过JDBC获得*JDBC后端一般是数据库,例如MySQL、Oracle等*2.通过DataFrameReader的option方法把要访问的数据库的信息传递进去*3.url:代表数据库的jdbc链接地址*4.datable:代表具体要链接哪个数据库*5.Driver部分是Spark SQL访问数据库的具体的驱动的完整包名和类名*6.关于JDBC的驱动的Jar,可以放在Spark的library目录,也可以在使用SparkSubmit的使用指定Jar(编码和打包的时候都不需要这个JDBC的Jar)* */DataFrameReader reader=sqlContext.read().format("jdbc");reader.option("url", "jdbc:mysql://SparkMaster:3306");reader.option("dbtable","dt_spark");reader.option("driver", "com.mysql.jdbc.Driver");reader.option("user", "root");reader.option("password", "123");DataFrame mysqlDataSourceDF=reader.load();reader.option("dbtable", "dthadoop");DataFrame DFFromMySQL=reader.load();Map<String, String> options = new HashMap<String, String>();options.put("url", "jdbc:mysql://SparkMaster:3306/testdb");options.put("dbtable", "student_infos");options.put("user", "root");options.put("password","123");DataFrame studentInfosDF=sqlContext.read().format("jdbc").options(options).load();options.put("dbtable", "student_scores");DataFrame studentScoresDF=sqlContext.read().format("jdbc").options(options).load();List<Row> listRow=studentScoresDF.javaRDD().collect();for(Row row:listRow){System.out.println(row);}	 }
}

2.Case Class的方式创建DataFrame

import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.Row;
/** 使用反射的方式将RDD转化为DataFrame*/
public class CaseClassDataFrame {public static void main(String[] args) {SparkConf conf=new SparkConf().setAppName("RDD2DataFrame").setMaster("local");JavaSparkContext sc=new JavaSparkContext(conf);SQLContext sqlContext=new SQLContext(sc);//获取SQLContextJavaRDD<String> lines=sc.textFile("C://Users//Jason Shu//Desktop//persons.txt");JavaRDD<Person> persons=lines.map(new Function<String, Person>()/*RDD<String>变为RDD<Person>,泛型转换*/{public Person call(String line) throws Exception {String[] splited=line.split(" ");Person p =new Person();p.setId(Integer.valueOf(splited[0].trim()));p.setName(splited[1]);p.setAge(Integer.valueOf(splited[0].trim()));return p;}});DataFrame df= sqlContext.createDataFrame(persons, Person.class);//SQLContext变为DataFrame /*creatDataFrame第一个参数JavaRDD<?>rdd,第二个参数Class<?>beanClass*/df.registerTempTable("persons");//注册一张临时表DataFrame bigData=sqlContext.sql("select * from persons where age >=6");JavaRDD<Row> bigDataRDD=bigData.javaRDD();//DataFrame转换为RDDJavaRDD<Person> result=bigDataRDD.map(new Function<Row, Person>()/*DataFrame转换为RDD,这个地方由于bigDataRDD是RDD<Row>,result是RDD<Person>* 相当于是一个泛型转换*/ {public Person call(Row row) throws Exception {Person p =new Person();p.setId(row.getInt(0));p.setName(row.getString(1));p.setAge(row.getInt(2));return p;}});List<Person> personList=result.collect();for(Person p:personList){System.out.println(p);}   }}

Person类

public class Person {private static final long serialVesionUID=1L;private int id;private String name;private int age;@Overridepublic String toString() {return "Person [id=" + id + ", name=" + name + ", age=" + age + "]";}public int getId() {return id;}public void setId(int id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}	}


3.JSON方式创建DataFrame

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.DataFrame;
public class JSONDataFrame {public static void main(String[] args) {SparkConf conf =new SparkConf().setAppName("DataFrame").setMaster("spark://SparkMaster:7077");JavaSparkContext sc =new JavaSparkContext(conf);SQLContext sqlContext=new SQLContext(sc);//可以简单的认为DataFrame是一张表DataFrame dataFrame=sqlContext.read().json("hdfs://SparkMaster:9000/data/people.json");dataFrame.show();}}

4.Parquet的方式创建DataFrame

import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
public class ParquetDataFrame {public static void main(String[] args) {SparkConf conf=new SparkConf();conf.setAppName("ParquetDataFrame").setMaster("spark://SparkMaster:7077");JavaSparkContext sc=new JavaSparkContext(conf);SQLContext sqlContext=new SQLContext(sc);DataFrame df=sqlContext.read().parquet("/input/people.parquet");df.registerTempTable("users");DataFrame result=sqlContext.sql("select name from users");List<Row> listRow=result.javaRDD().collect();for(Row row:listRow){System.out.println(row);}}
}

5.Schema的方式创建RDD

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;public class SchemaDataFrame {public static void main(String[] args) {SparkConf conf =new SparkConf();conf.setAppName("SchemaDataFrame").setMaster("local");JavaSparkContext sc=new JavaSparkContext(conf);//如果是sparkcontext就不会textfile(location),而是textfile(location,partition)SQLContext sqlContext=new SQLContext(sc);JavaRDD<String> lines=sc.textFile("C://Users//Jason Shu//Desktop");JavaRDD<Row> personsRDD=lines.map(new Function<String, Row>() //JavaRDD<String>变为JavaRDD<Row>{public Row call(String line) throws Exception {String[] splited=line.split(",");return RowFactory.create(Integer.valueOf(splited[0]),splited[1],Integer.valueOf(splited[2]));}	});List<StructField> structFields=new ArrayList<StructField>();//构造一个StructFieldstructFields.add((StructField) DataTypes.createStructField("id",DataTypes.IntegerType,true));structFields.add((StructField) DataTypes.createStructField("name",DataTypes.StringType,true));structFields.add((StructField) DataTypes.createStructField("age",DataTypes.IntegerType,true));StructType structType=DataTypes.createStructType(structFields); }}


这篇关于JDBC,CaseClass,JSON,Parquet和Schema五种方式创建DataFrame的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/477711

相关文章

golang程序打包成脚本部署到Linux系统方式

《golang程序打包成脚本部署到Linux系统方式》Golang程序通过本地编译(设置GOOS为linux生成无后缀二进制文件),上传至Linux服务器后赋权执行,使用nohup命令实现后台运行,完... 目录本地编译golang程序上传Golang二进制文件到linux服务器总结本地编译Golang程序

Linux下删除乱码文件和目录的实现方式

《Linux下删除乱码文件和目录的实现方式》:本文主要介绍Linux下删除乱码文件和目录的实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录linux下删除乱码文件和目录方法1方法2总结Linux下删除乱码文件和目录方法1使用ls -i命令找到文件或目录

Linux在线解压jar包的实现方式

《Linux在线解压jar包的实现方式》:本文主要介绍Linux在线解压jar包的实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录linux在线解压jar包解压 jar包的步骤总结Linux在线解压jar包在 Centos 中解压 jar 包可以使用 u

创建Java keystore文件的完整指南及详细步骤

《创建Javakeystore文件的完整指南及详细步骤》本文详解Java中keystore的创建与配置,涵盖私钥管理、自签名与CA证书生成、SSL/TLS应用,强调安全存储及验证机制,确保通信加密和... 目录1. 秘密键(私钥)的理解与管理私钥的定义与重要性私钥的管理策略私钥的生成与存储2. 证书的创建与

Jenkins分布式集群配置方式

《Jenkins分布式集群配置方式》:本文主要介绍Jenkins分布式集群配置方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1.安装jenkins2.配置集群总结Jenkins是一个开源项目,它提供了一个容易使用的持续集成系统,并且提供了大量的plugin满

Java通过驱动包(jar包)连接MySQL数据库的步骤总结及验证方式

《Java通过驱动包(jar包)连接MySQL数据库的步骤总结及验证方式》本文详细介绍如何使用Java通过JDBC连接MySQL数据库,包括下载驱动、配置Eclipse环境、检测数据库连接等关键步骤,... 目录一、下载驱动包二、放jar包三、检测数据库连接JavaJava 如何使用 JDBC 连接 mys

C#读写文本文件的多种方式详解

《C#读写文本文件的多种方式详解》这篇文章主要为大家详细介绍了C#中各种常用的文件读写方式,包括文本文件,二进制文件、CSV文件、JSON文件等,有需要的小伙伴可以参考一下... 目录一、文本文件读写1. 使用 File 类的静态方法2. 使用 StreamReader 和 StreamWriter二、二进

MySQL查询JSON数组字段包含特定字符串的方法

《MySQL查询JSON数组字段包含特定字符串的方法》在MySQL数据库中,当某个字段存储的是JSON数组,需要查询数组中包含特定字符串的记录时传统的LIKE语句无法直接使用,下面小编就为大家介绍两种... 目录问题背景解决方案对比1. 精确匹配方案(推荐)2. 模糊匹配方案参数化查询示例使用场景建议性能优

java实现docker镜像上传到harbor仓库的方式

《java实现docker镜像上传到harbor仓库的方式》:本文主要介绍java实现docker镜像上传到harbor仓库的方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录1. 前 言2. 编写工具类2.1 引入依赖包2.2 使用当前服务器的docker环境推送镜像2.2

springboot项目打jar制作成镜像并指定配置文件位置方式

《springboot项目打jar制作成镜像并指定配置文件位置方式》:本文主要介绍springboot项目打jar制作成镜像并指定配置文件位置方式,具有很好的参考价值,希望对大家有所帮助,如有错误... 目录一、上传jar到服务器二、编写dockerfile三、新建对应配置文件所存放的数据卷目录四、将配置文