create or replace PACKAGE BODY CT_MRDS.FILE_ARCHIVER AS

    ----------------------------------------------------------------------------------------------------
    -- PRIVATE FUNCTION: GET_ARCHIVAL_WHERE_CLAUSE
    ----------------------------------------------------------------------------------------------------
    /**
     * @name GET_ARCHIVAL_WHERE_CLAUSE
     * @desc Private function that generates a WHERE-clause fragment based on the ARCHIVAL_STRATEGY
     *       configuration. Supports three strategies: THRESHOLD_BASED, MINIMUM_AGE_MONTHS, HYBRID.
     *       The fragment references the column "workflow_start" unqualified, so the statement it is
     *       embedded in must expose exactly one column of that name.
     * @param pSourceFileConfig - Source file configuration record with ARCHIVAL_STRATEGY
     * @return VARCHAR2 - WHERE clause for filtering archival candidates
     * @raises -20001 when the parameter required by the strategy is NULL (or, for
     *         MINIMUM_AGE_MONTHS, negative); -20002 when ARCHIVAL_STRATEGY is not one of the three
     *         supported values (including NULL)
     **/
    FUNCTION GET_ARCHIVAL_WHERE_CLAUSE(
        pSourceFileConfig IN CT_MRDS.A_SOURCE_FILE_CONFIG%ROWTYPE
    ) RETURN VARCHAR2
    IS
        vWhereClause VARCHAR2(4000);

        -- Validates MINIMUM_AGE_MONTHS for the strategies that embed it in the predicate.
        PROCEDURE assert_minimum_age_months (pStrategy IN VARCHAR2) IS
        BEGIN
            IF pSourceFileConfig.MINIMUM_AGE_MONTHS IS NULL THEN
                RAISE_APPLICATION_ERROR(-20001, 'MINIMUM_AGE_MONTHS must be configured for ' || pStrategy || ' strategy');
            ELSIF pSourceFileConfig.MINIMUM_AGE_MONTHS < 0 THEN
                -- A negative value would be rendered as "--N", which SQL parses as the start of a
                -- comment and silently truncates the rest of the line.
                RAISE_APPLICATION_ERROR(-20001, 'MINIMUM_AGE_MONTHS must not be negative for ' || pStrategy || ' strategy');
            END IF;
        END assert_minimum_age_months;
    BEGIN
        CASE pSourceFileConfig.ARCHIVAL_STRATEGY
            -- Legacy threshold-based strategy (backward compatible)
            WHEN 'THRESHOLD_BASED' THEN
                IF pSourceFileConfig.ARCHIVE_THRESHOLD_DAYS IS NULL THEN
                    -- Without this guard the predicate would end in "> " and fail to parse later.
                    RAISE_APPLICATION_ERROR(-20001, 'ARCHIVE_THRESHOLD_DAYS must be configured for THRESHOLD_BASED strategy');
                END IF;
                vWhereClause := 'extract(day from (systimestamp - workflow_start)) > '
                             || pSourceFileConfig.ARCHIVE_THRESHOLD_DAYS;

            -- Archive data older than X months (0 = everything before the current month)
            WHEN 'MINIMUM_AGE_MONTHS' THEN
                assert_minimum_age_months('MINIMUM_AGE_MONTHS');
                vWhereClause := 'workflow_start < ADD_MONTHS(TRUNC(SYSDATE, ''MM''), -'
                             || pSourceFileConfig.MINIMUM_AGE_MONTHS || ')';

            -- Hybrid: current month exclusion AND minimum age requirement
            WHEN 'HYBRID' THEN
                assert_minimum_age_months('HYBRID');
                vWhereClause := 'TRUNC(workflow_start, ''MM'') < TRUNC(SYSDATE, ''MM'') '
                             || 'AND workflow_start < ADD_MONTHS(TRUNC(SYSDATE, ''MM''), -'
                             || pSourceFileConfig.MINIMUM_AGE_MONTHS || ')';

            ELSE
                RAISE_APPLICATION_ERROR(-20002, 'Invalid ARCHIVAL_STRATEGY: ' || pSourceFileConfig.ARCHIVAL_STRATEGY);
        END CASE;

        RETURN vWhereClause;
    END GET_ARCHIVAL_WHERE_CLAUSE;

    ----------------------------------------------------------------------------------------------------
    /**
     * @name GET_TABLE_STAT
     * @desc Returns the A_TABLE_STAT row of a source file configuration. When no statistics row
     *       exists yet and the configuration is of SOURCE_FILE_TYPE 'INPUT', statistics are
     *       gathered first through GATHER_TABLE_STAT.
     * @param pSourceFileConfigKey - key of the source file configuration
     * @return CT_MRDS.A_TABLE_STAT%ROWTYPE - statistics row for the configuration
     * @raises NO_DATA_FOUND (propagated) when no statistics row exists after the optional gather,
     *         e.g. for an unknown key or a non-INPUT configuration
     **/
    FUNCTION GET_TABLE_STAT(pSourceFileConfigKey IN NUMBER) RETURN CT_MRDS.A_TABLE_STAT%ROWTYPE
    IS
        vTableStat      CT_MRDS.A_TABLE_STAT%ROWTYPE;
        vParameters     CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
        vCount          PLS_INTEGER;
        vSourceFileType CT_MRDS.A_SOURCE_FILE_CONFIG.SOURCE_FILE_TYPE%TYPE;
    BEGIN
        vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
            'pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL')
        ));
        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','DEBUG', vParameters);

        -- Drive the lookup from the configuration and LEFT JOIN the statistics: with an inner join
        -- the file type is NULL exactly when no statistics exist, so the "gather on demand"
        -- condition below could never become true.
        SELECT count(s.A_SOURCE_FILE_CONFIG_KEY)
              ,min(c.SOURCE_FILE_TYPE)
          INTO vCount, vSourceFileType
          FROM CT_MRDS.A_SOURCE_FILE_CONFIG c
          LEFT JOIN CT_MRDS.A_TABLE_STAT s
            ON s.A_SOURCE_FILE_CONFIG_KEY = c.A_SOURCE_FILE_CONFIG_KEY
         WHERE c.A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfigKey;

        IF vCount = 0 AND vSourceFileType = 'INPUT' THEN
            GATHER_TABLE_STAT(pSourceFileConfigKey);
        END IF;

        -- NO_DATA_FOUND is deliberately not handled here; it propagates to the caller.
        SELECT *
          INTO vTableStat
          FROM CT_MRDS.A_TABLE_STAT
         WHERE A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfigKey;

        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','DEBUG',vParameters);
        RETURN vTableStat;
    END GET_TABLE_STAT;

    ----------------------------------------------------------------------------------------------------
    /**
     * @name ARCHIVE_TABLE_DATA
     * @desc Archives the data of one INPUT source file configuration. For every year/month that
     *       contains archival candidates (see GET_ARCHIVAL_WHERE_CLAUSE) the procedure
     *         1. exports the matching rows of the <TABLE_ID>_ODS table to parquet with
     *            DBMS_CLOUD.EXPORT_DATA under .../ARCHIVE/<source>/<table>/PARTITION_YEAR=/PARTITION_MONTH=/,
     *         2. marks each source file ARCHIVED_AND_TRASHED and moves it from the ODS to the TRASH
     *            path (committing per file),
     *         3. depending on IS_KEEP_IN_TRASH either keeps the files in TRASH or deletes them and
     *            marks them ARCHIVED_AND_PURGED.
     *       If step 2 fails for any file, the files already moved for that partition are moved back,
     *       their status is reset to INGESTED, the exported parquet files are deleted and an error
     *       is raised.
     *       The procedure commits on its own (per file and at the end).
     * @param pSourceFileConfigKey - key of the source file configuration to archive
     * @raises CODE_NOT_INPUT_SOURCE_FILE_TYPE, CODE_EXP_DATA_FOR_ARCH_FAILED,
     *         CODE_CHANGE_STAT_TO_ARCHIVED_FAILED, CODE_MOVE_FILE_TO_TRASH_FAILED,
     *         CODE_RESTORE_FILE_FROM_TRASH, CODE_UNKNOWN (all from CT_MRDS.ENV_MANAGER)
     **/
    PROCEDURE ARCHIVE_TABLE_DATA (
        pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE
    )
    IS
        -- Values of vProcessControlStatus. Named once so the place that sets a status and the place
        -- that tests it cannot drift apart.
        cStatusOk             CONSTANT VARCHAR2(60) := 'OK';
        cStatusExportFailure  CONSTANT VARCHAR2(60) := 'EXPORT_FAILURE';
        cStatusChangeFailure  CONSTANT VARCHAR2(60) := 'CHANGE_STATUS_TO_ARCHIVED_AND_TRASHED_FAILURE';
        cStatusMoveFailure    CONSTANT VARCHAR2(60) := 'MOVE_FILE_TO_TRASH_FAILURE';
        cStatusRestoreFailure CONSTANT VARCHAR2(60) := 'RESTORE_FILE_FROM_TRASH_FAILURE';

        vSourceFileConfig     CT_MRDS.A_SOURCE_FILE_CONFIG%ROWTYPE;
        vTableStat            CT_MRDS.A_TABLE_STAT%ROWTYPE;
        vQuery                VARCHAR2(4000);
        vWhereClause          VARCHAR2(4000);
        vTableName            VARCHAR2(200);
        vUri                  VARCHAR2(1000);
        vfiles                T_FILENAMES;
        vFilename             VARCHAR2(300);
        vOperationId          NUMBER := -1;
        vParameters           CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
        vArchivalTriggeredBy  VARCHAR2(60);   -- AGE_BASED or a list of FILES_COUNT, ROWS_COUNT, BYTES_SUM
        vUserLoadOperations   USER_LOAD_OPERATIONS%ROWTYPE;
        vProcessControlStatus VARCHAR2(60) := cStatusOk;
        vFailureStatus        VARCHAR2(60);   -- failure that started the rollback of a partition
        vKeepInTrash          BOOLEAN;        -- Derived from config

        -- Appends a trigger name to vArchivalTriggeredBy, comma separated.
        PROCEDURE add_trigger (pTrigger IN VARCHAR2) IS
        BEGIN
            vArchivalTriggeredBy := vArchivalTriggeredBy
                                 || CASE WHEN vArchivalTriggeredBy IS NOT NULL THEN ', ' END
                                 || pTrigger;
        END add_trigger;
    BEGIN
        vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
            'pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL')
        ));
        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);

        vSourceFileConfig := CT_MRDS.FILE_MANAGER.GET_SOURCE_FILE_CONFIG(pSourceFileConfigKey => pSourceFileConfigKey);

        -- Check if archiving is enabled for this configuration
        IF vSourceFileConfig.IS_ARCHIVE_ENABLED = 'N' THEN
            CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Archiving disabled for this configuration (IS_ARCHIVE_ENABLED=N). Skipping.', 'WARNING', vParameters);
            CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
            RETURN;
        END IF;

        -- Get TRASH policy from configuration.
        -- A NULL IS_KEEP_IN_TRASH leaves vKeepInTrash NULL, which takes the "keep" path below.
        vKeepInTrash := (vSourceFileConfig.IS_KEEP_IN_TRASH = 'Y');
        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('TRASH policy from config: IS_KEEP_IN_TRASH=' || vSourceFileConfig.IS_KEEP_IN_TRASH, 'INFO', vParameters);

        -- Reject non-INPUT configurations BEFORE reading statistics: GET_TABLE_STAT raises
        -- NO_DATA_FOUND for them, which would hide this dedicated error behind CODE_UNKNOWN.
        IF vSourceFileConfig.SOURCE_FILE_TYPE <> 'INPUT' THEN
            CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.MSG_NOT_INPUT_SOURCE_FILE_TYPE, 'ERROR', vParameters);
            RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_NOT_INPUT_SOURCE_FILE_TYPE, CT_MRDS.ENV_MANAGER.MSG_NOT_INPUT_SOURCE_FILE_TYPE);
        END IF;

        vTableStat := GET_TABLE_STAT(pSourceFileConfigKey => pSourceFileConfigKey);

        -- Refresh statistics older than HOURS_TO_EXPIRE_STATISTICS
        IF vTableStat.created < sysdate-(vSourceFileConfig.HOURS_TO_EXPIRE_STATISTICS/24) THEN
            GATHER_TABLE_STAT(pSourceFileConfigKey => pSourceFileConfigKey);
            vTableStat := GET_TABLE_STAT(pSourceFileConfigKey => pSourceFileConfigKey);
        END IF;

        -- Strategy-based trigger logic (MARS-828)
        IF vSourceFileConfig.ARCHIVAL_STRATEGY = 'MINIMUM_AGE_MONTHS' THEN
            -- MINIMUM_AGE_MONTHS: Archive based on age only, ignore thresholds
            vArchivalTriggeredBy := 'AGE_BASED';
            CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Archival strategy: MINIMUM_AGE_MONTHS (threshold-independent)','INFO');
        ELSE
            -- THRESHOLD_BASED and HYBRID: every threshold is checked independently so that the log
            -- lists all triggers that fired (an ELSIF chain could only ever report one of them).
            IF vTableStat.OVER_ARCH_THRESOLD_FILE_COUNT >= vSourceFileConfig.ARCHIVE_THRESHOLD_FILES_COUNT THEN
                add_trigger('FILES_COUNT');
            END IF;
            IF vTableStat.OVER_ARCH_THRESOLD_ROW_COUNT >= vSourceFileConfig.ARCHIVE_THRESHOLD_ROWS_COUNT THEN
                add_trigger('ROWS_COUNT');
            END IF;
            IF vTableStat.OVER_ARCH_THRESOLD_SIZE >= vSourceFileConfig.ARCHIVE_THRESHOLD_BYTES_SUM THEN
                add_trigger('BYTES_SUM');
            END IF;
            IF vArchivalTriggeredBy IS NULL THEN
                CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Non of archival triggers reached','INFO');
            END IF;
        END IF;

        IF LENGTH(vArchivalTriggeredBy) > 0 THEN
            CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Archival Triggered By: '||vArchivalTriggeredBy,'INFO');
            vTableName := DBMS_ASSERT.SCHEMA_NAME(vSourceFileConfig.ODS_SCHEMA_NAME) || '.'||DBMS_ASSERT.simple_sql_name(vSourceFileConfig.TABLE_ID)||'_ODS';

            -- Use strategy-based WHERE clause (MARS-828)
            vWhereClause := GET_ARCHIVAL_WHERE_CLAUSE(vSourceFileConfig);

            -- Using GROUP BY instead of DISTINCT to avoid ORA-22950 (object type ordering issue)
            vQuery := '
                select CT_MRDS.t_filename(
                            file$name
                            ,file$path
                            , to_char(h.workflow_start,''yyyy'')
                            , to_char(h.workflow_start,''mm'')
                        )
                  from '||vTableName||' s
                  join CT_MRDS.a_workflow_history h
                    on s.a_workflow_history_key = h.a_workflow_history_key
                 where ' || vWhereClause || '
                   and h.WORKFLOW_SUCCESSFUL = ''Y''
                 group by file$name, file$path, to_char(h.workflow_start,''yyyy''), to_char(h.workflow_start,''mm'')'
            ;

            -- Get all files that will be archived into "vfiles" collection ("regular data files")
            -- MARS-1468: Handle ORA-29913 - no files in ODS bucket (empty external table location)
            BEGIN
                EXECUTE IMMEDIATE vQuery BULK COLLECT INTO vfiles;
            EXCEPTION
                WHEN OTHERS THEN
                    IF SQLCODE = -29913 THEN
                        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('No files found in ODS bucket (ORA-29913: empty location). Nothing to archive.', 'INFO', vParameters);
                        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
                        RETURN;
                    ELSE
                        RAISE;
                    END IF;
            END;

            -- Start EXPORT "regular data files" to parquet and DROP "csv"
            FOR ym_loop IN (select distinct year, month from table(vfiles) order by year, month) LOOP
                dbms_output.put_line('year: '||ym_loop.year||' - '||'month: '||ym_loop.month);

                -- Export only the rows that satisfy the archival predicate. Without it the whole
                -- month is exported while only the matching files are moved to TRASH, so rows of
                -- files that stay in ODS would end up in the archive as well. The predicate is
                -- evaluated in a sub-query on a_workflow_history alone, so its unqualified
                -- "workflow_start" cannot clash with columns of the other joined tables.
                vQuery := 'select s.*
                             from '|| vTableName ||' s
                             join CT_MRDS.A_SOURCE_FILE_RECEIVED r
                               on s.file$name = r.source_file_name
                              and r.a_source_file_config_key = '||pSourceFileConfigKey||'
                             join CT_MRDS.a_workflow_history h
                               on s.a_workflow_history_key = h.a_workflow_history_key
                              and to_char(h.workflow_start,''yyyy'') = '''||ym_loop.year||'''
                              and to_char(h.workflow_start,''mm'') = '''||ym_loop.month||'''
                              and h.WORKFLOW_SUCCESSFUL = ''Y''
                              and h.a_workflow_history_key in (
                                      select ah.a_workflow_history_key
                                        from CT_MRDS.a_workflow_history ah
                                       where ' || vWhereClause || '
                                  )
                          ';

                vUri := CT_MRDS.FILE_MANAGER.GET_BUCKET_URI('ARCHIVE')||'ARCHIVE/'||vSourceFileConfig.A_SOURCE_KEY||'/'||vSourceFileConfig.TABLE_ID||'/PARTITION_YEAR='||ym_loop.year||'/PARTITION_MONTH='||ym_loop.month||'/';

                CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start Archiving for YEAR_MONTH: '||ym_loop.year||'_'||ym_loop.month ,'INFO');
                CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Parameter for DBMS_CLOUD.EXPORT_DATA => file_uri_list' ,'DEBUG',vUri);
                CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Parameter for DBMS_CLOUD.EXPORT_DATA => query' ,'DEBUG',vQuery);

                BEGIN
                    DBMS_CLOUD.EXPORT_DATA(
                        credential_name => ENV_MANAGER.gvCredentialName,
                        file_uri_list   => vUri||'d',   -- 'd' is the file name prefix inside the partition directory
                        format          => json_object('type' value 'parquet'),
                        query           => vQuery,
                        operation_id    => vOperationId
                    );
                EXCEPTION
                    WHEN OTHERS THEN
                        vProcessControlStatus := cStatusExportFailure;
                        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
                        RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_EXP_DATA_FOR_ARCH_FAILED, CT_MRDS.ENV_MANAGER.MSG_EXP_DATA_FOR_ARCH_FAILED);
                END;

                CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('vOperationId of export: '||vOperationId,'DEBUG');

                -- Get USER_LOAD_OPERATIONS info
                select * into vUserLoadOperations from USER_LOAD_OPERATIONS where id = vOperationId;

                IF vUserLoadOperations.STATUS <> 'COMPLETED' THEN
                    RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_EXP_DATA_FOR_ARCH_FAILED, CT_MRDS.ENV_MANAGER.MSG_EXP_DATA_FOR_ARCH_FAILED ||cgBL|| ' Export ended with status '||vUserLoadOperations.STATUS);
                ELSIF vUserLoadOperations.STATUS = 'COMPLETED' and vUserLoadOperations.ROWS_LOADED = 0 THEN
                    RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_EXP_DATA_FOR_ARCH_FAILED, CT_MRDS.ENV_MANAGER.MSG_EXP_DATA_FOR_ARCH_FAILED ||cgBL|| ' Zero rows were exported.');
                ELSE
                    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Data exported to archival file for YEAR_MONTH: '||ym_loop.year||'_'||ym_loop.month,'INFO', vParameters);
                    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.FILE_MANAGER.GET_DET_USER_LOAD_OPERATIONS (pOperationId => vOperationId),'DEBUG', vParameters);
                END IF;

                -- Note: DBMS_CLOUD.EXPORT_DATA may create multiple parquet files (parallel execution)
                -- Instead of tracking individual files, we store the archive directory prefix
                -- ARCH_PATH contains the directory URI where all parquet files are located
                vFilename := vUri;

                -- Try to drop EXPORTED FILES ("regular data files")
                -- NOTE(review): replace(pathname,'ODS','TRASH') rewrites EVERY occurrence of 'ODS'
                -- in the URI (bucket, source key, table name) - confirm that no path component
                -- other than the ODS folder can contain that text.
                BEGIN
                    FOR f IN (select filename, pathname from table(vfiles) where year = ym_loop.year and month = ym_loop.month) LOOP

                        -- first change of status to ARCHIVED_AND_TRASHED (file will be moved to TRASH folder)
                        BEGIN
                            UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED r
                               SET PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED'  -- file is archived and kept in TRASH
                                  ,ARCH_PATH         = vFilename               -- directory prefix, not an individual file
                                  ,PARTITION_YEAR    = ym_loop.year            -- partition year the data was archived to
                                  ,PARTITION_MONTH   = ym_loop.month           -- partition month the data was archived to
                             WHERE r.a_source_file_config_key = pSourceFileConfigKey
                               AND r.source_file_name = f.filename
                               AND r.processing_status = 'INGESTED'
                            ;
                        EXCEPTION
                            WHEN OTHERS THEN
                                CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
                                vProcessControlStatus := cStatusChangeFailure;
                        END;
                        EXIT WHEN vProcessControlStatus = cStatusChangeFailure;

                        -- move file to TRASH subfolder (DATA bucket: ODS/ -> TRASH/) before dropping
                        BEGIN
                            DBMS_CLOUD.MOVE_OBJECT(
                                source_credential_name => ENV_MANAGER.gvCredentialName,
                                source_object_uri      => f.pathname||'/'||f.filename,
                                target_object_uri      => replace(f.pathname,'ODS','TRASH')||'/'||f.filename,
                                target_credential_name => ENV_MANAGER.gvCredentialName
                            );
                            CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('File moved to TRASH folder.','DEBUG', f.pathname||'/'||f.filename);
                        EXCEPTION
                            WHEN OTHERS THEN
                                CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Failed to move file to TRASH folder.','ERROR', f.pathname||'/'||f.filename);
                                CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
                                ROLLBACK;   -- undo the status change of the file that could not be moved
                                vProcessControlStatus := cStatusMoveFailure;
                        END;
                        EXIT WHEN vProcessControlStatus = cStatusMoveFailure;

                        -- status change and move succeeded for this file: make it durable
                        COMMIT;
                    END LOOP;

                    --------------------------------------------------------------------
                    -- IF All goes fine till this point, we drop files from TRASH folder (if not then ROLLBACK PART)
                    -- TRASH is a subfolder in DATA bucket (e.g., TRASH/LM/TABLE_NAME instead of ODS/LM/TABLE_NAME)
                    IF vProcessControlStatus = cStatusOk THEN
                        IF NOT vKeepInTrash THEN
                            -- Delete files from TRASH folder (cleanup) and update status to ARCHIVED_AND_PURGED
                            FOR f IN (select filename, pathname from table(vfiles) where year = ym_loop.year and month = ym_loop.month) LOOP
                                DBMS_CLOUD.DELETE_OBJECT(
                                    credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName,
                                    object_uri      => replace(f.pathname,'ODS','TRASH')||'/'||f.filename
                                );
                                CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('File dropped from TRASH folder.','DEBUG', f.pathname||'/'||f.filename);

                                -- Update status to ARCHIVED_AND_PURGED
                                UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED r
                                   SET PROCESSING_STATUS = 'ARCHIVED_AND_PURGED'
                                 WHERE r.a_source_file_config_key = pSourceFileConfigKey
                                   AND r.source_file_name = f.filename
                                   AND r.PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED';

                                -- The delete cannot be undone, so the matching status must not be
                                -- lost if a later partition fails and the error reaches the caller.
                                COMMIT;
                            END LOOP;
                            CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('All archived files removed from TRASH folder and marked as ARCHIVED_AND_PURGED (config: IS_KEEP_IN_TRASH=N).','INFO');
                        ELSE
                            -- Keep files in TRASH folder (status remains ARCHIVED_AND_TRASHED)
                            CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Archived files kept in TRASH folder for retention (config: IS_KEEP_IN_TRASH=Y, status: ARCHIVED_AND_TRASHED).','INFO');
                        END IF;

                    --ROLLBACK PART
                    --ROLLBACK PROCESS in case of FAILURE (restore files from TRASH subfolder in DATA bucket)
                    ELSE
                        -- Reached for a failed status change or a failed move. Remember which one,
                        -- because the restore loop below may overwrite vProcessControlStatus.
                        vFailureStatus := vProcessControlStatus;

                        IF vFailureStatus = cStatusChangeFailure THEN
                            CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.MSG_CHANGE_STAT_TO_ARCHIVED_FAILED, 'ERROR', vParameters);
                        END IF;

                        -- Move back every file of this partition that was already trashed (and
                        -- committed) before the failure. Purged files are excluded: their TRASH
                        -- object no longer exists.
                        FOR f IN (
                            SELECT vf.filename, vf.pathname
                              FROM TABLE(vfiles) vf
                              JOIN CT_MRDS.A_SOURCE_FILE_RECEIVED r
                                ON r.source_file_name = vf.filename
                               AND r.a_source_file_config_key = pSourceFileConfigKey
                               AND r.PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED'
                               AND vf.year = ym_loop.year
                               AND vf.month = ym_loop.month
                        ) LOOP
                            BEGIN
                                DBMS_CLOUD.MOVE_OBJECT(
                                    source_credential_name => ENV_MANAGER.gvCredentialName,
                                    source_object_uri      => replace(f.pathname,'ODS','TRASH')||'/'||f.filename,
                                    target_object_uri      => f.pathname||'/'||f.filename,
                                    target_credential_name => ENV_MANAGER.gvCredentialName
                                );
                                CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('File restored from TRASH folder.','DEBUG', f.pathname||'/'||f.filename);

                                UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED r
                                   SET PROCESSING_STATUS = 'INGESTED'
                                      ,ARCH_PATH         = NULL
                                      ,PARTITION_YEAR    = NULL
                                      ,PARTITION_MONTH   = NULL
                                 WHERE r.a_source_file_config_key = pSourceFileConfigKey
                                   AND r.source_file_name = f.filename
                                   AND r.PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED'
                                ;
                                -- Commit per file: the error raised below would otherwise let the
                                -- status reset be rolled back although the file is back in ODS.
                                COMMIT;
                            EXCEPTION
                                WHEN OTHERS THEN
                                    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Failed to restore file from TRASH folder.','ERROR', replace(f.pathname,'ODS','TRASH')||'/'||f.filename);
                                    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
                                    vProcessControlStatus := cStatusRestoreFailure;
                            END;
                        END LOOP;

                        -- ROLLBACK: Delete all parquet files from archive directory
                        -- (vFilename contains the directory prefix of this partition)
                        FOR arch_file IN (
                            SELECT object_name
                              FROM DBMS_CLOUD.LIST_OBJECTS(
                                       credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName,
                                       location_uri    => vFilename
                                   )
                        ) LOOP
                            DBMS_CLOUD.DELETE_OBJECT(
                                credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName,
                                object_uri      => vFilename || arch_file.object_name
                            );
                            CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('ROLLBACK operation: Archival PARQUET file dropped.','DEBUG', vFilename || arch_file.object_name);
                        END LOOP;

                        -- Report the most severe problem: an incomplete restore outranks the
                        -- failure that triggered the rollback.
                        IF vProcessControlStatus = cStatusRestoreFailure THEN
                            CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Some files were not restored from TRASH. Check A_PROCESS_LOG table for details','ERROR');
                            RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_RESTORE_FILE_FROM_TRASH, CT_MRDS.ENV_MANAGER.MSG_RESTORE_FILE_FROM_TRASH);
                        ELSIF vFailureStatus = cStatusMoveFailure THEN
                            RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_MOVE_FILE_TO_TRASH_FAILED, CT_MRDS.ENV_MANAGER.MSG_MOVE_FILE_TO_TRASH_FAILED);
                        ELSE
                            RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_CHANGE_STAT_TO_ARCHIVED_FAILED, CT_MRDS.ENV_MANAGER.MSG_CHANGE_STAT_TO_ARCHIVED_FAILED);
                        END IF;
                    END IF;

                EXCEPTION
                    WHEN CT_MRDS.ENV_MANAGER.ERR_CHANGE_STAT_TO_ARCHIVED_FAILED THEN
                        RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_CHANGE_STAT_TO_ARCHIVED_FAILED, CT_MRDS.ENV_MANAGER.MSG_CHANGE_STAT_TO_ARCHIVED_FAILED);
                    WHEN CT_MRDS.ENV_MANAGER.ERR_MOVE_FILE_TO_TRASH_FAILED THEN
                        RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_MOVE_FILE_TO_TRASH_FAILED, CT_MRDS.ENV_MANAGER.MSG_MOVE_FILE_TO_TRASH_FAILED);
                    WHEN CT_MRDS.ENV_MANAGER.ERR_RESTORE_FILE_FROM_TRASH THEN
                        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
                        RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_RESTORE_FILE_FROM_TRASH, CT_MRDS.ENV_MANAGER.MSG_RESTORE_FILE_FROM_TRASH);
                    WHEN OTHERS THEN
                        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Error during archiving process','ERROR');
                        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
                        RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_DROP_EXPORTED_FILES_FAILED, CT_MRDS.ENV_MANAGER.MSG_DROP_EXPORTED_FILES_FAILED);
                END; -- END of "Try to drop EXPORTED FILES"

                CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End Archiving for YEAR_MONTH: '||ym_loop.year||'_'||ym_loop.month ,'INFO');
            END LOOP; --ym_loop end (YEAR_MONTH)

            COMMIT;
        ELSE
            CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Non of archival thresholds reached. Skip archiving.'||vArchivalTriggeredBy,'INFO');
        END IF;

        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
    EXCEPTION
        WHEN CT_MRDS.ENV_MANAGER.ERR_NOT_INPUT_SOURCE_FILE_TYPE THEN
            CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.MSG_NOT_INPUT_SOURCE_FILE_TYPE , 'ERROR', vParameters);
            CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
            RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_NOT_INPUT_SOURCE_FILE_TYPE, CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
        WHEN CT_MRDS.ENV_MANAGER.ERR_EXP_DATA_FOR_ARCH_FAILED THEN
            CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.MSG_EXP_DATA_FOR_ARCH_FAILED , 'ERROR', vParameters);
            CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
            RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_EXP_DATA_FOR_ARCH_FAILED, CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
        WHEN CT_MRDS.ENV_MANAGER.ERR_CHANGE_STAT_TO_ARCHIVED_FAILED THEN
            RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_CHANGE_STAT_TO_ARCHIVED_FAILED, CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
        WHEN CT_MRDS.ENV_MANAGER.ERR_MOVE_FILE_TO_TRASH_FAILED THEN
            RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_MOVE_FILE_TO_TRASH_FAILED, CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
        WHEN CT_MRDS.ENV_MANAGER.ERR_RESTORE_FILE_FROM_TRASH THEN
            -- Keep the dedicated error code instead of degrading it to CODE_UNKNOWN
            RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_RESTORE_FILE_FROM_TRASH, CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
        WHEN OTHERS THEN
            CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.MSG_UNKNOWN , 'ERROR', vParameters);
            CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
            RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_UNKNOWN, CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
    END ARCHIVE_TABLE_DATA;

    ----------------------------------------------------------------------------------------------------
PROCEDURE GATHER_TABLE_STAT ( pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE ) IS vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE; vStats CT_MRDS.A_TABLE_STAT%ROWTYPE; vSourceFileConfig CT_MRDS.A_SOURCE_FILE_CONFIG%ROWTYPE; vTableName VARCHAR2(200); vQuery VARCHAR2(32000); vWhereClause VARCHAR2(4000); vOdsBucketUri VARCHAR2(1000); BEGIN vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST('pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL'))); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters); vSourceFileConfig := CT_MRDS.FILE_MANAGER.GET_SOURCE_FILE_CONFIG(pSourceFileConfigKey => pSourceFileConfigKey); vTableName := DBMS_ASSERT.SCHEMA_NAME(vSourceFileConfig.ODS_SCHEMA_NAME) || '.'||DBMS_ASSERT.simple_sql_name(vSourceFileConfig.TABLE_ID)||'_ODS'; CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('vTableName','DEBUG',vTableName); -- Get WHERE clause based on archival strategy (MARS-828) vWhereClause := GET_ARCHIVAL_WHERE_CLAUSE(vSourceFileConfig); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('vWhereClause','DEBUG',vWhereClause); -- Get ODS bucket URI before building query vOdsBucketUri := CT_MRDS.FILE_MANAGER.GET_BUCKET_URI('ODS') || 'ODS/' || vSourceFileConfig.A_SOURCE_KEY || '/' || vSourceFileConfig.TABLE_ID || '/'; -- Use strategy-based WHERE clause for statistics (MARS-828) vQuery := 'with tmp as ( select s.* ,file$name as filename ,h.workflow_start , to_char(h.workflow_start,''yyyy'') as year , to_char(h.workflow_start,''mm'') as month from '||vTableName||' s join CT_MRDS.a_workflow_history h on s.a_workflow_history_key = h.a_workflow_history_key ) , tmp_gr as ( select filename, count(*) as row_count_per_file, min(workflow_start) as workflow_start from tmp group by filename ) select NULL as A_TABLE_STAT_KEY ,'||pSourceFileConfigKey||' as A_SOURCE_FILE_CONFIG_KEY ,'''||vTableName||''' as TABLE_NAME ,count(*) as FILE_COUNT ,nvl(sum(case when ' || vWhereClause || ' then 1 else 0 
end), 0) as OLD_FILE_COUNT ,nvl(sum(row_count_per_file), 0) as ROW_COUNT ,nvl(sum(case when ' || vWhereClause || ' then row_count_per_file else 0 end), 0) as OLD_ROW_COUNT ,nvl(sum(r.bytes), 0) as BYTES ,nvl(sum(case when ' || vWhereClause || ' then r.bytes else 0 end), 0) as OLD_BYTES ,'||COALESCE(TO_CHAR(vSourceFileConfig.ARCHIVE_THRESHOLD_DAYS), 'NULL')||' as ARCHIVE_THRESHOLD_DAYS ,systimestamp as CREATED ,'''||vSourceFileConfig.ARCHIVAL_STRATEGY||''' as ARCHIVAL_STRATEGY ,'||COALESCE(TO_CHAR(vSourceFileConfig.MINIMUM_AGE_MONTHS), 'NULL')||' as ARCH_MINIMUM_AGE_MONTHS from tmp_gr t join (SELECT * from DBMS_CLOUD.LIST_OBJECTS( credential_name => '''||CT_MRDS.ENV_MANAGER.gvCredentialName||''', location_uri => '''||vOdsBucketUri||''' ) ) r on t.filename = r.object_name' ; CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('vQuery','DEBUG',vQuery); -- MARS-1468: Handle ORA-29913 - no files in ODS bucket (empty external table location) BEGIN execute immediate vQuery into vStats; EXCEPTION WHEN OTHERS THEN IF SQLCODE = -29913 THEN CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('No files found in ODS bucket (ORA-29913: empty location). 
Saving zero statistics.', 'INFO', vParameters); vStats.A_SOURCE_FILE_CONFIG_KEY := pSourceFileConfigKey; vStats.TABLE_NAME := vTableName; vStats.FILE_COUNT := 0; vStats.OVER_ARCH_THRESOLD_FILE_COUNT := 0; vStats.ROW_COUNT := 0; vStats.OVER_ARCH_THRESOLD_ROW_COUNT := 0; vStats."SIZE" := 0; vStats.OVER_ARCH_THRESOLD_SIZE := 0; vStats.ARCH_THRESHOLD_DAYS := vSourceFileConfig.ARCHIVE_THRESHOLD_DAYS; vStats.CREATED := SYSTIMESTAMP; vStats.ARCHIVAL_STRATEGY := vSourceFileConfig.ARCHIVAL_STRATEGY; vStats.ARCH_MINIMUM_AGE_MONTHS := vSourceFileConfig.MINIMUM_AGE_MONTHS; ELSE RAISE; END IF; END; vStats.A_TABLE_STAT_KEY := CT_MRDS.A_TABLE_STAT_KEY_SEQ.NEXTVAL; insert into CT_MRDS.A_TABLE_STAT_HIST values vStats; delete from CT_MRDS.A_TABLE_STAT where A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfigKey; insert into CT_MRDS.A_TABLE_STAT values vStats; COMMIT; CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters); EXCEPTION WHEN OTHERS THEN CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.MSG_UNKNOWN , 'ERROR', vParameters); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_UNKNOWN, CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE)); END GATHER_TABLE_STAT; ---------------------------------------------------------------------------------------------------- -- TRASH FOLDER MANAGEMENT PROCEDURES ---------------------------------------------------------------------------------------------------- PROCEDURE RESTORE_FILE_FROM_TRASH ( pSourceFileReceivedKey IN CT_MRDS.A_SOURCE_FILE_RECEIVED.A_SOURCE_FILE_RECEIVED_KEY%TYPE DEFAULT NULL, pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE DEFAULT NULL, pRestoreAll IN BOOLEAN DEFAULT FALSE ) IS vSourceFileConfig CT_MRDS.A_SOURCE_FILE_CONFIG%ROWTYPE; vSourceFileReceived CT_MRDS.A_SOURCE_FILE_RECEIVED%ROWTYPE; vParameters 
CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE; vTrashPath VARCHAR2(1000); vOdsPath VARCHAR2(1000); vFilesRestored PLS_INTEGER := 0; vFilesUpdated PLS_INTEGER := 0; vRestoreLevel VARCHAR2(50); cgBL CONSTANT VARCHAR2(2) := CHR(13)||CHR(10); BEGIN vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSourceFileReceivedKey => '||nvl(to_char(pSourceFileReceivedKey), 'NULL'), 'pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL'), 'pRestoreAll => '||CASE WHEN pRestoreAll THEN 'TRUE' ELSE 'FALSE' END )); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters); -- Determine restore level (priority: LEVEL 3 > LEVEL 2 > LEVEL 1) IF pSourceFileReceivedKey IS NOT NULL THEN vRestoreLevel := 'LEVEL_3_SINGLE_FILE'; CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Restore level: LEVEL 3 - Single file restoration','INFO', 'A_SOURCE_FILE_RECEIVED_KEY: ' || pSourceFileReceivedKey); -- LEVEL 3: Restore single file by A_SOURCE_FILE_RECEIVED_KEY BEGIN SELECT * INTO vSourceFileReceived FROM CT_MRDS.A_SOURCE_FILE_RECEIVED WHERE A_SOURCE_FILE_RECEIVED_KEY = pSourceFileReceivedKey AND PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED'; vSourceFileConfig := CT_MRDS.FILE_MANAGER.GET_SOURCE_FILE_CONFIG(pSourceFileConfigKey => vSourceFileReceived.A_SOURCE_FILE_CONFIG_KEY); EXCEPTION WHEN NO_DATA_FOUND THEN RAISE_APPLICATION_ERROR(-20101, 'File not found or status is not ARCHIVED_AND_TRASHED for A_SOURCE_FILE_RECEIVED_KEY: ' || pSourceFileReceivedKey); END; vTrashPath := CT_MRDS.FILE_MANAGER.GET_BUCKET_URI('DATA') || 'TRASH/' || vSourceFileConfig.A_SOURCE_KEY || '/' || vSourceFileConfig.TABLE_ID || '/' || vSourceFileReceived.SOURCE_FILE_NAME; vOdsPath := CT_MRDS.FILE_MANAGER.GET_BUCKET_URI('DATA') || 'ODS/' || vSourceFileConfig.A_SOURCE_KEY || '/' || vSourceFileConfig.TABLE_ID || '/' || vSourceFileReceived.SOURCE_FILE_NAME; CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Restoring single file from TRASH to ODS','INFO', 'Source: ' || vTrashPath || cgBL || 'Target: ' || 
vOdsPath); BEGIN DBMS_CLOUD.MOVE_OBJECT( source_credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName, source_object_uri => vTrashPath, target_object_uri => vOdsPath, target_credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName ); vFilesRestored := 1; CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('File successfully moved from TRASH to ODS','INFO', vSourceFileReceived.SOURCE_FILE_NAME); EXCEPTION WHEN OTHERS THEN CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Failed to move file from TRASH to ODS','ERROR', vTrashPath); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); RAISE_APPLICATION_ERROR(-20103, 'Failed to restore file from TRASH: ' || SQLERRM); END; UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED SET PROCESSING_STATUS = 'INGESTED', ARCH_PATH = NULL, PARTITION_YEAR = NULL, PARTITION_MONTH = NULL WHERE A_SOURCE_FILE_RECEIVED_KEY = pSourceFileReceivedKey AND PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED'; vFilesUpdated := SQL%ROWCOUNT; ELSIF pSourceFileConfigKey IS NOT NULL THEN vRestoreLevel := 'LEVEL_2_CONFIG_FILES'; CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Restore level: LEVEL 2 - Configuration-based restoration','INFO', 'pSourceFileConfigKey: ' || pSourceFileConfigKey); -- LEVEL 2: Restore all files for specific pSourceFileConfigKey vSourceFileConfig := CT_MRDS.FILE_MANAGER.GET_SOURCE_FILE_CONFIG(pSourceFileConfigKey => pSourceFileConfigKey); FOR file_rec IN ( SELECT A_SOURCE_FILE_RECEIVED_KEY, SOURCE_FILE_NAME FROM CT_MRDS.A_SOURCE_FILE_RECEIVED WHERE A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfigKey AND PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED' ) LOOP vTrashPath := CT_MRDS.FILE_MANAGER.GET_BUCKET_URI('DATA') || 'TRASH/' || vSourceFileConfig.A_SOURCE_KEY || '/' || vSourceFileConfig.TABLE_ID || '/' || file_rec.SOURCE_FILE_NAME; vOdsPath := CT_MRDS.FILE_MANAGER.GET_BUCKET_URI('DATA') || 'ODS/' || vSourceFileConfig.A_SOURCE_KEY || '/' || vSourceFileConfig.TABLE_ID || '/' || file_rec.SOURCE_FILE_NAME; BEGIN 
DBMS_CLOUD.MOVE_OBJECT( source_credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName, source_object_uri => vTrashPath, target_object_uri => vOdsPath, target_credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName ); vFilesRestored := vFilesRestored + 1; CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('File restored from TRASH','DEBUG', file_rec.SOURCE_FILE_NAME); EXCEPTION WHEN OTHERS THEN CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Failed to restore file from TRASH','ERROR', file_rec.SOURCE_FILE_NAME); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); END; END LOOP; UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED SET PROCESSING_STATUS = 'INGESTED', ARCH_PATH = NULL, PARTITION_YEAR = NULL, PARTITION_MONTH = NULL WHERE A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfigKey AND PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED'; vFilesUpdated := SQL%ROWCOUNT; ELSIF pRestoreAll THEN vRestoreLevel := 'LEVEL_1_GLOBAL_RESTORE'; CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Restore level: LEVEL 1 - Global TRASH restoration','INFO', 'Restoring ALL files with ARCHIVED_AND_TRASHED status'); -- LEVEL 1: Restore all files with ARCHIVED_AND_TRASHED status across all configurations FOR file_rec IN ( SELECT r.A_SOURCE_FILE_RECEIVED_KEY, r.SOURCE_FILE_NAME, c.A_SOURCE_KEY, c.TABLE_ID FROM CT_MRDS.A_SOURCE_FILE_RECEIVED r JOIN CT_MRDS.A_SOURCE_FILE_CONFIG c ON r.A_SOURCE_FILE_CONFIG_KEY = c.A_SOURCE_FILE_CONFIG_KEY WHERE r.PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED' ) LOOP vTrashPath := CT_MRDS.FILE_MANAGER.GET_BUCKET_URI('DATA') || 'TRASH/' || file_rec.A_SOURCE_KEY || '/' || file_rec.TABLE_ID || '/' || file_rec.SOURCE_FILE_NAME; vOdsPath := CT_MRDS.FILE_MANAGER.GET_BUCKET_URI('DATA') || 'ODS/' || file_rec.A_SOURCE_KEY || '/' || file_rec.TABLE_ID || '/' || file_rec.SOURCE_FILE_NAME; BEGIN DBMS_CLOUD.MOVE_OBJECT( source_credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName, source_object_uri => vTrashPath, target_object_uri => vOdsPath, 
target_credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName ); vFilesRestored := vFilesRestored + 1; CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('File restored from TRASH','DEBUG', file_rec.SOURCE_FILE_NAME); EXCEPTION WHEN OTHERS THEN CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Failed to restore file from TRASH','ERROR', file_rec.SOURCE_FILE_NAME); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); END; END LOOP; UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED SET PROCESSING_STATUS = 'INGESTED', ARCH_PATH = NULL, PARTITION_YEAR = NULL, PARTITION_MONTH = NULL WHERE PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED'; vFilesUpdated := SQL%ROWCOUNT; ELSE RAISE_APPLICATION_ERROR(-20104, 'No restore level specified. Provide pSourceFileReceivedKey, pSourceFileConfigKey, or set pRestoreAll=TRUE'); END IF; COMMIT; CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Total files restored from TRASH: ' || vFilesRestored,'INFO'); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Total file records updated to INGESTED: ' || vFilesUpdated,'INFO'); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('TRASH restoration completed successfully','INFO', 'Level: ' || vRestoreLevel || ', Files restored: ' || vFilesRestored || ', Records updated: ' || vFilesUpdated); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO', vParameters); EXCEPTION WHEN OTHERS THEN ROLLBACK; CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.MSG_UNKNOWN , 'ERROR', vParameters); CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_UNKNOWN, CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE)); END RESTORE_FILE_FROM_TRASH; ---------------------------------------------------------------------------------------------------- PROCEDURE PURGE_TRASH_FOLDER ( pSourceFileReceivedKey IN 
CT_MRDS.A_SOURCE_FILE_RECEIVED.A_SOURCE_FILE_RECEIVED_KEY%TYPE DEFAULT NULL,
    pSourceFileConfigKey    IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE DEFAULT NULL,
    pPurgeAll               IN BOOLEAN DEFAULT FALSE
) IS
    /**
     * @name PURGE_TRASH_FOLDER
     * @desc Deletes objects from the TRASH area of the DATA bucket
     *       (<bucket URI>TRASH/<A_SOURCE_KEY>/<TABLE_ID>/) and moves the matching
     *       A_SOURCE_FILE_RECEIVED rows from ARCHIVED_AND_TRASHED to ARCHIVED_AND_PURGED.
     *       Exactly one level is executed, chosen in this priority order:
     *         LEVEL 3 - pSourceFileReceivedKey given: delete that single file;
     *         LEVEL 2 - pSourceFileConfigKey given:   delete every object listed in the
     *                   TRASH folder of that configuration;
     *         LEVEL 1 - pPurgeAll = TRUE:             do the same for every configuration
     *                   that has at least one ARCHIVED_AND_TRASHED record.
     *       In LEVEL 2 / LEVEL 1 a failing single-object delete is logged and the loop
     *       continues (best effort). Records whose object could not be deleted keep the
     *       ARCHIVED_AND_TRASHED status so that the purge can be repeated; in LEVEL 1 a
     *       configuration whose folder could not be listed is left untouched.
     *       The work is committed on success. On any error the transaction is rolled back,
     *       the error is logged and re-raised with CT_MRDS.ENV_MANAGER.CODE_UNKNOWN; the
     *       internal codes -20301..-20304 therefore reach the caller only as part of the
     *       error stack text. Object deletions already performed in the bucket are not
     *       undone by the ROLLBACK.
     * @param pSourceFileReceivedKey - key of a single received file (LEVEL 3)
     * @param pSourceFileConfigKey   - key of a source file configuration (LEVEL 2)
     * @param pPurgeAll              - TRUE to purge all configurations (LEVEL 1)
     **/
    vSourceFileConfig       CT_MRDS.A_SOURCE_FILE_CONFIG%ROWTYPE;
    vSourceFileReceived     CT_MRDS.A_SOURCE_FILE_RECEIVED%ROWTYPE;
    vParameters             CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
    vTrashLocationUri       VARCHAR2(1000);
    vFilesDeleted           PLS_INTEGER := 0;
    vFilesUpdated           PLS_INTEGER := 0;
    vFilesFailed            PLS_INTEGER := 0;
    vPurgeLevel             VARCHAR2(50);
    -- Names of objects whose deletion failed in the folder currently being processed.
    -- NOTE(review): the status update assumes LIST_OBJECTS.object_name equals
    -- A_SOURCE_FILE_RECEIVED.SOURCE_FILE_NAME (LEVEL 3 builds the object URI from
    -- SOURCE_FILE_NAME in the same folder). If the names differ, nothing is excluded and
    -- the update marks every trashed record, exactly as before this change.
    vFailedObjects          SYS.ODCIVARCHAR2LIST := SYS.ODCIVARCHAR2LIST();
    vListingOk              BOOLEAN;
BEGIN
    vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
        'pSourceFileReceivedKey => '||nvl(to_char(pSourceFileReceivedKey), 'NULL'),
        'pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL'),
        'pPurgeAll => '||CASE WHEN pPurgeAll THEN 'TRUE' ELSE 'FALSE' END
    ));
    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);

    -- Determine purge level (priority: LEVEL 3 > LEVEL 2 > LEVEL 1)
    IF pSourceFileReceivedKey IS NOT NULL THEN
        vPurgeLevel := 'LEVEL_3_SINGLE_FILE';
        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Purge level: LEVEL 3 - Single file deletion','INFO', 'A_SOURCE_FILE_RECEIVED_KEY: ' || pSourceFileReceivedKey);

        -- LEVEL 3: Delete single file by A_SOURCE_FILE_RECEIVED_KEY
        BEGIN
            SELECT *
              INTO vSourceFileReceived
              FROM CT_MRDS.A_SOURCE_FILE_RECEIVED
             WHERE A_SOURCE_FILE_RECEIVED_KEY = pSourceFileReceivedKey
               AND PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED';

            vSourceFileConfig := CT_MRDS.FILE_MANAGER.GET_SOURCE_FILE_CONFIG(pSourceFileConfigKey => vSourceFileReceived.A_SOURCE_FILE_CONFIG_KEY);
        EXCEPTION
            WHEN NO_DATA_FOUND THEN
                RAISE_APPLICATION_ERROR(-20301, 'File not found or status is not ARCHIVED_AND_TRASHED for A_SOURCE_FILE_RECEIVED_KEY: ' || pSourceFileReceivedKey);
        END;

        vTrashLocationUri := CT_MRDS.FILE_MANAGER.GET_BUCKET_URI('DATA') || 'TRASH/' || vSourceFileConfig.A_SOURCE_KEY || '/' || vSourceFileConfig.TABLE_ID || '/' || vSourceFileReceived.SOURCE_FILE_NAME;

        BEGIN
            DBMS_CLOUD.DELETE_OBJECT(
                credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName,
                object_uri      => vTrashLocationUri
            );
            vFilesDeleted := 1;
            CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Single file deleted from TRASH','INFO', vSourceFileReceived.SOURCE_FILE_NAME);
        EXCEPTION
            WHEN OTHERS THEN
                CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Failed to delete file from TRASH','ERROR', vTrashLocationUri);
                CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
                RAISE_APPLICATION_ERROR(-20302, 'Failed to delete single file from TRASH: ' || SQLERRM);
        END;

        UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED
           SET PROCESSING_STATUS = 'ARCHIVED_AND_PURGED'
         WHERE A_SOURCE_FILE_RECEIVED_KEY = pSourceFileReceivedKey
           AND PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED';
        vFilesUpdated := SQL%ROWCOUNT;

    ELSIF pSourceFileConfigKey IS NOT NULL THEN
        vPurgeLevel := 'LEVEL_2_CONFIG_FILES';
        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Purge level: LEVEL 2 - Configuration-based deletion','INFO', 'pSourceFileConfigKey: ' || pSourceFileConfigKey);

        -- LEVEL 2: Delete all files for specific pSourceFileConfigKey
        vSourceFileConfig := CT_MRDS.FILE_MANAGER.GET_SOURCE_FILE_CONFIG(pSourceFileConfigKey => pSourceFileConfigKey);
        vTrashLocationUri := CT_MRDS.FILE_MANAGER.GET_BUCKET_URI('DATA') || 'TRASH/' || vSourceFileConfig.A_SOURCE_KEY || '/' || vSourceFileConfig.TABLE_ID || '/';
        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Purging TRASH folder for configuration','INFO', 'Location: ' || vTrashLocationUri);

        BEGIN
            FOR trash_file IN (
                SELECT object_name
                  FROM DBMS_CLOUD.LIST_OBJECTS(
                           credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName,
                           location_uri    => vTrashLocationUri
                       )
            ) LOOP
                BEGIN
                    DBMS_CLOUD.DELETE_OBJECT(
                        credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName,
                        object_uri      => vTrashLocationUri || trash_file.object_name
                    );
                    vFilesDeleted := vFilesDeleted + 1;
                    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('File deleted from TRASH','DEBUG', trash_file.object_name);
                EXCEPTION
                    WHEN OTHERS THEN
                        -- Best effort: log, remember the object and continue with the next one.
                        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Failed to delete file from TRASH','ERROR', trash_file.object_name);
                        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
                        vFilesFailed := vFilesFailed + 1;
                        vFailedObjects.EXTEND;
                        vFailedObjects(vFailedObjects.COUNT) := trash_file.object_name;
                END;
            END LOOP;
        EXCEPTION
            WHEN OTHERS THEN
                CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Failed to list or delete TRASH files','ERROR', vTrashLocationUri);
                CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
                RAISE_APPLICATION_ERROR(-20303, 'Failed to purge TRASH folder for configuration: ' || SQLERRM);
        END;

        -- Only records whose object is really gone become ARCHIVED_AND_PURGED.
        UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED r
           SET r.PROCESSING_STATUS = 'ARCHIVED_AND_PURGED'
         WHERE r.A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfigKey
           AND r.PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED'
           AND NOT EXISTS (
                   SELECT 1
                     FROM TABLE(vFailedObjects) f
                    WHERE f.COLUMN_VALUE = r.SOURCE_FILE_NAME
               );
        vFilesUpdated := SQL%ROWCOUNT;

    ELSIF pPurgeAll THEN
        vPurgeLevel := 'LEVEL_1_GLOBAL_PURGE';
        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Purge level: LEVEL 1 - Global TRASH purge','INFO', 'Deleting ALL files with ARCHIVED_AND_TRASHED status');

        -- LEVEL 1: Delete all files with ARCHIVED_AND_TRASHED status across all configurations
        FOR config_rec IN (
            SELECT DISTINCT
                   c.A_SOURCE_FILE_CONFIG_KEY,
                   c.A_SOURCE_KEY,
                   c.TABLE_ID
              FROM CT_MRDS.A_SOURCE_FILE_CONFIG c
              JOIN CT_MRDS.A_SOURCE_FILE_RECEIVED r
                ON r.A_SOURCE_FILE_CONFIG_KEY = c.A_SOURCE_FILE_CONFIG_KEY
             WHERE r.PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED'
        ) LOOP
            vTrashLocationUri := CT_MRDS.FILE_MANAGER.GET_BUCKET_URI('DATA') || 'TRASH/' || config_rec.A_SOURCE_KEY || '/' || config_rec.TABLE_ID || '/';
            CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Processing TRASH location','DEBUG', vTrashLocationUri);

            -- Failure bookkeeping is per configuration folder.
            vFailedObjects := SYS.ODCIVARCHAR2LIST();
            vListingOk     := FALSE;

            BEGIN
                FOR trash_file IN (
                    SELECT object_name
                      FROM DBMS_CLOUD.LIST_OBJECTS(
                               credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName,
                               location_uri    => vTrashLocationUri
                           )
                ) LOOP
                    BEGIN
                        DBMS_CLOUD.DELETE_OBJECT(
                            credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName,
                            object_uri      => vTrashLocationUri || trash_file.object_name
                        );
                        vFilesDeleted := vFilesDeleted + 1;
                        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('File deleted from TRASH','DEBUG', trash_file.object_name);
                    EXCEPTION
                        WHEN OTHERS THEN
                            CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Failed to delete file from TRASH','ERROR', trash_file.object_name);
                            CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
                            vFilesFailed := vFilesFailed + 1;
                            vFailedObjects.EXTEND;
                            vFailedObjects(vFailedObjects.COUNT) := trash_file.object_name;
                    END;
                END LOOP;
                vListingOk := TRUE;
            EXCEPTION
                WHEN OTHERS THEN
                    -- Listing problems do not abort the global purge; the configuration is
                    -- skipped and its records stay ARCHIVED_AND_TRASHED.
                    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Failed to list TRASH files','WARNING', vTrashLocationUri || ' - ' || SQLERRM);
            END;

            -- The update is kept outside the block above so that a failing UPDATE is not
            -- swallowed as a "listing" warning but reaches the outer handler.
            IF vListingOk THEN
                UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED r
                   SET r.PROCESSING_STATUS = 'ARCHIVED_AND_PURGED'
                 WHERE r.A_SOURCE_FILE_CONFIG_KEY = config_rec.A_SOURCE_FILE_CONFIG_KEY
                   AND r.PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED'
                   AND NOT EXISTS (
                           SELECT 1
                             FROM TABLE(vFailedObjects) f
                            WHERE f.COLUMN_VALUE = r.SOURCE_FILE_NAME
                       );
                vFilesUpdated := vFilesUpdated + SQL%ROWCOUNT;
            END IF;
        END LOOP;

    ELSE
        RAISE_APPLICATION_ERROR(-20304, 'No purge level specified. Provide pSourceFileReceivedKey, pSourceFileConfigKey, or set pPurgeAll=TRUE');
    END IF;

    COMMIT;

    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Total files deleted from TRASH: ' || vFilesDeleted,'INFO');
    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Total file records updated to ARCHIVED_AND_PURGED: ' || vFilesUpdated,'INFO');
    IF vFilesFailed > 0 THEN
        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Files that could not be deleted from TRASH: ' || vFilesFailed,'WARNING');
    END IF;
    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('TRASH folder purge completed successfully','INFO', 'Level: ' || vPurgeLevel || ', Files deleted: ' || vFilesDeleted || ', Records updated: ' || vFilesUpdated);
    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO', vParameters);
EXCEPTION
    WHEN OTHERS THEN
        ROLLBACK;
        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.MSG_UNKNOWN , 'ERROR', vParameters);
        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
        RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_UNKNOWN, CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
END PURGE_TRASH_FOLDER;

----------------------------------------------------------------------------------------------------

FUNCTION PURGE_TRASH_FOLDER (
    pSourceFileReceivedKey  IN
CT_MRDS.A_SOURCE_FILE_RECEIVED.A_SOURCE_FILE_RECEIVED_KEY%TYPE DEFAULT NULL,
    pSourceFileConfigKey    IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE DEFAULT NULL,
    pPurgeAll               IN BOOLEAN DEFAULT FALSE
) RETURN PLS_INTEGER IS
    /**
     * @name PURGE_TRASH_FOLDER (function overload)
     * @desc Runs the PURGE_TRASH_FOLDER procedure with the same arguments inside an
     *       autonomous transaction and reports the outcome as a number instead of
     *       raising. Every exit path ends the autonomous transaction explicitly,
     *       because Oracle raises ORA-06519 when an autonomous routine returns while
     *       its transaction is still active.
     * @param pSourceFileReceivedKey - passed through to the procedure
     * @param pSourceFileConfigKey   - passed through to the procedure
     * @param pPurgeAll              - passed through to the procedure
     * @return PLS_INTEGER - 0 on success, otherwise the SQLCODE of the caught error
     **/
    PRAGMA AUTONOMOUS_TRANSACTION;
    vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
    vErrorCode  PLS_INTEGER;
BEGIN
    vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
        'pSourceFileReceivedKey => '||nvl(to_char(pSourceFileReceivedKey), 'NULL'),
        'pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL'),
        'pPurgeAll => '||CASE WHEN pPurgeAll THEN 'TRUE' ELSE 'FALSE' END
    ));
    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
    ----
    PURGE_TRASH_FOLDER(
        pSourceFileReceivedKey => pSourceFileReceivedKey,
        pSourceFileConfigKey   => pSourceFileConfigKey,
        pPurgeAll              => pPurgeAll
    );
    ----
    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
    -- Close the autonomous transaction; the procedure has already committed its own work.
    COMMIT;
    RETURN 0;   -- SQLCODE is 0 outside an exception handler
EXCEPTION
    WHEN OTHERS THEN
        -- Capture the code first so that nothing executed afterwards can influence it.
        vErrorCode := SQLCODE;
        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
        -- NOTE(review): assumes LOG_PROCESS_EVENT persists its entries independently of
        -- this transaction - confirm; otherwise the entry above is discarded here.
        ROLLBACK;
        RETURN vErrorCode;
END PURGE_TRASH_FOLDER;

----------------------------------------------------------------------------------------------------

FUNCTION RESTORE_FILE_FROM_TRASH (
    pSourceFileReceivedKey  IN CT_MRDS.A_SOURCE_FILE_RECEIVED.A_SOURCE_FILE_RECEIVED_KEY%TYPE DEFAULT NULL,
    pSourceFileConfigKey    IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE DEFAULT NULL,
    pRestoreAll             IN BOOLEAN DEFAULT FALSE
) RETURN PLS_INTEGER IS
    /**
     * @name RESTORE_FILE_FROM_TRASH (function overload)
     * @desc Runs the RESTORE_FILE_FROM_TRASH procedure with the same arguments inside an
     *       autonomous transaction and reports the outcome as a number instead of
     *       raising. Every exit path ends the autonomous transaction explicitly
     *       (see ORA-06519).
     * @param pSourceFileReceivedKey - passed through to the procedure
     * @param pSourceFileConfigKey   - passed through to the procedure
     * @param pRestoreAll            - passed through to the procedure
     * @return PLS_INTEGER - 0 on success, otherwise the SQLCODE of the caught error
     **/
    PRAGMA AUTONOMOUS_TRANSACTION;
    vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
    vErrorCode  PLS_INTEGER;
BEGIN
    vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
        'pSourceFileReceivedKey => '||nvl(to_char(pSourceFileReceivedKey), 'NULL'),
        'pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL'),
        'pRestoreAll => '||CASE WHEN pRestoreAll THEN 'TRUE' ELSE 'FALSE' END
    ));
    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
    ----
    RESTORE_FILE_FROM_TRASH(
        pSourceFileReceivedKey => pSourceFileReceivedKey,
        pSourceFileConfigKey   => pSourceFileConfigKey,
        pRestoreAll            => pRestoreAll
    );
    ----
    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
    -- Close the autonomous transaction; the procedure has already committed its own work.
    COMMIT;
    RETURN 0;   -- SQLCODE is 0 outside an exception handler
EXCEPTION
    WHEN OTHERS THEN
        vErrorCode := SQLCODE;
        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
        -- NOTE(review): assumes LOG_PROCESS_EVENT persists its entries independently of
        -- this transaction - confirm; otherwise the entry above is discarded here.
        ROLLBACK;
        RETURN vErrorCode;
END RESTORE_FILE_FROM_TRASH;

----------------------------------------------------------------------------------------------------
-- PACKAGE VERSION MANAGEMENT FUNCTIONS IMPLEMENTATION
----------------------------------------------------------------------------------------------------

-- Returns the package-level PACKAGE_VERSION value (declared outside this section).
FUNCTION GET_VERSION RETURN VARCHAR2 IS
BEGIN
    RETURN PACKAGE_VERSION;
END GET_VERSION;

----------------------------------------------------------------------------------------------------

-- Returns the text produced by ENV_MANAGER.GET_PACKAGE_VERSION_INFO for this package's
-- name, version, build date and author.
FUNCTION GET_BUILD_INFO RETURN VARCHAR2 IS
BEGIN
    RETURN CT_MRDS.ENV_MANAGER.GET_PACKAGE_VERSION_INFO(
        pPackageName => 'FILE_ARCHIVER',
        pVersion     => PACKAGE_VERSION,
        pBuildDate   => PACKAGE_BUILD_DATE,
        pAuthor      => PACKAGE_AUTHOR
    );
END GET_BUILD_INFO;

----------------------------------------------------------------------------------------------------

-- Returns VERSION_HISTORY as formatted by ENV_MANAGER.FORMAT_VERSION_HISTORY.
FUNCTION GET_VERSION_HISTORY RETURN VARCHAR2 IS
BEGIN
    RETURN CT_MRDS.ENV_MANAGER.FORMAT_VERSION_HISTORY(
        pPackageName    => 'FILE_ARCHIVER',
        pVersionHistory => VERSION_HISTORY
    );
END GET_VERSION_HISTORY;

----------------------------------------------------------------------------------------------------

FUNCTION FN_ARCHIVE_TABLE_DATA (
    pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE
) RETURN PLS_INTEGER IS
    -- Function wrapper around ARCHIVE_TABLE_DATA: returns SQLCODE (0 on success)
    -- instead of raising.
    vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
BEGIN
    vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
        'pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL')
    ));
    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
    ----
ARCHIVE_TABLE_DATA(pSourceFileConfigKey => pSourceFileConfigKey);
    ----
    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
    RETURN SQLCODE;
EXCEPTION
    WHEN OTHERS THEN
        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
        RETURN SQLCODE;
END FN_ARCHIVE_TABLE_DATA;

----------------------------------------------------------------------------------------------------

FUNCTION FN_GATHER_TABLE_STAT (
    pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE
) RETURN PLS_INTEGER IS
    /**
     * @name FN_GATHER_TABLE_STAT
     * @desc Function wrapper around GATHER_TABLE_STAT: logs start/end, calls the
     *       procedure and converts any exception into a numeric result.
     * @param pSourceFileConfigKey - source file configuration key passed to GATHER_TABLE_STAT
     * @return PLS_INTEGER - SQLCODE: 0 on success, the error code when an exception was caught
     **/
    vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
BEGIN
    vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST('pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL')));
    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
    ----
    GATHER_TABLE_STAT(pSourceFileConfigKey => pSourceFileConfigKey);
    ----
    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
    RETURN SQLCODE;
EXCEPTION
    WHEN OTHERS THEN
        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
        RETURN SQLCODE;
END FN_GATHER_TABLE_STAT;

----------------------------------------------------------------------------------------------------
-- BATCH ARCHIVAL PROCEDURES
----------------------------------------------------------------------------------------------------

PROCEDURE ARCHIVE_ALL (
    pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE DEFAULT NULL,
    pSourceKey           IN CT_MRDS.A_SOURCE.A_SOURCE_KEY%TYPE DEFAULT NULL,
    pArchiveAll          IN BOOLEAN DEFAULT FALSE
) IS
    /**
     * @name ARCHIVE_ALL
     * @desc Calls ARCHIVE_TABLE_DATA for a set of INPUT-type source file configurations.
     *       The set is chosen by the first argument that is supplied:
     *         Level 1 - pSourceFileConfigKey: that single configuration;
     *         Level 2 - pSourceKey:           all configurations of that source;
     *         Level 3 - pArchiveAll = TRUE:   all configurations.
     *       Configurations with IS_ARCHIVE_ENABLED = 'N' are logged and counted as skipped.
     *       A failure while archiving one table is logged and counted, and processing
     *       continues with the next table, so per-table failures do not raise.
     *       Any other error (including "no level specified", -20003) is logged and
     *       re-raised with CT_MRDS.ENV_MANAGER.CODE_UNKNOWN.
     * @param pSourceFileConfigKey - single configuration to archive (Level 1)
     * @param pSourceKey           - source whose configurations are archived (Level 2)
     * @param pArchiveAll          - TRUE to archive every configuration (Level 3)
     **/
    vParameters      CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
    vTablesArchived  PLS_INTEGER := 0;
    vTablesSkipped   PLS_INTEGER := 0;
    vTablesFailed    PLS_INTEGER := 0;
    vProcessingLevel VARCHAR2(50);
    -- SQL-visible copy of pArchiveAll: comparing a PL/SQL BOOLEAN with TRUE inside a
    -- SQL statement is only valid on releases where SQL itself has a BOOLEAN type.
    vArchiveAllFlag  VARCHAR2(1) := CASE WHEN pArchiveAll THEN 'Y' ELSE 'N' END;
BEGIN
    vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
        'pSourceFileConfigKey => '''||nvl(to_char(pSourceFileConfigKey),'NULL')||'''',
        'pSourceKey => '''||nvl(pSourceKey,'NULL')||'''',
        'pArchiveAll => '''||CASE WHEN pArchiveAll THEN 'TRUE' ELSE 'FALSE' END||''''
    ));
    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);

    -- Determine processing level and validate parameters
    IF pSourceFileConfigKey IS NOT NULL THEN
        vProcessingLevel := 'LEVEL 1: Single Config Key';
        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Processing Level 1: pSourceFileConfigKey=' || pSourceFileConfigKey, 'INFO');
    ELSIF pSourceKey IS NOT NULL THEN
        vProcessingLevel := 'LEVEL 2: Source Key';
        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Processing Level 2: pSourceKey=' || pSourceKey, 'INFO');
    ELSIF pArchiveAll THEN
        vProcessingLevel := 'LEVEL 3: Archive All';
        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Processing Level 3: Archive All Enabled Tables', 'INFO');
    ELSE
        RAISE_APPLICATION_ERROR(-20003, 'No processing level specified. Provide pSourceFileConfigKey, pSourceKey, or set pArchiveAll=TRUE');
    END IF;

    FOR config_rec IN (
        SELECT A_SOURCE_FILE_CONFIG_KEY,
               TABLE_ID,
               IS_ARCHIVE_ENABLED,
               IS_KEEP_IN_TRASH,
               A_SOURCE_KEY
          FROM CT_MRDS.A_SOURCE_FILE_CONFIG
         WHERE SOURCE_FILE_TYPE = 'INPUT'
           AND (
                   -- Level 1: Specific config key
                   (pSourceFileConfigKey IS NOT NULL AND A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfigKey)
                OR
                   -- Level 2: All configs for source key
                   (pSourceFileConfigKey IS NULL AND pSourceKey IS NOT NULL AND A_SOURCE_KEY = pSourceKey)
                OR
                   -- Level 3: All configs when pArchiveAll = TRUE
                   (pSourceFileConfigKey IS NULL AND pSourceKey IS NULL AND vArchiveAllFlag = 'Y')
               )
         ORDER BY A_SOURCE_KEY, A_SOURCE_FILE_CONFIG_KEY
    ) LOOP
        IF config_rec.IS_ARCHIVE_ENABLED = 'N' THEN
            CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(
                'Skipping table ' || config_rec.TABLE_ID || ' (IS_ARCHIVE_ENABLED=N) [Source: ' || config_rec.A_SOURCE_KEY || ', Config: ' || config_rec.A_SOURCE_FILE_CONFIG_KEY || ']',
                'INFO'
            );
            vTablesSkipped := vTablesSkipped + 1;
        ELSE
            BEGIN
                CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(
                    'Archiving table ' || config_rec.TABLE_ID || ' [Source: ' || config_rec.A_SOURCE_KEY || ', Config: ' || config_rec.A_SOURCE_FILE_CONFIG_KEY || ', IS_KEEP_IN_TRASH=' || config_rec.IS_KEEP_IN_TRASH || ']',
                    'INFO'
                );
                ARCHIVE_TABLE_DATA(pSourceFileConfigKey => config_rec.A_SOURCE_FILE_CONFIG_KEY);
                vTablesArchived := vTablesArchived + 1;
                CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(
                    'Successfully archived table ' || config_rec.TABLE_ID,
                    'INFO'
                );
            EXCEPTION
                WHEN OTHERS THEN
                    vTablesFailed := vTablesFailed + 1;
                    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(
                        'Failed to archive table ' || config_rec.TABLE_ID || ' [Config: ' || config_rec.A_SOURCE_FILE_CONFIG_KEY || ']: ' || SQLERRM,
                        'ERROR'
                    );
                    -- Continue with next table
            END;
        END IF;
    END LOOP;

    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(
        vProcessingLevel || ' - Batch archival summary: Archived=' || vTablesArchived || ', Skipped=' || vTablesSkipped || ', Failed=' || vTablesFailed,
        'INFO'
    );
    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
EXCEPTION
    WHEN OTHERS THEN
        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.MSG_UNKNOWN , 'ERROR', vParameters);
        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
        RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_UNKNOWN, CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
END ARCHIVE_ALL;

----------------------------------------------------------------------------------------------------

FUNCTION FN_ARCHIVE_ALL (
    pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE DEFAULT NULL,
    pSourceKey           IN CT_MRDS.A_SOURCE.A_SOURCE_KEY%TYPE DEFAULT NULL,
    pArchiveAll          IN BOOLEAN DEFAULT FALSE
) RETURN PLS_INTEGER IS
    -- Function wrapper around ARCHIVE_ALL: returns SQLCODE (0 on success) instead of raising.
    vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
BEGIN
    vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
        'pSourceFileConfigKey => '''||nvl(to_char(pSourceFileConfigKey),'NULL')||'''',
        'pSourceKey => '''||nvl(pSourceKey,'NULL')||'''',
'pArchiveAll => '''||CASE WHEN pArchiveAll THEN 'TRUE' ELSE 'FALSE' END||''''
    ));
    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
    ----
    ARCHIVE_ALL(
        pSourceFileConfigKey => pSourceFileConfigKey,
        pSourceKey           => pSourceKey,
        pArchiveAll          => pArchiveAll
    );
    ----
    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
    RETURN SQLCODE;
EXCEPTION
    WHEN OTHERS THEN
        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
        RETURN SQLCODE;
END FN_ARCHIVE_ALL;

----------------------------------------------------------------------------------------------------
-- BATCH STATISTICS GATHERING PROCEDURES
----------------------------------------------------------------------------------------------------

PROCEDURE GATHER_TABLE_STAT_ALL (
    pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE DEFAULT NULL,
    pSourceKey           IN CT_MRDS.A_SOURCE.A_SOURCE_KEY%TYPE DEFAULT NULL,
    pGatherAll           IN BOOLEAN DEFAULT FALSE,
    pOnlyEnabled         IN BOOLEAN DEFAULT TRUE
) IS
    /**
     * @name GATHER_TABLE_STAT_ALL
     * @desc Calls GATHER_TABLE_STAT for a set of INPUT-type source file configurations.
     *       The set is chosen by the first argument that is supplied:
     *         Level 1 - pSourceFileConfigKey: that single configuration;
     *         Level 2 - pSourceKey:           all configurations of that source;
     *         Level 3 - pGatherAll = TRUE:    all configurations.
     *       Unless pOnlyEnabled is FALSE, only configurations with
     *       IS_ARCHIVE_ENABLED = 'Y' are processed; the others are logged and counted
     *       as skipped (a NULL pOnlyEnabled is treated like TRUE, a NULL
     *       IS_ARCHIVE_ENABLED like "not enabled").
     *       A failure for one table is logged and counted, and processing continues
     *       with the next table. Any other error (including "no level specified",
     *       -20003) is logged and re-raised with CT_MRDS.ENV_MANAGER.CODE_UNKNOWN.
     * @param pSourceFileConfigKey - single configuration to process (Level 1)
     * @param pSourceKey           - source whose configurations are processed (Level 2)
     * @param pGatherAll           - TRUE to process every configuration (Level 3)
     * @param pOnlyEnabled         - FALSE to include configurations that are not archive-enabled
     **/
    vParameters      CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
    vTablesProcessed PLS_INTEGER := 0;
    vTablesSkipped   PLS_INTEGER := 0;
    vTablesFailed    PLS_INTEGER := 0;
    vProcessingLevel VARCHAR2(50);
    vEnabledFilter   VARCHAR2(50);
    -- SQL-visible copy of pGatherAll: comparing a PL/SQL BOOLEAN with TRUE/FALSE inside
    -- a SQL statement is only valid on releases where SQL itself has a BOOLEAN type.
    vGatherAllFlag   VARCHAR2(1) := CASE WHEN pGatherAll THEN 'Y' ELSE 'N' END;
    -- Effective filter switch; only an explicit FALSE disables the filter.
    vOnlyEnabled     BOOLEAN := (pOnlyEnabled IS NULL OR pOnlyEnabled);
BEGIN
    vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
        'pSourceFileConfigKey => '''||nvl(to_char(pSourceFileConfigKey),'NULL')||'''',
        'pSourceKey => '''||nvl(pSourceKey,'NULL')||'''',
        'pGatherAll => '''||CASE WHEN pGatherAll THEN 'TRUE' ELSE 'FALSE' END||'''',
        'pOnlyEnabled => '''||CASE WHEN pOnlyEnabled THEN 'TRUE' ELSE 'FALSE' END||''''
    ));
    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);

    -- Determine processing level and validate parameters
    IF pSourceFileConfigKey IS NOT NULL THEN
        vProcessingLevel := 'LEVEL 1: Single Config Key';
        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Processing Level 1: pSourceFileConfigKey=' || pSourceFileConfigKey, 'INFO');
    ELSIF pSourceKey IS NOT NULL THEN
        vProcessingLevel := 'LEVEL 2: Source Key';
        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Processing Level 2: pSourceKey=' || pSourceKey, 'INFO');
    ELSIF pGatherAll THEN
        vProcessingLevel := 'LEVEL 3: Gather All Statistics';
        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Processing Level 3: Gather All Enabled Statistics', 'INFO');
    ELSE
        RAISE_APPLICATION_ERROR(-20003, 'No processing level specified. Provide pSourceFileConfigKey, pSourceKey, or set pGatherAll=TRUE');
    END IF;

    -- Set enabled filter info (reflects the filter that is really applied)
    vEnabledFilter := CASE WHEN vOnlyEnabled THEN 'IS_ARCHIVE_ENABLED=Y only' ELSE 'All tables' END;
    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Filter mode: ' || vEnabledFilter, 'INFO');

    -- The IS_ARCHIVE_ENABLED filter is applied inside the loop, not in the query, so
    -- that filtered-out tables are logged and show up in the "Skipped" counter.
    FOR config_rec IN (
        SELECT A_SOURCE_FILE_CONFIG_KEY,
               TABLE_ID,
               IS_ARCHIVE_ENABLED,
               A_SOURCE_KEY
          FROM CT_MRDS.A_SOURCE_FILE_CONFIG
         WHERE SOURCE_FILE_TYPE = 'INPUT'
           AND (
                   -- Level 1: Specific config key
                   (pSourceFileConfigKey IS NOT NULL AND A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfigKey)
                OR
                   -- Level 2: All configs for source key
                   (pSourceFileConfigKey IS NULL AND pSourceKey IS NOT NULL AND A_SOURCE_KEY = pSourceKey)
                OR
                   -- Level 3: All configs when pGatherAll = TRUE
                   (pSourceFileConfigKey IS NULL AND pSourceKey IS NULL AND vGatherAllFlag = 'Y')
               )
         ORDER BY A_SOURCE_KEY, A_SOURCE_FILE_CONFIG_KEY
    ) LOOP
        IF vOnlyEnabled AND NVL(config_rec.IS_ARCHIVE_ENABLED, 'N') <> 'Y' THEN
            CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(
                'Skipping table ' || config_rec.TABLE_ID || ' (IS_ARCHIVE_ENABLED=N) [Source: ' || config_rec.A_SOURCE_KEY || ', Config: ' || config_rec.A_SOURCE_FILE_CONFIG_KEY || ']',
                'INFO'
            );
            vTablesSkipped := vTablesSkipped + 1;
        ELSE
            BEGIN
                CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(
                    'Gathering statistics for table ' || config_rec.TABLE_ID || ' [Source: ' || config_rec.A_SOURCE_KEY || ', Config: ' || config_rec.A_SOURCE_FILE_CONFIG_KEY || ', IS_ARCHIVE_ENABLED=' || config_rec.IS_ARCHIVE_ENABLED || ']',
                    'INFO'
                );
                GATHER_TABLE_STAT(pSourceFileConfigKey => config_rec.A_SOURCE_FILE_CONFIG_KEY);
                vTablesProcessed := vTablesProcessed + 1;
                CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(
                    'Successfully gathered statistics for table ' || config_rec.TABLE_ID,
                    'INFO'
                );
            EXCEPTION
                WHEN OTHERS THEN
                    vTablesFailed := vTablesFailed + 1;
                    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(
                        'Failed to gather statistics for table ' || config_rec.TABLE_ID || ' [Config: ' || config_rec.A_SOURCE_FILE_CONFIG_KEY || ']: ' || SQLERRM,
                        'ERROR'
                    );
                    -- Continue with next table
            END;
        END IF;
    END LOOP;

    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(
        vProcessingLevel || ' - Batch statistics gathering summary: Processed=' || vTablesProcessed || ', Skipped=' || vTablesSkipped || ', Failed=' || vTablesFailed || ' [Filter: ' || vEnabledFilter || ']',
        'INFO'
    );
    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
EXCEPTION
    WHEN OTHERS THEN
        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.MSG_UNKNOWN , 'ERROR', vParameters);
        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
        RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_UNKNOWN, CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
END GATHER_TABLE_STAT_ALL;

----------------------------------------------------------------------------------------------------

FUNCTION FN_GATHER_TABLE_STAT_ALL (
    pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE DEFAULT NULL,
    pSourceKey           IN CT_MRDS.A_SOURCE.A_SOURCE_KEY%TYPE DEFAULT NULL,
    pGatherAll           IN BOOLEAN DEFAULT FALSE,
    pOnlyEnabled         IN BOOLEAN DEFAULT TRUE
) RETURN PLS_INTEGER IS
    /**
     * @name FN_GATHER_TABLE_STAT_ALL
     * @desc Function wrapper around GATHER_TABLE_STAT_ALL: logs start/end, calls the
     *       procedure with the same arguments and converts any exception into a
     *       numeric result.
     * @return PLS_INTEGER - SQLCODE: 0 on success, the error code when an exception was caught
     **/
    vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
BEGIN
    vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
        'pSourceFileConfigKey => '''||nvl(to_char(pSourceFileConfigKey),'NULL')||'''',
        'pSourceKey => '''||nvl(pSourceKey,'NULL')||'''',
        'pGatherAll => '''||CASE WHEN pGatherAll THEN 'TRUE' ELSE 'FALSE' END||'''',
        'pOnlyEnabled => '''||CASE WHEN pOnlyEnabled THEN 'TRUE' ELSE 'FALSE' END||''''
    ));
    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
    ----
    GATHER_TABLE_STAT_ALL(
        pSourceFileConfigKey => pSourceFileConfigKey,
        pSourceKey           => pSourceKey,
        pGatherAll           => pGatherAll,
        pOnlyEnabled         => pOnlyEnabled
    );
    ----
    CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
    RETURN SQLCODE;
EXCEPTION
    WHEN OTHERS THEN
        CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
        RETURN SQLCODE;
END FN_GATHER_TABLE_STAT_ALL;

----------------------------------------------------------------------------------------------------

END;
/