Spark实战(四)spark+python快速入门实战小例子(PySpark)

2024-09-03 23:18

本文主要是介绍Spark实战(四)spark+python快速入门实战小例子(PySpark),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

   由于目前很多spark程序资料都是用scala语言写的,但是现在需要用python来实现,于是在网上找了scala写的例子改为python实现

1、集群测试实例

   代码如下:
from pyspark.sql import SparkSession

if __name__ == "__main__":spark = SparkSession\.builder\.appName("PythonWordCount")\.master("spark://mini1:7077") \.getOrCreate()spark.conf.set("spark.executor.memory", "500M")sc = spark.sparkContexta = sc.parallelize([1, 2, 3])b = a.flatMap(lambda x: (x,x ** 2))print(a.collect())print(b.collect())

   运行结果:
在这里插入图片描述

2、从文件中读取

   为了方便调试,这里采用本地模式进行测试

from py4j.compat import long
from pyspark.sql import SparkSession
def formatData(arr):# arr = arr.split(",")mb = (arr[0], arr[2])flag = arr[3]time = long(arr[1])# time = arr[1]if flag == "1":time = -timereturn (mb,time)if __name__ == "__main__":spark = SparkSession\.builder\.appName("PythonWordCount")\.master("local")\.getOrCreate()sc = spark.sparkContext# sc = spark.sparkContextline = sc.textFile("D:\\code\\hadoop\\data\\spark\\day1\\bs_log").map(lambda x: x.split(','))count = line.map(lambda x: formatData(x))rdd0 = count.reduceByKey(lambda agg, obj: agg + obj)# print(count.collect())line2 = sc.textFile("D:\\code\\hadoop\\data\\spark\\day1\\lac_info.txt").map(lambda x: x.split(','))rdd = count.map(lambda arr: (arr[0][1], (arr[0][0], arr[1])))rdd1 = line2.map(lambda arr: (arr[0], (arr[1], arr[2])))rdd3 = rdd.join(rdd1)rdd4 =rdd0.map(lambda arr: (arr[0][0], arr[0][1], arr[1]))# .map(lambda arr: list(arr).sortBy(lambda arr1: arr1[2]).reverse)rdd5 = rdd4.groupBy(lambda arr: arr[0]).values().map(lambda das: sorted(list(das), key=lambda x: x[2], reverse=True))print(rdd5.collect())

   原文件数据:
在这里插入图片描述

在这里插入图片描述

   结果如下:

[[('18688888888', '16030401EAFB68F1E3CDF819735E1C66', 87600), ('18688888888', '9F36407EAD0629FC166F14DDE7970F68', 51200), ('18688888888', 'CC0710CC94ECC657A8561DE549D940E0', 1300)], [('18611132889', '16030401EAFB68F1E3CDF819735E1C66', 97500), ('18611132889', '9F36407EAD0629FC166F14DDE7970F68', 54000), ('18611132889', 'CC0710CC94ECC657A8561DE549D940E0', 1900)]]

3、读取文件并将结果保存至文件

from pyspark.sql import SparkSession
from py4j.compat import longdef formatData(arr):# arr = arr.split(",")mb = (arr[0], arr[2])flag = arr[3]time = long(arr[1])# time = arr[1]if flag == "1":time = -timereturn (mb,time)if __name__ == "__main__":spark = SparkSession\.builder\.appName("PythonWordCount")\.master("local")\.getOrCreate()sc = spark.sparkContextline = sc.textFile("D:\\code\\hadoop\\data\\spark\\day1\\bs_log").map(lambda x: x.split(','))rdd0 = line.map(lambda x: formatData(x))rdd1 = rdd0.reduceByKey(lambda agg, obj: agg + obj).map(lambda t: (t[0][1], (t[0][0], t[1])))line2 = sc.textFile("D:\\code\\hadoop\\data\\spark\\day1\\lac_info.txt").map(lambda x: x.split(','))rdd2 = line2.map(lambda x: (x[0], (x[1], x[2])))rdd3 = rdd1.join(rdd2).map(lambda x: (x[1][0][0], x[0], x[1][0][1], x[1][1][0], x[1][1][1]))rdd4 = rdd3.groupBy(lambda x: x[0])rdd5 = rdd4.mapValues(lambda das: sorted(list(das), key=lambda x: x[2], reverse=True)[:2])print(rdd1.join(rdd2).collect())print(rdd5.collect())rdd5.saveAsTextFile("D:\\code\\hadoop\\data\\spark\\day02\\out1")sc.stop()
   结果如下:

在这里插入图片描述

4、根据自定义规则匹配

import urllib
from pyspark.sql import SparkSession
def getUrls(urls):url = urls[0]parsed = urllib.parse.urlparse(url)return (parsed.netloc, url, urls[1])if __name__ == "__main__":spark = SparkSession \.builder \.appName("PythonWordCount") \.master("local") \.getOrCreate()sc = spark.sparkContextline = sc.textFile("D:\\code\\hadoop\\data\\spark\\day02\\itcast.log").map(lambda x: x.split('\t'))//从数据库中加载规则arr = ["java.itcast.cn", "php.itcast.cn", "net.itcast.cn"]rdd1 = line.map(lambda x: (x[1], 1))rdd2 = rdd1.reduceByKey(lambda agg, obj: agg + obj)rdd3 = rdd2.map(lambda x: getUrls(x))for ins in arr:rdd = rdd3.filter(lambda x:x[0] == ins)result = rdd.sortBy(lambda x: x[2], ascending = False).take(2)print(result)spark.stop()

   结果如下:
