POSTGRESQL中ETL、fdw的平行替换

2024-01-13 23:36
文章标签 postgresql etl 替换 平行 fdw

本文主要是介绍POSTGRESQL中ETL、fdw的平行替换,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

POSTGRESQL中ETL、fdw的平行替换

01、简介

“ 在我前两次的文章中,说到postgresql对于python的支持,其实很多功能也就可以封装进入的postgresql数据库中去。比如fdw、etl等,本文将以此为叙述点,进行演示展示”

d9de783ffe5e4224852626e3f2388609_0.png

在postgresql数据库中fdw的支持,在创建和使用上都不上太方便,特别是fdw在用表级别关联的时候,性能会大大折扣,因为fdw的数据并不会落地到本地​。所以我们可以利用postgresql对于python的支持,自行封装一个库对库的调度工具,将远端数据进行落地​后再次使用。对于使用的便利性,读者可自行​对比。

02、postgresql16.1的安装

安装依赖

yum install -y bison flex readline-devel zlib-devel zlib zlib-devel gcc  gcc-c++ openssl-devel python3  python3-devel libicu-devel ncurses-devel sqlite-devel tk-devel gcc make

添加用户

useradd postgres 
vim /etc/sudo

在101行以下添加以下内容


postgres ALL=(ALL)     NOPASSWD: ALL

进入官网找到链接,这里使用源码安装。

wget https://ftp.postgresql.org/pub/source/v16.1/postgresql-16.1.tar.gz

解压并进入解压目录

mv postgresql-16.1.tar.gz /home/postgressu - postgres tar -zxf postgresql-16.1.tar.gzcd postgresql-16.1

这里编译python支持还是很重要。–with-python 自行构建plpython3u插件


./configure --prefix=/home/postgres/pg --with-openssl  --with-pythonmake && make install

编辑环境变量


cd 
vim .bash_profile

加入以下环境变量

export PATH=/home/postgres/pg/bin:$PATH 
export PGDATA=/home/postgres/pg/data 

加载环境变量


source ~/.bash_profile

初始化数据库


initdb -D $PGDATA -U postgres -W 
(输入超级用户密码两次)
pg_ctl start 
pg_ctl status

进入数据库创建拓展


CREATE EXTENSION plpython3u CASCADE;

02、创建支持跨库访问的函数

首先下载python链接数据库所需module

postgres=# \! pip3 install -i https://mirrors.aliyun.com/pypi/simple/ cx_Oracle pyodbc pymysql --user 
Looking in indexes: https://mirrors.aliyun.com/pypi/simple/
Requirement already satisfied: cx_Oracle in ./.local/lib/python3.6/site-packages (8.3.0)
Collecting pyodbcDownloading https://mirrors.aliyun.com/pypi/packages/27/5c/5e472d714dea2a634bd79df6b8ace55737a9f50c8fbb3b15521fceda4694/pyodbc-4.0.39-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (330 kB)|████████████████████████████████| 330 kB 2.8 MB/s            
Collecting pymysqlDownloading https://mirrors.aliyun.com/pypi/packages/4f/52/a115fe175028b058df353c5a3d5290b71514a83f67078a6482cff24d6137/PyMySQL-1.0.2-py3-none-any.whl (43 kB)|████████████████████████████████| 43 kB 2.4 MB/s             
Installing collected packages: pyodbc, pymysql
Successfully installed pymysql-1.0.2 pyodbc-4.0.39

在链接远程Oracle数据库,需要下载指定的客户端,本文使用的是oracle 19C

wget https://download.oracle.com/otn_software/linux/instantclient/1921000/oracle-instantclient19.21-basic-19.21.0.0.0-1.x86_64.rpm
sudo rpm -ivh oracle-instantclient19.21-basic-19.21.0.0.0-1.x86_64.rpm

编辑环境变量

vim /etc/profile

配置以下环境变量值

export LD_LIBRARY_PATH=/usr/lib/oracle/19.21/client64/lib:$LD_LIBRARY_PATH

加载环境变量

source /etc/profile

