2024.1.8 Day04_SparkCore_homeWork

2024-01-08 17:36

本文主要是介绍2024.1.8 Day04_SparkCore_homeWork,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

1. 简述Spark持久化中缓存和checkpoint检查点的区别

2 . 如何使用缓存和检查点?

3 . 代码题

浏览器Nginx案例

先进行数据清洗,做后续需求用

1、需求一:点击最多的前10个网站域名

2、需求二:用户最喜欢点击的页面排序TOP10

3、需求三:统计每分钟用户搜索次数

学生系统案例

4. RDD依赖的分类

5. 简述DAG与Stage 形成过程 

DAG :  

Stage : 


1. 简述Spark持久化中缓存和checkpoint检查点的区别

1- 数据存储位置不同
    缓存: 存储在内存或者磁盘 或者 堆外内存中
    checkpoint检查点: 可以将数据存储在磁盘或者HDFS上, 在集群模式下, 仅能保存到HDFS上

2- 数据生命周期:
    缓存: 当程序执行完成后, 或者手动调用unpersist 缓存都会被删除
    checkpoint检查点: 即使程序退出后, checkpoint检查点的数据依然是存在的, 不会删除, 需要手动删除

3- 血缘关系:
    缓存: 不会截断RDD之间的血缘关系, 因为缓存数据有可能是失效, 当失效后, 需要重新回溯计算操作
    checkpoint检查点: 会截断掉依赖关系, 因为checkpoint将数据保存到更加安全可靠的位置, 不会发生数据丢失的问题, 当执行失败的时候, 也不需要重新回溯执行
    
4- 主要作用不同:
    缓存: 提高Spark程序的运行效率
    checkpoint检查点: 提高Spark程序的容错性

2 . 如何使用缓存和检查点?

       将两种方案同时用在一个项目中, 先设置缓存,再设置检查点 ,  最后一同使用Action算子进行触发, 这样程序只会有一次IO操作, 如果先设置检查点的话,就会有2次IO操作;

         当在后续工程中读取数据的时候,优先从缓存中读取,如果缓存中没有数据, 再从检查点读取数据,并且会将数据缓存一份到内存中 ,后续直接从缓存中读取数据

3 . 代码题

浏览器Nginx案例

        

先进行数据清洗,做后续需求用

import os
from pyspark import SparkConf, SparkContext,StorageLevel
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 绑定指定的Python解释器
from pyspark.sql.types import StructType, IntegerType, StringType, StructFieldif __name__ == '__main__':
# 1- 创建SparkSession对象conf = SparkConf().setAppName('需求1').setMaster('local[*]')sc = SparkContext(conf=conf)
# 2- 数据输入init_rdd = sc.textFile('file:///export/data/2024.1.2_Spark/1.6_day04/SogouQ.sample')# 3- 数据处理filter_tmp_rdd = init_rdd.filter(lambda line:line.strip()!='')print('过滤空行的数据',filter_tmp_rdd.take(10))map_rdd = filter_tmp_rdd.map(lambda line:line.split())print('map出来的数据',map_rdd.take(10))len6_rdd = map_rdd.filter(lambda line:len(line)==6)print('字段数为6个的字段',len6_rdd.take(10))etl_rdd = len6_rdd.map(lambda list:(list[0],list[1],list[2][1:-1],list[3],list[4],list[5])  )print('转换成元组后的数据',etl_rdd.take(10))# 设置缓存etl_rdd.persist(storageLevel=StorageLevel.MEMORY_AND_DISK).count()

1、需求一:点击最多的前10个网站域名

print('点击最多的前10个网站域名','-'*50)website_map_rdd = etl_rdd.map(lambda tup:(tup[5].split('/')[0],1))print('把网站域名切出来,变成(hello,1)的格式',website_map_rdd.take(10))website_reducekey_rdd = website_map_rdd.reduceByKey(lambda agg,curr:agg+curr)print('进行聚合',website_reducekey_rdd.take(10))sort_rdd =website_reducekey_rdd.sortBy(lambda tup:tup[1],ascending=False)print('进行降序排序',sort_rdd.take(10))
# 4- 数据输出
# 5- 释放资源sc.stop()

