Objective: test one-way data synchronization from Oracle to Kafka using OGG.
Summary: Kafka is deployed as a single node with a single broker; a standalone ZooKeeper is used; Java 1.8 is required; two OGG distributions are needed, one for Oracle and one for Big Data.
1 Environment
Component        Version / Package
oracle           11.2.0.4
zookeeper        apache-zookeeper-3.6.2-bin.tar.gz
kafka            kafka_2.13-2.7.0.tgz
ogg for bigdata  OGG_BigData_Linux_x64_12.3.2.1.1.zip
ogg for oracle   fbo_ggs_Linux_x64_shiphome.zip
jdk              jdk-8u181-linux-x64.tar.gz
CentOS           CentOS-6.9-x86_64-bin-DVD1
2 Java Environment Setup
[root@test01 ~]# tar -zxvf jdk-8u181-linux-x64.tar.gz -C /usr/local/
[root@test01 ~]# cd /usr/local/jdk1.8.0_181/bin/
[root@test01 bin]# ./java -version
java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
Add the environment variables (append at the end of the file):
vi /etc/profile
export JAVA_HOME=/usr/local/jdk1.8.0_181
export PATH=$JAVA_HOME/bin:$PATH
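To apply the change to the current shell and confirm the system-wide java is picked up (a minimal check, assuming the default bash login shell):
[root@test01 ~]# source /etc/profile
[root@test01 ~]# java -version
java version "1.8.0_181"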
3 ZooKeeper Installation
3.1 Simple ZooKeeper installation
[root@test01 ~]# tar -zxvf apache-zookeeper-3.6.2-bin.tar.gz -C /usr/local
[root@test01 ~]# mv /usr/local/apache-zookeeper-3.6.2-bin /usr/local/apache-zookeeper-3.6.2
3.2 Edit the configuration file zoo.cfg
[root@test01 ~]# cd /usr/local/apache-zookeeper-3.6.2
Go into ZooKeeper's conf directory, copy zoo_sample.cfg to zoo.cfg, and then modify the dataDir property; leave the other parameters unchanged.
[root@test01 conf]# cp zoo_sample.cfg zoo.cfg
[root@test01 conf]# vi zoo.cfg
# Data directory
dataDir=/usr/local/apache-zookeeper-3.6.2/zkdata
# Client port; 2181 is the default
clientPort=2181
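If the dataDir directory does not already exist, it is safest to create it before the first start (path taken from zoo.cfg above):
[root@test01 conf]# mkdir -p /usr/local/apache-zookeeper-3.6.2/zkdata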
3.3 Configure environment variables
[root@test01 ~]# vi /etc/profile  # append at the end
export ZOOKEEPER_HOME=/usr/local/apache-zookeeper-3.6.2
export JAVA_HOME=/usr/local/jdk1.8.0_181
export PATH=$ZOOKEEPER_HOME/bin:$JAVA_HOME/bin:$PATH
3.4 Start and stop ZooKeeper
[root@test01 bin]# cd /usr/local/apache-zookeeper-3.6.2/bin
[root@test01 bin]# ./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/apache-zookeeper-3.6.2/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@test01 bin]# ./zkServer.sh stop
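To confirm the running mode (a quick check; with this single-node config, standalone is expected, and the output is abridged):
[root@test01 bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/apache-zookeeper-3.6.2/bin/../conf/zoo.cfg
Mode: standalone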
3.5 Check the process with jps
[root@test01 bin]# jps
1971 QuorumPeerMain
5645 Jps
4 Kafka Installation
4.1 Single-node Kafka deployment
[root@test01 ~]# tar -zxvf kafka_2.13-2.7.0.tgz -C /usr/local/
4.2 Configure Kafka parameters
Go into Kafka's config directory and add the following settings to server.properties:
[root@test01 ~]# cd /usr/local/kafka_2.13-2.7.0/config
[root@test01 config]# vi server.properties
# Globally unique broker id; must not be duplicated
broker.id=0
# Listener configuration
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.141.107:9092
advertised.listeners=PLAINTEXT://192.168.141.107:9092
# Log directory
log.dirs=/usr/local/kafka_2.13-2.7.0/kafka-logs
# ZooKeeper connection (use the IP or hostname if ZooKeeper is not local)
#zookeeper.connect=localhost:2181
zookeeper.connect=192.168.141.107:2181
4.3 Add environment variables
[root@test01 config]# vi /etc/profile
export ZOOKEEPER_HOME=/usr/local/apache-zookeeper-3.6.2
export JAVA_HOME=/usr/local/jdk1.8.0_181
export KAFKA_HOME=/usr/local/kafka_2.13-2.7.0
# LD_LIBRARY_PATH entries are directories; OGG for Big Data looks here for libjvm.so and libjsig.so
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server
export PATH=$KAFKA_HOME/bin:$ZOOKEEPER_HOME/bin:$JAVA_HOME/bin:$PATH
4.4 Start and stop Kafka
Start:
[root@test01 bin]# cd /usr/local/kafka_2.13-2.7.0/bin
[root@test01 bin]# ./kafka-server-start.sh $KAFKA_HOME/config/server.properties &
Stop:
[root@test01 bin]# ./kafka-server-stop.sh
4.5 Check the processes with jps
[root@test01 kafka_2.13-2.7.0]# jps
1971 QuorumPeerMain
2702 Jps
2287 Kafka
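Optionally confirm the broker is listening on the configured port (netstat ships with CentOS 6; output abridged, and the PID should match the Kafka entry shown by jps):
[root@test01 ~]# netstat -tlnp | grep 9092
tcp  0  0 192.168.141.107:9092  0.0.0.0:*  LISTEN  2287/java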
4.6 Test Kafka
[root@test01 ~]# cd /usr/local/kafka_2.13-2.7.0/bin
Create a topic:
[root@test01 bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic test.
List all topics:
[root@test01 bin]# ./kafka-topics.sh --list --zookeeper localhost:2181
test
Seeing 'test' in the output confirms the topic was created successfully.
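A quick end-to-end check with the console tools that ship with Kafka (the message text is arbitrary; stop the consumer with Ctrl+C):
[root@test01 bin]# echo "hello" | ./kafka-console-producer.sh --bootstrap-server 192.168.141.107:9092 --topic test
[root@test01 bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.141.107:9092 --topic test --from-beginning
hello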
5 OGG Source-Side Installation
5.1 Unpack the installer
[oracle@oracle01 ~]$ unzip fbo_ggs_Linux_x64_shiphome.zip
5.2 Install (GUI installer; screenshots omitted)
[oracle@oracle01 ~]$ cd fbo_ggs_Linux_x64_shiphome/Disk1
[oracle@oracle01 Disk1]$ ./runInstaller
..... (output omitted) ......
After the GUI installation completes, the OGG directory tree has already been created.
Verify the installation:
[oracle@oracle01 ogg12.2]$ ldd ggsci
linux-vdso.so.1 => (0x00007ffdb9efa000)
librt.so.1 => /lib64/librt.so.1 (0x0000003df0600000)
libdl.so.2 => /lib64/libdl.so.2 (0x0000003defa00000)
libgglog.so => /u01/ogg12.2/./libgglog.so (0x00007f02e8ccc000)
libggrepo.so => /u01/ogg12.2/./libggrepo.so (0x00007f02e8a5a000)
libdb-6.1.so => /u01/ogg12.2/./libdb-6.1.so (0x00007f02e8675000)
libggperf.so => /u01/ogg12.2/./libggperf.so (0x00007f02e8445000)
libggparam.so => /u01/ogg12.2/./libggparam.so (0x00007f02e733b000)
libicui18n.so.48 => /u01/ogg12.2/./libicui18n.so.48 (0x00007f02e6f4b000)
libicuuc.so.48 => /u01/ogg12.2/./libicuuc.so.48 (0x00007f02e6bca000)
libicudata.so.48 => /u01/ogg12.2/./libicudata.so.48 (0x00007f02e5405000)
libpthread.so.0 => /lib64/libpthread.so.0 (0x0000003df0200000)
libxerces-c.so.28 => /u01/ogg12.2/./libxerces-c.so.28 (0x00007f02e4e3e000)
libantlr3c.so => /u01/ogg12.2/./libantlr3c.so (0x00007f02e4c25000)
libnnz11.so => /u01/app/oracle/product/11.2.0/dbhome_1/lib/libnnz11.so (0x00007f02e4857000)
libclntsh.so.11.1 => /u01/app/oracle/product/11.2.0/dbhome_1/lib/libclntsh.so.11.1 (0x00007f02e1ded000)
libggnnzitp.so => /u01/ogg12.2/./libggnnzitp.so (0x00007f02e1696000)
libm.so.6 => /lib64/libm.so.6 (0x0000003df0a00000)
libc.so.6 => /lib64/libc.so.6 (0x0000003defe00000)
/lib64/ld-linux-x86-64.so.2 (0x0000003def600000)
libstdc++.so.6 => /usr/lib64/libstdc++.so.6 (0x0000003df3200000)
libgcc_s.so.1 => /lib64/libgcc_s.so.1 (0x0000003df2e00000)
libnsl.so.1 => /lib64/libnsl.so.1 (0x0000003df2a00000)
libaio.so.1 => /lib64/libaio.so.1 (0x00007f02e1492000)
Initialize the directories (the GUI installer has already created them, hence the 'already exists' messages):
GGSCI (oracle01) 2> create SUBDIRS
Creating subdirectories under current directory /u01/ogg12.2
Parameter files /u01/ogg12.2/dirprm: already exists
Report files /u01/ogg12.2/dirrpt: already exists
Checkpoint files /u01/ogg12.2/dirchk: already exists
Process status files /u01/ogg12.2/dirpcs: already exists
SQL script files /u01/ogg12.2/dirsql: already exists
Database definitions files /u01/ogg12.2/dirdef: already exists
Extract data files /u01/ogg12.2/dirdat: already exists
Temporary files /u01/ogg12.2/dirtmp: already exists
Credential store files /u01/ogg12.2/dircrd: already exists
Masterkey wallet files /u01/ogg12.2/dirwlt: already exists
Dump files /u01/ogg12.2/dirdmp: already exists
5.3 Check the database configuration and adjust parameters
5.3.1 The source database must be in archivelog mode, with supplemental logging and force_logging enabled
SQL> archive log list;
Database log mode Archive Mode
Automatic archival Enabled
Archive destination USE_DB_RECOVERY_FILE_DEST
Oldest online log sequence 8
Next log sequence to archive 10
Current log sequence 10
SQL> select name,supplemental_log_data_min,force_logging from v$database;
NAME      SUPPLEMENTAL_LOG_DATA_MI FORCE_LOG
--------- ------------------------ ---------
TDB01     YES                      YES
If any of these need to be changed, use the following statements:
alter database archivelog;
alter database force logging;
alter database add supplemental log data;
ALTER SYSTEM SET ENABLE_GOLDENGATE_REPLICATION = TRUE SCOPE=BOTH;
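Note that switching to archivelog mode only works while the database is mounted but not open; a minimal sketch:
SQL> shutdown immediate;
SQL> startup mount;
SQL> alter database archivelog;
SQL> alter database open;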
5.3.2 Create the OGG user and a dedicated tablespace
create tablespace ts_ogg datafile '/u01/app/oracle/oradata/tdb01/ts_ogg01.dbf' size 200M AUTOEXTEND on extent management local segment space management auto;
create user ggusr identified by ggusr default tablespace ts_ogg;
For convenience, grant DBA outright:
grant resource,connect,dba to ggusr;
5.3.3 Configure the test user
alter user scott identified by scott account unlock;
grant select_catalog_role to scott;
Create a new table under the scott schema to use as the test table:
create table test_ogg(id int ,name varchar(20),primary key(id));
5.4 Configure MGR
GGSCI (oracle01) 3> edit param mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
--AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45
Notes:
PORT is the MGR process's listening port;
DYNAMICPORTLIST is the dynamic port list: when the specified MGR port is unavailable, a port is picked from this list (at most 256 ports may be specified);
AUTORESTART restarts all EXTRACT processes on failure, at most 5 retries, 3 minutes apart;
PURGEOLDEXTRACTS controls periodic purging of trail files.
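After saving the parameter file, the manager can be started and checked right away (a minimal sketch; the info output is abridged):
GGSCI (oracle01) 4> start mgr
Manager started.
GGSCI (oracle01) 5> info mgr
Manager is running (IP port oracle01.7809, Process ID ...).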
5.5 Add supplemental log data for the replicated table
Log in as the OGG user ggusr:
GGSCI (oracle01) 5> dblogin userid ggusr, password ggusr
Successfully logged into database.
GGSCI (oracle01 as ggusr@tdb01) 6> info trandata scott.test_ogg
Logging of supplemental redo log data is disabled for table SCOTT.TEST_OGG.
GGSCI (oracle01 as ggusr@tdb01) 7> add trandata scott.test_ogg
Logging of supplemental redo data enabled for table SCOTT.TEST_OGG.
TRANDATA for scheduling columns has been added on table 'SCOTT.TEST_OGG'.
TRANDATA for instantiation CSN has been added on table 'SCOTT.TEST_OGG'.
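Re-running the earlier check should now report the logging as enabled (same command as in the previous step):
GGSCI (oracle01 as ggusr@tdb01) 8> info trandata scott.test_ogg
Logging of supplemental redo log data is enabled for table SCOTT.TEST_OGG.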
5.6 Configure the EXTRACT capture process
GGSCI (oracle01) 18> edit param extkafka
extract extkafka
setenv (ORACLE_SID=tdb01)
setenv (ORACLE_HOME=/u01/app/oracle/product/11.2.0/dbhome_1)
setenv (NLS_LANG="AMERICAN_AMERICA.UTF8")
userid ggusr,password ggusr
exttrail ./dirdat/ka
table scott.test_ogg;
Add the process:
add extract extkafka, tranlog, begin now
Bind the trail file:
add exttrail ./dirdat/ka, extract extkafka
5.7 Configure the PUMP data-pump process (another use of EXTRACT)
GGSCI (oracle01) 19> edit param pukafka
extract pukafka
setenv (ORACLE_SID=tdb01)
setenv (ORACLE_HOME=/u01/app/oracle/product/11.2.0/dbhome_1)
passthru
dynamicresolution
userid ggusr,password ggusr
rmthost 192.168.141.107,mgrport 7809
rmttrail ./dirdat/pa
table scott.test_ogg;
Notes:
The first line names the extract (pump) process;
passthru stops OGG from querying Oracle for table metadata; since the pump here only relays the trail, this interaction can be disabled;
dynamicresolution enables dynamic name resolution;
userid and password are the account OGG uses to connect to the Oracle database;
rmthost and mgrport are the address and listening port of the MGR service of the target-side (Kafka) OGG;
rmttrail is the location and name prefix of the trail files on the target.
Bind the local trail and the remote trail to the PUMP process:
add extract pukafka,exttrailsource ./dirdat/ka
add rmttrail ./dirdat/pa,extract pukafka
5.8 Configure the definitions file that maps tables between source and target
Transfers between Oracle and MySQL or Hadoop-ecosystem targets (HDFS, Hive, Kafka, etc.) are heterogeneous, so the table definition mapping must be generated explicitly:
GGSCI (oracle01) 1> edit param test_ogg
defsfile /u01/ogg12.2/dirdef/scott.test_ogg
userid ggusr,password ggusr
table scott.test_ogg;
GGSCI (oracle01) 3> view param test_ogg
defsfile /u01/ogg12.2/dirdef/scott.test_ogg
userid ggusr,password ggusr
table scott.test_ogg;
Run defgen in the OGG home directory (as the oracle user):
./defgen paramfile dirprm/test_ogg.prm
-- this generates the file scott.test_ogg under /u01/ogg12.2/dirdef
[oracle@oracle01 ogg12.2]$ ./defgen paramfile dirprm/test_ogg.prm
***********************************************************************
Oracle GoldenGate Table Definition Generator for Oracle
Version 12.2.0.1.1 OGGCORE_12.2.0.1.0_PLATFORMS_151211.1401
Linux, x64, 64bit (optimized), Oracle 11g on Dec 11 2015 21:37:21
Copyright (C) 1995, 2015, Oracle and/or its affiliates. All rights reserved.
Starting at 2021-03-10 11:02:19
***********************************************************************
Operating System Version:
Linux
Version #1 SMP Tue May 10 17:27:01 UTC 2016, Release 2.6.32-642.el6.x86_64
Node: oracle01
Machine: x86_64
soft limit hard limit
Address Space Size : unlimited unlimited
Heap Size : unlimited unlimited
File Size : unlimited unlimited
CPU Time : unlimited unlimited
Process id: 2602
**********************************************************************
** Running with the following parameters **
***********************************************************************
defsfile /u01/ogg12.2/dirdef/scott.test_ogg
userid ggusr,password ***
table scott.test_ogg;
Retrieving definition for SCOTT.TEST_OGG.
Definitions generated for 1 table in /u01/ogg12.2/dirdef/scott.test_ogg.
Copy the generated file /u01/ogg12.2/dirdef/scott.test_ogg into the dirdef directory under the target-side OGG home:
scp /u01/ogg12.2/dirdef/scott.test_ogg root@192.168.141.107:/u01/ogg12.3/dirdef
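A quick integrity check on both ends (md5sum ships with CentOS; the two hashes should match):
[oracle@oracle01 ogg12.2]$ md5sum /u01/ogg12.2/dirdef/scott.test_ogg
[root@test01 dirdef]# md5sum /u01/ogg12.3/dirdef/scott.test_ogg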
6 OGG Target-Side (Kafka) Installation
6.1 Confirm the ZooKeeper and Kafka services are running
[root@test01 dirdef]# jps
3760 Jps
1971 QuorumPeerMain
2287 Kafka
6.2 Configure MGR
GGSCI (test01) 3> edit param mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
--AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3  -- leave disabled for now
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45
6.3 Configure the checkpoint table
A checkpoint records a traceable offset for the replication stream; declaring the checkpoint table in the GLOBALS file is all that is needed.
GGSCI (test01) 14> edit param ./GLOBALS
CHECKPOINTTABLE ggusr.checkpoint
6.4 Configure the REPLICAT process
GGSCI (test01) 6> EDIT PARAMS rekafka
replicat rekafka
sourcedefs /u01/ogg12.3/dirdef/scott.test_ogg
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
map scott.test_ogg,target scott.test_ogg;
Notes:
replicat rekafka names the replicat process;
sourcedefs points to the table definitions file copied over from the source server;
TARGETDB LIBFILE loads the Kafka adapter library together with its properties file, located at dirprm/kafka.props under the OGG home;
REPORTCOUNT sets how often the replication task writes statistics to its report;
GROUPTRANSOPS is the number of operations grouped into a single target transaction, which reduces I/O;
MAP defines the mapping from source table to target.
6.5 The configuration file kafka.props
Note: delete the inline comments when configuring, otherwise errors will occur!
[root@test01 ~]# cd /u01/ogg12.3/dirprm
Edit kafka.props:
[root@test01 dirprm]# vi kafka.props
gg.handlerlist=kafkahandler //handler type
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties //Kafka producer config file
gg.handler.kafkahandler.topicMappingTemplate=test_ogg //Kafka topic name; no need to create it manually
gg.handler.kafkahandler.format=json //message format; json, xml, etc. are supported
gg.handler.kafkahandler.mode=op //delivery mode in OGG for Big Data: op sends one message per operation, tx one per transaction
##gg.classpath=dirprm/:/u01/ogg12.3/:/u01/ogg12.3/lib/*
The classpath must include the Kafka installation's client libraries, otherwise errors will keep occurring:
gg.classpath=dirprm/:/u01/ogg12.3/*:/u01/ogg12.3/lib/*:/u01/ogg12.3/ggjava/resources/lib/*:/usr/local/kafka_2.13-2.7.0/libs/*
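To confirm the client jars really are where gg.classpath points (the jar version is assumed from the kafka_2.13-2.7.0 package):
[root@test01 dirprm]# ls /usr/local/kafka_2.13-2.7.0/libs/kafka-clients*
/usr/local/kafka_2.13-2.7.0/libs/kafka-clients-2.7.0.jar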
6.6 Edit the file custom_kafka_producer.properties
Note: delete the inline comments when configuring.
[root@test01 dirprm]# vi custom_kafka_producer.properties
bootstrap.servers=192.168.141.107:9092 //Kafka broker address
acks=1
compression.type=gzip //compression type
reconnect.backoff.ms=1000 //reconnect backoff in milliseconds
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000
6.7 Add the trail file to the REPLICAT process
GGSCI (test01) 14> add replicat rekafka, exttrail ./dirdat/pa, checkpointtable ggusr.checkpoint
7 Testing
7.1 Start the processes and check them
On the source, start mgr, extkafka, and pukafka:
start mgr
start extkafka
start pukafka
GGSCI (oracle01) 1> info all
Program     Status      Group       Lag at Chkpt  Time Since Chkpt
MANAGER     RUNNING
EXTRACT     RUNNING     EXTKAFKA    00:00:00      00:00:06
EXTRACT     RUNNING     PUKAFKA     00:00:00      00:00:00
On the target, start mgr and rekafka:
start mgr
start rekafka
GGSCI (test01) 1> info all
Program     Status      Group       Lag at Chkpt  Time Since Chkpt
MANAGER     RUNNING
REPLICAT    RUNNING     REKAFKA     00:00:00      00:00:07
7.2 Test insert, update, and delete
On the source database, as the scott user, run insert, update, and delete operations against test_ogg:
insert into test_ogg values(1,'test');
commit;
update test_ogg set name='zhangsan' where id=1;
commit;
delete from test_ogg where id=1;
commit;
On the target, check whether the topic has been created:
[root@test01 bin]# ./kafka-topics.sh --list --zookeeper 192.168.141.107:2181
test_ogg    -- the topic name defined in kafka.props; its appearance means synchronization is working
Check through a consumer whether the data has been synchronized:
Starting the Kafka console consumer leaves a live process in the foreground; subsequent operations on the source table then show up on the consumer in real time.
[root@test01 bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.141.107:9092 --topic test_ogg --from-beginning
{"table":"SCOTT.TEST_OGG","op_type":"I","op_ts":"2021-03-10 12:27:13.417910","current_ts":"2021-03-10T12:27:20.018000","pos":"00000000000000001817","after":{"ID":1,"NAME":"test"}}
{"table":"SCOTT.TEST_OGG","op_type":"U","op_ts":"2021-03-10 12:32:36.417849","current_ts":"2021-03-10T12:32:43.324000","pos":"00000000000000001945","before":{"ID":1,"NAME":"test"},"after":{"ID":1,"NAME":"zhangsan"}}
{"table":"SCOTT.TEST_OGG","op_type":"D","op_ts":"2021-03-10 15:19:28.414300","current_ts":"2021-03-10T15:19:35.464000","pos":"00000000000000002098","before":{"ID":1,"NAME":"zhangsan"}}
The changes are synchronized to Kafka in JSON format.
op_type indicates the operation type; the keys are configurable or can be left at their defaults:
gg.handler.kafkahandler.format.insertOpKey = I
gg.handler.kafkahandler.format.updateOpKey = U
gg.handler.kafkahandler.format.deleteOpKey = D
before holds the row image before the operation, and after holds the row image after it.