一、Oracle GoldenGate 技术架构
Oracle GoldenGate 的实现原理是：抽取源端的 redo log 或 archive log，通过 TCP/IP 投递到目标端，最后在目标端解析还原并应用，使目标端与源端保持数据同步。图1-1 是 Oracle GoldenGate 的技术架构。
[图1-1：Oracle GoldenGate 技术架构图（原文配图缺失）]
账号:192.168.3.207:root/root 路径:/usr/local/soft/ggs_source
# cd /usr/local/soft/ggs_source
# ./ggsci
# dblogin sourcedb test@localhost:3306,userid root,password root
# edit param mgr
port 17809
dynamicportlist 17800-18000
purgeoldextracts ./dirdat/*, usecheckpoints, minkeepdays 7
# start mgr
查看启动进程：
# info all
# edit param ext_wpkg
extract ext_wpkg
setenv (MYSQL_HOME="/data/mysql/data")
tranlogoptions altlogdest /data/mysql/data/mysql-bin.index
dboptions host localhost, connectionport 3306
sourcedb test, password root
exttrail /usr/local/soft/ggs_source/dirdat/W3
dynamicresolution
gettruncates
GETUPDATEBEFORES
NOCOMPRESSDELETES
NOCOMPRESSUPDATES
table test.wms_test, TOKENS(TK-MY=@GETENV ('GGENVIRONMENT','OSUSERNAME'));
table test.wms_entry_warehouse_package;
# ADD EXTRACT ext_wpkg, tranlog, begin now
# ADD EXTTRAIL /usr/local/soft/ggs_source/dirdat/W3, EXTRACT ext_wpkg
# edit param pum_wpkg
extract pum_wpkg
rmthost 192.168.3.65, mgrport 17809
rmttrail /usr/local/ogg/ogg/dirdat/WC
passthru
gettruncates
table test.wms_test;
# ADD EXTRACT pum_wpkg, EXTTRAILSOURCE /usr/local/soft/ggs_source/dirdat/W3
# ADD RMTTRAIL /usr/local/ogg/ogg/dirdat/WC, EXTRACT pum_wpkg
# edit param defgen_wpkg
defsfile /usr/local/soft/ggs_source/dirdef/defgen_wpkg.prm
sourcedb test@localhost:3306, password root
table test.wms_entry_warehouse_wpkg;
# ./defgen paramfile /usr/local/soft/ggs_source/dirprm/defgen_wpkg.prm
# start ext_wpkg
# start pum_wpkg
# info all
账号:192.168.3.65:ogg/ogg 路径:/usr/local/ogg/ogg
# vi ~/.bashrc
定位到 ../dirprm 目录：
# vi kafka.props
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.TopicName =wms_spark
#gg.handler.kafkahandler.format =avro_op
#gg.handler.kafkahandler.format=delimitedtext
#json
gg.handler.kafkahandler.format = json
gg.handler.kafkahandler.format.insertOpKey = I
gg.handler.kafkahandler.format.updateOpKey = U
gg.handler.kafkahandler.format.deleteOpKey = D
gg.handler.kafkahandler.format.prettyPrint = true
gg.handler.kafkahandler.format.jsonDelimiter = CDATA[]
#gg.handler.kafkahandler.format.generateSchema = true
gg.handler.kafkahandler.format.schemaDirectory = dirdef
#gg.handler.kafkahandler.format.treatAllColumnsAsString = true
gg.handler.kafkahandler.format.includePrimaryKeys = true
#gg.handler.kafkahandler.format.includeColumnNames=true
#gg.handler.kafkahandler.format.fieldDelimiter=♫
gg.handler.kafkahandler.SchemaTopicName=wms_spark_test
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=true
# NOTE(review): handler name elsewhere is "kafkahandler"; this key may need to be
# gg.handler.kafkahandler.topicPartitioning — confirm against the GG Big Data docs
gg.handler.kafka.topicPartitioning=table
gg.handler.kafkahandler.mode =op
#gg.handler.kafkahandler.mode =tx
#gg.handler.kafkahandler.maxGroupSize =100,1Mb
#gg.handler.kafkahandler.minGroupSize =50,500Kb
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
#Sample gg.classpath for Apache Kafka
gg.classpath=dirprm/:/usr/local/hadoop/kafka_2.11-0.10.1.0/libs/*:/usr/local/ogg/ogg:/usr/local/ogg/ogg/lib/*
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
# vi custom_kafka_producer.properties
bootstrap.servers=hadoop-test-01:9092,hadoop-test-02:9092,hadoop-test-03:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size=102400
linger.ms=10000
# ./ggsci
# edit param mgr
PORT 17809
DYNAMICPORTLIST 17810-17909
AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3
PURGEOLDEXTRACTS /usr/local/ogg/ogg/dirdat/*, minkeepdays 3
# start mgr
# edit param rep_wpkg
REPLICAT rep_wpkg
sourcedefs /usr/local/ogg/ogg/dirdef/defgen_wpkg.prm
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka_wms.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
#getUpdateBefores
MAP test.wms_test, TARGET test.wms_test, keycols(ID), colMap(USEDEFAULTS, CREATEDBY=UPDATEDBY);
（注意：此处 property 指向 dirprm/kafka_wms.props，而前文编辑的文件名为 kafka.props，两者需保持一致。）
# add replicat rep_wpkg,exttrail ./dirdat/WC
# start rep_wpkg
# info all
{ "table":"test.wms_test","op_type":"U","op_ts":"2017-08-17 11:04:52.602743","current_ts":"2017-08-17T19:04:57.208000","pos":"00000000070000013868","primary_keys":[ "ID" ],"tokens":{ "TK-MY":"root" },"before":{ "ID":1,... },"after":{ "ID":1,... } }