在postgresql数据库中创建具有跨库链接mysql\oracle\sqlserver功能的function。

CREATE OR REPLACE FUNCTION fdw_db(db_type varchar(100),host VARCHAR(100),port integer, username VARCHAR(100), password VARCHAR(100), db_name VARCHAR(100),tablename varchar(100))
RETURNS text AS $$import cx_Oracle
import pyodbc
import pymysqldef read_data_from_database(db_type, host, port, username, password, db_name, table_name):result_values = []  # Initialize as an empty list# 读取Oracle数据库中指定表数据的函数if db_type.lower() == 'oracle':connection_string = f"{username}/{password}@{host}:{port}/{db_name}"connection = cx_Oracle.connect(connection_string)cursor = connection.cursor()cursor.execute(f'SELECT * FROM {table_name}')result = cursor.fetchall()cursor.close()connection.close()# 将结果转换为支持INSERT INTO的VALUES语句for row in result:values_str = ', '.join([f"'{value}'" if isinstance(value, str) else str(value) for value in row])result_values.append(f'({values_str})')# 读取SQL Server数据库中指定表数据的函数elif db_type.lower() == 'sqlserver':connection = pyodbc.connect(f"DRIVER={{SQL Server}};SERVER={host};port={port};DATABASE={db_name};UID={username};PWD={password}")cursor = connection.cursor()cursor.execute(f'SELECT * FROM {table_name}')result = cursor.fetchall()cursor.close()connection.close()# 将结果转换为支持INSERT INTO的VALUES语句for row in result:values_str = ', '.join([f"'{value}'" if isinstance(value, str) else str(value) for value in row])result_values.append(f'({values_str})')# 读取MySQL数据库中指定表数据的函数elif db_type.lower() == 'mysql':connection = pymysql.connect(host=host, user=username, password=password, database=db_name, port=port)cursor = connection.cursor()cursor.execute(f'SELECT * FROM {table_name}')result = cursor.fetchall()cursor.close()connection.close()# 将结果转换为支持INSERT INTO的VALUES语句for row in result:values_str = ', '.join([f"'{value}'" if isinstance(value, str) else str(value) for value in row])result_values.append(f'({values_str})')else:raise ValueError("Unsupported database type. Supported types: 'oracle', 'sqlserver', 'mysql'")# 返回拼接的VALUES子句return ', '.join(result_values)insert_values = read_data_from_database(db_type, host, port, username, password, db_name, tablename)
return insert_values$$ LANGUAGE plpython3u;

以Oracle作为测试 在Oracle 和PG中均创建测试表conn_fdw
postgresql

-- 创建表 conn_fdw
CREATE TABLE conn_fdw (id integer,name VARCHAR(50),age integer,city VARCHAR(50),salary integer
);

oracle中

-- 创建表 conn_fdw
CREATE TABLE conn_fdw (id NUMBER,name VARCHAR2(50),age NUMBER,city VARCHAR2(50),salary NUMBER
);

Oracle中插入数据

-- 插入20行数据
INSERT INTO conn_fdw VALUES (1, 'John', 30, 'New York', 50000);
INSERT INTO conn_fdw VALUES (2, 'Alice', 25, 'Los Angeles', 60000);
INSERT INTO conn_fdw VALUES (3, 'Bob', 35, 'Chicago', 70000);
INSERT INTO conn_fdw VALUES (4, 'Eva', 28, 'San Francisco', 55000);
INSERT INTO conn_fdw VALUES (5, 'Mike', 32, 'Seattle', 65000);
INSERT INTO conn_fdw VALUES (6, 'Sophia', 29, 'Boston', 75000);
INSERT INTO conn_fdw VALUES (7, 'David', 27, 'Denver', 52000);
INSERT INTO conn_fdw VALUES (8, 'Emily', 31, 'Austin', 68000);
INSERT INTO conn_fdw VALUES (9, 'Daniel', 26, 'Phoenix', 58000);
INSERT INTO conn_fdw VALUES (10, 'Olivia', 33, 'Houston', 72000);
INSERT INTO conn_fdw VALUES (11, 'Liam', 24, 'Portland', 49000);
INSERT INTO conn_fdw VALUES (12, 'Ava', 34, 'Atlanta', 71000);
INSERT INTO conn_fdw VALUES (13, 'Logan', 30, 'Miami', 62000);
INSERT INTO conn_fdw VALUES (14, 'Mia', 28, 'Dallas', 54000);
INSERT INTO conn_fdw VALUES (15, 'Jackson', 29, 'Minneapolis', 67000);
INSERT INTO conn_fdw VALUES (16, 'Sophie', 31, 'Detroit', 59000);
INSERT INTO conn_fdw VALUES (17, 'William', 27, 'Philadelphia', 70000);
INSERT INTO conn_fdw VALUES (18, 'Emma', 32, 'San Diego', 66000);
INSERT INTO conn_fdw VALUES (19, 'James', 26, 'Raleigh', 63000);
INSERT INTO conn_fdw VALUES (20, 'Avery', 35, 'Tampa', 71000);

