用spark获取前一行数据,DF.withColumn(colName,lag(colName,offset).over(Window.partitionBy().orderBy(desc())))

本文主要是介绍用spark获取前一行数据,DF.withColumn(colName,lag(colName,offset).over(Window.partitionBy().orderBy(desc()))),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

数据:

1,11,111
2,22,222
3,33,333
1,22,333
1,22,444

代码:

package com.emg.etp.analysis.preproces.nullphotoimport com.emg.etp.analysis.preproces.nullphoto.pojo.EcarData
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.SparkStrategies
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._import scala.collection.mutable.ListBuffer/*** @Auther: sss* @Date: 2020/7/21 16:20* @Description:*/
object Tests {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("etpProcess").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").registerKryoClasses(Array[Class[_]](EcarData.getClass))val spark = SparkSession.builder().config(conf).getOrCreate()val sc: SparkContext = spark.sparkContextimport spark.implicits._val rdd = sc.textFile("C:\\Users\\sss\\Desktop\\qqq\\aa.txt")val win = Window.partitionBy("id1").orderBy(desc("id3"))val rdd2 = rdd.map(line => {val data = line.split(",", -1)(data(0), data(1), data(2))})val data = rdd2.toDF("id1", "id2", "id3").withColumn("aa", lag("id3", 1).over(win))data.show()val df_difftime = data.withColumn("diff", when(isnull(col("id3") - col("aa")), 0).otherwise((col("id3") - col("aa"))))df_difftime.show()}
}

 结果:

 

这篇关于用spark获取前一行数据,DF.withColumn(colName,lag(colName,offset).over(Window.partitionBy().orderBy(desc())))的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/242710

相关文章

Android如何获取当前CPU频率和占用率

《Android如何获取当前CPU频率和占用率》最近在优化App的性能,需要获取当前CPU视频频率和占用率,所以本文小编就来和大家总结一下如何在Android中获取当前CPU频率和占用率吧... 最近在优化 App 的性能,需要获取当前 CPU视频频率和占用率,通过查询资料,大致思路如下:目前没有标准的

MySQL InnoDB引擎ibdata文件损坏/删除后使用frm和ibd文件恢复数据

《MySQLInnoDB引擎ibdata文件损坏/删除后使用frm和ibd文件恢复数据》mysql的ibdata文件被误删、被恶意修改,没有从库和备份数据的情况下的数据恢复,不能保证数据库所有表数据... 参考:mysql Innodb表空间卸载、迁移、装载的使用方法注意!此方法只适用于innodb_fi

mysql通过frm和ibd文件恢复表_mysql5.7根据.frm和.ibd文件恢复表结构和数据

《mysql通过frm和ibd文件恢复表_mysql5.7根据.frm和.ibd文件恢复表结构和数据》文章主要介绍了如何从.frm和.ibd文件恢复MySQLInnoDB表结构和数据,需要的朋友可以参... 目录一、恢复表结构二、恢复表数据补充方法一、恢复表结构(从 .frm 文件)方法 1:使用 mysq

mysql8.0无备份通过idb文件恢复数据的方法、idb文件修复和tablespace id不一致处理

《mysql8.0无备份通过idb文件恢复数据的方法、idb文件修复和tablespaceid不一致处理》文章描述了公司服务器断电后数据库故障的过程,作者通过查看错误日志、重新初始化数据目录、恢复备... 周末突然接到一位一年多没联系的妹妹打来电话,“刘哥,快来救救我”,我脑海瞬间冒出妙瓦底,电信火苲马扁.

golang获取prometheus数据(prometheus/client_golang包)

《golang获取prometheus数据(prometheus/client_golang包)》本文主要介绍了使用Go语言的prometheus/client_golang包来获取Prometheu... 目录1. 创建链接1.1 语法1.2 完整示例2. 简单查询2.1 语法2.2 完整示例3. 范围值

javaScript在表单提交时获取表单数据的示例代码

《javaScript在表单提交时获取表单数据的示例代码》本文介绍了五种在JavaScript中获取表单数据的方法:使用FormData对象、手动提取表单数据、使用querySelector获取单个字... 方法 1:使用 FormData 对象FormData 是一个方便的内置对象,用于获取表单中的键值

Rust中的BoxT之堆上的数据与递归类型详解

《Rust中的BoxT之堆上的数据与递归类型详解》本文介绍了Rust中的BoxT类型,包括其在堆与栈之间的内存分配,性能优势,以及如何利用BoxT来实现递归类型和处理大小未知类型,通过BoxT,Rus... 目录1. Box<T> 的基础知识1.1 堆与栈的分工1.2 性能优势2.1 递归类型的问题2.2

Python使用Pandas对比两列数据取最大值的五种方法

《Python使用Pandas对比两列数据取最大值的五种方法》本文主要介绍使用Pandas对比两列数据取最大值的五种方法,包括使用max方法、apply方法结合lambda函数、函数、clip方法、w... 目录引言一、使用max方法二、使用apply方法结合lambda函数三、使用np.maximum函数

如何利用Java获取当天的开始和结束时间

《如何利用Java获取当天的开始和结束时间》:本文主要介绍如何使用Java8的LocalDate和LocalDateTime类获取指定日期的开始和结束时间,展示了如何通过这些类进行日期和时间的处... 目录前言1. Java日期时间API概述2. 获取当天的开始和结束时间代码解析运行结果3. 总结前言在J

Redis的数据过期策略和数据淘汰策略

《Redis的数据过期策略和数据淘汰策略》本文主要介绍了Redis的数据过期策略和数据淘汰策略,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录一、数据过期策略1、惰性删除2、定期删除二、数据淘汰策略1、数据淘汰策略概念2、8种数据淘汰策略