Files
mars/MARS_Packages/REL01_ADDITIONS/MARS-828/new_version/FILE_ARCHIVER.pkb
Grzegorz Michalski 64965fb8c8 MARS-828
2026-02-03 13:51:13 +01:00

538 lines
28 KiB
Plaintext

create or replace PACKAGE BODY CT_MRDS.FILE_ARCHIVER
AS
----------------------------------------------------------------------------------------------------
-- PRIVATE FUNCTION: GET_ARCHIVAL_WHERE_CLAUSE
----------------------------------------------------------------------------------------------------
/**
* @name GET_ARCHIVAL_WHERE_CLAUSE
* @desc Private function that generates WHERE clause based on ARCHIVAL_STRATEGY configuration.
* Supports four strategies: THRESHOLD_BASED, CURRENT_MONTH_ONLY, MINIMUM_AGE_MONTHS, HYBRID.
* @param pSourceFileConfig - Source file configuration record with ARCHIVAL_STRATEGY
* @return VARCHAR2 - WHERE clause for filtering archival candidates
**/
FUNCTION GET_ARCHIVAL_WHERE_CLAUSE(
pSourceFileConfig IN CT_MRDS.A_SOURCE_FILE_CONFIG%ROWTYPE
) RETURN VARCHAR2
IS
vWhereClause VARCHAR2(4000);
cgBL CONSTANT VARCHAR2(2) := CHR(13)||CHR(10);
BEGIN
CASE pSourceFileConfig.ARCHIVAL_STRATEGY
-- Legacy threshold-based strategy (backward compatible)
WHEN 'THRESHOLD_BASED' THEN
vWhereClause := 'extract(day from (systimestamp - workflow_start)) > ' || pSourceFileConfig.DAYS_FOR_ARCHIVE_THRESHOLD;
-- Archive all data except current month
WHEN 'CURRENT_MONTH_ONLY' THEN
vWhereClause := 'TRUNC(workflow_start, ''MM'') < TRUNC(SYSDATE, ''MM'')';
-- Archive only data older than X months
WHEN 'MINIMUM_AGE_MONTHS' THEN
IF pSourceFileConfig.MINIMUM_AGE_MONTHS IS NULL THEN
RAISE_APPLICATION_ERROR(-20001, 'MINIMUM_AGE_MONTHS must be configured for MINIMUM_AGE_MONTHS strategy');
END IF;
vWhereClause := 'workflow_start < ADD_MONTHS(TRUNC(SYSDATE, ''MM''), -' || pSourceFileConfig.MINIMUM_AGE_MONTHS || ')';
-- Hybrid: Current month exclusion AND minimum age requirement
WHEN 'HYBRID' THEN
IF pSourceFileConfig.MINIMUM_AGE_MONTHS IS NULL THEN
RAISE_APPLICATION_ERROR(-20001, 'MINIMUM_AGE_MONTHS must be configured for HYBRID strategy');
END IF;
vWhereClause := 'TRUNC(workflow_start, ''MM'') < TRUNC(SYSDATE, ''MM'') ' ||
'AND workflow_start < ADD_MONTHS(TRUNC(SYSDATE, ''MM''), -' || pSourceFileConfig.MINIMUM_AGE_MONTHS || ')';
ELSE
RAISE_APPLICATION_ERROR(-20002, 'Invalid ARCHIVAL_STRATEGY: ' || pSourceFileConfig.ARCHIVAL_STRATEGY);
END CASE;
RETURN vWhereClause;
END GET_ARCHIVAL_WHERE_CLAUSE;
----------------------------------------------------------------------------------------------------
FUNCTION GET_TABLE_STAT(pSourceFileConfigKey IN NUMBER)
RETURN CT_MRDS.A_TABLE_STAT%ROWTYPE
IS
vTableStat CT_MRDS.A_TABLE_STAT%ROWTYPE;
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
vCount PLS_INTEGER;
vSourceFileType CT_MRDS.A_SOURCE_FILE_CONFIG.SOURCE_FILE_TYPE%TYPE;
BEGIN
vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST('pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey),NULL)));
ENV_MANAGER.LOG_PROCESS_EVENT('Start','DEBUG', vParameters);
SELECT count(*) , min(SOURCE_FILE_TYPE)
INTO vCount, vSourceFileType
FROM CT_MRDS.A_TABLE_STAT s
JOIN CT_MRDS.A_SOURCE_FILE_CONFIG c
ON s.A_SOURCE_FILE_CONFIG_KEY = c.A_SOURCE_FILE_CONFIG_KEY
WHERE s.A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfigKey;
IF vCount=0 and vSourceFileType='INPUT' THEN
GATHER_TABLE_STAT(pSourceFileConfigKey);
END IF;
BEGIN
SELECT *
INTO vTableStat
FROM CT_MRDS.A_TABLE_STAT
WHERE A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfigKey;
-- EXCEPTION
-- WHEN NO_DATA_FOUND THEN
--
END;
ENV_MANAGER.LOG_PROCESS_EVENT('End','DEBUG',vParameters);
RETURN vTableStat;
END GET_TABLE_STAT;
----------------------------------------------------------------------------------------------------
PROCEDURE ARCHIVE_TABLE_DATA (
pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE
)
IS
vSourceFileConfig CT_MRDS.A_SOURCE_FILE_CONFIG%ROWTYPE;
vTableStat CT_MRDS.A_TABLE_STAT%ROWTYPE;
vQuery VARCHAR2(4000);
vTableName VARCHAR2(200);
vUri VARCHAR2(1000);
vfiles T_FILENAMES;
vFilename VARCHAR2(300);
vOperationId NUMBER := -1;
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
vArchivalTriggeredBy VARCHAR2(60); -- Possible values: FILES_COUNT, ROWS_COUNT, BYTES_SUM
vUserLoadOperations USER_LOAD_OPERATIONS%ROWTYPE;
vProcessControlStatus VARCHAR2(60) := 'OK';
BEGIN
vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST('pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL')));
ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
vSourceFileConfig := FILE_MANAGER.GET_SOURCE_FILE_CONFIG(pSourceFileConfigKey => pSourceFileConfigKey);
vTableStat := GET_TABLE_STAT(pSourceFileConfigKey => pSourceFileConfigKey);
if vSourceFileConfig.SOURCE_FILE_TYPE <> 'INPUT' then
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.MSG_NOT_INPUT_SOURCE_FILE_TYPE, 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_NOT_INPUT_SOURCE_FILE_TYPE, ENV_MANAGER.MSG_NOT_INPUT_SOURCE_FILE_TYPE);
end if;
if vTableStat.created < sysdate-(vSourceFileConfig.HOURS_TO_EXPIRE_STATISTICS/24) then
GATHER_TABLE_STAT(pSourceFileConfigKey => pSourceFileConfigKey);
vTableStat := GET_TABLE_STAT(pSourceFileConfigKey => pSourceFileConfigKey);
end if;
if vTableStat.OVER_ARCH_THRESOLD_FILE_COUNT >= vSourceFileConfig.FILES_COUNT_OVER_ARCHIVE_THRESHOLD then vArchivalTriggeredBy := 'FILES_COUNT';
elsif vTableStat.OVER_ARCH_THRESOLD_ROW_COUNT >= vSourceFileConfig.ROWS_COUNT_OVER_ARCHIVE_THRESHOLD then vArchivalTriggeredBy := vArchivalTriggeredBy||', ROWS_COUNT';
elsif vTableStat.OVER_ARCH_THRESOLD_SIZE >= vSourceFileConfig.BYTES_SUM_OVER_ARCHIVE_THRESHOLD then vArchivalTriggeredBy := vArchivalTriggeredBy||', BYTES_SUM';
else ENV_MANAGER.LOG_PROCESS_EVENT('Non of archival triggers reached','INFO');
end if;
if LENGTH(vArchivalTriggeredBy)>0 THEN
ENV_MANAGER.LOG_PROCESS_EVENT('Archival Triggered By: '||vArchivalTriggeredBy,'INFO');
vTableName := DBMS_ASSERT.SCHEMA_NAME(vSourceFileConfig.ODS_SCHEMA_NAME) || '.'||vSourceFileConfig.A_SOURCE_KEY||'_'||DBMS_ASSERT.simple_sql_name(vSourceFileConfig.TABLE_ID)||'_ODS';
-- Use strategy-based WHERE clause (MARS-828)
vQuery := '
select t_filename(
file$name
,file$path
, to_char(h.workflow_start,''yyyy'')
, to_char(h.workflow_start,''mm'')
)
from '||vTableName||' s
join CT_MRDS.a_workflow_history h
on s.a_workflow_history_key = h.a_workflow_history_key
where ' || GET_ARCHIVAL_WHERE_CLAUSE(vSourceFileConfig)
;
-- Get all files that will be archived into "vfiles" collection ("regular data files")
execute immediate vQuery bulk collect into vfiles;
-- Start EXPORT "regular data files" to parquet and DROP "csv"
FOR ym_loop IN (select distinct year, month from table(vfiles) order by 1,2) LOOP
dbms_output.put_line('year: '||ym_loop.year||' - '||'month: '||ym_loop.month);
vQuery:=
'select
s.*
-- ,r.partition_year
-- ,r.partition_month
from '|| vTableName ||' s
join CT_MRDS.A_SOURCE_FILE_RECEIVED r
on s.file$name = r.source_file_name
and r.a_source_file_config_key = '||pSourceFileConfigKey||'
and r.partition_year='''||ym_loop.year||'''
and r.partition_month='''||ym_loop.month||'''
and r.PROCESSING_STATUS = ''INGESTED''
'
;
vUri := FILE_MANAGER.GET_BUCKET_URI('ARCHIVE')||vSourceFileConfig.A_SOURCE_KEY||'/'||vSourceFileConfig.TABLE_ID||'/PARTITION_YEAR='||ym_loop.year||'/PARTITION_MONTH='||ym_loop.month||'/';
ENV_MANAGER.LOG_PROCESS_EVENT('Start Archiving for YEAR_MONTH: '||ym_loop.year||'_'||ym_loop.month ,'INFO');
ENV_MANAGER.LOG_PROCESS_EVENT('Parameter for DBMS_CLOUD.EXPORT_DATA => file_uri_list' ,'DEBUG',vUri);
ENV_MANAGER.LOG_PROCESS_EVENT('Parameter for DBMS_CLOUD.EXPORT_DATA => query' ,'DEBUG',vQuery);
BEGIN
DBMS_CLOUD.EXPORT_DATA(
credential_name => ENV_MANAGER.gvCredentialName,
file_uri_list => vUri||'d' ,
format => json_object('type' value 'parquet'),
query => vQuery,
operation_id => vOperationId
);
EXCEPTION
WHEN OTHERS THEN
vProcessControlStatus :='EXPORT_FAILURE';
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_EXP_DATA_FOR_ARCH_FAILED, ENV_MANAGER.MSG_EXP_DATA_FOR_ARCH_FAILED);
END;
ENV_MANAGER.LOG_PROCESS_EVENT('vOperationId of export: '||vOperationId,'DEBUG');
-- Get USER_LOAD_OPERATIONS info
select *
into vUserLoadOperations
from USER_LOAD_OPERATIONS
where id = vOperationId;
IF vUserLoadOperations.STATUS <>'COMPLETED' THEN
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_EXP_DATA_FOR_ARCH_FAILED,
ENV_MANAGER.MSG_EXP_DATA_FOR_ARCH_FAILED ||cgBL|| ' Export ended with status '||vUserLoadOperations.STATUS);
ELSIF vUserLoadOperations.STATUS = 'COMPLETED' and vUserLoadOperations.ROWS_LOADED = 0 THEN
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_EXP_DATA_FOR_ARCH_FAILED,
ENV_MANAGER.MSG_EXP_DATA_FOR_ARCH_FAILED ||cgBL|| ' Zero rows were exported.');
ELSE
ENV_MANAGER.LOG_PROCESS_EVENT('Data exported to archival file for YEAR_MONTH: 2025_01','INFO', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT(FILE_MANAGER.GET_DET_USER_LOAD_OPERATIONS (pOperationId => vOperationId),'DEBUG', vParameters);
END IF;
SELECT
object_name
into vFilename
from DBMS_CLOUD.LIST_OBJECTS(
credential_name => 'OCI$RESOURCE_PRINCIPAL',
location_uri => vUri)
where TO_UTC_TIMESTAMP_TZ(REGEXP_REPLACE(object_name, '.*(\d{4})(\d{2})(\d{2})T(\d{2})(\d{2})(\d{2})(\d{6})Z\.parquet$', '\1-\2-\3T\4:\5:\6.\7'))
between vUserLoadOperations.START_TIME
and vUserLoadOperations.UPDATE_TIME
;
-- Try to drop EXPORTED FILES ("regular data files")
BEGIN
FOR f in (select filename, pathname from table(vfiles) where year = ym_loop.year and month = ym_loop.month) loop
-- first change of status
BEGIN
UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED r
SET PROCESSING_STATUS = 'ARCHIVED'
,ARCH_FILE_NAME = vUri||vFilename
WHERE r.a_source_file_config_key= pSourceFileConfigKey
AND r.source_file_name = f.filename
AND r.processing_status = 'INGESTED'
;
EXCEPTION
WHEN OTHERS THEN
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
vProcessControlStatus := 'CHANGE_STATUS_TO_ARCHIVED_FAILURE';
END;
EXIT WHEN vProcessControlStatus = 'CHANGE_STATUS_TO_ARCHIVED_FAILURE';
-- move file to trash before dropping
BEGIN
DBMS_CLOUD.MOVE_OBJECT(source_credential_name => ENV_MANAGER.gvCredentialName,
source_object_uri => f.pathname||'/'||f.filename,
target_object_uri => replace(f.pathname,'ODS','TRASH')||'/'||f.filename,
target_credential_name => ENV_MANAGER.gvCredentialName
);
ENV_MANAGER.LOG_PROCESS_EVENT('File moved to TRASH.','DEBUG', f.pathname||'/'||f.filename);
EXCEPTION
WHEN OTHERS THEN
ENV_MANAGER.LOG_PROCESS_EVENT('Failed to move file to TRASH.','ERROR', f.pathname||'/'||f.filename);
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
rollback;
vProcessControlStatus := 'MOVE_FILE_TO_TRASH_FAILURE';
END;
EXIT WHEN vProcessControlStatus = 'MOVE_FILE_TO_TRASH_FAILURE';
commit;
END LOOP;
--------------------------------------------------------------------
-- IF All goes fine till this point, we drop files from TRASH (if not then ROLLBACK PART)
IF vProcessControlStatus = 'OK' THEN
FOR f in (select filename, pathname from table(vfiles) where year = ym_loop.year and month = ym_loop.month) LOOP
--Drop file from TRASH
DBMS_CLOUD.DELETE_OBJECT(credential_name => ENV_MANAGER.gvCredentialName,
object_uri => replace(f.pathname,'ODS','TRASH')||'/'||f.filename);
ENV_MANAGER.LOG_PROCESS_EVENT('File dropped from TRASH.','DEBUG', f.pathname||'/'||f.filename);
END LOOP;
--ROLLBACK PART
--ROLLBACK PROCESS in case of FAILURE (restore files from TRASH)
ELSIF vProcessControlStatus = 'MOVE_FILE_TO_TRASH_FAILURE' THEN
FOR f in ( SELECT vf.filename, vf.pathname
FROM TABLE(vfiles) vf
JOIN CT_MRDS.A_SOURCE_FILE_RECEIVED r
ON r.source_file_name = vf.filename
AND r.a_source_file_config_key = pSourceFileConfigKey
AND r.PROCESSING_STATUS = 'ARCHIVED'
AND vf.year = ym_loop.year
AND vf.month = ym_loop.month
) LOOP
BEGIN
DBMS_CLOUD.MOVE_OBJECT(source_credential_name => ENV_MANAGER.gvCredentialName,
source_object_uri => replace(f.pathname,'ODS','TRASH')||'/'||f.filename,
target_object_uri => f.pathname||'/'||f.filename,
target_credential_name => ENV_MANAGER.gvCredentialName
);
ENV_MANAGER.LOG_PROCESS_EVENT('File restored from TRASH.','DEBUG', f.pathname||'/'||f.filename);
UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED r
SET PROCESSING_STATUS = 'INGESTED'
,ARCH_FILE_NAME = NULL
WHERE r.a_source_file_config_key = pSourceFileConfigKey
AND r.source_file_name = f.filename
;
EXCEPTION
WHEN OTHERS THEN
ENV_MANAGER.LOG_PROCESS_EVENT('Failed to restore file from TRASH.','ERROR', replace(f.pathname,'ODS','TRASH')||'/'||f.filename);
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
vProcessControlStatus := 'RESTORE_FILE_FROM_TRASH_FAILURE';
END;
END LOOP;
DBMS_CLOUD.DELETE_OBJECT(credential_name => ENV_MANAGER.gvCredentialName,
object_uri => vUri||vFilename);
ENV_MANAGER.LOG_PROCESS_EVENT('ROLLBACK operation: Archival PARQUET file dropped.','DEBUG', vUri||vFilename);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_MOVE_FILE_TO_TRASH_FAILED, ENV_MANAGER.MSG_MOVE_FILE_TO_TRASH_FAILED);
ELSIF vProcessControlStatus = 'CHANGE_STATUS_TO_ARCHIVED_FAILURE' THEN
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.MSG_CHANGE_STAT_TO_ARCHIVED_FAILED, 'ERROR', vParameters);
DBMS_CLOUD.DELETE_OBJECT(credential_name => ENV_MANAGER.gvCredentialName,
object_uri => vUri||vFilename);
ENV_MANAGER.LOG_PROCESS_EVENT('Archival PARQUET file dropped.','DEBUG', vUri||vFilename);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_CHANGE_STAT_TO_ARCHIVED_FAILED, ENV_MANAGER.MSG_CHANGE_STAT_TO_ARCHIVED_FAILED);
ELSIF vProcessControlStatus = 'RESTORE_FILE_FROM_TRASH_FAILURE' THEN
ENV_MANAGER.LOG_PROCESS_EVENT('Some files were not restored from TRASH. Check A_PROCESS_LOG table for details','ERROR');
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_RESTORE_FILE_FROM_TRASH, ENV_MANAGER.MSG_RESTORE_FILE_FROM_TRASH);
END IF;
EXCEPTION
WHEN ENV_MANAGER.ERR_CHANGE_STAT_TO_ARCHIVED_FAILED THEN
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_CHANGE_STAT_TO_ARCHIVED_FAILED, ENV_MANAGER.MSG_CHANGE_STAT_TO_ARCHIVED_FAILED);
WHEN ENV_MANAGER.ERR_MOVE_FILE_TO_TRASH_FAILED THEN
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_MOVE_FILE_TO_TRASH_FAILED, ENV_MANAGER.MSG_MOVE_FILE_TO_TRASH_FAILED);
WHEN ENV_MANAGER.ERR_RESTORE_FILE_FROM_TRASH THEN
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_RESTORE_FILE_FROM_TRASH, ENV_MANAGER.MSG_RESTORE_FILE_FROM_TRASH);
WHEN OTHERS THEN
ENV_MANAGER.LOG_PROCESS_EVENT('Error during archiving process','ERROR');
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_DROP_EXPORTED_FILES_FAILED, ENV_MANAGER.MSG_DROP_EXPORTED_FILES_FAILED);
END;
-- END of "Try to drop EXPORTED FILES"
ENV_MANAGER.LOG_PROCESS_EVENT('All archived files had been dropped.','INFO');
ENV_MANAGER.LOG_PROCESS_EVENT('End Archiving for YEAR_MONTH: '||ym_loop.year||'_'||ym_loop.month ,'INFO');
END LOOP; --ym_loop end (YEAR_MONTH)
COMMIT;
ELSE
ENV_MANAGER.LOG_PROCESS_EVENT('Non of archival thresholds reached. Skip archiving.'||vArchivalTriggeredBy,'INFO');
END IF;
ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
EXCEPTION
WHEN ENV_MANAGER.ERR_NOT_INPUT_SOURCE_FILE_TYPE THEN
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.MSG_NOT_INPUT_SOURCE_FILE_TYPE , 'ERROR', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_NOT_INPUT_SOURCE_FILE_TYPE, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
WHEN ENV_MANAGER.ERR_EXP_DATA_FOR_ARCH_FAILED THEN
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.MSG_EXP_DATA_FOR_ARCH_FAILED , 'ERROR', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_EXP_DATA_FOR_ARCH_FAILED, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
WHEN ENV_MANAGER.ERR_CHANGE_STAT_TO_ARCHIVED_FAILED THEN
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_CHANGE_STAT_TO_ARCHIVED_FAILED, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
WHEN ENV_MANAGER.ERR_MOVE_FILE_TO_TRASH_FAILED THEN
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_MOVE_FILE_TO_TRASH_FAILED, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
WHEN OTHERS THEN
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.MSG_UNKNOWN , 'ERROR', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
END ARCHIVE_TABLE_DATA;
----------------------------------------------------------------------------------------------------
PROCEDURE GATHER_TABLE_STAT (
pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE
) IS
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
vStats CT_MRDS.A_TABLE_STAT%ROWTYPE;
vSourceFileConfig CT_MRDS.A_SOURCE_FILE_CONFIG%ROWTYPE;
vTableName VARCHAR2(200);
vQuery VARCHAR2(32000);
BEGIN
vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST('pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL')));
ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
vSourceFileConfig := FILE_MANAGER.GET_SOURCE_FILE_CONFIG(pSourceFileConfigKey => pSourceFileConfigKey);
vTableName := DBMS_ASSERT.SCHEMA_NAME(vSourceFileConfig.ODS_SCHEMA_NAME) || '.'||vSourceFileConfig.A_SOURCE_KEY||'_'||DBMS_ASSERT.simple_sql_name(vSourceFileConfig.TABLE_ID)||'_ODS';
ENV_MANAGER.LOG_PROCESS_EVENT('vTableName','DEBUG',vTableName);
-- Use strategy-based WHERE clause for statistics (MARS-828)
vQuery :=
'with tmp as (
select
s.*
,file$name as filename
,h.workflow_start
, to_char(h.workflow_start,''yyyy'') as year
, to_char(h.workflow_start,''mm'') as month
from '||vTableName||' s
join CT_MRDS.a_workflow_history h
on s.a_workflow_history_key = h.a_workflow_history_key
)
, tmp_gr as (
select
filename, count(*) as row_count_per_file, min(workflow_start) as workflow_start
from tmp
group by filename
)
select
NULL as A_TABLE_STAT_KEY
,'||pSourceFileConfigKey||' as A_SOURCE_FILE_CONFIG_KEY
,'''||vTableName||''' as TABLE_NAME
,count(*) as FILE_COUNT
,sum(case when ' || GET_ARCHIVAL_WHERE_CLAUSE(vSourceFileConfig) || ' then 1 else 0 end) as OLD_FILE_COUNT
,sum (row_count_per_file) as ROW_COUNT
,sum(case when ' || GET_ARCHIVAL_WHERE_CLAUSE(vSourceFileConfig) || ' then row_count_per_file else 0 end) as OLD_ROW_COUNT
,sum(r.bytes) as BYTES
,sum(case when ' || GET_ARCHIVAL_WHERE_CLAUSE(vSourceFileConfig) || ' then r.bytes else 0 end) as OLD_BYTES
,'||vSourceFileConfig.DAYS_FOR_ARCHIVE_THRESHOLD||' as DAYS_FOR_ARCHIVE_THRESHOLD
,systimestamp as CREATED
from tmp_gr t
join (SELECT * from DBMS_CLOUD.LIST_OBJECTS(
credential_name => '''||ENV_MANAGER.gvCredentialName||''',
location_uri => FILE_MANAGER.GET_BUCKET_URI(''ODS'')||''ODS/'||vSourceFileConfig.A_SOURCE_KEY||'/'||vSourceFileConfig.TABLE_ID||'/''
)
) r
on t.filename = r.object_name'
;
ENV_MANAGER.LOG_PROCESS_EVENT('vQuery','DEBUG',vQuery);
execute immediate vQuery into vStats;
vStats.A_TABLE_STAT_KEY := CT_MRDS.A_TABLE_STAT_KEY_SEQ.NEXTVAL;
insert into A_TABLE_STAT_HIST values vStats;
delete from A_TABLE_STAT where A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfigKey;
insert into A_TABLE_STAT values vStats;
COMMIT;
ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
EXCEPTION
WHEN OTHERS THEN
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.MSG_UNKNOWN , 'ERROR', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
END GATHER_TABLE_STAT;
----------------------------------------------------------------------------------------------------
-- PACKAGE VERSION MANAGEMENT FUNCTIONS IMPLEMENTATION
----------------------------------------------------------------------------------------------------
FUNCTION GET_VERSION
RETURN VARCHAR2
IS
BEGIN
RETURN PACKAGE_VERSION;
END GET_VERSION;
----------------------------------------------------------------------------------------------------
FUNCTION GET_BUILD_INFO
RETURN VARCHAR2
IS
BEGIN
RETURN ENV_MANAGER.GET_PACKAGE_VERSION_INFO(
pPackageName => 'FILE_ARCHIVER',
pVersion => PACKAGE_VERSION,
pBuildDate => PACKAGE_BUILD_DATE,
pAuthor => PACKAGE_AUTHOR
);
END GET_BUILD_INFO;
----------------------------------------------------------------------------------------------------
FUNCTION GET_VERSION_HISTORY
RETURN VARCHAR2
IS
BEGIN
RETURN ENV_MANAGER.FORMAT_VERSION_HISTORY(
pPackageName => 'FILE_ARCHIVER',
pVersionHistory => VERSION_HISTORY
);
END GET_VERSION_HISTORY;
----------------------------------------------------------------------------------------------------
FUNCTION ARCHIVE_TABLE_DATA (
pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE
) RETURN PLS_INTEGER
IS
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
BEGIN
vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST('pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL')));
ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
----
ARCHIVE_TABLE_DATA(pSourceFileConfigKey => pSourceFileConfigKey);
----
ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
RETURN SQLCODE;
EXCEPTION
WHEN OTHERS THEN
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RETURN SQLCODE;
END ARCHIVE_TABLE_DATA;
----------------------------------------------------------------------------------------------------
FUNCTION GATHER_TABLE_STAT (
pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE
) RETURN PLS_INTEGER
IS
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
BEGIN
vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST('pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL')));
ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
----
GATHER_TABLE_STAT(pSourceFileConfigKey => pSourceFileConfigKey);
----
ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
RETURN SQLCODE;
EXCEPTION
WHEN OTHERS THEN
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RETURN SQLCODE;
END GATHER_TABLE_STAT;
----------------------------------------------------------------------------------------------------
END;
/