在使用spark2自定义累加器时提示:Exception in thread main org.apache.spark.SparkException: Task not serializable

本文主要是介绍在使用spark2自定义累加器时提示:Exception in thread main org.apache.spark.SparkException: Task not serializable,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在使用spark自定义累加器时提示如下错误:

Exception in thread "main" org.apache.spark.SparkException: Task not serializableat org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:386)at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)at org.apache.spark.rdd.RDD.filter(RDD.scala:386)at com.best.spark.UserVisitSessionAnalyzeSpark$.filterSessionAndAggrStat(UserVisitSessionAnalyzeSpark.scala:281)at com.best.spark.UserVisitSessionAnalyzeSpark$.main(UserVisitSessionAnalyzeSpark.scala:44)at com.best.spark.UserVisitSessionAnalyzeSpark.main(UserVisitSessionAnalyzeSpark.scala)
Caused by: java.lang.NullPointerExceptionat org.apache.spark.util.AccumulatorV2.copyAndReset(AccumulatorV2.scala:124)at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1218)at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)... 12 more

如下图所示:

自定义累加器代码如:

package com.best.spark
import com.best.constanl.Constants
import com.best.util.StringUtils
import org.apache.spark.util.AccumulatorV2
class SessionAggrStatAccumulator extends AccumulatorV2[String,String] with java.io.Serializable{private val serialVersionUID = 7292644531814797752Lvar resultInit: String = Constants.SESSION_COUNT + "=0|" + Constants.TIME_PERIOD_1s_3s + "=0|" + Constants.TIME_PERIOD_4s_6s + "=0|" + Constants.TIME_PERIOD_7s_9s + "=0|" + Constants.TIME_PERIOD_10s_30s + "=0|" + Constants.TIME_PERIOD_30s_60s + "=0|" + Constants.TIME_PERIOD_1m_3m + "=0|" + Constants.TIME_PERIOD_3m_10m + "=0|" + Constants.TIME_PERIOD_10m_30m + "=0|" + Constants.TIME_PERIOD_30m + "=0|" + Constants.STEP_PERIOD_1_3 + "=0|" + Constants.STEP_PERIOD_4_6 + "=0|" + Constants.STEP_PERIOD_7_9 + "=0|" + Constants.STEP_PERIOD_10_30 + "=0|" + Constants.STEP_PERIOD_30_60 + "=0|" + Constants.STEP_PERIOD_60 + "=0"var result: String = resultInit/*** 当AccumulatorV2中存在类似数据不存在这种问题时,是否结束程序。** @return*/override def isZero = false/*** 拷贝一个新的AccumulatorV2** @return*/override def copy: AccumulatorV2[String, String] = null/*** 重置AccumulatorV2中的数据*/override def reset(): Unit = {result = resultInit}/*** session统计计算逻辑* v就表示传过来的要累加的key** @param v*/override def add(v: String): Unit = {if (StringUtils.isNotEmpty(v) && StringUtils.isNotEmpty(result)) {val oldValue = StringUtils.getFieldFromConcatString(result, "\\|", v)if (oldValue != null) {val newValue = Integer.valueOf(oldValue) + 1result = StringUtils.setFieldInConcatString(result, "\\|", v, String.valueOf(newValue))}}}/*** 合并数据** @param other*/override def merge(other: AccumulatorV2[String, String]): Unit = {if (other.isZero) result = other.value}/*** AccumulatorV2对外访问的数据结果** @return*/override def value: String = result
}

而使用的地方在:

出现此问题的原因是:sessionAggrStatAccumulator它是运行在Driver端的,而filter算子是运行在Executor端的,所以报错,因此将sessionAggrStatAccumulator移除到函数外部进行new,即在类中进行new,如:

这篇关于在使用spark2自定义累加器时提示:Exception in thread main org.apache.spark.SparkException: Task not serializable的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/733477

相关文章

Java学习手册之Filter和Listener使用方法

