Pyspark DataFrame常用操作函数和示例

2024-09-06 19:12

本文主要是介绍Pyspark DataFrame常用操作函数和示例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

针对类型:pyspark.sql.dataframe.DataFrame

目录

1.打印前几行

1.1 show()函数

1.2 take()函数

2. 读取文件

2.1 spark.read.csv

3. 获取某行某列的值(具体值)

4.查看列名

5.修改列名

5.1 修改单个列名

5.2 修改多个列名

5.2.1 链式调用 withColumnRenamed 方法

5.2.2 使用 selectExpr 方法

6. pandas类型转化为pyspark  pandas

7.选择特定的列,创建一个新的 DataFrame

8.列表套字典格式转化为pyspark DataFrame

9. 根据某列或者某列进行去重

10. pyspark 的两个dataframe合并

11.查看 pyspark dataframe中某列为空的数量

12.删除 pyspark dataframe中 第一行数据

13.pyspark dataframe用空格拼接两列得到新的列

14.将pyspark dataframe 保存到集群(分片)

16.将pyspark dataframe 保存为csv

实际场景1

实际场景2


1.打印前几行

1.1 show()函数

  • show() 函数会将指定数量的行(默认是 20 行)转换为字符串并打印到控制台。
  • 无返回值,直接打印数据到控制台。

用法:

df.show()  # 默认显示前 20 行
df.show(10)  # 显示前 10 行

1.2 take()函数

  • 用于获取 DataFrame 的前 N 行数据,返回一个包含 Row 对象的列表。
  • 返回一个包含 Row 对象的列表。
  • 返回一个包含前 N 行数据的列表,每行数据以 Row 对象的形式存在。你可以通过索引访问这些行,并进一步处理它们。
rows = df.take(5)  # 获取前 5 行数据
for row in rows:print(row)

2. 读取文件

2.1 spark.read.csv

df = spark.read.csv(path, sep="\t", header=False, inferSchema=True).toDF('id','time','label','feature')
  • inferSchema=True: 让 Spark 自动推断 CSV 文件中各列的数据类型
  • toDF: 这是一个 DataFrame 方法,用于为 DataFrame 的列指定新的列名。

3. 获取某行某列的值(具体值)

直接获取 DataFrame 的特定行(例如第 562962 行)并不是一个高效的操作,因为 Spark 是

分布式计算框架,数据被分割并在多个节点上并行处理

# 获取第一行
first_row = df.first()# 获取 feature 列的值
first_row['feature_1']
# 获取前两行
rows = df.take(2)# 获取第二行
second_row = rows[1]# 获取 feature 列的值
second_row['feature']

4.查看列名

df.columns

5.修改列名

5.1 修改单个列名

# 修改列名
df_renamed = df.withColumnRenamed("name", "new_name")

5.2 修改多个列名

5.2.1 链式调用 withColumnRenamed 方法

# 修改多个列名
df_renamed = df.withColumnRenamed("id", "new_id").withColumnRenamed("name", "new_name")

5.2.2 使用 selectExpr 方法

注意:使用 selectExpr 方法时,最后只会得到你修改的列,即,在函数参数中的列名

如果想使用该方法时,还想要原来的列名,就直接, 在参数中加入,"原列名 as 原列名"

# 使用 selectExpr 修改列名
df_renamed = df.selectExpr("id as new_id", "name as new_name")

6. pandas类型转化为pyspark  pandas

pandas.core.frame.DataFrame 类型转化为 pyspark.sql.dataframe.DataFrame
# 将 Pandas DataFrame 转换为 PySpark DataFrame
pyspark_df = spark.createDataFrame(pandas_df)

7.选择特定的列,创建一个新的 DataFrame

# 选择某几列并创建新的 DataFrame
new_df = df.select("name", "age")

8.列表套字典格式转化为pyspark DataFrame

