Update FILE_ARCHIVER package to version 3.2.0 with TRASH retention control and new statuses

This commit is contained in:
Grzegorz Michalski
2026-02-10 08:23:06 +01:00
parent cdd9dff32d
commit d443c4f07d
10 changed files with 257 additions and 60 deletions

View File

@@ -0,0 +1,29 @@
-- ====================================================================
-- A_SOURCE_FILE_RECEIVED Table
-- ====================================================================
-- Purpose: Track received files and their processing status
-- ====================================================================
CREATE TABLE CT_MRDS.A_SOURCE_FILE_RECEIVED (
A_SOURCE_FILE_RECEIVED_KEY NUMBER(38,0) NOT NULL ENABLE,
A_SOURCE_FILE_CONFIG_KEY NUMBER(38,0) NOT NULL ENABLE,
SOURCE_FILE_NAME VARCHAR2(1000) NOT NULL,
CHECKSUM VARCHAR2(128),
CREATED TIMESTAMP(6) WITH TIME ZONE,
BYTES NUMBER,
RECEPTION_DATE DATE NOT NULL,
PROCESSING_STATUS VARCHAR2(200),
EXTERNAL_TABLE_NAME VARCHAR2(200),
PARTITION_YEAR VARCHAR2(4),
PARTITION_MONTH VARCHAR2(2),
ARCH_FILE_NAME VARCHAR2(1000),
CONSTRAINT A_SOURCE_FILE_RECEIVED_PK PRIMARY KEY (A_SOURCE_FILE_RECEIVED_KEY),
CONSTRAINT ASFR_A_SOURCE_FILE_CONFIG_KEY_FK FOREIGN KEY(A_SOURCE_FILE_CONFIG_KEY) REFERENCES CT_MRDS.A_SOURCE_FILE_CONFIG(A_SOURCE_FILE_CONFIG_KEY),
CONSTRAINT A_SOURCE_FILE_RECEIVED_CHK CHECK (PROCESSING_STATUS IN ('RECEIVED', 'VALIDATED', 'READY_FOR_INGESTION', 'INGESTED', 'ARCHIVED', 'ARCHIVED_AND_TRASHED', 'ARCHIVED_AND_PURGED'))
) TABLESPACE "DATA";
-- Unique index for file identification (workaround for TIMESTAMP WITH TIMEZONE constraint limitation)
CREATE UNIQUE INDEX CT_MRDS.A_SOURCE_FILE_RECEIVED_UK1
ON CT_MRDS.A_SOURCE_FILE_RECEIVED(CHECKSUM, CREATED, BYTES);
GRANT SELECT, INSERT, UPDATE, DELETE ON CT_MRDS.A_SOURCE_FILE_RECEIVED TO MRDS_LOADER_ROLE;

View File