此时再结合SQL语言进行处理远程连接传过来数据,再创建一个函数用于调用以上创建fdw_db

CREATE OR REPLACE FUNCTION inset_fdw_db(db_type varchar(100),host VARCHAR(100),port integer, username VARCHAR(100), password VARCHAR(100), db_name VARCHAR(100),tablename varchar(100),target_bale varchar(100))
RETURNS void AS $$
declare 
data_values text;
begin 
SELECT   fdw_db(db_type, host, port, username, password, db_name,tablename) into data_values;EXECUTE 'insert into '||target_bale ||' values'||data_values;
end;$$ LANGUAGE plpgsql;

进行调用

 SELECT   inset_fdw_db('oracle', '192.168.48.1', 1521, 'system', 'system', 'orcl', 'CONN_FDW','public.conn_fdw');

进入数据库中查看
此时数据已经落地

postgres=# select *  from CONN_FDW;id | name | age | city | salary 
----+------+-----+------+--------
(0 rows)postgres=#  SELECT   inset_fdw_db('oracle', '192.168.48.1', 1521, 'system', 'system', 'orcl', 'CONN_FDW','public.conn_fdw');inset_fdw_db 
--------------(1 row)postgres=# select *  from CONN_FDW;id |  name   | age |     city      | salary 
----+---------+-----+---------------+--------1 | John    |  30 | New York      |  500002 | Alice   |  25 | Los Angeles   |  600003 | Bob     |  35 | Chicago       |  700004 | Eva     |  28 | San Francisco |  550005 | Mike    |  32 | Seattle       |  650006 | Sophia  |  29 | Boston        |  750007 | David   |  27 | Denver        |  520008 | Emily   |  31 | Austin        |  680009 | Daniel  |  26 | Phoenix       |  5800010 | Olivia  |  33 | Houston       |  7200011 | Liam    |  24 | Portland      |  4900012 | Ava     |  34 | Atlanta       |  7100013 | Logan   |  30 | Miami         |  6200014 | Mia     |  28 | Dallas        |  5400015 | Jackson |  29 | Minneapolis   |  6700016 | Sophie  |  31 | Detroit       |  5900017 | William |  27 | Philadelphia  |  7000018 | Emma    |  32 | San Diego     |  6600019 | James   |  26 | Raleigh       |  6300020 | Avery   |  35 | Tampa         |  71000
(20 rows)

总结

该方法不仅可以应用到数据库对数据库之间,也可以应到,数据库对文件路径下。在postgresql嵌入python代码 其实可以替换掉一些中间件的使用。可控性,定制性也会更强。

这篇关于POSTGRESQL中ETL、fdw的平行替换的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/603176

相关文章

PostgreSQL核心功能特性与使用领域及场景分析