2、需求二:用户最喜欢点击的页面排序TOP10

    print('用户最喜欢点击的页面排序TOP10','-'*100)top_10_order = etl_rdd.map(lambda tup:(tup[4],1))print('点击量排行',top_10_order.take(10))top_10_reducebykey = top_10_order.reduceByKey(lambda agg,curr:agg+curr)print('进行聚合',top_10_reducebykey.take(10))sortby_top10 = top_10_reducebykey.sortBy(lambda line:line[1],ascending=False)print('进行排序',sortby_top10.take(10))
# 4- 数据输出
# 5- 释放资源sc.stop()

3、需求三:统计每分钟用户搜索次数

    print('统计每分钟用户搜索次数','-'*50)search_map_rdd = etl_rdd.map(lambda tup:(tup[0][0:5],1))print('把网站域名切出来,变成(hello,1)的格式',search_map_rdd.take(10))search_reducekey_rdd = search_map_rdd.reduceByKey(lambda agg,curr:agg+curr)print('进行聚合',search_reducekey_rdd.take(10))sort_rdd =search_reducekey_rdd.sortBy(lambda tup:tup)print('按照时间进行排序',sort_rdd.take(10))
# 4- 数据输出
# 5- 释放资源sc.stop()

学生系统案例

 数据准备

import os
from pyspark import SparkConf, SparkContext, StorageLevel
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 绑定指定的Python解释器
from pyspark.sql.types import StructType, IntegerType, StringType, StructFieldif __name__ == '__main__':
# 1- 创建SparkSession对象conf = SparkConf().setAppName('学生案例').setMaster('local[*]')sc = SparkContext(conf=conf)
# 2- 数据输入init_rdd= sc.textFile('hdfs://node1:8020/input/day04_home_work.txt')# 3- 数据处理stu_rdd = init_rdd.map(lambda line:line.split(',')).cache()print('切分后的数据为',stu_rdd.collect())
# 1、需求一:该系总共有多少学生stu_cnt = stu_rdd.map(lambda line:line[0]).distinct().count()print(f'该系总共有{stu_cnt}个学生')
# 2、需求二:该系共开设了多少门课程subject_cnt = stu_rdd.map(lambda line:line[1]).distinct().count()print(f'该系共开设了{subject_cnt}门课程')
# 3、需求三:Tom同学的总成绩平均分是多少tom_score_sum = stu_rdd.filter(lambda line:line[0]=='Tom').map(lambda line:int(line[2])).sum()tom_subject_num = stu_rdd.filter(lambda line:line[0]=='Tom').map(lambda line:line[1]).distinct().count()tom_score_avg = tom_score_sum/tom_subject_numprint(f'Tom同学的总成绩平均分是{round(tom_score_avg,2)}')# 4、需求四:求每名同学的选修的课程门数
#     every_student_course_num = stu_rdd.map(lambda x: (x[0], x[1])).distinct().map(lambda tup: (tup[0], 1))\
#         .reduceByKey(lambda agg, curr: agg + curr).collect()every_student_course_num = stu_rdd.map(lambda x: (x[0], x[1])).distinct()print('学生与选修课,把一个学生重修一门选修课的情况去掉',every_student_course_num.collect())every_student_course_num2 = every_student_course_num\.map(lambda tup:(tup[0],1))\.reduceByKey(lambda agg,curr:agg+curr).collect()print('每个同学的选修课数',every_student_course_num2)
# 5、需求五:该系DataBase课程共有多少人选修subject_database = stu_rdd.filter(lambda line:line[1]=='DataBase').map(lambda line:line[0]).distinct().count()print(f'数据库有{subject_database}人选修')
# 6、需求六:各门课程的平均分是多少total_score = stu_rdd.map(lambda x:(x[1],int(x[2]))).groupByKey().map(lambda x:(x[0],sum(x[1])))print('各科总分为',total_score.collect())total_num = stu_rdd.map(lambda x: (x[1], 1)).groupByKey().map(lambda x: (x[0], sum(x[1])))print('各科的数量为',total_num.collect())#total_join =total_score.join(total_num)print('join后结果',total_join.collect())
# 各科总分为 [('DataBase', 170), ('Algorithm', 110), ('DataStructure', 140)]
# 各科的数量为 [('DataBase', 2), ('Algorithm', 2), ('DataStructure', 2)]
# 合表后为 [('DataBase', (170, 2)), ('DataStructure', (140, 2)), ('Algorithm', (110, 2))]total_avg =total_score.join(total_num).map(lambda x: (x[0], round(x[1][0] / x[1][1], 2))).collect()print('各科目的平均分为',total_avg)
# 4- 数据输出# 5- 释放资源sc.stop()

