Oracle的pipelined函数实现高性能大数据处理

2024-06-06 06:58

本文主要是介绍Oracle的pipelined函数实现高性能大数据处理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

原文地址:http://mikixiyou.iteye.com/blog/1673672
在plsql开发中,会涉及到一些大数据量表的数据处理,如将某记录数超亿的表的记录经过处理转换插入到另外一张或几张表。
常规的操作方法固然可以实现,但时间、磁盘IO、redo日志等等都非常大。Oracle 提供了一种高级函数,可以将这种数据处理的性能提升到极限。这种函数称为管道函数。
在实际项目中,管道函数会和表函数、数据流函数(即表函数和CURSOR结合)、数据集合、并行度一起使用,达到大数据处理的性能顶峰。
下面是一个例子,将表t_ss_normal的记录插入到表t_target中,插入过程中有部分转换操作。
我分成四个方法来实现这个数据处理操作。

第一个方法,也是最常规的方法,代码如下:

create table T_SS_NORMAL  
(  owner          VARCHAR2(30),  object_name    VARCHAR2(128),  subobject_name VARCHAR2(30),  object_id      NUMBER,  data_object_id NUMBER,  object_type    VARCHAR2(19),  created        DATE,  last_ddl_time  DATE,  timestamp      VARCHAR2(19),  status         VARCHAR2(7),  temporary      VARCHAR2(1),  generated      VARCHAR2(1),  secondary      VARCHAR2(1)  
);  
/  
create table T_TARGET  
(  owner       VARCHAR2(30),  object_name VARCHAR2(128),  comm        VARCHAR2(10)  
);  

这是源表和目标表的表结构。现在源表有200W条,其数据来自dba_objects视图。

create or replace package pkg_test is  procedure load_target_normal;  
end pkg_test;  create or replace package body pkg_test is  procedure load_target_normal is  begin    insert into t_target (owner, object_name, comm)  select owner, object_name, 'xxx' from t_ss_normal;    commit;    end;  
begin  null;  
end pkg_test;

一个insert into select语句搞定这个数据处理,简单。

第二方法,采用管道函数实现这个数据处理。

create type obj_target as object(  
owner VARCHAR2(30), object_name VARCHAR2(128), comm varchar2(10)  
);  
/  
create or replace type typ_array_target as table of obj_target;  
/  create or replace package pkg_test is  function pipe_target(p_source_data in sys_refcursor) return typ_array_target  pipelined;  procedure load_target;  
end pkg_test;  

首先创建两个自定义的类型。obj_target的定义和t_target的表结构一致,用于存储每一条目标表记录。typ_array_target用于管道函数的返回值。
接着定义一个管道函数。
普通函数的结尾加一个pipelined关键字,就是管道函数。这个函数的返回参数类型为集合,这是为了使其能作为表函数使用。表函数就是在from子句中以table(v_resultset)调用的,v_resultset就是一个集合类型的参数。
最后定义一个调用存储过程。

在包体中定义该管道函数和调用存储过程。管道函数pipe_target的传入参数一个sys_refcursor类型。这是一个游标,可以理解为使用select * from table才能得到的结果集。
你也可以不用这个传入的游标,取而代之,在函数中定义一个游标,也一样使用。

function pipe_target(p_source_data in sys_refcursor) return typ_array_target  pipelined is  r_target_data obj_target := obj_target(null, null, null);  r_source_data t_ss%rowtype;  begin  loop  fetch p_source_data  into r_source_data;  exit when p_source_data%notfound;      r_target_data.owner       := r_source_data.owner;  r_target_data.object_name := r_source_data.object_name;  r_target_data.comm        := 'xxx';      pipe row(r_target_data);  end loop;  close p_source_data;  return;  end;  procedure load_target is  begin    insert into t_target  (owner, object_name, comm)  select owner, object_name, comm  from table(pipe_target(cursor(select * from t_ss_normal)));    commit;    end;  

关键字 pipe row 的作用是将obj_target插入到typ_array_target类型的数组中,管道函数自动返回这些数据。

因为源表的数据量会非常大,所以在fetch取值时会使用bulk collect ,实现批量取值。这样做可以减少plsql引擎和sql引擎的控制转换次数。这种转换称为上下文切换。

 function pipe_target_array(p_source_data in sys_refcursor,  p_limit_size  in pls_integer default c_default_limit)  return typ_array_target  pipelined is    r_target_data obj_target := obj_target(null, null, null);   type typ_source_data is table of t_ss%rowtype index by pls_integer;  aa_source_data typ_source_data;  begin  loop  fetch p_source_data bulk collect  into aa_source_data;  exit when aa_source_data.count = 0;  for i in 1 .. aa_source_data.count loop  r_target_data.owner       := aa_source_data(i).owner;  r_target_data.object_name := aa_source_data(i).object_name;  r_target_data.comm        := 'xxx';  pipe row(r_target_data);  end loop;  end loop;  close p_source_data;  return;  end;  procedure load_target_array is  
begin  insert into t_target  (owner, object_name, comm)  select owner, object_name, comm  from table(pipe_target_array(cursor (select * from t_ss_normal),  100));    commit;    
end;  

