feat(MARS-1409-POSTHOOK): Add scripts to register missing files and rollback orphan file registrations

This commit is contained in:
Grzegorz Michalski
2026-03-26 16:52:38 +01:00
parent 904f5e347c
commit 00be955068
4 changed files with 319 additions and 1 deletions

View File

@@ -0,0 +1,274 @@
-- ============================================================================
-- MARS-1409-POSTHOOK Step 03: Register missing files in A_SOURCE_FILE_RECEIVED
-- ============================================================================
-- Purpose: For each INPUT configuration in A_SOURCE_FILE_CONFIG
-- (SOURCE_FILE_TYPE = 'INPUT'), scan the ODS external table and insert
-- a record into A_SOURCE_FILE_RECEIVED for every file that exists in
-- the ODS bucket but has no matching entry in A_SOURCE_FILE_RECEIVED.
--
-- Such "orphan" files are visible to ARCHIVE_TABLE_DATA (via vfiles
-- query joining A_SOURCE_FILE_RECEIVED) and would be skipped silently
-- without this fix, leading to incomplete archival.
--
-- Inserted record defaults:
-- PROCESSING_STATUS = 'INGESTED' when A_WORKFLOW_HISTORY.WORKFLOW_SUCCESSFUL IN ('Y', 'SUCCESS')
-- = 'READY_FOR_INGESTION' otherwise (workflow not completed successfully)
-- RECEPTION_DATE = A_WORKFLOW_HISTORY.WORKFLOW_START (fallback: SYSDATE)
-- CREATED = SYSTIMESTAMP
-- EXTERNAL_TABLE_NAME = ODS external table name (e.g. ODS.LM_FORECAST_HEADER_ODS)
-- PROCESS_NAME = 'MARS-1409'
-- A_WORKFLOW_HISTORY_KEY = from ODS table row (s.a_workflow_history_key)
-- BYTES = DBMS_CLOUD.LIST_OBJECTS bytes for the file in ODS bucket
-- CHECKSUM = DBMS_CLOUD.LIST_OBJECTS checksum (MD5 base64) for the file
-- (NULL if LIST_OBJECTS call fails)
--
-- Author: Grzegorz Michalski
-- Date: 2026-03-24
-- ============================================================================
SET SERVEROUTPUT ON SIZE UNLIMITED
DECLARE
    -- ------------------------------------------------------------------------
    -- Registers files that can be read through an ODS external table but have
    -- no row in CT_MRDS.A_SOURCE_FILE_RECEIVED for the same configuration.
    --
    -- Flow per INPUT configuration:
    --   1. Build and validate the external table name (<schema>.<TABLE_ID>_ODS).
    --   2. Skip the configuration if the external table does not exist.
    --   3. Collect the distinct file$name values that are not yet registered.
    --   4. Prefetch bytes/checksum for the configuration's bucket prefix.
    --   5. Insert one A_SOURCE_FILE_RECEIVED row per missing file.
    --
    -- All inserts are committed once, after the last configuration.
    -- If any error was counted, ORA-20001 is raised AFTER the commit so the
    -- calling script can detect the partial failure.
    -- ------------------------------------------------------------------------

    -- Counters printed in the final summary.
    -- vSkipped counts skipped configurations as well as duplicate files.
    vRegistered PLS_INTEGER := 0;
    vSkipped    PLS_INTEGER := 0;
    vErrors     PLS_INTEGER := 0;

    vQuery      VARCHAR2(32000);   -- dynamic candidate query
    vTableName  VARCHAR2(200);     -- validated <schema>.<TABLE_ID>_ODS
    vCount      PLS_INTEGER;       -- scratch result of the catalog lookups

    -- One file that is present in the ODS table but not yet registered.
    TYPE t_filename_key IS RECORD (
        filename               VARCHAR2(1000),
        a_workflow_history_key NUMBER,
        workflow_start         DATE,
        workflow_successful    VARCHAR2(10)
    );
    TYPE t_filename_tab IS TABLE OF t_filename_key;
    vCandidates t_filename_tab;

    -- Object metadata from DBMS_CLOUD.LIST_OBJECTS, keyed by object name.
    TYPE t_obj_meta IS RECORD (
        bytes    NUMBER,
        checksum VARCHAR2(128)
    );
    TYPE t_obj_meta_map IS TABLE OF t_obj_meta INDEX BY VARCHAR2(1000);
    vObjMeta t_obj_meta_map;
    vObjUri  VARCHAR2(500);