PostgreSQL有什么优点? 开源和免费 PostgreSQL是一个开源的数据库管理系统,可以免费使用和修改。这降低了企业的成本,并为开发者提供了一个活跃的社区和丰富的资源。 高度兼容 PostgreSQL支持多种操作系统(如Linux、Windows、macOS等)和编程语言(如C、C++、Java、Python、Ruby等),并提供了多种接口(如JDBC、ODBC、ADO.NET等

PostgreSQL中的多版本并发控制(MVCC)深入解析

引言 PostgreSQL作为一款强大的开源关系数据库管理系统,以其高性能、高可靠性和丰富的功能特性而广受欢迎。在并发控制方面,PostgreSQL采用了多版本并发控制(MVCC)机制,该机制为数据库提供了高效的数据访问和更新能力,同时保证了数据的一致性和隔离性。本文将深入解析PostgreSQL中的MVCC功能,探讨其工作原理、使用场景,并通过具体SQL示例来展示其在实际应用中的表现。 一、

PostgreSQL入门介绍

一、PostgreSQL 背景及主要功能介绍 1、背景 PG数据库,全称为PostgreSQL数据库,是一款开源的关系型数据库管理系统(RDBMS)。其起源可以追溯到20世纪80年代末和90年代初,由加拿大的计算机科学家Michael Stonebraker及其团队在加州大学伯克利分校启动。该项目旨在创建一个强大的、开源的关系型数据库管理系统,作为早期关系型数据库系统Ingres的继承者。Mi

notepad++ 正则表达式多条件查找替换

基础语法参考: https://www.cnblogs.com/winstonet/p/10635043.html https://www.linuxidc.com/Linux/2019-05/158701.htm   通常情况下我们查找的内容和要被替换掉的内容是一样的,我们只需要使用正则表达式精确框定查找内容,替换直接输入要替换的内容即可。 但有时会比较复杂,查找的内容,只需要替换其中

shell脚本中变量中字符串替换的测试 /和//的区别

test_char=abbbcbbbf echo "bf:test_char = " $test_char test_char=${test_char/bbb/ddd} echo "af:test_char = " $test_char 输出: bf:test_char =  abbbcbbbf af:test_char =  adddcbbbf 只匹配第一个

PostgreSQL索引介绍

梦中彩虹   博客园首页新随笔联系管理 随笔 - 131  文章 - 1  评论 - 14 PostgreSQL索引介绍 INDEX 索引是增强数据库性能的常用方法。索引使得数据库在查找和检索数据库的特定行的时候比没有索引快的多。但索引也增加了整个数据库系统的开销,所以应该合理使用。 介绍 假设我们有一个类似这样的表: CREATE TABLE test1 (id integ

PostgreSQL分区表(partitioning)应用实例详解

https://www.jb51.net/article/97937.htm   PostgreSQL分区表(partitioning)应用实例详解  更新时间:2016年11月22日 10:25:58   作者:小灯光环    我要评论   这篇文章主要为大家详细介绍了PostgreSQL分区表(partitioning)应用实例,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

PostgreSql中WITH语句的使用

https://blog.csdn.net/chuan_day/article/details/44809125 PostgreSql中WITH语句的使用 With语句是为庞大的查询语句提供了辅助的功能。这些语句通常是引用了表表达式或者CTEs(一种临时数据的存储方式),可以看做是一个查询语句的临时表。在With语句中可以使用select,insert,update,delete语句。当然wit

springboot启动时替换配置参数

SpringBoot启动时配置参数替换 一.背景 SpringBoot项目启动的时候,在不使用配置中心等的前提下或者有公司强制使用指定的“密码箱”情况下,需要远程获取关键配置信息,比如数据库密码,则需要在项目启动前获取配置并且进行本地配置替换。 二.Demo实现 1.maven依赖 <dependencies><dependency><groupId>org.springframewor

PostgreSQL 17即将发布,新功能Top 3

按照计划,PostgreSQL 17 即将在 2024 年 9 月 26 日发布,目前已经发布了第一个 RC 版本,新版本的功能增强可以参考 Release Notes。 本文给大家分享其中 3 个重大的新增功能。 MERGE 语句增强 MERGE 语句是 PostgreSQL 15 增加的一个新功能,它可以在单个语句中实现 INSERT、UPDATE 以及 DELETE 操作,非常适合数据