PySpark withColumn更新或添加列

2024-08-22 03:32
文章标签 更新 pyspark withcolumn

本文主要是介绍PySpark withColumn更新或添加列,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

原文:https://sparkbyexamples.com/pyspark/pyspark-withcolumn/

PySparkwithColumn()是DataFrame的转换函数,用于更改或更新值,转换现有DataFrame列的数据类型,添加/创建新列以及多核。在本文中,我将使用withColumn()示例向您介绍常用的PySpark DataFrame列操作。

  • PySpark withColumn –更改列的数据类型
  • 转换/更改现有列的值
  • 从现有列派生新列
  • 添加具有文字值的列
  • 重命名列名
  • 删除DataFrame列

首先,让我们创建一个要使用的DataFrame。

data = [('James','','Smith','1991-04-01','M',3000),('Michael','Rose','','2000-05-19','M',4000),('Robert','','Williams','1978-09-05','M',4000),('Maria','Anne','Jones','1967-12-01','F',4000),('Jen','Mary','Brown','1980-02-17','F',-1)
]columns = ["firstname","middlename","lastname","dob","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)

1.使用带有列的PySpark更改列DataType

通过在DataFramewithColumn()上使用PySpark,我们可以强制转换或更改列的数据类型。为了更改数据类型,您还需要将cast()函数与withColumn()一起使用。下面的语句将“工资”列的数据类型从String更改Integer为。

 df2 = df.withColumn("salary",col("salary").cast("Integer"))
df2.printSchema()

2.更新现有列的值

DataFrame的PySparkwithColumn()函数也可以用于更改现有列的值。为了更改值,将现有的列名作为第一个参数传递,并将要分配的值作为第二个参数传递给withColumn()函数。请注意,第二个参数应为Columntype。

df3 = df.withColumn("salary",col("salary")*100)
df3.printSchema()

此代码段将“ salary”的值乘以100,并将其值更新回“ salary”列。

3.从现有的创建新列

要添加/创建新列,请使用您希望新列成为的名称指定第一个参数,并通过对现有列执行操作来使用第二个参数来分配值。

df4 = df.withColumn("CopiedColumn",col("salary")* -1)
df3.printSchema()

此代码段通过将“工资”列乘以值-1来创建新列“ CopiedColumn”。

4.使用withColumn()添加一个新列

为了创建新列,请将所需的列名传递给withColumn()转换函数的第一个参数。确保此新列尚未出现在DataFrame上(如果显示的话)会更新该列的值。

在下面的代码片段中,使用lit()函数将常量值添加到DataFrame列。我们还可以链接以添加多个列。

df5 = df.withColumn("Country", lit("USA"))
df5.printSchema()df6 = df.withColumn("Country", lit("USA")) \.withColumn("anotherColumn",lit("anotherValue"))
df6.printSchema()

5.重命名列名

尽管您不能使用withColumn重命名列,但我还是想介绍一下,因为重命名是我们在DataFrame上执行的常见操作之一。要重命名现有列,请使用withColumnRenamed()DataFrame上的函数。

df.withColumnRenamed("gender","sex") \.show(truncate=False) 

6.从PySpark DataFrame删除一列

使用“放置”功能从DataFrame放置特定的列。

df4.drop("CopiedColumn") \
.show(truncate=False) 

**注意:**请注意,所有这些函数在应用函数后都将返回新的DataFrame,而不是更新DataFrame。

PySpark withColumn完整示例

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StructType, StructField, StringType,IntegerTypespark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()data = [('James','','Smith','1991-04-01','M',3000),('Michael','Rose','','2000-05-19','M',4000),('Robert','','Williams','1978-09-05','M',4000),('Maria','Anne','Jones','1967-12-01','F',4000),('Jen','Mary','Brown','1980-02-17','F',-1)
]columns = ["firstname","middlename","lastname","dob","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)
df.printSchema()
df.show(truncate=False)df2 = df.withColumn("salary",col("salary").cast("Integer"))
df2.printSchema()
df2.show(truncate=False)df3 = df.withColumn("salary",col("salary")*100)
df3.printSchema()
df3.show(truncate=False) df4 = df.withColumn("CopiedColumn",col("salary")* -1)
df4.printSchema()df5 = df.withColumn("Country", lit("USA"))
df5.printSchema()df6 = df.withColumn("Country", lit("USA")) \.withColumn("anotherColumn",lit("anotherValue"))
df6.printSchema()df.withColumnRenamed("gender","sex") \.show(truncate=False) df4.drop("CopiedColumn") \
.show(truncate=False) 

完整的代码可以从PySpark withColumn GitHub project下载

学习愉快!

这篇关于PySpark withColumn更新或添加列的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1095115

相关文章

poj3468(线段树成段更新模板题)

题意:包括两个操作:1、将[a.b]上的数字加上v;2、查询区间[a,b]上的和 下面的介绍是下解题思路: 首先介绍  lazy-tag思想:用一个变量记录每一个线段树节点的变化值,当这部分线段的一致性被破坏我们就将这个变化值传递给子区间,大大增加了线段树的效率。 比如现在需要对[a,b]区间值进行加c操作,那么就从根节点[1,n]开始调用update函数进行操作,如果刚好执行到一个子节点,

hdu1394(线段树点更新的应用)

题意:求一个序列经过一定的操作得到的序列的最小逆序数 这题会用到逆序数的一个性质,在0到n-1这些数字组成的乱序排列,将第一个数字A移到最后一位,得到的逆序数为res-a+(n-a-1) 知道上面的知识点后,可以用暴力来解 代码如下: #include<iostream>#include<algorithm>#include<cstring>#include<stack>#in

hdu1689(线段树成段更新)

两种操作:1、set区间[a,b]上数字为v;2、查询[ 1 , n ]上的sum 代码如下: #include<iostream>#include<algorithm>#include<cstring>#include<stack>#include<queue>#include<set>#include<map>#include<stdio.h>#include<stdl

hdu 1754 I Hate It(线段树,单点更新,区间最值)

题意是求一个线段中的最大数。 线段树的模板题,试用了一下交大的模板。效率有点略低。 代码: #include <stdio.h>#include <string.h>#define TREE_SIZE (1 << (20))//const int TREE_SIZE = 200000 + 10;int max(int a, int b){return a > b ? a :

AI行业应用(不定期更新)

ChatPDF 可以让你上传一个 PDF 文件,然后针对这个 PDF 进行小结和提问。你可以把各种各样你要研究的分析报告交给它,快速获取到想要知道的信息。https://www.chatpdf.com/

GIS图形库更新2024.8.4-9.9

更多精彩内容请访问 dt.sim3d.cn ,关注公众号【sky的数孪技术】,技术交流、源码下载请添加微信:digital_twin123 Cesium 本期发布了1.121 版本。重大新闻,Cesium被Bentley收购。 ✨ 功能和改进 默认启用 MSAA,采样 4 次。若要关闭 MSAA,则可以设置scene.msaaSamples = 1。但是通过比较,发现并没有多大改善。

JavaFX应用更新检测功能(在线自动更新方案)

JavaFX开发的桌面应用属于C端,一般来说需要版本检测和自动更新功能,这里记录一下一种版本检测和自动更新的方法。 1. 整体方案 JavaFX.应用版本检测、自动更新主要涉及一下步骤: 读取本地应用版本拉取远程版本并比较两个版本如果需要升级,那么拉取更新历史弹出升级控制窗口用户选择升级时,拉取升级包解压,重启应用用户选择忽略时,本地版本标志为忽略版本用户选择取消时,隐藏升级控制窗口 2.

记录每次更新到仓库 —— Git 学习笔记 10

记录每次更新到仓库 文章目录 文件的状态三个区域检查当前文件状态跟踪新文件取消跟踪(un-tracking)文件重新跟踪(re-tracking)文件暂存已修改文件忽略某些文件查看已暂存和未暂存的修改提交更新跳过暂存区删除文件移动文件参考资料 咱们接着很多天以前的 取得Git仓库 这篇文章继续说。 文件的状态 不管是通过哪种方法,现在我们已经有了一个仓库,并从这个仓

消除安卓SDK更新时的“https://dl-ssl.google.com refused”异常的方法

消除安卓SDK更新时的“https://dl-ssl.google.com refused”异常的方法   消除安卓SDK更新时的“https://dl-ssl.google.com refused”异常的方法 [转载]原地址:http://blog.csdn.net/x605940745/article/details/17911115 消除SDK更新时的“

云原生之高性能web服务器学习(持续更新中)

高性能web服务器 1 Web服务器的基础介绍1.1 Web服务介绍1.1.1 Apache介绍1.1.2 Nginx-高性能的 Web 服务端 2 Nginx架构与安装2.1 Nginx概述2.1.1 Nginx 功能介绍2.1.2 基础特性2.1.3 Web 服务相关的功能 2.2 Nginx 架构和进程2.2.1 架构2.2.2 Ngnix进程结构 2.3 Nginx 模块介绍2.4