spark学习(5)--之spark计算结果保存到oracle中

2024-06-08 14:58

本文主要是介绍spark学习(5)--之spark计算结果保存到oracle中,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在spark把计算结果保存到oracle中的操作和前边的学习到的spark计算步骤基本一样,都是
第一步创建SparkContext对象来连接spark
第二步读取文件
第三步执行计算
第四步就就开始往hadoop中保存或者oracle中保存
在创建工程的时候我们要导入spark中lib的包还需要把oracle中的驱动导入到程序当中,oracle的驱动在安装oracle的路径C:\oracle\product\10.2.0\db_1\jdbc\lib\ojdbc14.jar
这里我们主要是使用jdbc来往oracle中保存数据,需要注意保存到数据中的操作可能有个错误就是序列化问题,代码如下:

package demoimport org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import java.sql.Connection
import java.sql.DriverManagerobject MyCountToOracle {def main(args: Array[String]): Unit = {//创建sparkcontextval conf= new SparkConf().setAppName("MyWebCount").setMaster("local");val sc=new SparkContext(conf)//读入数据val rdd1=sc.textFile("G:/msdownld.tmp/localhost_access_log.2017-07-30.txt").map((line:String)=>{//[30/Jul/2017:12:54:56 +0800] "GET /MyDemoWeb/java.jsp HTTP/1.1" 200 240 192.168.88.1 - - val  line1=line.substring(line.indexOf("\"")+1, line.lastIndexOf("\""))val line2=line1.substring(line1.indexOf(" ")+1, line1.lastIndexOf(" "))val pageName=line2.substring(line2.lastIndexOf("/")+1);(pageName,1)})val rdd2=rdd1.reduceByKey(_+_)//通过网页名称进行排序val rdd3=rdd2.sortBy(_._2, true);//创建oracle链接Class.forName("oracle.jdbc.OracleDriver") //注册Oracle的驱动val conn:Connection=DriverManager.getConnection("jdbc:oracle:thin:@192.168.112.130:1521/orcl", "scott", "tiger")val statement=conn.prepareStatement("insert into pageview values(?,?)") //循环遍历写入数据库rdd3.foreach(f=>{statement.setString(1, f._1)statement.setInt(2, f._2)statement.executeUpdate();})statement.close()conn.close()//讲sparkcontext对象关闭掉sc.stop()}
}

它会报一个如下的错误:

Exception in thread "main" org.apache.spark.SparkException: Task not serializableat org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:915)at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)at org.apache.spark.rdd.RDD.foreach(RDD.scala:915)at demo.MyCountToOracle$.main(MyCountToOracle.scala:33)at demo.MyCountToOracle.main(MyCountToOracle.scala)
Caused by: java.io.NotSerializableException: oracle.jdbc.driver.T4CPreparedStatement
Serialization stack:- object not serializable (class: oracle.jdbc.driver.T4CPreparedStatement, value: oracle.jdbc.driver.T4CPreparedStatement@43d38654)- field (class: demo.MyCountToOracle$$anonfun$main$1, name: statement$1, type: interface java.sql.PreparedStatement)- object (class demo.MyCountToOracle$$anonfun$main$1, <function1>)at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)

这个是因为RDD是由分区组成,而T4CPreparedStatement没有实现序列化,所以不过在分区之间进行操作导致的解决这种问题,就需要用到一个算子foreachPartion。

