本文主要是介绍PySpark withColumn更新或添加列,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
原文:https://sparkbyexamples.com/pyspark/pyspark-withcolumn/
PySparkwithColumn()
是DataFrame的转换函数,用于更改或更新值,转换现有DataFrame列的数据类型,添加/创建新列以及多核。在本文中,我将使用withColumn()示例向您介绍常用的PySpark DataFrame列操作。
- PySpark withColumn –更改列的数据类型
- 转换/更改现有列的值
- 从现有列派生新列
- 添加具有文字值的列
- 重命名列名
- 删除DataFrame列
首先,让我们创建一个要使用的DataFrame。
data = [('James','','Smith','1991-04-01','M',3000),('Michael','Rose','','2000-05-19','M',4000),('Robert','','Williams','1978-09-05','M',4000),('Maria','Anne','Jones','1967-12-01','F',4000),('Jen','Mary','Brown','1980-02-17','F',-1)
]columns = ["firstname","middlename","lastname","dob","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
1.使用带有列的PySpark更改列DataType
通过在DataFramewithColumn()
上使用PySpark,我们可以强制转换或更改列的数据类型。为了更改数据类型,您还需要将cast()
函数与withColumn()一起使用。下面的语句将“工资”列的数据类型从String
更改Integer
为。
df2 = df.withColumn("salary",col("salary").cast("Integer"))
df2.printSchema()
2.更新现有列的值
DataFrame的PySparkwithColumn()
函数也可以用于更改现有列的值。为了更改值,将现有的列名作为第一个参数传递,并将要分配的值作为第二个参数传递给withColumn()函数。请注意,第二个参数应为Column
type。
df3 = df.withColumn("salary",col("salary")*100)
df3.printSchema()
此代码段将“ salary”的值乘以100,并将其值更新回“ salary”列。
3.从现有的创建新列
要添加/创建新列,请使用您希望新列成为的名称指定第一个参数,并通过对现有列执行操作来使用第二个参数来分配值。
df4 = df.withColumn("CopiedColumn",col("salary")* -1)
df3.printSchema()
此代码段通过将“工资”列乘以值-1来创建新列“ CopiedColumn”。
4.使用withColumn()添加一个新列
为了创建新列,请将所需的列名传递给withColumn()
转换函数的第一个参数。确保此新列尚未出现在DataFrame上(如果显示的话)会更新该列的值。
在下面的代码片段中,使用lit()
函数将常量值添加到DataFrame列。我们还可以链接以添加多个列。
df5 = df.withColumn("Country", lit("USA"))
df5.printSchema()df6 = df.withColumn("Country", lit("USA")) \.withColumn("anotherColumn",lit("anotherValue"))
df6.printSchema()
5.重命名列名
尽管您不能使用withColumn重命名列,但我还是想介绍一下,因为重命名是我们在DataFrame上执行的常见操作之一。要重命名现有列,请使用withColumnRenamed()
DataFrame上的函数。
df.withColumnRenamed("gender","sex") \.show(truncate=False)
6.从PySpark DataFrame删除一列
使用“放置”功能从DataFrame放置特定的列。
df4.drop("CopiedColumn") \
.show(truncate=False)
**注意:**请注意,所有这些函数在应用函数后都将返回新的DataFrame,而不是更新DataFrame。
PySpark withColumn完整示例
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StructType, StructField, StringType,IntegerTypespark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()data = [('James','','Smith','1991-04-01','M',3000),('Michael','Rose','','2000-05-19','M',4000),('Robert','','Williams','1978-09-05','M',4000),('Maria','Anne','Jones','1967-12-01','F',4000),('Jen','Mary','Brown','1980-02-17','F',-1)
]columns = ["firstname","middlename","lastname","dob","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
df.printSchema()
df.show(truncate=False)df2 = df.withColumn("salary",col("salary").cast("Integer"))
df2.printSchema()
df2.show(truncate=False)df3 = df.withColumn("salary",col("salary")*100)
df3.printSchema()
df3.show(truncate=False) df4 = df.withColumn("CopiedColumn",col("salary")* -1)
df4.printSchema()df5 = df.withColumn("Country", lit("USA"))
df5.printSchema()df6 = df.withColumn("Country", lit("USA")) \.withColumn("anotherColumn",lit("anotherValue"))
df6.printSchema()df.withColumnRenamed("gender","sex") \.show(truncate=False) df4.drop("CopiedColumn") \
.show(truncate=False)
完整的代码可以从PySpark withColumn GitHub project下载
学习愉快!
这篇关于PySpark withColumn更新或添加列的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!