spark学习(5)--之spark计算结果保存到oracle中

2024-06-08 14:58

本文主要是介绍spark学习(5)--之spark计算结果保存到oracle中,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在spark把计算结果保存到oracle中的操作和前边的学习到的spark计算步骤基本一样,都是
第一步创建SparkContext对象来连接spark
第二步读取文件
第三步执行计算
第四步就就开始往hadoop中保存或者oracle中保存
在创建工程的时候我们要导入spark中lib的包还需要把oracle中的驱动导入到程序当中,oracle的驱动在安装oracle的路径C:\oracle\product\10.2.0\db_1\jdbc\lib\ojdbc14.jar
这里我们主要是使用jdbc来往oracle中保存数据,需要注意保存到数据中的操作可能有个错误就是序列化问题,代码如下:

package demoimport org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import java.sql.Connection
import java.sql.DriverManagerobject MyCountToOracle {def main(args: Array[String]): Unit = {//创建sparkcontextval conf= new SparkConf().setAppName("MyWebCount").setMaster("local");val sc=new SparkContext(conf)//读入数据val rdd1=sc.textFile("G:/msdownld.tmp/localhost_access_log.2017-07-30.txt").map((line:String)=>{//[30/Jul/2017:12:54:56 +0800] "GET /MyDemoWeb/java.jsp HTTP/1.1" 200 240 192.168.88.1 - - val  line1=line.substring(line.indexOf("\"")+1, line.lastIndexOf("\""))val line2=line1.substring(line1.indexOf(" ")+1, line1.lastIndexOf(" "))val pageName=line2.substring(line2.lastIndexOf("/")+1);(pageName,1)})val rdd2=rdd1.reduceByKey(_+_)//通过网页名称进行排序val rdd3=rdd2.sortBy(_._2, true);//创建oracle链接Class.forName("oracle.jdbc.OracleDriver") //注册Oracle的驱动val conn:Connection=DriverManager.getConnection("jdbc:oracle:thin:@192.168.112.130:1521/orcl", "scott", "tiger")val statement=conn.prepareStatement("insert into pageview values(?,?)") //循环遍历写入数据库rdd3.foreach(f=>{statement.setString(1, f._1)statement.setInt(2, f._2)statement.executeUpdate();})statement.close()conn.close()//讲sparkcontext对象关闭掉sc.stop()}
}

它会报一个如下的错误:

Exception in thread "main" org.apache.spark.SparkException: Task not serializableat org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:915)at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)at org.apache.spark.rdd.RDD.foreach(RDD.scala:915)at demo.MyCountToOracle$.main(MyCountToOracle.scala:33)at demo.MyCountToOracle.main(MyCountToOracle.scala)
Caused by: java.io.NotSerializableException: oracle.jdbc.driver.T4CPreparedStatement
Serialization stack:- object not serializable (class: oracle.jdbc.driver.T4CPreparedStatement, value: oracle.jdbc.driver.T4CPreparedStatement@43d38654)- field (class: demo.MyCountToOracle$$anonfun$main$1, name: statement$1, type: interface java.sql.PreparedStatement)- object (class demo.MyCountToOracle$$anonfun$main$1, <function1>)at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)

这个是因为RDD是由分区组成,而T4CPreparedStatement没有实现序列化,所以不过在分区之间进行操作导致的解决这种问题,就需要用到一个算子foreachPartion。

