Spark重点难点知识总结(二)

2023-12-10 16:58

本文主要是介绍Spark重点难点知识总结(二),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.join:join函数主要用来拼接字符串,将字符串、元组、列表中的元素以指定的字符(分隔符)连接生成一个新的字符串。

var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)
scala> rdd1.join(rdd2).collect
res: Array[(String, (String, String))] = Array((A,(1,a)), (C,(3,c)))



2.countByKey:统计每个key对应的value个数。

val scores=Array(Tuple2(1,100),Tuple2(2,90),Tuple2(3,100),Tuple2(2,90),Tuple2(3,100))
val content=sc.parallelize(scores)
val data=content.countByKey()
res:data: scala.collection.Map[Int,Long] = Map(1 -> 1, 3 -> 2, 2 -> 2)


3.宽依赖和窄依赖:窄依赖是指每个父RDD的一个partition最多被子RDD的一个partition所使用,例如map,filter,union等都会产生窄依赖。窄依赖,1对1,n对1。宽依赖是指一个父RDD的Partition会被多个子RDD的Partition所使用,例如groupByKey、ReduceByKey、sortByKey等操作都会产生宽依赖。总结:如果父RDD的一个Partition被一个子RDD的Partition所使用就是窄依赖,否则的话就是宽依赖。



4.DataFrame与RDD:DataFrame是一种分布式二维数据结构,R和Python语言中都有DataFrame,Spark中的DataFrame最大的不同点是其天生是分布式的,可以简单的认为Spark中的DataFrame是一个分布式的Table,形式如下所示。


而RDD类型为

简单来说,RDD是一个个Person实例,RDD并不知道里面有什么类型的数据。

(1)RDD以Record为单位,Spark在优化的时候无法洞悉Record内部的细节,所以也就无法进行更深度的优化,这极大的限制了Spark SQL性能的提示。
(2)DataFrame包含了每个Record的Metadata信息,也就是说DataFrame的优化是基于列内部的优化,而不是像RDD一样,只能够基于行进行优化。


5.RDD转DataFrame

import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.Row;
/** 使用反射的方式将RDD转化为DataFrame*/
public class RDD2DataFrame {public static void main(String[] args) {SparkConf conf=new SparkConf().setAppName("RDD2DataFrame").setMaster("local");JavaSparkContext sc=new JavaSparkContext(conf);SQLContext sqlContext=new SQLContext(sc);JavaRDD<String> lines=sc.textFile("C://Users//Jason Shu//Desktop//persons.txt");JavaRDD<Person> persons=lines.map(new Function<String, Person>(){public Person call(String line) throws Exception {String[] splited=line.split(",");Person p =new Person();p.setId(Integer.valueOf(splited[0].trim()));p.setName(splited[1]);p.setAge(Integer.valueOf(splited[0].trim()));return p;}});DataFrame df= sqlContext.createDataFrame(persons, Person.class);df.registerTempTable("persons");//注册一张临时表DataFrame bigData=sqlContext.sql("select * from persons where age >=6");JavaRDD<Row> bigDataRDD=bigData.javaRDD();JavaRDD<Person> result=bigDataRDD.map(new Function<Row, Person>() {public Person call(Row row) throws Exception {Person p =new Person();p.setId(row.getInt(0));p.setName(row.getString(1));p.setAge(row.getInt(2));return p;}});List<Person> personList=result.collect();for(Person p:personList){System.out.println(p);}   }}


