create or replace PACKAGE BODY CT_MRDS.FILE_ARCHIVER AS ---------------------------------------------------------------------------------------------------- FUNCTION GET_TABLE_STAT(pSourceFileConfigKey IN NUMBER) RETURN CT_MRDS.A_TABLE_STAT%ROWTYPE IS vTableStat CT_MRDS.A_TABLE_STAT%ROWTYPE; vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE; vCount PLS_INTEGER; vSourceFileType CT_MRDS.A_SOURCE_FILE_CONFIG.SOURCE_FILE_TYPE%TYPE; BEGIN vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST('pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey),NULL))); ENV_MANAGER.LOG_PROCESS_EVENT('Start','DEBUG', vParameters); SELECT count(*) , min(SOURCE_FILE_TYPE) INTO vCount, vSourceFileType FROM CT_MRDS.A_TABLE_STAT s JOIN CT_MRDS.A_SOURCE_FILE_CONFIG c ON s.A_SOURCE_FILE_CONFIG_KEY = c.A_SOURCE_FILE_CONFIG_KEY WHERE s.A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfigKey; IF vCount=0 and vSourceFileType='INPUT' THEN GATHER_TABLE_STAT(pSourceFileConfigKey); END IF; BEGIN SELECT * INTO vTableStat FROM CT_MRDS.A_TABLE_STAT WHERE A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfigKey; -- EXCEPTION -- WHEN NO_DATA_FOUND THEN -- END; ENV_MANAGER.LOG_PROCESS_EVENT('End','DEBUG',vParameters); RETURN vTableStat; END GET_TABLE_STAT; ---------------------------------------------------------------------------------------------------- PROCEDURE ARCHIVE_TABLE_DATA ( pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE ) IS vSourceFileConfig CT_MRDS.A_SOURCE_FILE_CONFIG%ROWTYPE; vTableStat CT_MRDS.A_TABLE_STAT%ROWTYPE; vQuery VARCHAR2(4000); vTableName VARCHAR2(200); vUri VARCHAR2(1000); vfiles T_FILENAMES; vFilename VARCHAR2(300); vOperationId NUMBER := -1; vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE; vArchivalTriggeredBy VARCHAR2(60); -- Possible values: FILES_COUNT, ROWS_COUNT, BYTES_SUM vUserLoadOperations USER_LOAD_OPERATIONS%ROWTYPE; vProcessControlStatus VARCHAR2(60) := 'OK'; BEGIN vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST('pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL'))); ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters); vSourceFileConfig := FILE_MANAGER.GET_SOURCE_FILE_CONFIG(pSourceFileConfigKey => pSourceFileConfigKey); vTableStat := GET_TABLE_STAT(pSourceFileConfigKey => pSourceFileConfigKey); if vSourceFileConfig.SOURCE_FILE_TYPE <> 'INPUT' then ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.MSG_NOT_INPUT_SOURCE_FILE_TYPE, 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_NOT_INPUT_SOURCE_FILE_TYPE, ENV_MANAGER.MSG_NOT_INPUT_SOURCE_FILE_TYPE); end if; if vTableStat.created < sysdate-(vSourceFileConfig.HOURS_TO_EXPIRE_STATISTICS/24) then GATHER_TABLE_STAT(pSourceFileConfigKey => pSourceFileConfigKey); vTableStat := GET_TABLE_STAT(pSourceFileConfigKey => pSourceFileConfigKey); end if; if vTableStat.OVER_ARCH_THRESOLD_FILE_COUNT >= vSourceFileConfig.FILES_COUNT_OVER_ARCHIVE_THRESHOLD then vArchivalTriggeredBy := 'FILES_COUNT'; elsif vTableStat.OVER_ARCH_THRESOLD_ROW_COUNT >= vSourceFileConfig.ROWS_COUNT_OVER_ARCHIVE_THRESHOLD then vArchivalTriggeredBy := vArchivalTriggeredBy||', ROWS_COUNT'; elsif vTableStat.OVER_ARCH_THRESOLD_SIZE >= vSourceFileConfig.BYTES_SUM_OVER_ARCHIVE_THRESHOLD then vArchivalTriggeredBy := vArchivalTriggeredBy||', BYTES_SUM'; else ENV_MANAGER.LOG_PROCESS_EVENT('Non of archival triggers reached','INFO'); end if; if LENGTH(vArchivalTriggeredBy)>0 THEN ENV_MANAGER.LOG_PROCESS_EVENT('Archival Triggered By: '||vArchivalTriggeredBy,'INFO'); vTableName := DBMS_ASSERT.SCHEMA_NAME(vSourceFileConfig.ODS_SCHEMA_NAME) || '.'||vSourceFileConfig.A_SOURCE_KEY||'_'||DBMS_ASSERT.simple_sql_name(vSourceFileConfig.TABLE_ID)||'_ODS'; vQuery := ' select t_filename( file$name ,file$path , to_char(h.workflow_start,''yyyy'') , to_char(h.workflow_start,''mm'') ) from '||vTableName||' s join CT_MRDS.a_workflow_history h on s.a_workflow_history_key = h.a_workflow_history_key where extract(day from (systimestamp - workflow_start)) > '||vSourceFileConfig.DAYS_FOR_ARCHIVE_THRESHOLD ; -- Get all files that will be archived into "vfiles" collection ("regular data files") execute immediate vQuery bulk collect into vfiles; -- Start EXPORT "regular data files" to parquet and DROP "csv" FOR ym_loop IN (select distinct year, month from table(vfiles) order by 1,2) LOOP dbms_output.put_line('year: '||ym_loop.year||' - '||'month: '||ym_loop.month); vQuery:= 'select s.* -- ,r.partition_year -- ,r.partition_month from '|| vTableName ||' s join CT_MRDS.A_SOURCE_FILE_RECEIVED r on s.file$name = r.source_file_name and r.a_source_file_config_key = '||pSourceFileConfigKey||' and r.partition_year='''||ym_loop.year||''' and r.partition_month='''||ym_loop.month||''' and r.PROCESSING_STATUS = ''INGESTED'' ' ; vUri := FILE_MANAGER.GET_BUCKET_URI('ARCHIVE')||vSourceFileConfig.A_SOURCE_KEY||'/'||vSourceFileConfig.TABLE_ID||'/PARTITION_YEAR='||ym_loop.year||'/PARTITION_MONTH='||ym_loop.month||'/'; ENV_MANAGER.LOG_PROCESS_EVENT('Start Archiving for YEAR_MONTH: '||ym_loop.year||'_'||ym_loop.month ,'INFO'); ENV_MANAGER.LOG_PROCESS_EVENT('Parameter for DBMS_CLOUD.EXPORT_DATA => file_uri_list' ,'DEBUG',vUri); ENV_MANAGER.LOG_PROCESS_EVENT('Parameter for DBMS_CLOUD.EXPORT_DATA => query' ,'DEBUG',vQuery); BEGIN DBMS_CLOUD.EXPORT_DATA( credential_name => ENV_MANAGER.gvCredentialName, file_uri_list => vUri||'d' , format => json_object('type' value 'parquet'), query => vQuery, operation_id => vOperationId ); EXCEPTION WHEN OTHERS THEN vProcessControlStatus :='EXPORT_FAILURE'; ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_EXP_DATA_FOR_ARCH_FAILED, ENV_MANAGER.MSG_EXP_DATA_FOR_ARCH_FAILED); END; ENV_MANAGER.LOG_PROCESS_EVENT('vOperationId of export: '||vOperationId,'DEBUG'); -- Get USER_LOAD_OPERATIONS info select * into vUserLoadOperations from USER_LOAD_OPERATIONS where id = vOperationId; IF vUserLoadOperations.STATUS <>'COMPLETED' THEN RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_EXP_DATA_FOR_ARCH_FAILED, ENV_MANAGER.MSG_EXP_DATA_FOR_ARCH_FAILED ||cgBL|| ' Export ended with status '||vUserLoadOperations.STATUS); ELSIF vUserLoadOperations.STATUS = 'COMPLETED' and vUserLoadOperations.ROWS_LOADED = 0 THEN RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_EXP_DATA_FOR_ARCH_FAILED, ENV_MANAGER.MSG_EXP_DATA_FOR_ARCH_FAILED ||cgBL|| ' Zero rows were exported.'); ELSE ENV_MANAGER.LOG_PROCESS_EVENT('Data exported to archival file for YEAR_MONTH: 2025_01','INFO', vParameters); ENV_MANAGER.LOG_PROCESS_EVENT(FILE_MANAGER.GET_DET_USER_LOAD_OPERATIONS (pOperationId => vOperationId),'DEBUG', vParameters); END IF; SELECT object_name into vFilename from DBMS_CLOUD.LIST_OBJECTS( credential_name => 'OCI$RESOURCE_PRINCIPAL', location_uri => vUri) where TO_UTC_TIMESTAMP_TZ(REGEXP_REPLACE(object_name, '.*(\d{4})(\d{2})(\d{2})T(\d{2})(\d{2})(\d{2})(\d{6})Z\.parquet$', '\1-\2-\3T\4:\5:\6.\7')) between vUserLoadOperations.START_TIME and vUserLoadOperations.UPDATE_TIME ; -- Try to drop EXPORTED FILES ("regular data files") BEGIN FOR f in (select filename, pathname from table(vfiles) where year = ym_loop.year and month = ym_loop.month) loop -- first change of status BEGIN UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED r SET PROCESSING_STATUS = 'ARCHIVED' ,ARCH_FILE_NAME = vUri||vFilename WHERE r.a_source_file_config_key= pSourceFileConfigKey AND r.source_file_name = f.filename AND r.processing_status = 'INGESTED' ; EXCEPTION WHEN OTHERS THEN ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); vProcessControlStatus := 'CHANGE_STATUS_TO_ARCHIVED_FAILURE'; END; EXIT WHEN vProcessControlStatus = 'CHANGE_STATUS_TO_ARCHIVED_FAILURE'; -- move file to trash before dropping BEGIN DBMS_CLOUD.MOVE_OBJECT(source_credential_name => ENV_MANAGER.gvCredentialName, source_object_uri => f.pathname||'/'||f.filename, target_object_uri => replace(f.pathname,'ODS','TRASH')||'/'||f.filename, target_credential_name => ENV_MANAGER.gvCredentialName ); ENV_MANAGER.LOG_PROCESS_EVENT('File moved to TRASH.','DEBUG', f.pathname||'/'||f.filename); EXCEPTION WHEN OTHERS THEN ENV_MANAGER.LOG_PROCESS_EVENT('Failed to move file to TRASH.','ERROR', f.pathname||'/'||f.filename); ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); rollback; vProcessControlStatus := 'MOVE_FILE_TO_TRASH_FAILURE'; END; EXIT WHEN vProcessControlStatus = 'MOVE_FILE_TO_TRASH_FAILURE'; commit; END LOOP; -------------------------------------------------------------------- -- IF All goes fine till this point, we drop files from TRASH (if not then ROLLBACK PART) IF vProcessControlStatus = 'OK' THEN FOR f in (select filename, pathname from table(vfiles) where year = ym_loop.year and month = ym_loop.month) LOOP --Drop file from TRASH DBMS_CLOUD.DELETE_OBJECT(credential_name => ENV_MANAGER.gvCredentialName, object_uri => replace(f.pathname,'ODS','TRASH')||'/'||f.filename); ENV_MANAGER.LOG_PROCESS_EVENT('File dropped from TRASH.','DEBUG', f.pathname||'/'||f.filename); END LOOP; --ROLLBACK PART --ROLLBACK PROCESS in case of FAILURE (restore files from TRASH) ELSIF vProcessControlStatus = 'MOVE_FILE_TO_TRASH_FAILURE' THEN FOR f in ( SELECT vf.filename, vf.pathname FROM TABLE(vfiles) vf JOIN CT_MRDS.A_SOURCE_FILE_RECEIVED r ON r.source_file_name = vf.filename AND r.a_source_file_config_key = pSourceFileConfigKey AND r.PROCESSING_STATUS = 'ARCHIVED' AND vf.year = ym_loop.year AND vf.month = ym_loop.month ) LOOP BEGIN DBMS_CLOUD.MOVE_OBJECT(source_credential_name => ENV_MANAGER.gvCredentialName, source_object_uri => replace(f.pathname,'ODS','TRASH')||'/'||f.filename, target_object_uri => f.pathname||'/'||f.filename, target_credential_name => ENV_MANAGER.gvCredentialName ); ENV_MANAGER.LOG_PROCESS_EVENT('File restored from TRASH.','DEBUG', f.pathname||'/'||f.filename); UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED r SET PROCESSING_STATUS = 'INGESTED' ,ARCH_FILE_NAME = NULL WHERE r.a_source_file_config_key = pSourceFileConfigKey AND r.source_file_name = f.filename ; EXCEPTION WHEN OTHERS THEN ENV_MANAGER.LOG_PROCESS_EVENT('Failed to restore file from TRASH.','ERROR', replace(f.pathname,'ODS','TRASH')||'/'||f.filename); ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); vProcessControlStatus := 'RESTORE_FILE_FROM_TRASH_FAILURE'; END; END LOOP; DBMS_CLOUD.DELETE_OBJECT(credential_name => ENV_MANAGER.gvCredentialName, object_uri => vUri||vFilename); ENV_MANAGER.LOG_PROCESS_EVENT('ROLLBACK operation: Archival PARQUET file dropped.','DEBUG', vUri||vFilename); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_MOVE_FILE_TO_TRASH_FAILED, ENV_MANAGER.MSG_MOVE_FILE_TO_TRASH_FAILED); ELSIF vProcessControlStatus = 'CHANGE_STATUS_TO_ARCHIVED_FAILURE' THEN ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.MSG_CHANGE_STAT_TO_ARCHIVED_FAILED, 'ERROR', vParameters); DBMS_CLOUD.DELETE_OBJECT(credential_name => ENV_MANAGER.gvCredentialName, object_uri => vUri||vFilename); ENV_MANAGER.LOG_PROCESS_EVENT('Archival PARQUET file dropped.','DEBUG', vUri||vFilename); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_CHANGE_STAT_TO_ARCHIVED_FAILED, ENV_MANAGER.MSG_CHANGE_STAT_TO_ARCHIVED_FAILED); ELSIF vProcessControlStatus = 'RESTORE_FILE_FROM_TRASH_FAILURE' THEN ENV_MANAGER.LOG_PROCESS_EVENT('Some files were not restored from TRASH. Check A_PROCESS_LOG table for details','ERROR'); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_RESTORE_FILE_FROM_TRASH, ENV_MANAGER.MSG_RESTORE_FILE_FROM_TRASH); END IF; EXCEPTION WHEN ENV_MANAGER.ERR_CHANGE_STAT_TO_ARCHIVED_FAILED THEN RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_CHANGE_STAT_TO_ARCHIVED_FAILED, ENV_MANAGER.MSG_CHANGE_STAT_TO_ARCHIVED_FAILED); WHEN ENV_MANAGER.ERR_MOVE_FILE_TO_TRASH_FAILED THEN RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_MOVE_FILE_TO_TRASH_FAILED, ENV_MANAGER.MSG_MOVE_FILE_TO_TRASH_FAILED); WHEN ENV_MANAGER.ERR_RESTORE_FILE_FROM_TRASH THEN ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_RESTORE_FILE_FROM_TRASH, ENV_MANAGER.MSG_RESTORE_FILE_FROM_TRASH); WHEN OTHERS THEN ENV_MANAGER.LOG_PROCESS_EVENT('Error during archiving process','ERROR'); ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_DROP_EXPORTED_FILES_FAILED, ENV_MANAGER.MSG_DROP_EXPORTED_FILES_FAILED); END; -- END of "Try to drop EXPORTED FILES" ENV_MANAGER.LOG_PROCESS_EVENT('All archived files had been dropped.','INFO'); ENV_MANAGER.LOG_PROCESS_EVENT('End Archiving for YEAR_MONTH: '||ym_loop.year||'_'||ym_loop.month ,'INFO'); END LOOP; --ym_loop end (YEAR_MONTH) COMMIT; ELSE ENV_MANAGER.LOG_PROCESS_EVENT('Non of archival thresholds reached. Skip archiving.'||vArchivalTriggeredBy,'INFO'); END IF; ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters); EXCEPTION WHEN ENV_MANAGER.ERR_NOT_INPUT_SOURCE_FILE_TYPE THEN ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.MSG_NOT_INPUT_SOURCE_FILE_TYPE , 'ERROR', vParameters); ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_NOT_INPUT_SOURCE_FILE_TYPE, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE)); WHEN ENV_MANAGER.ERR_EXP_DATA_FOR_ARCH_FAILED THEN ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.MSG_EXP_DATA_FOR_ARCH_FAILED , 'ERROR', vParameters); ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_EXP_DATA_FOR_ARCH_FAILED, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE)); WHEN ENV_MANAGER.ERR_CHANGE_STAT_TO_ARCHIVED_FAILED THEN RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_CHANGE_STAT_TO_ARCHIVED_FAILED, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE)); WHEN ENV_MANAGER.ERR_MOVE_FILE_TO_TRASH_FAILED THEN RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_MOVE_FILE_TO_TRASH_FAILED, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE)); WHEN OTHERS THEN ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.MSG_UNKNOWN , 'ERROR', vParameters); ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE)); END ARCHIVE_TABLE_DATA; ---------------------------------------------------------------------------------------------------- PROCEDURE GATHER_TABLE_STAT ( pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE ) IS vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE; vStats CT_MRDS.A_TABLE_STAT%ROWTYPE; vSourceFileConfig CT_MRDS.A_SOURCE_FILE_CONFIG%ROWTYPE; vTableName VARCHAR2(200); vQuery VARCHAR2(32000); BEGIN vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST('pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL'))); ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters); vSourceFileConfig := FILE_MANAGER.GET_SOURCE_FILE_CONFIG(pSourceFileConfigKey => pSourceFileConfigKey); vTableName := DBMS_ASSERT.SCHEMA_NAME(vSourceFileConfig.ODS_SCHEMA_NAME) || '.'||vSourceFileConfig.A_SOURCE_KEY||'_'||DBMS_ASSERT.simple_sql_name(vSourceFileConfig.TABLE_ID)||'_ODS'; ENV_MANAGER.LOG_PROCESS_EVENT('vTableName','DEBUG',vTableName); vQuery := 'with tmp as ( select s.* ,file$name as filename ,h.workflow_start , to_char(h.workflow_start,''yyyy'') as year , to_char(h.workflow_start,''mm'') as month from '||vTableName||' s join CT_MRDS.a_workflow_history h on s.a_workflow_history_key = h.a_workflow_history_key ) , tmp_gr as ( select filename, count(*) as row_count_per_file, min(workflow_start) as workflow_start from tmp group by filename ) select NULL as A_TABLE_STAT_KEY ,'||pSourceFileConfigKey||' as A_SOURCE_FILE_CONFIG_KEY ,'''||vTableName||''' as TABLE_NAME ,count(*) as FILE_COUNT ,sum(case when extract(day from (systimestamp - workflow_start)) > '||vSourceFileConfig.DAYS_FOR_ARCHIVE_THRESHOLD||' then 1 else 0 end) as OLD_FILE_COUNT ,sum (row_count_per_file) as ROW_COUNT ,sum(case when extract(day from (systimestamp - workflow_start)) > '||vSourceFileConfig.DAYS_FOR_ARCHIVE_THRESHOLD||' then row_count_per_file else 0 end) as OLD_ROW_COUNT ,sum(r.bytes) as BYTES ,sum(case when extract(day from (systimestamp - workflow_start)) > '||vSourceFileConfig.DAYS_FOR_ARCHIVE_THRESHOLD||' then r.bytes else 0 end) as OLD_BYTES ,'||vSourceFileConfig.DAYS_FOR_ARCHIVE_THRESHOLD||' as DAYS_FOR_ARCHIVE_THRESHOLD ,systimestamp as CREATED from tmp_gr t join (SELECT * from DBMS_CLOUD.LIST_OBJECTS( credential_name => '''||ENV_MANAGER.gvCredentialName||''', location_uri => FILE_MANAGER.GET_BUCKET_URI(''ODS'')||''ODS/'||vSourceFileConfig.A_SOURCE_KEY||'/'||vSourceFileConfig.TABLE_ID||'/'' ) ) r on t.filename = r.object_name' ; ENV_MANAGER.LOG_PROCESS_EVENT('vQuery','DEBUG',vQuery); execute immediate vQuery into vStats; vStats.A_TABLE_STAT_KEY := CT_MRDS.A_TABLE_STAT_KEY_SEQ.NEXTVAL; insert into A_TABLE_STAT_HIST values vStats; delete from A_TABLE_STAT where A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfigKey; insert into A_TABLE_STAT values vStats; COMMIT; ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters); EXCEPTION WHEN OTHERS THEN ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.MSG_UNKNOWN , 'ERROR', vParameters); ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE)); END GATHER_TABLE_STAT; ---------------------------------------------------------------------------------------------------- -- PACKAGE VERSION MANAGEMENT FUNCTIONS IMPLEMENTATION ---------------------------------------------------------------------------------------------------- FUNCTION GET_VERSION RETURN VARCHAR2 IS BEGIN RETURN PACKAGE_VERSION; END GET_VERSION; ---------------------------------------------------------------------------------------------------- FUNCTION GET_BUILD_INFO RETURN VARCHAR2 IS BEGIN RETURN ENV_MANAGER.GET_PACKAGE_VERSION_INFO( pPackageName => 'FILE_ARCHIVER', pVersion => PACKAGE_VERSION, pBuildDate => PACKAGE_BUILD_DATE, pAuthor => PACKAGE_AUTHOR ); END GET_BUILD_INFO; ---------------------------------------------------------------------------------------------------- FUNCTION GET_VERSION_HISTORY RETURN VARCHAR2 IS BEGIN RETURN ENV_MANAGER.FORMAT_VERSION_HISTORY( pPackageName => 'FILE_ARCHIVER', pVersionHistory => VERSION_HISTORY ); END GET_VERSION_HISTORY; ---------------------------------------------------------------------------------------------------- END;