package demoimport org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import java.sql.Connection
import java.sql.DriverManagerobject MyCountToOracle1 {def main(args: Array[String]): Unit = {//创建sparkcontextval conf= new SparkConf().setAppName("MyWebCount").setMaster("local");val sc=new SparkContext(conf)//读入数据val rdd1=sc.textFile("G:/msdownld.tmp/localhost_access_log.2017-07-30.txt").map((line:String)=>{//[30/Jul/2017:12:54:56 +0800] "GET /MyDemoWeb/java.jsp HTTP/1.1" 200 240 192.168.88.1 - - val  line1=line.substring(line.indexOf("\"")+1, line.lastIndexOf("\""))val line2=line1.substring(line1.indexOf(" ")+1, line1.lastIndexOf(" "))val pageName=line2.substring(line2.lastIndexOf("/")+1);(pageName,1)})val rdd2=rdd1.reduceByKey(_+_)//通过网页名称进行排序val rdd3=rdd2.sortBy(_._2, true);rdd3.foreachPartition(saveAsOracle)//讲sparkcontext对象关闭掉sc.stop()}def saveAsOracle(iter:Iterator[(String,Int)]):Unit={//创建oracle链接Class.forName("oracle.jdbc.OracleDriver") //注册Oracle的驱动val conn:Connection=DriverManager.getConnection("jdbc:oracle:thin:@192.168.112.130:1521/orcl", "scott", "tiger")val statement=conn.prepareStatement("insert into pageview values(?,?)") //循环遍历写入数据库iter.foreach(f=>{statement.setString(1, f._1)statement.setInt(2, f._2)statement.executeUpdate();})statement.close()conn.close()}
}

这篇关于spark学习(5)--之spark计算结果保存到oracle中的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1042515

相关文章

Oracle数据库使用 listagg去重删除重复数据的方法汇总

《Oracle数据库使用listagg去重删除重复数据的方法汇总》文章介绍了在Oracle数据库中使用LISTAGG和XMLAGG函数进行字符串聚合并去重的方法,包括去重聚合、使用XML解析和CLO... 目录案例表第一种:使用wm_concat() + distinct去重聚合第二种:使用listagg,

oracle中exists和not exists用法举例详解

《oracle中exists和notexists用法举例详解》:本文主要介绍oracle中exists和notexists用法的相关资料,EXISTS用于检测子查询是否返回任何行,而NOTE... 目录基本概念:举例语法pub_name总结 exists (sql 返回结果集为真)not exists (s

Oracle的to_date()函数详解

《Oracle的to_date()函数详解》Oracle的to_date()函数用于日期格式转换,需要注意Oracle中不区分大小写的MM和mm格式代码,应使用mi代替分钟,此外,Oracle还支持毫... 目录oracle的to_date()函数一.在使用Oracle的to_date函数来做日期转换二.日

oracle数据库索引失效的问题及解决

《oracle数据库索引失效的问题及解决》本文总结了在Oracle数据库中索引失效的一些常见场景,包括使用isnull、isnotnull、!=、、、函数处理、like前置%查询以及范围索引和等值索引... 目录oracle数据库索引失效问题场景环境索引失效情况及验证结论一结论二结论三结论四结论五总结ora

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

Oracle Expdp按条件导出指定表数据的方法实例

《OracleExpdp按条件导出指定表数据的方法实例》:本文主要介绍Oracle的expdp数据泵方式导出特定机构和时间范围的数据,并通过parfile文件进行条件限制和配置,文中通过代码介绍... 目录1.场景描述 2.方案分析3.实验验证 3.1 parfile文件3.2 expdp命令导出4.总结

Oracle数据库执行计划的查看与分析技巧

《Oracle数据库执行计划的查看与分析技巧》在Oracle数据库中,执行计划能够帮助我们深入了解SQL语句在数据库内部的执行细节,进而优化查询性能、提升系统效率,执行计划是Oracle数据库优化器为... 目录一、什么是执行计划二、查看执行计划的方法(一)使用 EXPLAIN PLAN 命令(二)通过 S

HarmonyOS学习(七)——UI(五)常用布局总结

自适应布局 1.1、线性布局(LinearLayout) 通过线性容器Row和Column实现线性布局。Column容器内的子组件按照垂直方向排列,Row组件中的子组件按照水平方向排列。 属性说明space通过space参数设置主轴上子组件的间距,达到各子组件在排列上的等间距效果alignItems设置子组件在交叉轴上的对齐方式,且在各类尺寸屏幕上表现一致,其中交叉轴为垂直时,取值为Vert

Ilya-AI分享的他在OpenAI学习到的15个提示工程技巧

Ilya(不是本人,claude AI)在社交媒体上分享了他在OpenAI学习到的15个Prompt撰写技巧。 以下是详细的内容: 提示精确化:在编写提示时,力求表达清晰准确。清楚地阐述任务需求和概念定义至关重要。例:不用"分析文本",而用"判断这段话的情感倾向:积极、消极还是中性"。 快速迭代:善于快速连续调整提示。熟练的提示工程师能够灵活地进行多轮优化。例:从"总结文章"到"用

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06