package demoimport org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import java.sql.Connection
import java.sql.DriverManagerobject MyCountToOracle1 {def main(args: Array[String]): Unit = {//创建sparkcontextval conf= new SparkConf().setAppName("MyWebCount").setMaster("local");val sc=new SparkContext(conf)//读入数据val rdd1=sc.textFile("G:/msdownld.tmp/localhost_access_log.2017-07-30.txt").map((line:String)=>{//[30/Jul/2017:12:54:56 +0800] "GET /MyDemoWeb/java.jsp HTTP/1.1" 200 240 192.168.88.1 - - val  line1=line.substring(line.indexOf("\"")+1, line.lastIndexOf("\""))val line2=line1.substring(line1.indexOf(" ")+1, line1.lastIndexOf(" "))val pageName=line2.substring(line2.lastIndexOf("/")+1);(pageName,1)})val rdd2=rdd1.reduceByKey(_+_)//通过网页名称进行排序val rdd3=rdd2.sortBy(_._2, true);rdd3.foreachPartition(saveAsOracle)//讲sparkcontext对象关闭掉sc.stop()}def saveAsOracle(iter:Iterator[(String,Int)]):Unit={//创建oracle链接Class.forName("oracle.jdbc.OracleDriver") //注册Oracle的驱动val conn:Connection=DriverManager.getConnection("jdbc:oracle:thin:@192.168.112.130:1521/orcl", "scott", "tiger")val statement=conn.prepareStatement("insert into pageview values(?,?)") //循环遍历写入数据库iter.foreach(f=>{statement.setString(1, f._1)statement.setInt(2, f._2)statement.executeUpdate();})statement.close()conn.close()}
}

这篇关于spark学习(5)--之spark计算结果保存到oracle中的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1042515

相关文章

SpringBoot整合Apache Spark实现一个简单的数据分析功能

《SpringBoot整合ApacheSpark实现一个简单的数据分析功能》ApacheSpark是一个开源的大数据处理框架,它提供了丰富的功能和API,用于分布式数据处理、数据分析和机器学习等任务... 目录第一步、添加android依赖第二步、编写配置类第三步、编写控制类启动项目并测试总结ApacheS

使用C#导出Excel数据并保存多种格式的完整示例

《使用C#导出Excel数据并保存多种格式的完整示例》在现代企业信息化管理中,Excel已经成为最常用的数据存储和分析工具,从员工信息表、销售数据报表到财务分析表,几乎所有部门都离不开Excel,本文... 目录引言1. 安装 Spire.XLS2. 创建工作簿和填充数据3. 保存为不同格式4. 效果展示5

Python连接Spark的7种方法大全

《Python连接Spark的7种方法大全》ApacheSpark是一个强大的分布式计算框架,广泛用于大规模数据处理,通过PySpark,Python开发者能够无缝接入Spark生态系统,本文给大家介... 目录第一章:python与Spark集成概述PySpark 的核心优势基本集成配置步骤启动一个简单的

sqlserver、mysql、oracle、pgsql、sqlite五大关系数据库的对象名称和转义字符

《sqlserver、mysql、oracle、pgsql、sqlite五大关系数据库的对象名称和转义字符》:本文主要介绍sqlserver、mysql、oracle、pgsql、sqlite五大... 目录一、转义符1.1 oracle1.2 sqlserver1.3 PostgreSQL1.4 SQLi

Oracle数据库在windows系统上重启步骤

《Oracle数据库在windows系统上重启步骤》有时候在服务中重启了oracle之后,数据库并不能正常访问,下面:本文主要介绍Oracle数据库在windows系统上重启的相关资料,文中通过代... oracle数据库在Windows上重启的方法我这里是使用oracle自带的sqlplus工具实现的方

Oracle Scheduler任务故障诊断方法实战指南

《OracleScheduler任务故障诊断方法实战指南》Oracle数据库作为企业级应用中最常用的关系型数据库管理系统之一,偶尔会遇到各种故障和问题,:本文主要介绍OracleSchedul... 目录前言一、故障场景:当定时任务突然“消失”二、基础环境诊断:搭建“全局视角”1. 数据库实例与PDB状态2

oracle 11g导入\导出(expdp impdp)之导入过程

《oracle11g导入导出(expdpimpdp)之导入过程》导出需使用SEC.DMP格式,无分号;建立expdir目录(E:/exp)并确保存在;导入在cmd下执行,需sys用户权限;若需修... 目录准备文件导入(impdp)1、建立directory2、导入语句 3、更改密码总结上一个环节,我们讲了

Unity新手入门学习殿堂级知识详细讲解(图文)

《Unity新手入门学习殿堂级知识详细讲解(图文)》Unity是一款跨平台游戏引擎,支持2D/3D及VR/AR开发,核心功能模块包括图形、音频、物理等,通过可视化编辑器与脚本扩展实现开发,项目结构含A... 目录入门概述什么是 UnityUnity引擎基础认知编辑器核心操作Unity 编辑器项目模式分类工程

Python学习笔记之getattr和hasattr用法示例详解

《Python学习笔记之getattr和hasattr用法示例详解》在Python中,hasattr()、getattr()和setattr()是一组内置函数,用于对对象的属性进行操作和查询,这篇文章... 目录1.getattr用法详解1.1 基本作用1.2 示例1.3 原理2.hasattr用法详解2.

Oracle迁移PostgreSQL隐式类型转换配置指南

《Oracle迁移PostgreSQL隐式类型转换配置指南》Oracle迁移PostgreSQL时因类型差异易引发错误,需通过显式/隐式类型转换、转换关系管理及冲突处理解决,并配合验证测试确保数据一致... 目录一、问题背景二、解决方案1. 显式类型转换2. 隐式转换配置三、维护操作1. 转换关系管理2.