【实操记录】Oracle数据整库同步至Apache Doris

2023-10-07 01:15

本文主要是介绍【实操记录】Oracle数据整库同步至Apache Doris,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本文是Oracle数据整库同步至Apache Doris实操记录,仅供参考

参考:https://cn.selectdb.com/blog/104

1、Oracle 配置

[root@node1 oracle]# pwd
/u01/app/oracle
[root@node1 oracle]# mkdir recovery_area
[root@node1 oracle]# chown -R oracle:dba recovery_area
SQL> conn sys/ as sysdba
输入口令: 
已连接。
SQL> alter system set db_recovery_file_dest_size = 2G;
系统已更改。
SQL> alter system set db_recovery_file_dest='/u01/app/oracle/recovery_area' scope=spfile;
系统已更改。
SQL> 

在这里插入图片描述

SQL> shutdown immediate;
数据库已经关闭。
已经卸载数据库。
ORACLE 例程已经关闭。
SQL> startup mount;
ORACLE 例程已经启动。Total System Global Area 1068937216 bytes
Fixed Size                  2233344 bytes
Variable Size             624954368 bytes
Database Buffers          436207616 bytes
Redo Buffers                5541888 bytes
数据库装载完毕。
SQL> alter database archivelog;数据库已更改。SQL> alter database open;数据库已更改。SQL> archive log list;
数据库日志模式            存档模式
自动存档             启用
存档终点            USE_DB_RECOVERY_FILE_DEST
最早的联机日志序列     1
下一个存档日志序列   2
当前日志序列           2
SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;数据库已更改。SQL> 

2、Oracle 数据

SQL> CREATE USER admin IDENTIFIED BY 123;用户已创建。SQL> GRANT dba TO admin;授权成功。SQL> quit;
[root@node1 ~]# sqlplus admin/123SQL*Plus: Release 11.2.0.2.0 Production on 星期三 6月 28 08:06:29 2023Copyright (c) 1982, 2011, Oracle.  All rights reserved.连接到: 
Oracle Database 11g Express Edition Release 11.2.0.2.0 - 64bit Production
SQL> 
CREATE TABLE t_person(ID NUMBER(10),NAME VARCHAR2(128) ,PRIMARY KEY(ID));
INSERT INTO t_person VALUES (1, 'zhangsan');
INSERT INTO t_person VALUES (2, 'lisi');
INSERT INTO t_person VALUES (3, 'wangwu');
CREATE TABLE t_dept(ID NUMBER(10),NAME VARCHAR2(128) ,PRIMARY KEY(ID));
INSERT INTO t_dept VALUES (1, 'HR');
INSERT INTO t_dept VALUES (2, 'IT');
commit;

3、启动Doris

[root@node1 doris-2.0]# fe/bin/start_fe.sh --daemon
[root@node1 doris-2.0]# be/bin/start_be.sh --daemon
[root@node1 doris-2.0]# jps
16081 DorisBE
14723 DorisFE
16158 Jps
[root@node1 doris-2.0]# 

4、启动Flink

下载相关jar,复制到flink的lib目录

