在使用spark2自定义累加器时提示:Exception in thread main org.apache.spark.SparkException: Task not serializable

本文主要是介绍在使用spark2自定义累加器时提示:Exception in thread main org.apache.spark.SparkException: Task not serializable,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在使用spark自定义累加器时提示如下错误:

Exception in thread "main" org.apache.spark.SparkException: Task not serializableat org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:387)at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:386)at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)at org.apache.spark.rdd.RDD.filter(RDD.scala:386)at com.best.spark.UserVisitSessionAnalyzeSpark$.filterSessionAndAggrStat(UserVisitSessionAnalyzeSpark.scala:281)at com.best.spark.UserVisitSessionAnalyzeSpark$.main(UserVisitSessionAnalyzeSpark.scala:44)at com.best.spark.UserVisitSessionAnalyzeSpark.main(UserVisitSessionAnalyzeSpark.scala)
Caused by: java.lang.NullPointerExceptionat org.apache.spark.util.AccumulatorV2.copyAndReset(AccumulatorV2.scala:124)at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:162)at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1218)at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)... 12 more

如下图所示:

自定义累加器代码如:

package com.best.spark
import com.best.constanl.Constants
import com.best.util.StringUtils
import org.apache.spark.util.AccumulatorV2
class SessionAggrStatAccumulator extends AccumulatorV2[String,String] with java.io.Serializable{private val serialVersionUID = 7292644531814797752Lvar resultInit: String = Constants.SESSION_COUNT + "=0|" + Constants.TIME_PERIOD_1s_3s + "=0|" + Constants.TIME_PERIOD_4s_6s + "=0|" + Constants.TIME_PERIOD_7s_9s + "=0|" + Constants.TIME_PERIOD_10s_30s + "=0|" + Constants.TIME_PERIOD_30s_60s + "=0|" + Constants.TIME_PERIOD_1m_3m + "=0|" + Constants.TIME_PERIOD_3m_10m + "=0|" + Constants.TIME_PERIOD_10m_30m + "=0|" + Constants.TIME_PERIOD_30m + "=0|" + Constants.STEP_PERIOD_1_3 + "=0|" + Constants.STEP_PERIOD_4_6 + "=0|" + Constants.STEP_PERIOD_7_9 + "=0|" + Constants.STEP_PERIOD_10_30 + "=0|" + Constants.STEP_PERIOD_30_60 + "=0|" + Constants.STEP_PERIOD_60 + "=0"var result: String = resultInit/*** 当AccumulatorV2中存在类似数据不存在这种问题时,是否结束程序。** @return*/override def isZero = false/*** 拷贝一个新的AccumulatorV2** @return*/override def copy: AccumulatorV2[String, String] = null/*** 重置AccumulatorV2中的数据*/override def reset(): Unit = {result = resultInit}/*** session统计计算逻辑* v就表示传过来的要累加的key** @param v*/override def add(v: String): Unit = {if (StringUtils.isNotEmpty(v) && StringUtils.isNotEmpty(result)) {val oldValue = StringUtils.getFieldFromConcatString(result, "\\|", v)if (oldValue != null) {val newValue = Integer.valueOf(oldValue) + 1result = StringUtils.setFieldInConcatString(result, "\\|", v, String.valueOf(newValue))}}}/*** 合并数据** @param other*/override def merge(other: AccumulatorV2[String, String]): Unit = {if (other.isZero) result = other.value}/*** AccumulatorV2对外访问的数据结果** @return*/override def value: String = result
}

而使用的地方在:

出现此问题的原因是:sessionAggrStatAccumulator它是运行在Driver端的,而filter算子是运行在Executor端的,所以报错,因此将sessionAggrStatAccumulator移除到函数外部进行new,即在类中进行new,如:

这篇关于在使用spark2自定义累加器时提示:Exception in thread main org.apache.spark.SparkException: Task not serializable的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/733477

相关文章

详解Vue如何使用xlsx库导出Excel文件

