Files
mars/MARS_Packages/REL02_POST/MARS-1409/new_version/FILE_ARCHIVER.pkb

1355 lines
74 KiB
Plaintext

create or replace PACKAGE BODY CT_MRDS.FILE_ARCHIVER
AS
----------------------------------------------------------------------------------------------------
-- PRIVATE FUNCTION: GET_ARCHIVAL_WHERE_CLAUSE
----------------------------------------------------------------------------------------------------
/**
* @name GET_ARCHIVAL_WHERE_CLAUSE
* @desc Private function that generates WHERE clause based on ARCHIVAL_STRATEGY configuration.
* Supports three strategies: THRESHOLD_BASED, MINIMUM_AGE_MONTHS, HYBRID.
* @param pSourceFileConfig - Source file configuration record with ARCHIVAL_STRATEGY
* @return VARCHAR2 - WHERE clause for filtering archival candidates
**/
FUNCTION GET_ARCHIVAL_WHERE_CLAUSE(
pSourceFileConfig IN CT_MRDS.A_SOURCE_FILE_CONFIG%ROWTYPE
) RETURN VARCHAR2
IS
vWhereClause VARCHAR2(4000);
cgBL CONSTANT VARCHAR2(2) := CHR(13)||CHR(10);
BEGIN
CASE pSourceFileConfig.ARCHIVAL_STRATEGY
-- Legacy threshold-based strategy (backward compatible)
WHEN 'THRESHOLD_BASED' THEN
vWhereClause := 'extract(day from (systimestamp - workflow_start)) > ' || pSourceFileConfig.ARCHIVE_THRESHOLD_DAYS;
-- Archive data older than X months (0 = current month only)
WHEN 'MINIMUM_AGE_MONTHS' THEN
IF pSourceFileConfig.MINIMUM_AGE_MONTHS IS NULL THEN
RAISE_APPLICATION_ERROR(-20001, 'MINIMUM_AGE_MONTHS must be configured for MINIMUM_AGE_MONTHS strategy');
END IF;
vWhereClause := 'workflow_start < ADD_MONTHS(TRUNC(SYSDATE, ''MM''), -' || pSourceFileConfig.MINIMUM_AGE_MONTHS || ')';
-- Hybrid: Current month exclusion AND minimum age requirement
WHEN 'HYBRID' THEN
IF pSourceFileConfig.MINIMUM_AGE_MONTHS IS NULL THEN
RAISE_APPLICATION_ERROR(-20001, 'MINIMUM_AGE_MONTHS must be configured for HYBRID strategy');
END IF;
vWhereClause := 'TRUNC(workflow_start, ''MM'') < TRUNC(SYSDATE, ''MM'') ' ||
'AND workflow_start < ADD_MONTHS(TRUNC(SYSDATE, ''MM''), -' || pSourceFileConfig.MINIMUM_AGE_MONTHS || ')';
ELSE
RAISE_APPLICATION_ERROR(-20002, 'Invalid ARCHIVAL_STRATEGY: ' || pSourceFileConfig.ARCHIVAL_STRATEGY);
END CASE;
RETURN vWhereClause;
END GET_ARCHIVAL_WHERE_CLAUSE;
----------------------------------------------------------------------------------------------------
FUNCTION GET_TABLE_STAT(pSourceFileConfigKey IN NUMBER)
RETURN CT_MRDS.A_TABLE_STAT%ROWTYPE
IS
vTableStat CT_MRDS.A_TABLE_STAT%ROWTYPE;
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
vCount PLS_INTEGER;
vSourceFileType CT_MRDS.A_SOURCE_FILE_CONFIG.SOURCE_FILE_TYPE%TYPE;
BEGIN
vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST('pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey),NULL)));
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','DEBUG', vParameters);
SELECT count(*) , min(SOURCE_FILE_TYPE)
INTO vCount, vSourceFileType
FROM CT_MRDS.A_TABLE_STAT s
JOIN CT_MRDS.A_SOURCE_FILE_CONFIG c
ON s.A_SOURCE_FILE_CONFIG_KEY = c.A_SOURCE_FILE_CONFIG_KEY
WHERE s.A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfigKey;
IF vCount=0 and vSourceFileType='INPUT' THEN
GATHER_TABLE_STAT(pSourceFileConfigKey);
END IF;
BEGIN
SELECT *
INTO vTableStat
FROM CT_MRDS.A_TABLE_STAT
WHERE A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfigKey;
-- EXCEPTION
-- WHEN NO_DATA_FOUND THEN
--
END;
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','DEBUG',vParameters);
RETURN vTableStat;
END GET_TABLE_STAT;
----------------------------------------------------------------------------------------------------
PROCEDURE ARCHIVE_TABLE_DATA (
pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE
)
IS
vSourceFileConfig CT_MRDS.A_SOURCE_FILE_CONFIG%ROWTYPE;
vTableStat CT_MRDS.A_TABLE_STAT%ROWTYPE;
vQuery VARCHAR2(4000);
vTableName VARCHAR2(200);
vUri VARCHAR2(1000);
vfiles T_FILENAMES;
vFilename VARCHAR2(300);
vOperationId NUMBER := -1;
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
vArchivalTriggeredBy VARCHAR2(60); -- Possible values: FILES_COUNT, ROWS_COUNT, BYTES_SUM
vUserLoadOperations USER_LOAD_OPERATIONS%ROWTYPE;
vProcessControlStatus VARCHAR2(60) := 'OK';
vKeepInTrash BOOLEAN; -- Derived from config
BEGIN
vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
'pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL')
));
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
vSourceFileConfig := CT_MRDS.FILE_MANAGER.GET_SOURCE_FILE_CONFIG(pSourceFileConfigKey => pSourceFileConfigKey);
-- Check if archiving is enabled for this configuration
IF vSourceFileConfig.IS_ARCHIVE_ENABLED = 'N' THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Archiving disabled for this configuration (IS_ARCHIVE_ENABLED=N). Skipping.', 'WARNING', vParameters);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
RETURN;
END IF;
-- Get TRASH policy from configuration
vKeepInTrash := (vSourceFileConfig.IS_KEEP_IN_TRASH = 'Y');
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('TRASH policy from config: IS_KEEP_IN_TRASH=' || vSourceFileConfig.IS_KEEP_IN_TRASH, 'INFO', vParameters);
vTableStat := GET_TABLE_STAT(pSourceFileConfigKey => pSourceFileConfigKey);
if vSourceFileConfig.SOURCE_FILE_TYPE <> 'INPUT' then
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.MSG_NOT_INPUT_SOURCE_FILE_TYPE, 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_NOT_INPUT_SOURCE_FILE_TYPE, CT_MRDS.ENV_MANAGER.MSG_NOT_INPUT_SOURCE_FILE_TYPE);
end if;
if vTableStat.created < sysdate-(vSourceFileConfig.HOURS_TO_EXPIRE_STATISTICS/24) then
GATHER_TABLE_STAT(pSourceFileConfigKey => pSourceFileConfigKey);
vTableStat := GET_TABLE_STAT(pSourceFileConfigKey => pSourceFileConfigKey);
end if;
-- Strategy-based trigger logic (MARS-828)
IF vSourceFileConfig.ARCHIVAL_STRATEGY = 'MINIMUM_AGE_MONTHS' THEN
-- MINIMUM_AGE_MONTHS: Archive based on age only, ignore thresholds
vArchivalTriggeredBy := 'AGE_BASED';
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Archival strategy: MINIMUM_AGE_MONTHS (threshold-independent)','INFO', vParameters);
ELSE
-- THRESHOLD_BASED and HYBRID: Check thresholds
if vTableStat.OVER_ARCH_THRESOLD_FILE_COUNT >= vSourceFileConfig.ARCHIVE_THRESHOLD_FILES_COUNT then vArchivalTriggeredBy := 'FILES_COUNT';
elsif vTableStat.OVER_ARCH_THRESOLD_ROW_COUNT >= vSourceFileConfig.ARCHIVE_THRESHOLD_ROWS_COUNT then vArchivalTriggeredBy := vArchivalTriggeredBy||', ROWS_COUNT';
elsif vTableStat.OVER_ARCH_THRESOLD_TOTAL_SIZE >= vSourceFileConfig.ARCHIVE_THRESHOLD_BYTES_SUM then vArchivalTriggeredBy := vArchivalTriggeredBy||', BYTES_SUM';
else CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Non of archival triggers reached','INFO', vParameters);
end if;
END IF;
if LENGTH(vArchivalTriggeredBy)>0 THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Archival Triggered By: '||vArchivalTriggeredBy,'INFO', vParameters);
vTableName := DBMS_ASSERT.SCHEMA_NAME(vSourceFileConfig.ODS_SCHEMA_NAME) || '.'||DBMS_ASSERT.simple_sql_name(vSourceFileConfig.TABLE_ID)||'_ODS';
-- Use strategy-based WHERE clause (MARS-828)
-- Using GROUP BY instead of DISTINCT to avoid ORA-22950 (object type ordering issue)
vQuery := '
select CT_MRDS.t_filename(
file$name
,file$path
, to_char(h.workflow_start,''yyyy'')
, to_char(h.workflow_start,''mm'')
)
from '||vTableName||' s
join CT_MRDS.a_workflow_history h
on s.a_workflow_history_key = h.a_workflow_history_key
where ' || GET_ARCHIVAL_WHERE_CLAUSE(vSourceFileConfig) || '
' || CASE WHEN vSourceFileConfig.IS_WORKFLOW_SUCCESS_REQUIRED = 'Y' THEN 'and h.WORKFLOW_SUCCESSFUL = ''Y''' ELSE '' END || '
group by file$name, file$path, to_char(h.workflow_start,''yyyy''), to_char(h.workflow_start,''mm'')'
;
-- Get all files that will be archived into "vfiles" collection ("regular data files")
-- MARS-1468: Handle ORA-29913 - no files in ODS bucket (empty external table location)
BEGIN
execute immediate vQuery bulk collect into vfiles;
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE = -29913 THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('No files found in ODS bucket (ORA-29913: empty location). Nothing to archive.', 'INFO', vParameters);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
RETURN;
ELSE
RAISE;
END IF;
END;
-- Start EXPORT "regular data files" to parquet and DROP "csv"
FOR ym_loop IN (select distinct year, month from table(vfiles) order by 1,2) LOOP
dbms_output.put_line('year: '||ym_loop.year||' - '||'month: '||ym_loop.month);
vQuery:=
'select
s.*
from '|| vTableName ||' s
join CT_MRDS.A_SOURCE_FILE_RECEIVED r
on s.file$name = r.source_file_name
and r.a_source_file_config_key = '||pSourceFileConfigKey||'
join CT_MRDS.a_workflow_history h
on s.a_workflow_history_key = h.a_workflow_history_key
and to_char(h.workflow_start,''yyyy'') = '''||ym_loop.year||'''
and to_char(h.workflow_start,''mm'') = '''||ym_loop.month||'''
'|| CASE WHEN vSourceFileConfig.IS_WORKFLOW_SUCCESS_REQUIRED = 'Y' THEN 'and h.WORKFLOW_SUCCESSFUL = ''Y''' ELSE '' END ||'
'
;
vUri := CT_MRDS.FILE_MANAGER.GET_BUCKET_URI('ARCHIVE')||'ARCHIVE/'||vSourceFileConfig.A_SOURCE_KEY||'/'||vSourceFileConfig.TABLE_ID||'/PARTITION_YEAR='||ym_loop.year||'/PARTITION_MONTH='||ym_loop.month||'/';
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start Archiving for YEAR_MONTH: '||ym_loop.year||'_'||ym_loop.month ,'INFO', vParameters);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Parameter for DBMS_CLOUD.EXPORT_DATA => file_uri_list' ,'DEBUG',vUri);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Parameter for DBMS_CLOUD.EXPORT_DATA => query' ,'DEBUG',vQuery);
BEGIN
DBMS_CLOUD.EXPORT_DATA(
credential_name => ENV_MANAGER.gvCredentialName,
file_uri_list => vUri||'d' ,
format => json_object('type' value 'parquet'),
query => vQuery,
operation_id => vOperationId
);
EXCEPTION
WHEN OTHERS THEN
vProcessControlStatus :='EXPORT_FAILURE';
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_EXP_DATA_FOR_ARCH_FAILED, CT_MRDS.ENV_MANAGER.MSG_EXP_DATA_FOR_ARCH_FAILED);
END;
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('vOperationId of export: '||vOperationId,'DEBUG', vParameters);
-- Get USER_LOAD_OPERATIONS info
select *
into vUserLoadOperations
from USER_LOAD_OPERATIONS
where id = vOperationId;
IF vUserLoadOperations.STATUS <>'COMPLETED' THEN
RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_EXP_DATA_FOR_ARCH_FAILED,
CT_MRDS.ENV_MANAGER.MSG_EXP_DATA_FOR_ARCH_FAILED ||cgBL|| ' Export ended with status '||vUserLoadOperations.STATUS);
ELSIF vUserLoadOperations.STATUS = 'COMPLETED' and vUserLoadOperations.ROWS_LOADED = 0 THEN
RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_EXP_DATA_FOR_ARCH_FAILED,
CT_MRDS.ENV_MANAGER.MSG_EXP_DATA_FOR_ARCH_FAILED ||cgBL|| ' Zero rows were exported.');
ELSE
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Data exported to archival file for YEAR_MONTH: '||ym_loop.year||'_'||ym_loop.month,'INFO', vParameters);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.FILE_MANAGER.GET_DET_USER_LOAD_OPERATIONS (pOperationId => vOperationId),'DEBUG', vParameters);
END IF;
-- Note: DBMS_CLOUD.EXPORT_DATA may create multiple parquet files (parallel execution)
-- Instead of tracking individual files, we store the archive directory prefix
-- ARCH_PATH contain the directory URI where all parquet files are located
vFilename := vUri; -- Store directory prefix instead of individual filename
-- Try to drop EXPORTED FILES ("regular data files")
BEGIN
FOR f in (select filename, pathname from table(vfiles) where year = ym_loop.year and month = ym_loop.month) loop
-- first change of status to ARCHIVED_AND_TRASHED (file will be moved to TRASH folder)
BEGIN
UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED r
SET PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED' -- Status reflects file is archived and kept in TRASH
,ARCH_PATH = vFilename -- Now contains directory prefix, not individual file
,PARTITION_YEAR = ym_loop.year -- Record which partition year the data was archived to
,PARTITION_MONTH = ym_loop.month -- Record which partition month the data was archived to
WHERE r.a_source_file_config_key= pSourceFileConfigKey
AND r.source_file_name = f.filename
AND r.processing_status = 'INGESTED'
;
EXCEPTION
WHEN OTHERS THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
vProcessControlStatus := 'CHANGE_STATUS_TO_ARCHIVED_AND_TRASHED_FAILURE';
END;
EXIT WHEN vProcessControlStatus = 'CHANGE_STATUS_TO_ARCHIVED_AND_TRASHED_FAILURE';
-- move file to TRASH subfolder (DATA bucket: ODS/ → TRASH/) before dropping
BEGIN
DBMS_CLOUD.MOVE_OBJECT(source_credential_name => ENV_MANAGER.gvCredentialName,
source_object_uri => f.pathname||'/'||f.filename,
target_object_uri => replace(f.pathname,'ODS','TRASH')||'/'||f.filename,
target_credential_name => ENV_MANAGER.gvCredentialName
);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('File moved to TRASH folder: '||f.pathname||'/'||f.filename,'DEBUG', vParameters);
EXCEPTION
WHEN OTHERS THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Failed to move file to TRASH folder: '||f.pathname||'/'||f.filename,'ERROR', vParameters);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
rollback;
vProcessControlStatus := 'MOVE_FILE_TO_TRASH_FAILURE';
END;
EXIT WHEN vProcessControlStatus = 'MOVE_FILE_TO_TRASH_FAILURE';
commit;
END LOOP;
--------------------------------------------------------------------
-- IF All goes fine till this point, we drop files from TRASH folder (if not then ROLLBACK PART)
-- TRASH is a subfolder in DATA bucket (e.g., TRASH/LM/TABLE_NAME instead of ODS/LM/TABLE_NAME)
IF vProcessControlStatus = 'OK' THEN
IF NOT vKeepInTrash THEN
-- Delete files from TRASH folder (cleanup) and update status to ARCHIVED_AND_PURGED
FOR f in (select filename, pathname from table(vfiles) where year = ym_loop.year and month = ym_loop.month) LOOP
DBMS_CLOUD.DELETE_OBJECT(credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName,
object_uri => replace(f.pathname,'ODS','TRASH')||'/'||f.filename);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('File dropped from TRASH folder: '||f.pathname||'/'||f.filename,'DEBUG', vParameters);
-- Update status to ARCHIVED_AND_PURGED
UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED r
SET PROCESSING_STATUS = 'ARCHIVED_AND_PURGED'
WHERE r.a_source_file_config_key = pSourceFileConfigKey
AND r.source_file_name = f.filename
AND r.PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED';
END LOOP;
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('All archived files removed from TRASH folder and marked as ARCHIVED_AND_PURGED (config: IS_KEEP_IN_TRASH=N).','INFO', vParameters);
ELSE
-- Keep files in TRASH folder (status remains ARCHIVED_AND_TRASHED)
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Archived files kept in TRASH folder for retention (config: IS_KEEP_IN_TRASH=Y, status: ARCHIVED_AND_TRASHED).','INFO', vParameters);
END IF;
--ROLLBACK PART
--ROLLBACK PROCESS in case of FAILURE (restore files from TRASH subfolder in DATA bucket)
ELSIF vProcessControlStatus = 'MOVE_FILE_TO_TRASH_FAILURE' THEN
FOR f in ( SELECT vf.filename, vf.pathname
FROM TABLE(vfiles) vf
JOIN CT_MRDS.A_SOURCE_FILE_RECEIVED r
ON r.source_file_name = vf.filename
AND r.a_source_file_config_key = pSourceFileConfigKey
AND r.PROCESSING_STATUS IN ('ARCHIVED_AND_TRASHED', 'ARCHIVED_AND_PURGED')
AND vf.year = ym_loop.year
AND vf.month = ym_loop.month
) LOOP
BEGIN
DBMS_CLOUD.MOVE_OBJECT(source_credential_name => ENV_MANAGER.gvCredentialName,
source_object_uri => replace(f.pathname,'ODS','TRASH')||'/'||f.filename,
target_object_uri => f.pathname||'/'||f.filename,
target_credential_name => ENV_MANAGER.gvCredentialName
);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('File restored from TRASH folder: '||f.pathname||'/'||f.filename,'DEBUG', vParameters);
UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED r
SET PROCESSING_STATUS = 'INGESTED'
,ARCH_PATH = NULL
WHERE r.a_source_file_config_key = pSourceFileConfigKey
AND r.source_file_name = f.filename
;
EXCEPTION
WHEN OTHERS THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Failed to restore file from TRASH folder: '||replace(f.pathname,'ODS','TRASH')||'/'||f.filename,'ERROR', vParameters);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
vProcessControlStatus := 'RESTORE_FILE_FROM_TRASH_FAILURE';
END;
END LOOP;
-- ROLLBACK: Delete all parquet files from archive directory
FOR arch_file IN (
SELECT object_name
FROM DBMS_CLOUD.LIST_OBJECTS(
credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName,
location_uri => vFilename -- vFilename now contains directory prefix
)
) LOOP
DBMS_CLOUD.DELETE_OBJECT(
credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName,
object_uri => vFilename || arch_file.object_name
);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('ROLLBACK operation: Archival PARQUET file dropped.','DEBUG', vFilename || arch_file.object_name);
END LOOP;
RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_MOVE_FILE_TO_TRASH_FAILED, CT_MRDS.ENV_MANAGER.MSG_MOVE_FILE_TO_TRASH_FAILED);
ELSIF vProcessControlStatus = 'CHANGE_STATUS_TO_ARCHIVED_FAILURE' THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.MSG_CHANGE_STAT_TO_ARCHIVED_FAILED, 'ERROR', vParameters);
-- ROLLBACK: Delete all parquet files from archive directory
FOR arch_file IN (
SELECT object_name
FROM DBMS_CLOUD.LIST_OBJECTS(
credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName,
location_uri => vFilename -- vFilename now contains directory prefix
)
) LOOP
DBMS_CLOUD.DELETE_OBJECT(
credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName,
object_uri => vFilename || arch_file.object_name
);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Archival PARQUET file dropped: '||vFilename || arch_file.object_name,'DEBUG', vParameters);
END LOOP;
RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_CHANGE_STAT_TO_ARCHIVED_FAILED, CT_MRDS.ENV_MANAGER.MSG_CHANGE_STAT_TO_ARCHIVED_FAILED);
ELSIF vProcessControlStatus = 'RESTORE_FILE_FROM_TRASH_FAILURE' THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Some files were not restored from TRASH. Check A_PROCESS_LOG table for details','ERROR', vParameters);
RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_RESTORE_FILE_FROM_TRASH, CT_MRDS.ENV_MANAGER.MSG_RESTORE_FILE_FROM_TRASH);
END IF;
EXCEPTION
WHEN CT_MRDS.ENV_MANAGER.ERR_CHANGE_STAT_TO_ARCHIVED_FAILED THEN
RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_CHANGE_STAT_TO_ARCHIVED_FAILED, CT_MRDS.ENV_MANAGER.MSG_CHANGE_STAT_TO_ARCHIVED_FAILED);
WHEN CT_MRDS.ENV_MANAGER.ERR_MOVE_FILE_TO_TRASH_FAILED THEN
RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_MOVE_FILE_TO_TRASH_FAILED, CT_MRDS.ENV_MANAGER.MSG_MOVE_FILE_TO_TRASH_FAILED);
WHEN CT_MRDS.ENV_MANAGER.ERR_RESTORE_FILE_FROM_TRASH THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_RESTORE_FILE_FROM_TRASH, CT_MRDS.ENV_MANAGER.MSG_RESTORE_FILE_FROM_TRASH);
WHEN OTHERS THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Error during archiving process','ERROR');
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_DROP_EXPORTED_FILES_FAILED, CT_MRDS.ENV_MANAGER.MSG_DROP_EXPORTED_FILES_FAILED);
END;
-- END of "Try to drop EXPORTED FILES"
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End Archiving for YEAR_MONTH: '||ym_loop.year||'_'||ym_loop.month ,'INFO');
END LOOP; --ym_loop end (YEAR_MONTH)
COMMIT;
ELSE
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Non of archival thresholds reached. Skip archiving.'||vArchivalTriggeredBy,'INFO');
END IF;
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
EXCEPTION
WHEN CT_MRDS.ENV_MANAGER.ERR_NOT_INPUT_SOURCE_FILE_TYPE THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.MSG_NOT_INPUT_SOURCE_FILE_TYPE , 'ERROR', vParameters);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_NOT_INPUT_SOURCE_FILE_TYPE, CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
WHEN CT_MRDS.ENV_MANAGER.ERR_EXP_DATA_FOR_ARCH_FAILED THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.MSG_EXP_DATA_FOR_ARCH_FAILED , 'ERROR', vParameters);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_EXP_DATA_FOR_ARCH_FAILED, CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
WHEN CT_MRDS.ENV_MANAGER.ERR_CHANGE_STAT_TO_ARCHIVED_FAILED THEN
RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_CHANGE_STAT_TO_ARCHIVED_FAILED, CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
WHEN CT_MRDS.ENV_MANAGER.ERR_MOVE_FILE_TO_TRASH_FAILED THEN
RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_MOVE_FILE_TO_TRASH_FAILED, CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
WHEN OTHERS THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.MSG_UNKNOWN , 'ERROR', vParameters);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_UNKNOWN, CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
END ARCHIVE_TABLE_DATA;
----------------------------------------------------------------------------------------------------
PROCEDURE GATHER_TABLE_STAT (
pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE
) IS
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
vStats CT_MRDS.A_TABLE_STAT%ROWTYPE;
vSourceFileConfig CT_MRDS.A_SOURCE_FILE_CONFIG%ROWTYPE;
vTableName VARCHAR2(200);
vQuery VARCHAR2(32000);
vWhereClause VARCHAR2(4000);
vOverThresholdWhereClause VARCHAR2(4000);
vOdsBucketUri VARCHAR2(1000);
BEGIN
vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST('pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL')));
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
vSourceFileConfig := CT_MRDS.FILE_MANAGER.GET_SOURCE_FILE_CONFIG(pSourceFileConfigKey => pSourceFileConfigKey);
vTableName := DBMS_ASSERT.SCHEMA_NAME(vSourceFileConfig.ODS_SCHEMA_NAME) || '.'||DBMS_ASSERT.simple_sql_name(vSourceFileConfig.TABLE_ID)||'_ODS';
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('vTableName = '||vTableName, 'DEBUG', vParameters);
-- Get WHERE clause based on archival strategy (MARS-828)
vWhereClause := GET_ARCHIVAL_WHERE_CLAUSE(vSourceFileConfig);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('vWhereClause = '||vWhereClause, 'DEBUG', vParameters);
-- Get ODS bucket URI before building query
vOdsBucketUri := CT_MRDS.FILE_MANAGER.GET_BUCKET_URI('ODS') || 'ODS/' || vSourceFileConfig.A_SOURCE_KEY || '/' || vSourceFileConfig.TABLE_ID || '/';
-- Build WHERE clause for OVER_ARCH_THRESOLD columns:
-- Combines archival strategy time-condition with optional workflow success filter.
-- IS_WORKFLOW_SUCCESS_REQUIRED='Y': only files with WORKFLOW_SUCCESSFUL='Y' are counted as eligible.
-- IS_WORKFLOW_SUCCESS_REQUIRED='N': all files passing the time-condition are counted as eligible.
IF vSourceFileConfig.IS_WORKFLOW_SUCCESS_REQUIRED = 'Y' THEN
vOverThresholdWhereClause := vWhereClause || ' AND workflow_successful = ''Y''';
ELSE
vOverThresholdWhereClause := vWhereClause;
END IF;
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('vOverThresholdWhereClause = '||vOverThresholdWhereClause, 'DEBUG', vParameters);
-- Use strategy-based WHERE clause for statistics (MARS-828)
-- FILE_COUNT, ROW_COUNT, TOTAL_SIZE: all files regardless of workflow success (never zero due to workflow filter)
-- OVER_ARCH_THRESOLD_*: IS_WORKFLOW_SUCCESS_REQUIRED-aware count of eligible files
-- WORKFLOW_SUCCESS_*: informational count of files with WORKFLOW_SUCCESSFUL='Y'
-- Column order MUST match A_TABLE_STAT column definition order for positional INTO vStats to work:
-- 1:A_TABLE_STAT_KEY, 2:A_SOURCE_FILE_CONFIG_KEY, 3:TABLE_NAME, 4:CREATED, 5:ARCHIVAL_STRATEGY,
-- 6:ARCH_MINIMUM_AGE_MONTHS, 7:ARCH_THRESHOLD_DAYS, 8:IS_WORKFLOW_SUCCESS_REQUIRED,
-- 9:FILE_COUNT, 10:ROW_COUNT, 11:TOTAL_SIZE,
-- 12:OVER_ARCH_THRESOLD_FILE_COUNT, 13:OVER_ARCH_THRESOLD_ROW_COUNT, 14:OVER_ARCH_THRESOLD_TOTAL_SIZE,
-- 15:WORKFLOW_SUCCESS_FILE_COUNT, 16:WORKFLOW_SUCCESS_ROW_COUNT, 17:WORKFLOW_SUCCESS_TOTAL_SIZE
vQuery :=
'with tmp as (
select
s.*
,file$name as filename
,h.workflow_start
,h.workflow_successful
, to_char(h.workflow_start,''yyyy'') as year
, to_char(h.workflow_start,''mm'') as month
from '||vTableName||' s
join CT_MRDS.a_workflow_history h
on s.a_workflow_history_key = h.a_workflow_history_key
)
, tmp_gr as (
select
filename
,count(*) as row_count_per_file
,min(workflow_start) as workflow_start
,max(workflow_successful) as workflow_successful
from tmp
group by filename
)
select
NULL as A_TABLE_STAT_KEY
,'||pSourceFileConfigKey||' as A_SOURCE_FILE_CONFIG_KEY
,'''||vTableName||''' as TABLE_NAME
,systimestamp as CREATED
,'''||vSourceFileConfig.ARCHIVAL_STRATEGY||''' as ARCHIVAL_STRATEGY
,'||COALESCE(TO_CHAR(vSourceFileConfig.MINIMUM_AGE_MONTHS), 'NULL')||' as ARCH_MINIMUM_AGE_MONTHS
,'||COALESCE(TO_CHAR(vSourceFileConfig.ARCHIVE_THRESHOLD_DAYS), 'NULL')||' as ARCH_THRESHOLD_DAYS
,'''||vSourceFileConfig.IS_WORKFLOW_SUCCESS_REQUIRED||''' as IS_WORKFLOW_SUCCESS_REQUIRED
,count(*) as FILE_COUNT
,nvl(sum(row_count_per_file), 0) as ROW_COUNT
,nvl(sum(r.bytes), 0) as TOTAL_SIZE
,nvl(sum(case when ' || vOverThresholdWhereClause || ' then 1 else 0 end), 0) as OVER_ARCH_THRESOLD_FILE_COUNT
,nvl(sum(case when ' || vOverThresholdWhereClause || ' then row_count_per_file else 0 end), 0) as OVER_ARCH_THRESOLD_ROW_COUNT
,nvl(sum(case when ' || vOverThresholdWhereClause || ' then r.bytes else 0 end), 0) as OVER_ARCH_THRESOLD_TOTAL_SIZE
,nvl(sum(case when workflow_successful = ''Y'' then 1 else 0 end), 0) as WORKFLOW_SUCCESS_FILE_COUNT
,nvl(sum(case when workflow_successful = ''Y'' then row_count_per_file else 0 end), 0) as WORKFLOW_SUCCESS_ROW_COUNT
,nvl(sum(case when workflow_successful = ''Y'' then r.bytes else 0 end), 0) as WORKFLOW_SUCCESS_TOTAL_SIZE
from tmp_gr t
join (SELECT * from DBMS_CLOUD.LIST_OBJECTS(
credential_name => '''||CT_MRDS.ENV_MANAGER.gvCredentialName||''',
location_uri => '''||vOdsBucketUri||'''
)
) r
on t.filename = r.object_name'
;
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('vQuery:', 'DEBUG', vQuery);
-- MARS-1468: Handle ORA-29913 - no files in ODS bucket (empty external table location)
BEGIN
execute immediate vQuery into vStats;
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE = -29913 THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('No files found in ODS bucket (ORA-29913: empty location). Saving zero statistics.', 'INFO', vParameters);
vStats.A_SOURCE_FILE_CONFIG_KEY := pSourceFileConfigKey;
vStats.TABLE_NAME := vTableName;
vStats.FILE_COUNT := 0;
vStats.OVER_ARCH_THRESOLD_FILE_COUNT := 0;
vStats.ROW_COUNT := 0;
vStats.OVER_ARCH_THRESOLD_ROW_COUNT := 0;
vStats.TOTAL_SIZE := 0;
vStats.OVER_ARCH_THRESOLD_TOTAL_SIZE := 0;
vStats.ARCH_THRESHOLD_DAYS := vSourceFileConfig.ARCHIVE_THRESHOLD_DAYS;
vStats.CREATED := SYSTIMESTAMP;
vStats.ARCHIVAL_STRATEGY := vSourceFileConfig.ARCHIVAL_STRATEGY;
vStats.ARCH_MINIMUM_AGE_MONTHS := vSourceFileConfig.MINIMUM_AGE_MONTHS;
vStats.IS_WORKFLOW_SUCCESS_REQUIRED := vSourceFileConfig.IS_WORKFLOW_SUCCESS_REQUIRED;
vStats.WORKFLOW_SUCCESS_FILE_COUNT := 0;
vStats.WORKFLOW_SUCCESS_ROW_COUNT := 0;
vStats.WORKFLOW_SUCCESS_TOTAL_SIZE := 0;
ELSE
RAISE;
END IF;
END;
vStats.A_TABLE_STAT_KEY := CT_MRDS.A_TABLE_STAT_KEY_SEQ.NEXTVAL;
insert into CT_MRDS.A_TABLE_STAT_HIST values vStats;
delete from CT_MRDS.A_TABLE_STAT where A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfigKey;
insert into CT_MRDS.A_TABLE_STAT values vStats;
COMMIT;
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
EXCEPTION
WHEN OTHERS THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.MSG_UNKNOWN , 'ERROR', vParameters);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_UNKNOWN, CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
END GATHER_TABLE_STAT;
----------------------------------------------------------------------------------------------------
-- TRASH FOLDER MANAGEMENT PROCEDURES
----------------------------------------------------------------------------------------------------
PROCEDURE RESTORE_FILE_FROM_TRASH (
pSourceFileReceivedKey IN CT_MRDS.A_SOURCE_FILE_RECEIVED.A_SOURCE_FILE_RECEIVED_KEY%TYPE DEFAULT NULL,
pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE DEFAULT NULL,
pRestoreAll IN BOOLEAN DEFAULT FALSE
)
IS
vSourceFileConfig CT_MRDS.A_SOURCE_FILE_CONFIG%ROWTYPE;
vSourceFileReceived CT_MRDS.A_SOURCE_FILE_RECEIVED%ROWTYPE;
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
vTrashPath VARCHAR2(1000);
vOdsPath VARCHAR2(1000);
vFilesRestored PLS_INTEGER := 0;
vFilesUpdated PLS_INTEGER := 0;
vRestoreLevel VARCHAR2(50);
cgBL CONSTANT VARCHAR2(2) := CHR(13)||CHR(10);
BEGIN
vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
'pSourceFileReceivedKey => '||nvl(to_char(pSourceFileReceivedKey), 'NULL'),
'pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL'),
'pRestoreAll => '||CASE WHEN pRestoreAll THEN 'TRUE' ELSE 'FALSE' END
));
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
-- Determine restore level (priority: LEVEL 3 > LEVEL 2 > LEVEL 1)
IF pSourceFileReceivedKey IS NOT NULL THEN
vRestoreLevel := 'LEVEL_3_SINGLE_FILE';
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Restore level: LEVEL 3 - Single file restoration','INFO', 'A_SOURCE_FILE_RECEIVED_KEY: ' || pSourceFileReceivedKey);
-- LEVEL 3: Restore single file by A_SOURCE_FILE_RECEIVED_KEY
BEGIN
SELECT *
INTO vSourceFileReceived
FROM CT_MRDS.A_SOURCE_FILE_RECEIVED
WHERE A_SOURCE_FILE_RECEIVED_KEY = pSourceFileReceivedKey
AND PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED';
vSourceFileConfig := CT_MRDS.FILE_MANAGER.GET_SOURCE_FILE_CONFIG(pSourceFileConfigKey => vSourceFileReceived.A_SOURCE_FILE_CONFIG_KEY);
EXCEPTION
WHEN NO_DATA_FOUND THEN
RAISE_APPLICATION_ERROR(-20101, 'File not found or status is not ARCHIVED_AND_TRASHED for A_SOURCE_FILE_RECEIVED_KEY: ' || pSourceFileReceivedKey);
END;
vTrashPath := CT_MRDS.FILE_MANAGER.GET_BUCKET_URI('DATA') || 'TRASH/' || vSourceFileConfig.A_SOURCE_KEY || '/' || vSourceFileConfig.TABLE_ID || '/' || vSourceFileReceived.SOURCE_FILE_NAME;
vOdsPath := CT_MRDS.FILE_MANAGER.GET_BUCKET_URI('DATA') || 'ODS/' || vSourceFileConfig.A_SOURCE_KEY || '/' || vSourceFileConfig.TABLE_ID || '/' || vSourceFileReceived.SOURCE_FILE_NAME;
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Restoring single file from TRASH to ODS','INFO', 'Source: ' || vTrashPath || cgBL || 'Target: ' || vOdsPath);
BEGIN
DBMS_CLOUD.MOVE_OBJECT(
source_credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName,
source_object_uri => vTrashPath,
target_object_uri => vOdsPath,
target_credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName
);
vFilesRestored := 1;
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('File successfully moved from TRASH to ODS','INFO', vSourceFileReceived.SOURCE_FILE_NAME);
EXCEPTION
WHEN OTHERS THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Failed to move file from TRASH to ODS','ERROR', vTrashPath);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(-20103, 'Failed to restore file from TRASH: ' || SQLERRM);
END;
UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED
SET PROCESSING_STATUS = 'INGESTED',
ARCH_PATH = NULL,
PARTITION_YEAR = NULL,
PARTITION_MONTH = NULL
WHERE A_SOURCE_FILE_RECEIVED_KEY = pSourceFileReceivedKey
AND PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED';
vFilesUpdated := SQL%ROWCOUNT;
ELSIF pSourceFileConfigKey IS NOT NULL THEN
vRestoreLevel := 'LEVEL_2_CONFIG_FILES';
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Restore level: LEVEL 2 - Configuration-based restoration','INFO', 'pSourceFileConfigKey: ' || pSourceFileConfigKey);
-- LEVEL 2: Restore all files for specific pSourceFileConfigKey
vSourceFileConfig := CT_MRDS.FILE_MANAGER.GET_SOURCE_FILE_CONFIG(pSourceFileConfigKey => pSourceFileConfigKey);
FOR file_rec IN (
SELECT A_SOURCE_FILE_RECEIVED_KEY, SOURCE_FILE_NAME
FROM CT_MRDS.A_SOURCE_FILE_RECEIVED
WHERE A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfigKey
AND PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED'
) LOOP
vTrashPath := CT_MRDS.FILE_MANAGER.GET_BUCKET_URI('DATA') || 'TRASH/' || vSourceFileConfig.A_SOURCE_KEY || '/' || vSourceFileConfig.TABLE_ID || '/' || file_rec.SOURCE_FILE_NAME;
vOdsPath := CT_MRDS.FILE_MANAGER.GET_BUCKET_URI('DATA') || 'ODS/' || vSourceFileConfig.A_SOURCE_KEY || '/' || vSourceFileConfig.TABLE_ID || '/' || file_rec.SOURCE_FILE_NAME;
BEGIN
DBMS_CLOUD.MOVE_OBJECT(
source_credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName,
source_object_uri => vTrashPath,
target_object_uri => vOdsPath,
target_credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName
);
vFilesRestored := vFilesRestored + 1;
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('File restored from TRASH: '||file_rec.SOURCE_FILE_NAME,'DEBUG', vParameters);
EXCEPTION
WHEN OTHERS THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Failed to restore file from TRASH: '||file_rec.SOURCE_FILE_NAME,'ERROR', vParameters);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
END;
END LOOP;
UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED
SET PROCESSING_STATUS = 'INGESTED',
ARCH_PATH = NULL,
PARTITION_YEAR = NULL,
PARTITION_MONTH = NULL
WHERE A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfigKey
AND PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED';
vFilesUpdated := SQL%ROWCOUNT;
ELSIF pRestoreAll THEN
vRestoreLevel := 'LEVEL_1_GLOBAL_RESTORE';
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Restore level: LEVEL 1 - Global TRASH restoration','INFO', 'Restoring ALL files with ARCHIVED_AND_TRASHED status');
-- LEVEL 1: Restore all files with ARCHIVED_AND_TRASHED status across all configurations
FOR file_rec IN (
SELECT r.A_SOURCE_FILE_RECEIVED_KEY, r.SOURCE_FILE_NAME,
c.A_SOURCE_KEY, c.TABLE_ID
FROM CT_MRDS.A_SOURCE_FILE_RECEIVED r
JOIN CT_MRDS.A_SOURCE_FILE_CONFIG c ON r.A_SOURCE_FILE_CONFIG_KEY = c.A_SOURCE_FILE_CONFIG_KEY
WHERE r.PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED'
) LOOP
vTrashPath := CT_MRDS.FILE_MANAGER.GET_BUCKET_URI('DATA') || 'TRASH/' || file_rec.A_SOURCE_KEY || '/' || file_rec.TABLE_ID || '/' || file_rec.SOURCE_FILE_NAME;
vOdsPath := CT_MRDS.FILE_MANAGER.GET_BUCKET_URI('DATA') || 'ODS/' || file_rec.A_SOURCE_KEY || '/' || file_rec.TABLE_ID || '/' || file_rec.SOURCE_FILE_NAME;
BEGIN
DBMS_CLOUD.MOVE_OBJECT(
source_credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName,
source_object_uri => vTrashPath,
target_object_uri => vOdsPath,
target_credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName
);
vFilesRestored := vFilesRestored + 1;
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('File restored from TRASH: '||file_rec.SOURCE_FILE_NAME,'DEBUG', vParameters);
EXCEPTION
WHEN OTHERS THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Failed to restore file from TRASH: '||file_rec.SOURCE_FILE_NAME,'ERROR', vParameters);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
END;
END LOOP;
UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED
SET PROCESSING_STATUS = 'INGESTED',
ARCH_PATH = NULL,
PARTITION_YEAR = NULL,
PARTITION_MONTH = NULL
WHERE PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED';
vFilesUpdated := SQL%ROWCOUNT;
ELSE
RAISE_APPLICATION_ERROR(-20104, 'No restore level specified. Provide pSourceFileReceivedKey, pSourceFileConfigKey, or set pRestoreAll=TRUE');
END IF;
COMMIT;
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Total files restored from TRASH: ' || vFilesRestored,'INFO');
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Total file records updated to INGESTED: ' || vFilesUpdated,'INFO');
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('TRASH restoration completed successfully','INFO', 'Level: ' || vRestoreLevel || ', Files restored: ' || vFilesRestored || ', Records updated: ' || vFilesUpdated);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO', vParameters);
EXCEPTION
WHEN OTHERS THEN
ROLLBACK;
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.MSG_UNKNOWN , 'ERROR', vParameters);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_UNKNOWN, CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
END RESTORE_FILE_FROM_TRASH;
----------------------------------------------------------------------------------------------------
PROCEDURE PURGE_TRASH_FOLDER (
pSourceFileReceivedKey IN CT_MRDS.A_SOURCE_FILE_RECEIVED.A_SOURCE_FILE_RECEIVED_KEY%TYPE DEFAULT NULL,
pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE DEFAULT NULL,
pPurgeAll IN BOOLEAN DEFAULT FALSE
)
IS
vSourceFileConfig CT_MRDS.A_SOURCE_FILE_CONFIG%ROWTYPE;
vSourceFileReceived CT_MRDS.A_SOURCE_FILE_RECEIVED%ROWTYPE;
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
vTrashLocationUri VARCHAR2(1000);
vFilesDeleted PLS_INTEGER := 0;
vFilesUpdated PLS_INTEGER := 0;
vPurgeLevel VARCHAR2(50);
cgBL CONSTANT VARCHAR2(2) := CHR(13)||CHR(10);
BEGIN
vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
'pSourceFileReceivedKey => '||nvl(to_char(pSourceFileReceivedKey), 'NULL'),
'pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL'),
'pPurgeAll => '||CASE WHEN pPurgeAll THEN 'TRUE' ELSE 'FALSE' END
));
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
-- Determine purge level (priority: LEVEL 3 > LEVEL 2 > LEVEL 1)
IF pSourceFileReceivedKey IS NOT NULL THEN
vPurgeLevel := 'LEVEL_3_SINGLE_FILE';
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Purge level: LEVEL 3 - Single file deletion','INFO', 'A_SOURCE_FILE_RECEIVED_KEY: ' || pSourceFileReceivedKey);
-- LEVEL 3: Delete single file by A_SOURCE_FILE_RECEIVED_KEY
BEGIN
SELECT *
INTO vSourceFileReceived
FROM CT_MRDS.A_SOURCE_FILE_RECEIVED
WHERE A_SOURCE_FILE_RECEIVED_KEY = pSourceFileReceivedKey
AND PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED';
vSourceFileConfig := CT_MRDS.FILE_MANAGER.GET_SOURCE_FILE_CONFIG(pSourceFileConfigKey => vSourceFileReceived.A_SOURCE_FILE_CONFIG_KEY);
EXCEPTION
WHEN NO_DATA_FOUND THEN
RAISE_APPLICATION_ERROR(-20301, 'File not found or status is not ARCHIVED_AND_TRASHED for A_SOURCE_FILE_RECEIVED_KEY: ' || pSourceFileReceivedKey);
END;
vTrashLocationUri := CT_MRDS.FILE_MANAGER.GET_BUCKET_URI('DATA') || 'TRASH/' || vSourceFileConfig.A_SOURCE_KEY || '/' || vSourceFileConfig.TABLE_ID || '/' || vSourceFileReceived.SOURCE_FILE_NAME;
BEGIN
DBMS_CLOUD.DELETE_OBJECT(
credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName,
object_uri => vTrashLocationUri
);
vFilesDeleted := 1;
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Single file deleted from TRASH','INFO', vSourceFileReceived.SOURCE_FILE_NAME);
EXCEPTION
WHEN OTHERS THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Failed to delete file from TRASH','ERROR', vTrashLocationUri);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(-20302, 'Failed to delete single file from TRASH: ' || SQLERRM);
END;
UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED
SET PROCESSING_STATUS = 'ARCHIVED_AND_PURGED'
WHERE A_SOURCE_FILE_RECEIVED_KEY = pSourceFileReceivedKey
AND PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED';
vFilesUpdated := SQL%ROWCOUNT;
ELSIF pSourceFileConfigKey IS NOT NULL THEN
vPurgeLevel := 'LEVEL_2_CONFIG_FILES';
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Purge level: LEVEL 2 - Configuration-based deletion','INFO', 'pSourceFileConfigKey: ' || pSourceFileConfigKey);
-- LEVEL 2: Delete all files for specific pSourceFileConfigKey
vSourceFileConfig := CT_MRDS.FILE_MANAGER.GET_SOURCE_FILE_CONFIG(pSourceFileConfigKey => pSourceFileConfigKey);
vTrashLocationUri := CT_MRDS.FILE_MANAGER.GET_BUCKET_URI('DATA') || 'TRASH/' || vSourceFileConfig.A_SOURCE_KEY || '/' || vSourceFileConfig.TABLE_ID || '/';
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Purging TRASH folder for configuration','INFO', 'Location: ' || vTrashLocationUri);
BEGIN
FOR trash_file IN (
SELECT object_name
FROM DBMS_CLOUD.LIST_OBJECTS(
credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName,
location_uri => vTrashLocationUri
)
) LOOP
BEGIN
DBMS_CLOUD.DELETE_OBJECT(
credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName,
object_uri => vTrashLocationUri || trash_file.object_name
);
vFilesDeleted := vFilesDeleted + 1;
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('File deleted from TRASH','DEBUG', trash_file.object_name);
EXCEPTION
WHEN OTHERS THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Failed to delete file from TRASH','ERROR', trash_file.object_name);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
END;
END LOOP;
EXCEPTION
WHEN OTHERS THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Failed to list or delete TRASH files','ERROR', vTrashLocationUri);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(-20303, 'Failed to purge TRASH folder for configuration: ' || SQLERRM);
END;
UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED
SET PROCESSING_STATUS = 'ARCHIVED_AND_PURGED'
WHERE A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfigKey
AND PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED';
vFilesUpdated := SQL%ROWCOUNT;
ELSIF pPurgeAll THEN
vPurgeLevel := 'LEVEL_1_GLOBAL_PURGE';
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Purge level: LEVEL 1 - Global TRASH purge','INFO', 'Deleting ALL files with ARCHIVED_AND_TRASHED status');
-- LEVEL 1: Delete all files with ARCHIVED_AND_TRASHED status across all configurations
FOR config_rec IN (
SELECT DISTINCT c.A_SOURCE_FILE_CONFIG_KEY, c.A_SOURCE_KEY, c.TABLE_ID
FROM CT_MRDS.A_SOURCE_FILE_CONFIG c
JOIN CT_MRDS.A_SOURCE_FILE_RECEIVED r ON r.A_SOURCE_FILE_CONFIG_KEY = c.A_SOURCE_FILE_CONFIG_KEY
WHERE r.PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED'
) LOOP
vTrashLocationUri := CT_MRDS.FILE_MANAGER.GET_BUCKET_URI('DATA') || 'TRASH/' || config_rec.A_SOURCE_KEY || '/' || config_rec.TABLE_ID || '/';
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Processing TRASH location','DEBUG', vTrashLocationUri);
BEGIN
FOR trash_file IN (
SELECT object_name
FROM DBMS_CLOUD.LIST_OBJECTS(
credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName,
location_uri => vTrashLocationUri
)
) LOOP
BEGIN
DBMS_CLOUD.DELETE_OBJECT(
credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName,
object_uri => vTrashLocationUri || trash_file.object_name
);
vFilesDeleted := vFilesDeleted + 1;
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('File deleted from TRASH','DEBUG', trash_file.object_name);
EXCEPTION
WHEN OTHERS THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Failed to delete file from TRASH','ERROR', trash_file.object_name);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
END;
END LOOP;
EXCEPTION
WHEN OTHERS THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Failed to list TRASH files','WARNING', vTrashLocationUri || ' - ' || SQLERRM);
END;
END LOOP;
UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED
SET PROCESSING_STATUS = 'ARCHIVED_AND_PURGED'
WHERE PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED';
vFilesUpdated := SQL%ROWCOUNT;
ELSE
RAISE_APPLICATION_ERROR(-20304, 'No purge level specified. Provide pSourceFileReceivedKey, pSourceFileConfigKey, or set pPurgeAll=TRUE');
END IF;
COMMIT;
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Total files deleted from TRASH: ' || vFilesDeleted,'INFO');
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Total file records updated to ARCHIVED_AND_PURGED: ' || vFilesUpdated,'INFO');
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('TRASH folder purge completed successfully','INFO', 'Level: ' || vPurgeLevel || ', Files deleted: ' || vFilesDeleted || ', Records updated: ' || vFilesUpdated);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO', vParameters);
EXCEPTION
WHEN OTHERS THEN
ROLLBACK;
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.MSG_UNKNOWN , 'ERROR', vParameters);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_UNKNOWN, CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
END PURGE_TRASH_FOLDER;
----------------------------------------------------------------------------------------------------
FUNCTION PURGE_TRASH_FOLDER (
pSourceFileReceivedKey IN CT_MRDS.A_SOURCE_FILE_RECEIVED.A_SOURCE_FILE_RECEIVED_KEY%TYPE DEFAULT NULL,
pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE DEFAULT NULL,
pPurgeAll IN BOOLEAN DEFAULT FALSE
) RETURN PLS_INTEGER
IS
PRAGMA AUTONOMOUS_TRANSACTION;
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
BEGIN
vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
'pSourceFileReceivedKey => '||nvl(to_char(pSourceFileReceivedKey), 'NULL'),
'pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL'),
'pPurgeAll => '||CASE WHEN pPurgeAll THEN 'TRUE' ELSE 'FALSE' END
));
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
----
PURGE_TRASH_FOLDER(
pSourceFileReceivedKey => pSourceFileReceivedKey,
pSourceFileConfigKey => pSourceFileConfigKey,
pPurgeAll => pPurgeAll
);
----
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
RETURN SQLCODE;
EXCEPTION
WHEN OTHERS THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RETURN SQLCODE;
END PURGE_TRASH_FOLDER;
----------------------------------------------------------------------------------------------------
FUNCTION RESTORE_FILE_FROM_TRASH (
pSourceFileReceivedKey IN CT_MRDS.A_SOURCE_FILE_RECEIVED.A_SOURCE_FILE_RECEIVED_KEY%TYPE DEFAULT NULL,
pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE DEFAULT NULL,
pRestoreAll IN BOOLEAN DEFAULT FALSE
) RETURN PLS_INTEGER
IS
PRAGMA AUTONOMOUS_TRANSACTION;
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
BEGIN
vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
'pSourceFileReceivedKey => '||nvl(to_char(pSourceFileReceivedKey), 'NULL'),
'pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL'),
'pRestoreAll => '||CASE WHEN pRestoreAll THEN 'TRUE' ELSE 'FALSE' END
));
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
----
RESTORE_FILE_FROM_TRASH(
pSourceFileReceivedKey => pSourceFileReceivedKey,
pSourceFileConfigKey => pSourceFileConfigKey,
pRestoreAll => pRestoreAll
);
----
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
RETURN SQLCODE;
EXCEPTION
WHEN OTHERS THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RETURN SQLCODE;
END RESTORE_FILE_FROM_TRASH;
----------------------------------------------------------------------------------------------------
-- PACKAGE VERSION MANAGEMENT FUNCTIONS IMPLEMENTATION
----------------------------------------------------------------------------------------------------
FUNCTION GET_VERSION
RETURN VARCHAR2
IS
BEGIN
RETURN PACKAGE_VERSION;
END GET_VERSION;
----------------------------------------------------------------------------------------------------
FUNCTION GET_BUILD_INFO
RETURN VARCHAR2
IS
BEGIN
RETURN CT_MRDS.ENV_MANAGER.GET_PACKAGE_VERSION_INFO(
pPackageName => 'FILE_ARCHIVER',
pVersion => PACKAGE_VERSION,
pBuildDate => PACKAGE_BUILD_DATE,
pAuthor => PACKAGE_AUTHOR
);
END GET_BUILD_INFO;
----------------------------------------------------------------------------------------------------
FUNCTION GET_VERSION_HISTORY
RETURN VARCHAR2
IS
BEGIN
RETURN CT_MRDS.ENV_MANAGER.FORMAT_VERSION_HISTORY(
pPackageName => 'FILE_ARCHIVER',
pVersionHistory => VERSION_HISTORY
);
END GET_VERSION_HISTORY;
----------------------------------------------------------------------------------------------------
FUNCTION FN_ARCHIVE_TABLE_DATA (
pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE
) RETURN PLS_INTEGER
IS
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
BEGIN
vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
'pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL')
));
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
----
ARCHIVE_TABLE_DATA(pSourceFileConfigKey => pSourceFileConfigKey);
----
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
RETURN SQLCODE;
EXCEPTION
WHEN OTHERS THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RETURN SQLCODE;
END FN_ARCHIVE_TABLE_DATA;
----------------------------------------------------------------------------------------------------
FUNCTION FN_GATHER_TABLE_STAT (
pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE
) RETURN PLS_INTEGER
IS
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
BEGIN
vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST('pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL')));
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
----
GATHER_TABLE_STAT(pSourceFileConfigKey => pSourceFileConfigKey);
----
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
RETURN SQLCODE;
EXCEPTION
WHEN OTHERS THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RETURN SQLCODE;
END FN_GATHER_TABLE_STAT;
----------------------------------------------------------------------------------------------------
-- BATCH ARCHIVAL PROCEDURES
----------------------------------------------------------------------------------------------------
PROCEDURE ARCHIVE_ALL (
pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE DEFAULT NULL,
pSourceKey IN CT_MRDS.A_SOURCE.A_SOURCE_KEY%TYPE DEFAULT NULL,
pArchiveAll IN BOOLEAN DEFAULT FALSE
)
IS
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
vTablesArchived PLS_INTEGER := 0;
vTablesSkipped PLS_INTEGER := 0;
vTablesFailed PLS_INTEGER := 0;
vProcessingLevel VARCHAR2(50);
BEGIN
vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
'pSourceFileConfigKey => '''||nvl(to_char(pSourceFileConfigKey),'NULL')||'''',
'pSourceKey => '''||nvl(pSourceKey,'NULL')||'''',
'pArchiveAll => '''||CASE WHEN pArchiveAll THEN 'TRUE' ELSE 'FALSE' END||''''
));
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
-- Determine processing level and validate parameters
IF pSourceFileConfigKey IS NOT NULL THEN
vProcessingLevel := 'LEVEL 1: Single Config Key';
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Processing Level 1: pSourceFileConfigKey=' || pSourceFileConfigKey, 'INFO');
ELSIF pSourceKey IS NOT NULL THEN
vProcessingLevel := 'LEVEL 2: Source Key';
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Processing Level 2: pSourceKey=' || pSourceKey, 'INFO');
ELSIF pArchiveAll THEN
vProcessingLevel := 'LEVEL 3: Archive All';
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Processing Level 3: Archive All Enabled Tables', 'INFO');
ELSE
RAISE_APPLICATION_ERROR(-20003, 'No processing level specified. Provide pSourceFileConfigKey, pSourceKey, or set pArchiveAll=TRUE');
END IF;
FOR config_rec IN (
SELECT
A_SOURCE_FILE_CONFIG_KEY,
TABLE_ID,
IS_ARCHIVE_ENABLED,
IS_KEEP_IN_TRASH,
A_SOURCE_KEY
FROM CT_MRDS.A_SOURCE_FILE_CONFIG
WHERE SOURCE_FILE_TYPE = 'INPUT'
AND (
-- Level 1: Specific config key
(pSourceFileConfigKey IS NOT NULL AND A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfigKey)
OR
-- Level 2: All configs for source key
(pSourceFileConfigKey IS NULL AND pSourceKey IS NOT NULL AND A_SOURCE_KEY = pSourceKey)
OR
-- Level 3: All configs when pArchiveAll = TRUE
(pSourceFileConfigKey IS NULL AND pSourceKey IS NULL AND pArchiveAll = TRUE)
)
ORDER BY A_SOURCE_KEY, A_SOURCE_FILE_CONFIG_KEY
) LOOP
IF config_rec.IS_ARCHIVE_ENABLED = 'N' THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(
'Skipping table ' || config_rec.TABLE_ID || ' (IS_ARCHIVE_ENABLED=N) [Source: ' || config_rec.A_SOURCE_KEY || ', Config: ' || config_rec.A_SOURCE_FILE_CONFIG_KEY || ']',
'INFO'
);
vTablesSkipped := vTablesSkipped + 1;
ELSE
BEGIN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(
'Archiving table ' || config_rec.TABLE_ID || ' [Source: ' || config_rec.A_SOURCE_KEY || ', Config: ' || config_rec.A_SOURCE_FILE_CONFIG_KEY || ', IS_KEEP_IN_TRASH=' || config_rec.IS_KEEP_IN_TRASH || ']',
'INFO'
);
ARCHIVE_TABLE_DATA(pSourceFileConfigKey => config_rec.A_SOURCE_FILE_CONFIG_KEY);
vTablesArchived := vTablesArchived + 1;
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(
'Successfully archived table ' || config_rec.TABLE_ID,
'INFO'
);
EXCEPTION
WHEN OTHERS THEN
vTablesFailed := vTablesFailed + 1;
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(
'Failed to archive table ' || config_rec.TABLE_ID || ' [Config: ' || config_rec.A_SOURCE_FILE_CONFIG_KEY || ']: ' || SQLERRM,
'ERROR'
);
-- Continue with next table
END;
END IF;
END LOOP;
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(
vProcessingLevel || ' - Batch archival summary: Archived=' || vTablesArchived || ', Skipped=' || vTablesSkipped || ', Failed=' || vTablesFailed,
'INFO'
);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
EXCEPTION
WHEN OTHERS THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.MSG_UNKNOWN , 'ERROR', vParameters);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_UNKNOWN, CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
END ARCHIVE_ALL;
----------------------------------------------------------------------------------------------------
FUNCTION FN_ARCHIVE_ALL (
pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE DEFAULT NULL,
pSourceKey IN CT_MRDS.A_SOURCE.A_SOURCE_KEY%TYPE DEFAULT NULL,
pArchiveAll IN BOOLEAN DEFAULT FALSE
) RETURN PLS_INTEGER
IS
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
BEGIN
vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
'pSourceFileConfigKey => '''||nvl(to_char(pSourceFileConfigKey),'NULL')||'''',
'pSourceKey => '''||nvl(pSourceKey,'NULL')||'''',
'pArchiveAll => '''||CASE WHEN pArchiveAll THEN 'TRUE' ELSE 'FALSE' END||''''
));
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
----
ARCHIVE_ALL(
pSourceFileConfigKey => pSourceFileConfigKey,
pSourceKey => pSourceKey,
pArchiveAll => pArchiveAll
);
----
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
RETURN SQLCODE;
EXCEPTION
WHEN OTHERS THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RETURN SQLCODE;
END FN_ARCHIVE_ALL;
----------------------------------------------------------------------------------------------------
-- BATCH STATISTICS GATHERING PROCEDURES
----------------------------------------------------------------------------------------------------
PROCEDURE GATHER_TABLE_STAT_ALL (
pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE DEFAULT NULL,
pSourceKey IN CT_MRDS.A_SOURCE.A_SOURCE_KEY%TYPE DEFAULT NULL,
pGatherAll IN BOOLEAN DEFAULT FALSE,
pOnlyEnabled IN BOOLEAN DEFAULT TRUE
)
IS
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
vTablesProcessed PLS_INTEGER := 0;
vTablesSkipped PLS_INTEGER := 0;
vTablesFailed PLS_INTEGER := 0;
vProcessingLevel VARCHAR2(50);
vEnabledFilter VARCHAR2(50);
BEGIN
vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
'pSourceFileConfigKey => '''||nvl(to_char(pSourceFileConfigKey),'NULL')||'''',
'pSourceKey => '''||nvl(pSourceKey,'NULL')||'''',
'pGatherAll => '''||CASE WHEN pGatherAll THEN 'TRUE' ELSE 'FALSE' END||'''',
'pOnlyEnabled => '''||CASE WHEN pOnlyEnabled THEN 'TRUE' ELSE 'FALSE' END||''''
));
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
-- Determine processing level and validate parameters
IF pSourceFileConfigKey IS NOT NULL THEN
vProcessingLevel := 'LEVEL 1: Single Config Key';
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Processing Level 1: pSourceFileConfigKey=' || pSourceFileConfigKey, 'INFO');
ELSIF pSourceKey IS NOT NULL THEN
vProcessingLevel := 'LEVEL 2: Source Key';
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Processing Level 2: pSourceKey=' || pSourceKey, 'INFO');
ELSIF pGatherAll THEN
vProcessingLevel := 'LEVEL 3: Gather All Statistics';
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Processing Level 3: Gather All Enabled Statistics', 'INFO');
ELSE
RAISE_APPLICATION_ERROR(-20003, 'No processing level specified. Provide pSourceFileConfigKey, pSourceKey, or set pGatherAll=TRUE');
END IF;
-- Set enabled filter info
vEnabledFilter := CASE WHEN pOnlyEnabled THEN 'IS_ARCHIVE_ENABLED=Y only' ELSE 'All tables' END;
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Filter mode: ' || vEnabledFilter, 'INFO');
FOR config_rec IN (
SELECT
A_SOURCE_FILE_CONFIG_KEY,
TABLE_ID,
IS_ARCHIVE_ENABLED,
A_SOURCE_KEY
FROM CT_MRDS.A_SOURCE_FILE_CONFIG
WHERE SOURCE_FILE_TYPE = 'INPUT'
AND (
-- Level 1: Specific config key
(pSourceFileConfigKey IS NOT NULL AND A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfigKey)
OR
-- Level 2: All configs for source key
(pSourceFileConfigKey IS NULL AND pSourceKey IS NOT NULL AND A_SOURCE_KEY = pSourceKey)
OR
-- Level 3: All configs when pGatherAll = TRUE
(pSourceFileConfigKey IS NULL AND pSourceKey IS NULL AND pGatherAll = TRUE)
)
-- Apply IS_ARCHIVE_ENABLED filter if pOnlyEnabled = TRUE
AND (pOnlyEnabled = FALSE OR IS_ARCHIVE_ENABLED = 'Y')
ORDER BY A_SOURCE_KEY, A_SOURCE_FILE_CONFIG_KEY
) LOOP
IF pOnlyEnabled AND config_rec.IS_ARCHIVE_ENABLED = 'N' THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(
'Skipping table ' || config_rec.TABLE_ID || ' (IS_ARCHIVE_ENABLED=N) [Source: ' || config_rec.A_SOURCE_KEY || ', Config: ' || config_rec.A_SOURCE_FILE_CONFIG_KEY || ']',
'INFO'
);
vTablesSkipped := vTablesSkipped + 1;
ELSE
BEGIN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(
'Gathering statistics for table ' || config_rec.TABLE_ID || ' [Source: ' || config_rec.A_SOURCE_KEY || ', Config: ' || config_rec.A_SOURCE_FILE_CONFIG_KEY || ', IS_ARCHIVE_ENABLED=' || config_rec.IS_ARCHIVE_ENABLED || ']',
'INFO'
);
GATHER_TABLE_STAT(pSourceFileConfigKey => config_rec.A_SOURCE_FILE_CONFIG_KEY);
vTablesProcessed := vTablesProcessed + 1;
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(
'Successfully gathered statistics for table ' || config_rec.TABLE_ID,
'INFO'
);
EXCEPTION
WHEN OTHERS THEN
vTablesFailed := vTablesFailed + 1;
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(
'Failed to gather statistics for table ' || config_rec.TABLE_ID || ' [Config: ' || config_rec.A_SOURCE_FILE_CONFIG_KEY || ']: ' || SQLERRM,
'ERROR'
);
-- Continue with next table
END;
END IF;
END LOOP;
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(
vProcessingLevel || ' - Batch statistics gathering summary: Processed=' || vTablesProcessed || ', Skipped=' || vTablesSkipped || ', Failed=' || vTablesFailed || ' [Filter: ' || vEnabledFilter || ']',
'INFO'
);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
EXCEPTION
WHEN OTHERS THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.MSG_UNKNOWN , 'ERROR', vParameters);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_UNKNOWN, CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
END GATHER_TABLE_STAT_ALL;
----------------------------------------------------------------------------------------------------
FUNCTION FN_GATHER_TABLE_STAT_ALL (
pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE DEFAULT NULL,
pSourceKey IN CT_MRDS.A_SOURCE.A_SOURCE_KEY%TYPE DEFAULT NULL,
pGatherAll IN BOOLEAN DEFAULT FALSE,
pOnlyEnabled IN BOOLEAN DEFAULT TRUE
) RETURN PLS_INTEGER
IS
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
BEGIN
vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
'pSourceFileConfigKey => '''||nvl(to_char(pSourceFileConfigKey),'NULL')||'''',
'pSourceKey => '''||nvl(pSourceKey,'NULL')||'''',
'pGatherAll => '''||CASE WHEN pGatherAll THEN 'TRUE' ELSE 'FALSE' END||'''',
'pOnlyEnabled => '''||CASE WHEN pOnlyEnabled THEN 'TRUE' ELSE 'FALSE' END||''''
));
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
----
GATHER_TABLE_STAT_ALL(
pSourceFileConfigKey => pSourceFileConfigKey,
pSourceKey => pSourceKey,
pGatherAll => pGatherAll,
pOnlyEnabled => pOnlyEnabled
);
----
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
RETURN SQLCODE;
EXCEPTION
WHEN OTHERS THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RETURN SQLCODE;
END FN_GATHER_TABLE_STAT_ALL;
----------------------------------------------------------------------------------------------------
END;
/