Spark实战(四)spark+python快速入门实战小例子(PySpark)

2024-09-03 23:18

本文主要是介绍Spark实战(四)spark+python快速入门实战小例子(PySpark),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

   由于目前很多spark程序资料都是用scala语言写的,但是现在需要用python来实现,于是在网上找了scala写的例子改为python实现

1、集群测试实例

   代码如下:
from pyspark.sql import SparkSession

if __name__ == "__main__":spark = SparkSession\.builder\.appName("PythonWordCount")\.master("spark://mini1:7077") \.getOrCreate()spark.conf.set("spark.executor.memory", "500M")sc = spark.sparkContexta = sc.parallelize([1, 2, 3])b = a.flatMap(lambda x: (x,x ** 2))print(a.collect())print(b.collect())

   运行结果:
在这里插入图片描述

2、从文件中读取

   为了方便调试,这里采用本地模式进行测试

from py4j.compat import long
from pyspark.sql import SparkSession
def formatData(arr):# arr = arr.split(",")mb = (arr[0], arr[2])flag = arr[3]time = long(arr[1])# time = arr[1]if flag == "1":time = -timereturn (mb,time)if __name__ == "__main__":spark = SparkSession\.builder\.appName("PythonWordCount")\.master("local")\.getOrCreate()sc = spark.sparkContext# sc = spark.sparkContextline = sc.textFile("D:\\code\\hadoop\\data\\spark\\day1\\bs_log").map(lambda x: x.split(','))count = line.map(lambda x: formatData(x))rdd0 = count.reduceByKey(lambda agg, obj: agg + obj)# print(count.collect())line2 = sc.textFile("D:\\code\\hadoop\\data\\spark\\day1\\lac_info.txt").map(lambda x: x.split(','))rdd = count.map(lambda arr: (arr[0][1], (arr[0][0], arr[1])))rdd1 = line2.map(lambda arr: (arr[0], (arr[1], arr[2])))rdd3 = rdd.join(rdd1)rdd4 =rdd0.map(lambda arr: (arr[0][0], arr[0][1], arr[1]))# .map(lambda arr: list(arr).sortBy(lambda arr1: arr1[2]).reverse)rdd5 = rdd4.groupBy(lambda arr: arr[0]).values().map(lambda das: sorted(list(das), key=lambda x: x[2], reverse=True))print(rdd5.collect())

   原文件数据:
在这里插入图片描述

在这里插入图片描述

   结果如下:

[[('18688888888', '16030401EAFB68F1E3CDF819735E1C66', 87600), ('18688888888', '9F36407EAD0629FC166F14DDE7970F68', 51200), ('18688888888', 'CC0710CC94ECC657A8561DE549D940E0', 1300)], [('18611132889', '16030401EAFB68F1E3CDF819735E1C66', 97500), ('18611132889', '9F36407EAD0629FC166F14DDE7970F68', 54000), ('18611132889', 'CC0710CC94ECC657A8561DE549D940E0', 1900)]]

3、读取文件并将结果保存至文件

from pyspark.sql import SparkSession
from py4j.compat import longdef formatData(arr):# arr = arr.split(",")mb = (arr[0], arr[2])flag = arr[3]time = long(arr[1])# time = arr[1]if flag == "1":time = -timereturn (mb,time)if __name__ == "__main__":spark = SparkSession\.builder\.appName("PythonWordCount")\.master("local")\.getOrCreate()sc = spark.sparkContextline = sc.textFile("D:\\code\\hadoop\\data\\spark\\day1\\bs_log").map(lambda x: x.split(','))rdd0 = line.map(lambda x: formatData(x))rdd1 = rdd0.reduceByKey(lambda agg, obj: agg + obj).map(lambda t: (t[0][1], (t[0][0], t[1])))line2 = sc.textFile("D:\\code\\hadoop\\data\\spark\\day1\\lac_info.txt").map(lambda x: x.split(','))rdd2 = line2.map(lambda x: (x[0], (x[1], x[2])))rdd3 = rdd1.join(rdd2).map(lambda x: (x[1][0][0], x[0], x[1][0][1], x[1][1][0], x[1][1][1]))rdd4 = rdd3.groupBy(lambda x: x[0])rdd5 = rdd4.mapValues(lambda das: sorted(list(das), key=lambda x: x[2], reverse=True)[:2])print(rdd1.join(rdd2).collect())print(rdd5.collect())rdd5.saveAsTextFile("D:\\code\\hadoop\\data\\spark\\day02\\out1")sc.stop()
   结果如下:

在这里插入图片描述

4、根据自定义规则匹配

import urllib
from pyspark.sql import SparkSession
def getUrls(urls):url = urls[0]parsed = urllib.parse.urlparse(url)return (parsed.netloc, url, urls[1])if __name__ == "__main__":spark = SparkSession \.builder \.appName("PythonWordCount") \.master("local") \.getOrCreate()sc = spark.sparkContextline = sc.textFile("D:\\code\\hadoop\\data\\spark\\day02\\itcast.log").map(lambda x: x.split('\t'))//从数据库中加载规则arr = ["java.itcast.cn", "php.itcast.cn", "net.itcast.cn"]rdd1 = line.map(lambda x: (x[1], 1))rdd2 = rdd1.reduceByKey(lambda agg, obj: agg + obj)rdd3 = rdd2.map(lambda x: getUrls(x))for ins in arr:rdd = rdd3.filter(lambda x:x[0] == ins)result = rdd.sortBy(lambda x: x[2], ascending = False).take(2)print(result)spark.stop()

   结果如下:
在这里插入图片描述

5、自定义类排序

from operator import gt
from pyspark.sql import SparkSessionclass Girl:def __init__(self, faceValue, age):self.faceValue = faceValueself.age = agedef __gt__(self, other):if other.faceValue == self.faceValue:return gt(self.age, other.age)else:return gt(self.faceValue, other.faceValue)if __name__ == "__main__":spark = SparkSession\.builder\.appName("PythonWordCount")\.master("local")\.getOrCreate()sc = spark.sparkContextrdd1 = sc.parallelize([("yuihatano", 90, 28, 1), ("angelababy", 90, 27, 2), ("JuJingYi", 95, 22, 3)])rdd2 = rdd1.sortBy(lambda das: Girl(das[1], das[2]),False)print(rdd2.collect())sc.stop()

   结果如下:

在这里插入图片描述

6、JDBC

from pyspark import SQLContext
from pyspark.sql import SparkSessionif __name__ == "__main__":spark = SparkSession\.builder\.appName("PythonWordCount")\.master("local")\.getOrCreate()sc = spark.sparkContextsqlContext = SQLContext(sc)df = sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/hellospark",driver="com.mysql.jdbc.Driver",dbtable="(select * from actor) tmp",user="root",password="123456").load()print(df.select('description','age').show(2))# print(df.printSchema)sc.stop()

   结果如下:
在这里插入图片描述

这篇关于Spark实战(四)spark+python快速入门实战小例子(PySpark)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1134332

相关文章

python获取指定名字的程序的文件路径的两种方法

《python获取指定名字的程序的文件路径的两种方法》本文主要介绍了python获取指定名字的程序的文件路径的两种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要... 最近在做项目,需要用到给定一个程序名字就可以自动获取到这个程序在Windows系统下的绝对路径,以下

使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解

《使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解》本文详细介绍了如何使用Python通过ncmdump工具批量将.ncm音频转换为.mp3的步骤,包括安装、配置ffmpeg环... 目录1. 前言2. 安装 ncmdump3. 实现 .ncm 转 .mp34. 执行过程5. 执行结

Python实现批量CSV转Excel的高性能处理方案

《Python实现批量CSV转Excel的高性能处理方案》在日常办公中,我们经常需要将CSV格式的数据转换为Excel文件,本文将介绍一个基于Python的高性能解决方案,感兴趣的小伙伴可以跟随小编一... 目录一、场景需求二、技术方案三、核心代码四、批量处理方案五、性能优化六、使用示例完整代码七、小结一、

Python中 try / except / else / finally 异常处理方法详解

《Python中try/except/else/finally异常处理方法详解》:本文主要介绍Python中try/except/else/finally异常处理方法的相关资料,涵... 目录1. 基本结构2. 各部分的作用tryexceptelsefinally3. 执行流程总结4. 常见用法(1)多个e

Python中logging模块用法示例总结

《Python中logging模块用法示例总结》在Python中logging模块是一个强大的日志记录工具,它允许用户将程序运行期间产生的日志信息输出到控制台或者写入到文件中,:本文主要介绍Pyt... 目录前言一. 基本使用1. 五种日志等级2.  设置报告等级3. 自定义格式4. C语言风格的格式化方法

Python实现精确小数计算的完全指南

《Python实现精确小数计算的完全指南》在金融计算、科学实验和工程领域,浮点数精度问题一直是开发者面临的重大挑战,本文将深入解析Python精确小数计算技术体系,感兴趣的小伙伴可以了解一下... 目录引言:小数精度问题的核心挑战一、浮点数精度问题分析1.1 浮点数精度陷阱1.2 浮点数误差来源二、基础解决

SpringBoot 多环境开发实战(从配置、管理与控制)

《SpringBoot多环境开发实战(从配置、管理与控制)》本文详解SpringBoot多环境配置,涵盖单文件YAML、多文件模式、MavenProfile分组及激活策略,通过优先级控制灵活切换环境... 目录一、多环境开发基础(单文件 YAML 版)(一)配置原理与优势(二)实操示例二、多环境开发多文件版

使用Python实现Word文档的自动化对比方案

《使用Python实现Word文档的自动化对比方案》我们经常需要比较两个Word文档的版本差异,无论是合同修订、论文修改还是代码文档更新,人工比对不仅效率低下,还容易遗漏关键改动,下面通过一个实际案例... 目录引言一、使用python-docx库解析文档结构二、使用difflib进行差异比对三、高级对比方

深度解析Python中递归下降解析器的原理与实现

《深度解析Python中递归下降解析器的原理与实现》在编译器设计、配置文件处理和数据转换领域,递归下降解析器是最常用且最直观的解析技术,本文将详细介绍递归下降解析器的原理与实现,感兴趣的小伙伴可以跟随... 目录引言:解析器的核心价值一、递归下降解析器基础1.1 核心概念解析1.2 基本架构二、简单算术表达

Three.js构建一个 3D 商品展示空间完整实战项目

《Three.js构建一个3D商品展示空间完整实战项目》Three.js是一个强大的JavaScript库,专用于在Web浏览器中创建3D图形,:本文主要介绍Three.js构建一个3D商品展... 目录引言项目核心技术1. 项目架构与资源组织2. 多模型切换、交互热点绑定3. 移动端适配与帧率优化4. 可