JDBC,CaseClass,JSON,Parquet和Schema五种方式创建DataFrame

2023-12-10 16:58

本文主要是介绍JDBC,CaseClass,JSON,Parquet和Schema五种方式创建DataFrame,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.JDBC的方式创建DataFrame

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;public class JDBC2MySQL {public static void main(String[] args){SparkConf conf=new SparkConf();conf.setAppName("JDBC2MySQL").setMaster("local");JavaSparkContext sc=new JavaSparkContext(conf);SQLContext sqlContext=new SQLContext(sc);/*1.通过format("jdbc")的方式说明SparkSQL操作的数据来源是通过JDBC获得*JDBC后端一般是数据库,例如MySQL、Oracle等*2.通过DataFrameReader的option方法把要访问的数据库的信息传递进去*3.url:代表数据库的jdbc链接地址*4.datable:代表具体要链接哪个数据库*5.Driver部分是Spark SQL访问数据库的具体的驱动的完整包名和类名*6.关于JDBC的驱动的Jar,可以放在Spark的library目录,也可以在使用SparkSubmit的使用指定Jar(编码和打包的时候都不需要这个JDBC的Jar)* */DataFrameReader reader=sqlContext.read().format("jdbc");reader.option("url", "jdbc:mysql://SparkMaster:3306");reader.option("dbtable","dt_spark");reader.option("driver", "com.mysql.jdbc.Driver");reader.option("user", "root");reader.option("password", "123");DataFrame mysqlDataSourceDF=reader.load();reader.option("dbtable", "dthadoop");DataFrame DFFromMySQL=reader.load();Map<String, String> options = new HashMap<String, String>();options.put("url", "jdbc:mysql://SparkMaster:3306/testdb");options.put("dbtable", "student_infos");options.put("user", "root");options.put("password","123");DataFrame studentInfosDF=sqlContext.read().format("jdbc").options(options).load();options.put("dbtable", "student_scores");DataFrame studentScoresDF=sqlContext.read().format("jdbc").options(options).load();List<Row> listRow=studentScoresDF.javaRDD().collect();for(Row row:listRow){System.out.println(row);}	 }
}

2.Case Class的方式创建DataFrame

