Oracle 利用管道函数(pipelined)实现高性能大数据处理

2024-05-29 21:38

本文主要是介绍Oracle 利用管道函数(pipelined)实现高性能大数据处理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

引言

在plsql开发中,会涉及到一些大数据量表的数据处理,如将某记录数超亿的表的记录经过处理转换插入到另外一张或几张表。
常规的操作方法固然可以实现,但时间、磁盘IO、redo日志等等都非常大。Oracle 提供了一种高级函数,可以将这种数据处理的性能提升到极限。这种函数称为管道函数。在实际项目中,管道函数会和表函数、数据流函数(即表函数和CURSOR结合)、数据集合、并行度一起使用,达到大数据处理的性能顶峰。转自:(http://mikixiyou.iteye.com/blog/1673672)

一、普通方法处理大数据

下面是一个例子,将表t_ss_normal的记录插入到表t_target中,插入过程中有部分转换操作。我分成四个方法来实现这个数据处理操作。

这是源表和目标表的表结构。现在源表有200W条,其数据来自dba_objects视图。

create table T_SS_NORMAL
(owner          VARCHAR2(30),object_name    VARCHAR2(128),subobject_name VARCHAR2(30),object_id      NUMBER,data_object_id NUMBER,object_type    VARCHAR2(19),created        DATE,last_ddl_time  DATE,timestamp      VARCHAR2(19),status         VARCHAR2(7),temporary      VARCHAR2(1),generated      VARCHAR2(1),secondary      VARCHAR2(1)
);
/create table T_TARGET
(owner       VARCHAR2(30),object_name VARCHAR2(128),comm        VARCHAR2(10)
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

1、一个insert into select语句搞定这个数据处理,简单。

create or replace package pkg_test isprocedure load_target_normal;
end pkg_test;create or replace package body pkg_test isprocedure load_target_normal isbegin  insert into t_target (owner, object_name, comm)select owner, object_name, 'xxx' from t_ss_normal;  commit;  end;
beginnull;
end pkg_test; 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

2、采用管道函数实现这个数据处理。

create type obj_target as object(
owner VARCHAR2(30), object_name VARCHAR2(128), comm varchar2(10)
);
/
create or replace type typ_array_target as table of obj_target;
/create or replace package pkg_test is  function pipe_target(p_source_data in sys_refcursor) return typ_array_targetpipelined;procedure load_target;
end pkg_test;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

首先创建两个自定义的类型。obj_target的定义和t_target的表结构一致,用于存储每一条目标表记录。typ_array_target用于管道函数的返回值。
接着定义一个管道函数。
普通函数的结尾加一个pipelined关键字,就是管道函数。这个函数的返回参数类型为集合,这是为了使其能作为表函数使用。表函数就是在from子句中以table(v_resultset)调用的,v_resultset就是一个集合类型的参数。
最后定义一个调用存储过程。

在包体中定义该管道函数和调用存储过程。管道函数pipe_target的传入参数一个sys_refcursor类型。这是一个游标,可以理解为使用select * from table才能得到的结果集。
你也可以不用这个传入的游标,取而代之,在函数中定义一个游标,也一样使用。

  function pipe_target(p_source_data in sys_refcursor) return typ_array_targetpipelined isr_target_data obj_target := obj_target(null, null, null);r_source_data t_ss%rowtype; beginloopfetch p_source_datainto r_source_data;exit when p_source_data%notfound;    r_target_data.owner       := r_source_data.owner;r_target_data.object_name := r_source_data.object_name;r_target_data.comm        := 'xxx';    pipe row(r_target_data);end loop;close p_source_data;return;end;procedure load_target isbegin  insert into t_target(owner, object_name, comm)select owner, object_name, commfrom table(pipe_target(cursor(select * from t_ss_normal)));  commit;  end;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

关键字 pipe row 的作用是将obj_target插入到typ_array_target类型的数组中,管道函数自动返回这些数据。因为源表的数据量会非常大,所以在fetch取值时会使用bulk collect ,实现批量取值。这样做可以减少plsql引擎和sql引擎的控制转换次数。这种转换称为上下文切换。

  function pipe_target_array(p_source_data in sys_refcursor,p_limit_size  in pls_integer default c_default_limit)return typ_array_targetpipelined is  r_target_data obj_target := obj_target(null, null, null); type typ_source_data is table of t_ss%rowtype index by pls_integer;aa_source_data typ_source_data;beginloopfetch p_source_data bulk collectinto aa_source_data;exit when aa_source_data.count = 0;for i in 1 .. aa_source_data.count loopr_target_data.owner       := aa_source_data(i).owner;r_target_data.object_name := aa_source_data(i).object_name;r_target_data.comm        := 'xxx';pipe row(r_target_data);end loop;end loop;close p_source_data;return;end;procedure load_target_array isbegininsert into t_target(owner, object_name, comm)select owner, object_name, commfrom table(pipe_target_array(cursor (select * from t_ss_normal),100));  commit;  end;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

还可以使用并行度,使得管道函数可以多进程同时执行。并行度还有一个好处,就是将数据插入方式从常规路径转换为直接路径。直接路径可以大量减少redo日志的生成量。

  function pipe_target_parallel(p_source_data in sys_refcursor,p_limit_size  in pls_integer default c_default_limit)return typ_array_targetpipelinedparallel_enable(partition p_source_data by any) isr_target_data obj_target := obj_target(null, null, null);type typ_source_data is table of t_ss%rowtype index by pls_integer;  aa_source_data typ_source_data;begin  loopfetch p_source_data bulk collectinto aa_source_data;exit when aa_source_data.count = 0;    for i in 1 .. aa_source_data.count loop      r_target_data.owner       := aa_source_data(i).owner;r_target_data.object_name := aa_source_data(i).object_name;r_target_data.comm        := 'xxx';      pipe row(r_target_data);      end loop;    end loop;  close p_source_data;return;end;procedure load_target_parallel isbeginexecute immediate 'alter session enable parallel dml';  insert /*+parallel(t,4)*/into t_target t(owner, object_name, comm)select owner, object_name, commfrom table(pipe_target_array(cursor (select /*+parallel(s,4)*/*from t_ss_normal s),100));  commit;end;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

还可以使用并行度,使得管道函数可以多进程同时执行。并行度还有一个好处,就是将数据插入方式从常规路径转换为直接路径。直接路径可以大量减少redo日志的生成量。

  function pipe_target_parallel(p_source_data in sys_refcursor,p_limit_size  in pls_integer default c_default_limit)return typ_array_targetpipelinedparallel_enable(partition p_source_data by any) isr_target_data obj_target := obj_target(null, null, null);type typ_source_data is table of t_ss%rowtype index by pls_integer;  aa_source_data typ_source_data;begin  loopfetch p_source_data bulk collectinto aa_source_data;exit when aa_source_data.count = 0;    for i in 1 .. aa_source_data.count loop      r_target_data.owner       := aa_source_data(i).owner;r_target_data.object_name := aa_source_data(i).object_name;r_target_data.comm        := 'xxx';      pipe row(r_target_data);      end loop;    end loop;  close p_source_data;return;end;procedure load_target_parallel isbeginexecute immediate 'alter session enable parallel dml';  insert /*+parallel(t,4)*/into t_target t(owner, object_name, comm)select owner, object_name, commfrom table(pipe_target_array(cursor (select /*+parallel(s,4)*/*from t_ss_normal s),100));  commit;end;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

在测试过程中,我测试200W记录的操作,时间从24秒降到到8秒,重做日志也降低更多。

这篇关于Oracle 利用管道函数(pipelined)实现高性能大数据处理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1014709

相关文章

PostgreSQL中rank()窗口函数实用指南与示例

《PostgreSQL中rank()窗口函数实用指南与示例》在数据分析和数据库管理中,经常需要对数据进行排名操作,PostgreSQL提供了强大的窗口函数rank(),可以方便地对结果集中的行进行排名... 目录一、rank()函数简介二、基础示例:部门内员工薪资排名示例数据排名查询三、高级应用示例1. 每

全面掌握 SQL 中的 DATEDIFF函数及用法最佳实践

《全面掌握SQL中的DATEDIFF函数及用法最佳实践》本文解析DATEDIFF在不同数据库中的差异,强调其边界计算原理,探讨应用场景及陷阱,推荐根据需求选择TIMESTAMPDIFF或inte... 目录1. 核心概念:DATEDIFF 究竟在计算什么?2. 主流数据库中的 DATEDIFF 实现2.1

Linux下删除乱码文件和目录的实现方式

《Linux下删除乱码文件和目录的实现方式》:本文主要介绍Linux下删除乱码文件和目录的实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录linux下删除乱码文件和目录方法1方法2总结Linux下删除乱码文件和目录方法1使用ls -i命令找到文件或目录

MySQL中的LENGTH()函数用法详解与实例分析

《MySQL中的LENGTH()函数用法详解与实例分析》MySQLLENGTH()函数用于计算字符串的字节长度,区别于CHAR_LENGTH()的字符长度,适用于多字节字符集(如UTF-8)的数据验证... 目录1. LENGTH()函数的基本语法2. LENGTH()函数的返回值2.1 示例1:计算字符串

SpringBoot+EasyExcel实现自定义复杂样式导入导出

《SpringBoot+EasyExcel实现自定义复杂样式导入导出》这篇文章主要为大家详细介绍了SpringBoot如何结果EasyExcel实现自定义复杂样式导入导出功能,文中的示例代码讲解详细,... 目录安装处理自定义导出复杂场景1、列不固定,动态列2、动态下拉3、自定义锁定行/列,添加密码4、合并

mybatis执行insert返回id实现详解

《mybatis执行insert返回id实现详解》MyBatis插入操作默认返回受影响行数,需通过useGeneratedKeys+keyProperty或selectKey获取主键ID,确保主键为自... 目录 两种方式获取自增 ID:1. ​​useGeneratedKeys+keyProperty(推

Spring Boot集成Druid实现数据源管理与监控的详细步骤

《SpringBoot集成Druid实现数据源管理与监控的详细步骤》本文介绍如何在SpringBoot项目中集成Druid数据库连接池,包括环境搭建、Maven依赖配置、SpringBoot配置文件... 目录1. 引言1.1 环境准备1.2 Druid介绍2. 配置Druid连接池3. 查看Druid监控

Linux在线解压jar包的实现方式

《Linux在线解压jar包的实现方式》:本文主要介绍Linux在线解压jar包的实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录linux在线解压jar包解压 jar包的步骤总结Linux在线解压jar包在 Centos 中解压 jar 包可以使用 u

c++ 类成员变量默认初始值的实现

《c++类成员变量默认初始值的实现》本文主要介绍了c++类成员变量默认初始值,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录C++类成员变量初始化c++类的变量的初始化在C++中,如果使用类成员变量时未给定其初始值,那么它将被

Qt使用QSqlDatabase连接MySQL实现增删改查功能

《Qt使用QSqlDatabase连接MySQL实现增删改查功能》这篇文章主要为大家详细介绍了Qt如何使用QSqlDatabase连接MySQL实现增删改查功能,文中的示例代码讲解详细,感兴趣的小伙伴... 目录一、创建数据表二、连接mysql数据库三、封装成一个完整的轻量级 ORM 风格类3.1 表结构