[root@node1 flink-1.17.1]# ll lib
总用量 270772
-rw-r--r-- 1  501 games    196491 5月  19 06:56 flink-cep-1.17.1.jar
-rw-r--r-- 1  501 games    542620 5月  19 06:59 flink-connector-files-1.17.1.jar
-rw-r--r-- 1  501 games    102472 5月  19 07:02 flink-csv-1.17.1.jar
-rw-r--r-- 1  501 games 135975541 5月  19 07:13 flink-dist-1.17.1.jar
-rw-r--r-- 1 root root    8452171 6月  27 23:12 flink-doris-connector-1.17-1.4.0.jar
-rw-r--r-- 1  501 games    180248 5月  19 07:02 flink-json-1.17.1.jar
-rw-r--r-- 1  501 games  21043319 5月  19 07:12 flink-scala_2.12-1.17.1.jar
-rw-r--r-- 1 root root   28702667 6月  28 08:28 flink-sql-connector-oracle-cdc-2.4.1.jar
-rw-r--r-- 1  501 games  15407424 5月  19 07:13 flink-table-api-java-uber-1.17.1.jar
-rw-r--r-- 1  501 games  38191226 5月  19 07:08 flink-table-planner-loader-1.17.1.jar
-rw-r--r-- 1  501 games   3146210 5月  19 06:56 flink-table-runtime-1.17.1.jar
-rw-r--r-- 1  501 games    208006 5月  17 06:07 log4j-1.2-api-2.17.1.jar
-rw-r--r-- 1  501 games    301872 5月  17 06:07 log4j-api-2.17.1.jar
-rw-r--r-- 1  501 games   1790452 5月  17 06:07 log4j-core-2.17.1.jar
-rw-r--r-- 1  501 games     24279 5月  17 06:07 log4j-slf4j-impl-2.17.1.jar
[root@node1 flink-1.17.1]#

启动flink集群

[root@node1 flink-1.17.1]# bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host node1.
Starting taskexecutor daemon on host node1.
[root@node1 flink-1.17.1]# jps
11755 StandaloneSessionClusterEntrypoint
12091 TaskManagerRunner
12190 Jps
[root@node1 flink-1.17.1]# 

5、测试同步操作

[root@node1 flink-1.17.1]# bin/flink run \
>     -Dexecution.checkpointing.interval=10s \
>     -Dparallelism.default=1 \
>     -c org.apache.doris.flink.tools.cdc.CdcTools \
>     lib/flink-doris-connector-1.17-1.4.0.jar \
>     oracle-sync-database \
>     --database test \
>     --oracle-conf hostname=node1 \
>     --oracle-conf port=1521 \
>     --oracle-conf username=admin \
>     --oracle-conf password=admin123 \
>     --oracle-conf database-name=XE \
>     --oracle-conf schema-name=ADMIN \
>     --including-tables "t_.*" \
>     --sink-conf fenodes=node1:8030 \
>     --sink-conf username=root \
>     --sink-conf password=123456\
>     --sink-conf jdbc-url=jdbc:mysql://node1:9030 \
>     --sink-conf sink.label-prefix=label \
>     --table-conf replication_num=1Unknown operation oracle-sync-database
[root@node1 flink-1.17.1]#

异常:Unknown operation oracle-sync-database
处理办法:
需要使用最新的 flink-doris-connector 包https://repository.apache.org/content/repositories/snapshots/org/apache/doris/

[root@node1 flink-1.17.1]# ll lib
-rw-r--r-- 1  501 games    196491 5月  19 06:56 flink-cep-1.17.1.jar
-rw-r--r-- 1  501 games    542620 5月  19 06:59 flink-connector-files-1.17.1.jar
-rw-r--r-- 1  501 games    102472 5月  19 07:02 flink-csv-1.17.1.jar
-rw-r--r-- 1  501 games 135975541 5月  19 07:13 flink-dist-1.17.1.jar
-rw-r--r-- 1 root root    8526699 9月  21 12:46 flink-doris-connector-1.17-1.5.0-20230915.034854-2.jar
-rw-r--r-- 1  501 games    180248 5月  19 07:02 flink-json-1.17.1.jar
-rw-r--r-- 1  501 games  21043319 5月  19 07:12 flink-scala_2.12-1.17.1.jar
-rw-r--r-- 1 root root   28702667 6月  28 08:28 flink-sql-connector-oracle-cdc-2.4.1.jar
-rw-r--r-- 1  501 games  15407424 5月  19 07:13 flink-table-api-java-uber-1.17.1.jar
-rw-r--r-- 1  501 games  38191226 5月  19 07:08 flink-table-planner-loader-1.17.1.jar
-rw-r--r-- 1  501 games   3146210 5月  19 06:56 flink-table-runtime-1.17.1.jar
-rw-r--r-- 1  501 games    208006 5月  17 06:07 log4j-1.2-api-2.17.1.jar
-rw-r--r-- 1  501 games    301872 5月  17 06:07 log4j-api-2.17.1.jar
-rw-r--r-- 1  501 games   1790452 5月  17 06:07 log4j-core-2.17.1.jar
-rw-r--r-- 1  501 games     24279 5月  17 06:07 log4j-slf4j-impl-2.17.1.jar

重新执行

[root@node1 flink-1.17.1]# bin/flink run \
>     -Dexecution.checkpointing.interval=10s \
>     -Dparallelism.default=1 \
>     -c org.apache.doris.flink.tools.cdc.CdcTools \
>     lib/flink-doris-connector-1.17-1.5.0-20230915.034854-2.jar \
>     oracle-sync-database \
>     --database test \
>     --oracle-conf hostname=node1 \
>     --oracle-conf port=1521 \
>     --oracle-conf username=admin \
>     --oracle-conf password=admin123 \
>     --oracle-conf database-name=XE \
>     --oracle-conf schema-name=ADMIN \
>     --including-tables "t_.*" \
>     --sink-conf fenodes=node1:8030 \
>     --sink-conf username=root \
>     --sink-conf password=123456\
>     --sink-conf jdbc-url=jdbc:mysql://node1:9030 \
>     --sink-conf sink.label-prefix=label \
>     --table-conf replication_num=1------------------------------------------------------------The program finished with the following exception:org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No tables to be synchronized.at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: java.lang.IllegalStateException: No tables to be synchronized.at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)at org.apache.doris.flink.tools.cdc.DatabaseSync.build(DatabaseSync.java:101)at org.apache.doris.flink.tools.cdc.CdcTools.syncDatabase(CdcTools.java:116)at org.apache.doris.flink.tools.cdc.CdcTools.createOracleSyncDatabase(CdcTools.java:80)at org.apache.doris.flink.tools.cdc.CdcTools.main(CdcTools.java:53)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)... 9 more
[root@node1 flink-1.17.1]# 

报错:No tables to be synchronized
修改:--including-tables "T_.*"

[root@node1 flink-1.17.1]# bin/flink run \
>     -Dexecution.checkpointing.interval=10s \
>     -Dparallelism.default=1 \
>     -c org.apache.doris.flink.tools.cdc.CdcTools \
>     lib/flink-doris-connector-1.17-1.5.0-20230915.034854-2.jar \
>     oracle-sync-database \
>     --database test \
>     --oracle-conf hostname=node1 \
>     --oracle-conf port=1521 \
>     --oracle-conf username=admin \
>     --oracle-conf password=admin123 \
>     --oracle-conf database-name=XE \
>     --oracle-conf schema-name=ADMIN \
>     --including-tables "T_.*" \
>     --sink-conf fenodes=node1:8030 \
>     --sink-conf username=root \
>     --sink-conf password=123456\
>     --sink-conf jdbc-url=jdbc:mysql://node1:9030 \
>     --sink-conf sink.label-prefix=label \
>     --table-conf replication_num=1------------------------------------------------------------The program finished with the following exception:org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: The following SQL query could not be executed: SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: org.apache.doris.flink.exception.DorisSystemException: The following SQL query could not be executed: SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;at org.apache.doris.flink.catalog.doris.DorisSystem.extractColumnValuesBySQL(DorisSystem.java:137)at org.apache.doris.flink.catalog.doris.DorisSystem.listDatabases(DorisSystem.java:59)at org.apache.doris.flink.catalog.doris.DorisSystem.databaseExists(DorisSystem.java:67)at org.apache.doris.flink.tools.cdc.DatabaseSync.build(DatabaseSync.java:102)at org.apache.doris.flink.tools.cdc.CdcTools.syncDatabase(CdcTools.java:116)at org.apache.doris.flink.tools.cdc.CdcTools.createOracleSyncDatabase(CdcTools.java:80)at org.apache.doris.flink.tools.cdc.CdcTools.main(CdcTools.java:53)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)... 9 more
Caused by: java.lang.ClassNotFoundException: com.mysql.jdbc.Driverat java.net.URLClassLoader.findClass(URLClassLoader.java:382)at java.lang.ClassLoader.loadClass(ClassLoader.java:424)at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)at java.lang.ClassLoader.loadClass(ClassLoader.java:357)at java.lang.Class.forName0(Native Method)at java.lang.Class.forName(Class.java:264)at org.apache.doris.flink.connection.SimpleJdbcConnectionProvider.getOrEstablishConnection(SimpleJdbcConnectionProvider.java:52)at org.apache.doris.flink.catalog.doris.DorisSystem.extractColumnValuesBySQL(DorisSystem.java:121)... 20 more
[root@node1 flink-1.17.1]# 