import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.Row;
/** 使用反射的方式将RDD转化为DataFrame*/
public class CaseClassDataFrame {public static void main(String[] args) {SparkConf conf=new SparkConf().setAppName("RDD2DataFrame").setMaster("local");JavaSparkContext sc=new JavaSparkContext(conf);SQLContext sqlContext=new SQLContext(sc);//获取SQLContextJavaRDD<String> lines=sc.textFile("C://Users//Jason Shu//Desktop//persons.txt");JavaRDD<Person> persons=lines.map(new Function<String, Person>()/*RDD<String>变为RDD<Person>,泛型转换*/{public Person call(String line) throws Exception {String[] splited=line.split(" ");Person p =new Person();p.setId(Integer.valueOf(splited[0].trim()));p.setName(splited[1]);p.setAge(Integer.valueOf(splited[0].trim()));return p;}});DataFrame df= sqlContext.createDataFrame(persons, Person.class);//SQLContext变为DataFrame /*creatDataFrame第一个参数JavaRDD<?>rdd,第二个参数Class<?>beanClass*/df.registerTempTable("persons");//注册一张临时表DataFrame bigData=sqlContext.sql("select * from persons where age >=6");JavaRDD<Row> bigDataRDD=bigData.javaRDD();//DataFrame转换为RDDJavaRDD<Person> result=bigDataRDD.map(new Function<Row, Person>()/*DataFrame转换为RDD,这个地方由于bigDataRDD是RDD<Row>,result是RDD<Person>* 相当于是一个泛型转换*/ {public Person call(Row row) throws Exception {Person p =new Person();p.setId(row.getInt(0));p.setName(row.getString(1));p.setAge(row.getInt(2));return p;}});List<Person> personList=result.collect();for(Person p:personList){System.out.println(p);}   }}

Person类

public class Person {private static final long serialVesionUID=1L;private int id;private String name;private int age;@Overridepublic String toString() {return "Person [id=" + id + ", name=" + name + ", age=" + age + "]";}public int getId() {return id;}public void setId(int id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}	}


3.JSON方式创建DataFrame

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.DataFrame;
public class JSONDataFrame {public static void main(String[] args) {SparkConf conf =new SparkConf().setAppName("DataFrame").setMaster("spark://SparkMaster:7077");JavaSparkContext sc =new JavaSparkContext(conf);SQLContext sqlContext=new SQLContext(sc);//可以简单的认为DataFrame是一张表DataFrame dataFrame=sqlContext.read().json("hdfs://SparkMaster:9000/data/people.json");dataFrame.show();}}

4.Parquet的方式创建DataFrame

import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
public class ParquetDataFrame {public static void main(String[] args) {SparkConf conf=new SparkConf();conf.setAppName("ParquetDataFrame").setMaster("spark://SparkMaster:7077");JavaSparkContext sc=new JavaSparkContext(conf);SQLContext sqlContext=new SQLContext(sc);DataFrame df=sqlContext.read().parquet("/input/people.parquet");df.registerTempTable("users");DataFrame result=sqlContext.sql("select name from users");List<Row> listRow=result.javaRDD().collect();for(Row row:listRow){System.out.println(row);}}
}

5.Schema的方式创建RDD

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;public class SchemaDataFrame {public static void main(String[] args) {SparkConf conf =new SparkConf();conf.setAppName("SchemaDataFrame").setMaster("local");JavaSparkContext sc=new JavaSparkContext(conf);//如果是sparkcontext就不会textfile(location),而是textfile(location,partition)SQLContext sqlContext=new SQLContext(sc);JavaRDD<String> lines=sc.textFile("C://Users//Jason Shu//Desktop");JavaRDD<Row> personsRDD=lines.map(new Function<String, Row>() //JavaRDD<String>变为JavaRDD<Row>{public Row call(String line) throws Exception {String[] splited=line.split(",");return RowFactory.create(Integer.valueOf(splited[0]),splited[1],Integer.valueOf(splited[2]));}	});List<StructField> structFields=new ArrayList<StructField>();//构造一个StructFieldstructFields.add((StructField) DataTypes.createStructField("id",DataTypes.IntegerType,true));structFields.add((StructField) DataTypes.createStructField("name",DataTypes.StringType,true));structFields.add((StructField) DataTypes.createStructField("age",DataTypes.IntegerType,true));StructType structType=DataTypes.createStructType(structFields); }}


这篇关于JDBC,CaseClass,JSON,Parquet和Schema五种方式创建DataFrame的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/477711

相关文章

【Python编程】Linux创建虚拟环境并配置与notebook相连接

1.创建 使用 venv 创建虚拟环境。例如,在当前目录下创建一个名为 myenv 的虚拟环境: python3 -m venv myenv 2.激活 激活虚拟环境使其成为当前终端会话的活动环境。运行: source myenv/bin/activate 3.与notebook连接 在虚拟环境中,使用 pip 安装 Jupyter 和 ipykernel: pip instal

内核启动时减少log的方式

内核引导选项 内核引导选项大体上可以分为两类:一类与设备无关、另一类与设备有关。与设备有关的引导选项多如牛毛,需要你自己阅读内核中的相应驱动程序源码以获取其能够接受的引导选项。比如,如果你想知道可以向 AHA1542 SCSI 驱动程序传递哪些引导选项,那么就查看 drivers/scsi/aha1542.c 文件,一般在前面 100 行注释里就可以找到所接受的引导选项说明。大多数选项是通过"_

在cscode中通过maven创建java项目

在cscode中创建java项目 可以通过博客完成maven的导入 建立maven项目 使用快捷键 Ctrl + Shift + P 建立一个 Maven 项目 1 Ctrl + Shift + P 打开输入框2 输入 "> java create"3 选择 maven4 选择 No Archetype5 输入 域名6 输入项目名称7 建立一个文件目录存放项目,文件名一般为项目名8 确定

透彻!驯服大型语言模型(LLMs)的五种方法,及具体方法选择思路

引言 随着时间的发展,大型语言模型不再停留在演示阶段而是逐步面向生产系统的应用,随着人们期望的不断增加,目标也发生了巨大的变化。在短短的几个月的时间里,人们对大模型的认识已经从对其zero-shot能力感到惊讶,转变为考虑改进模型质量、提高模型可用性。 「大语言模型(LLMs)其实就是利用高容量的模型架构(例如Transformer)对海量的、多种多样的数据分布进行建模得到,它包含了大量的先验

Java 创建图形用户界面(GUI)入门指南(Swing库 JFrame 类)概述

概述 基本概念 Java Swing 的架构 Java Swing 是一个为 Java 设计的 GUI 工具包,是 JAVA 基础类的一部分,基于 Java AWT 构建,提供了一系列轻量级、可定制的图形用户界面(GUI)组件。 与 AWT 相比,Swing 提供了许多比 AWT 更好的屏幕显示元素,更加灵活和可定制,具有更好的跨平台性能。 组件和容器 Java Swing 提供了许多

用命令行的方式启动.netcore webapi

用命令行的方式启动.netcore web项目 进入指定的项目文件夹,比如我发布后的代码放在下面文件夹中 在此地址栏中输入“cmd”,打开命令提示符,进入到发布代码目录 命令行启动.netcore项目的命令为:  dotnet 项目启动文件.dll --urls="http://*:对外端口" --ip="本机ip" --port=项目内部端口 例: dotnet Imagine.M

滚雪球学Java(87):Java事务处理:JDBC的ACID属性与实战技巧!真有两下子!

咦咦咦,各位小可爱,我是你们的好伙伴——bug菌,今天又来给大家普及Java SE啦,别躲起来啊,听我讲干货还不快点赞,赞多了我就有动力讲得更嗨啦!所以呀,养成先点赞后阅读的好习惯,别被干货淹没了哦~ 🏆本文收录于「滚雪球学Java」专栏,专业攻坚指数级提升,助你一臂之力,带你早日登顶🚀,欢迎大家关注&&收藏!持续更新中,up!up!up!! 环境说明:Windows 10

顺序表之创建,判满,插入,输出

文章目录 🍊自我介绍🍊创建一个空的顺序表,为结构体在堆区分配空间🍊插入数据🍊输出数据🍊判断顺序表是否满了,满了返回值1,否则返回0🍊main函数 你的点赞评论就是对博主最大的鼓励 当然喜欢的小伙伴可以:点赞+关注+评论+收藏(一键四连)哦~ 🍊自我介绍   Hello,大家好,我是小珑也要变强(也是小珑),我是易编程·终身成长社群的一名“创始团队·嘉宾”

深入理解RxJava:响应式编程的现代方式

在当今的软件开发世界中,异步编程和事件驱动的架构变得越来越重要。RxJava,作为响应式编程(Reactive Programming)的一个流行库,为Java和Android开发者提供了一种强大的方式来处理异步任务和事件流。本文将深入探讨RxJava的核心概念、优势以及如何在实际项目中应用它。 文章目录 💯 什么是RxJava?💯 响应式编程的优势💯 RxJava的核心概念

【即时通讯】轮询方式实现

技术栈 LayUI、jQuery实现前端效果。django4.2、django-ninja实现后端接口。 代码仓 - 后端 代码仓 - 前端 实现功能 首次访问页面并发送消息时需要设置昵称发送内容为空时要提示用户不能发送空消息前端定时获取消息,然后展示在页面上。 效果展示 首次发送需要设置昵称 发送消息与消息展示 提示用户不能发送空消息 后端接口 发送消息 DB = []@ro