• Non ci sono risultati.

7.4 Caricamento dati dalle sorgenti all’area di staging

7.4.1 Store Procedure LOAD_STG

7.4.1.5 Caricamento delle tabelle di secondo livello d

Le tabelle di staging di secondo livello vengono caricate in delta. Tuttavia, può capitare che alcune tabelle del sistema sorgente non abbiano le chiavi pri- marie definite (ad esempio, questo può accadere quando le tabelle sono di log). Quindi, per le tabelle di secondo livello, bisogna distinguere quali sono quel- le che vanno caricate in delta da quelle che vanno caricate senza considerare i vincoli di chiave primaria.

Caricamento in delta.

Prima di fare qualsiasi operazione sulla tabella di staging di secondo livello, bisogna stabilire la tipologia di aggiornamento sulla tabella stessa, cioè se si andrà ad aggiornare dei campi (update) o inserire un nuovo record (insert). Questa informazione viene registrata nel campo tecnico OPERATION_CD, della tabella di staging di primo livello (Lista 7.9).

7.4. CARICAMENTO DATI DALLE SORGENTI ALL’AREA DI STAGING 73

Lista 7.9: Merge tra la tabella di primo e secondo livello CMD_MERGE_1LEV := '

MERGE INTO '||V_STG_SCHEMA_1_LEV ||'.'|| V_STG_TABLE_NAME_CD ||'_T DEST USING (

SELECT A.ROWID, CASE WHEN B.ROWID IS NOT NULL THEN ''U'' WHEN B.ROWID IS NULL THEN ''I'' ELSE NULL END AS OPERATION_CD

FROM '||V_STG_SCHEMA_1_LEV ||'.'|| V_STG_TABLE_NAME_CD ||'_T A

LEFT OUTER JOIN '|| V_STG_SCHEMA_2_LEV||'.'|| V_STG_TABLE_NAME_CD || ' B ON

'||V_JOIN_CONDITION_DESC || ' AND B.RCRD_STS_CD =''A''

AND B.END_TMSTMP=TO_DATE(''99991231'',''YYYYMMDD'') AND B.FACILITY_CD= '''||IN_SRC_FACILITY_CD||''' AND B.SRC_INST_CD= '''||IN_SRC_INST_CD||'''

WHERE ( B.ROWID IS NOT NULL AND

('||V_DIFF_CONDITION_DESC||') ) OR B.ROWID IS NULL ) ORIG ON (DEST.ROWID=ORIG.ROWID)

WHEN MATCHED THEN UPDATE SET

DEST.OPERATION_CD=ORIG.OPERATION_CD ';

DBMS_OUTPUT.PUT_LINE(' --> QUERY CMD_INSERT_1LEV: '||CMD_MERGE_1LEV || ' ');

EXECUTE IMMEDIATE CMD_MERGE_1LEV; COMMIT;

La store procedure effettua una left join della tabella di staging di primo livello con quella di secondo livello. Se nella parte destra della join ci sono dei valori a NULL vuol dire che per alcuni record della tabella di staging di secondo livello non c’è corrispondenza con la tabella di staging di primo livello. Se non c’è corrispondenza vanno inseriti dei nuovi campi, e quindi verrà eseguita una operazione di Insert. Nel caso contrario, invece, verrà fatto un aggiornamento dei campi.

Aggiornamento tabella di staging di secondo livello. Lo step della store pro- cedure precedentemente descritto ha il compito di valorizzare la variabile OPE- RATION_CD. Nel caso in cui questa variabile assuma valore ‘U’ si procede con l’aggiornamento degli attributi della tabella si staging di secondo livello. Viene definito un cursore all’interno del quale sono presenti le tuple della tabella di secondo livello da aggiornare.1 Nello specifico, le tuple inserite sono composte dai campi ROWID per la tabella di staging di primo livello e per la tabella di

1Il cursore è uno strumento, messo a disposizione da PL/SQL, per la gestione di query che

74 7. ESTRAZIONE, TRASFORMAZIONE ECARICAMENTO staging di secondo livello (Lista 7.10).1 Vengono selezionati solo i ROWID dei

record delle due table di staging risultati dalla INNER JOIN tra i campi chiave di business delle tabelle (la condizione di join viene definita nel campo JOIN_CON- DITION_DESC della tabella di configurazione TBSTGCNF_LOAD_STG) , dove il campo OPERATION_CD della tabella di primo livello è settato a ‘U’ , e quindi sono da aggiornare.

Lista 7.10: Inizializzazione variabile CURSOR_UPDATE SELECT A.ROWID , B.ROWID