缺少mysql驱动,flink的lib目录添加mysql-connector-java-5.1.49.jar

[root@node1 flink-1.17.1]# bin/flink run \
>     -Dexecution.checkpointing.interval=10s \
>     -Dparallelism.default=1 \
>     -c org.apache.doris.flink.tools.cdc.CdcTools \
>     lib/flink-doris-connector-1.17-1.5.0-20230915.034854-2.jar \
>     oracle-sync-database \
>     --database test \
>     --oracle-conf hostname=node1 \
>     --oracle-conf port=1521 \
>     --oracle-conf username=admin \
>     --oracle-conf password=admin123 \
>     --oracle-conf database-name=XE \
>     --oracle-conf schema-name=ADMIN \
>     --including-tables "T_.*" \
>     --sink-conf fenodes=node1:8030 \
>     --sink-conf username=root \
>     --sink-conf password=123456\
>     --sink-conf jdbc-url=jdbc:mysql://node1:9030 \
>     --sink-conf sink.label-prefix=label \
>     --table-conf replication_num=1Thu Sep 21 13:06:59 EDT 2023 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
Job has been submitted with JobID fd244bcb10f06e5aa801d2708441bb86

登录Doris查看同步效果

[root@node1 doris-2.0]# mysql -h192.168.203.101 -P9030 -uroot -p
Enter password: 
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 3
Server version: 5.7.99 Doris version doris-2.0.1.1-ea0bfb2Copyright (c) 2000, 2021, Oracle and/or its affiliates. All rights reserved.Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.mysql> use test;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -ADatabase changed
mysql> show tables;
+----------------+
| Tables_in_test |
+----------------+
| T_DEPT         |
| T_PERSON       |
+----------------+
2 rows in set (0.00 sec)mysql> select * from T_PERSON;
Empty set (0.29 sec)mysql> select * from T_DEPT;
Empty set (0.06 sec)

6、同步成功

bin/flink run \-Dexecution.checkpointing.interval=10s \-Dparallelism.default=1 \-c org.apache.doris.flink.tools.cdc.CdcTools \lib/flink-doris-connector-1.17-1.5.0-20230915.034854-2.jar \oracle-sync-database \--database test \--oracle-conf hostname=node1 \--oracle-conf port=1521 \--oracle-conf username=admin \--oracle-conf password=123 \--oracle-conf database-name=XE \--oracle-conf schema-name=ADMIN \--including-tables "T_.*" \--sink-conf fenodes=node1:8030 \--sink-conf username=root \--sink-conf password=123456\--sink-conf jdbc-url=jdbc:mysql://node1:9030 \--sink-conf sink.label-prefix=label \--table-conf replication_num=1

在这里插入图片描述

在这里插入图片描述
查看Flink中运行的Job http://192.168.203.101:8081/
在这里插入图片描述

7、测试新增记录同步

SQL> INSERT INTO t_person VALUES (4, 'test_user');已创建 1 行。SQL> INSERT INTO t_dept VALUES (3, 'CS');已创建 1 行。SQL> commit;提交完成。SQL> 

