源库(ENV库)中定义如下包:
包定义如下:
CREATE OR REPLACE PACKAGE PKG_DATA_REPORT IS -- Author : JOHNFNASH -- Created : 2017/6/8 17:48:03 -- Purpose : DATA REPORT TO YW --数据同步存储过程 PROCEDURE SP_DATA_RESPORT; --数据增量同步存储过程 PROCEDURE SP_DATA_INCREMENTAL_REPORT; END PKG_DATA_REPORT;
包具体实现如下:
CREATE OR REPLACE PACKAGE BODY PKG_DATA_REPORT IS --数据同步存储过程 PROCEDURE SP_DATA_RESPORT IS NODE_NAME VARCHAR(40) := 'BASIC_DATA_SYNC'; V_START_TIME DATE; V_END_TIME DATE := SYSDATE; DATA_ROW T_DATA_SYNC_RECORD%ROWTYPE; V_ERRORTEXT VARCHAR(200); BEGIN --锁住记录防止并发上报 SELECT * INTO DATA_ROW FROM T_DATA_SYNC_RECORD R WHERE R.DATA_SYNC_NAME = NODE_NAME AND R.NUM = 1 FOR UPDATE; IF DATA_ROW.RESULT IS NULL THEN --如果上一次数据同步成功,则开始本次数据同步 V_START_TIME := DATA_ROW.END_TIME; ELSE --如果上一次数据同步失败,则继续上一次数据同步 V_START_TIME := DATA_ROW.START_TIME; V_END_TIME := DATA_ROW.END_TIME; END IF; ------------------- 部门数据同步 ------------------- --删除之前的记录 PROC_TRUNCATE_REMOTE_TAB@DBL_YW('T_DEPARTMENT'); --DELETE FROM T_DEPARTMENT@DBL_YW; --重新写入记录 INSERT INTO T_DEPARTMENT@DBL_YW SELECT D.ID,D.NAMECN,D.NAMEEN,D.CODE,D.PARENTID,D.NAMENPATH,D.NAMCNPATH FROM T_DEPARTMENT D; ------------------- 设备数据同步 ------------------- --写入记录 MERGE INTO T_DEVICE@DBL_LCMYW T1 USING (SELECT * FROM T_DEVICE) T2 ON ( T1.ID=T2.ID) WHEN MATCHED THEN UPDATE SET T1.STATUS = T2.STATUS,T1.TIMESTAMEP=T2.TIMESTAMEP WHEN NOT MATCHED THEN INSERT (ID,NAME,LABID,REGION,STATUS,CREATETIME,TIMESTAMEP,CREATOR) VALUES(T2.ID,T2.NAME,T2.LABID,T2.REGION,T2.STATUS,T2.CREATETIME,T2.TIMESTAMEP,T2.CREATOR); --写入本次数据同步结果 INSERT INTO T_DATA_SYNC_RECORD VALUES (NODE_NAME,SYSDATE,V_START_TIME,V_END_TIME,NULL,0); --执行序号加1 UPDATE T_DATA_SYNC_RECORD SET NUM = NUM + 1 WHERE DATA_SYNC_NAME = NODE_NAME; COMMIT; EXCEPTION WHEN OTHERS THEN ROLLBACK; --事物回滚 V_ERRORTEXT := SUBSTR(sqlERRM,1,200); --错误信息 --写入本次数据同步结果 INSERT INTO T_DATA_SYNC_RECORD VALUES (NODE_NAME,V_ERRORTEXT,0); --执行序号加1 UPDATE T_DATA_SYNC_RECORD SET NUM = NUM + 1 WHERE DATA_SYNC_NAME = NODE_NAME; COMMIT; END; END SP_ENV_OPY_REPORT; END PKG_DATA_REPORT
其中涉及到的目标库(YW库) truncate表的存储过程 PROC_TRUNCATE_REMOTE_TAB@DBL_YW 定义如下:
CREATE OR REPLACE PROCEDURE PROC_TRUNCATE_REMOTE_TAB(P_TNAME IN VARCHAR2) AS BEGIN EXECUTE IMMEDIATE 'TRUNCATE TABLE ' || P_TNAME; EXCEPTION WHEN OTHERS THEN RAISE_APPLICATION_ERROR(-20001,sqlERRM); END PROC_TRUNCATE_REMOTE_TAB;
由于表中数据量较大时,使用delete删除数据比较慢,使用truncate操作更快,但是oracle不允许执行远程库
的truncate。换个方式,在远程库建立存储过程,执行对该库中数据表的truncate操作,其它库建立dblink之后
,可以调用该存储过程,实现对远程库表的truncate操作。
下面对上面的比较关键的地方进行说明:
1) T_DATA_SYSNC_RECORD
刚进入存储过程时,锁住该表的记录,防止并发上报;然后根据上一次的执行结果确定本次执行的起始时间
、结束时间,特别是对于增量同步,每次只将时间段内的数据同步到目标库
(详见SP_DATA_INCREMENTAL_REPORT存储过程)
-- Create table create table T_DATA_SYNC_RECORD ( data_sync_name VARCHAR2(40) not null,timestamp DATE,start_time DATE,end_time DATE,result VARCHAR2(200),num NUMBER ); -- Add comments to the columns comment on column T_DATA_SYNC_RECORD.data_sync_name is '数据同步任务名称'; comment on column T_DATA_SYNC_RECORD.timestamp is '任务执行时间'; comment on column T_DATA_SYNC_RECORD.start_time is '数据同步开始时间'; comment on column T_DATA_SYNC_RECORD.end_time is '数据同步结束时间'; comment on column T_DATA_SYNC_RECORD.result is '数据同步执行结果,为空表示成功'; comment on column T_DATA_SYNC_RECORD.num is '序号,从1开始,每执行一次之前同一同步任务的序号加1';2) 源库中需建立与目标库的dblink
Create database link create database link DBL_YW connect to YW identified by password using 'ILCM';3) 数据同步的形式
根据业务的不同,同步的方式可能有好几种,如删除目标库之前所有数据,重新写入(如果只是少量字段
的值可能有修改,可以使用merge into进行数据同步,如上);增量同步,每次只同步新增的记录到目标库
(适用于历史数据只读的情况)
4) 当有多个库的数据要汇总到同一个目标库时,可以在数据库添加一个表示来源于哪个节点的字段,每个库
的同步存储过程为这个字段设置不同的值,就可以区分数据是来源于哪个数据库。不过这种情况下,直接对
目标库进行truncate操作就不合适了,因为可能会导致数据丢失。这种情况怎么处理,这里就不叙述了。
本文远程库truncate实现参考: http://www.cnblogs.com/songdavid/articles/2153228.html