    转换过程示意图



6.Spark SQL和DataFrame:Spark SQL 是 Spark 生态系统里用于处理结构化大数据的模块,该模块里最重要的概念就是DataFrame, Spark 的 DataFrame 是基于早期版本中的 SchemaRDD,所以很自然的使用分布式大数据处理的场景。DataFrame 以 RDD 为基础,但是带有 Schema 信息,它类似于传统数据库中的二维表格。Spark SQL 模块目前支持将多种外部数据源的数据转化为 DataFrame,并像操作 RDD 或者将其注册为临时表的方式处理和分析这些数据。当前支持的数据源有:Json,文本文件,RDD,关系数据库,Hive,Parquet。一旦将 DataFrame 注册成临时表,我们就可以使用类 SQL 的方式操作这些数据。Spark SQL的表数据在内存中存储不是采用原生态的JVM对象存储方式,而是采用内存列存储,如下图所示。




7.Schema的方式创建DataFrame

import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}object schemaDataFrame {def main(args:Array[String]): Unit ={val conf=new SparkConf()conf.setMaster("local").setAppName("schemaDataFrame ")val sc=new SparkContext(conf)val RowRDD =sc.textFile("C://Users//Jason Shu//Desktop//InputFile.txt").map(x=>x.split(" ")).map(p=>Row(p(0),p(1)))val sqlContext=new SQLContext(sc)val peopleSchema=StructType(Array(StructField("name", StringType, true),StructField("age", IntegerType, true),StructField("sex", BooleanType, true)))val peopleDataFrame = sqlContext.createDataFrame(RowRDD, peopleSchema)//创建DataFrame,第一个参数为Row[RDD],第二个参数为StructTypepeopleDataFrame.registerTempTable("people")//表的名字随便取一个val results = sqlContext.sql("SELECT name FROM people")results.map(t => "name: " + t(0)).collect().foreach(println)}
}


8.first():Return the first element in this RDD,first返回RDD中的第一个元素,不排序。

scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[33] at makeRDD at :21scala> rdd1.first
res14: (String, String) = (A,1)scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at :21scala> rdd1.first
res8: Int = 10

9.contains():Returns true if and only if this string contains the specified sequence of char values,当且仅当此字符串包含指定的char值序列返回true


10.parallellize():在一个已经存在的Scala集合上创建的RDD, 集合的对象将会被拷贝,创建出一个可以被并行操作的分布式数据集。

val data = List(1, 2, 3, 4, 5)  val distData = sc.parallelize(data)//distData此时为RDD[Int]


11.socketTextStream():相当于Socket客户端,里面的参数就是socket服务器的ip和端口,执行该语句的时候就向socket服务器发送了建立请求了。服务器端接受到了请求就可以给socketTextStream发送消息了


12.filter():使用filter方法,你可以筛选出集合中你需要的元素,形成一个新的集合。

val x = List.range(1, 10)val evens = x.filterNot(_ % 2 == 0)Res:evens: List[Int] = List(1, 3, 5, 7, 9)

13.String.valueOf():要把参数中给的值,转化为String类型,这里的参数是Any,任意的参数都可以。


14.Integer.parseInt:将整数的字符串,转化为整数。

val b="123"val a=Integer.parseInt(b)println(a)//打印结果123

15.collect():将RDD转成Scala数组,并返回。


16.Spark中的partition:提供一种划分数据的依据。例如wordcount程序中的:

val lines=sc.textFile(path, 8)
这个地方的8就是指8个分区,当然如果数据量不够或不够复杂,可以不分为8个。

这篇关于Spark重点难点知识总结(二)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/477716

相关文章

HarmonyOS学习(七)——UI(五)常用布局总结

自适应布局 1.1、线性布局(LinearLayout) 通过线性容器Row和Column实现线性布局。Column容器内的子组件按照垂直方向排列,Row组件中的子组件按照水平方向排列。 属性说明space通过space参数设置主轴上子组件的间距,达到各子组件在排列上的等间距效果alignItems设置子组件在交叉轴上的对齐方式,且在各类尺寸屏幕上表现一致,其中交叉轴为垂直时,取值为Vert

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式Factory工厂模式Singleton单例模式Delegate委派模式Strategy策略模式Prototype原型模式Template模板模式 Spring5 beans 接口实例化代理Bean操作 Context Ioc容器设计原理及高级特性Aop设计原理Factorybean与Beanfactory Transaction 声明式事物

学习hash总结

2014/1/29/   最近刚开始学hash,名字很陌生,但是hash的思想却很熟悉,以前早就做过此类的题,但是不知道这就是hash思想而已,说白了hash就是一个映射,往往灵活利用数组的下标来实现算法,hash的作用:1、判重;2、统计次数;

sqlite3 相关知识

WAL 模式 VS 回滚模式 特性WAL 模式回滚模式(Rollback Journal)定义使用写前日志来记录变更。使用回滚日志来记录事务的所有修改。特点更高的并发性和性能;支持多读者和单写者。支持安全的事务回滚,但并发性较低。性能写入性能更好,尤其是读多写少的场景。写操作会造成较大的性能开销,尤其是在事务开始时。写入流程数据首先写入 WAL 文件,然后才从 WAL 刷新到主数据库。数据在开始

git使用的说明总结

Git使用说明 下载安装(下载地址) macOS: Git - Downloading macOS Windows: Git - Downloading Windows Linux/Unix: Git (git-scm.com) 创建新仓库 本地创建新仓库:创建新文件夹,进入文件夹目录,执行指令 git init ,用以创建新的git 克隆仓库 执行指令用以创建一个本地仓库的

系统架构师考试学习笔记第三篇——架构设计高级知识(20)通信系统架构设计理论与实践

本章知识考点:         第20课时主要学习通信系统架构设计的理论和工作中的实践。根据新版考试大纲,本课时知识点会涉及案例分析题(25分),而在历年考试中,案例题对该部分内容的考查并不多,虽在综合知识选择题目中经常考查,但分值也不高。本课时内容侧重于对知识点的记忆和理解,按照以往的出题规律,通信系统架构设计基础知识点多来源于教材内的基础网络设备、网络架构和教材外最新时事热点技术。本课时知识

二分最大匹配总结

HDU 2444  黑白染色 ,二分图判定 const int maxn = 208 ;vector<int> g[maxn] ;int n ;bool vis[maxn] ;int match[maxn] ;;int color[maxn] ;int setcolor(int u , int c){color[u] = c ;for(vector<int>::iter

整数Hash散列总结

方法:    step1  :线性探测  step2 散列   当 h(k)位置已经存储有元素的时候,依次探查(h(k)+i) mod S, i=1,2,3…,直到找到空的存储单元为止。其中,S为 数组长度。 HDU 1496   a*x1^2+b*x2^2+c*x3^2+d*x4^2=0 。 x在 [-100,100] 解的个数  const int MaxN = 3000

状态dp总结

zoj 3631  N 个数中选若干数和(只能选一次)<=M 的最大值 const int Max_N = 38 ;int a[1<<16] , b[1<<16] , x[Max_N] , e[Max_N] ;void GetNum(int g[] , int n , int s[] , int &m){ int i , j , t ;m = 0 ;for(i = 0 ;

go基础知识归纳总结

无缓冲的 channel 和有缓冲的 channel 的区别? 在 Go 语言中,channel 是用来在 goroutines 之间传递数据的主要机制。它们有两种类型:无缓冲的 channel 和有缓冲的 channel。 无缓冲的 channel 行为:无缓冲的 channel 是一种同步的通信方式,发送和接收必须同时发生。如果一个 goroutine 试图通过无缓冲 channel