在这里插入图片描述

在这里插入图片描述

这篇关于【实操记录】Oracle数据整库同步至Apache Doris的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/155166

相关文章

Debezium 与 Apache Kafka 的集成方式步骤详解

《Debezium与ApacheKafka的集成方式步骤详解》本文详细介绍了如何将Debezium与ApacheKafka集成,包括集成概述、步骤、注意事项等,通过KafkaConnect,D... 目录一、集成概述二、集成步骤1. 准备 Kafka 环境2. 配置 Kafka Connect3. 安装 D

oracle DBMS_SQL.PARSE的使用方法和示例

《oracleDBMS_SQL.PARSE的使用方法和示例》DBMS_SQL是Oracle数据库中的一个强大包,用于动态构建和执行SQL语句,DBMS_SQL.PARSE过程解析SQL语句或PL/S... 目录语法示例注意事项DBMS_SQL 是 oracle 数据库中的一个强大包,它允许动态地构建和执行

关于Spring @Bean 相同加载顺序不同结果不同的问题记录

《关于Spring@Bean相同加载顺序不同结果不同的问题记录》本文主要探讨了在Spring5.1.3.RELEASE版本下,当有两个全注解类定义相同类型的Bean时,由于加载顺序不同,最终生成的... 目录问题说明测试输出1测试输出2@Bean注解的BeanDefiChina编程nition加入时机总结问题说明

PLsql Oracle 下载安装图文过程详解

《PLsqlOracle下载安装图文过程详解》PL/SQLDeveloper是一款用于开发Oracle数据库的集成开发环境,可以通过官网下载安装配置,并通过配置tnsnames.ora文件及环境变... 目录一、PL/SQL Developer 简介二、PL/SQL Developer 安装及配置详解1.下

Redis的数据过期策略和数据淘汰策略

《Redis的数据过期策略和数据淘汰策略》本文主要介绍了Redis的数据过期策略和数据淘汰策略,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录一、数据过期策略1、惰性删除2、定期删除二、数据淘汰策略1、数据淘汰策略概念2、8种数据淘汰策略

轻松上手MYSQL之JSON函数实现高效数据查询与操作

《轻松上手MYSQL之JSON函数实现高效数据查询与操作》:本文主要介绍轻松上手MYSQL之JSON函数实现高效数据查询与操作的相关资料,MySQL提供了多个JSON函数,用于处理和查询JSON数... 目录一、jsON_EXTRACT 提取指定数据二、JSON_UNQUOTE 取消双引号三、JSON_KE

Python给Excel写入数据的四种方法小结

《Python给Excel写入数据的四种方法小结》本文主要介绍了Python给Excel写入数据的四种方法小结,包含openpyxl库、xlsxwriter库、pandas库和win32com库,具有... 目录1. 使用 openpyxl 库2. 使用 xlsxwriter 库3. 使用 pandas 库

SpringBoot定制JSON响应数据的实现

《SpringBoot定制JSON响应数据的实现》本文主要介绍了SpringBoot定制JSON响应数据的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们... 目录前言一、如何使用@jsonView这个注解?二、应用场景三、实战案例注解方式编程方式总结 前言

使用Python在Excel中创建和取消数据分组

《使用Python在Excel中创建和取消数据分组》Excel中的分组是一种通过添加层级结构将相邻行或列组织在一起的功能,当分组完成后,用户可以通过折叠或展开数据组来简化数据视图,这篇博客将介绍如何使... 目录引言使用工具python在Excel中创建行和列分组Python在Excel中创建嵌套分组Pyt

在Rust中要用Struct和Enum组织数据的原因解析

《在Rust中要用Struct和Enum组织数据的原因解析》在Rust中,Struct和Enum是组织数据的核心工具,Struct用于将相关字段封装为单一实体,便于管理和扩展,Enum用于明确定义所有... 目录为什么在Rust中要用Struct和Enum组织数据?一、使用struct组织数据:将相关字段绑