《详解Vue如何使用xlsx库导出Excel文件》第三方库xlsx提供了强大的功能来处理Excel文件,它可以简化导出Excel文件这个过程,本文将为大家详细介绍一下它的具体使用,需要的小伙伴可以了解... 目录1. 安装依赖2. 创建vue组件3. 解释代码在Vue.js项目中导出Excel文件,使用第三

linux报错INFO:task xxxxxx:634 blocked for more than 120 seconds.三种解决方式

《linux报错INFO:taskxxxxxx:634blockedformorethan120seconds.三种解决方式》文章描述了一个Linux最小系统运行时出现的“hung_ta... 目录1.问题描述2.解决办法2.1 缩小文件系统缓存大小2.2 修改系统IO调度策略2.3 取消120秒时间限制3

Linux alias的三种使用场景方式

《Linuxalias的三种使用场景方式》文章介绍了Linux中`alias`命令的三种使用场景:临时别名、用户级别别名和系统级别别名,临时别名仅在当前终端有效,用户级别别名在当前用户下所有终端有效... 目录linux alias三种使用场景一次性适用于当前用户全局生效,所有用户都可调用删除总结Linux

java图像识别工具类(ImageRecognitionUtils)使用实例详解

《java图像识别工具类(ImageRecognitionUtils)使用实例详解》:本文主要介绍如何在Java中使用OpenCV进行图像识别,包括图像加载、预处理、分类、人脸检测和特征提取等步骤... 目录前言1. 图像识别的背景与作用2. 设计目标3. 项目依赖4. 设计与实现 ImageRecogni

python管理工具之conda安装部署及使用详解

《python管理工具之conda安装部署及使用详解》这篇文章详细介绍了如何安装和使用conda来管理Python环境,它涵盖了从安装部署、镜像源配置到具体的conda使用方法,包括创建、激活、安装包... 目录pytpshheraerUhon管理工具:conda部署+使用一、安装部署1、 下载2、 安装3

Mysql虚拟列的使用场景

《Mysql虚拟列的使用场景》MySQL虚拟列是一种在查询时动态生成的特殊列,它不占用存储空间,可以提高查询效率和数据处理便利性,本文给大家介绍Mysql虚拟列的相关知识,感兴趣的朋友一起看看吧... 目录1. 介绍mysql虚拟列1.1 定义和作用1.2 虚拟列与普通列的区别2. MySQL虚拟列的类型2

使用MongoDB进行数据存储的操作流程

《使用MongoDB进行数据存储的操作流程》在现代应用开发中,数据存储是一个至关重要的部分,随着数据量的增大和复杂性的增加,传统的关系型数据库有时难以应对高并发和大数据量的处理需求,MongoDB作为... 目录什么是MongoDB?MongoDB的优势使用MongoDB进行数据存储1. 安装MongoDB

关于@MapperScan和@ComponentScan的使用问题

《关于@MapperScan和@ComponentScan的使用问题》文章介绍了在使用`@MapperScan`和`@ComponentScan`时可能会遇到的包扫描冲突问题,并提供了解决方法,同时,... 目录@MapperScan和@ComponentScan的使用问题报错如下原因解决办法课外拓展总结@

mysql数据库分区的使用

《mysql数据库分区的使用》MySQL分区技术通过将大表分割成多个较小片段,提高查询性能、管理效率和数据存储效率,本文就来介绍一下mysql数据库分区的使用,感兴趣的可以了解一下... 目录【一】分区的基本概念【1】物理存储与逻辑分割【2】查询性能提升【3】数据管理与维护【4】扩展性与并行处理【二】分区的

使用Python实现在Word中添加或删除超链接

《使用Python实现在Word中添加或删除超链接》在Word文档中,超链接是一种将文本或图像连接到其他文档、网页或同一文档中不同部分的功能,本文将为大家介绍一下Python如何实现在Word中添加或... 在Word文档中,超链接是一种将文本或图像连接到其他文档、网页或同一文档中不同部分的功能。通过添加超