Decode360's Blog

          業精于勤而荒于嬉 QQ:150355677 MSN:decode360@hotmail.com

            BlogJava :: 首頁 :: 新隨筆 :: 聯系 ::  :: 管理 ::
            397 隨筆 :: 33 文章 :: 29 評論 :: 0 Trackbacks
          Oracle 10g CDC
          ?

          ??? 從Oracle9i開始,Oracle引入了CDC技術來實現對變化數據的捕獲。在Oracle9i中CDC只支持同步的數據捕獲(synchronous change capture),源數據的變化被實時的捕獲,捕獲的過程和源數據是同一個事務。它的實現需要源數據支持trigger,所以這種同步的技術會給數據源帶來性能的問題。這是CDC在Oracle9i的一個缺陷(在Oracle10g中已經改進)

          ??? 在CDC中變化的數據被保存在一個變化表(change table)中,使用者通過訂閱的方式(subscribe)生成一個視圖(subscribe view)并且通過這個視圖來得到這些變化的數據,一個change table可以有多個訂閱者,也就可以有多個subscribe view。每個subscribe view可以從change table提取自己所關心的不同的columns 和rows。

          ??? 在Oracle10g中,CDC開始支持異步方式來捕獲變化的數據。在Oracle10g中利用redo log來實現CDC。Redo log是一個記錄所有數據庫變化的獨立的文件。CDC的異步方式是非入侵方式(noninvasive)的實現,對于變化數據的捕獲就不需要數據庫加入trigger了。
          ?
          ?
          Oracle的redo log有兩種類型:
          ?
          ● On-line redo log
          ● Archive redo log

          這種從redo log中讀取數據變化的方式叫做“日志挖掘”(mining the logs),這種方式使捕獲數據的操作從源數據的事務中分離出來,減輕了trigger技術給數據源帶來的性能問題(是減輕不是消除!)
          ?

          在Oracle10g中,異步CDC有兩種實現方式:
          ?
          ● HOTLOG
          ?
          這種方式利用On-line redo log。變化的數據從on-line redo log中獲取,然后通過(Oracle Streams技術)把獲取的變化保存在本地的變化表中(變化表還是在數據源上)。然后還需要其他方式把這些數據移到數據倉庫中。

          AUTOLOG
          ?
          這種方式與HOTLOG不同的是數據獲取的位置不同。AUTOLOG方式利用Log Transport Services技術在不同數據庫之間轉換數據時進行變化數據獲取操作的。如果目標數據庫正好是數據倉庫的話,這種方式就比HOTLOG方式減少了一次數據遷移的環節。
          ?
          Log Transport Services技術是標準的日志遷移技術,比如Oracle的Oracle Data Guard。
          ?

          Latency of change:
          ?
          從發生變化的數據源事務提交到利用CDC發現變化的數據,并且把變化的數據移植到變化表之間的時間間隔。HOTLOG方式的時間滯后要比AUTOLOG方式小。
          ?

          Oracle Streams

          Oracle Streams技術可以在多個數據庫之間進行數據備份,它通過在數據隊列里面記錄變化的數據。Oracle Streams技術也是利用數據庫的redo log來解析變化的數據的。Oracle Streams除了進行數據備份外,還可以用來作為數據倉庫的增量抽取。
          ?
          Oracle Streams與CDC技術比較,可以用一個形象的比喻來形容:
          ?
          把Oracle Streams比作磚、瓦;
          把CDC看作是用磚、瓦蓋成的大廈;
          ?
          Oracle Streams可以用來實現CDC,它們是輔助的關系,不是真正競爭的關系。
          ?
          ?
          ?
          ?
          ?
          ?
          ?
          上面是CDC的簡單介紹,下面轉一篇應用(但是怎么看都不太防舒服)
          *****************************************************************************************
          Oracle 10g CDC Test
          ?
          ?
          怎樣使用oracle 10G CDC(Capturing Changed Data)

          1)create the user on the source DB and target DB.

          sqlplus "/? as sysdba"
          connect system/manager

          set serveroutput on size 100000
          set linesize 2000
          create or replace package etl_util AUTHID CURRENT_USER
          IS
          ? FUNCTION get_col_names(p_schema IN VARCHAR2, p_table_name IN VARCHAR2, p_skip_columns IN VARCHAR2, p_pref IN VARCHAR2 DEFAULT NULL) RETURN VARCHAR2;
          ? FUNCTION get_cols_definition(p_schema IN VARCHAR2, p_table_name IN VARCHAR2, p_skip_columns IN VARCHAR2, p_pref IN VARCHAR2 DEFAULT NULL) RETURN VARCHAR2;
          ? FUNCTION get_modified_col_names(schema IN VARCHAR2, table_name IN VARCHAR2, source_colmap$ IN RAW)? RETURN VARCHAR2;
          END etl_util;
          /
          show errors
          ?
          create or replace package body etl_util
          IS
          -- ******************************************************************************
          -- PUBLIC FUNCTION get_col_names
          -- ******************************************************************************
          ? FUNCTION get_col_names(p_schema IN VARCHAR2, p_table_name IN VARCHAR2, p_skip_columns IN VARCHAR2, p_pref IN VARCHAR2 DEFAULT NULL) RETURN VARCHAR2???
          ??? IS
          ? BEGIN
          ??? DECLARE
          ????? v_col_nm VARCHAR2(100);
          ????? v_ret VARCHAR2(32767);
          ????? v_work_str VARCHAR2(32767);
          ?
          ????? CURSOR c_tab_col(p_tbl_nm VARCHAR2, p_schm VARCHAR2) IS???????
          ??????????? SELECT column_name FROM all_tab_columns
          ??????????? WHERE owner = UPPER(TRIM(p_schm))
          ??????????? AND table_name = UPPER(TRIM(p_tbl_nm))
          ??????????? ORDER BY column_name;
          ?
          ? BEGIN
          ? OPEN c_tab_col(p_table_name, p_schema);
          ? v_ret := '';
          ? v_work_str := ','||LOWER(REPLACE(p_skip_columns,' ','')) || ',';
          ?
          ? LOOP
          ????? FETCH c_tab_col INTO v_col_nm;
          ????? EXIT WHEN c_tab_col%NOTFOUND;
          ??? IF INSTR(v_work_str,','||LOWER(v_col_nm)) = 0 THEN
          ??????? v_ret := v_ret || ',' || p_pref ||LOWER(v_col_nm);
          ??? END IF;
          ? END LOOP;
          ? CLOSE c_tab_col;
          ? IF (v_ret IS NOT NULL) THEN v_ret := SUBSTR(v_ret, 2);
          ? END IF;
          ? IF (v_ret IS NULL) THEN
          ????? raise_application_error(-20201,'Table ' || p_schema||'.'||p_table_name
          ??????? || ' Does not exist or has no columns');
          ? END IF;
          ?
          ? RETURN v_ret;
          ? END;
          ? END;
          -- ******************************************************************************
          -- END OF PUBLIC FUNCTION get_col_names
          -- ******************************************************************************
          ?
          -- ******************************************************************************
          -- PUBLIC FUNCTION get_cols_definition
          -- ******************************************************************************
          ? FUNCTION get_cols_definition(p_schema IN VARCHAR2, p_table_name IN VARCHAR2, p_skip_columns IN VARCHAR2, p_pref IN VARCHAR2 DEFAULT NULL) RETURN VARCHAR2???
          ??? IS
          ? BEGIN
          ??? DECLARE
          ????? v_col_nm VARCHAR2(100);
          ????? v_col_def VARCHAR2(300);
          ????? v_ret VARCHAR2(32767);
          ????? v_work_str VARCHAR2(32767);
          ????? CURSOR c_tab_col(p_tbl_nm VARCHAR2, p_schm VARCHAR2) IS???????
          ??????? SELECT column_name,
          ??????????? RPAD(column_name,53) ||
          ??????? decode(data_type
          ??????????? ,'NUMBER','NUMBER('||TO_CHAR(data_precision)||
          ??????????????? decode(data_scale,0,'',','||data_scale)||')'
          ??????????? ,'CHAR','CHAR('||TO_CHAR(DATA_LENGTH)||')'
          ??????????? ,'VARCHAR2','VARCHAR2('||TO_CHAR(DATA_LENGTH)||')'
          ??????????? ,'DATE','DATE'
          ??????????? ,data_type) def
          ??????? from all_tab_columns
          ??????? where table_name = upper(trim(p_tbl_nm))
          ??????????? AND owner = upper(trim(p_schm))
          ??????? order by owner, table_name, column_name
          ??????? ;
          ? BEGIN
          ? OPEN c_tab_col(p_table_name, p_schema);
          ? v_ret := '';
          ? v_work_str := ','||LOWER(REPLACE(p_skip_columns,' ','')) || ',';
          ?
          ? LOOP
          ????? FETCH c_tab_col INTO v_col_nm, v_col_def;
          ????? EXIT WHEN c_tab_col%NOTFOUND;
          ??? IF INSTR(v_work_str,','||LOWER(v_col_nm)) = 0 THEN
          ??????? v_ret := v_ret || ', ' || p_pref ||LOWER(v_col_def);
          ??? END IF;
          ? END LOOP;
          ? CLOSE c_tab_col;
          ? IF (v_ret IS NOT NULL) THEN v_ret := SUBSTR(v_ret, 2); END IF;
          ? IF (v_ret IS NULL) THEN
          ????? raise_application_error(-20201,'Table ' || p_schema||'.'||p_table_name
          ??????? || ' Does not exist or has no columns');
          ? END IF;
          ?
          ? RETURN v_ret;
          ? exception
          ????????? when others
          ??????????? then dbms_output.put_line(SUBSTR('Error in get_cols_definition ' || sqlerrm,1,220));
          ? return 'error ' || sqlerrm ;
          ? END;
          ? END;
          -- ******************************************************************************
          -- END OF PUBLIC FUNCTION get_cols_definition
          -- ******************************************************************************
          ?
          -- ******************************************************************************
          -- PUBLIC FUNCTION get_modified_col_names
          -- ******************************************************************************
          ? FUNCTION get_modified_col_names(schema IN VARCHAR2, table_name IN VARCHAR2, source_colmap$ IN RAW) RETURN VARCHAR2
          ??? IS
          ? BEGIN
          ??? DECLARE
          ????? v_col_nm VARCHAR2(100);
          ????? v_col_pos number;
          ????? v_ret VARCHAR2(32767);
          ????? v_work_str VARCHAR2(32767);
          ??? v_byte_nr INTEGER;
          ??? v_col_map VARCHAR2(32767);
          ??? i INTEGER; j INTEGER;k INTEGER;
          ??? v_val VARCHAR2(5);
          ????? CURSOR c_tab_col(p_tbl_nm VARCHAR2, p_schm VARCHAR2) IS???????
          ??????? SELECT column_name, POWER(2,MOD(column_id,8)) col_pos, 1 + floor((column_id)/8) byte_nr
          ??????????? FROM all_tab_cols
          ??????????? WHERE table_name=UPPER(p_tbl_nm)
          ??????????????? AND OWNER=UPPER(p_schm)
          ??????????? ORDER BY column_id;???????
          ? BEGIN
          ????? SELECT dump(source_colmap$,10) INTO v_col_map FROM DUAL;
          ????? i:=INSTR(v_col_map,':');
          ????? v_col_map:=','||SUBSTR(v_col_map,i + 2)||',';
          --??? dbms_output.put_line('v_col_map='||v_col_map);
          ????? OPEN c_tab_col(table_name, schema);
          ????? v_ret := '';
          ????? LOOP
          --??? dbms_output.put_line('--1');
          ????????? FETCH c_tab_col INTO v_col_nm, v_col_pos, v_byte_nr;
          ????????? EXIT WHEN c_tab_col%NOTFOUND;
          --??? dbms_output.put_line('byte_nr='||to_char(v_byte_nr));
          ????????? i:=INSTR(v_col_map,',',1,v_byte_nr);
          ????????? j:=INSTR(v_col_map,',',1,v_byte_nr + 1);
          ????????? IF ((i=0) OR (j = 0)) THEN
          ????????????? EXIT;
          ????????? END IF;
          ????????? v_val := SUBSTR(v_col_map, i + 1, j - i - 1 );
          ????????? k := BITAND(TO_NUMBER(v_val),v_col_pos);
          ????????? IF (k>0) THEN
          ????????????? v_ret := v_ret || ',' || v_col_nm;
          ????????? END IF;
          --??? DBMS_OUTPUT.put_line('col='||v_col_nm ||' byte='||TO_CHAR(v_byte_nr)
          --????????? || ' i=' || TO_CHAR(i) || ' j='||TO_CHAR(j)
          --????????? ||' val='||v_val||' col_pos='||TO_CHAR(v_col_pos)||'res='||TO_CHAR(k));
          ????? END LOOP;
          ????? CLOSE c_tab_col;
          ????? IF (v_ret IS NOT NULL) THEN
          ????????? v_ret := SUBSTR(v_ret,2);
          ????? END IF;
          ????? RETURN v_ret;
          ? EXCEPTION
          ??????? WHEN OTHERS
          ??????????? THEN dbms_output.put_line(SUBSTR('Error in get_modifed_col_names: ',1,220));
          ??????????????? dbms_output.put_line(SUBSTR(sqlerrm,1,240));
          ????? return '';
          ? END;
          ? END;
          -- ******************************************************************************
          -- END OF PUBLIC FUNCTION get_modified_col_names
          -- ******************************************************************************
          ?
          END etl_util;
          /
          show errors

          grant execute on etl_util to public;
          DROP PUBLIC SYNONYM etl_util;
          CREATE PUBLIC SYNONYM etl_util FOR etl_util;
          --select etl_util.get_modified_col_names(schema1 => 'SCOTT'
          --,table_name => 'EMP'
          --,source_colmap$ => HEXTORAW('C000')) FROM dual;
          --
          BEGIN
          declare v_res VARCHAR2(1000);
          BEGIN
          ??? v_res := etl_util.get_modified_col_names('test', 'philip_test', HEXTORAW('0601'));
          ??? dbms_output.put_line('res='||v_res);
          END;
          END;
          /

          2)create the user

          connect system/manager
          DROP USER cdc_publisher CASCADE;
          CREATE USER cdc_publisher IDENTIFIED BY pass;
          GRANT EXECUTE_CATALOG_ROLE to cdc_publisher;
          GRANT SELECT_CATALOG_ROLE to cdc_publisher;
          ?
          3)grant the priviledge

          connect test/pass
          GRANT SELECT on PHILIP_TEST to cdc_publisher;
          GRANT SELECT on PHILIP_TEST2 to cdc_publisher;
          --the PHILIP_TEST2? is under? user?? "test"

          4)Create Change Tables
          --connect cdc_publisher/pass@sde223_dvweb
          set serveroutput on size 1000000
          set linesize 120
          ?
          /* Login as a publisher to run this */
          BEGIN
          ??? DECLARE work_sql VARCHAR2(32767);
          ??????? v_publisher_id VARCHAR2(20)? := 'cdc_publisher';
          ??????? v_source_schema VARCHAR2(20) := 'test';
          ??????? v_source_table VARCHAR2(100);
          ??????? v_cdc_table VARCHAR2(100);
          ??????? CURSOR c_tables IS
          ??????????? SELECT 'PHILIP_TEST' table_name FROM dual
          ??????????????? UNION
          ??????????? SELECT 'PHILIP_TEST2' table_name FROM dual
          ??????? ;
          BEGIN
          FOR tables IN c_tables LOOP
          ? v_source_table := tables.table_name;
          ? v_cdc_table := 'rep_'||v_source_table;
          ? work_sql := etl_util.get_cols_definition(
          ??????? p_schema=>v_source_schema,
          ??????? p_table_name =>v_source_table,
          ??????? p_skip_columns => NULL,
          ??????? p_pref => NULL);
          ??? DBMS_output.put_line('work_sql'||work_sql);
          ? DBMS_LOGMNR_CDC_PUBLISH.CREATE_CHANGE_TABLE (
          ??? OWNER => v_publisher_id, SOURCE_SCHEMA => v_source_schema
          ??? ,CHANGE_SET_NAME => 'SYNC_SET', CAPTURE_VALUES => 'new'--only Captures the changed values from the source table
          ??? ,RS_ID => 'n', ROW_ID => 'n', USER_ID => 'y', TIMESTAMP => 'y'
          ??? ,OBJECT_ID => 'n' -- leave it as 'N' or you will have "table has no columns" error
          ??? ,SOURCE_COLMAP => 'n', TARGET_COLMAP => 'n', OPTIONS_STRING => null
          ??? ,SOURCE_TABLE => v_source_table, CHANGE_TABLE_NAME => v_cdc_table
          ??? ,COLUMN_TYPE_LIST => work_sql);
          ? DBMS_output.put_line('Change table '||v_cdc_table ||' was created successfully');
          END LOOP;
          ?
          EXCEPTION
          ??? WHEN OTHERS THEN
          ??? DBMS_output.put_line('Error start ********************************************');
          ??? DBMS_output.put_line('Error during change table ' || v_cdc_table || ' creation:');
          ??? DBMS_output.put_line(to_char(sqlcode)|| ' ' || sqlerrm);
          ??? DBMS_output.put_line('Error end? ********************************************');
          END;
          END;
          /

          5)when we change the "test.PHILIP_TEST" and "test.PHILIP_TEST2" ,under cdc_publisher , the table "cdc_philip_test will recorde this operation and the value at once.
          ?
          ?
          *****************************************************************************************
          ?
          ?
          ?
          ?
          ?
          另附一篇比較詳細的CDC原理研究:
          *****************************************************************************************
          oracle學習--CDC 研究(1)
          http://hi.baidu.com/ybgba/blog/item/898d93366f79c8d5a2cc2b01.html
          ?
          oracle學習--CDC 研究(2)
          http://hi.baidu.com/ybgba/blog/item/988d113d7c18ddcf9e3d6201.html
          ?
          ?
          ?
          posted on 2009-06-15 19:49 decode360 閱讀(1539) 評論(0)  編輯  收藏 所屬分類: 10.DB_Tools
          主站蜘蛛池模板: 葫芦岛市| 剑河县| 黄梅县| 青海省| 泗阳县| 南阳市| 朝阳区| 云阳县| 监利县| 措勤县| 潍坊市| 凤山县| 城市| 霸州市| 凌海市| 肇州县| 凌源市| 镶黄旗| 固安县| 龙胜| 台东市| 鄂尔多斯市| 瑞金市| 汾阳市| 华亭县| 洛隆县| 冷水江市| 扬中市| 疏附县| 瑞丽市| 芜湖市| 云林县| 治多县| 上思县| 延寿县| 岳池县| 长治市| 黔江区| 高雄市| 沽源县| 舞钢市|