spark学习(5)--之spark计算结果保存到oracle中

2024-06-08 14:58

本文主要是介绍spark学习(5)--之spark计算结果保存到oracle中,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在spark把计算结果保存到oracle中的操作和前边的学习到的spark计算步骤基本一样,都是
第一步创建SparkContext对象来连接spark
第二步读取文件
第三步执行计算
第四步就就开始往hadoop中保存或者oracle中保存
在创建工程的时候我们要导入spark中lib的包还需要把oracle中的驱动导入到程序当中,oracle的驱动在安装oracle的路径C:\oracle\product\10.2.0\db_1\jdbc\lib\ojdbc14.jar
这里我们主要是使用jdbc来往oracle中保存数据,需要注意保存到数据中的操作可能有个错误就是序列化问题,代码如下:

package demoimport org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import java.sql.Connection
import java.sql.DriverManagerobject MyCountToOracle {def main(args: Array[String]): Unit = {//创建sparkcontextval conf= new SparkConf().setAppName("MyWebCount").setMaster("local");val sc=new SparkContext(conf)//读入数据val rdd1=sc.textFile("G:/msdownld.tmp/localhost_access_log.2017-07-30.txt").map((line:String)=>{//[30/Jul/2017:12:54:56 +0800] "GET /MyDemoWeb/java.jsp HTTP/1.1" 200 240 192.168.88.1 - - val  line1=line.substring(line.indexOf("\"")+1, line.lastIndexOf("\""))val line2=line1.substring(line1.indexOf(" ")+1, line1.lastIndexOf(" "))val pageName=line2.substring(line2.lastIndexOf("/")+1);(pageName,1)})val rdd2=rdd1.reduceByKey(_+_)//通过网页名称进行排序val rdd3=rdd2.sortBy(_._2, true);//创建oracle链接Class.forName("oracle.jdbc.OracleDriver") //注册Oracle的驱动val conn:Connection=DriverManager.getConnection("jdbc:oracle:thin:@192.168.112.130:1521/orcl", "scott", "tiger")val statement=conn.prepareStatement("insert into pageview values(?,?)") //循环遍历写入数据库rdd3.foreach(f=>{statement.setString(1, f._1)statement.setInt(2, f._2)statement.executeUpdate();})statement.close()conn.close()//讲sparkcontext对象关闭掉sc.stop()}
}

它会报一个如下的错误:

Exception in thread "main" org.apache.spark.SparkException: Task not serializableat org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:915)at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)at org.apache.spark.rdd.RDD.foreach(RDD.scala:915)at demo.MyCountToOracle$.main(MyCountToOracle.scala:33)at demo.MyCountToOracle.main(MyCountToOracle.scala)
Caused by: java.io.NotSerializableException: oracle.jdbc.driver.T4CPreparedStatement
Serialization stack:- object not serializable (class: oracle.jdbc.driver.T4CPreparedStatement, value: oracle.jdbc.driver.T4CPreparedStatement@43d38654)- field (class: demo.MyCountToOracle$$anonfun$main$1, name: statement$1, type: interface java.sql.PreparedStatement)- object (class demo.MyCountToOracle$$anonfun$main$1, <function1>)at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)

这个是因为RDD是由分区组成,而T4CPreparedStatement没有实现序列化,所以不过在分区之间进行操作导致的解决这种问题,就需要用到一个算子foreachPartion。