BEGIN
    DBMS_OUTPUT.PUT_LINE('============================================================');
    DBMS_OUTPUT.PUT_LINE('Register missing files in A_SOURCE_FILE_RECEIVED');
    DBMS_OUTPUT.PUT_LINE('Started: ' || TO_CHAR(SYSTIMESTAMP, 'YYYY-MM-DD HH24:MI:SS'));
    DBMS_OUTPUT.PUT_LINE('============================================================');

    FOR cfg IN (
        SELECT c.A_SOURCE_FILE_CONFIG_KEY,
               c.A_SOURCE_KEY,
               c.TABLE_ID,
               c.ODS_SCHEMA_NAME,
               c.SOURCE_FILE_ID
          FROM CT_MRDS.A_SOURCE_FILE_CONFIG c
         WHERE c.SOURCE_FILE_TYPE = 'INPUT'
         ORDER BY c.A_SOURCE_KEY, c.TABLE_ID
    ) LOOP
        -- Validate the identifiers before they are concatenated into dynamic SQL.
        -- DBMS_ASSERT raises for an unknown schema or an invalid/NULL table id;
        -- report that as an error for this configuration only, so one bad config
        -- row does not abort (and roll back) the whole run.
        BEGIN
            vTableName := DBMS_ASSERT.SCHEMA_NAME(NVL(TRIM(cfg.ODS_SCHEMA_NAME), 'ODS'))
                       || '.' || DBMS_ASSERT.SIMPLE_SQL_NAME(cfg.TABLE_ID) || '_ODS';
        EXCEPTION
            WHEN OTHERS THEN
                DBMS_OUTPUT.PUT_LINE('[ERROR] Config ' || cfg.A_SOURCE_FILE_CONFIG_KEY
                    || ' (' || cfg.A_SOURCE_KEY || '.' || cfg.TABLE_ID
                    || '): invalid ODS schema or table name - ' || SQLERRM);
                vErrors := vErrors + 1;
                CONTINUE;
        END;

        -- Check if external table exists
        BEGIN
            SELECT COUNT(*)
              INTO vCount
              FROM all_external_tables
             WHERE owner = UPPER(NVL(TRIM(cfg.ODS_SCHEMA_NAME), 'ODS'))
               AND table_name = UPPER(cfg.TABLE_ID) || '_ODS';
        EXCEPTION
            -- Best effort: a failed catalog lookup is treated as "table not found".
            WHEN OTHERS THEN vCount := 0;
        END;

        IF vCount = 0 THEN
            DBMS_OUTPUT.PUT_LINE('[SKIP] Config ' || cfg.A_SOURCE_FILE_CONFIG_KEY
                || ' (' || cfg.A_SOURCE_KEY || '.' || cfg.TABLE_ID
                || '): ODS table ' || vTableName || ' not found.');
            vSkipped := vSkipped + 1;
            CONTINUE;
        END IF;

        -- Check if A_WORKFLOW_HISTORY_KEY column exists in the ODS external table.
        -- Some legacy tables (e.g. IDS_DATA_ODS) predate MARS-1409 and lack this column.
        SELECT COUNT(*)
          INTO vCount
          FROM all_tab_columns
         WHERE owner = UPPER(NVL(TRIM(cfg.ODS_SCHEMA_NAME), 'ODS'))
           AND table_name = UPPER(cfg.TABLE_ID) || '_ODS'
           AND column_name = 'A_WORKFLOW_HISTORY_KEY';

        -- Find files present in ODS table but missing from A_SOURCE_FILE_RECEIVED.
        -- GROUP BY file$name (+ MAX key) prevents duplicate rows when one file
        -- has multiple rows in the ODS table with different A_WORKFLOW_HISTORY_KEY values.
        -- The configuration key is passed as a bind variable (:cfg_key).
        IF vCount > 0 THEN
            -- Full query: joins A_WORKFLOW_HISTORY for RECEPTION_DATE and status derivation.
            -- LEFT JOIN on purpose: a file whose workflow key is NULL or has no
            -- history row must still be registered (it then falls back to
            -- READY_FOR_INGESTION / SYSDATE below) instead of being dropped silently.
            vQuery :=
                'SELECT s.file$name, s.a_workflow_history_key, CAST(h.workflow_start AS DATE), h.workflow_successful' ||
                '  FROM (' ||
                '        SELECT file$name, MAX(a_workflow_history_key) AS a_workflow_history_key' ||
                '          FROM ' || vTableName ||
                '         WHERE NOT EXISTS (' ||
                '                   SELECT 1 FROM CT_MRDS.A_SOURCE_FILE_RECEIVED r' ||
                '                    WHERE r.source_file_name = file$name' ||
                '                      AND r.a_source_file_config_key = :cfg_key' ||
                '               )' ||
                '         GROUP BY file$name' ||
                '       ) s' ||
                '  LEFT JOIN CT_MRDS.A_WORKFLOW_HISTORY h ON h.a_workflow_history_key = s.a_workflow_history_key';
        ELSE
            -- Fallback: table has no A_WORKFLOW_HISTORY_KEY column (legacy table)
            -- Import with NULL workflow key and READY_FOR_INGESTION status
            vQuery :=
                'SELECT DISTINCT file$name, CAST(NULL AS NUMBER), CAST(NULL AS DATE), CAST(NULL AS VARCHAR2(10))' ||
                '  FROM ' || vTableName ||
                ' WHERE NOT EXISTS (' ||
                '           SELECT 1 FROM CT_MRDS.A_SOURCE_FILE_RECEIVED r' ||
                '            WHERE r.source_file_name = file$name' ||
                '              AND r.a_source_file_config_key = :cfg_key' ||
                '       )';
        END IF;

        BEGIN
            EXECUTE IMMEDIATE vQuery
               BULK COLLECT INTO vCandidates
               USING cfg.A_SOURCE_FILE_CONFIG_KEY;
        EXCEPTION
            WHEN OTHERS THEN
                IF SQLCODE IN (-29913, -12801) THEN
                    -- ORA-29913 / ORA-12801 = external table access error.
                    -- Possible causes: empty bucket location (KUP-05002), inaccessible path,
                    -- ODS table pointing to wrong prefix, etc.
                    -- In all cases we cannot read any files from this table, so treat as SKIP.
                    DECLARE
                        vErrDetail VARCHAR2(4000) := DBMS_UTILITY.FORMAT_ERROR_STACK;
                    BEGIN
                        IF vErrDetail LIKE '%KUP-05002%' THEN
                            DBMS_OUTPUT.PUT_LINE('[SKIP] Config ' || cfg.A_SOURCE_FILE_CONFIG_KEY
                                || ' (' || cfg.A_SOURCE_KEY || '.' || cfg.TABLE_ID
                                || '): ODS bucket location is empty.');
                        ELSE
                            DBMS_OUTPUT.PUT_LINE('[SKIP] Config ' || cfg.A_SOURCE_FILE_CONFIG_KEY
                                || ' (' || cfg.A_SOURCE_KEY || '.' || cfg.TABLE_ID
                                || '): ODS external table not accessible (' || RTRIM(SUBSTR(vErrDetail, 1, 200)) || ')');
                        END IF;
                    END;
                    vSkipped := vSkipped + 1;
                    CONTINUE;
                ELSE
                    DBMS_OUTPUT.PUT_LINE('[ERROR] Config ' || cfg.A_SOURCE_FILE_CONFIG_KEY
                        || ' (' || cfg.A_SOURCE_KEY || '.' || cfg.TABLE_ID
                        || '): ' || SQLERRM);
                    vErrors := vErrors + 1;
                    CONTINUE;
                END IF;
        END;

        IF vCandidates.COUNT = 0 THEN
            DBMS_OUTPUT.PUT_LINE('[OK] Config ' || cfg.A_SOURCE_FILE_CONFIG_KEY
                || ' (' || cfg.A_SOURCE_KEY || '.' || cfg.TABLE_ID
                || '): no missing files.');
            CONTINUE;
        END IF;

        -- Prefetch file metadata (bytes, checksum) from ODS bucket.
        -- object_name returned by LIST_OBJECTS is relative to location_uri (= just the filename).
        -- The map is cleared first so metadata of the previous configuration cannot leak in.
        vObjMeta.DELETE;
        vObjUri := CT_MRDS.ENV_MANAGER.gvDataBucketUri
                || 'ODS/' || cfg.A_SOURCE_KEY || '/' || cfg.TABLE_ID || '/';
        BEGIN
            FOR r IN (
                SELECT object_name, bytes, checksum
                  FROM DBMS_CLOUD.LIST_OBJECTS(
                           credential_name => 'OCI$RESOURCE_PRINCIPAL',
                           location_uri    => vObjUri
                       )
            ) LOOP
                -- object_name is relative to location_uri, so it IS the filename directly
                vObjMeta(r.object_name).bytes    := r.bytes;
                vObjMeta(r.object_name).checksum := r.checksum;
            END LOOP;
        EXCEPTION
            -- Metadata is optional: registration continues with NULL BYTES/CHECKSUM.
            WHEN OTHERS THEN
                DBMS_OUTPUT.PUT_LINE('[WARN] Config ' || cfg.A_SOURCE_FILE_CONFIG_KEY
                    || ' (' || cfg.A_SOURCE_KEY || '.' || cfg.TABLE_ID
                    || '): failed to fetch cloud metadata - BYTES/CHECKSUM will be NULL. ' || SQLERRM);
        END;

        -- Insert missing records.
        -- Each insert has its own handler so one failing file does not stop the rest.
        FOR i IN 1..vCandidates.COUNT LOOP
            DECLARE
                vBytes    NUMBER        := NULL;
                vChecksum VARCHAR2(128) := NULL;
            BEGIN
                IF vObjMeta.EXISTS(vCandidates(i).filename) THEN
                    vBytes    := vObjMeta(vCandidates(i).filename).bytes;
                    vChecksum := vObjMeta(vCandidates(i).filename).checksum;
                END IF;

                INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
                    A_SOURCE_FILE_RECEIVED_KEY,
                    A_SOURCE_FILE_CONFIG_KEY,
                    SOURCE_FILE_NAME,
                    BYTES,
                    CHECKSUM,
                    RECEPTION_DATE,
                    CREATED,
                    PROCESSING_STATUS,
                    EXTERNAL_TABLE_NAME,
                    PROCESS_NAME,
                    A_WORKFLOW_HISTORY_KEY
                ) VALUES (
                    CT_MRDS.A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL,
                    cfg.A_SOURCE_FILE_CONFIG_KEY,
                    vCandidates(i).filename,
                    vBytes,
                    vChecksum,
                    NVL(vCandidates(i).workflow_start, SYSDATE),
                    SYSTIMESTAMP,
                    CASE WHEN vCandidates(i).workflow_successful IN ('Y', 'SUCCESS') THEN 'INGESTED' ELSE 'READY_FOR_INGESTION' END,
                    vTableName,
                    'MARS-1409',   -- marker used by the rollback script to find these rows
                    vCandidates(i).a_workflow_history_key
                );

                DBMS_OUTPUT.PUT_LINE('[INSERT] Config ' || cfg.A_SOURCE_FILE_CONFIG_KEY
                    || ' (' || cfg.A_SOURCE_KEY || '.' || cfg.TABLE_ID
                    || '): ' || vCandidates(i).filename
                    || ' | bytes=' || NVL(TO_CHAR(vBytes), 'NULL')
                    || ' | cksum=' || CASE WHEN vChecksum IS NOT NULL THEN 'present' ELSE 'NULL' END);
                vRegistered := vRegistered + 1;
            EXCEPTION
                WHEN DUP_VAL_ON_INDEX THEN
                    DBMS_OUTPUT.PUT_LINE('[SKIP-DUP] Config ' || cfg.A_SOURCE_FILE_CONFIG_KEY
                        || ': file already registered: ' || vCandidates(i).filename);
                    vSkipped := vSkipped + 1;
                WHEN OTHERS THEN
                    DBMS_OUTPUT.PUT_LINE('[ERROR] Config ' || cfg.A_SOURCE_FILE_CONFIG_KEY
                        || ': failed to register ' || vCandidates(i).filename
                        || ' - ' || SQLERRM);
                    vErrors := vErrors + 1;
            END;
        END LOOP;
    END LOOP;

    -- Single commit for everything registered in this run.
    COMMIT;

    DBMS_OUTPUT.PUT_LINE('============================================================');
    DBMS_OUTPUT.PUT_LINE('SUMMARY');
    DBMS_OUTPUT.PUT_LINE(' Registered : ' || vRegistered);
    DBMS_OUTPUT.PUT_LINE(' Skipped : ' || vSkipped);
    DBMS_OUTPUT.PUT_LINE(' Errors : ' || vErrors);
    DBMS_OUTPUT.PUT_LINE('Finished: ' || TO_CHAR(SYSTIMESTAMP, 'YYYY-MM-DD HH24:MI:SS'));
    DBMS_OUTPUT.PUT_LINE('============================================================');

    -- Raised after the commit: successful registrations are kept, but the
    -- caller is told that some configurations or files failed.
    IF vErrors > 0 THEN
        RAISE_APPLICATION_ERROR(-20001,
            'Registration completed with ' || vErrors || ' error(s). Review SERVEROUTPUT above.');
    END IF;