FROM '||V_STG_SCHEMA_1_LEV||'. '||V_STG_TABLE_NAME_CD||' _T A INNER JOIN '||

V_STG_SCHEMA_2_LEV||'.'||V_STG_TABLE_NAME_CD|| ' B ON '||V_JOIN_CONDITION_DESC ||

' AND A.OPERATION_CD=''U'' AND B.RCRD_STS_CD =''A'' AND B.END_TMSTMP=TO_DATE(''99991231'',''YYYYMMDD'') AND B.FACILITY_CD= '''||IN_SRC_FACILITY_CD||''' AND B.SRC_INST_CD= '''||IN_SRC_INST_CD||''' ';

Allo scopo di tener traccia dei cambiamenti avvenuti nella tabella di staging di secondo livello, e quindi storicizzare un determinato record, viene definita la variabile CMD_HISTOR (Lista 7.11). Questa variabile viene valorizzata con i comandi di insert sui campi dei record da storicizzare. In questo caso, per le tuple storicizzate, il campo RCRD_STS_CD assumerà valore ‘S’.

Lista 7.11: Inizializzazione variabile CMD_HISTOR CMD_HISTOR:= '

INSERT INTO '||V_STG_SCHEMA_2_LEV||'. '||V_STG_TABLE_NAME_CD ||' ( FACILITY_CD,SRC_INST_CD,RCRD_STS_CD,INS_TMSTMP, UPD_TMSTMP,END_TMSTMP,' || V_INSERT_COLUMNS ||' ) SELECT '''||IN_SRC_FACILITY_CD||''','''||IN_SRC_INST_CD ||''',''S'' AS RCRD_STS_CD,INS_TMSTMP,UPD_TMSTMP,:1 END_TMSTMP,'||V_SELECT_COLUMNS||' FROM '||V_STG_SCHEMA_2_LEV||'.'||V_STG_TABLE_NAME_CD ||' WHERE ROWID= :2 ' ;

La variabile CMD_UPDATE viene valorizzata con lo statement utilizzato per ag- giornare i campi della tabella di staging di secondo livello (Lista 7.12). Il cam- po ROWID viene utilizzato per considerare le tuple delle due tabelle di staging interessate.

essere aperto, valorizzato ed infine chiuso

1In Oracle il ROWID è una pseudocolonna, esistente per ogni tabella, che rappresenta

7.4. CARICAMENTO DATI DALLE SORGENTI ALL’AREA DI STAGING 75

Lista 7.12: Inizializzazione variabile CURSOR_UPDATE CMD_UPDATE:= '

MERGE INTO '||V_STG_SCHEMA_2_LEV||'.'|| V_STG_TABLE_NAME_CD ||' DEST USING ( SELECT :1 AS ROWID_STG2L,A.*

FROM '||V_STG_SCHEMA_1_LEV ||'.'||V_STG_TABLE_NAME_CD ||' _T A WHERE ROWID= :2 ) ORIG

ON (DEST.ROWID=ORIG.ROWID_STG2L) WHEN MATCHED THEN

UPDATE SET DEST.FACILITY_CD='''||IN_SRC_FACILITY_CD||''', DEST.SRC_INST_CD='''||IN_SRC_INST_CD||''',

DEST.RCRD_STS_CD=''A'', DEST.UPD_TMSTMP= :3,

DEST.END_TMSTMP=TO_DATE(''99991231'',''YYYYMMDD''), ' || V_MERGE_COLUMNS ;

Definita la variabile CURSOR_UPDATE da utilizzare per il cursore, e le variabili CMD_HISTOR e CMD_UPDATE per storicizzare e aggiornare il record rispetti- vamente, nella store procedure viene definito un ciclo per eseguire i comandi di aggiornamento (Lista 7.13). Prima che il ciclo abbia inizio, il cursore deve essere aperto. Aprire un cursore comporta l’esecuzione della query ad esso associato. Nello specifico viene eseguita la query contenuta nella variabile CURSOR_UP- DATE in modo tale che il cursore CURSOR1 possa ciclare sulle tuple della tabella di staging di primo livello e sulle tuple da aggiornare della tabella di secondo livello. Le tuple vengono salvate nelle variabili V_ROWID_STG1L e V_ROWID_- STG2L. Per ogni tupla trovata nel ciclo, se la variabile di storicizzazione V_HI- ST_FLG è settata a ‘Y’ si procede alla storicizzazione del record, eseguendo il co- mando contenuto nella variabile CMD_HISTOR. Per ogni tupla viene fatto l’ag- giornamento, eseguendo il comando contenuto nella variabile CMD_UPDATE. Infine viene tenuta traccia del numero di record che si stanno aggiornando. Que- st’informazione viene memorizzata dalla variabile v_STG_ROWS_UPD_NBR e verrà utilizzata per popolare la tabella di log TBSTGLOG_LOAD_STG.

76 7. ESTRAZIONE, TRASFORMAZIONE ECARICAMENTO

Lista 7.13: Aggiornamento della tabella di staging di secondo livello I:=0;

OPEN CURSOR1 FOR CURSOR_UPDATE;

FETCH CURSOR1 INTO V_ROWID_STG1L,V_ROWID_STG2L; LOOP

EXIT WHEN CURSOR1\%NOTFOUND; I:=I+1;

IF V_HIST_FLG='Y' THEN EXECUTE IMMEDIATE

CMD_HISTOR USING IN_TIMESTAMP_RIF, V_ROWID_STG2L; END IF; EXECUTE IMMEDIATE CMD_UPDATE USING V_ROWID_STG2L,

V_ROWID_STG1L, IN_TIMESTAMP_RIF ;

v_STG_ROWS_UPD_NBR :=v_STG_ROWS_UPD_NBR+1; IF I=V_ROW_COMMIT THEN COMMIT; I:=0; END IF; FETCH CURSOR1 INTO V_ROWID_STG1L,V_ROWID_STG2L; END LOOP;

COMMIT;

CLOSE CURSOR1;

Operazione di inserimento sulla tabella di staging di secondo livello. Nel caso in cui l’attributo OPERATION_CD, della tabella di staging di primo livello, sia settato a ‘I’ , si procede all’inserimento di nuove tuple all’interno della tabella di staging di secondo livello. Viene definita la variabile CURSOR_INSERT che verrà utilizzata da un cursore per ciclare sulle tuple della tabella di primo livello da inserire nella tabella di secondo livello (Lista 7.14).

Lista 7.14: Inizializzazione variabile CURSOR_INSERT CURSOR_INSERT := '

SELECT A.ROWID FROM '||V_STG_SCHEMA_1_LEV||' .'||V_STG_TABLE_NAME_CD ||'_T A

WHERE A.OPERATION_CD=''I'' ';

Nella variabile CMD_INSERT viene definito il comando di Insert necessario per inserire i nuovi record (Lista 7.15).

7.4. CARICAMENTO DATI DALLE SORGENTI ALL’AREA DI STAGING 77

Lista 7.15: Inizializzazione variabile CMD_INSERT CMD_INSERT:= '

INSERT INTO '||V_STG_SCHEMA_2_LEV||'. '||V_STG_TABLE_NAME_CD ||' (FACILITY_CD,SRC_INST_CD,RCRD_STS_CD,INS_TMSTMP, UPD_TMSTMP,END_TMSTMP,' || V_INSERT_COLUMNS ||' ) SELECT '''||IN_SRC_FACILITY_CD||''', '''||IN_SRC_INST_CD||''',''A'' AS RCRD_STS_CD, :1 AS INS_TMSTMP,:2 AS UPD_TMSTMP, TO_DATE(''99991231'',''YYYYMMDD'') END_TMSTMP, '|| V_SELECT_COLUMNS || ' FROM '||V_STG_SCHEMA_1_LEV||'.

'||V_STG_TABLE_NAME_CD ||'_T WHERE ROWID= :3 ' ;

La logica del cursore utilizzato per ciclare sulle tuple da inserire in tabella è la stessa per quanto fatto in caso di aggiornamento. La variabile contente il comando da eseguire è CMD_INSERT, mentre la gestione dei campi storicizza- ti ovviamente non viene affrontata. L’informazione inerente al numero di re- cord inseriti, da tener traccia nella tabella di log TBSTGLOG_LOAD_STG, viene memorizzata nella variabile v_STG_ROWS_INS_NBR (Lista 7.16).

Lista 7.16: Operazione di inserimento sulla tabella di staging di secondo livello I:=0;

OPEN CURSOR1 FOR CURSOR_INSERT; FETCH CURSOR1 INTO V_ROWID_STG1L; LOOP

EXIT WHEN CURSOR1\%NOTFOUND; I:=I+1;

EXECUTE IMMEDIATE CMD_INSERT USING IN_TIMESTAMP_RIF, IN_TIMESTAMP_RIF,V_ROWID_STG1L;

v_STG_ROWS_INS_NBR :=v_STG_ROWS_INS_NBR+1 IF I=V_ROW_COMMIT THEN COMMIT; I:=0; END IF; FETCH CURSOR1 INTO V_ROWID_STG1L;

END LOOP; COMMIT;

CLOSE CURSOR1;

Implementazione logica delete. Nell’area di staging è stato implementato un meccanismo di delete allo scopo di individuare e storicizzare le cancellazioni fatte sui dati dal sistema sorgente. Nello staging di livello 1 sono presenti dei dati provenienti dai sistemi sorgenti, mentre nello staging di livello 2 i dati pro- vengono dallo staging di primo livello, inserendo nuovi record o aggiornando quelli esistenti. Per individuare quali sono i record che non sono più presenti nel sistema sorgente, viene fatto un merge dei dati tra il livello 1 e il livello 2 del-

78 7. ESTRAZIONE, TRASFORMAZIONE ECARICAMENTO lo staging. I record appartenenti al secondo livello di staging che non trovano corrispondenza con i record dello staging di livello 1 vengono marcati a delete, e vengono quindi storicizzati come dati non più presenti nel sistema sorgente. Nello specifico, la tabella di secondo livello di staging viene messa in LEFT JOIN con la tabella di staging di primo livello. I valori che sono a NULL indicano che tra le due tabelle non c’è stata una corrispondenza. Questo succede quando dal sistema sorgente è stato eliminato un record. Lo staging di primo livello carica giornalmente i dati direttamente dalla sorgente, mentre lo staging di secondo livello si aggiorna con cadenza settimanale. Se quindi, mettendo in giunzione le tabelle tra il primo e il secondo livello di staging non c’è una corrispondenza, vuol dire che la tabella appartenente al primo livello non ha più quel record, ed essendo le tabelle di primo livello popolate dai dati più recenti provenienti dai sistemi sorgenti, vuol dire che il record in più presente nella tabella di se- condo livello deve essere considerato come un record da eliminare. Nella store procedure, la variabile CURSOR_DELETE contiene il comando atto a mettere in giunzione le tabelle dei due livelli di staging, e filtrare solo per le tuple della tabella di secondo livello che hanno il campo ROWID valorizzato a NULL (Lista 7.17).

Lista 7.17: Inizializzazione variabile CURSOR_DELETE CURSOR_DELETE:= '

SELECT A.ROWID

FROM '||V_STG_SCHEMA_2_LEV||'.'||V_STG_TABLE_NAME_CD ||' A

LEFT OUTER JOIN '|| V_STG_SCHEMA_1_LEV||'. '||V_STG_TABLE_NAME_CD

||'_T B ON '||V_JOIN_CONDITION_DESC || '

WHERE B.ROWID IS NULL AND A.RCRD_STS_CD =''A'' AND A.FACILITY_CD= '''||IN_SRC_FACILITY_CD||''' AND A.SRC_INST_CD= '''||IN_SRC_INST_CD||'''' ||

V_SRC_FILTER_DESC;

Nella variabile CMD_DELETE viene definito il comando che permette di as- segnare all’attributo RCRD_STS_CD il valore ‘D’, per indicare che quella tupla viene contrassegnata come cancellata (Lista 7.18).

Lista 7.18: Inizializzazione variabile CMD_DELETE CMD_DELETE:= '

UPDATE '||V_STG_SCHEMA_2_LEV||'.'||V_STG_TABLE_NAME_CD ||' SET RCRD_STS_CD=''D'' ,END_TMSTMP=:1

WHERE ROWID= :2 ' ;

Infine, viene aperto il cursore sulla variabile CURSOR_DELETE (Lista 7.19). La logica del ciclo è del tutto analoga a quanto detto precedentemente in meri-

7.4. CARICAMENTO DATI DALLE SORGENTI ALL’AREA DI STAGING 79 to ai metodi di aggiornamento e inserimento dei record. Anche in questo caso, nella tabella di log viene tenuta traccia dell’informazione riguardante il nume- ro di record che sono stati cancellati, andando a salvare l’informazione nella variabile v_STG_ROWS_DEL_NBR.

Lista 7.19: Esecuzione logica di delete I:=0;

OPEN CURSOR1 FOR CURSOR_DELETE; FETCH CURSOR1 INTO V_ROWID_STG2L; LOOP

EXIT WHEN CURSOR1\%NOTFOUND; I:=I+1;

EXECUTE IMMEDIATE CMD_DELETE

USING IN_TIMESTAMP_RIF,V_ROWID_STG2L; v_STG_ROWS_DEL_NBR :=v_STG_ROWS_DEL_NBR+1;

IF I=V_ROW_COMMIT THEN COMMIT; I:=0; END IF; FETCH CURSOR1 INTO V_ROWID_STG2L;

END LOOP; COMMIT;

CLOSE CURSOR1;

Caricamento in full.

Nel caso in cui il campo FULL_LOAD_FLG, della tabella di configurazione TBSTGCNF_LOAD_STG, sia settato a ‘Y’, anche le tabelle del secondo livello di staging vengono caricate in modalità full. La prima operazione da compiere è settare il campo OPERATION_CD, della tabella di staging di primo livello, a ‘I’ (Lista 7.20). In base al valore di questa variabile, infatti, la store procedure procede con operazioni di aggiornamento o di inserimento.

Lista 7.20: Valorizzazione dell’attributo OPERATION_CD CMD_MERGE_1LEV := '

UPDATE '||V_STG_SCHEMA_1_LEV ||'.'|| V_STG_TABLE_NAME_CD ||'_T DEST

SET DEST.OPERATION_CD=''I'' '; EXECUTE IMMEDIATE CMD_MERGE_1LEV;

COMMIT;

Nel caso in cui il caricamento dei dati sia in modalità full, anche per le tabelle appartenenti al secondo livello di staging bisogna cancellare i record presenti prima di poterne inserire di nuovi (Lista 7.21).

80 7. ESTRAZIONE, TRASFORMAZIONE ECARICAMENTO

Lista 7.21: Cancellazione record sulla tabella di staging di secondo livello EXECUTE IMMEDIATE 'DELETE FROM '||V_STG_SCHEMA_2_LEV||'

.'||V_STG_TABLE_NAME_CD ||' WHERE FACILITY_CD=

'''||IN_SRC_FACILITY_CD||''' AND SRC_INST_CD=

'''||IN_SRC_INST_CD||'''' ;

All’interno della variabile CURSOR_INSERT viene specificato il comando che permette di collezionare i ROWID dei record da inserire nella tabella di staging (Lista 7.22).

Lista 7.22: Inizializzazione variabile CURSOR_INSERT CURSOR_INSERT := '

SELECT A.ROWID FROM '||V_STG_SCHEMA_1_LEV||'. '||V_STG_TABLE_NAME_CD ||'_T A

WHERE A.OPERATION_CD=''I'' ';

Infine, la variabile CMD_INSERT, viene utilizzata per definire il comando di INSERT sui campi tecnici e su quelli originari della tabella, contenuti nella variabile V_INSERT_COLUMNS (Lista 7.23).

Lista 7.23: Inizializzazione variabile CMD_INSERT CMD_INSERT:= '

INSERT INTO '||V_STG_SCHEMA_2_LEV||' .'||V_STG_TABLE_NAME_CD ||' (FACILITY_CD,SRC_INST_CD,RCRD_STS_CD,INS_TMSTMP, UPD_TMSTMP,END_TMSTMP,' || V_INSERT_COLUMNS ||' ) SELECT '''||IN_SRC_FACILITY_CD||''', '''||IN_SRC_INST_CD||''',''A'' AS RCRD_STS_CD, :1 AS INS_TMSTMP,:2 AS UPD_TMSTMP, TO_DATE(''99991231'',''YYYYMMDD'') END_TMSTMP, '|| V_SELECT_COLUMNS || ' FROM '||V_STG_SCHEMA_1_LEV||'

.'||V_STG_TABLE_NAME_CD ||'_T WHERE ROWID= :3 ' ; Per inserire ogni singolo record nella tabella di destinazione, viene aperto il cursore sulla variabile CURSOR_INSERT (Lista 7.24).

Come sempre, l’informazione sul numero di record inseriti, viene memoriz- zata nella variabile v_STG_ROWS_INS_NBR.

7.4. CARICAMENTO DATI DALLE SORGENTI ALL’AREA DI STAGING 81

Lista 7.24: Caricamento dati nella tabella di staging di secondo livello I:=0;

OPEN CURSOR1 FOR CURSOR_INSERT; FETCH CURSOR1 INTO V_ROWID_STG1L; LOOP

EXIT WHEN CURSOR1\%NOTFOUND; I:=I+1;

EXECUTE IMMEDIATE CMD_INSERT USING IN_TIMESTAMP_RIF, IN_TIMESTAMP_RIF,V_ROWID_STG1L;

v_STG_ROWS_INS_NBR :=v_STG_ROWS_INS_NBR+1; IF I=V_ROW_COMMIT THEN COMMIT; I:=0; END IF; FETCH CURSOR1 INTO V_ROWID_STG1L;

END LOOP; COMMIT;

CLOSE CURSOR1; END IF;

Documenti correlati