package demoimport org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import java.sql.Connection
import java.sql.DriverManagerobject MyCountToOracle1 {def main(args: Array[String]): Unit = {//创建sparkcontextval conf= new SparkConf().setAppName("MyWebCount").setMaster("local");val sc=new SparkContext(conf)//读入数据val rdd1=sc.textFile("G:/msdownld.tmp/localhost_access_log.2017-07-30.txt").map((line:String)=>{//[30/Jul/2017:12:54:56 +0800] "GET /MyDemoWeb/java.jsp HTTP/1.1" 200 240 192.168.88.1 - - val  line1=line.substring(line.indexOf("\"")+1, line.lastIndexOf("\""))val line2=line1.substring(line1.indexOf(" ")+1, line1.lastIndexOf(" "))val pageName=line2.substring(line2.lastIndexOf("/")+1);(pageName,1)})val rdd2=rdd1.reduceByKey(_+_)//通过网页名称进行排序val rdd3=rdd2.sortBy(_._2, true);rdd3.foreachPartition(saveAsOracle)//讲sparkcontext对象关闭掉sc.stop()}def saveAsOracle(iter:Iterator[(String,Int)]):Unit={//创建oracle链接Class.forName("oracle.jdbc.OracleDriver") //注册Oracle的驱动val conn:Connection=DriverManager.getConnection("jdbc:oracle:thin:@192.168.112.130:1521/orcl", "scott", "tiger")val statement=conn.prepareStatement("insert into pageview values(?,?)") //循环遍历写入数据库iter.foreach(f=>{statement.setString(1, f._1)statement.setInt(2, f._2)statement.executeUpdate();})statement.close()conn.close()}
}

这篇关于spark学习(5)--之spark计算结果保存到oracle中的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1042515

相关文章

oracle DBMS_SQL.PARSE的使用方法和示例

《oracleDBMS_SQL.PARSE的使用方法和示例》DBMS_SQL是Oracle数据库中的一个强大包,用于动态构建和执行SQL语句,DBMS_SQL.PARSE过程解析SQL语句或PL/S... 目录语法示例注意事项DBMS_SQL 是 oracle 数据库中的一个强大包,它允许动态地构建和执行

Java深度学习库DJL实现Python的NumPy方式

《Java深度学习库DJL实现Python的NumPy方式》本文介绍了DJL库的背景和基本功能,包括NDArray的创建、数学运算、数据获取和设置等,同时,还展示了如何使用NDArray进行数据预处理... 目录1 NDArray 的背景介绍1.1 架构2 JavaDJL使用2.1 安装DJL2.2 基本操

PLsql Oracle 下载安装图文过程详解

《PLsqlOracle下载安装图文过程详解》PL/SQLDeveloper是一款用于开发Oracle数据库的集成开发环境,可以通过官网下载安装配置,并通过配置tnsnames.ora文件及环境变... 目录一、PL/SQL Developer 简介二、PL/SQL Developer 安装及配置详解1.下

使用C++将处理后的信号保存为PNG和TIFF格式

《使用C++将处理后的信号保存为PNG和TIFF格式》在信号处理领域,我们常常需要将处理结果以图像的形式保存下来,方便后续分析和展示,C++提供了多种库来处理图像数据,本文将介绍如何使用stb_ima... 目录1. PNG格式保存使用stb_imagephp_write库1.1 安装和包含库1.2 代码解

oracle如何连接登陆SYS账号

《oracle如何连接登陆SYS账号》在Navicat12中连接Oracle11g的SYS用户时,如果设置了新密码但连接失败,可能是因为需要以SYSDBA或SYSOPER角色连接,解决方法是确保在连接... 目录oracle连接登陆NmOtMSYS账号工具问题解决SYS用户总结oracle连接登陆SYS账号

Oracle数据库如何切换登录用户(system和sys)

《Oracle数据库如何切换登录用户(system和sys)》文章介绍了如何使用SQL*Plus工具登录Oracle数据库的system用户,包括打开登录入口、输入用户名和口令、以及切换到sys用户的... 目录打开登录入口登录system用户总结打开登录入口win+R打开运行对话框,输php入:sqlp

查询Oracle数据库表是否被锁的实现方式

《查询Oracle数据库表是否被锁的实现方式》本文介绍了查询Oracle数据库表是否被锁的方法,包括查询锁表的会话、人员信息,根据object_id查询表名,以及根据会话ID查询和停止本地进程,同时,... 目录查询oracle数据库表是否被锁1、查询锁表的会话、人员等信息2、根据 object_id查询被

vscode保存代码时自动eslint格式化图文教程

《vscode保存代码时自动eslint格式化图文教程》:本文主要介绍vscode保存代码时自动eslint格式化的相关资料,包括打开设置文件并复制特定内容,文中通过代码介绍的非常详细,需要的朋友... 目录1、点击设置2、选择远程--->点击右上角打开设置3、会弹出settings.json文件,将以下内

Oracle查询优化之高效实现仅查询前10条记录的方法与实践

《Oracle查询优化之高效实现仅查询前10条记录的方法与实践》:本文主要介绍Oracle查询优化之高效实现仅查询前10条记录的相关资料,包括使用ROWNUM、ROW_NUMBER()函数、FET... 目录1. 使用 ROWNUM 查询2. 使用 ROW_NUMBER() 函数3. 使用 FETCH FI

数据库oracle用户密码过期查询及解决方案

《数据库oracle用户密码过期查询及解决方案》:本文主要介绍如何处理ORACLE数据库用户密码过期和修改密码期限的问题,包括创建用户、赋予权限、修改密码、解锁用户和设置密码期限,文中通过代码介绍... 目录前言一、创建用户、赋予权限、修改密码、解锁用户和设置期限二、查询用户密码期限和过期后的修改1.查询用