【大数据开发运维解决方案】Ogg For Bigdata 同步Oracle数据到KAFKA(包括初始化历史数据)

本文主要是介绍【大数据开发运维解决方案】Ogg For Bigdata 同步Oracle数据到KAFKA(包括初始化历史数据),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

OGG同步Oracle数据到KAFKA:OGG初始化进程初始化历史数据

在前面曾写过几篇关于OGG同步Oracle等库数据到kafka的文章:
OGG实时同步Oracle数据到Kafka实施文档(供flink流式计算)
OGG For Bigdata 12按操作类型同步Oracle数据到kafka不同topic
但是那都是做测试,没有说实际工作情况下如何将Oracle等库表的历史数据初始化到kafka的方案,我这里用过两个方案,第一个比较笨的方案那就是写shell脚本将数据从Oracle导出成json格式的数据然后再写到kafka,另一种就是现在要介绍的通过OGG本身的初始化进程来做历史数据初始化,本篇文章环境完全根据前面文章搭建的环境来做的。
先再来看下当前环境的大致配置情况:
在这里插入图片描述
由于本文做的一系列Ogg forBigdata投递j’son消息到kafka操作是为了提供flink消费做实时计算用,为了极大的降低flink代码解析json的成本,提高消费速度,本人文章对insert,delete,update/pkupdate的映射大致逻辑是这样映射的:
1、对于insert操作,由于ogg for bigdata生成的json消息是下面这种情况:

 {"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-16 10:17:08.006650","current_ts":"2019-09-16T10:25:01.379000","pos":"00000000000000002199","tokens":{"TKN-OP-TYPE":"INSERT"},"after":{"EMPNO":1232,"ENAME":"    FORD","JOB":"ANALYST","MGR":7566,"HIREDATE":"1981-12-03 00:00:00","SAL":3000.00,"COMM":null,"DEPTNO":20}}

也就是有效数据存储在after的部分,这里不做变化;
2、对于delete 操作,由于ogg for bigdata生成的json消息是下面这种情况:

 {"table":"SCOTT.SCEMP","op_type":"D","op_ts":"2019-09-16 10:17:08.006650","current_ts":"2019-09-16T10:25:01.379000","pos":"00000000000000002199","tokens":{"TKN-OP-TYPE":"DELETE"},"BEFORE":{"EMPNO":1232,"ENAME":"    FORD","JOB":"ANALYST","MGR":7566,"HIREDATE":"1981-12-03 00:00:00","SAL":3000.00,"COMM":null,"DEPTNO":20}}

也就是有效数据存储在before的部分,由于insert,delete,update我这里不再像前面文章映射到不同topic,这里都映射到一个topic中,这里flink解析就有问题了,因为json结构不同,insert的有效数据在after而delete的在before,这里为了flink解析json方便,将delete的操作对应的json的有效数据也放到after中,怎么实现?就是将delete转成insert,转置后的结果json如下:

{"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-16 10:17:08.006650","current_ts":"2019-09-16T10:25:01.379000","pos":"00000000000000002199","tokens":{"TKN-OP-TYPE":"DELETE"},"after":{"EMPNO":1232,"ENAME":"    FORD","JOB":"ANALYST","MGR":7566,"HIREDATE":"1981-12-03 00:00:00","SAL":3000.00,"COMM":null,"DEPTNO":20}}

但是转置完后,标识操作类型的op_type也变成了I,那后面flink计算时候怎么知道这条记录实际做的是delete?,这就是为什么我上篇文章在源端抽取进程加了TKN-OP-TYPE属性来标识这条记录做的是什么操作,这样就算replicat做了转置,op_type会变,但是TKN-OP-TYPE是从源端带来的属性值,这个不会变。
3、对于普通update操作,由于ogg for bigdata生成的json消息是下面这种情况:

{"table":"SCOTT.SCEMP","op_type":"U","op_ts":"2019-09-16 16:23:50.607615","current_ts":"2019-09-16T16:24:01.925000","pos":"00000000230000015887","tokens":{"TKN-OP-TYPE":"SQL COMPUPDATE"},"before":{"EMPNO":8888},"aft
er":{"EMPNO":6666,"ENAME":"zyand"}}

这里的json只会带有加了附加日志的主键及被修改的字段值, 我们首先需要做的是,把update after的数据单独拿出来做一个json:

{"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-16 16:36:58.607186","current_ts":"2019-09-16T16:37:06.891000","pos":"00000000230000016276","tokens":{"TKN-OP-TYPE":"SQL COMPUPDATE"},"after":{"EMPNO":6666,"ENAME":"zyand"}}

为什么不取before的数据,因为before的数据对我们没用,不需要取这些数据,其次,由于flink要计算的字段涉及empno,ename,job,sal,deptno这些字段,就算只是改了ename字段,其他字段没有变化,我们也要将这些没有变动的字段及其现在的值拿出来写到kafka,保证json消息的完整性,让flink在处理的时候更方便。
4、对于pkupdate操作,无论是主键+其他字段的修改还是仅主键单独的变更,原本的pkupdate消息如下:

{"table":"SCOTT.SCEMP","op_type":"U","op_ts":"2019-09-16 15:18:29.607061","current_ts":"2019-09-16T15:46:06.534000","pos":"00000000230000013943","tokens":{"TKN-OP-TYPE":"PK UPDATE"},"before":{"EMPNO":6666},"after":{"EMPNO":8888,"ENAME":"zyand","JOB":"kfc","SAL":100.00,"DEPTNO":30}}

这里我们要把pkupdate before的数据拆分成一个单独的json拿出来,并且让除了主键以外的其他需要计算的指标ename,job,sal,deptno也要在这个json中并且这些除主键外的字段值均要为null值,如下:

{"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-17 09:36:39.480539","current_ts":"2019-09-17T09:36:52.022000","pos":"00000000230000021370","tokens":{"TKN-OP-TYPE":"PK UPDATE"},"after":{"EMPNO":6666,"ENAME":null,"JOB":null,"SAL":null,"DEPTNO":null}}

而after的也要单独拆分,要保证主键和所有字段的值都是现在最新的状态值:

{"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-17 09:36:39.480539","current_ts":"2019-09-17T09:37:12.096000","pos":"00000000230000021370","tokens":{"TKN-OP-TYPE":"PK UPDATE"},"after":{"EMPNO":8888,"ENAME":"zyd","JOB":"kfc","SAL":100.00,"DEPTNO":30}}

之所以这么做一是因为前面说的保证j’son消息的完整性,其次是主键变更后,变更前的主键对应的j’son数据还在kafka中,而新的主键(包括变更主键和其他字段)对应的相关值除了变更主键时被变更的字段外其他的字段值都与旧主键值一致,这样flink计算的时候就会重复计算,为了避免重复计算,在主键变更后生成了新的主键+其他加了附加日志的字段j’son后,还要写一个旧的主键对应的j’son消息,让旧的主键最新的其他字段值都为null,这样flink在计算的时候,根据主键取最新状态值的时候就不会出现重复计算的问题了。
下面是上面逻辑的大致流程图:
在这里插入图片描述
下面看具体实验:

–下面所有源端表都是在scott用户下操作。

一、源端创建测试用表

create table scemp as select * from emp;
create table scdept as select * from dept;
ALTER TABLE scemp  ADD CONSTRAINT PK_scemp PRIMARY KEY (EMPNO);
ALTER TABLE scdept  ADD CONSTRAINT PK_scdept PRIMARY KEY (DEPTNO);

在这里插入图片描述
在这里插入图片描述

二、源端OGG操作

1、添加附加日志

[oracle@source ogg12]$ ./ggsciOracle GoldenGate Command Interpreter for Oracle
Version 12.2.0.2.2 OGGCORE_12.2.0.2.0_PLATFORMS_170630.0419_FBO
Linux, x64, 64bit (optimized), Oracle 11g on Jun 30 2017 14:42:26
Operating system character set identified as UTF-8.Copyright (C) 1995, 2017, Oracle and/or its affiliates. All rights reserved.GGSCI (source) 16> info allProgram     Status      Group       Lag at Chkpt  Time Since ChkptMANAGER     RUNNING                                           
EXTRACT     ABENDED     D_KA        00:00:00      5714:15:13  
EXTRACT     ABENDED     D_KF        00:00:00      6507:59:26  
EXTRACT     ABENDED     D_SC        00:00:00      140:41:02   
EXTRACT     RUNNING     D_ZT        00:00:00      00:00:04    
EXTRACT     STOPPED     E_KA        00:00:00      2692:41:17  
EXTRACT     ABENDED     E_SC        00:00:00      00:29:58    
EXTRACT     STOPPED     E_ZT        00:00:00      00:15:43   GGSCI (source) 2> dblogin userid ogg password ogg
Successfully logged into database.GGSCI (source as ogg@orcl) 18> add trandata SCOTT.SCEMPLogging of supplemental redo data enabled for table SCOTT.SCEMP.
TRANDATA for scheduling columns has been added on table 'SCOTT.SCEMP'.
TRANDATA for instantiation CSN has been added on table 'SCOTT.SCEMP'.
GGSCI (source as ogg@orcl) 19> add trandata SCOTT.SCDEPTLogging of supplemental redo data enabled for table SCOTT.SCDEPT.
TRANDATA for scheduling columns has been added on table 'SCOTT.SCDEPT'.
TRANDATA for instantiation CSN has been added on table 'SCOTT.SCDEPT'.
GGSCI (source as ogg@orcl) 20> info trandata SCOTT.SC*Logging of supplemental redo log data is enabled for table SCOTT.SCDEPT.Columns supplementally logged for table SCOTT.SCDEPT: DEPTNO.Prepared CSN for table SCOTT.SCDEPT: 2151646
Logging of supplemental redo log data is enabled for table SCOTT.SCEMP.Columns supplementally logged for table SCOTT.SCEMP: EMPNO.Prepared CSN for table SCOTT.SCEMP: 2151611

因为现在只是对主键加了附加日志,未来DML操作,insert,delete向kafka投递消息时,规定所有的数据都在after中便于j’son解析注册,没问题,但是update以json格式投递到kafka然后flink消费时字段值只有主键和被修改的字段存在值,但是未来SCEMP表可能empno,ename,job,sal,deptno这几个字段都会用到,dept表所有字段都会用到,并且要求无论对哪些字段做update操作,投递到kafka的所有json数据必须都要有上面几个字段及相关值。所以额外给emp表的empno,ename,job,sal,deptno组合添加附加日志,dept表给整个表添加附加日志来支持后续flink计算:

[oracle@source ~]$ sqlplus / as sysdbaSQL*Plus: Release 11.2.0.4.0 Production on Fri Sep 6 15:46:02 2019Copyright (c) 1982, 2013, Oracle.  All rights reserved.Connected to:
Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production
With the Partitioning, OLAP, Data Mining and Real Application Testing optionsSQL> alter table  scott.scemp add SUPPLEMENTAL LOG GROUP groupaa(empno,ename,job,sal,deptno) always;Table altered.SQL> ALTER TABLE scott.scdept add SUPPLEMENTAL LOG DATA (ALL) COLUMNS;Table altered.

2、源端配置初始化进程

数据初始化,指的是从源端Oracle 数据库将已存在的需要的数据同步至目标端,配置初始化进程:

GGSCI (source) 1> info allProgram     Status      Group       Lag at Chkpt  Time Since ChkptMANAGER     RUNNING                                           
EXTRACT     ABENDED     D_KA        00:00:00      5714:15:13  
EXTRACT     ABENDED     D_KF        00:00:00      6507:59:26  
EXTRACT     ABENDED     D_SC        00:00:00      140:41:02   
EXTRACT     RUNNING     D_ZT        00:00:00      00:00:04    
EXTRACT     STOPPED     E_KA        00:00:00      2692:41:17  
EXTRACT     ABENDED     E_SC        00:00:00      00:29:58    
EXTRACT     STOPPED     E_ZT        00:00:00      00:15:43   GGSCI (source) 2> dblogin userid ogg password ogg
Successfully logged into database.GGSCI (source as ogg@orcl) 3> add extract initsc,sourceistable
EXTRACT added.
GGSCI (source as ogg@orcl) 4> edit params init01
加入下面配置
EXTRACT init01
SETENV (NLS_LANG=AMERICAN_AMERICA.AL32UTF8)
USERID ogg,PASSWORD ogg
RMTHOST 192.168.1.66, MGRPORT 7809
RMTFILE ./dirdat/ed,maxfiles 999, megabytes 500
----------SCOTT.SCEMP
table SCOTT.SCEMP,tokens(
TKN-OP-TYPE = @GETENV ('GGHEADER', 'OPTYPE')
);
----------SCOTT.SCDEPT
table SCOTT.SCDEPT,tokens(
TKN-OP-TYPE = @GETENV ('GGHEADER', 'OPTYPE')
);

3、源端生成表结构

GoldenGate 提供了 DEFGEN 工具,用于生成数据定义,当源表和目标表中 的定义不同时,GoldenGate 进程将引用该专用工具。在运行 DEFGEN 之前,需要 为其创建一个参数文件:

GGSCI (source) 1> edit params init_scott
加入下面配置
defsfile /u01/app/oracle/ogg12/dirdef/init_scott.def
userid ogg,password ogg
table scott.SCEMP;
table scott.SCDEPT;GGSCI (source) 4> exit
生成表结构文件,需要执行shell命令,如果配置中的文件已经存在,执行下面命令会报错,所以 在执行前需要先删除:
[oracle@source ogg12]$ cd dirdef/
[oracle@source dirdef]$ ls
emp.def  hdfs.def  init_emp.def  init_scott.def  kafka.def  scott.def  ztvoucher.def
[oracle@source dirdef]$ rm -rf init_scott.def 
[oracle@source ogg12]$ ./defgen paramfile dirprm/init_scott.prm ***********************************************************************Oracle GoldenGate Table Definition Generator for OracleVersion 12.2.0.2.2 OGGCORE_12.2.0.2.0_PLATFORMS_170630.0419Linux, x64, 64bit (optimized), Oracle 11g on Jun 30 2017 11:35:56Copyright (C) 1995, 2017, Oracle and/or its affiliates. All rights reserved.Starting at 2019-09-10 15:41:15
***********************************************************************Operating System Version:
Linux
Version #2 SMP Tue May 17 07:23:38 PDT 2016, Release 4.1.12-37.4.1.el6uek.x86_64
Node: source
Machine: x86_64soft limit   hard limit
Address Space Size   :    unlimited    unlimited
Heap Size            :    unlimited    unlimited
File Size            :    unlimited    unlimited
CPU Time             :    unlimited    unlimitedProcess id: 84810***********************************************************************
**            Running with the following parameters                  **
***********************************************************************
defsfile /u01/app/oracle/ogg12/dirdef/init_scott.def
userid ogg,password ***
table scott.SCEMP;
Retrieving definition for SCOTT.SCEMP.
table scott.SCDEPT;
Retrieving definition for SCOTT.SCDEPT.Definitions generated for 2 tables in /u01/app/oracle/ogg12/dirdef/init_scott.def.

将生成的定义文件传送到目标端, 目标端的replicate进程会使用这个文件。

[oracle@source ogg12]$ scp  /u01/app/oracle/ogg12/dirdef/init_scott.def root@192.168.1.66:/hadoop/ogg12/dirdef/
root@192.168.1.66's password: 
init_scott.def                                                                                                                                                                  100% 2354     2.3KB/s   00:00    

4、配置抽取进程

因为环境中已经存在一个向195.168.1.66作用的抽取进程和投递进程 e_zt,d_zt:

GGSCI (source) 1> info allProgram     Status      Group       Lag at Chkpt  Time Since ChkptMANAGER     RUNNING                                           
EXTRACT     ABENDED     D_KA        00:00:00      5714:15:13  
EXTRACT     ABENDED     D_KF        00:00:00      6507:59:26  
EXTRACT     ABENDED     D_SC        00:00:00      140:41:02   
EXTRACT     RUNNING     D_ZT        00:00:00      00:00:04    --这个
EXTRACT     STOPPED     E_KA        00:00:00      2692:41:17  
EXTRACT     ABENDED     E_SC        00:00:00      00:29:58    
EXTRACT     STOPPED     E_ZT        00:00:00      00:15:43     --这个

,并且195.168.1.66的kafka应用进程已经存在并停止了:

[root@hadoop ogg12]# ./ggsciOracle GoldenGate for Big Data
Version 12.3.2.1.1 (Build 005)Oracle GoldenGate Command Interpreter
Version 12.3.0.1.2 OGGCORE_OGGADP.12.3.0.1.2_PLATFORMS_180712.2305
Linux, x64, 64bit (optimized), Generic on Jul 13 2018 00:46:09
Operating system character set identified as UTF-8.Copyright (C) 1995, 2018, Oracle and/or its affiliates. All rights reserved.GGSCI (hadoop) 2> info allProgram     Status      Group       Lag at Chkpt  Time Since ChkptMANAGER     RUNNING                                           
REPLICAT    STOPPED     RHDFS       00:00:00      454:26:03   
REPLICAT    STOPPED     RKAFKA      00:00:00      2637:46:02   --这个

现在只需要把上面两张表的配置加入到e_zt,现在抽取进程配置如下:

GGSCI (source) 2> edit params e_zt写入下面内容:
extract e_zt
userid ogg,password ogg
setenv(NLS_LANG=AMERICAN_AMERICA.AL32UTF8)
setenv(ORACLE_SID="orcl")
reportcount every 30 minutes,rate
numfiles 5000
discardfile ./dirrpt/e_zt.dsc,append,megabytes 1000
warnlongtrans 2h,checkinterval 30m
exttrail ./dirdat/zt
dboptions allowunusedcolumn
tranlogoptions archivedlogonly
tranlogoptions altarchivelogdest primary /u01/arch
dynamicresolution
fetchoptions nousesnapshot
ddl include mapped
ddloptions addtrandata,report
notcpsourcetimer
NOCOMPRESSDELETES
NOCOMPRESSUPDATES
GETUPDATEBEFORES
----------SCOTT.ZTVOUCHER
table SCOTT.ZTVOUCHER,keycols(MANDT,GJAHR,BUKRS,BELNR,BUZEI,MONAT,BUDAT),tokens(
TKN-OP-TYPE = @GETENV ('GGHEADER', 'OPTYPE')
);
----------SCOTT.ORA_HDFS
table SCOTT.ORA_HDFS,tokens(
TKN-OP-TYPE = @GETENV ('GGHEADER', 'OPTYPE')
);
----------SCOTT.SCEMP
table SCOTT.SCEMP,tokens(
TKN-OP-TYPE = @GETENV ('GGHEADER', 'OPTYPE')
);
----------SCOTT.SCDEPT
table SCOTT.SCDEPT,tokens(
TKN-OP-TYPE = @GETENV ('GGHEADER', 'OPTYPE')
);

5、配置投递进程

将上面两张表加进来

extract d_zt
rmthost 192.168.1.66,mgrport 7809,compress
userid ogg,password ogg
PASSTHRU
numfiles 5000
rmttrail ./dirdat/zt
dynamicresolution
table scott.ztvoucher;
table scott.ora_hdfs;
table scott.scemp;
table scott.scdept;

三、ODS端操作

1、配置初始化进程

GGSCI (hadoop) 3> ADD replicat init01, specialrun
REPLICAT added.GGSCI (hadoop) 4> edit params init01
添加下面配置:
SPECIALRUN
end runtime
setenv(NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
targetdb libfile libggjava.so set property=./dirprm/kafka.props
SOURCEDEFS ./dirdef/init_scott.def
EXTFILE ./dirdat/ed
reportcount every 1 minutes, rate
grouptransops 10000
map scott.SCEMP,target SCOTT.SCEMP;
map scott.SCDEPT,target SCOTT.SCDEPT;

2、配置应用进程

因为之前已经配置了rkafka进程,现在只需要在这个进程里面加那两张表的配置就行。
这里有一个问题,虽然update之后的数据能够让flink正常算,但是对于pkupdate之前的主键对应的记录值我们还是会做计算,所以这里flink计算会出现问题,会让同一条记录(只变了主键其他值不变,在kafka中是两条消息)计算两次了,而且我们前面规定了为了flink计算方便,所有数据都从json的after部分取数,所以这里我把对于pkupdate操作来说,在插入kafka一条update之后的数据后,再插入一条update前的数据,并且这个update前的数据除了主键是原来的值外,其余要计算的指标值都设置成null,这样相当于原来变更前的主键其他指标最新的值都是null了,flink在对当前主键最新值计算的时候就会把这些值当成空值来计算从而减去update前的值,只计算update后的值,就不会出现重复计算了,而且前面的配置太冗余,看最新的应用进程配置:

GGSCI (hadoop) 9> view params rkafkaREPLICAT rkafka
-- Trail file for this example is located in "AdapterExamples/trail" directory
-- Command to add REPLICAT
-- add replicat rkafka, exttrail AdapterExamples/trail/tr
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
allowduptargetmap
NOINSERTDELETES
IGNOREDELETES
IGNOREINSERTS
GETUPDATES
INSERTUPDATES
MAP SCOTT.SCEMP, TARGET SCOTT.SCEMP;
IGNOREDELETES
IGNOREINSERTS
MAP SCOTT.SCEMP, TARGET SCOTT.SCEMP,colmap(
EMPNO=before.EMPNO,
ENAME=@COLSTAT (NULL), 
JOB=@COLSTAT (NULL), 
SAL=@COLSTAT (NULL), 
DEPTNO=@COLSTAT (NULL)
),filter(@strfind(@token('TKN-OP-TYPE'),'PK UPDATE') >0);
NOINSERTUPDATES
GETINSERTS
IGNOREDELETES
IGNOREUPDATES
MAP SCOTT.SCEMP, TARGET SCOTT.SCEMP;
IGNOREUPDATES
IGNOREINSERTS
GETDELETES
INSERTDELETES
MAP SCOTT.SCEMP, TARGET SCOTT.SCEMP;
NOINSERTDELETES
IGNOREDELETES
IGNOREINSERTS
GETUPDATES
INSERTUPDATES
MAP SCOTT.SCDEPT, TARGET SCOTT.SCDEPT;
IGNOREDELETES
IGNOREINSERTS
MAP SCOTT.SCDEPT, TARGET SCOTT.SCDEPT,colmap(
DEPTNO=before.DEPTNO,
DNAME=@COLSTAT (NULL), 
LOC=@COLSTAT (NULL), 
TESS=@COLSTAT (NULL)
),filter(@strfind(@token('TKN-OP-TYPE'),'PK UPDATE') >0);
NOINSERTUPDATES
GETINSERTS
IGNOREDELETES
IGNOREUPDATES
MAP SCOTT.SCDEPT, TARGET SCOTT.SCDEPT;
IGNOREUPDATES
IGNOREINSERTS
GETDELETES
INSERTDELETES
MAP SCOTT.SCDEPT, TARGET SCOTT.SCDEPT;

三、同步数据

1、源端操作

启动进程:

GGSCI (source) 9> info allProgram     Status      Group       Lag at Chkpt  Time Since ChkptMANAGER     RUNNING                                           
EXTRACT     ABENDED     D_KA        00:00:00      5853:30:25  
EXTRACT     ABENDED     D_KF        00:00:00      6647:14:38  
EXTRACT     ABENDED     D_SC        00:00:00      279:56:15   
EXTRACT     RUNNING     D_ZT        00:00:00      00:00:07    
EXTRACT     STOPPED     E_KA        00:00:00      2831:56:29  
EXTRACT     ABENDED     E_SC        00:00:00      00:08:54    
EXTRACT     STOPPED     E_ZT        00:00:00      137:51:42   GGSCI (source) 10> start e_ztSending START request to MANAGER ...
EXTRACT E_ZT starting
GGSCI (source) 12> start d_zt
EXTRACT D_ZT is already running.

2、源端初始化进程

GGSCI (source) 3> start init01Sending START request to MANAGER ...
EXTRACT INIT01 startingGGSCI (source) 4> info init01EXTRACT    INIT01    Last Started 2019-09-16 10:14   Status STARTING
Checkpoint Lag       Not Available
Process ID           87517
Log Read Checkpoint  Table SCOTT.SCDEPT2019-09-16 10:14:47  Record 4
Task                 SOURCEISTABLEGGSCI (source) 5> info init01EXTRACT    INIT01    Last Started 2019-09-16 10:17   Status STOPPED
Checkpoint Lag       Not Available
Log Read Checkpoint  Table SCOTT.SCDEPT2019-09-16 10:17:01  Record 4
Task                 SOURCEISTABLE或则通过下面方式初始化[oracle@source ogg12]$ ./extract paramfile dirprm/init01.prm  reportfile dirrpt/init01.rpt
[oracle@source ogg12]$ tail -30f dirrpt/init01.rpt 2019-09-16 09:45:32  INFO    OGG-02911  Processing table SCOTT.SCEMP.2019-09-16 09:45:32  INFO    OGG-02911  Processing table SCOTT.SCDEPT.***********************************************************************
*                   ** Run Time Statistics **                         *
***********************************************************************Report at 2019-09-16 09:45:32 (activity since 2019-09-16 09:45:26)Output to ./dirdat/ed:From Table SCOTT.SCEMP:#                   inserts:        16#                   updates:         0#                   deletes:         0#                  discards:         0
From Table SCOTT.SCDEPT:#                   inserts:         4#                   updates:         0#                   deletes:         0#                  discards:         0REDO Log StatisticsBytes parsed                    0Bytes output                 4417

去目标端查看生成的trail文件:

[root@hadoop dirdat]# ls -ltr ed*
-rw-r----- 1 root root 6265 Sep 16 10:17 ed000000[root@hadoop ogg12]# cat loginit_zt 
cd ./dirdat
open ed000000 
ghdr on
detail on
detail data
usertoken on
usertoken detail
ggstoken on
ggstoken detail
headertoken on
headertoken detail
reclen 0
pos last
pos rev
logtrail
pos
[root@hadoop dirdat]# cd ..
[root@hadoop ogg12]# ./logdump Oracle GoldenGate Log File Dump Utility
Version 12.3.0.1.2 OGGCORE_OGGADP.12.3.0.1.2_PLATFORMS_180712.2305Copyright (C) 1995, 2018, Oracle and/or its affiliates. All rights reserved.Logdump 91 >obey loginit_zt
cd ./dirdat
open ed000000
Current LogTrail is /hadoop/ogg12/dirdat/ed000000 
ghdr on
detail on
detail data
usertoken on
usertoken detail
ggstoken on
ggstoken detail
headertoken on
headertoken detail
reclen 0
Reclen set to 0 
pos last
Reading forward from RBA 6265 
pos rev
Reading in reverse from RBA 6265 
logtrail
Current LogTrail is /hadoop/ogg12/dirdat/ed000000 
pos
Current position is RBA 6265   Reverse 
Logdump 92 >pos last 
Reading in reverse from RBA 6265 
Logdump 93 >pos rev 
Reading in reverse from RBA 6265 
Logdump 94 >n 
TokenID x47 'G' Record Header    Info x01  Length  129 
TokenID x48 'H' GHDR             Info x00  Length   36 450c 0041 3600 05ff e26e 1fa8 d5b8 f202 0000 0000 | E..A6....n..........  0000 0000 0000 0000 0352 0000 0001 0000           | .........R......  
TokenID x44 'D' Data             Info x00  Length   54 
TokenID x55 'U' User Tokens      Info x00  Length   19 
TokenID x5a 'Z' Record Trailer   Info x01  Length  129 
___________________________________________________________________ 
Hdr-Ind    :     E  (x45)     Partition  :     .  (x0c)  
UndoFlag   :     .  (x00)     BeforeAfter:     A  (x41)  
RecLength  :    54  (x0036)   IO Time    : 2019/09/16 10:17:08.011.746   
IOType     :     5  (x05)     OrigNode   :   255  (xff) 
TransInd   :     .  (x03)     FormatType :     R  (x52) 
SyskeyLen  :     0  (x00)     Incomplete :     .  (x00) 
AuditRBA   :          0       AuditPos   : 0 
Continued  :     N  (x00)     RecCount   :     1  (x01) 2019/09/16 10:17:08.011.746 Insert               Len    54 RBA 6136 
Name: SCOTT.SCDEPT  (TDR Index: 2) 
After  Image:                                             Partition 12    U s   0000 000a 0000 0000 0000 0000 0028 0001 000e 0000 | .............(......  000a 4f50 4552 4154 494f 4e53 0002 000a 0000 0006 | ..OPERATIONS........  424f 5354 4f4e 0003 0004 ffff 0000                | BOSTON........  
Column     0 (x0000), Len    10 (x000a)  0000 0000 0000 0000 0028                          | .........(  
Column     1 (x0001), Len    14 (x000e)  0000 000a 4f50 4552 4154 494f 4e53                | ....OPERATIONS  
Column     2 (x0002), Len    10 (x000a)  0000 0006 424f 5354 4f4e                          | ....BOSTON  
Column     3 (x0003), Len     4 (x0004)  ffff 0000                                         | ....  User tokens:   19 bytes 
TKN-OP-TYPE         : INSERT 

数据过来了

3、目标端初始化进程

先看下当前kafka中topic信息:

[root@hadoop kafka]# cat ./list.sh 
#!/bin/bash
bin/kafka-topics.sh -describe -zookeeper 192.168.1.66:2181
[root@hadoop kafka]# ./list.sh 
Topic:DEPT	PartitionCount:1	ReplicationFactor:1	Configs:Topic: DEPT	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
Topic:EMP	PartitionCount:1	ReplicationFactor:1	Configs:Topic: EMP	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
Topic:ZTVOUCHER_DEL	PartitionCount:1	ReplicationFactor:1	Configs:Topic: ZTVOUCHER_DEL	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
Topic:ZTVOUCHER_INS	PartitionCount:1	ReplicationFactor:1	Configs:Topic: ZTVOUCHER_INS	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
Topic:kylin_streaming_topic	PartitionCount:1	ReplicationFactor:1	Configs:Topic: kylin_streaming_topic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
Topic:scott	PartitionCount:1	ReplicationFactor:1	Configs:Topic: scott	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
Topic:zttest	PartitionCount:1	ReplicationFactor:1	Configs:Topic: zttest	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
Topic:ztvoucher	PartitionCount:1	ReplicationFactor:1	Configs:Topic: ztvoucher	Partition: 0	Leader: 0	Replicas: 0	Isr: 0

因为kafka已经配置了当没有相关topic时会自动创建相关topic,但是为了规范,这里手动创建topic:

 [root@hadoop kafka]# cat create.sh 
read -p "input topic:" name
bin/kafka-topics.sh --create --zookeeper 192.168.1.66:2181 --replication-factor 1 --partitions 1 --topic $name
[root@hadoop kafka]# ./create.sh 
input topic:SCEMP
Created topic "SCEMP".
[root@hadoop kafka]# ./create.sh 
input topic:SCDEPT
Created topic "SCDEPT".

单独开两个会话消费上面两个topic数据:

[root@hadoop kafka]# cat console.sh 
#!/bin/bash
read -p "input topic:" namebin/kafka-console-consumer.sh --zookeeper 192.168.1.66:2181 --topic $name --from-beginning
[root@hadoop kafka]# ./console.sh 
input topic:SCEMP
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
[root@hadoop kafka]# ./console.sh 
input topic:SCDEPT
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

开始初始化数据:

[root@hadoop ogg12]# ./replicat paramfile ./dirprm/init01.prm reportfile ./dirrpt/init01.rpt -p INITIALDATALOAD

查看日志:

[root@hadoop ogg12]# tail -f dirrpt/init01.rpt 
Sep 16, 2019 10:24:59 AM org.springframework.beans.factory.xml.XmlBeanDefinitionReader loadBeanDefinitions
INFO: Loading XML bean definitions from class path resource [oracle/goldengate/datasource/DataSource-context.xml]
Sep 16, 2019 10:24:59 AM org.springframework.beans.factory.xml.XmlBeanDefinitionReader loadBeanDefinitions
INFO: Loading XML bean definitions from class path resource [config/ggue-context.xml]
Sep 16, 2019 10:24:59 AM org.springframework.beans.factory.support.DefaultListableBeanFactory registerBeanDefinition
INFO: Overriding bean definition for bean 'dataSourceConfig' with a different definition: replacing [Generic bean: class [oracle.goldengate.datasource.DataSourceConfig]; scope=singleton; abstract=false; lazyIni
t=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null; defined in class path resource [oracle/goldengate/datasource/DataSource-context.xml]] with [Generic bean: class [oracle.goldengate.datasource.DataSourceConfig]; scope=singleton; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null; defined in class path resource [config/ggue-context.xml]]Sep 16, 2019 10:24:59 AM org.springframework.beans.factory.support.DefaultListableBeanFactory registerBeanDefinition
INFO: Overriding bean definition for bean 'versionInfo' with a different definition: replacing [Generic bean: class [oracle.goldengate.util.VersionInfo]; scope=singleton; abstract=false; lazyInit=false; autowir
eMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null; defined in class path resource [oracle/goldengate/datasource/DataSource-context.xml]] with [Generic bean: class [oracle.goldengate.util.VersionInfo]; scope=singleton; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null; defined in class path resource [config/ggue-context.xml]]Sep 16, 2019 10:24:59 AM org.springframework.context.support.AbstractApplicationContext prepareRefresh
INFO: Refreshing org.springframework.context.support.GenericApplicationContext@501edcf1: startup date [Mon Sep 16 10:24:59 CST 2019]; root of context hierarchyOracle GoldenGate for Big Data, 12.3.2.1.1.005
Copyright (c) 2007, 2018. Oracle and/or its affiliates. All rights reserved
Built with Java 1.8.0_161  (class version: 52.0)
SOURCEDEFS ./dirdef/init_scott.def
EXTFILE ./dirdat/ed
reportcount every 1 minutes, rate
grouptransops 10000
map scott.SCEMP,target SCOTT.SCEMP;
map scott.SCDEPT,target SCOTT.SCDEPT;2019-09-16 10:25:01  INFO    OGG-01815  Virtual Memory Facilities for: COManon alloc: mmap(MAP_ANON)  anon free: munmapfile alloc: mmap(MAP_SHARED)  file free: munmaptarget directories:/hadoop/ogg12/dirtmp.Database Version:Database Language and Character Set:2019-09-16 10:25:01  INFO    OGG-02243  Opened trail file /hadoop/ogg12/dirdat/ed000000 at 2019-09-16 10:25:01.285030.2019-09-16 10:25:01  INFO    OGG-03506  The source database character set, as determined from the trail file, is UTF-8.***********************************************************************
**                     Run Time Messages                             **
***********************************************************************2019-09-16 10:25:01  INFO    OGG-02243  Opened trail file /hadoop/ogg12/dirdat/ed000000 at 2019-09-16 10:25:01.303836.2019-09-16 10:25:01  WARNING OGG-02761  Source definitions file, ./dirdef/init_scott.def, is ignored because trail file /hadoop/ogg12/dirdat/ed000000 contains table definitions.2019-09-16 10:25:01  INFO    OGG-06505  MAP resolved (entry scott.SCEMP): map "SCOTT"."SCEMP",target SCOTT.SCEMP.2019-09-16 10:25:01  INFO    OGG-02756  The definition for table SCOTT.SCEMP is obtained from the trail file.2019-09-16 10:25:01  INFO    OGG-06511  Using following columns in default map by name: EMPNO, ENAME, JOB, MGR, HIREDATE, SAL, COMM, DEPTNO.2019-09-16 10:25:01  INFO    OGG-06510  Using the following key columns for target table SCOTT.SCEMP: EMPNO.

查看两个topic消费情况:

[root@hadoop kafka]# ./console.sh 
input topic:SCEMP
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
{"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-16 10:17:08.006650","current_ts":"2019-09-16T10:25:01.379000","pos":"00000000000000002199","tokens":{"TKN-OP-TYPE":"INSERT"},"after":{"EMPNO":1232,"ENAME":"
FORD","JOB":"ANALYST","MGR":7566,"HIREDATE":"1981-12-03 00:00:00","SAL":3000.00,"COMM":null,"DEPTNO":20}}{"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-16 10:17:08.006650","current_ts":"2019-09-16T10:25:21.767000","pos":"00000000000000002396","tokens":{"TKN-OP-TYPE":"INSERT"},"after":{"EMPNO":1222,"ENAME":"
FORD","JOB":"ANALYST","MGR":7566,"HIREDATE":"1981-12-03 00:00:00","SAL":3000.00,"COMM":null,"DEPTNO":20}}{"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-16 10:17:08.006650","current_ts":"2019-09-16T10:25:31.787000","pos":"00000000000000002593","tokens":{"TKN-OP-TYPE":"INSERT"},"after":{"EMPNO":3211,"ENAME":"
FORD","JOB":"ANALYST","MGR":7566,"HIREDATE":"1981-12-03 00:00:00","SAL":3000.00,"COMM":null,"DEPTNO":20}}{"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-16 10:17:08.006650","current_ts":"2019-09-16T10:25:41.803000","pos":"00000000000000002790","tokens":{"TKN-OP-TYPE":"INSERT"},"after":{"EMPNO":7369,"ENAME":"
er","JOB":"CLERK","MGR":7902,"HIREDATE":"1980-12-17 00:00:00","SAL":800.00,"COMM":null,"DEPTNO":20}}{"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-16 10:17:08.006650","current_ts":"2019-09-16T10:25:51.814000","pos":"00000000000000002983","tokens":{"TKN-OP-TYPE":"INSERT"},"after":{"EMPNO":7499,"ENAME":"
ALLEN","JOB":"SALESMAN","MGR":7698,"HIREDATE":"1981-02-20 00:00:00","SAL":1600.00,"COMM":300.00,"DEPTNO":30}}{"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-16 10:17:08.006650","current_ts":"2019-09-16T10:26:01.831000","pos":"00000000000000003182","tokens":{"TKN-OP-TYPE":"INSERT"},"after":{"EMPNO":7521,"ENAME":"
WARD","JOB":"SALESMAN","MGR":7698,"HIREDATE":"1981-02-22 00:00:00","SAL":1250.00,"COMM":500.00,"DEPTNO":30}}{"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-16 10:17:08.006650","current_ts":"2019-09-16T10:26:11.847000","pos":"00000000000000003380","tokens":{"TKN-OP-TYPE":"INSERT"},"after":{"EMPNO":7566,"ENAME":"
JONES","JOB":"MANAGER","MGR":7839,"HIREDATE":"1981-04-02 00:00:00","SAL":2975.00,"COMM":null,"DEPTNO":20}}{"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-16 10:17:08.006650","current_ts":"2019-09-16T10:26:21.864000","pos":"00000000000000003578","tokens":{"TKN-OP-TYPE":"INSERT"},"after":{"EMPNO":7654,"ENAME":"
MARTIN","JOB":"SALESMAN","MGR":7698,"HIREDATE":"1981-09-28 00:00:00","SAL":1250.00,"COMM":1400.00,"DEPTNO":30}}{"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-16 10:17:08.006650","current_ts":"2019-09-16T10:26:31.881000","pos":"00000000000000003778","tokens":{"TKN-OP-TYPE":"INSERT"},"after":{"EMPNO":7698,"ENAME":"
BLAKE","JOB":"MANAGER","MGR":7839,"HIREDATE":"1981-05-01 00:00:00","SAL":2850.00,"COMM":null,"DEPTNO":30}}{"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-16 10:17:08.006650","current_ts":"2019-09-16T10:26:41.898000","pos":"00000000000000003976","tokens":{"TKN-OP-TYPE":"INSERT"},"after":{"EMPNO":7782,"ENAME":"
CLARK","JOB":"MANAGER","MGR":7839,"HIREDATE":"1981-06-09 00:00:00","SAL":2450.00,"COMM":null,"DEPTNO":10}}{"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-16 10:17:08.006650","current_ts":"2019-09-16T10:26:51.907000","pos":"00000000000000004174","tokens":{"TKN-OP-TYPE":"INSERT"},"after":{"EMPNO":7839,"ENAME":"
KING","JOB":"PRESIDENT","MGR":null,"HIREDATE":"1981-11-17 00:00:00","SAL":5000.00,"COMM":null,"DEPTNO":10}}{"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-16 10:17:08.006650","current_ts":"2019-09-16T10:27:01.920000","pos":"00000000000000004373","tokens":{"TKN-OP-TYPE":"INSERT"},"after":{"EMPNO":7844,"ENAME":"
TURNER","JOB":"SALESMAN","MGR":7698,"HIREDATE":"1981-09-08 00:00:00","SAL":1500.00,"COMM":0,"DEPTNO":30}}{"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-16 10:17:08.006650","current_ts":"2019-09-16T10:27:11.932000","pos":"00000000000000004573","tokens":{"TKN-OP-TYPE":"INSERT"},"after":{"EMPNO":7876,"ENAME":"
ADAMS","JOB":"CLERK","MGR":7788,"HIREDATE":"1987-05-23 00:00:00","SAL":1100.00,"COMM":null,"DEPTNO":20}}{"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-16 10:17:08.006650","current_ts":"2019-09-16T10:27:21.938000","pos":"00000000000000004769","tokens":{"TKN-OP-TYPE":"INSERT"},"after":{"EMPNO":7900,"ENAME":"
JAMES","JOB":"CLERK","MGR":7698,"HIREDATE":"1981-12-03 00:00:00","SAL":950.00,"COMM":null,"DEPTNO":30}}{"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-16 10:17:08.006650","current_ts":"2019-09-16T10:27:31.948000","pos":"00000000000000004965","tokens":{"TKN-OP-TYPE":"INSERT"},"after":{"EMPNO":7902,"ENAME":"
FORD","JOB":"ANALYST","MGR":7566,"HIREDATE":"1981-12-03 00:00:00","SAL":3000.00,"COMM":null,"DEPTNO":20}}[root@hadoop kafka]# ./console.sh 
input topic:SCDEPT
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
{"table":"SCOTT.SCDEPT","op_type":"I","op_ts":"2019-09-16 10:17:08.011746","current_ts":"2019-09-16T10:27:51.977000","pos":"00000000000000005753","tokens":{"TKN-OP-TYPE":"INSERT"},"after":{"DEPTNO":10,"DNAME":"
ACCOUNTING","LOC":"NEW YORK","TESS":null}}{"table":"SCOTT.SCDEPT","op_type":"I","op_ts":"2019-09-16 10:17:08.011746","current_ts":"2019-09-16T10:28:12.000000","pos":"00000000000000005884","tokens":{"TKN-OP-TYPE":"INSERT"},"after":{"DEPTNO":20,"DNAME":"
RESEARCH","LOC":"DALLAS","TESS":null}}{"table":"SCOTT.SCDEPT","op_type":"I","op_ts":"2019-09-16 10:17:08.011746","current_ts":"2019-09-16T10:28:22.011000","pos":"00000000000000006011","tokens":{"TKN-OP-TYPE":"INSERT"},"after":{"DEPTNO":30,"DNAME":"
SALES","LOC":"CHICAGO","TESS":null}}{"table":"SCOTT.SCDEPT","op_type":"I","op_ts":"2019-09-16 10:17:08.011746","current_ts":"2019-09-16T10:28:32.027000","pos":"00000000000000006136","tokens":{"TKN-OP-TYPE":"INSERT"},"after":{"DEPTNO":40,"DNAME":"
OPERATIONS","LOC":"BOSTON","TESS":null}}

SCDEP ,SCEMP表已经 初始化数据过来了。
接下来启动应用进程增量同步数据:

GGSCI (hadoop) 2> start rkafkaSending START request to MANAGER ...
REPLICAT RKAFKA startingGGSCI (hadoop) 3> info allProgram     Status      Group       Lag at Chkpt  Time Since ChkptMANAGER     RUNNING                                           
REPLICAT    STOPPED     RHDFS       00:00:00      617:28:44   
REPLICAT    STARTING    RKAFKA      00:00:00      00:32:14    GGSCI (hadoop) 8> !
info allProgram     Status      Group       Lag at Chkpt  Time Since ChkptMANAGER     RUNNING                                           
REPLICAT    STOPPED     RHDFS       00:00:00      617:28:58   
REPLICAT    RUNNING     RKAFKA      00:00:00      00:00:02 

4、验证增量同步

a、源端做insert操作:

insert into scemp (EMPNO, ENAME, JOB, MGR, HIREDATE, SAL, COMM, DEPTNO)
values (9999, 'zhaoyd', 'kfc', 6666, sysdate, 100.00, 300.00, 30);
insert into scdept(deptno,dname,loc)values(99,'kfc','bj');

去kafka看结果:

----SCEMP消息:
{"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-18 09:29:42.000388","current_ts":"2019-09-18T09:30:57.750000","pos":"00000000230000057892","tokens":{"TKN-OP-TYPE":"INSERT"},"after":{"EMPNO":9999,"ENAME":"zhaoyd","JOB":"kfc","MGR":6666,"HIREDATE":"2019-09-18 09:29:40","SAL":100.00,"COMM":300.00,"DEPTNO":30}}
----SCDEPT消息:
{"table":"SCOTT.SCDEPT","op_type":"I","op_ts":"2019-09-18 09:29:42.000388","current_ts":"2019-09-18T09:31:07.764000","pos":"00000000230000058138","tokens":{"TKN-OP-TYPE":"INSERT"},"after":{"DEPTNO":99,"DNAME":"kfc","LOC":"bj","TESS":null}}

可以看到insert都正常同步过来了。

b、源端做普通update操作:

update scemp set  ename='zyd' where empno=9999;--修改带附加日志的字段
update scemp set  mgr=654 where empno=9999;--修改没加附加日志的字段
update scdept set dname='beij' where deptno=99;--这个表全列附加日志

去kafka看结果:

----SCEMP
{"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-18 09:32:59.000686","current_ts":"2019-09-18T09:33:53.994000","pos":"00000000230000058486","tokens":{"TKN-OP-TYPE":"SQL COMPUPDATE"},"after":{"EMPNO":9999,"ENAME":"zyd","JOB":"kfc","SAL":100.00,"DEPTNO":30}}
{"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-18 09:32:59.000686","current_ts":"2019-09-18T09:34:04.009000","pos":"00000000230000058850","tokens":{"TKN-OP-TYPE":"SQL COMPUPDATE"},"after":{"EMPNO":9999,"ENAME":"zyd","JOB":"kfc","MGR":654,"SAL":100.00,"DEPTNO":30}}----SCDEPT
{"table":"SCOTT.SCDEPT","op_type":"I","op_ts":"2019-09-18 09:32:59.000686","current_ts":"2019-09-18T09:34:14.021000","pos":"00000000230000059193","tokens":{"TKN-OP-TYPE":"SQL COMPUPDATE"},"after":{"DEPTNO":99,"
DNAME":"beij","LOC":"bj","TESS":null}}

从第一条update结果看,所有添加了附加日志的列及最新值都过来了,第二条结果发现SCEMP表在做了update mgr字段时候,除了其余所有加了附加日志的字段值都跟着过来了,mgr最新值也过来了,现在的json内容是:
主键+附加日志字段+被修改字段,能够满足flink极为方便的获取每个需要计算指标的最新值。

c、源端做pkupdate操作:


update scemp set  empno=3333, ename='zzd'  where empno=9999;update scemp set  empno=9999, mgr=6543  where empno=3333;update scemp set  empno=3333   where empno=9999;update scdept set deptno=33,dname='zd' where deptno=99;

去看kafka消息:

----SCEMP
{"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-18 09:40:33.000792","current_ts":"2019-09-18T09:40:58.666000","pos":"00000000230000059547","tokens":{"TKN-OP-TYPE":"PK UPDATE"},"after":{"EMPNO":3333,"ENAME":"zzd","JOB":"kfc","SAL":100.00,"DEPTNO":30}}
{"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-18 09:40:33.000792","current_ts":"2019-09-18T09:41:08.681000","pos":"00000000230000059547","tokens":{"TKN-OP-TYPE":"PK UPDATE"},"after":{"EMPNO":9999,"ENAME":null,"JOB":null,"SAL":null,"DEPTNO":null}}{"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-18 09:53:59.000496","current_ts":"2019-09-18T09:54:07.801000","pos":"00000000230000060265","tokens":{"TKN-OP-TYPE":"PK UPDATE"},"after":{"EMPNO":9999,"ENAME":"zzd","JOB":"kfc","MGR":6543,"SAL":100.00,"DEPTNO":30}}
{"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-18 09:53:59.000496","current_ts":"2019-09-18T09:54:17.820000","pos":"00000000230000060265","tokens":{"TKN-OP-TYPE":"PK UPDATE"},"after":{"EMPNO":3333,"ENAME":null,"JOB":null,"SAL":null,"DEPTNO":null}}{"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-18 09:54:44.000419","current_ts":"2019-09-18T09:54:54.876000","pos":"00000000230000060664","tokens":{"TKN-OP-TYPE":"PK UPDATE"},"after":{"EMPNO":3333,"ENAME":"zzd","JOB":"kfc","SAL":100.00,"DEPTNO":30}}
{"table":"SCOTT.SCEMP","op_type":"I","op_ts":"2019-09-18 09:54:44.000419","current_ts":"2019-09-18T09:55:04.882000","pos":"00000000230000060664","tokens":{"TKN-OP-TYPE":"PK UPDATE"},"after":{"EMPNO":9999,"ENAME":null,"JOB":null,"SAL":null,"DEPTNO":null}}
----SCDEPT
{"table":"SCOTT.SCDEPT","op_type":"I","op_ts":"2019-09-18 09:40:33.000792","current_ts":"2019-09-18T09:41:18.697000","pos":"00000000230000059888","tokens":{"TKN-OP-TYPE":"PK UPDATE"},"after":{"DEPTNO":33,"DNAME":"zd","LOC":"bj","TESS":null}}
{"table":"SCOTT.SCDEPT","op_type":"I","op_ts":"2019-09-18 09:40:33.000792","current_ts":"2019-09-18T09:41:28.704000","pos":"00000000230000059888","tokens":{"TKN-OP-TYPE":"PK UPDATE"},"after":{"DEPTNO":99,"DNAME":null,"LOC":null,"TESS":null}}

从上面结果看到,现在pkupdate操作被分成了两个json,旧的主键对应的j’son中需要计算的指标值都是空,而新的主键对应的json中需要计算的指标都是各指标最新的值,能够满足flink在发生pkupdate时候计算不会出错。

这篇关于【大数据开发运维解决方案】Ogg For Bigdata 同步Oracle数据到KAFKA(包括初始化历史数据)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/612217

相关文章

JVM 的类初始化机制

前言 当你在 Java 程序中new对象时,有没有考虑过 JVM 是如何把静态的字节码(byte code)转化为运行时对象的呢,这个问题看似简单,但清楚的同学相信也不会太多,这篇文章首先介绍 JVM 类初始化的机制,然后给出几个易出错的实例来分析,帮助大家更好理解这个知识点。 JVM 将字节码转化为运行时对象分为三个阶段,分别是:loading 、Linking、initialization

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

这15个Vue指令,让你的项目开发爽到爆

1. V-Hotkey 仓库地址: github.com/Dafrok/v-ho… Demo: 戳这里 https://dafrok.github.io/v-hotkey 安装: npm install --save v-hotkey 这个指令可以给组件绑定一个或多个快捷键。你想要通过按下 Escape 键后隐藏某个组件,按住 Control 和回车键再显示它吗?小菜一碟: <template

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd