create or replace PACKAGE BODY CT_MRDS.FILE_ARCHIVER AS ---------------------------------------------------------------------------------------------------- -- PRIVATE FUNCTION: GET_ARCHIVAL_WHERE_CLAUSE ---------------------------------------------------------------------------------------------------- /** * @name GET_ARCHIVAL_WHERE_CLAUSE * @desc Private function that generates WHERE clause based on ARCHIVAL_STRATEGY configuration. * Supports three strategies: THRESHOLD_BASED, MINIMUM_AGE_MONTHS, HYBRID. * @param pSourceFileConfig - Source file configuration record with ARCHIVAL_STRATEGY * @return VARCHAR2 - WHERE clause for filtering archival candidates **/ FUNCTION GET_ARCHIVAL_WHERE_CLAUSE( pSourceFileConfig IN CT_MRDS.A_SOURCE_FILE_CONFIG%ROWTYPE ) RETURN VARCHAR2 IS vWhereClause VARCHAR2(4000); cgBL CONSTANT VARCHAR2(2) := CHR(13)||CHR(10); BEGIN CASE pSourceFileConfig.ARCHIVAL_STRATEGY -- Legacy threshold-based strategy (backward compatible) WHEN 'THRESHOLD_BASED' THEN vWhereClause := 'extract(day from (systimestamp - workflow_start)) > ' || pSourceFileConfig.DAYS_FOR_ARCHIVE_THRESHOLD; -- Archive data older than X months (0 = current month only) WHEN 'MINIMUM_AGE_MONTHS' THEN IF pSourceFileConfig.MINIMUM_AGE_MONTHS IS NULL THEN RAISE_APPLICATION_ERROR(-20001, 'MINIMUM_AGE_MONTHS must be configured for MINIMUM_AGE_MONTHS strategy'); END IF; vWhereClause := 'workflow_start < ADD_MONTHS(TRUNC(SYSDATE, ''MM''), -' || pSourceFileConfig.MINIMUM_AGE_MONTHS || ')'; -- Hybrid: Current month exclusion AND minimum age requirement WHEN 'HYBRID' THEN IF pSourceFileConfig.MINIMUM_AGE_MONTHS IS NULL THEN RAISE_APPLICATION_ERROR(-20001, 'MINIMUM_AGE_MONTHS must be configured for HYBRID strategy'); END IF; vWhereClause := 'TRUNC(workflow_start, ''MM'') < TRUNC(SYSDATE, ''MM'') ' || 'AND workflow_start < ADD_MONTHS(TRUNC(SYSDATE, ''MM''), -' || pSourceFileConfig.MINIMUM_AGE_MONTHS || ')'; ELSE RAISE_APPLICATION_ERROR(-20002, 'Invalid ARCHIVAL_STRATEGY: ' || pSourceFileConfig.ARCHIVAL_STRATEGY); END CASE; RETURN vWhereClause; END GET_ARCHIVAL_WHERE_CLAUSE; ---------------------------------------------------------------------------------------------------- FUNCTION GET_TABLE_STAT(pSourceFileConfigKey IN NUMBER) RETURN CT_MRDS.A_TABLE_STAT%ROWTYPE IS vTableStat CT_MRDS.A_TABLE_STAT%ROWTYPE; vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE; vCount PLS_INTEGER; vSourceFileType CT_MRDS.A_SOURCE_FILE_CONFIG.SOURCE_FILE_TYPE%TYPE; BEGIN vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST('pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey),NULL))); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','DEBUG', vParameters); SELECT count(*) , min(SOURCE_FILE_TYPE) INTO vCount, vSourceFileType FROM CT_MRDS.A_TABLE_STAT s JOIN CT_MRDS.A_SOURCE_FILE_CONFIG c ON s.A_SOURCE_FILE_CONFIG_KEY = c.A_SOURCE_FILE_CONFIG_KEY WHERE s.A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfigKey; IF vCount=0 and vSourceFileType='INPUT' THEN GATHER_TABLE_STAT(pSourceFileConfigKey); END IF; BEGIN SELECT * INTO vTableStat FROM CT_MRDS.A_TABLE_STAT WHERE A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfigKey; -- EXCEPTION -- WHEN NO_DATA_FOUND THEN -- END; CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','DEBUG',vParameters); RETURN vTableStat; END GET_TABLE_STAT; ---------------------------------------------------------------------------------------------------- PROCEDURE ARCHIVE_TABLE_DATA ( pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE ) IS vSourceFileConfig CT_MRDS.A_SOURCE_FILE_CONFIG%ROWTYPE; vTableStat CT_MRDS.A_TABLE_STAT%ROWTYPE; vQuery VARCHAR2(4000); vTableName VARCHAR2(200); vUri VARCHAR2(1000); vfiles T_FILENAMES; vFilename VARCHAR2(300); vOperationId NUMBER := -1; vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE; vArchivalTriggeredBy VARCHAR2(60); -- Possible values: FILES_COUNT, ROWS_COUNT, BYTES_SUM vUserLoadOperations USER_LOAD_OPERATIONS%ROWTYPE; vProcessControlStatus VARCHAR2(60) := 'OK'; BEGIN vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST('pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL'))); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters); vSourceFileConfig := CT_MRDS.FILE_MANAGER.GET_SOURCE_FILE_CONFIG(pSourceFileConfigKey => pSourceFileConfigKey); vTableStat := GET_TABLE_STAT(pSourceFileConfigKey => pSourceFileConfigKey); if vSourceFileConfig.SOURCE_FILE_TYPE <> 'INPUT' then CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.MSG_NOT_INPUT_SOURCE_FILE_TYPE, 'ERROR', vParameters); RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_NOT_INPUT_SOURCE_FILE_TYPE, CT_MRDS.ENV_MANAGER.MSG_NOT_INPUT_SOURCE_FILE_TYPE); end if; if vTableStat.created < sysdate-(vSourceFileConfig.HOURS_TO_EXPIRE_STATISTICS/24) then GATHER_TABLE_STAT(pSourceFileConfigKey => pSourceFileConfigKey); vTableStat := GET_TABLE_STAT(pSourceFileConfigKey => pSourceFileConfigKey); end if; -- Strategy-based trigger logic (MARS-828) IF vSourceFileConfig.ARCHIVAL_STRATEGY = 'MINIMUM_AGE_MONTHS' THEN -- MINIMUM_AGE_MONTHS: Archive based on age only, ignore thresholds vArchivalTriggeredBy := 'AGE_BASED'; CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Archival strategy: MINIMUM_AGE_MONTHS (threshold-independent)','INFO'); ELSE -- THRESHOLD_BASED and HYBRID: Check thresholds if vTableStat.OVER_ARCH_THRESOLD_FILE_COUNT >= vSourceFileConfig.FILES_COUNT_OVER_ARCHIVE_THRESHOLD then vArchivalTriggeredBy := 'FILES_COUNT'; elsif vTableStat.OVER_ARCH_THRESOLD_ROW_COUNT >= vSourceFileConfig.ROWS_COUNT_OVER_ARCHIVE_THRESHOLD then vArchivalTriggeredBy := vArchivalTriggeredBy||', ROWS_COUNT'; elsif vTableStat.OVER_ARCH_THRESOLD_SIZE >= vSourceFileConfig.BYTES_SUM_OVER_ARCHIVE_THRESHOLD then vArchivalTriggeredBy := vArchivalTriggeredBy||', BYTES_SUM'; else CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Non of archival triggers reached','INFO'); end if; END IF; if LENGTH(vArchivalTriggeredBy)>0 THEN CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Archival Triggered By: '||vArchivalTriggeredBy,'INFO'); vTableName := DBMS_ASSERT.SCHEMA_NAME(vSourceFileConfig.ODS_SCHEMA_NAME) || '.'||DBMS_ASSERT.simple_sql_name(vSourceFileConfig.TABLE_ID)||'_ODS'; -- Use strategy-based WHERE clause (MARS-828) -- Using GROUP BY instead of DISTINCT to avoid ORA-22950 (object type ordering issue) vQuery := ' select t_filename( file$name ,file$path , to_char(h.workflow_start,''yyyy'') , to_char(h.workflow_start,''mm'') ) from '||vTableName||' s join CT_MRDS.a_workflow_history h on s.a_workflow_history_key = h.a_workflow_history_key where ' || GET_ARCHIVAL_WHERE_CLAUSE(vSourceFileConfig) || ' group by file$name, file$path, to_char(h.workflow_start,''yyyy''), to_char(h.workflow_start,''mm'')' ; -- Get all files that will be archived into "vfiles" collection ("regular data files") execute immediate vQuery bulk collect into vfiles; -- Start EXPORT "regular data files" to parquet and DROP "csv" FOR ym_loop IN (select distinct year, month from table(vfiles) order by 1,2) LOOP dbms_output.put_line('year: '||ym_loop.year||' - '||'month: '||ym_loop.month); vQuery:= 'select s.* from '|| vTableName ||' s join CT_MRDS.A_SOURCE_FILE_RECEIVED r on s.file$name = r.source_file_name and r.a_source_file_config_key = '||pSourceFileConfigKey||' and r.PROCESSING_STATUS = ''INGESTED'' join CT_MRDS.a_workflow_history h on s.a_workflow_history_key = h.a_workflow_history_key and to_char(h.workflow_start,''yyyy'') = '''||ym_loop.year||''' and to_char(h.workflow_start,''mm'') = '''||ym_loop.month||''' ' ; vUri := CT_MRDS.FILE_MANAGER.GET_BUCKET_URI('ARCHIVE')||vSourceFileConfig.A_SOURCE_KEY||'/'||vSourceFileConfig.TABLE_ID||'/PARTITION_YEAR='||ym_loop.year||'/PARTITION_MONTH='||ym_loop.month||'/'; CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start Archiving for YEAR_MONTH: '||ym_loop.year||'_'||ym_loop.month ,'INFO'); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Parameter for DBMS_CLOUD.EXPORT_DATA => file_uri_list' ,'DEBUG',vUri); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Parameter for DBMS_CLOUD.EXPORT_DATA => query' ,'DEBUG',vQuery); BEGIN DBMS_CLOUD.EXPORT_DATA( credential_name => ENV_MANAGER.gvCredentialName, file_uri_list => vUri||'d' , format => json_object('type' value 'parquet'), query => vQuery, operation_id => vOperationId ); EXCEPTION WHEN OTHERS THEN vProcessControlStatus :='EXPORT_FAILURE'; CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_EXP_DATA_FOR_ARCH_FAILED, CT_MRDS.ENV_MANAGER.MSG_EXP_DATA_FOR_ARCH_FAILED); END; CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('vOperationId of export: '||vOperationId,'DEBUG'); -- Get USER_LOAD_OPERATIONS info select * into vUserLoadOperations from USER_LOAD_OPERATIONS where id = vOperationId; IF vUserLoadOperations.STATUS <>'COMPLETED' THEN RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_EXP_DATA_FOR_ARCH_FAILED, CT_MRDS.ENV_MANAGER.MSG_EXP_DATA_FOR_ARCH_FAILED ||cgBL|| ' Export ended with status '||vUserLoadOperations.STATUS); ELSIF vUserLoadOperations.STATUS = 'COMPLETED' and vUserLoadOperations.ROWS_LOADED = 0 THEN RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_EXP_DATA_FOR_ARCH_FAILED, CT_MRDS.ENV_MANAGER.MSG_EXP_DATA_FOR_ARCH_FAILED ||cgBL|| ' Zero rows were exported.'); ELSE CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Data exported to archival file for YEAR_MONTH: '||ym_loop.year||'_'||ym_loop.month,'INFO', vParameters); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.FILE_MANAGER.GET_DET_USER_LOAD_OPERATIONS (pOperationId => vOperationId),'DEBUG', vParameters); END IF; -- Note: DBMS_CLOUD.EXPORT_DATA may create multiple parquet files (parallel execution) -- Instead of tracking individual files, we store the archive directory prefix -- ARCH_FILE_NAME will contain the directory URI where all parquet files are located vFilename := vUri; -- Store directory prefix instead of individual filename -- Try to drop EXPORTED FILES ("regular data files") BEGIN FOR f in (select filename, pathname from table(vfiles) where year = ym_loop.year and month = ym_loop.month) loop -- first change of status BEGIN UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED r SET PROCESSING_STATUS = 'ARCHIVED' ,ARCH_FILE_NAME = vFilename -- Now contains directory prefix, not individual file ,PARTITION_YEAR = ym_loop.year -- Record which partition year the data was archived to ,PARTITION_MONTH = ym_loop.month -- Record which partition month the data was archived to WHERE r.a_source_file_config_key= pSourceFileConfigKey AND r.source_file_name = f.filename AND r.processing_status = 'INGESTED' ; EXCEPTION WHEN OTHERS THEN CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); vProcessControlStatus := 'CHANGE_STATUS_TO_ARCHIVED_FAILURE'; END; EXIT WHEN vProcessControlStatus = 'CHANGE_STATUS_TO_ARCHIVED_FAILURE'; -- move file to trash before dropping BEGIN DBMS_CLOUD.MOVE_OBJECT(source_credential_name => ENV_MANAGER.gvCredentialName, source_object_uri => f.pathname||'/'||f.filename, target_object_uri => replace(f.pathname,'ODS','TRASH')||'/'||f.filename, target_credential_name => ENV_MANAGER.gvCredentialName ); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('File moved to TRASH.','DEBUG', f.pathname||'/'||f.filename); EXCEPTION WHEN OTHERS THEN CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Failed to move file to TRASH.','ERROR', f.pathname||'/'||f.filename); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); rollback; vProcessControlStatus := 'MOVE_FILE_TO_TRASH_FAILURE'; END; EXIT WHEN vProcessControlStatus = 'MOVE_FILE_TO_TRASH_FAILURE'; commit; END LOOP; -------------------------------------------------------------------- -- IF All goes fine till this point, we drop files from TRASH (if not then ROLLBACK PART) IF vProcessControlStatus = 'OK' THEN FOR f in (select filename, pathname from table(vfiles) where year = ym_loop.year and month = ym_loop.month) LOOP --Drop file from TRASH DBMS_CLOUD.DELETE_OBJECT(credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName, object_uri => replace(f.pathname,'ODS','TRASH')||'/'||f.filename); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('File dropped from TRASH.','DEBUG', f.pathname||'/'||f.filename); END LOOP; --ROLLBACK PART --ROLLBACK PROCESS in case of FAILURE (restore files from TRASH) ELSIF vProcessControlStatus = 'MOVE_FILE_TO_TRASH_FAILURE' THEN FOR f in ( SELECT vf.filename, vf.pathname FROM TABLE(vfiles) vf JOIN CT_MRDS.A_SOURCE_FILE_RECEIVED r ON r.source_file_name = vf.filename AND r.a_source_file_config_key = pSourceFileConfigKey AND r.PROCESSING_STATUS = 'ARCHIVED' AND vf.year = ym_loop.year AND vf.month = ym_loop.month ) LOOP BEGIN DBMS_CLOUD.MOVE_OBJECT(source_credential_name => ENV_MANAGER.gvCredentialName, source_object_uri => replace(f.pathname,'ODS','TRASH')||'/'||f.filename, target_object_uri => f.pathname||'/'||f.filename, target_credential_name => ENV_MANAGER.gvCredentialName ); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('File restored from TRASH.','DEBUG', f.pathname||'/'||f.filename); UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED r SET PROCESSING_STATUS = 'INGESTED' ,ARCH_FILE_NAME = NULL WHERE r.a_source_file_config_key = pSourceFileConfigKey AND r.source_file_name = f.filename ; EXCEPTION WHEN OTHERS THEN CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Failed to restore file from TRASH.','ERROR', replace(f.pathname,'ODS','TRASH')||'/'||f.filename); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); vProcessControlStatus := 'RESTORE_FILE_FROM_TRASH_FAILURE'; END; END LOOP; -- ROLLBACK: Delete all parquet files from archive directory FOR arch_file IN ( SELECT object_name FROM DBMS_CLOUD.LIST_OBJECTS( credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName, location_uri => vFilename -- vFilename now contains directory prefix ) ) LOOP DBMS_CLOUD.DELETE_OBJECT( credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName, object_uri => vFilename || arch_file.object_name ); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('ROLLBACK operation: Archival PARQUET file dropped.','DEBUG', vFilename || arch_file.object_name); END LOOP; RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_MOVE_FILE_TO_TRASH_FAILED, CT_MRDS.ENV_MANAGER.MSG_MOVE_FILE_TO_TRASH_FAILED); ELSIF vProcessControlStatus = 'CHANGE_STATUS_TO_ARCHIVED_FAILURE' THEN CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.MSG_CHANGE_STAT_TO_ARCHIVED_FAILED, 'ERROR', vParameters); -- ROLLBACK: Delete all parquet files from archive directory FOR arch_file IN ( SELECT object_name FROM DBMS_CLOUD.LIST_OBJECTS( credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName, location_uri => vFilename -- vFilename now contains directory prefix ) ) LOOP DBMS_CLOUD.DELETE_OBJECT( credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName, object_uri => vFilename || arch_file.object_name ); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Archival PARQUET file dropped.','DEBUG', vFilename || arch_file.object_name); END LOOP; RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_CHANGE_STAT_TO_ARCHIVED_FAILED, CT_MRDS.ENV_MANAGER.MSG_CHANGE_STAT_TO_ARCHIVED_FAILED); ELSIF vProcessControlStatus = 'RESTORE_FILE_FROM_TRASH_FAILURE' THEN CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Some files were not restored from TRASH. Check A_PROCESS_LOG table for details','ERROR'); RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_RESTORE_FILE_FROM_TRASH, CT_MRDS.ENV_MANAGER.MSG_RESTORE_FILE_FROM_TRASH); END IF; EXCEPTION WHEN CT_MRDS.ENV_MANAGER.ERR_CHANGE_STAT_TO_ARCHIVED_FAILED THEN RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_CHANGE_STAT_TO_ARCHIVED_FAILED, CT_MRDS.ENV_MANAGER.MSG_CHANGE_STAT_TO_ARCHIVED_FAILED); WHEN CT_MRDS.ENV_MANAGER.ERR_MOVE_FILE_TO_TRASH_FAILED THEN RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_MOVE_FILE_TO_TRASH_FAILED, CT_MRDS.ENV_MANAGER.MSG_MOVE_FILE_TO_TRASH_FAILED); WHEN CT_MRDS.ENV_MANAGER.ERR_RESTORE_FILE_FROM_TRASH THEN CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_RESTORE_FILE_FROM_TRASH, CT_MRDS.ENV_MANAGER.MSG_RESTORE_FILE_FROM_TRASH); WHEN OTHERS THEN CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Error during archiving process','ERROR'); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_DROP_EXPORTED_FILES_FAILED, CT_MRDS.ENV_MANAGER.MSG_DROP_EXPORTED_FILES_FAILED); END; -- END of "Try to drop EXPORTED FILES" CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('All archived files had been dropped.','INFO'); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End Archiving for YEAR_MONTH: '||ym_loop.year||'_'||ym_loop.month ,'INFO'); END LOOP; --ym_loop end (YEAR_MONTH) COMMIT; ELSE CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Non of archival thresholds reached. Skip archiving.'||vArchivalTriggeredBy,'INFO'); END IF; CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters); EXCEPTION WHEN CT_MRDS.ENV_MANAGER.ERR_NOT_INPUT_SOURCE_FILE_TYPE THEN CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.MSG_NOT_INPUT_SOURCE_FILE_TYPE , 'ERROR', vParameters); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_NOT_INPUT_SOURCE_FILE_TYPE, CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE)); WHEN CT_MRDS.ENV_MANAGER.ERR_EXP_DATA_FOR_ARCH_FAILED THEN CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.MSG_EXP_DATA_FOR_ARCH_FAILED , 'ERROR', vParameters); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_EXP_DATA_FOR_ARCH_FAILED, CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE)); WHEN CT_MRDS.ENV_MANAGER.ERR_CHANGE_STAT_TO_ARCHIVED_FAILED THEN RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_CHANGE_STAT_TO_ARCHIVED_FAILED, CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE)); WHEN CT_MRDS.ENV_MANAGER.ERR_MOVE_FILE_TO_TRASH_FAILED THEN RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_MOVE_FILE_TO_TRASH_FAILED, CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE)); WHEN OTHERS THEN CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.MSG_UNKNOWN , 'ERROR', vParameters); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_UNKNOWN, CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE)); END ARCHIVE_TABLE_DATA; ---------------------------------------------------------------------------------------------------- PROCEDURE GATHER_TABLE_STAT ( pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE ) IS vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE; vStats CT_MRDS.A_TABLE_STAT%ROWTYPE; vSourceFileConfig CT_MRDS.A_SOURCE_FILE_CONFIG%ROWTYPE; vTableName VARCHAR2(200); vQuery VARCHAR2(32000); vWhereClause VARCHAR2(4000); BEGIN vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST('pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL'))); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters); vSourceFileConfig := CT_MRDS.FILE_MANAGER.GET_SOURCE_FILE_CONFIG(pSourceFileConfigKey => pSourceFileConfigKey); vTableName := DBMS_ASSERT.SCHEMA_NAME(vSourceFileConfig.ODS_SCHEMA_NAME) || '.'||DBMS_ASSERT.simple_sql_name(vSourceFileConfig.TABLE_ID)||'_ODS'; CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('vTableName','DEBUG',vTableName); -- Get WHERE clause based on archival strategy (MARS-828) vWhereClause := GET_ARCHIVAL_WHERE_CLAUSE(vSourceFileConfig); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('vWhereClause','DEBUG',vWhereClause); -- Use strategy-based WHERE clause for statistics (MARS-828) vQuery := 'with tmp as ( select s.* ,file$name as filename ,h.workflow_start , to_char(h.workflow_start,''yyyy'') as year , to_char(h.workflow_start,''mm'') as month from '||vTableName||' s join CT_MRDS.a_workflow_history h on s.a_workflow_history_key = h.a_workflow_history_key ) , tmp_gr as ( select filename, count(*) as row_count_per_file, min(workflow_start) as workflow_start from tmp group by filename ) select NULL as A_TABLE_STAT_KEY ,'||pSourceFileConfigKey||' as A_SOURCE_FILE_CONFIG_KEY ,'''||vTableName||''' as TABLE_NAME ,count(*) as FILE_COUNT ,sum(case when ' || vWhereClause || ' then 1 else 0 end) as OLD_FILE_COUNT ,sum (row_count_per_file) as ROW_COUNT ,sum(case when ' || vWhereClause || ' then row_count_per_file else 0 end) as OLD_ROW_COUNT ,sum(r.bytes) as BYTES ,sum(case when ' || vWhereClause || ' then r.bytes else 0 end) as OLD_BYTES ,'||COALESCE(TO_CHAR(vSourceFileConfig.DAYS_FOR_ARCHIVE_THRESHOLD), 'NULL')||' as DAYS_FOR_ARCHIVE_THRESHOLD ,systimestamp as CREATED from tmp_gr t join (SELECT * from DBMS_CLOUD.LIST_OBJECTS( credential_name => '''||CT_MRDS.ENV_MANAGER.gvCredentialName||''', location_uri => CT_MRDS.FILE_MANAGER.GET_BUCKET_URI(''ODS'')||''ODS/'||vSourceFileConfig.A_SOURCE_KEY||'/'||vSourceFileConfig.TABLE_ID||'/'') ) ) r on t.filename = r.object_name' ; CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('vQuery','DEBUG',vQuery); execute immediate vQuery into vStats; vStats.A_TABLE_STAT_KEY := CT_MRDS.A_TABLE_STAT_KEY_SEQ.NEXTVAL; insert into A_TABLE_STAT_HIST values vStats; delete from A_TABLE_STAT where A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfigKey; insert into A_TABLE_STAT values vStats; COMMIT; CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters); EXCEPTION WHEN OTHERS THEN CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.MSG_UNKNOWN , 'ERROR', vParameters); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_UNKNOWN, CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE)); END GATHER_TABLE_STAT; ---------------------------------------------------------------------------------------------------- -- PACKAGE VERSION MANAGEMENT FUNCTIONS IMPLEMENTATION ---------------------------------------------------------------------------------------------------- FUNCTION GET_VERSION RETURN VARCHAR2 IS BEGIN RETURN PACKAGE_VERSION; END GET_VERSION; ---------------------------------------------------------------------------------------------------- FUNCTION GET_BUILD_INFO RETURN VARCHAR2 IS BEGIN RETURN CT_MRDS.ENV_MANAGER.GET_PACKAGE_VERSION_INFO( pPackageName => 'FILE_ARCHIVER', pVersion => PACKAGE_VERSION, pBuildDate => PACKAGE_BUILD_DATE, pAuthor => PACKAGE_AUTHOR ); END GET_BUILD_INFO; ---------------------------------------------------------------------------------------------------- FUNCTION GET_VERSION_HISTORY RETURN VARCHAR2 IS BEGIN RETURN CT_MRDS.ENV_MANAGER.FORMAT_VERSION_HISTORY( pPackageName => 'FILE_ARCHIVER', pVersionHistory => VERSION_HISTORY ); END GET_VERSION_HISTORY; ---------------------------------------------------------------------------------------------------- FUNCTION ARCHIVE_TABLE_DATA ( pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE ) RETURN PLS_INTEGER IS vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE; BEGIN vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST('pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL'))); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters); ---- ARCHIVE_TABLE_DATA(pSourceFileConfigKey => pSourceFileConfigKey); ---- CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters); RETURN SQLCODE; EXCEPTION WHEN OTHERS THEN CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); RETURN SQLCODE; END ARCHIVE_TABLE_DATA; ---------------------------------------------------------------------------------------------------- FUNCTION GATHER_TABLE_STAT ( pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE ) RETURN PLS_INTEGER IS vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE; BEGIN vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST('pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL'))); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters); ---- GATHER_TABLE_STAT(pSourceFileConfigKey => pSourceFileConfigKey); ---- CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters); RETURN SQLCODE; EXCEPTION WHEN OTHERS THEN CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); RETURN SQLCODE; END GATHER_TABLE_STAT; ---------------------------------------------------------------------------------------------------- END; /