还可以使用并行度,使得管道函数可以多进程同时执行。并行度还有一个好处,就是将数据插入方式从常规路径转换为直接路径。直接路径可以大量减少redo日志的生成量。

function pipe_target_parallel(p_source_data in sys_refcursor,  p_limit_size  in pls_integer default c_default_limit)  return typ_array_target  pipelined  parallel_enable(partition p_source_data by any) is  r_target_data obj_target := obj_target(null, null, null);  type typ_source_data is table of t_ss%rowtype index by pls_integer;    aa_source_data typ_source_data;  begin    loop  fetch p_source_data bulk collect  into aa_source_data;  exit when aa_source_data.count = 0;      for i in 1 .. aa_source_data.count loop        r_target_data.owner       := aa_source_data(i).owner;  r_target_data.object_name := aa_source_data(i).object_name;  r_target_data.comm        := 'xxx';        pipe row(r_target_data);        end loop;      end loop;    close p_source_data;  return;  end;  procedure load_target_parallel is  
begin  execute immediate 'alter session enable parallel dml';    insert /*+parallel(t,4)*/  into t_target t  (owner, object_name, comm)  select owner, object_name, comm  from table(pipe_target_array(cursor (select /*+parallel(s,4)*/  *  from t_ss_normal s),  100));    commit;  
end;  

在测试过程中,我测试200W记录的操作,时间从24秒降到到8秒,重做日志也降低更多。

这篇关于Oracle的pipelined函数实现高性能大数据处理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1035401

相关文章

SpringBoot3实现Gzip压缩优化的技术指南

《SpringBoot3实现Gzip压缩优化的技术指南》随着Web应用的用户量和数据量增加,网络带宽和页面加载速度逐渐成为瓶颈,为了减少数据传输量,提高用户体验,我们可以使用Gzip压缩HTTP响应,... 目录1、简述2、配置2.1 添加依赖2.2 配置 Gzip 压缩3、服务端应用4、前端应用4.1 N

SpringBoot实现数据库读写分离的3种方法小结

《SpringBoot实现数据库读写分离的3种方法小结》为了提高系统的读写性能和可用性,读写分离是一种经典的数据库架构模式,在SpringBoot应用中,有多种方式可以实现数据库读写分离,本文将介绍三... 目录一、数据库读写分离概述二、方案一:基于AbstractRoutingDataSource实现动态

Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统

《PythonFastAPI+Celery+RabbitMQ实现分布式图片水印处理系统》这篇文章主要为大家详细介绍了PythonFastAPI如何结合Celery以及RabbitMQ实现简单的分布式... 实现思路FastAPI 服务器Celery 任务队列RabbitMQ 作为消息代理定时任务处理完整

Java枚举类实现Key-Value映射的多种实现方式

《Java枚举类实现Key-Value映射的多种实现方式》在Java开发中,枚举(Enum)是一种特殊的类,本文将详细介绍Java枚举类实现key-value映射的多种方式,有需要的小伙伴可以根据需要... 目录前言一、基础实现方式1.1 为枚举添加属性和构造方法二、http://www.cppcns.co

使用Python实现快速搭建本地HTTP服务器

《使用Python实现快速搭建本地HTTP服务器》:本文主要介绍如何使用Python快速搭建本地HTTP服务器,轻松实现一键HTTP文件共享,同时结合二维码技术,让访问更简单,感兴趣的小伙伴可以了... 目录1. 概述2. 快速搭建 HTTP 文件共享服务2.1 核心思路2.2 代码实现2.3 代码解读3.

Kotlin 作用域函数apply、let、run、with、also使用指南

《Kotlin作用域函数apply、let、run、with、also使用指南》在Kotlin开发中,作用域函数(ScopeFunctions)是一组能让代码更简洁、更函数式的高阶函数,本文将... 目录一、引言:为什么需要作用域函数?二、作用域函China编程数详解1. apply:对象配置的 “流式构建器”最

MySQL双主搭建+keepalived高可用的实现

《MySQL双主搭建+keepalived高可用的实现》本文主要介绍了MySQL双主搭建+keepalived高可用的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,... 目录一、测试环境准备二、主从搭建1.创建复制用户2.创建复制关系3.开启复制,确认复制是否成功4.同

Java实现文件图片的预览和下载功能

《Java实现文件图片的预览和下载功能》这篇文章主要为大家详细介绍了如何使用Java实现文件图片的预览和下载功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... Java实现文件(图片)的预览和下载 @ApiOperation("访问文件") @GetMapping("

使用Sentinel自定义返回和实现区分来源方式

《使用Sentinel自定义返回和实现区分来源方式》:本文主要介绍使用Sentinel自定义返回和实现区分来源方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Sentinel自定义返回和实现区分来源1. 自定义错误返回2. 实现区分来源总结Sentinel自定

Java实现时间与字符串互相转换详解

《Java实现时间与字符串互相转换详解》这篇文章主要为大家详细介绍了Java中实现时间与字符串互相转换的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、日期格式化为字符串(一)使用预定义格式(二)自定义格式二、字符串解析为日期(一)解析ISO格式字符串(二)解析自定义