# 示例列表套字典
data = [{"name": "Alice", "age": 25, "id": 1},{"name": "Bob", "age": 30, "id": 2},{"name": "Cathy", "age": 35, "id": 3}
]# 将列表套字典转换为 PySpark DataFrame
df = spark.createDataFrame(data)# 显示 DataFrame
df.show()

9. 根据某列或者某列进行去重

duyuv3_1_df = duyuv3_1_df.dropDuplicates(['md5', 'time', 'label'])

10. pyspark 的两个dataframe合并

merged_v3_1_df = duyuv3_1_df.join(passid_md5_df, on=['md5'], how='left')

11.查看 pyspark dataframe中某列为空的数量

null_passid_count = merged_v3_1_df.filter(merged_v3_1_df['passid'].isNull()).count()
print(f"passid is null:{null_passid_count}")

12.删除 pyspark dataframe中 第一行数据

data_df = data_df.filter(col("_c0") != data_df.first()[0])
  • data_df.first(): 获取 DataFrame 的第一行数据。

  • col("_c0"): 获取 DataFrame 的第一列(默认情况下,Spark 会将 CSV 文件的列命名为 _c0_c1_c2, ...)。

  • data_df.filter(col("_c0") != data_df.first()[0]): 过滤掉第一行数据。这里假设第一行的第一列值与后续行的第一列值不同,因此通过比较第一列的值来过滤掉第一行。

13.pyspark dataframe用空格拼接两列得到新的列

# 拼接特征列replace_df = replace_df.withColumn('merged_feature',when(col('featurev3').isNotNull() & col('feature_v3_1').isNotNull(),concat_ws(' ', col('featurev3'), col('feature_v3_1'))).when(col('featurev3').isNotNull(), col('featurev3')).when(col('feature_v3_1').isNotNull(), col('feature_v3_1')).otherwise(lit('')))

14.将pyspark dataframe 保存到集群(分片)

save_path =f'afs://szth.afs.****.com:9902/user/fsi/duyuv3_1_feature/result_duyuv3_1/'
rdd_combined_duyuv3_1 = feature_cgc.rdd.map(lambda x: "\t".join(x))
rdd_combined_duyuv3_1.saveAsTextFile(save_path)

16.将pyspark dataframe 保存为csv

output_path = "afs://szth.afs.baidu.com:9902/user/fsi/tongweiwei/duyuv3_1_feature/data.csv"
final_df.write.csv(output_path, header=True, mode="overwrite")

实际场景1

对某列的值进行按照空格进行切割,然后在对切割后的数据判断冒号前面的字符串判断是否在某一个字符串中,如果在则去除掉

from pyspark.sql.types import StringType
from pyspark.sql.functions import concat_ws, col, when, lit, udfdef filter_feature(feature_str, filter_list):parts = feature_str.split()filtered_parts = [part for part in parts if str(part.split(':')[0]) not in filter_list.split(',')]return ' '.join(filtered_parts)filter_feature_udf = udf(filter_feature, StringType())df = duyuv3_df.withColumn("featurev3", filter_feature_udf(col("featurev3"), lit(duyuv3_str)))

实际场景2

对某列的值,按照空格进行切割后,按照冒号前面的进行排序

from pyspark.sql.types import StringType
from pyspark.sql.functions import concat_ws, col, when, lit, udfdef sort_by_number(value):# 将输入字符串按空格分割为列表value = value.strip().split(" ")value_list = []# 遍历列表中的每个元素,提取数字部分并排序for val in value:try:feat_num = int(val.split(":")[0])value_list.append(val)except:continuesorted_pairs = sorted(value_list, key=lambda x: int(x.split(":")[0]))return " ".join(sorted_pairs)sort_by_number_udf = udf(sort_by_number, StringType())feature_cgc = replace_df.withColumn("sorted_feat",sort_by_number_udf(replace_df["merged_feature"]))

这篇关于Pyspark DataFrame常用操作函数和示例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1142872

相关文章

0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeek R1模型的操作流程