《Java学习手册之Filter和Listener使用方法》:本文主要介绍Java学习手册之Filter和Listener使用方法的相关资料,Filter是一种拦截器,可以在请求到达Servl... 目录一、Filter(过滤器)1. Filter 的工作原理2. Filter 的配置与使用二、Listen

Pandas使用AdaBoost进行分类的实现

《Pandas使用AdaBoost进行分类的实现》Pandas和AdaBoost分类算法,可以高效地进行数据预处理和分类任务,本文主要介绍了Pandas使用AdaBoost进行分类的实现,具有一定的参... 目录什么是 AdaBoost?使用 AdaBoost 的步骤安装必要的库步骤一:数据准备步骤二:模型

使用Pandas进行均值填充的实现

《使用Pandas进行均值填充的实现》缺失数据(NaN值)是一个常见的问题,我们可以通过多种方法来处理缺失数据,其中一种常用的方法是均值填充,本文主要介绍了使用Pandas进行均值填充的实现,感兴趣的... 目录什么是均值填充?为什么选择均值填充?均值填充的步骤实际代码示例总结在数据分析和处理过程中,缺失数

如何使用 Python 读取 Excel 数据

《如何使用Python读取Excel数据》:本文主要介绍使用Python读取Excel数据的详细教程,通过pandas和openpyxl,你可以轻松读取Excel文件,并进行各种数据处理操... 目录使用 python 读取 Excel 数据的详细教程1. 安装必要的依赖2. 读取 Excel 文件3. 读

解决Maven项目idea找不到本地仓库jar包问题以及使用mvn install:install-file

《解决Maven项目idea找不到本地仓库jar包问题以及使用mvninstall:install-file》:本文主要介绍解决Maven项目idea找不到本地仓库jar包问题以及使用mvnin... 目录Maven项目idea找不到本地仓库jar包以及使用mvn install:install-file基

Python使用getopt处理命令行参数示例解析(最佳实践)

《Python使用getopt处理命令行参数示例解析(最佳实践)》getopt模块是Python标准库中一个简单但强大的命令行参数处理工具,它特别适合那些需要快速实现基本命令行参数解析的场景,或者需要... 目录为什么需要处理命令行参数?getopt模块基础实际应用示例与其他参数处理方式的比较常见问http

C 语言中enum枚举的定义和使用小结

《C语言中enum枚举的定义和使用小结》在C语言里,enum(枚举)是一种用户自定义的数据类型,它能够让你创建一组具名的整数常量,下面我会从定义、使用、特性等方面详细介绍enum,感兴趣的朋友一起看... 目录1、引言2、基本定义3、定义枚举变量4、自定义枚举常量的值5、枚举与switch语句结合使用6、枚

使用Python从PPT文档中提取图片和图片信息(如坐标、宽度和高度等)

《使用Python从PPT文档中提取图片和图片信息(如坐标、宽度和高度等)》PPT是一种高效的信息展示工具,广泛应用于教育、商务和设计等多个领域,PPT文档中常常包含丰富的图片内容,这些图片不仅提升了... 目录一、引言二、环境与工具三、python 提取PPT背景图片3.1 提取幻灯片背景图片3.2 提取

使用Python实现图像LBP特征提取的操作方法

《使用Python实现图像LBP特征提取的操作方法》LBP特征叫做局部二值模式,常用于纹理特征提取,并在纹理分类中具有较强的区分能力,本文给大家介绍了如何使用Python实现图像LBP特征提取的操作方... 目录一、LBP特征介绍二、LBP特征描述三、一些改进版本的LBP1.圆形LBP算子2.旋转不变的LB

Maven的使用和配置国内源的保姆级教程

《Maven的使用和配置国内源的保姆级教程》Maven是⼀个项目管理工具,基于POM(ProjectObjectModel,项目对象模型)的概念,Maven可以通过一小段描述信息来管理项目的构建,报告... 目录1. 什么是Maven?2.创建⼀个Maven项目3.Maven 核心功能4.使用Maven H