END;
/

View File

@@ -0,0 +1,31 @@
-- ============================================================================
-- MARS-1409-POSTHOOK Rollback Step 92: Remove registered orphan files
-- ============================================================================
-- Purpose: Delete all records from A_SOURCE_FILE_RECEIVED that were inserted
-- by 03_MARS_1409_POSTHOOK_register_missing_files.sql.
-- Identified by PROCESS_NAME = 'MARS-1409'.
-- Author: Grzegorz Michalski
-- Date: 2026-03-24
-- WARNING: This deletes ALL rows with PROCESS_NAME = 'MARS-1409'.
-- Do NOT run if other scripts also populate PROCESS_NAME = 'MARS-1409'.
-- ============================================================================
SET SERVEROUTPUT ON SIZE UNLIMITED
WHENEVER SQLERROR EXIT SQL.SQLCODE
PROMPT
PROMPT Removing orphan file registrations inserted by MARS-1409 POSTHOOK step 03...
DECLARE
    -- Number of A_SOURCE_FILE_RECEIVED rows removed by the DELETE below.
    vRowsRemoved NUMBER := 0;
BEGIN
    -- Rows are identified only by the PROCESS_NAME marker 'MARS-1409'.
    DELETE
      FROM CT_MRDS.A_SOURCE_FILE_RECEIVED
     WHERE PROCESS_NAME = 'MARS-1409';

    -- Capture the row count right after the DELETE, before the COMMIT.
    vRowsRemoved := SQL%ROWCOUNT;

    COMMIT;

    DBMS_OUTPUT.PUT_LINE(
        'Deleted ' || vRowsRemoved || ' record(s) with PROCESS_NAME = ''MARS-1409'''
    );
    DBMS_OUTPUT.PUT_LINE(
        'Rollback of orphan file registration completed successfully'
    );
