Update FILE_ARCHIVER package to version 3.1.2 with enhanced archival strategies and fixes
This commit is contained in:
@@ -154,15 +154,15 @@ AS
|
||||
vQuery:=
|
||||
'select
|
||||
s.*
|
||||
-- ,r.partition_year
|
||||
-- ,r.partition_month
|
||||
from '|| vTableName ||' s
|
||||
join CT_MRDS.A_SOURCE_FILE_RECEIVED r
|
||||
on s.file$name = r.source_file_name
|
||||
and r.a_source_file_config_key = '||pSourceFileConfigKey||'
|
||||
and r.partition_year='''||ym_loop.year||'''
|
||||
and r.partition_month='''||ym_loop.month||'''
|
||||
and r.PROCESSING_STATUS = ''INGESTED''
|
||||
join CT_MRDS.a_workflow_history h
|
||||
on s.a_workflow_history_key = h.a_workflow_history_key
|
||||
and to_char(h.workflow_start,''yyyy'') = '''||ym_loop.year||'''
|
||||
and to_char(h.workflow_start,''mm'') = '''||ym_loop.month||'''
|
||||
'
|
||||
;
|
||||
vUri := FILE_MANAGER.GET_BUCKET_URI('ARCHIVE')||vSourceFileConfig.A_SOURCE_KEY||'/'||vSourceFileConfig.TABLE_ID||'/PARTITION_YEAR='||ym_loop.year||'/PARTITION_MONTH='||ym_loop.month||'/';
|
||||
@@ -204,20 +204,14 @@ AS
|
||||
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_EXP_DATA_FOR_ARCH_FAILED,
|
||||
ENV_MANAGER.MSG_EXP_DATA_FOR_ARCH_FAILED ||cgBL|| ' Zero rows were exported.');
|
||||
ELSE
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('Data exported to archival file for YEAR_MONTH: 2025_01','INFO', vParameters);
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('Data exported to archival file for YEAR_MONTH: '||ym_loop.year||'_'||ym_loop.month,'INFO', vParameters);
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT(FILE_MANAGER.GET_DET_USER_LOAD_OPERATIONS (pOperationId => vOperationId),'DEBUG', vParameters);
|
||||
END IF;
|
||||
|
||||
SELECT
|
||||
object_name
|
||||
into vFilename
|
||||
from DBMS_CLOUD.LIST_OBJECTS(
|
||||
credential_name => 'OCI$RESOURCE_PRINCIPAL',
|
||||
location_uri => vUri)
|
||||
where TO_UTC_TIMESTAMP_TZ(REGEXP_REPLACE(object_name, '.*(\d{4})(\d{2})(\d{2})T(\d{2})(\d{2})(\d{2})(\d{6})Z\.parquet$', '\1-\2-\3T\4:\5:\6.\7'))
|
||||
between vUserLoadOperations.START_TIME
|
||||
and vUserLoadOperations.UPDATE_TIME
|
||||
;
|
||||
-- Note: DBMS_CLOUD.EXPORT_DATA may create multiple parquet files (parallel execution)
|
||||
-- Instead of tracking individual files, we store the archive directory prefix
|
||||
-- ARCH_FILE_NAME will contain the directory URI where all parquet files are located
|
||||
vFilename := vUri; -- Store directory prefix instead of individual filename
|
||||
|
||||
-- Try to drop EXPORTED FILES ("regular data files")
|
||||
BEGIN
|
||||
@@ -227,7 +221,9 @@ AS
|
||||
BEGIN
|
||||
UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED r
|
||||
SET PROCESSING_STATUS = 'ARCHIVED'
|
||||
,ARCH_FILE_NAME = vUri||vFilename
|
||||
,ARCH_FILE_NAME = vFilename -- Now contains directory prefix, not individual file
|
||||
,PARTITION_YEAR = ym_loop.year -- Record which partition year the data was archived to
|
||||
,PARTITION_MONTH = ym_loop.month -- Record which partition month the data was archived to
|
||||
WHERE r.a_source_file_config_key= pSourceFileConfigKey
|
||||
AND r.source_file_name = f.filename
|
||||
AND r.processing_status = 'INGESTED'
|
||||
@@ -303,16 +299,38 @@ AS
|
||||
END;
|
||||
END LOOP;
|
||||
|
||||
DBMS_CLOUD.DELETE_OBJECT(credential_name => ENV_MANAGER.gvCredentialName,
|
||||
object_uri => vUri||vFilename);
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('ROLLBACK operation: Archival PARQUET file dropped.','DEBUG', vUri||vFilename);
|
||||
-- ROLLBACK: Delete all parquet files from archive directory
|
||||
FOR arch_file IN (
|
||||
SELECT object_name
|
||||
FROM DBMS_CLOUD.LIST_OBJECTS(
|
||||
credential_name => ENV_MANAGER.gvCredentialName,
|
||||
location_uri => vFilename -- vFilename now contains directory prefix
|
||||
)
|
||||
) LOOP
|
||||
DBMS_CLOUD.DELETE_OBJECT(
|
||||
credential_name => ENV_MANAGER.gvCredentialName,
|
||||
object_uri => vFilename || arch_file.object_name
|
||||
);
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('ROLLBACK operation: Archival PARQUET file dropped.','DEBUG', vFilename || arch_file.object_name);
|
||||
END LOOP;
|
||||
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_MOVE_FILE_TO_TRASH_FAILED, ENV_MANAGER.MSG_MOVE_FILE_TO_TRASH_FAILED);
|
||||
|
||||
ELSIF vProcessControlStatus = 'CHANGE_STATUS_TO_ARCHIVED_FAILURE' THEN
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.MSG_CHANGE_STAT_TO_ARCHIVED_FAILED, 'ERROR', vParameters);
|
||||
DBMS_CLOUD.DELETE_OBJECT(credential_name => ENV_MANAGER.gvCredentialName,
|
||||
object_uri => vUri||vFilename);
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('Archival PARQUET file dropped.','DEBUG', vUri||vFilename);
|
||||
-- ROLLBACK: Delete all parquet files from archive directory
|
||||
FOR arch_file IN (
|
||||
SELECT object_name
|
||||
FROM DBMS_CLOUD.LIST_OBJECTS(
|
||||
credential_name => ENV_MANAGER.gvCredentialName,
|
||||
location_uri => vFilename -- vFilename now contains directory prefix
|
||||
)
|
||||
) LOOP
|
||||
DBMS_CLOUD.DELETE_OBJECT(
|
||||
credential_name => ENV_MANAGER.gvCredentialName,
|
||||
object_uri => vFilename || arch_file.object_name
|
||||
);
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('Archival PARQUET file dropped.','DEBUG', vFilename || arch_file.object_name);
|
||||
END LOOP;
|
||||
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_CHANGE_STAT_TO_ARCHIVED_FAILED, ENV_MANAGER.MSG_CHANGE_STAT_TO_ARCHIVED_FAILED);
|
||||
|
||||
ELSIF vProcessControlStatus = 'RESTORE_FILE_FROM_TRASH_FAILURE' THEN
|
||||
|
||||
Reference in New Issue
Block a user