@@ -87,7 +87,8 @@ AS
----------------------------------------------------------------------------------------------------
PROCEDURE ARCHIVE_TABLE_DATA (
pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE
pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE,
pKeepInTrash IN BOOLEAN DEFAULT TRUE
)
IS
vSourceFileConfig CT_MRDS.A_SOURCE_FILE_CONFIG%ROWTYPE;
@@ -104,7 +105,10 @@ AS
vProcessControlStatus VARCHAR2(60) := 'OK';
BEGIN
vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST('pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL')));
vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
'pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL'),
'pKeepInTrash => '||CASE WHEN pKeepInTrash THEN 'TRUE' ELSE 'FALSE' END
));
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
vSourceFileConfig := CT_MRDS.FILE_MANAGER.GET_SOURCE_FILE_CONFIG(pSourceFileConfigKey => pSourceFileConfigKey);
@@ -244,17 +248,17 @@ AS
END;
EXIT WHEN vProcessControlStatus = 'CHANGE_STATUS_TO_ARCHIVED_FAILURE';
-- move file to trash before dropping
-- move file to TRASH subfolder (DATA bucket: ODS/ → TRASH/) before dropping
BEGIN
DBMS_CLOUD.MOVE_OBJECT(source_credential_name => ENV_MANAGER.gvCredentialName,
source_object_uri => f.pathname||'/'||f.filename,
target_object_uri => replace(f.pathname,'ODS','TRASH')||'/'||f.filename,
target_credential_name => ENV_MANAGER.gvCredentialName
);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('File moved to TRASH.','DEBUG', f.pathname||'/'||f.filename);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('File moved to TRASH folder.','DEBUG', f.pathname||'/'||f.filename);
EXCEPTION
WHEN OTHERS THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Failed to move file to TRASH.','ERROR', f.pathname||'/'||f.filename);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Failed to move file to TRASH folder.','ERROR', f.pathname||'/'||f.filename);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
rollback;
vProcessControlStatus := 'MOVE_FILE_TO_TRASH_FAILURE';
@@ -264,24 +268,38 @@ AS
END LOOP;
--------------------------------------------------------------------
-- IF All goes fine till this point, we drop files from TRASH (if not then ROLLBACK PART)
-- IF All goes fine till this point, we drop files from TRASH folder (if not then ROLLBACK PART)
-- TRASH is a subfolder in DATA bucket (e.g., TRASH/LM/TABLE_NAME instead of ODS/LM/TABLE_NAME)
IF vProcessControlStatus = 'OK' THEN
FOR f in (select filename, pathname from table(vfiles) where year = ym_loop.year and month = ym_loop.month) LOOP
--Drop file from TRASH
DBMS_CLOUD.DELETE_OBJECT(credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName,
object_uri => replace(f.pathname,'ODS','TRASH')||'/'||f.filename);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('File dropped from TRASH.','DEBUG', f.pathname||'/'||f.filename);
END LOOP;
IF NOT pKeepInTrash THEN
-- Delete files from TRASH folder (cleanup) and update status to ARCHIVED_AND_PURGED
FOR f in (select filename, pathname from table(vfiles) where year = ym_loop.year and month = ym_loop.month) LOOP
DBMS_CLOUD.DELETE_OBJECT(credential_name => CT_MRDS.ENV_MANAGER.gvCredentialName,
object_uri => replace(f.pathname,'ODS','TRASH')||'/'||f.filename);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('File dropped from TRASH folder.','DEBUG', f.pathname||'/'||f.filename);
-- Update status to ARCHIVED_AND_PURGED
UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED r
SET PROCESSING_STATUS = 'ARCHIVED_AND_PURGED'
WHERE r.a_source_file_config_key = pSourceFileConfigKey
AND r.source_file_name = f.filename
AND r.PROCESSING_STATUS = 'ARCHIVED_AND_TRASHED';
END LOOP;
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('All archived files removed from TRASH folder and marked as ARCHIVED_AND_PURGED.','INFO');
ELSE
-- Keep files in TRASH folder (status remains ARCHIVED_AND_TRASHED)
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Archived files kept in TRASH folder for retention (status: ARCHIVED_AND_TRASHED).','INFO');
END IF;
--ROLLBACK PART
--ROLLBACK PROCESS in case of FAILURE (restore files from TRASH)
--ROLLBACK PROCESS in case of FAILURE (restore files from TRASH subfolder in DATA bucket)
ELSIF vProcessControlStatus = 'MOVE_FILE_TO_TRASH_FAILURE' THEN
FOR f in ( SELECT vf.filename, vf.pathname
FROM TABLE(vfiles) vf
JOIN CT_MRDS.A_SOURCE_FILE_RECEIVED r
ON r.source_file_name = vf.filename
AND r.a_source_file_config_key = pSourceFileConfigKey
AND r.PROCESSING_STATUS = 'ARCHIVED'
AND r.PROCESSING_STATUS IN ('ARCHIVED_AND_TRASHED', 'ARCHIVED_AND_PURGED')
AND vf.year = ym_loop.year
AND vf.month = ym_loop.month
) LOOP
@@ -291,7 +309,7 @@ AS
target_object_uri => f.pathname||'/'||f.filename,
target_credential_name => ENV_MANAGER.gvCredentialName
);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('File restored from TRASH.','DEBUG', f.pathname||'/'||f.filename);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('File restored from TRASH folder.','DEBUG', f.pathname||'/'||f.filename);
UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED r
SET PROCESSING_STATUS = 'INGESTED'
@@ -302,7 +320,7 @@ AS
EXCEPTION
WHEN OTHERS THEN
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Failed to restore file from TRASH.','ERROR', replace(f.pathname,'ODS','TRASH')||'/'||f.filename);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Failed to restore file from TRASH folder.','ERROR', replace(f.pathname,'ODS','TRASH')||'/'||f.filename);
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
vProcessControlStatus := 'RESTORE_FILE_FROM_TRASH_FAILURE';
END;
@@ -363,9 +381,7 @@ AS
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT(CT_MRDS.ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(CT_MRDS.ENV_MANAGER.CODE_DROP_EXPORTED_FILES_FAILED, CT_MRDS.ENV_MANAGER.MSG_DROP_EXPORTED_FILES_FAILED);
END;
-- END of "Try to drop EXPORTED FILES"
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('All archived files had been dropped.','INFO');
-- END of "Try to drop EXPORTED FILES"
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End Archiving for YEAR_MONTH: '||ym_loop.year||'_'||ym_loop.month ,'INFO');
END LOOP; --ym_loop end (YEAR_MONTH)
@@ -523,15 +539,19 @@ AS
----------------------------------------------------------------------------------------------------
FUNCTION ARCHIVE_TABLE_DATA (
pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE
pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE,
pKeepInTrash IN BOOLEAN DEFAULT TRUE
) RETURN PLS_INTEGER
IS
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
BEGIN
vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST('pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL')));
vParameters := CT_MRDS.ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
'pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL'),
'pKeepInTrash => '||CASE WHEN pKeepInTrash THEN 'TRUE' ELSE 'FALSE' END
));
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
----
ARCHIVE_TABLE_DATA(pSourceFileConfigKey => pSourceFileConfigKey);
ARCHIVE_TABLE_DATA(pSourceFileConfigKey => pSourceFileConfigKey, pKeepInTrash => pKeepInTrash);
----
CT_MRDS.ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
RETURN SQLCODE;

View File

@@ -17,12 +17,13 @@ AS
**/
-- Package Version Information (Semantic Versioning: MAJOR.MINOR.PATCH)
PACKAGE_VERSION CONSTANT VARCHAR2(10) := '3.1.2';
PACKAGE_BUILD_DATE CONSTANT VARCHAR2(20) := '2026-02-06 11:30:00';
PACKAGE_VERSION CONSTANT VARCHAR2(10) := '3.2.0';
PACKAGE_BUILD_DATE CONSTANT VARCHAR2(20) := '2026-02-06 14:00:00';
PACKAGE_AUTHOR CONSTANT VARCHAR2(100) := 'Grzegorz Michalski';
-- Version History (Latest changes first)
VERSION_HISTORY CONSTANT VARCHAR2(4000) :=
'3.2.0 (2026-02-06): Added pKeepInTrash parameter (DEFAULT TRUE) to ARCHIVE_TABLE_DATA for TRASH folder retention control - files kept in TRASH subfolder (DATA bucket) by default for safety and compliance' || CHR(13)||CHR(10) ||
'3.1.2 (2026-02-06): Fixed missing PARTITION_YEAR/PARTITION_MONTH assignments in UPDATE statement and export query circular dependency (now filters by workflow_start instead of partition fields)' || CHR(13)||CHR(10) ||
'3.1.1 (2026-02-06): Fixed ORA-01422 error when DBMS_CLOUD.EXPORT_DATA creates multiple parquet files (parallel execution). Now stores archive directory prefix instead of individual filenames' || CHR(13)||CHR(10) ||
'3.1.0 (2026-01-29): Added function overloads for ARCHIVE_TABLE_DATA and GATHER_TABLE_STAT returning SQLCODE for Python library integration' || CHR(13)||CHR(10) ||
@@ -38,9 +39,11 @@ AS
* @desc Wrapper procedure for DBMS_CLOUD.EXPORT_DATA.
* Exports data from table specified by pSourceFileConfigKey(A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY) into PARQUET file on OCI infrustructure.
* Each YEAR_MONTH pair goes to seperate file (implicit partitioning).
* @param pKeepInTrash - When TRUE (default), files are kept in TRASH folder (DATA bucket subfolder) for safety. When FALSE, files are deleted from TRASH after successful archive.
**/
PROCEDURE ARCHIVE_TABLE_DATA (
pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE
pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE,
pKeepInTrash IN BOOLEAN DEFAULT TRUE
);
/**
@@ -48,11 +51,13 @@ AS
* @desc Function overload for ARCHIVE_TABLE_DATA procedure.
* Returns SQLCODE for Python library integration.
* Calls the main ARCHIVE_TABLE_DATA procedure and captures execution result.
* @param pKeepInTrash - When TRUE (default), files are kept in TRASH folder (DATA bucket subfolder) for safety. When FALSE, files are deleted from TRASH after successful archive.
* @example SELECT FILE_ARCHIVER.ARCHIVE_TABLE_DATA(pSourceFileConfigKey => 123) FROM DUAL;
* @ex_rslt 0 (success) or error code
**/
FUNCTION ARCHIVE_TABLE_DATA (
pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE
pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE,
pKeepInTrash IN BOOLEAN DEFAULT TRUE
) RETURN PLS_INTEGER;