《0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeekR1模型的操作流程》DeepSeekR1模型凭借其强大的自然语言处理能力,在未来具有广阔的应用前景,有望在多个领域发... 目录0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeek R1模型,3步搞定一个应

Python中顺序结构和循环结构示例代码

《Python中顺序结构和循环结构示例代码》:本文主要介绍Python中的条件语句和循环语句,条件语句用于根据条件执行不同的代码块,循环语句用于重复执行一段代码,文章还详细说明了range函数的使... 目录一、条件语句(1)条件语句的定义(2)条件语句的语法(a)单分支 if(b)双分支 if-else(

Python itertools中accumulate函数用法及使用运用详细讲解

《Pythonitertools中accumulate函数用法及使用运用详细讲解》:本文主要介绍Python的itertools库中的accumulate函数,该函数可以计算累积和或通过指定函数... 目录1.1前言:1.2定义:1.3衍生用法:1.3Leetcode的实际运用:总结 1.1前言:本文将详

Python中Markdown库的使用示例详解

《Python中Markdown库的使用示例详解》Markdown库是一个用于处理Markdown文本的Python工具,这篇文章主要为大家详细介绍了Markdown库的具体使用,感兴趣的... 目录一、背景二、什么是 Markdown 库三、如何安装这个库四、库函数使用方法1. markdown.mark

CSS弹性布局常用设置方式

《CSS弹性布局常用设置方式》文章总结了CSS布局与样式的常用属性和技巧,包括视口单位、弹性盒子布局、浮动元素、背景和边框样式、文本和阴影效果、溢出隐藏、定位以及背景渐变等,通过这些技巧,可以实现复杂... 一、单位元素vm 1vm 为视口的1%vh 视口高的1%vmin 参照长边vmax 参照长边re

轻松上手MYSQL之JSON函数实现高效数据查询与操作

《轻松上手MYSQL之JSON函数实现高效数据查询与操作》:本文主要介绍轻松上手MYSQL之JSON函数实现高效数据查询与操作的相关资料,MySQL提供了多个JSON函数,用于处理和查询JSON数... 目录一、jsON_EXTRACT 提取指定数据二、JSON_UNQUOTE 取消双引号三、JSON_KE

MySQL数据库函数之JSON_EXTRACT示例代码

《MySQL数据库函数之JSON_EXTRACT示例代码》:本文主要介绍MySQL数据库函数之JSON_EXTRACT的相关资料,JSON_EXTRACT()函数用于从JSON文档中提取值,支持对... 目录前言基本语法路径表达式示例示例 1: 提取简单值示例 2: 提取嵌套值示例 3: 提取数组中的值注意

CSS3中使用flex和grid实现等高元素布局的示例代码

《CSS3中使用flex和grid实现等高元素布局的示例代码》:本文主要介绍了使用CSS3中的Flexbox和Grid布局实现等高元素布局的方法,通过简单的两列实现、每行放置3列以及全部代码的展示,展示了这两种布局方式的实现细节和效果,详细内容请阅读本文,希望能对你有所帮助... 过往的实现方法是使用浮动加

css渐变色背景|<gradient示例详解

《css渐变色背景|<gradient示例详解》CSS渐变是一种从一种颜色平滑过渡到另一种颜色的效果,可以作为元素的背景,它包括线性渐变、径向渐变和锥形渐变,本文介绍css渐变色背景|<gradien... 使用渐变色作为背景可以直接将渐China编程变色用作元素的背景,可以看做是一种特殊的背景图片。(是作为背

JAVA调用Deepseek的api完成基本对话简单代码示例

《JAVA调用Deepseek的api完成基本对话简单代码示例》:本文主要介绍JAVA调用Deepseek的api完成基本对话的相关资料,文中详细讲解了如何获取DeepSeekAPI密钥、添加H... 获取API密钥首先,从DeepSeek平台获取API密钥,用于身份验证。添加HTTP客户端依赖使用Jav