
ODI Incremental Data Synchronization (Part 1)

shadowbook / 2165 reads


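Background

I recently started using ODI (Oracle Data Integrator) on a project. The requirement was to synchronize tables from one database to another, which is routine, but a condition was added later: the synchronization must be incremental, and the target data must not be deleted in full and then reinserted.

ODI Studio's CDC (changed data capture) could handle this, but CDC requires triggers on the source tables, which hurts table performance, so that option was rejected. CDC based on lastupdatedate partly avoids the performance cost, but it requires lastupdatedate to change on every data update, which is hard to guarantee in older systems. IKM SQL Incremental Update has no delete operation, so it does not meet the requirement either. The chosen approach was therefore to modify an existing knowledge module so that it does.

1. An incremental synchronization Mapping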

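The target table's integration type is set to incremental update

Loading knowledge module: LKM SQL to Oracle

Integration knowledge module: IKM Oracle Incremental Update

Source table

Target table

Modify the data in the source table

Run the interface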

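After the interface run completes, the modified data has been synchronized correctly, but the deleted rows are still present in the target table. To handle this, some interfaces delete all data from the target table at run time and then insert everything again.

Setting DELETE ALL to true deletes all target data before the interface inserts rows. In theory, selecting TRUNCATE should work as well, but it had no effect here.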
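The behavior described above can be reproduced in miniature. This is only a sketch: it uses Python's built-in sqlite3 instead of Oracle, and the tables `src` and `tgt` and the function `sync_without_deletes` are hypothetical names introduced for illustration. It applies updates and inserts only, which is why a row deleted from the source survives in the target.

```python
import sqlite3

def sync_without_deletes(conn):
    """Apply updates and inserts from src to tgt; never delete from tgt."""
    # Update rows whose key already exists in the target.
    conn.execute(
        "UPDATE tgt SET val = (SELECT s.val FROM src s WHERE s.id = tgt.id) "
        "WHERE id IN (SELECT id FROM src)"
    )
    # Insert rows whose key is missing from the target.
    conn.execute(
        "INSERT INTO tgt (id, val) "
        "SELECT id, val FROM src WHERE id NOT IN (SELECT id FROM tgt)"
    )

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE src (id INTEGER PRIMARY KEY, val TEXT);
    CREATE TABLE tgt (id INTEGER PRIMARY KEY, val TEXT);
    INSERT INTO src VALUES (1, 'a'), (2, 'b'), (3, 'c');
    INSERT INTO tgt VALUES (1, 'a'), (2, 'b'), (3, 'c');
    -- Change one source row and delete another.
    UPDATE src SET val = 'b2' WHERE id = 2;
    DELETE FROM src WHERE id = 3;
""")
sync_without_deletes(conn)
rows_after_sync = conn.execute("SELECT id, val FROM tgt ORDER BY id").fetchall()
```

Row 2 picks up the new value, while row 3 remains in `tgt` even though it no longer exists in `src`.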

Incremental synchronization interface

The run proceeds as follows

Step-by-step details

1.Drop work table

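When synchronizing data, the interface creates a work table in the target database whose name starts with C$_0. This command is configured in the knowledge module to ignore errors, so a failure here does not fail the interface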

drop table TESTUSER.C$_0ODI_WLS_JMS_INC_SOURCE purge  
2.Create work table

Create the work table; its columns are based on the source table

create table TESTUSER.C$_0ODI_WLS_JMS_INC_SOURCE
(
    ID    NUMBER NULL,
    HOST    VARCHAR2(200) NULL,
    POST    VARCHAR2(200) NULL,
    JMS_SERVICE_NAME    VARCHAR2(200) NULL,
    JMS_NAME    VARCHAR2(200) NULL,
    JMS_SERVICE_TARGET    VARCHAR2(200) NULL,
    JMS_SERVICE_HEALTH    VARCHAR2(200) NULL,
    MESSAGES_CURRENT_COUNT    NUMBER NULL,
    MESSAGES_PENDING_COUNT    NUMBER NULL,
    CONSUMERS_CURRENT_COUNT    NUMBER NULL,
    CONSUMERS_HIGH_COUNT    NUMBER NULL,
    CONSUMERS_TOTAL_COUNT    NUMBER NULL,
    MESSAGES_HIGH_COUNT    NUMBER NULL,
    MESSAGES_RECEIVED_COUNT    NUMBER NULL,
    OBJECT_VERSION_NUMBER    NUMBER NULL,
    CREATION_DATE    DATE NULL,
    CREATED_BY    VARCHAR2(200) NULL,
    LAST_UPDATE_DATE    DATE NULL,
    LAST_UPDATED_BY    VARCHAR2(200) NULL,
    DATA_STATUS    VARCHAR2(200) NULL
)
NOLOGGING  
3.Load data

Read the data from the source table and insert it into the work table

Target code

insert  into TESTUSER.C$_0ODI_WLS_JMS_INC_SOURCE
(
    ID,
    HOST,
    POST,
    JMS_SERVICE_NAME,
    JMS_NAME,
    JMS_SERVICE_TARGET,
    JMS_SERVICE_HEALTH,
    MESSAGES_CURRENT_COUNT,
    MESSAGES_PENDING_COUNT,
    CONSUMERS_CURRENT_COUNT,
    CONSUMERS_HIGH_COUNT,
    CONSUMERS_TOTAL_COUNT,
    MESSAGES_HIGH_COUNT,
    MESSAGES_RECEIVED_COUNT,
    OBJECT_VERSION_NUMBER,
    CREATION_DATE,
    CREATED_BY,
    LAST_UPDATE_DATE,
    LAST_UPDATED_BY,
    DATA_STATUS
)
values
(
    :ID,
    :HOST,
    :POST,
    :JMS_SERVICE_NAME,
    :JMS_NAME,
    :JMS_SERVICE_TARGET,
    :JMS_SERVICE_HEALTH,
    :MESSAGES_CURRENT_COUNT,
    :MESSAGES_PENDING_COUNT,
    :CONSUMERS_CURRENT_COUNT,
    :CONSUMERS_HIGH_COUNT,
    :CONSUMERS_TOTAL_COUNT,
    :MESSAGES_HIGH_COUNT,
    :MESSAGES_RECEIVED_COUNT,
    :OBJECT_VERSION_NUMBER,
    :CREATION_DATE,
    :CREATED_BY,
    :LAST_UPDATE_DATE,
    :LAST_UPDATED_BY,
    :DATA_STATUS
)  

Source code

select    
    ODI_WLS_JMS_INC_SOURCE.ID    AS ID,
    ODI_WLS_JMS_INC_SOURCE.HOST    AS HOST,
    ODI_WLS_JMS_INC_SOURCE.POST    AS POST,
    ODI_WLS_JMS_INC_SOURCE.JMS_SERVICE_NAME    AS JMS_SERVICE_NAME,
    ODI_WLS_JMS_INC_SOURCE.JMS_NAME    AS JMS_NAME,
    ODI_WLS_JMS_INC_SOURCE.JMS_SERVICE_TARGET    AS JMS_SERVICE_TARGET,
    ODI_WLS_JMS_INC_SOURCE.JMS_SERVICE_HEALTH    AS JMS_SERVICE_HEALTH,
    ODI_WLS_JMS_INC_SOURCE.MESSAGES_CURRENT_COUNT    AS MESSAGES_CURRENT_COUNT,
    ODI_WLS_JMS_INC_SOURCE.MESSAGES_PENDING_COUNT    AS MESSAGES_PENDING_COUNT,
    ODI_WLS_JMS_INC_SOURCE.CONSUMERS_CURRENT_COUNT    AS CONSUMERS_CURRENT_COUNT,
    ODI_WLS_JMS_INC_SOURCE.CONSUMERS_HIGH_COUNT    AS CONSUMERS_HIGH_COUNT,
    ODI_WLS_JMS_INC_SOURCE.CONSUMERS_TOTAL_COUNT    AS CONSUMERS_TOTAL_COUNT,
    ODI_WLS_JMS_INC_SOURCE.MESSAGES_HIGH_COUNT    AS MESSAGES_HIGH_COUNT,
    ODI_WLS_JMS_INC_SOURCE.MESSAGES_RECEIVED_COUNT    AS MESSAGES_RECEIVED_COUNT,
    ODI_WLS_JMS_INC_SOURCE.OBJECT_VERSION_NUMBER    AS OBJECT_VERSION_NUMBER,
    ODI_WLS_JMS_INC_SOURCE.CREATION_DATE    AS CREATION_DATE,
    ODI_WLS_JMS_INC_SOURCE.CREATED_BY    AS CREATED_BY,
    ODI_WLS_JMS_INC_SOURCE.LAST_UPDATE_DATE    AS LAST_UPDATE_DATE,
    ODI_WLS_JMS_INC_SOURCE.LAST_UPDATED_BY    AS LAST_UPDATED_BY,
    ODI_WLS_JMS_INC_SOURCE.DATA_STATUS    AS DATA_STATUS
from    ODI.ODI_WLS_JMS_INC_SOURCE ODI_WLS_JMS_INC_SOURCE
where    (1=1)
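The load step pairs a SELECT on the source connection with a bind-variable INSERT on the target connection. A minimal sketch of that pattern follows, using two in-memory sqlite3 databases in place of the two Oracle schemas; the table names `src` and `work` and the reduced column list are assumptions made for the example, not what ODI generates.

```python
import sqlite3

# Two separate connections stand in for the source and target databases.
source = sqlite3.connect(":memory:")
staging = sqlite3.connect(":memory:")

source.executescript("""
    CREATE TABLE src (id INTEGER, host TEXT, post TEXT);
    INSERT INTO src VALUES (1, 'h1', '7001'), (2, 'h2', NULL);
""")
staging.execute("CREATE TABLE work (id INTEGER, host TEXT, post TEXT)")

# "Source code": read every row, aliasing each column to its bind name.
cursor = source.execute("SELECT id AS id, host AS host, post AS post FROM src")
columns = [d[0] for d in cursor.description]
rows = [dict(zip(columns, r)) for r in cursor]

# "Target code": insert through named bind variables, one dict per row.
staging.executemany(
    "INSERT INTO work (id, host, post) VALUES (:id, :host, :post)", rows
)
staging.commit()

loaded = staging.execute("SELECT id, host, post FROM work ORDER BY id").fetchall()
```

Because the values travel as bind variables rather than as SQL text, NULLs and quoting need no special handling on the insert side.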
  
4.Analyze work table

Gather optimizer statistics on the work table

BEGIN
DBMS_STATS.GATHER_TABLE_STATS (
    ownname =>    'TESTUSER',
    tabname =>    'C$_0ODI_WLS_JMS_INC_SOURCE',
    estimate_percent =>    DBMS_STATS.AUTO_SAMPLE_SIZE
);
END;
  
5.Drop flow table

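Drop the flow table. The flow table is the other intermediate synchronization table, with a name starting with I$_. It is dropped on every interface run so that a table left behind by a previous run cannot cause problems. Because the step is configured to ignore errors, a failure here does not fail the interface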

drop table TESTUSER.I$_ODI_WLS_JMS_INC_TARGET   
6.Create flow table I$

Create the flow table

create table TESTUSER.I$_ODI_WLS_JMS_INC_TARGET
(
    ID        NUMBER NULL,
    HOST        VARCHAR2(200) NULL,
    POST        VARCHAR2(200) NULL,
    JMS_SERVICE_NAME        VARCHAR2(200) NULL,
    JMS_NAME        VARCHAR2(200) NULL,
    JMS_SERVICE_TARGET        VARCHAR2(200) NULL,
    JMS_SERVICE_HEALTH        VARCHAR2(200) NULL,
    MESSAGES_CURRENT_COUNT        NUMBER NULL,
    MESSAGES_PENDING_COUNT        NUMBER NULL,
    CONSUMERS_CURRENT_COUNT        NUMBER NULL,
    CONSUMERS_HIGH_COUNT        NUMBER NULL,
    CONSUMERS_TOTAL_COUNT        NUMBER NULL,
    MESSAGES_HIGH_COUNT        NUMBER NULL,
    MESSAGES_RECEIVED_COUNT        NUMBER NULL,
    OBJECT_VERSION_NUMBER        NUMBER NULL,
    CREATION_DATE        DATE NULL,
    CREATED_BY        VARCHAR2(200) NULL,
    LAST_UPDATE_DATE        DATE NULL,
    LAST_UPDATED_BY        VARCHAR2(200) NULL,
    DATA_STATUS        VARCHAR2(400) NULL,
    IND_UPDATE        CHAR(1)
)
NOLOGGING  
7.Delete target table

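Delete from the target table. With DELETE ALL enabled as above, the interface run includes this step and deletes all rows before data is inserted. If DELETE ALL is set to false, this statement is not executed, and the interface then does not propagate deleted rows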

delete from TESTUSER.ODI_WLS_JMS_INC_TARGET  
8.Insert flow into I$ table

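Insert data into the I$_ table. NOT EXISTS filters out the rows that already exist unchanged in the target table, so every row without an identical match is inserted, all with the flag I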

/* DETECTION_STRATEGY = NOT_EXISTS */
insert into    TESTUSER.I$_ODI_WLS_JMS_INC_TARGET
(
    ID,
    HOST,
    POST,
    JMS_SERVICE_NAME,
    JMS_NAME,
    JMS_SERVICE_TARGET,
    JMS_SERVICE_HEALTH,
    MESSAGES_CURRENT_COUNT,
    MESSAGES_PENDING_COUNT,
    CONSUMERS_CURRENT_COUNT,
    CONSUMERS_HIGH_COUNT,
    CONSUMERS_TOTAL_COUNT,
    MESSAGES_HIGH_COUNT,
    MESSAGES_RECEIVED_COUNT,
    OBJECT_VERSION_NUMBER,
    CREATION_DATE,
    CREATED_BY,
    LAST_UPDATE_DATE,
    LAST_UPDATED_BY,
    DATA_STATUS,
    IND_UPDATE
)
select 
ID,
    HOST,
    POST,
    JMS_SERVICE_NAME,
    JMS_NAME,
    JMS_SERVICE_TARGET,
    JMS_SERVICE_HEALTH,
    MESSAGES_CURRENT_COUNT,
    MESSAGES_PENDING_COUNT,
    CONSUMERS_CURRENT_COUNT,
    CONSUMERS_HIGH_COUNT,
    CONSUMERS_TOTAL_COUNT,
    MESSAGES_HIGH_COUNT,
    MESSAGES_RECEIVED_COUNT,
    OBJECT_VERSION_NUMBER,
    CREATION_DATE,
    CREATED_BY,
    LAST_UPDATE_DATE,
    LAST_UPDATED_BY,
    DATA_STATUS,
    IND_UPDATE
 from (
select      
    ODI_WLS_JMS_INC_SOURCE_A.ID AS ID,
    ODI_WLS_JMS_INC_SOURCE_A.HOST AS HOST,
    ODI_WLS_JMS_INC_SOURCE_A.POST AS POST,
    ODI_WLS_JMS_INC_SOURCE_A.JMS_SERVICE_NAME AS JMS_SERVICE_NAME,
    ODI_WLS_JMS_INC_SOURCE_A.JMS_NAME AS JMS_NAME,
    ODI_WLS_JMS_INC_SOURCE_A.JMS_SERVICE_TARGET AS JMS_SERVICE_TARGET,
    ODI_WLS_JMS_INC_SOURCE_A.JMS_SERVICE_HEALTH AS JMS_SERVICE_HEALTH,
    ODI_WLS_JMS_INC_SOURCE_A.MESSAGES_CURRENT_COUNT AS MESSAGES_CURRENT_COUNT,
    ODI_WLS_JMS_INC_SOURCE_A.MESSAGES_PENDING_COUNT AS MESSAGES_PENDING_COUNT,
    ODI_WLS_JMS_INC_SOURCE_A.CONSUMERS_CURRENT_COUNT AS CONSUMERS_CURRENT_COUNT,
    ODI_WLS_JMS_INC_SOURCE_A.CONSUMERS_HIGH_COUNT AS CONSUMERS_HIGH_COUNT,
    ODI_WLS_JMS_INC_SOURCE_A.CONSUMERS_TOTAL_COUNT AS CONSUMERS_TOTAL_COUNT,
    ODI_WLS_JMS_INC_SOURCE_A.MESSAGES_HIGH_COUNT AS MESSAGES_HIGH_COUNT,
    ODI_WLS_JMS_INC_SOURCE_A.MESSAGES_RECEIVED_COUNT AS MESSAGES_RECEIVED_COUNT,
    ODI_WLS_JMS_INC_SOURCE_A.OBJECT_VERSION_NUMBER AS OBJECT_VERSION_NUMBER,
    ODI_WLS_JMS_INC_SOURCE_A.CREATION_DATE AS CREATION_DATE,
    ODI_WLS_JMS_INC_SOURCE_A.CREATED_BY AS CREATED_BY,
    ODI_WLS_JMS_INC_SOURCE_A.LAST_UPDATE_DATE AS LAST_UPDATE_DATE,
    ODI_WLS_JMS_INC_SOURCE_A.LAST_UPDATED_BY AS LAST_UPDATED_BY,
    ODI_WLS_JMS_INC_SOURCE_A.DATA_STATUS AS DATA_STATUS,
    'I' IND_UPDATE
from    TESTUSER.C$_0ODI_WLS_JMS_INC_SOURCE ODI_WLS_JMS_INC_SOURCE_A
where    (1=1)
) S
where NOT EXISTS 
    ( select 1 from TESTUSER.ODI_WLS_JMS_INC_TARGET T
    where    T.ID    = S.ID
    and    T.HOST    = S.HOST 
         and ((T.POST = S.POST) or (T.POST IS NULL and S.POST IS NULL)) and
        ((T.JMS_SERVICE_NAME = S.JMS_SERVICE_NAME) or (T.JMS_SERVICE_NAME IS NULL and S.JMS_SERVICE_NAME IS NULL)) and
        ((T.JMS_NAME = S.JMS_NAME) or (T.JMS_NAME IS NULL and S.JMS_NAME IS NULL)) and
        ((T.JMS_SERVICE_TARGET = S.JMS_SERVICE_TARGET) or (T.JMS_SERVICE_TARGET IS NULL and S.JMS_SERVICE_TARGET IS NULL)) and
        ((T.JMS_SERVICE_HEALTH = S.JMS_SERVICE_HEALTH) or (T.JMS_SERVICE_HEALTH IS NULL and S.JMS_SERVICE_HEALTH IS NULL)) and
        ((T.MESSAGES_CURRENT_COUNT = S.MESSAGES_CURRENT_COUNT) or (T.MESSAGES_CURRENT_COUNT IS NULL and S.MESSAGES_CURRENT_COUNT IS NULL)) and
        ((T.MESSAGES_PENDING_COUNT = S.MESSAGES_PENDING_COUNT) or (T.MESSAGES_PENDING_COUNT IS NULL and S.MESSAGES_PENDING_COUNT IS NULL)) and
        ((T.CONSUMERS_CURRENT_COUNT = S.CONSUMERS_CURRENT_COUNT) or (T.CONSUMERS_CURRENT_COUNT IS NULL and S.CONSUMERS_CURRENT_COUNT IS NULL)) and
        ((T.CONSUMERS_HIGH_COUNT = S.CONSUMERS_HIGH_COUNT) or (T.CONSUMERS_HIGH_COUNT IS NULL and S.CONSUMERS_HIGH_COUNT IS NULL)) and
        ((T.CONSUMERS_TOTAL_COUNT = S.CONSUMERS_TOTAL_COUNT) or (T.CONSUMERS_TOTAL_COUNT IS NULL and S.CONSUMERS_TOTAL_COUNT IS NULL)) and
        ((T.MESSAGES_HIGH_COUNT = S.MESSAGES_HIGH_COUNT) or (T.MESSAGES_HIGH_COUNT IS NULL and S.MESSAGES_HIGH_COUNT IS NULL)) and
        ((T.MESSAGES_RECEIVED_COUNT = S.MESSAGES_RECEIVED_COUNT) or (T.MESSAGES_RECEIVED_COUNT IS NULL and S.MESSAGES_RECEIVED_COUNT IS NULL)) and
        ((T.OBJECT_VERSION_NUMBER = S.OBJECT_VERSION_NUMBER) or (T.OBJECT_VERSION_NUMBER IS NULL and S.OBJECT_VERSION_NUMBER IS NULL)) and
        ((T.CREATION_DATE = S.CREATION_DATE) or (T.CREATION_DATE IS NULL and S.CREATION_DATE IS NULL)) and
        ((T.CREATED_BY = S.CREATED_BY) or (T.CREATED_BY IS NULL and S.CREATED_BY IS NULL)) and
        ((T.LAST_UPDATE_DATE = S.LAST_UPDATE_DATE) or (T.LAST_UPDATE_DATE IS NULL and S.LAST_UPDATE_DATE IS NULL)) and
        ((T.LAST_UPDATED_BY = S.LAST_UPDATED_BY) or (T.LAST_UPDATED_BY IS NULL and S.LAST_UPDATED_BY IS NULL)) and
        ((T.DATA_STATUS = S.DATA_STATUS) or (T.DATA_STATUS IS NULL and S.DATA_STATUS IS NULL))
        )
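Each non-key column in the NOT EXISTS filter is compared with the pattern `(T.C = S.C) or (T.C IS NULL and S.C IS NULL)` because a plain `=` never evaluates to true when either side is NULL. The sketch below contrasts the two forms on a hypothetical three-column table, with sqlite3 standing in for Oracle; `work` and `tgt` are illustrative names.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE work (id INTEGER, host TEXT, post TEXT);
    CREATE TABLE tgt  (id INTEGER, host TEXT, post TEXT);
    INSERT INTO tgt  VALUES (1, 'h1', NULL), (2, 'h2', '7001');
    INSERT INTO work VALUES (1, 'h1', NULL), (2, 'h2', '7002'), (3, 'h3', NULL);
""")

# NULL-safe comparison, as in the generated statement.
NULL_SAFE = """
    SELECT s.id FROM work s
    WHERE NOT EXISTS (
        SELECT 1 FROM tgt t
        WHERE t.id = s.id AND t.host = s.host
          AND ((t.post = s.post) OR (t.post IS NULL AND s.post IS NULL))
    ) ORDER BY s.id
"""
# Plain equality: NULL = NULL is not true, so unchanged rows look changed.
NAIVE = """
    SELECT s.id FROM work s
    WHERE NOT EXISTS (
        SELECT 1 FROM tgt t
        WHERE t.id = s.id AND t.host = s.host AND t.post = s.post
    ) ORDER BY s.id
"""
changed_null_safe = [r[0] for r in conn.execute(NULL_SAFE)]
changed_naive = [r[0] for r in conn.execute(NAIVE)]
```

With the NULL-safe form only rows 2 (changed) and 3 (new) reach the flow table; with plain equality the unchanged row 1 would be picked up as well.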
  
9.Create Index on flow table

Create an index on the I$_ table

create index     TESTUSER.I$_ODI_WLS_JMS_INC_TARGET_UK
on    TESTUSER.I$_ODI_WLS_JMS_INC_TARGET (ID, HOST)
NOLOGGING  
10.Analyze integration table

Gather optimizer statistics on the I$_ table

begin
    dbms_stats.gather_table_stats(
    ownname => 'TESTUSER',
    tabname => 'I$_ODI_WLS_JMS_INC_TARGET',
    estimate_percent => dbms_stats.auto_sample_size
    );
end;
  
11.create check table

Create the check table, which records summary information about the rows that failed the checks

create table TESTUSER.SNP_CHECK_TAB
(
    CATALOG_NAME    VARCHAR2(100 CHAR) NULL ,
    SCHEMA_NAME    VARCHAR2(100 CHAR) NULL ,
    RESOURCE_NAME    VARCHAR2(100 CHAR) NULL,
    FULL_RES_NAME    VARCHAR2(100 CHAR) NULL,
    ERR_TYPE        VARCHAR2(1 CHAR) NULL,
    ERR_MESS        VARCHAR2(250 CHAR) NULL ,
    CHECK_DATE    DATE NULL,
    ORIGIN        VARCHAR2(250 CHAR) NULL,
    CONS_NAME    VARCHAR2(128 CHAR) NULL,
    CONS_TYPE        VARCHAR2(2 CHAR) NULL,
    ERR_COUNT        NUMBER(10) NULL
)
      
12.delete previous check sum

Delete the previous check data

delete from    TESTUSER.SNP_CHECK_TAB
where    SCHEMA_NAME    = 'TESTUSER'
and    ORIGIN         = '(171)mdsProject.My_increment'
and    ERR_TYPE         = 'F'
  
13.create error table

Create the error table, which records the rows that failed the checks

create table TESTUSER.E$_ODI_WLS_JMS_INC_TARGET
(
    ODI_ROW_ID         UROWID,
    ODI_ERR_TYPE        VARCHAR2(1 CHAR) NULL, 
    ODI_ERR_MESS        VARCHAR2(250 CHAR) NULL,
    ODI_CHECK_DATE    DATE NULL, 
    ID    NUMBER NULL,
    HOST    VARCHAR2(200) NULL,
    POST    VARCHAR2(200) NULL,
    JMS_SERVICE_NAME    VARCHAR2(200) NULL,
    JMS_NAME    VARCHAR2(200) NULL,
    JMS_SERVICE_TARGET    VARCHAR2(200) NULL,
    JMS_SERVICE_HEALTH    VARCHAR2(200) NULL,
    MESSAGES_CURRENT_COUNT    NUMBER NULL,
    MESSAGES_PENDING_COUNT    NUMBER NULL,
    CONSUMERS_CURRENT_COUNT    NUMBER NULL,
    CONSUMERS_HIGH_COUNT    NUMBER NULL,
    CONSUMERS_TOTAL_COUNT    NUMBER NULL,
    MESSAGES_HIGH_COUNT    NUMBER NULL,
    MESSAGES_RECEIVED_COUNT    NUMBER NULL,
    OBJECT_VERSION_NUMBER    NUMBER NULL,
    CREATION_DATE    DATE NULL,
    CREATED_BY    VARCHAR2(200) NULL,
    LAST_UPDATE_DATE    DATE NULL,
    LAST_UPDATED_BY    VARCHAR2(200) NULL,
    DATA_STATUS    VARCHAR2(400) NULL,
    ODI_ORIGIN        VARCHAR2(250 CHAR) NULL,
    ODI_CONS_NAME    VARCHAR2(128 CHAR) NULL,
    ODI_CONS_TYPE        VARCHAR2(2 CHAR) NULL,
    ODI_PK            VARCHAR2(32 CHAR) PRIMARY KEY,
    ODI_SESS_NO        VARCHAR2(36 CHAR)
)
  
14.delete previous errors

Delete earlier rows from the error table

delete from     TESTUSER.E$_ODI_WLS_JMS_INC_TARGET
where    (ODI_ERR_TYPE = 'S'    and 'F' = 'S')
or    (ODI_ERR_TYPE = 'F'    and ODI_ORIGIN = '(171)mdsProject.My_increment')
15.Create index on PK

Create an index on the key columns of the I$_ table

/* FLOW CONTROL CREATE THE INDEX ON I$TABLE */
create index     TESTUSER.I$_ODI_WLS_JMS_INC_TARGET_PK
on    TESTUSER.I$_ODI_WLS_JMS_INC_TARGET (ID, HOST)
  
16.insert PK errors

Insert the rows whose primary key is not unique

insert into TESTUSER.E$_ODI_WLS_JMS_INC_TARGET
(
    ODI_PK,
    ODI_SESS_NO,
    ODI_ROW_ID,
    ODI_ERR_TYPE,
    ODI_ERR_MESS,
    ODI_ORIGIN,
    ODI_CHECK_DATE,
    ODI_CONS_NAME,
    ODI_CONS_TYPE,
    ID,
    HOST,
    POST,
    JMS_SERVICE_NAME,
    JMS_NAME,
    JMS_SERVICE_TARGET,
    JMS_SERVICE_HEALTH,
    MESSAGES_CURRENT_COUNT,
    MESSAGES_PENDING_COUNT,
    CONSUMERS_CURRENT_COUNT,
    CONSUMERS_HIGH_COUNT,
    CONSUMERS_TOTAL_COUNT,
    MESSAGES_HIGH_COUNT,
    MESSAGES_RECEIVED_COUNT,
    OBJECT_VERSION_NUMBER,
    CREATION_DATE,
    CREATED_BY,
    LAST_UPDATE_DATE,
    LAST_UPDATED_BY,
    DATA_STATUS
)
select    SYS_GUID(),
    'abb01f39-16b9-41ba-9820-7733e137f237', 
    rowid,
    'F', 
    'ODI-15064: The primary key ODI_WLS_JMS_INC_TARGET_PK is not unique.',
    '(171)mdsProject.My_increment',
    sysdate,
    'ODI_WLS_JMS_INC_TARGET_PK',
    'PK',    
    ODI_WLS_JMS_INC_TARGET.ID,
    ODI_WLS_JMS_INC_TARGET.HOST,
    ODI_WLS_JMS_INC_TARGET.POST,
    ODI_WLS_JMS_INC_TARGET.JMS_SERVICE_NAME,
    ODI_WLS_JMS_INC_TARGET.JMS_NAME,
    ODI_WLS_JMS_INC_TARGET.JMS_SERVICE_TARGET,
    ODI_WLS_JMS_INC_TARGET.JMS_SERVICE_HEALTH,
    ODI_WLS_JMS_INC_TARGET.MESSAGES_CURRENT_COUNT,
    ODI_WLS_JMS_INC_TARGET.MESSAGES_PENDING_COUNT,
    ODI_WLS_JMS_INC_TARGET.CONSUMERS_CURRENT_COUNT,
    ODI_WLS_JMS_INC_TARGET.CONSUMERS_HIGH_COUNT,
    ODI_WLS_JMS_INC_TARGET.CONSUMERS_TOTAL_COUNT,
    ODI_WLS_JMS_INC_TARGET.MESSAGES_HIGH_COUNT,
    ODI_WLS_JMS_INC_TARGET.MESSAGES_RECEIVED_COUNT,
    ODI_WLS_JMS_INC_TARGET.OBJECT_VERSION_NUMBER,
    ODI_WLS_JMS_INC_TARGET.CREATION_DATE,
    ODI_WLS_JMS_INC_TARGET.CREATED_BY,
    ODI_WLS_JMS_INC_TARGET.LAST_UPDATE_DATE,
    ODI_WLS_JMS_INC_TARGET.LAST_UPDATED_BY,
    ODI_WLS_JMS_INC_TARGET.DATA_STATUS
from    TESTUSER.I$_ODI_WLS_JMS_INC_TARGET   ODI_WLS_JMS_INC_TARGET
where    exists  (
        select    SUB.ID,
            SUB.HOST
        from     TESTUSER.I$_ODI_WLS_JMS_INC_TARGET SUB
        where     SUB.ID=ODI_WLS_JMS_INC_TARGET.ID
            and SUB.HOST=ODI_WLS_JMS_INC_TARGET.HOST
        group by     SUB.ID,
            SUB.HOST
        having     count(1) > 1
        )
  
17.insert Not Null errors

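Insert the rows in which a primary key column is null. With a multi-column primary key this step runs several times, once per key column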

insert into TESTUSER.E$_ODI_WLS_JMS_INC_TARGET
(
    ODI_PK,
    ODI_SESS_NO,
    ODI_ROW_ID,
    ODI_ERR_TYPE,
    ODI_ERR_MESS,
    ODI_CHECK_DATE,
    ODI_ORIGIN,
    ODI_CONS_NAME,
    ODI_CONS_TYPE,
    ID,
    HOST,
    POST,
    JMS_SERVICE_NAME,
    JMS_NAME,
    JMS_SERVICE_TARGET,
    JMS_SERVICE_HEALTH,
    MESSAGES_CURRENT_COUNT,
    MESSAGES_PENDING_COUNT,
    CONSUMERS_CURRENT_COUNT,
    CONSUMERS_HIGH_COUNT,
    CONSUMERS_TOTAL_COUNT,
    MESSAGES_HIGH_COUNT,
    MESSAGES_RECEIVED_COUNT,
    OBJECT_VERSION_NUMBER,
    CREATION_DATE,
    CREATED_BY,
    LAST_UPDATE_DATE,
    LAST_UPDATED_BY,
    DATA_STATUS
)
select
    SYS_GUID(),
    'abb01f39-16b9-41ba-9820-7733e137f237', 
    rowid,
    'F', 
    'ODI-15066: The column ID cannot be null.',
    sysdate,
    '(171)mdsProject.My_increment',
    'ID',
    'NN',    
    ID,
    HOST,
    POST,
    JMS_SERVICE_NAME,
    JMS_NAME,
    JMS_SERVICE_TARGET,
    JMS_SERVICE_HEALTH,
    MESSAGES_CURRENT_COUNT,
    MESSAGES_PENDING_COUNT,
    CONSUMERS_CURRENT_COUNT,
    CONSUMERS_HIGH_COUNT,
    CONSUMERS_TOTAL_COUNT,
    MESSAGES_HIGH_COUNT,
    MESSAGES_RECEIVED_COUNT,
    OBJECT_VERSION_NUMBER,
    CREATION_DATE,
    CREATED_BY,
    LAST_UPDATE_DATE,
    LAST_UPDATED_BY,
    DATA_STATUS
from    TESTUSER.I$_ODI_WLS_JMS_INC_TARGET
where    ID is null
  
18.create index on error table

Create an index on the E$_ table

 
/* FLOW CONTROL CREATE INDEX ON THE E$TABLE */
create index     TESTUSER.E$_ODI_WLS_JMS_INC_TARGET_IDX 
on    TESTUSER.E$_ODI_WLS_JMS_INC_TARGET (ODI_ROW_ID)
  
19.delete errors from controlled table

Delete the rows recorded as errors from the I$_ table

delete from    TESTUSER.I$_ODI_WLS_JMS_INC_TARGET  T
where    exists     (
        select    1
        from    TESTUSER.E$_ODI_WLS_JMS_INC_TARGET E
        where ODI_SESS_NO = 'abb01f39-16b9-41ba-9820-7733e137f237'
        and T.rowid = E.ODI_ROW_ID
        )
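Steps 16, 17 and 19 together form the flow control check: duplicate keys and null key columns are copied to the error table along with the row identifier, and the offending rows are then removed from the flow table. A compact sketch of that sequence follows, with sqlite3 in place of Oracle and hypothetical tables `flow` and `err` that carry only a few of the columns.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE flow (id INTEGER, host TEXT, post TEXT);
    CREATE TABLE err  (row_id INTEGER, cons_type TEXT, id INTEGER, host TEXT);
    INSERT INTO flow VALUES (1, 'h1', '7001'), (1, 'h1', '7002'),
                            (2, 'h2', '7001'), (NULL, 'h3', '7001');

    -- Step 16 analogue: rows whose (id, host) key occurs more than once.
    INSERT INTO err
    SELECT f.rowid, 'PK', f.id, f.host FROM flow f
    WHERE EXISTS (
        SELECT 1 FROM flow sub
        WHERE sub.id = f.id AND sub.host = f.host
        GROUP BY sub.id, sub.host
        HAVING COUNT(1) > 1
    );

    -- Step 17 analogue: rows with a NULL key column.
    INSERT INTO err
    SELECT rowid, 'NN', id, host FROM flow WHERE id IS NULL;

    -- Step 19 analogue: remove the rejected rows from the flow table.
    DELETE FROM flow
    WHERE EXISTS (SELECT 1 FROM err e WHERE e.row_id = flow.rowid);
""")
errors = conn.execute("SELECT row_id, cons_type FROM err ORDER BY row_id").fetchall()
remaining = conn.execute("SELECT id, host, post FROM flow").fetchall()
```

Only the row with a unique, non-null key is left in `flow` to be integrated into the target.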
  
20.insert check sum into check table

Record the error summary in TESTUSER.SNP_CHECK_TAB

insert into TESTUSER.SNP_CHECK_TAB
(
    SCHEMA_NAME,
    RESOURCE_NAME,
    FULL_RES_NAME,
    ERR_TYPE,
    ERR_MESS,
    CHECK_DATE,
    ORIGIN,
    CONS_NAME,
    CONS_TYPE,
    ERR_COUNT
)
select    
    'TESTUSER',
    'ODI_WLS_JMS_INC_TARGET',
    'TESTUSER.ODI_WLS_JMS_INC_TARGET',
    E.ODI_ERR_TYPE,
    E.ODI_ERR_MESS,
    E.ODI_CHECK_DATE,
    E.ODI_ORIGIN,
    E.ODI_CONS_NAME,
    E.ODI_CONS_TYPE,
    count(1) 
from    TESTUSER.E$_ODI_WLS_JMS_INC_TARGET E
where    E.ODI_ERR_TYPE    = 'F'
and    E.ODI_ORIGIN     = '(171)mdsProject.My_increment'
group by    E.ODI_ERR_TYPE,
    E.ODI_ERR_MESS,
    E.ODI_CHECK_DATE,
    E.ODI_ORIGIN,
    E.ODI_CONS_NAME,
    E.ODI_CONS_TYPE
  
21.Flag rows for update

Set IND_UPDATE to U for the I$_ rows whose primary key exists in the target table, marking them for update; rows still flagged I are to be inserted

/* DETECTION_STRATEGY = NOT_EXISTS */
update    TESTUSER.I$_ODI_WLS_JMS_INC_TARGET
set    IND_UPDATE = 'U'
where    (ID, HOST)
    in    (
        select    ID,
            HOST
        from    TESTUSER.ODI_WLS_JMS_INC_TARGET
        )
  
22.Update existing rows

Update the rows that already exist

/* DETECTION_STRATEGY = NOT_EXISTS */
update    TESTUSER.ODI_WLS_JMS_INC_TARGET T
set     (
    T.POST,
    T.JMS_SERVICE_NAME,
    T.JMS_NAME,
    T.JMS_SERVICE_TARGET,
    T.JMS_SERVICE_HEALTH,
    T.MESSAGES_CURRENT_COUNT,
    T.MESSAGES_PENDING_COUNT,
    T.CONSUMERS_CURRENT_COUNT,
    T.CONSUMERS_HIGH_COUNT,
    T.CONSUMERS_TOTAL_COUNT,
    T.MESSAGES_HIGH_COUNT,
    T.MESSAGES_RECEIVED_COUNT,
    T.OBJECT_VERSION_NUMBER,
    T.CREATION_DATE,
    T.CREATED_BY,
    T.LAST_UPDATE_DATE,
    T.LAST_UPDATED_BY,
    T.DATA_STATUS
    ) =
        (
        select    S.POST,
            S.JMS_SERVICE_NAME,
            S.JMS_NAME,
            S.JMS_SERVICE_TARGET,
            S.JMS_SERVICE_HEALTH,
            S.MESSAGES_CURRENT_COUNT,
            S.MESSAGES_PENDING_COUNT,
            S.CONSUMERS_CURRENT_COUNT,
            S.CONSUMERS_HIGH_COUNT,
            S.CONSUMERS_TOTAL_COUNT,
            S.MESSAGES_HIGH_COUNT,
            S.MESSAGES_RECEIVED_COUNT,
            S.OBJECT_VERSION_NUMBER,
            S.CREATION_DATE,
            S.CREATED_BY,
            S.LAST_UPDATE_DATE,
            S.LAST_UPDATED_BY,
            S.DATA_STATUS
        from    TESTUSER.I$_ODI_WLS_JMS_INC_TARGET S
        where    T.ID    =S.ID
        and    T.HOST    =S.HOST
             )
where    (ID, HOST)
    in    (
        select    ID,
            HOST
        from    TESTUSER.I$_ODI_WLS_JMS_INC_TARGET
        where    IND_UPDATE = 'U'
        )
  
23.Insert new rows

Insert the new rows

/* DETECTION_STRATEGY = NOT_EXISTS */
insert into     TESTUSER.ODI_WLS_JMS_INC_TARGET T
    (
    ID,
    HOST,
    POST,
    JMS_SERVICE_NAME,
    JMS_NAME,
    JMS_SERVICE_TARGET,
    JMS_SERVICE_HEALTH,
    MESSAGES_CURRENT_COUNT,
    MESSAGES_PENDING_COUNT,
    CONSUMERS_CURRENT_COUNT,
    CONSUMERS_HIGH_COUNT,
    CONSUMERS_TOTAL_COUNT,
    MESSAGES_HIGH_COUNT,
    MESSAGES_RECEIVED_COUNT,
    OBJECT_VERSION_NUMBER,
    CREATION_DATE,
    CREATED_BY,
    LAST_UPDATE_DATE,
    LAST_UPDATED_BY,
    DATA_STATUS
    )
select     ID,
    HOST,
    POST,
    JMS_SERVICE_NAME,
    JMS_NAME,
    JMS_SERVICE_TARGET,
    JMS_SERVICE_HEALTH,
    MESSAGES_CURRENT_COUNT,
    MESSAGES_PENDING_COUNT,
    CONSUMERS_CURRENT_COUNT,
    CONSUMERS_HIGH_COUNT,
    CONSUMERS_TOTAL_COUNT,
    MESSAGES_HIGH_COUNT,
    MESSAGES_RECEIVED_COUNT,
    OBJECT_VERSION_NUMBER,
    CREATION_DATE,
    CREATED_BY,
    LAST_UPDATE_DATE,
    LAST_UPDATED_BY,
    DATA_STATUS
from    TESTUSER.I$_ODI_WLS_JMS_INC_TARGET S
where    IND_UPDATE = 'I'
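Steps 21 to 23 can be tried out on a small scale. In this sketch sqlite3 replaces Oracle, `tgt` and `flow` are hypothetical stand-ins for the target and I$_ tables, and EXISTS is used where the generated code uses a row-value IN; that substitution is a portability choice for the example only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE tgt  (id INTEGER, host TEXT, post TEXT);
    CREATE TABLE flow (id INTEGER, host TEXT, post TEXT, ind_update TEXT);
    INSERT INTO tgt  VALUES (1, 'h1', '7001'), (2, 'h2', '7001');
    -- The flow table holds only new or changed rows, all initially flagged 'I'.
    INSERT INTO flow VALUES (2, 'h2', '7002', 'I'), (3, 'h3', '7001', 'I');

    -- Flag rows whose key already exists in the target.
    UPDATE flow SET ind_update = 'U'
    WHERE EXISTS (SELECT 1 FROM tgt t WHERE t.id = flow.id AND t.host = flow.host);

    -- Update existing target rows from the 'U' rows.
    UPDATE tgt SET post = (
        SELECT f.post FROM flow f WHERE f.id = tgt.id AND f.host = tgt.host
    )
    WHERE EXISTS (
        SELECT 1 FROM flow f
        WHERE f.id = tgt.id AND f.host = tgt.host AND f.ind_update = 'U'
    );

    -- Insert the rows still flagged 'I'.
    INSERT INTO tgt (id, host, post)
    SELECT id, host, post FROM flow WHERE ind_update = 'I';
""")
flags = conn.execute("SELECT id, ind_update FROM flow ORDER BY id").fetchall()
target_rows = conn.execute("SELECT id, host, post FROM tgt ORDER BY id").fetchall()
```

Row 2 is flagged U and updated in place, row 3 keeps the flag I and is inserted, and row 1 is never touched because it did not reach the flow table.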
  
24.Drop work table

Drop the work table

drop table TESTUSER.C$_0ODI_WLS_JMS_INC_SOURCE purge  
25.Drop flow table

Drop the I$_ table

drop table TESTUSER.I$_ODI_WLS_JMS_INC_TARGET   
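This completes one incremental synchronization run.

Modification approach

The walkthrough above shows what is needed to add deletion to the incremental synchronization: insert into the I$_ table the target-table rows that do not exist in the work table (the C$_ table), flag them as D, and, once the updates and inserts have been applied to the target table, delete the target rows flagged D. Without further ado, here is the code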

Before the rows are flagged U, insert the rows to be deleted into the I$_ table

Insert deleted rows

/* DETECTION_STRATEGY = NOT_EXISTS */ 
insert into  <%=odiRef.getTable("L","INT_NAME","A")%> (  
 <%=odiRef.getColList("", "[COL_NAME]", ", ", "", "UK")%>,   IND_UPDATE ) 
 select    <%=odiRef.getColList("", "[COL_NAME]", ", ", "", "UK")%>,   
 'D' 
 from <%=odiRef.getTable("L", "TARG_NAME", "A")%> T2
 where NOT EXISTS    
 (   select 'X' from  <%=odiRef.getFrom(0)%>
  where  (<%=odiRef.getColList("","[EXPRESSION]\t= T2.[COL_NAME]", "\n\tand\t", "", "UK")%> ) ) 

Synchronize deletions

Delete from the target table the rows flagged D in the I$_ table

delete from <%=odiRef.getTable("L","TARG_NAME","A")%>
where exists (
        select     'X'
        from    <%=odiRef.getTable("L","INT_NAME","A")%> <%=odiRef.getInfo("DEST_TAB_ALIAS_WORD")%> I
        where    <%=odiRef.getColList("", odiRef.getTable("L","TARG_NAME","A") + ".[COL_NAME] = I.[COL_NAME]", "\n\t\tand \t", "", "UK")%>
        and    IND_UPDATE = 'D'
    )
After adding these steps, test the interface again to confirm that the change takes effect
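The effect of the two added steps can be checked outside ODI with a small simulation. This is a sketch under stated assumptions, not the knowledge module itself: sqlite3 replaces Oracle, and `work`, `tgt` and `flow` are hypothetical stand-ins for the C$_, target and I$_ tables. The flagging step here is restricted to rows still marked I so that the D markers are preserved; that restriction is an assumption of this example, since the article does not show how the stock flagging step treats D rows.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE work (id INTEGER, host TEXT, post TEXT);
    CREATE TABLE tgt  (id INTEGER, host TEXT, post TEXT);
    CREATE TABLE flow (id INTEGER, host TEXT, post TEXT, ind_update TEXT);
    INSERT INTO work VALUES (1, 'h1', '7001'), (2, 'h2', '7002');
    INSERT INTO tgt  VALUES (1, 'h1', '7001'), (2, 'h2', '7001'), (3, 'h3', '7001');
    -- The changed row, as the NOT EXISTS detection step would have loaded it.
    INSERT INTO flow VALUES (2, 'h2', '7002', 'I');

    -- "Insert deleted rows": target keys with no match in the work table.
    INSERT INTO flow (id, host, ind_update)
    SELECT t.id, t.host, 'D' FROM tgt t
    WHERE NOT EXISTS (
        SELECT 1 FROM work w WHERE w.id = t.id AND w.host = t.host
    );

    -- Flag updates; limited to 'I' rows (assumption) so 'D' markers are kept.
    UPDATE flow SET ind_update = 'U'
    WHERE ind_update = 'I'
      AND EXISTS (SELECT 1 FROM tgt t WHERE t.id = flow.id AND t.host = flow.host);

    -- Apply the updates.
    UPDATE tgt SET post = (
        SELECT f.post FROM flow f
        WHERE f.id = tgt.id AND f.host = tgt.host AND f.ind_update = 'U'
    )
    WHERE EXISTS (
        SELECT 1 FROM flow f
        WHERE f.id = tgt.id AND f.host = tgt.host AND f.ind_update = 'U'
    );

    -- "Synchronize deletions": remove target rows flagged 'D'.
    DELETE FROM tgt
    WHERE EXISTS (
        SELECT 1 FROM flow f
        WHERE f.id = tgt.id AND f.host = tgt.host AND f.ind_update = 'D'
    );
""")
flow_flags = conn.execute("SELECT id, ind_update FROM flow ORDER BY id").fetchall()
final_rows = conn.execute("SELECT id, host, post FROM tgt ORDER BY id").fetchall()
```

After the run the target matches the work table: row 2 carries the new value and row 3, which is absent from the source, is gone.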

Copyright belongs to the author; do not repost without permission.

When reposting, cite this article's address: https://www.ucloud.cn/yun/35368.html