4. RDD依赖的分类

窄依赖:  父RDD分区与子RDD分区是一对一关系

宽依赖:  父RDD分区与子RDD分区是一对多关系

5. 简述DAG与Stage 形成过程 

DAG :  

1-Spark应用程序,遇到了Action算子以后,就会触发一个Job任务的产生。Job任务首先将它所依赖的全部算子加载到内存中,形成一个完整Stage

2-会根据算子间的依赖关系,从Action算子开始,从后往前进行回溯,如果算子间是窄依赖,就放到同一个Stage中;如果是宽依赖,就形成新的Stage。一直回溯完成。

Stage : 

1-Driver进程启动成功以后,底层基于PY4J创建SparkContext对象,在创建SparkContext对象的过程中,还会同时创建DAGScheduler(DAG调度器)和TaskScheduler(Task调度器)
    DAGScheduler: 对Job任务形成DAG有向无环图和划分Stage阶段
    TaskScheduler: 调度Task线程给到Executor进程进行执行

2-Spark应用程序遇到了一个Action算子以后,就会触发一个Job任务的产生。SparkContext对象将Job任务提交DAG调度器,对Job形成DAG有向无环图和划分Stage阶段。并且确定每个Stage阶段需要有多少个Task线程,将这些Task线程放置在TaskSet集合中。再将TaskSet集合给到Task调度器。

3-Task调度器接收到DAG调度器传递过来的TaskSet集合以后,将Task线程分配给到具体的Executor进行执行,底层是基于调度队列SchedulerBackend。Stage阶段是一个一个按顺序执行的,不能并行执行。

4-Executor进程开始执行具体的Task线程。后续过程就是Driver监控多个Executor的执行状态,直到Job任务执行完成。

这篇关于2024.1.8 Day04_SparkCore_homeWork的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/584266

相关文章

Spark学习之路 (十四)SparkCore的调优之资源调优JVM的GC垃圾收集器

《2021年最新版大数据面试题全面开启更新》 欢迎关注github《大数据成神之路》 目录 一、概述 二、垃圾收集器(garbage collector (GC)) 是什么? 三、为什么需要GC? 四、为什么需要多种GC? 五、对象存活的判断 六、垃圾回收算法 6.1 标记 -清除算法 6.2 复制算法 6.3 标记-整理算法 6.4 分代收集算法 七、垃圾收集器 7.1 Serial收集器

Spark学习之路 (十三)SparkCore的调优之资源调优JVM的基本架构