在这里插入图片描述

5、自定义类排序

from operator import gt
from pyspark.sql import SparkSessionclass Girl:def __init__(self, faceValue, age):self.faceValue = faceValueself.age = agedef __gt__(self, other):if other.faceValue == self.faceValue:return gt(self.age, other.age)else:return gt(self.faceValue, other.faceValue)if __name__ == "__main__":spark = SparkSession\.builder\.appName("PythonWordCount")\.master("local")\.getOrCreate()sc = spark.sparkContextrdd1 = sc.parallelize([("yuihatano", 90, 28, 1), ("angelababy", 90, 27, 2), ("JuJingYi", 95, 22, 3)])rdd2 = rdd1.sortBy(lambda das: Girl(das[1], das[2]),False)print(rdd2.collect())sc.stop()

   结果如下:

在这里插入图片描述

6、JDBC

from pyspark import SQLContext
from pyspark.sql import SparkSessionif __name__ == "__main__":spark = SparkSession\.builder\.appName("PythonWordCount")\.master("local")\.getOrCreate()sc = spark.sparkContextsqlContext = SQLContext(sc)df = sqlContext.read.format("jdbc").options(url="jdbc:mysql://localhost:3306/hellospark",driver="com.mysql.jdbc.Driver",dbtable="(select * from actor) tmp",user="root",password="123456").load()print(df.select('description','age').show(2))# print(df.printSchema)sc.stop()

   结果如下:
在这里插入图片描述

这篇关于Spark实战(四)spark+python快速入门实战小例子(PySpark)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1134332

相关文章

Python中你不知道的gzip高级用法分享

《Python中你不知道的gzip高级用法分享》在当今大数据时代,数据存储和传输成本已成为每个开发者必须考虑的问题,Python内置的gzip模块提供了一种简单高效的解决方案,下面小编就来和大家详细讲... 目录前言:为什么数据压缩如此重要1. gzip 模块基础介绍2. 基本压缩与解压缩操作2.1 压缩文

Python设置Cookie永不超时的详细指南

《Python设置Cookie永不超时的详细指南》Cookie是一种存储在用户浏览器中的小型数据片段,用于记录用户的登录状态、偏好设置等信息,下面小编就来和大家详细讲讲Python如何设置Cookie... 目录一、Cookie的作用与重要性二、Cookie过期的原因三、实现Cookie永不超时的方法(一)

Python内置函数之classmethod函数使用详解

《Python内置函数之classmethod函数使用详解》:本文主要介绍Python内置函数之classmethod函数使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录1. 类方法定义与基本语法2. 类方法 vs 实例方法 vs 静态方法3. 核心特性与用法(1编程客

从入门到精通MySQL联合查询

《从入门到精通MySQL联合查询》:本文主要介绍从入门到精通MySQL联合查询,本文通过实例代码给大家介绍的非常详细,需要的朋友可以参考下... 目录摘要1. 多表联合查询时mysql内部原理2. 内连接3. 外连接4. 自连接5. 子查询6. 合并查询7. 插入查询结果摘要前面我们学习了数据库设计时要满

Python函数作用域示例详解

《Python函数作用域示例详解》本文介绍了Python中的LEGB作用域规则,详细解析了变量查找的四个层级,通过具体代码示例,展示了各层级的变量访问规则和特性,对python函数作用域相关知识感兴趣... 目录一、LEGB 规则二、作用域实例2.1 局部作用域(Local)2.2 闭包作用域(Enclos

Python实现对阿里云OSS对象存储的操作详解

《Python实现对阿里云OSS对象存储的操作详解》这篇文章主要为大家详细介绍了Python实现对阿里云OSS对象存储的操作相关知识,包括连接,上传,下载,列举等功能,感兴趣的小伙伴可以了解下... 目录一、直接使用代码二、详细使用1. 环境准备2. 初始化配置3. bucket配置创建4. 文件上传到os

从原理到实战深入理解Java 断言assert

《从原理到实战深入理解Java断言assert》本文深入解析Java断言机制,涵盖语法、工作原理、启用方式及与异常的区别,推荐用于开发阶段的条件检查与状态验证,并强调生产环境应使用参数验证工具类替代... 目录深入理解 Java 断言(assert):从原理到实战引言:为什么需要断言?一、断言基础1.1 语

使用Python实现可恢复式多线程下载器

《使用Python实现可恢复式多线程下载器》在数字时代,大文件下载已成为日常操作,本文将手把手教你用Python打造专业级下载器,实现断点续传,多线程加速,速度限制等功能,感兴趣的小伙伴可以了解下... 目录一、智能续传:从崩溃边缘抢救进度二、多线程加速:榨干网络带宽三、速度控制:做网络的好邻居四、终端交互

Python中注释使用方法举例详解

《Python中注释使用方法举例详解》在Python编程语言中注释是必不可少的一部分,它有助于提高代码的可读性和维护性,:本文主要介绍Python中注释使用方法的相关资料,需要的朋友可以参考下... 目录一、前言二、什么是注释?示例:三、单行注释语法:以 China编程# 开头,后面的内容为注释内容示例:示例:四

Python中win32包的安装及常见用途介绍

《Python中win32包的安装及常见用途介绍》在Windows环境下,PythonWin32模块通常随Python安装包一起安装,:本文主要介绍Python中win32包的安装及常见用途的相关... 目录前言主要组件安装方法常见用途1. 操作Windows注册表2. 操作Windows服务3. 窗口操作