基于OGG Datahub插件将Oracle数据同步上云

前端之家收集整理的这篇文章主要介绍了基于OGG Datahub插件将Oracle数据同步上云前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

一、背景介绍

随着数据规模的不断扩大,传统的RDBMS难以满足OLAP的需求,

OGG(Oracle GoldenGate)是一个基于日志的结构化数据备份工具,一般用于Oracle数据库之间的主从备份以及Oracle数据库到其他数据库(DB2,MysqL等)的同步。下面是Oracle官方提供的一个OGG的整体架构图,从图中可以看出OGG的部署分为源端和目标端两部分组成,主要有Manager,Extract,Pump,Collector,Replicat这么一些组件。

  • Manager:在源端和目标端都会有且只有一个Manager进程存在,负责管理其他进程的启停和监控等;
  • Extract:负责从源端数据库表或者事务日志中捕获数据,有初始加载和增量同步两种模式可以配置,初始加载模式是直接将源表数据同步到目标端,而增量同步就是分析源端数据库的日志,将变动的记录传到目标端,本文介绍的是增量同步的模式;
  • Pump:Extract从源端抽取的数据会先写到本地磁盘的Trail文件,Pump进程会负责将Trail文件的数据投递到目标端;
  • Collector:目标端负责接收来自源端的数据,生成Trail文件
  • Replicat:负责读取目标端的Trail文件,转化为相应的DDL和DML语句作用到目标数据库,实现数据同步。
  • @H_403_21@

    本文介绍的Oracle数据同步是通过OGG的Datahub插件实现的,该Datahub插件在架构图中处于Replicat的位置,会分析Trail文件,将数据的变化记录写入Datahub中,可以使用流计算对datahub中的数据进行实时分析,也可以将数据归档到MaxCompute中进行离线处理。

    二、安装步骤

    0. 环境要求

    • 源端已安装好Oracle
    • 源端已安装好OGG(建议版本Oracle GoldenGate V12.1.2.1)
    • 目标端已安装好OGG Adapters(建议版本Oracle GoldenGate Application Adapters
      12.1.2.1)
    • java 7
    • @H_403_21@

      (下面将介绍Oracle/OGG相关安装和配置过程,Oracle的安装将不做介绍,另外需要注意的是:Oracle/OGG相关参数配置以熟悉Oracle/OGG的运维人员配置为准,本示例只是提供一个可运行的样本,Oracle所使用的版本为ORA11g)

      1. 源端OGG安装
      下载OGG安装包解压后有如下目录:

      drwxr-xr-x install
      drwxrwxr-x response
      -rwxr-xr-x runInstaller
      drwxr-xr-x stage

      目前oracle一般采取response安装的方式,在response/oggcore.rsp中配置安装依赖,具体如下:

      oracle.install.responseFileVersion=/oracle/install/rspfmt_ogginstall_response_schema_v12_1_2
      # 需要目前与oracle版本对应
      INSTALL_OPTION=ORA11g
      # goldegate主目录
      SOFTWARE_LOCATION=/home/oracle/u01/ggate
      # 初始不启动manager
      START_MANAGER=false
      # manger端口
      MANAGER_PORT=7839
      # 对应oracle的主目录
      DATABASE_LOCATION=/home/oracle/u01/app/oracle/product/11.2.0/dbhome_1
      # 暂可不配置
      INVENTORY_LOCATION=
      # 分组(目前暂时将oracle和ogg用同一个账号ogg_test,实际可以给ogg单独账号)
      UNIX_GROUP_NAME=oinstall

      执行命令:

      runInstaller -silent -responseFile {YOUR_OGG_INSTALL_FILE_PATH}/response/oggcore.rsp

      本示例中,安装后OGG的目录在/home/oracle/u01/ggate,安装日志在/home/oracle/u01/ggate/cfgtoollogs/oui目录下,当silentInstall{时间}.log文件里出现如下提示,表明安装成功:

      The installation of Oracle GoldenGate Core was successful.

      执行/home/oracle/u01/ggate/ggsci命令,并在提示符下键入命令:CREATE SUBDIRS,从而生成ogg需要的各种目录(dir打头的那些)。
      至此,源端OGG安装完成。

      2. 源端Oracle配置
      以dba分身进入sqlplus:sqlplus / as sysdba

      # 创建独立的表空间
      create tablespace ATMV datafile '/home/oracle/u01/app/oracle/oradata/uprr/ATMV.dbf' size 100m autoextend on next 50m maxsize unlimited;
      
      # 创建ogg_test用户,密码也为ogg_test
      create user ogg_test identified by ogg_test default tablespace ATMV;
      
      # 给ogg_test赋予充分的权限
      grant connect,resource,dba to ogg_test;
      
      # 检查附加日志情况
      Select SUPPLEMENTAL_LOG_DATA_MIN,SUPPLEMENTAL_LOG_DATA_PK,SUPPLEMENTAL_LOG_DATA_UI,SUPPLEMENTAL_LOG_DATA_FK,SUPPLEMENTAL_LOG_DATA_ALL from v$database;
      
      # 增加数据库附加日志及回退
      alter database add supplemental log data;
      alter database add supplemental log data (primary key,unique,foreign key) columns;
      # rollback
      alter database drop supplemental log data (primary key,foreign key) columns;
      alter database drop supplemental log data;
      
      # 全字段模式,注意:在该模式下的delete操作也只有主键值
      ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
      # 开启数据库强制日志模式
      alter database force logging;
      # 执行marker_setup.sql 脚本
      @marker_setup.sql
      # 执行@ddl_setup.sql
      @ddl_setup.sql
      # 执行role_setup.sql
      @role_setup.sql
      # 给ogg用户赋权
      grant GGS_GGSUSER_ROLE to ogg_test;
      # 执行@ddl_enable.sql,开启DDL trigger
      @ddl_enable.sql
      # 执行优化脚本
      @ddl_pin ogg_test
      # 安装sequence support
      @sequence.sql
      #
      alter table sys.seq$ add supplemental log data (primary key) columns;

      3. OGG源端mgr配置
      以下是通过ggsci对ogg进行配置

      配置mgr
      edit params mgr

      PORT 7839
      DYNAMICPORTLIST  7840-7849
      USERID ogg_test,PASSWORD ogg_test
      PURGEOLDEXTRACTS ./dirdat/*,USECHECKPOINTS,MINKEEPDAYS 7
      LAGREPORTHOURS 1
      LAGINFOMINUTES 30
      LAGCRITICALMINUTES 45
      PURGEDDLHISTORY MINKEEPDAYS 3,MAXKEEPDAYS 7
      PURGEMARKERHISTORY MINKEEPDAYS 3,MAXKEEPDAYS 7

      启动mgr(运行日志在ggate/dirrpt中)

      start mgr

      查看mgr状态

      info mgr

      查看mgr配置

      view params mgr

      4. OGG源端extract配置
      以下是通过ggsci对ogg进行配置

      配置extract(名字可任取,extract是组名)
      edit params extract

      EXTRACT extract
      SETENV (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
      DBOPTIONS   ALLOWUNUSEDCOLUMN
      USERID ogg_test,PASSWORD ogg_test
      REPORTCOUNT EVERY 1 MINUTES,RATE
      NUMFILES 5000
      DISCARDFILE ./dirrpt/ext_test.dsc,APPEND,MEGABYTES 100
      DISCARDROLLOVER AT 2:00
      WARNLONGTRANS 2h,CHECKINTERVAL 3m
      EXTTRAIL ./dirdat/st,MEGABYTES 200
      DYNAMICRESOLUTION
      TRANlogoPTIONS CONVERTUCS2CLOBS
      TRANlogoPTIONS RAWDEVICEOFFSET 0
      DDL &
      INCLUDE MAPPED OBJTYPE 'table' &
      INCLUDE MAPPED OBJTYPE 'index' &
      INCLUDE MAPPED OBJTYPE 'SEQUENCE' &
      EXCLUDE OPTYPE COMMENT
      DDLOPTIONS  NOCROSSRENAME  REPORT
      TABLE     OGG_TEST.*;
      SEQUENCE  OGG_TEST.*;
      
      GETUPDATEBEFORES

      增加extract进程(ext后的名字要跟上面extract对应,本例中extract是组名)
      add ext extract,tranlog,begin now

      删除某废弃进程DP_TEST
      delete ext DP_TEST

      添加抽取进程,每个队列文件大小为200m
      add exttrail ./dirdat/st,ext extract,megabytes 200

      启动抽取进程(运行日志在ggate/dirrpt中)
      start extract extract
      至此,extract配置完成,数据库的一条变更可以在ggate/dirdat目录下的文件中看到

      5. 生成def文件
      源端ggsci起来后执行如下命令,生成defgen文件,并且拷贝到目标端dirdef下
      edit params defgen

      DEFSFILE ./dirdef/ogg_test.def
      USERID ogg_test,PASSWORD ogg_test
      table OGG_TEST.*;
      在shell中执行如下命令,生成ogg_test.def
      ./defgen paramfile ./dirprm/defgen.prm

      6. 目标端OGG安装和配置
      解压adapter包
      将源端中dirdef/ogg_test.def文件拷贝到adapter的dirdef下

      执行ggsci起来后执行如下命令,创建必须目录
      create subdirs

      编辑mgr配置
      edit params mgr

      PORT 7839
      DYNAMICPORTLIST  7840-7849
      PURGEOLDEXTRACTS ./dirdat/*,MAXKEEPDAYS 7

      启动mgr
      start mgr

      7. 源端ogg pump配置
      启动ggsci后执行如下操作:

      编辑pump配置
      edit params pump

      EXTRACT pump
      RMTHOST xx.xx.xx.xx,MGRPORT 7839,COMPRESS
      PASSTHRU
      NUMFILES 5000
      RMTTRAIL ./dirdat/st
      DYNAMICRESOLUTION
      TABLE      OGG_TEST.*;
      SEQUENCE   OGG_TEST.*;

      添加投递进程,从某一个队列开始投
      add ext pump,exttrailsource ./dirdat/st

      备注:投递进程,每个队文件大小为200m
      add rmttrail ./dirdat/st,ext pump,megabytes 200

      启动pump
      start pump
      启动后,结合上面adapter的配置,可以在目标端的dirdat目录下看到过来的trailfile

      8. Datahub插件安装和配置
      依赖环境:jdk1.7。
      配置好JAVA_HOME,LD_LIBRARY_PATH,可以将环境变量配置到~/.bash_profile中,例如

      export JAVA_HOME=/xxx/xxx/jrexx
      export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$JAVA_HOME/lib/amd64:$JAVA_HOME/lib/amd64/server

      修改环境变量后,请重启adapter的mgr进程
      下载datahub-ogg-plugin.tar.gz并解压:

      修改conf路径下的javaue.properties文件,将{YOUR_HOME}替换为解压后的路径

      gg.handlerlist=ggdatahub
      
      gg.handler.ggdatahub.type=com.aliyun.odps.ogg.handler.datahub.DatahubHandler
      gg.handler.ggdatahub.configureFileName={YOUR_HOME}/datahub-ogg-plugin/conf/configure.xml
      
      goldengate.userexit.nochkpt=false
      goldengate.userexit.timestamp=utc
      
      gg.classpath={YOUR_HOME}/datahub-ogg-plugin/lib/*
      gg.log.level=debug
      
      jvm.bootoptions=-Xmx512m -Dlog4j.configuration=file:{YOUR_HOME}/datahub-ogg-plugin/conf/log4j.properties -Djava.class.path=ggjava/ggjava.jar

      修改conf路径下的log4j.properties文件,将{YOUR_HOME}替换为解压后的路径

      修改conf路径下的configure.xml文件修改方式见文件中的注释

      <?xml version="1.0" encoding="UTF-8"?>
      <configue>
      
          <defaultOracleConfigure>
              <!-- oracle sid,必选-->
              <sid>100</sid>
              <!-- oracle schema,可以被mapping中的oracleSchema覆盖,两者必须有一个非空-->
              <schema>ogg_test</schema>
          </defaultOracleConfigure>
      
          <defalutDatahubConfigure>
              <!-- datahub endpoint,必填-->
              <endPoint>YOUR_DATAHUB_ENDPOINT</endPoint>
              <!-- datahub project,可以被mapping中的datahubProject,两者必须有一个非空-->
              <project>YOUR_DATAHUB_PROJECT</project>
              <!-- datahub accessId,可以被mapping中的datahubAccessId覆盖,两者必须有一个非空-->
              <accessId>YOUR_DATAHUB_ACCESS_ID</accessId>
              <!-- datahub accessKey,可以被mapping中的datahubAccessKey覆盖,两者必须有一个非空-->
              <accessKey>YOUR_DATAHUB_ACCESS_KEY</accessKey>
              <!-- 数据变更类型同步到datahub对应的字段,可以被columnMapping中的ctypeColumn覆盖 -->
              <ctypeColumn>optype</ctypeColumn>
              <!-- 数据变更时间同步到datahub对应的字段,可以被columnMapping中的ctimeColumn覆盖 -->
              <ctimeColumn>readtime</ctimeColumn>
              <!-- 数据变更序号同步到datahub对应的字段,按数据变更先后递增,不保证连续,可以被columnMapping中的cidColumn覆盖 -->
              <cidColumn>record_id</cidColumn>
      <!-- 额外增加的常量列,每条record该列值为指定值,格式为c1=xxx,c2=xxx,可以被columnMapping中的constColumnMap覆盖-->
               <constColumnMap></constColumnMap>
          </defalutDatahubConfigure>
      
          <!-- 默认最严格,不落文件 直接退出 无限重试-->
      
          <!-- 运行每批上次的最多纪录数,可选,默认1000-->
          <batchSize>1000</batchSize>
      
          <!-- 默认时间字段转换格式,默认yyyy-MM-dd HH:mm:ss-->
          <defaultDateFormat>yyyy-MM-dd HH:mm:ss</defaultDateFormat>
      
          <!-- 脏数据是否继续,默认false-->
          <dirtyDataContinue>true</dirtyDataContinue>
      
          <!-- 脏数据文件,默认datahub_ogg_plugin.dirty-->
          <dirtyDataFile>datahub_ogg_plugin.dirty</dirtyDataFile>
      
          <!-- 脏数据文件最大size,单位M,默认500-->
          <dirtyDataFileMaxSize>200</dirtyDataFileMaxSize>
      
          <!-- 重试次数,-1:无限重试 0:不重试 n:重试次数,默认-1-->
          <retryTimes>0</retryTimes>
      
          <!-- 重试间隔,单位毫秒,默认3000-->
          <retryInterval>4000</retryInterval>
      
          <!-- 点位文件,默认datahub_ogg_plugin.chk-->
          <checkPointFileName>datahub_ogg_plugin.chk</checkPointFileName>
      
          <mappings>
              <mapping>
                  <!-- oracle schema,见上描述-->
                  <oracleSchema></oracleSchema>
                  <!-- oracle table,必选-->
                  <oracleTable>t_person</oracleTable>
                  <!-- datahub project,见上描述-->
                  <datahubProject></datahubProject>
                  <!-- datahub AccessId,见上描述-->
                  <datahubAccessId></datahubAccessId>
                  <!-- datahub AccessKey,见上描述-->
                  <datahubAccessKey></datahubAccessKey>
                  <!-- datahub topic,必选-->
                  <datahubTopic>t_person</datahubTopic>
                  <ctypeColumn></ctypeColumn>
                  <ctimeColumn></ctimeColumn>
                  <cidColumn></cidColumn>
                  <constColumnMap></constColumnMap>
                  <columnMapping>
                      <!--
                      src:oracle字段名称,必须;
                      dest:datahub field,必须;
                      destOld:变更前数据落到datahub的field,可选;
                      isShardColumn: 是否作为shard的hashkey,默认为false,可以被shardId覆盖
                      isDateFormat: timestamp字段是否采用DateFormat格式转换,默认true. 如果是false,源端数据必须是long
                      dateFormat: timestamp字段的转换格式,不填就用默认值
                      -->
                      <column src="id" dest="id" isShardColumn="true"  isDateFormat="false" dateFormat="yyyy-MM-dd HH:mm:ss"/>
                      <column src="name" dest="name" isShardColumn="true"/>
                      <column src="age" dest="age"/>
                      <column src="address" dest="address"/>
                      <column src="comments" dest="comments"/>
                      <column src="sex" dest="sex"/>
                      <column src="temp" dest="temp" destOld="temp1"/>
                  </columnMapping>
      
                  <!--指定shard id,优先生效,可选-->
                  <shardId>1</shardId>
              </mapping>
          </mappings>
      </configue>

      在ggsci下启动datahub writer

      edit params dhwriter

      extract dhwriter
      getEnv (JAVA_HOME)
      getEnv (LD_LIBRARY_PATH)
      getEnv (PATH)
      CUSEREXIT ./libggjava_ue.so CUSEREXIT PASSTHRU INCLUDEUPDATEBEFORES,PARAMS "{YOUR_HOME}/datahub-ogg-plugin/conf/javaue.properties"
      sourcedefs ./dirdef/ogg_test.def
      table OGG_TEST.*;

      添加dhwriter
      add extract dhwriter,exttrailsource ./dirdat/st

      启动dhwriter
      start dhwriter

      三、使用场景

      这里会用一个简单的示例来说明数据的使用方法,例如我们在Oracle数据库有一张商品订单表orders(oid int,pid int,num int),该表有三列,分别为订单ID,商品ID和商品数量
      将这个表通过OGG Datahub进行增量数据同步之前,我们需要先将源表已有的数据通过datax同步到MaxCompute中。增量同步的关键步骤如下:
      (1)在Datahub上创建相应的Topic,Topic的schema为(string record_id,string optype,string readtime,bigint oid_before,bigint oid_after,bigint pid_before,bigint pid_after,bigint num_before,bigint num_after);
      (2)OGG Datahub的插件按照上述的安装流程部署配置,其中列的Mapping配置如下:

      <ctypeColumn>optype</ctypeColumn>
          <ctimeColumn>readtime</ctimeColumn>
          <columnMapping>
              <column src="oid" dest="oid_after" destOld="oid_before" isShardColumn="true"/>
              <column src="pid" dest="pid_after" destOld="pid_before"/>
              <column src="num" dest="num_after" destOld="num_before"/>
          </columnMapping>

      其中optype和readtime字段是记录数据库的数据变更类型和时间,optype有"I","D","U"三种取值,分别对应为“增”,“删”,“改”三种数据变更操作。
      (3)OGG Datahub插件部署好成功运行后,插件会源源不断的将源表的数据变更记录输送至datahub中,例如我们在源订单表中新增一条记录(1,2,1),datahub里收到的记录如下:

      +--------+------------+------------+------------+------------+------------+------------+------------+------------+
      | record_id | optype     | readtime   | oid_before | oid_after  | pid_before | pid_after  | num_before | num_after  |
      +-------+------------+------------+------------+------------+------------+------------+------------+------------+
      | 14810373343020000 |     I          | 2016-12-06 15:15:28.000141 | NULL       | 1          | NULL       | 2          | NULL       | 1   |

      修改这条数据,比如把num改为20,datahub则会收到的一条变更数据记录,如下:

      +-------+------------+------------+------------+------------+------------+------------+------------+------------+
      | record_id | optype     | readtime   | oid_before | oid_after  | pid_before | pid_after  | num_before | num_after  |
      +--------+------------+------------+------------+------------+------------+------------+------------+------------+
      | 14810373343080000 |     U          | 2016-12-06 15:15:58.000253 | 1          | 1          | 2          | 2          | 1          | 20         |

      实时计算
      在前一天的离线计算的基础数据上,我们可以写一个StreamCompute流计算的分析程序,很容易地对数据进行实时汇总,例如实时统计当前总的订单数,每种商品的销售量等。处理思路就是对于每一条到来的变更数据,可以拿到变化的数值,实时更新统计变量即可。

      离线处理
      为了便于后续的离线分析,我们也可以将Datahub里的数据归档到MaxCompute中,在MaxCompute中创建相应Schema的表:

      create table orders_log(record_id string,optype string,readtime string,oid_before bigint,oid_after bigint,pid_before bigint,pid_after bigint,num_before bigint,num_after bigint);
      在Datahub上创建MaxCompute的数据归档,上述流入Datahub里的数据将自动同步到MaxCompute当中。建议将同步到MaxCompute中的数据按照时间段进行划分,比如每一天的增量数据都对应一个独立分区。这样当天的数据同步完成后,我们可以处理对应的分区,拿到当天所有的数据变更,而与和前一天的全量数据进行合并后,即可得到当天的全量数据。为了简单起见,先不考虑分区表的情况,以2016-12-06这天的增量数据为例,假设前一天的全量数据在表orders_base里面,datahub同步过来的增量数据在orders_log表中,将orders_base与orders_log做合并操作,可以得到2016-12-06这天的最终全量数据写入表orders_result中。这个过程可以在MaxCompute上用如下这样一条sql完成。

      INSERT OVERWRITE TABLE orders_result
      SELECT t.oid,t.pid,t.num
      FROM
      (
           SELECT oid,pid,num,'0' x_record_id,1 AS x_optype
           FROM
           orders_base 
           UNION ALL
           SELECT decode(optype,'D',oid_before,oid_after) AS oid,decode(optype,pid_before,pid_after) AS pid,num_after AS num,record_id x_record_id,1) AS x_optype
           FROM
           orders_log
       ) t
      JOIN
       (
           SELECT
           oid,max(record_id) x_max_modified
           FROM
           (
           SELECT
           oid,'0' record_id
           FROM
           orders_base UNION ALL SELECT
                            decode(optype,record_id
                            FROM
                            orders_log ) g
           GROUP BY oid,pid
       ) s
      ON
      t.oid = s.oid AND t.pid = s.pid AND t.x_record_id = s.x_max_modified AND t.x_optype <> 0;

      四、常见问题

      Q:目标端报错 OGG-06551 Oracle GoldenGate Collector: Could not translate host name localhost into an Internet address.
      A:目标端机器hostname在/etc/hosts里面重新设置localhost对应的ip

      Q:找不到jvm相关的so包
      A:将jvm的so路径添加到LD_LIBRARY_PATH后,重启mgr

      例如:export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$JAVA_HOME/lib/amd64:$JAVA_HOME/lib/amd64/server
      Q:有了DDL语句,比如增加一列,源端ogg没有问题,但是adapter端的ffwriter和jmswriter进程退出,且报错: 2015-06-11 14:01:10 ERROR OGG-01161 Bad column index (5) specified for table OGG_TEST.T_PERSON,max columns = 5.
      A:由于表结构改变,需要重做def文件,将重做的def文件放入dirdef后重启即可

      本文作者:冶善

      原文链接

      本文为云栖社区原创内容,未经允许不得转载。

猜你在找的Oracle相关文章