《2021年最新版大数据面试题全面开启更新》 欢迎关注github《大数据成神之路》   目录 一、JVM的结构图 1.1 Java内存结构 1.2 如何通过参数来控制各区域的内存大小 1.3 控制参数 1.4 JVM和系统调用之间的关系 二、JVM各区域的作用 2.1 Java堆(Heap) 2.2 方法区(Method Area) 2.3 程序计数器(Program Counter R

Spark学习之路 (十二)SparkCore的调优之资源调优

《2021年最新版大数据面试题全面开启更新》 欢迎关注github《大数据成神之路》 目录 一、概述 二、Spark作业基本运行原理 三、资源参数调优 3.1 num-executors 3.2 executor-memory 3.3 executor-cores 3.4 driver-memory 3.5 spark.default.parallelism 3.6 spark.storag

Spark学习之路 (十)SparkCore的调优之Shuffle调优

《2021年最新版大数据面试题全面开启更新》 欢迎关注github《大数据成神之路》 目录 一、概述 二、shuffle的定义 三、ShuffleManager发展概述 四、HashShuffleManager的运行原理 4.1 未经优化的HashShuffleManager 4.2 优化后的HashShuffleManager 五、SortShuffleManager运行原理 5.1 普通

Spark学习之路 (九)SparkCore的调优之数据倾斜调优

《2021年最新版大数据面试题全面开启更新》 欢迎关注github《大数据成神之路》 目录 调优概述 数据倾斜发生时的现象 数据倾斜发生的原理 如何定位导致数据倾斜的代码 某个task执行特别慢的情况 某个task莫名其妙内存溢出的情况 查看导致数据倾斜的key的数据分布情况 数据倾斜的解决方案 解决方案一:使用Hive ETL预处理数据 解决方案二:过滤少数导致倾斜的key 解决方案三:提

Spark学习之路 (八)SparkCore的调优之开发调优

《2021年最新版大数据面试题全面开启更新》 欢迎关注github《大数据成神之路》 前言 在大数据计算领域,Spark已经成为了越来越流行、越来越受欢迎的计算平台之一。Spark的功能涵盖了大数据领域的离线批处理、SQL类处理、流式/实时计算、机器学习、图计算等各种不同类型的计算操作,应用范围与前景非常广泛。在美团•大众点评,已经有很多同学在各种项目中尝试使用Spark。大多数同学(包括笔

C++复习day04

一、函数重载 1.什么是函数重载? 自然语言中,一个词可以有多重含义,人们可以通过上下文来判断该词真实的含义,即该词被重载了。比如:以前有一个笑话,国有两个体育项目大家根本不用看,也不用担心。一个是乒乓球,一个是男足。前者是“谁也赢不了!”,后者是“谁也赢不了!” 函数重载:是函数的一种特殊情况,C++允许在同一作用域中声明几个功能类似的同名函数,这 些同名函数的形参列表(参数个数

React学习day04-useEffect、自定义Hook函数

11、useEffect(一个React Hook函数) (1)作用:用于在React组件中创建不是由事件引起而是由渲染本身引起的操作,比如发送AJAX请求,更改DOM等(即:视图渲染完后会触发一些事件) (2)语法:(useEffect(()=>{},[]))         1)参数1是一个函数,也叫副作用函数,可在其内部放置执行的操作         2)参数2是一个数组(可选参),

从零手搓中文大模型|Day04|模型参数和训练启动|我的micro大模型预训练成功跑起来啦

走过路过不要错过,先关注一下,第一时间获取最新进度(或催更) 从零手搓中文大模型|🚀Day04 前面已经完成了数据预处理,今天我们来研究一下模型的配置。 litgpt使用的配置文件和transformers有点不太一样,它的仓库里提供了一些预训练所用的yaml配置文件样例。这个主要用于需要自定义模型的场景。 另外litgpt也内置了一些huggingface上的现成模型,可以直接拿来使

cgb2111-day04

文章目录 一,多表联查--1,准备表和数据 二,笛卡尔积--1,概述--2,测试 三,连接查询--1,概述--2,测试 四,子查询--1,概述--2,测试 五,综合练习--1,测试 六,扩展:索引--1,概述--2,测试--3,总结 作业 一,多表联查 –1,准备表和数据 create table courses(cno varchar(5) not null,cna