END;
/

View File

@@ -96,11 +96,17 @@ PROMPT STEP 1: Backfill A_WORKFLOW_HISTORY_KEY for existing records
PROMPT ============================================================================
@@01_MARS_1409_POSTHOOK_update_existing_workflow_keys.sql
PROMPT
PROMPT ============================================================================
PROMPT STEP 3: Register files missing from A_SOURCE_FILE_RECEIVED
PROMPT ============================================================================
@@03_MARS_1409_POSTHOOK_register_missing_files.sql
PROMPT
PROMPT ============================================================================
PROMPT STEP 2: Diagnose workflow key status
PROMPT ============================================================================
@@02_MARS_1409_POSTHOOK_diagnose_workflow_key_status.sql
-- @@02_MARS_1409_POSTHOOK_diagnose_workflow_key_status.sql
PROMPT
PROMPT ============================================================================

View File

@@ -35,6 +35,7 @@ PROMPT This will reverse all changes from MARS-1409-POSTHOOK installation.
PROMPT
PROMPT Rollback steps:
PROMPT 1. Clear A_WORKFLOW_HISTORY_KEY values from A_SOURCE_FILE_RECEIVED
PROMPT 2. Delete orphan file registrations (PROCESS_NAME = 'MARS-1409')
PROMPT ============================================================================
-- Confirm rollback with user
@@ -54,6 +55,12 @@ PROMPT STEP 1: Clear backfilled A_WORKFLOW_HISTORY_KEY values
PROMPT ============================================================================
@@91_MARS_1409_POSTHOOK_rollback_workflow_keys.sql
PROMPT
PROMPT ============================================================================
PROMPT STEP 2: Delete orphan file registrations (PROCESS_NAME = 'MARS-1409')
PROMPT ============================================================================
@@92_MARS_1409_POSTHOOK_rollback_register_missing_files.sql
PROMPT
PROMPT ============================================================================
PROMPT MARS-1409-POSTHOOK Rollback Complete