SparkRDD转DataSet/DataFrame的一个深坑

2024-09-06 21:32

本文主要是介绍SparkRDD转DataSet/DataFrame的一个深坑,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

640?wx_fmt=jpeg

大数据技术与架构
点击右侧关注,大数据开发领域最强公众号!
640?wx_fmt=jpeg

640?wx_fmt=png

暴走大数据
点击右侧关注,暴走大数据!
640?wx_fmt=png


By  大数据技术与架构

场景描述:本文是根据读者反馈的一个问题总结而成的。

关键词:Saprk RDD

原需求:希望在map函数中将每一个rdd转为DataSet或者DataFrame。

SparkRDD转为DataSet的两种方式

第一种方法是使用反射来推断包含特定对象类型的RDD的模式。在写Spark程序的同时,已经知道了模式,这种基于反射的方法可以使代码更简洁并且程序工作得更好。

第二种方法是通过一个编程接口来实现,这个接口允许构造一个模式,然后在存在的RDD上使用它。虽然这种方法代码较为冗长,但是它允许在运行期间之前不知道列以及列的类型的情况下构造DataSet。

官方给出的两个案例:

  • 利用反射推断Schema

Spark SQL支持将javabean的RDD自动转换为DataFrame。使用反射获得的BeanInfo定义了表的模式。目前,Spark SQL不支持包含Map字段的javabean。但是支持嵌套的javabean和列表或数组字段。您可以创建一个实现Serializable的类并为其所有字段设置getter和setter,从而创建一个JavaBean。
  public static class Person implements Serializable {	private String name;	private int age;	public String getName() {	return name;	}	public void setName(String name) {	this.name = name;	}	public int getAge() {	return age;	}	public void setAge(int age) {	this.age = age;	}	}
  private static void runInferSchemaExample(SparkSession spark) {	// Create an RDD of Person objects from a text file	JavaRDD<Person> peopleRDD = spark.read()	.textFile("examples/src/main/resources/people.txt")	.javaRDD()	.map(line -> {	String[] parts = line.split(",");	Person person = new Person();	person.setName(parts[0]);	person.setAge(Integer.parseInt(parts[1].trim()));	return person;	});	// Apply a schema to an RDD of JavaBeans to get a DataFrame	Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);	// Register the DataFrame as a temporary view	peopleDF.createOrReplaceTempView("people");	// SQL statements can be run by using the sql methods provided by spark	Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");	// The columns of a row in the result can be accessed by field index	Encoder<String> stringEncoder = Encoders.STRING();	Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(	(MapFunction<Row, String>) row -> "Name: " + row.getString(0),	stringEncoder);	teenagerNamesByIndexDF.show();	// +------------+	// |       value|	// +------------+	// |Name: Justin|	// +------------+	// or by field name	Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(	(MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),	stringEncoder);	teenagerNamesByFieldDF.show();	// +------------+	// |       value|	// +------------+	// |Name: Justin|	// +------------+	// $example off:schema_inferring$	}
  • 编程指定Schema

如果不能提前定义JavaBean类(例如,记录的结构是在字符串中编码的,或者将对文本数据集进行解析,而对不同的用户将对字段进行不同的投影),那么可以通过三个步骤以编程方式创建DataSet<Row>。
  private static void runProgrammaticSchemaExample(SparkSession spark) {	// 1、创建一个RDD	JavaRDD<String> peopleRDD = spark.sparkContext()	.textFile("examples/src/main/resources/people.txt", 1)	.toJavaRDD();	// The schema is encoded in a string	String schemaString = "name age";	// 2、根据schema的字符串生成schema	List<StructField> fields = new ArrayList<>();	for (String fieldName : schemaString.split(" ")) {	StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);	fields.add(field);	}	StructType schema = DataTypes.createStructType(fields);	// 3、将JavaRDD<String>的记录转换成JavaRDD<Row>	JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {	String[] attributes = record.split(",");	return RowFactory.create(attributes[0], attributes[1].trim());	});	///4、将 schema 应用在JavaRDD<Row> ,创建 Dataset<Row>	Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);	// Creates a temporary view using the DataFrame	peopleDataFrame.createOrReplaceTempView("people");	// SQL can be run over a temporary view created using DataFrames	Dataset<Row> results = spark.sql("SELECT name FROM people");	// The results of SQL queries are DataFrames and support all the normal RDD operations	// The columns of a row in the result can be accessed by field index or by field name	Dataset<String> namesDS = results.map(	(MapFunction<Row, String>) row -> "Name: " + row.getString(0),	Encoders.STRING());	namesDS.show();	// +-------------+	// |        value|	// +-------------+	// |Name: Michael|	// |   Name: Andy|	// | Name: Justin|	// +-------------+	// $example off:programmatic_schema$	}

 
Task not serializable	
 
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSeri	
alizableException: ...

网上也提供很多办法,包括:
  • @Transient 注解

class MyTest1(conf:String) extends Serializable{	val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org");	@transient	private val sparkConf = new SparkConf().setAppName("AppName");	@transient	private val sc = new SparkContext(sparkConf);	val rdd = sc.parallelize(list);	private val rootDomain = conf	def getResult(): Array[(String)] = {	val result = rdd.filter(item => item.contains(rootDomain))	result.take(result.count().toInt)	}	
}
注解是方法级别的,不是变量级别。

  • 方法实现implements Serializable

例如:
public class RDDTest implements Serializable
  • 设置一个参数

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

简单的分析

以上的方法,不一定管用。

在编写Spark程序中,由于在map等算子内部使用了外部定义的变量和函数,由于外部定义的变量和函数有可能不支持序列化,仍然会导致整个类序列化时出现问题,最终可能会出现Task未序列化问题。

引用了类的成员函数,会导致该类及所有成员都需要支持序列化。因此,对于使用了某类成员变量或函数的情形,首先该类需要序列化(Serializable),同时需要对某些不需要序列化的成员变量标记以避免为序列化造成影响。
所以:
  • 引用了类的成员函数或变量,对应的类需要做序列化处理

  • 执行map等方法的时候,尽量不要在闭包内部直接引用成员函数或变量


如果上述办法全都不管用,那么就换个实现方案吧。

欢迎点赞+收藏+转发朋友圈素质三连

640?wx_fmt=jpeg640?wx_fmt=jpeg

文章不错?点个【在看】吧! ?

这篇关于SparkRDD转DataSet/DataFrame的一个深坑的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143171

相关文章

HTML5自定义属性对象Dataset

原文转自HTML5自定义属性对象Dataset简介 一、html5 自定义属性介绍 之前翻译的“你必须知道的28个HTML5特征、窍门和技术”一文中对于HTML5中自定义合法属性data-已经做过些介绍,就是在HTML5中我们可以使用data-前缀设置我们需要的自定义属性,来进行一些数据的存放,例如我们要在一个文字按钮上存放相对应的id: <a href="javascript:" d

论文精读-Supervised Raw Video Denoising with a Benchmark Dataset on Dynamic Scenes

论文精读-Supervised Raw Video Denoising with a Benchmark Dataset on Dynamic Scenes 优势 1、构建了一个用于监督原始视频去噪的基准数据集。为了多次捕捉瞬间,我们手动为对象s创建运动。在高ISO模式下捕获每一时刻的噪声帧,并通过对多个噪声帧进行平均得到相应的干净帧。 2、有效的原始视频去噪网络(RViDeNet),通过探

【python pandas】 Dataframe的数据print输出 显示为...省略号

pandas.set_option() 可以设置pandas相关的参数,从而改变默认参数。 打印pandas数据事,默认是输出100行,多的话会输出….省略号。 那么可以添加: pandas.set_option('display.max_rows',None) 这样就可以显示全部数据 同样,某一列比如url太长 显示省略号 也可以设置。 pd.set_option('display.

Pyspark DataFrame常用操作函数和示例

针对类型:pyspark.sql.dataframe.DataFrame 目录 1.打印前几行 1.1 show()函数 1.2 take()函数 2. 读取文件 2.1 spark.read.csv 3. 获取某行某列的值(具体值) 4.查看列名 5.修改列名 5.1 修改单个列名 5.2 修改多个列名 5.2.1 链式调用 withColumnRenamed 方法 5.2.2 使用

rdd,dataframe,dataset之间的区别

在spark中,RDD、DataFrame、Dataset是最常用的数据类型,本博文给出笔者在使用的过程中体会到的区别和各自的优势   共性: 1、RDD、DataFrame、Dataset全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利 2、三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始

大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等

点一下关注吧!!!非常感谢!!持续更新!!! 目前已经更新到了: Hadoop(已更完)HDFS(已更完)MapReduce(已更完)Hive(已更完)Flume(已更完)Sqoop(已更完)Zookeeper(已更完)HBase(已更完)Redis (已更完)Kafka(已更完)Spark(已更完)Flink(正在更新!) 章节内容 上节我们完成了如下的内容: Flink Sink J

pandas.concat实现竖着拼接、横着拼接DataFrame

1、concat竖着拼接(默认的竖着,axis=0) 话不多说,直接看例子: import pandas as pddf1=pd.DataFrame([10,12,13])df2=pd.DataFrame([22,33,44,55])df3=pd.DataFrame([90,94]) df1 0010112213 df2 0022133244355 df3

Anndata: AttributeError: ‘DataFrame’ object has no attribute ‘dtype’

Anndata: AttributeError: ‘DataFrame’ object has no attribute ‘dtype’ 背景解决方法 背景 在使用anndata做切片时,比如下面这样的例子 sub_rna = rna[:10] # rna is anndata 出现如下报错: AttributeError: ‘DataFrame’ object has

PyTorch数据加载:自定义数据集【Dataset:处理每个原始样本】【DataLoader:每次生成batch_size个样本】【collate_fn:重新设置一个Batch中所有样本的加载格式】

一、自定义Dataset Dataset是一个包装类: 用来将数据包装为Dataset类,然后传入DataLoader中,我们再使用DataLoader这个类来更加快捷的对数据进行操作。可以通过继承Dataset来将数据集的源文件、规模和其他非必要的功能打包,从而供DataLoader使用。 1、“文本分类”任务下使用自定义Dataset class.txt:所有类别 finance

SparkRDD之mapPartitions和mapPartitionsWithIndex

1.mapPartitions mapPartition可以这么理解,先对RDD进行partition,再把每个partition进行map函数。 下面的例子,将整数转为字符串: package com.cb.spark.sparkrdd;import java.util.ArrayList;import java.util.Arrays;import java.util.Iterato