Refactor MARS-1409 SQL scripts for workflow history key management

- Added checks for existing columns before adding or dropping A_WORKFLOW_HISTORY_KEY in relevant scripts to prevent errors.
- Updated rollback scripts to ensure proper restoration of previous states, including recompilation of dependent packages.
- Introduced a diagnostic script to assess the status of workflow keys against ODS tables, providing detailed reporting on discrepancies.
- Adjusted trigger definitions to accommodate new workflow names and ensure correct handling of workflow history.
- Modified master rollback script to streamline the rollback process and improve clarity in step descriptions.
This commit is contained in:
Grzegorz Michalski
2026-03-05 12:33:59 +01:00
parent 59e18d9b35
commit 113ea0a618
9 changed files with 524 additions and 159 deletions

View File

@@ -6,18 +6,40 @@
-- ============================================================================
SET SERVEROUTPUT ON SIZE UNLIMITED
WHENEVER SQLERROR EXIT SQL.SQLCODE
PROMPT
PROMPT Adding A_WORKFLOW_HISTORY_KEY column to A_SOURCE_FILE_RECEIVED...
-- Add A_WORKFLOW_HISTORY_KEY column (no FK constraint - workflow history record created later)
ALTER TABLE CT_MRDS.A_SOURCE_FILE_RECEIVED ADD (
A_WORKFLOW_HISTORY_KEY NUMBER
);
DECLARE
vColumnExists NUMBER := 0;
BEGIN
SELECT COUNT(*) INTO vColumnExists
FROM ALL_TAB_COLUMNS
WHERE OWNER = 'CT_MRDS'
AND TABLE_NAME = 'A_SOURCE_FILE_RECEIVED'
AND COLUMN_NAME = 'A_WORKFLOW_HISTORY_KEY';
-- Add column comment
COMMENT ON COLUMN CT_MRDS.A_SOURCE_FILE_RECEIVED.A_WORKFLOW_HISTORY_KEY IS
'Direct link to workflow history - each file has exactly one workflow execution. Populated during VALIDATE_SOURCE_FILE_RECEIVED (MARS-1409)';
IF vColumnExists > 0 THEN
DBMS_OUTPUT.PUT_LINE('SKIP: Column A_WORKFLOW_HISTORY_KEY already exists.');
RETURN;
END IF;
EXECUTE IMMEDIATE '
ALTER TABLE CT_MRDS.A_SOURCE_FILE_RECEIVED ADD (
A_WORKFLOW_HISTORY_KEY NUMBER
)';
EXECUTE IMMEDIATE '
COMMENT ON COLUMN CT_MRDS.A_SOURCE_FILE_RECEIVED.A_WORKFLOW_HISTORY_KEY IS
''Direct link to workflow history - each file has exactly one workflow execution. Populated during VALIDATE_SOURCE_FILE_RECEIVED (MARS-1409)''';
DBMS_OUTPUT.PUT_LINE('A_WORKFLOW_HISTORY_KEY column added successfully!');
EXCEPTION
WHEN OTHERS THEN
DBMS_OUTPUT.PUT_LINE('ERROR adding column: ' || SQLERRM);
RAISE;
END;
/
PROMPT A_WORKFLOW_HISTORY_KEY column added successfully!
PROMPT

View File

@@ -9,18 +9,22 @@
-- ============================================================================
SET SERVEROUTPUT ON SIZE UNLIMITED
WHENEVER SQLERROR EXIT SQL.SQLCODE
PROMPT
PROMPT Updating A_WORKFLOW_HISTORY_KEY for existing A_SOURCE_FILE_RECEIVED records...
DECLARE
vUpdatedTotal NUMBER := 0;
vUpdatedCurrent NUMBER := 0;
vFailedConfigs NUMBER := 0;
vSkippedConfigs NUMBER := 0;
vTableName VARCHAR2(200);
vSQL VARCHAR2(4000);
vRecordsToUpdate NUMBER := 0;
vUpdatedTotal NUMBER := 0;
vUpdatedCurrent NUMBER := 0;
vFailedConfigs NUMBER := 0;
vTableNotFound NUMBER := 0;
vSkippedConfigs NUMBER := 0;
vTableName VARCHAR2(200);
vSQL VARCHAR2(32767);
vRecordsToUpdate NUMBER := 0;
vRemainingTargeted NUMBER := 0;
vTableExists NUMBER := 0;
BEGIN
-- Count total records to update
@@ -32,9 +36,9 @@ BEGIN
DBMS_OUTPUT.PUT_LINE('Found ' || vRecordsToUpdate || ' records with NULL A_WORKFLOW_HISTORY_KEY');
DBMS_OUTPUT.PUT_LINE('----------------------------------------');
-- Process each INPUT configuration
-- Process each INPUT configuration that has records to update
FOR config_rec IN (
SELECT DISTINCT
SELECT
sfc.A_SOURCE_FILE_CONFIG_KEY,
sfc.A_SOURCE_KEY,
sfc.SOURCE_FILE_ID,
@@ -45,7 +49,7 @@ BEGIN
WHERE sfr.A_SOURCE_FILE_CONFIG_KEY = sfc.A_SOURCE_FILE_CONFIG_KEY
AND sfr.A_WORKFLOW_HISTORY_KEY IS NULL
AND sfr.PROCESSING_STATUS IN ('VALIDATED', 'READY_FOR_INGESTION', 'INGESTED', 'ARCHIVED', 'ARCHIVED_AND_TRASHED', 'ARCHIVED_AND_PURGED')
) as NULL_COUNT
) AS NULL_COUNT
FROM CT_MRDS.A_SOURCE_FILE_CONFIG sfc
WHERE sfc.SOURCE_FILE_TYPE = 'INPUT'
AND sfc.TABLE_ID IS NOT NULL
@@ -64,18 +68,31 @@ BEGIN
-- Construct ODS table name from TABLE_ID (ODS tables have _ODS suffix)
vTableName := 'ODS.' || config_rec.TABLE_ID || '_ODS';
-- Check table existence before attempting dynamic SQL
SELECT COUNT(*) INTO vTableExists
FROM ALL_TABLES
WHERE OWNER = 'ODS'
AND TABLE_NAME = config_rec.TABLE_ID || '_ODS';
IF vTableExists = 0 THEN
vTableNotFound := vTableNotFound + 1;
DBMS_OUTPUT.PUT_LINE('SKIP: Config ' || config_rec.A_SOURCE_FILE_CONFIG_KEY ||
' (' || config_rec.A_SOURCE_KEY || '/' || config_rec.SOURCE_FILE_ID || '/' || config_rec.TABLE_ID ||
') - ODS table not found: ' || vTableName);
CONTINUE;
END IF;
DBMS_OUTPUT.PUT_LINE('Processing config ' || config_rec.A_SOURCE_FILE_CONFIG_KEY ||
' (' || config_rec.A_SOURCE_KEY || '/' || config_rec.SOURCE_FILE_ID || '/' || config_rec.TABLE_ID || ')...');
-- Try to update using ODS table
-- Uses MIN to handle edge case of multiple workflow keys (shouldn't happen, but defensive)
-- Update using ODS table
vSQL :=
'UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED sfr ' ||
'SET A_WORKFLOW_HISTORY_KEY = ( ' ||
' SELECT MIN(t.A_WORKFLOW_HISTORY_KEY) ' ||
' SELECT t.A_WORKFLOW_HISTORY_KEY ' ||
' FROM ' || vTableName || ' t ' ||
' WHERE t.file$name = sfr.SOURCE_FILE_NAME ' ||
' AND t.A_WORKFLOW_HISTORY_KEY IS NOT NULL ' ||
' AND rownum=1 ' ||
') ' ||
'WHERE sfr.A_SOURCE_FILE_CONFIG_KEY = :config_key ' ||
' AND sfr.A_WORKFLOW_HISTORY_KEY IS NULL ' ||
@@ -83,7 +100,7 @@ BEGIN
' AND EXISTS ( ' ||
' SELECT 1 FROM ' || vTableName || ' t ' ||
' WHERE t.file$name = sfr.SOURCE_FILE_NAME ' ||
' AND t.A_WORKFLOW_HISTORY_KEY IS NOT NULL ' ||
' AND rownum=1 ' ||
' )';
EXECUTE IMMEDIATE vSQL USING config_rec.A_SOURCE_FILE_CONFIG_KEY;
@@ -100,7 +117,7 @@ BEGIN
EXCEPTION
WHEN OTHERS THEN
vFailedConfigs := vFailedConfigs + 1;
DBMS_OUTPUT.PUT_LINE(' ERROR: Failed for config ' || config_rec.A_SOURCE_FILE_CONFIG_KEY ||
DBMS_OUTPUT.PUT_LINE(' ERROR: Unexpected failure for config ' || config_rec.A_SOURCE_FILE_CONFIG_KEY ||
' (table: ' || vTableName || ')');
DBMS_OUTPUT.PUT_LINE(' Reason: ' || SQLERRM);
-- Continue processing other configurations despite this failure
@@ -113,25 +130,35 @@ BEGIN
DBMS_OUTPUT.PUT_LINE('Update Summary:');
DBMS_OUTPUT.PUT_LINE(' Total records updated: ' || vUpdatedTotal);
DBMS_OUTPUT.PUT_LINE(' Configurations skipped (no NULL records): ' || vSkippedConfigs);
DBMS_OUTPUT.PUT_LINE(' Configurations failed: ' || vFailedConfigs);
DBMS_OUTPUT.PUT_LINE(' Configurations skipped (ODS table not found): ' || vTableNotFound);
DBMS_OUTPUT.PUT_LINE(' Configurations failed (unexpected errors): ' || vFailedConfigs);
-- Check remaining NULL records
SELECT COUNT(*) INTO vRecordsToUpdate
-- Check remaining NULL records - targeted statuses only
SELECT COUNT(*) INTO vRemainingTargeted
FROM CT_MRDS.A_SOURCE_FILE_RECEIVED
WHERE A_WORKFLOW_HISTORY_KEY IS NULL
-- AND PROCESSING_STATUS IN ('VALIDATED', 'READY_FOR_INGESTION', 'INGESTED', 'ARCHIVED', 'ARCHIVED_AND_TRASHED', 'ARCHIVED_AND_PURGED')
;
AND PROCESSING_STATUS IN ('VALIDATED', 'READY_FOR_INGESTION', 'INGESTED', 'ARCHIVED', 'ARCHIVED_AND_TRASHED', 'ARCHIVED_AND_PURGED');
DBMS_OUTPUT.PUT_LINE(' Remaining NULL records: ' || vRecordsToUpdate);
-- Check all remaining NULL records (includes RECEIVED, VALIDATION_FAILED)
SELECT COUNT(*) INTO vRecordsToUpdate
FROM CT_MRDS.A_SOURCE_FILE_RECEIVED
WHERE A_WORKFLOW_HISTORY_KEY IS NULL;
IF vRecordsToUpdate > 0 THEN
DBMS_OUTPUT.PUT_LINE(' Remaining NULL records (targeted statuses): ' || vRemainingTargeted);
DBMS_OUTPUT.PUT_LINE(' Remaining NULL records (all statuses): ' || vRecordsToUpdate);
IF vRemainingTargeted > 0 THEN
DBMS_OUTPUT.PUT_LINE('');
DBMS_OUTPUT.PUT_LINE('NOTE: Some records still have NULL A_WORKFLOW_HISTORY_KEY.');
DBMS_OUTPUT.PUT_LINE(' This is expected for:');
DBMS_OUTPUT.PUT_LINE(' - Files not yet ingested into ODS tables');
DBMS_OUTPUT.PUT_LINE(' - Files with status RECEIVED or VALIDATION_FAILED');
DBMS_OUTPUT.PUT_LINE(' - ODS tables that do not exist or have different structure');
DBMS_OUTPUT.PUT_LINE(' These records will be populated when files are processed through VALIDATE_SOURCE_FILE_RECEIVED');
DBMS_OUTPUT.PUT_LINE('NOTE: Some records with targeted statuses still have NULL A_WORKFLOW_HISTORY_KEY.');
DBMS_OUTPUT.PUT_LINE(' This is expected for files not yet ingested into ODS tables');
DBMS_OUTPUT.PUT_LINE(' or ODS tables with a different structure.');
DBMS_OUTPUT.PUT_LINE(' These records will be populated when files are re-processed.');
END IF;
IF vFailedConfigs > 0 THEN
DBMS_OUTPUT.PUT_LINE('');
DBMS_OUTPUT.PUT_LINE('WARNING: ' || vFailedConfigs || ' configuration(s) failed with unexpected errors.');
DBMS_OUTPUT.PUT_LINE(' Review the ERROR lines above and investigate manually.');
END IF;
DBMS_OUTPUT.PUT_LINE('----------------------------------------');

View File

@@ -12,14 +12,15 @@ PROMPT
DECLARE
v_status VARCHAR2(20);
BEGIN
-- After rollback the trigger is restored under its original name: a_workflow_history
SELECT status INTO v_status
FROM all_objects
WHERE owner = 'CT_MRDS'
AND object_name = 'TRG_A_WORKFLOW_HISTORY'
AND object_name = 'A_WORKFLOW_HISTORY'
AND object_type = 'TRIGGER';
DBMS_OUTPUT.PUT_LINE('TRG_A_WORKFLOW_HISTORY restored, status: ' || v_status);
DBMS_OUTPUT.PUT_LINE('A_WORKFLOW_HISTORY (original trigger) restored, status: ' || v_status);
EXCEPTION
WHEN NO_DATA_FOUND THEN
RAISE_APPLICATION_ERROR(-20001, 'ERROR: TRG_A_WORKFLOW_HISTORY not found after rollback');
RAISE_APPLICATION_ERROR(-20001, 'ERROR: A_WORKFLOW_HISTORY not found after rollback');
END;
/

View File

@@ -1,54 +0,0 @@
-- ============================================================================
-- MARS-1409 Rollback 91A: Clear A_WORKFLOW_HISTORY_KEY for existing records
-- ============================================================================
-- Purpose: Set A_WORKFLOW_HISTORY_KEY to NULL for all existing records
-- This is part of rollback process - restores state before migration
-- Note: Cannot restore exact previous values (we don't track which were NULL)
-- This script sets ALL values to NULL to ensure clean rollback state
-- ============================================================================
SET SERVEROUTPUT ON SIZE UNLIMITED
PROMPT
PROMPT Clearing A_WORKFLOW_HISTORY_KEY for all A_SOURCE_FILE_RECEIVED records...
DECLARE
vTotalRecords NUMBER := 0;
vClearedRecords NUMBER := 0;
BEGIN
-- Count total records
SELECT COUNT(*) INTO vTotalRecords
FROM CT_MRDS.A_SOURCE_FILE_RECEIVED;
DBMS_OUTPUT.PUT_LINE('Total records in A_SOURCE_FILE_RECEIVED: ' || vTotalRecords);
DBMS_OUTPUT.PUT_LINE('----------------------------------------');
-- Clear A_WORKFLOW_HISTORY_KEY for all records
UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED
SET A_WORKFLOW_HISTORY_KEY = NULL
WHERE A_WORKFLOW_HISTORY_KEY IS NOT NULL;
vClearedRecords := SQL%ROWCOUNT;
COMMIT;
DBMS_OUTPUT.PUT_LINE('Rollback Summary:');
DBMS_OUTPUT.PUT_LINE(' Records with A_WORKFLOW_HISTORY_KEY cleared: ' || vClearedRecords);
DBMS_OUTPUT.PUT_LINE(' Records already NULL: ' || (vTotalRecords - vClearedRecords));
DBMS_OUTPUT.PUT_LINE('----------------------------------------');
DBMS_OUTPUT.PUT_LINE('NOTE: All A_WORKFLOW_HISTORY_KEY values set to NULL');
DBMS_OUTPUT.PUT_LINE(' Original values cannot be restored (not tracked before migration)');
EXCEPTION
WHEN OTHERS THEN
ROLLBACK;
DBMS_OUTPUT.PUT_LINE('ERROR: ' || SQLERRM);
DBMS_OUTPUT.PUT_LINE('Transaction rolled back');
RAISE;
END;
/
PROMPT
PROMPT Workflow keys cleared successfully!
PROMPT

View File

@@ -5,13 +5,40 @@
-- ============================================================================
SET SERVEROUTPUT ON SIZE UNLIMITED
WHENEVER SQLERROR EXIT SQL.SQLCODE
PROMPT
PROMPT Dropping A_WORKFLOW_HISTORY_KEY column from A_SOURCE_FILE_RECEIVED...
-- Drop A_WORKFLOW_HISTORY_KEY column (no FK constraint to drop first)
ALTER TABLE CT_MRDS.A_SOURCE_FILE_RECEIVED
DROP COLUMN A_WORKFLOW_HISTORY_KEY;
DECLARE
vColumnExists NUMBER := 0;
BEGIN
SELECT COUNT(*) INTO vColumnExists
FROM ALL_TAB_COLUMNS
WHERE OWNER = 'CT_MRDS'
AND TABLE_NAME = 'A_SOURCE_FILE_RECEIVED'
AND COLUMN_NAME = 'A_WORKFLOW_HISTORY_KEY';
IF vColumnExists = 0 THEN
DBMS_OUTPUT.PUT_LINE('SKIP: Column A_WORKFLOW_HISTORY_KEY does not exist (already dropped).');
RETURN;
END IF;
EXECUTE IMMEDIATE 'ALTER TABLE CT_MRDS.A_SOURCE_FILE_RECEIVED DROP COLUMN A_WORKFLOW_HISTORY_KEY';
DBMS_OUTPUT.PUT_LINE('A_WORKFLOW_HISTORY_KEY column dropped successfully!');
-- Recompile packages invalidated by column drop
EXECUTE IMMEDIATE 'ALTER PACKAGE CT_MRDS.ENV_MANAGER COMPILE BODY';
EXECUTE IMMEDIATE 'ALTER PACKAGE CT_MRDS.FILE_MANAGER COMPILE';
EXECUTE IMMEDIATE 'ALTER PACKAGE CT_MRDS.FILE_MANAGER COMPILE BODY';
EXECUTE IMMEDIATE 'ALTER PACKAGE CT_MRDS.FILE_ARCHIVER COMPILE';
EXECUTE IMMEDIATE 'ALTER PACKAGE CT_MRDS.FILE_ARCHIVER COMPILE BODY';
DBMS_OUTPUT.PUT_LINE('Dependent packages recompiled.');
EXCEPTION
WHEN OTHERS THEN
DBMS_OUTPUT.PUT_LINE('ERROR dropping column: ' || SQLERRM);
RAISE;
END;
/
PROMPT A_WORKFLOW_HISTORY_KEY column dropped successfully!
PROMPT

View File

@@ -0,0 +1,319 @@
-- ============================================================================
-- MARS-1409 Diagnostic: Workflow key status after step 09
-- ============================================================================
-- Purpose: For each INPUT config with an ODS table, report:
-- [A] Files present in ODS bucket but NOT registered in A_SOURCE_FILE_RECEIVED
-- [B] Files registered in A_SOURCE_FILE_RECEIVED but NOT in ODS bucket
-- [C] Files present in both - with A_WORKFLOW_HISTORY_KEY populated
-- [D] Files present in both - A_WORKFLOW_HISTORY_KEY still NULL
--
-- Can be run at any time, read-only (no DML).
-- ============================================================================
SET SERVEROUTPUT ON SIZE UNLIMITED
SET LINESIZE 200
PROMPT
PROMPT ============================================================================
PROMPT Diagnosing workflow key status (ODS bucket vs A_SOURCE_FILE_RECEIVED)
PROMPT ============================================================================
PROMPT
DECLARE
TYPE tStringList IS TABLE OF VARCHAR2(500);
vTableName VARCHAR2(200);
vTableExists NUMBER;
vBucketEmpty BOOLEAN;
vRefCursor SYS_REFCURSOR;
vFileName VARCHAR2(500);
-- Per-config counters
vOnlyInBucket NUMBER;
vOnlyInDB NUMBER;
vInBothWithKey NUMBER;
vInBothNoKey NUMBER;
-- Grand totals
vConfigsChecked NUMBER := 0;
vConfigsWithIssues NUMBER := 0;
vTotalOnlyInBucket NUMBER := 0;
vTotalOnlyInDB NUMBER := 0;
vTotalInBothWithKey NUMBER := 0;
vTotalInBothNoKey NUMBER := 0;
-- How many individual file names to print per category before summarising
cMaxPrint CONSTANT NUMBER := 1000;
vPrinted NUMBER;
BEGIN
FOR config_rec IN (
SELECT sfc.A_SOURCE_FILE_CONFIG_KEY,
sfc.A_SOURCE_KEY,
sfc.SOURCE_FILE_ID,
sfc.TABLE_ID
FROM CT_MRDS.A_SOURCE_FILE_CONFIG sfc
WHERE sfc.SOURCE_FILE_TYPE = 'INPUT'
AND sfc.TABLE_ID IS NOT NULL
ORDER BY sfc.A_SOURCE_KEY, sfc.SOURCE_FILE_ID, sfc.TABLE_ID
) LOOP
vTableName := 'ODS.' || config_rec.TABLE_ID || '_ODS';
SELECT COUNT(*) INTO vTableExists
FROM ALL_TABLES
WHERE OWNER = 'ODS'
AND TABLE_NAME = config_rec.TABLE_ID || '_ODS';
IF vTableExists = 0 THEN
CONTINUE;
END IF;
-- Check if the bucket location has any files at all
-- (empty bucket raises ORA-29913 instead of returning 0 rows)
vBucketEmpty := FALSE;
BEGIN
EXECUTE IMMEDIATE
'SELECT COUNT(*) FROM ' || vTableName || ' t WHERE ROWNUM = 1'
INTO vTableExists;
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE = -29913 THEN
vBucketEmpty := TRUE;
ELSE
RAISE;
END IF;
END;
IF vBucketEmpty THEN
-- Bucket is empty: nothing in ODS, but registered records are all "not in bucket"
vOnlyInBucket := 0;
SELECT COUNT(*) INTO vOnlyInDB
FROM CT_MRDS.A_SOURCE_FILE_RECEIVED sfr
WHERE sfr.A_SOURCE_FILE_CONFIG_KEY = config_rec.A_SOURCE_FILE_CONFIG_KEY
AND sfr.PROCESSING_STATUS IN ('VALIDATED','READY_FOR_INGESTION','INGESTED','ARCHIVED','ARCHIVED_AND_TRASHED','ARCHIVED_AND_PURGED');
vInBothWithKey := 0;
vInBothNoKey := 0;
ELSE
-- ----------------------------------------------------------------
-- [A] In ODS bucket but NOT in A_SOURCE_FILE_RECEIVED
-- ----------------------------------------------------------------
EXECUTE IMMEDIATE
'SELECT COUNT(DISTINCT t.file$name) ' ||
'FROM ' || vTableName || ' t ' ||
'WHERE t.file$name IS NOT NULL ' ||
' AND NOT EXISTS ( ' ||
' SELECT 1 FROM CT_MRDS.A_SOURCE_FILE_RECEIVED sfr ' ||
' WHERE sfr.SOURCE_FILE_NAME = t.file$name ' ||
' AND sfr.A_SOURCE_FILE_CONFIG_KEY = :1)'
INTO vOnlyInBucket
USING config_rec.A_SOURCE_FILE_CONFIG_KEY;
-- ----------------------------------------------------------------
-- [B] In A_SOURCE_FILE_RECEIVED (targeted statuses) but NOT in ODS bucket
-- ----------------------------------------------------------------
EXECUTE IMMEDIATE
'SELECT COUNT(*) ' ||
'FROM CT_MRDS.A_SOURCE_FILE_RECEIVED sfr ' ||
'WHERE sfr.A_SOURCE_FILE_CONFIG_KEY = :1 ' ||
' AND sfr.PROCESSING_STATUS IN (''VALIDATED'',''READY_FOR_INGESTION'',''INGESTED'',''ARCHIVED'',''ARCHIVED_AND_TRASHED'',''ARCHIVED_AND_PURGED'') ' ||
' AND NOT EXISTS ( ' ||
' SELECT 1 FROM ' || vTableName || ' t ' ||
' WHERE t.file$name = sfr.SOURCE_FILE_NAME)'
INTO vOnlyInDB
USING config_rec.A_SOURCE_FILE_CONFIG_KEY;
-- ----------------------------------------------------------------
-- [C] In both, A_WORKFLOW_HISTORY_KEY IS NOT NULL
-- ----------------------------------------------------------------
EXECUTE IMMEDIATE
'SELECT COUNT(DISTINCT sfr.A_SOURCE_FILE_RECEIVED_KEY) ' ||
'FROM CT_MRDS.A_SOURCE_FILE_RECEIVED sfr ' ||
'WHERE sfr.A_SOURCE_FILE_CONFIG_KEY = :1 ' ||
' AND sfr.A_WORKFLOW_HISTORY_KEY IS NOT NULL ' ||
' AND sfr.PROCESSING_STATUS IN (''VALIDATED'',''READY_FOR_INGESTION'',''INGESTED'',''ARCHIVED'',''ARCHIVED_AND_TRASHED'',''ARCHIVED_AND_PURGED'') ' ||
' AND EXISTS ( ' ||
' SELECT 1 FROM ' || vTableName || ' t ' ||
' WHERE t.file$name = sfr.SOURCE_FILE_NAME)'
INTO vInBothWithKey
USING config_rec.A_SOURCE_FILE_CONFIG_KEY;
-- ----------------------------------------------------------------
-- [D] In both, A_WORKFLOW_HISTORY_KEY IS NULL
-- ----------------------------------------------------------------
EXECUTE IMMEDIATE
'SELECT COUNT(DISTINCT sfr.A_SOURCE_FILE_RECEIVED_KEY) ' ||
'FROM CT_MRDS.A_SOURCE_FILE_RECEIVED sfr ' ||
'WHERE sfr.A_SOURCE_FILE_CONFIG_KEY = :1 ' ||
' AND sfr.A_WORKFLOW_HISTORY_KEY IS NULL ' ||
' AND sfr.PROCESSING_STATUS IN (''VALIDATED'',''READY_FOR_INGESTION'',''INGESTED'',''ARCHIVED'',''ARCHIVED_AND_TRASHED'',''ARCHIVED_AND_PURGED'') ' ||
' AND EXISTS ( ' ||
' SELECT 1 FROM ' || vTableName || ' t ' ||
' WHERE t.file$name = sfr.SOURCE_FILE_NAME)'
INTO vInBothNoKey
USING config_rec.A_SOURCE_FILE_CONFIG_KEY;
END IF; -- vBucketEmpty
-- Skip configs with nothing to report
IF vOnlyInBucket = 0 AND vOnlyInDB = 0 AND vInBothWithKey = 0 AND vInBothNoKey = 0 THEN
CONTINUE;
END IF;
vConfigsChecked := vConfigsChecked + 1;
DBMS_OUTPUT.PUT_LINE('Config ' || config_rec.A_SOURCE_FILE_CONFIG_KEY ||
' (' || config_rec.A_SOURCE_KEY ||
'/' || config_rec.SOURCE_FILE_ID ||
'/' || config_rec.TABLE_ID || ')');
DBMS_OUTPUT.PUT_LINE(' [A] In bucket, not registered: ' || vOnlyInBucket);
DBMS_OUTPUT.PUT_LINE(' [B] Registered, not in bucket: ' || vOnlyInDB);
DBMS_OUTPUT.PUT_LINE(' [C] In both, A_WORKFLOW_HISTORY_KEY set: ' || vInBothWithKey);
DBMS_OUTPUT.PUT_LINE(' [D] In both, A_WORKFLOW_HISTORY_KEY NULL: ' || vInBothNoKey);
-- Print individual file names for categories with problems
IF vOnlyInBucket > 0 THEN
DBMS_OUTPUT.PUT_LINE(' [A] Files in bucket not registered:');
vPrinted := 0;
OPEN vRefCursor FOR
'SELECT DISTINCT t.file$name ' ||
'FROM ' || vTableName || ' t ' ||
'WHERE t.file$name IS NOT NULL ' ||
' AND NOT EXISTS ( ' ||
' SELECT 1 FROM CT_MRDS.A_SOURCE_FILE_RECEIVED sfr ' ||
' WHERE sfr.SOURCE_FILE_NAME = t.file$name ' ||
' AND sfr.A_SOURCE_FILE_CONFIG_KEY = :1) ' ||
'ORDER BY t.file$name'
USING config_rec.A_SOURCE_FILE_CONFIG_KEY;
LOOP
FETCH vRefCursor INTO vFileName;
EXIT WHEN vRefCursor%NOTFOUND;
vPrinted := vPrinted + 1;
IF vPrinted <= cMaxPrint THEN
DBMS_OUTPUT.PUT_LINE(' ' || vFileName);
ELSIF vPrinted = cMaxPrint + 1 THEN
DBMS_OUTPUT.PUT_LINE(' ... and ' || (vOnlyInBucket - cMaxPrint) || ' more');
END IF;
END LOOP;
CLOSE vRefCursor;
END IF;
IF vOnlyInDB > 0 THEN
vConfigsWithIssues := vConfigsWithIssues + 1;
DBMS_OUTPUT.PUT_LINE(' [B] Registered files not found in bucket:');
vPrinted := 0;
OPEN vRefCursor FOR
'SELECT sfr.SOURCE_FILE_NAME, sfr.PROCESSING_STATUS, sfr.A_WORKFLOW_HISTORY_KEY ' ||
'FROM CT_MRDS.A_SOURCE_FILE_RECEIVED sfr ' ||
'WHERE sfr.A_SOURCE_FILE_CONFIG_KEY = :1 ' ||
' AND sfr.PROCESSING_STATUS IN (''VALIDATED'',''READY_FOR_INGESTION'',''INGESTED'',''ARCHIVED'',''ARCHIVED_AND_TRASHED'',''ARCHIVED_AND_PURGED'') ' ||
' AND NOT EXISTS ( ' ||
' SELECT 1 FROM ' || vTableName || ' t ' ||
' WHERE t.file$name = sfr.SOURCE_FILE_NAME) ' ||
'ORDER BY sfr.SOURCE_FILE_NAME'
USING config_rec.A_SOURCE_FILE_CONFIG_KEY;
LOOP
DECLARE
vStatus VARCHAR2(50);
vWfKey NUMBER;
BEGIN
FETCH vRefCursor INTO vFileName, vStatus, vWfKey;
EXIT WHEN vRefCursor%NOTFOUND;
vPrinted := vPrinted + 1;
IF vPrinted <= cMaxPrint THEN
DBMS_OUTPUT.PUT_LINE(' ' || vFileName ||
' status=' || vStatus ||
' wf_key=' || NVL(TO_CHAR(vWfKey), 'NULL'));
ELSIF vPrinted = cMaxPrint + 1 THEN
DBMS_OUTPUT.PUT_LINE(' ... and ' || (vOnlyInDB - cMaxPrint) || ' more');
END IF;
END;
END LOOP;
CLOSE vRefCursor;
END IF;
IF vInBothNoKey > 0 THEN
vConfigsWithIssues := vConfigsWithIssues + 1;
DBMS_OUTPUT.PUT_LINE(' [D] Files in both but A_WORKFLOW_HISTORY_KEY still NULL:');
vPrinted := 0;
OPEN vRefCursor FOR
'SELECT sfr.SOURCE_FILE_NAME, sfr.PROCESSING_STATUS ' ||
'FROM CT_MRDS.A_SOURCE_FILE_RECEIVED sfr ' ||
'WHERE sfr.A_SOURCE_FILE_CONFIG_KEY = :1 ' ||
' AND sfr.A_WORKFLOW_HISTORY_KEY IS NULL ' ||
' AND sfr.PROCESSING_STATUS IN (''VALIDATED'',''READY_FOR_INGESTION'',''INGESTED'',''ARCHIVED'',''ARCHIVED_AND_TRASHED'',''ARCHIVED_AND_PURGED'') ' ||
' AND EXISTS ( ' ||
' SELECT 1 FROM ' || vTableName || ' t ' ||
' WHERE t.file$name = sfr.SOURCE_FILE_NAME) ' ||
'ORDER BY sfr.SOURCE_FILE_NAME'
USING config_rec.A_SOURCE_FILE_CONFIG_KEY;
LOOP
DECLARE
vStatus VARCHAR2(50);
BEGIN
FETCH vRefCursor INTO vFileName, vStatus;
EXIT WHEN vRefCursor%NOTFOUND;
vPrinted := vPrinted + 1;
IF vPrinted <= cMaxPrint THEN
DBMS_OUTPUT.PUT_LINE(' ' || vFileName || ' status=' || vStatus);
ELSIF vPrinted = cMaxPrint + 1 THEN
DBMS_OUTPUT.PUT_LINE(' ... and ' || (vInBothNoKey - cMaxPrint) || ' more');
END IF;
END;
END LOOP;
CLOSE vRefCursor;
END IF;
DBMS_OUTPUT.PUT_LINE('');
-- Accumulate totals
vTotalOnlyInBucket := vTotalOnlyInBucket + vOnlyInBucket;
vTotalOnlyInDB := vTotalOnlyInDB + vOnlyInDB;
vTotalInBothWithKey := vTotalInBothWithKey + vInBothWithKey;
vTotalInBothNoKey := vTotalInBothNoKey + vInBothNoKey;
END LOOP;
DBMS_OUTPUT.PUT_LINE('============================================================================');
DBMS_OUTPUT.PUT_LINE('Grand Summary:');
DBMS_OUTPUT.PUT_LINE(' Configs with data checked: ' || vConfigsChecked);
DBMS_OUTPUT.PUT_LINE(' Configs with issues (B or D): ' || vConfigsWithIssues);
DBMS_OUTPUT.PUT_LINE(' [A] Files in bucket, not registered: ' || vTotalOnlyInBucket);
DBMS_OUTPUT.PUT_LINE(' [B] Registered, not in bucket: ' || vTotalOnlyInDB);
DBMS_OUTPUT.PUT_LINE(' [C] In both - A_WORKFLOW_HISTORY_KEY set: ' || vTotalInBothWithKey);
DBMS_OUTPUT.PUT_LINE(' [D] In both - A_WORKFLOW_HISTORY_KEY NULL: ' || vTotalInBothNoKey);
DBMS_OUTPUT.PUT_LINE('============================================================================');
IF vTotalOnlyInDB > 0 THEN
DBMS_OUTPUT.PUT_LINE('');
DBMS_OUTPUT.PUT_LINE('WARNING [B]: ' || vTotalOnlyInDB || ' registered file(s) not found in ODS bucket.');
DBMS_OUTPUT.PUT_LINE(' These may have been moved to ARCHIVE or deleted from ODS.');
END IF;
IF vTotalInBothNoKey > 0 THEN
DBMS_OUTPUT.PUT_LINE('');
DBMS_OUTPUT.PUT_LINE('WARNING [D]: ' || vTotalInBothNoKey || ' file(s) present in both but A_WORKFLOW_HISTORY_KEY is still NULL.');
DBMS_OUTPUT.PUT_LINE(' ODS table rows for these files may have A_WORKFLOW_HISTORY_KEY = NULL.');
DBMS_OUTPUT.PUT_LINE(' Re-run step 09 after the ODS rows are populated by the pipeline.');
END IF;
IF vConfigsWithIssues = 0 THEN
DBMS_OUTPUT.PUT_LINE('');
DBMS_OUTPUT.PUT_LINE('OK: No issues found. All registered files in ODS have A_WORKFLOW_HISTORY_KEY assigned.');
END IF;
EXCEPTION
WHEN OTHERS THEN
IF vRefCursor%ISOPEN THEN
CLOSE vRefCursor;
END IF;
DBMS_OUTPUT.PUT_LINE('ERROR: ' || SQLERRM);
RAISE;
END;
/
PROMPT
PROMPT Diagnosis complete.
PROMPT

View File

@@ -1,39 +1,55 @@
-- ====================================================================
WHENEVER SQLERROR CONTINUE
GRANT SELECT, INSERT, UPDATE, DELETE ON ct_ods.a_load_history TO ct_mrds;
WHENEVER SQLERROR EXIT SQL.SQLCODE
-- ============================================================================
-- TRG_A_WORKFLOW_HISTORY Trigger Definition
-- ====================================================================
-- Purpose: Trigger to:
-- 1. Insert workflow completion data to CT_ODS.A_LOAD_HISTORY
-- 2. MARS-1409: Mark linked A_SOURCE_FILE_RECEIVED records as INGESTED
-- ====================================================================
-- ============================================================================
-- Drop old trigger (different name) to ensure clean rename; ignore error if not exists
WHENEVER SQLERROR CONTINUE
drop TRIGGER ct_mrds.a_workflow_history;
WHENEVER SQLERROR EXIT SQL.SQLCODE
CREATE OR REPLACE EDITIONABLE TRIGGER "CT_MRDS"."TRG_A_WORKFLOW_HISTORY"
AFTER INSERT OR UPDATE OF workflow_successful ON CT_MRDS.A_WORKFLOW_HISTORY
AFTER INSERT OR UPDATE OF workflow_successful ON ct_mrds.a_workflow_history
REFERENCING NEW AS new OLD AS old
FOR EACH ROW
DECLARE
v_workflow_name VARCHAR2(128);
v_wla_id NUMBER;
BEGIN
-- Original logic: Insert into CT_ODS.A_LOAD_HISTORY for specific ODS workflows
IF :new.workflow_name IN ('w_ODS_LM_STANDING_FACILITIES', 'w_ODS_CSDB_DEBT', 'w_ODS_CSDB_DEBT_DAILY', 'w_ODS_CSDB_RATINGS_FULL') AND :new.service_name = 'ODS' THEN
IF :new.workflow_successful <> :old.workflow_successful AND :new.workflow_successful = 'Y' THEN
IF :new.workflow_name = 'w_ODS_LM_STANDING_FACILITIES' THEN
v_workflow_name := 'w_ODS_LM_STANDING_FACILITY';
IF :new.service_name = 'ODS' AND :new.workflow_name IN (
'w_ODS_LM_STANDING_FACILITIES', 'w_ODS_CSDB_DEBT', 'w_ODS_CSDB_DEBT_DAILY', 'w_ODS_CSDB_RATINGS_FULL',
'w_ODS_TMS_LIMIT_ACCESS', 'w_ODS_TMS_PORTFOLIO_ACCESS', 'w_ODS_TMS_PORTFOLIO_TREE',
'w_ODS_TMS_COLLATERAL_INVENTORY', 'w_ODS_TOP_FULLBIDARRAY_COMPILED', 'w_ODS_TOP_ANNOUNCEMENT',
'w_ODS_TOP_ALLOTMENT_MODIFICATIONS', 'w_ODS_TOP_ALLOTMENT', 'w_ODS_CEPH_PRICING', 'w_ODS_C2D_MPEC'
) THEN
IF :new.workflow_successful = 'Y' AND :new.workflow_successful <> NVL(:old.workflow_successful, 'N') THEN
CASE
WHEN :new.workflow_name = 'w_ODS_LM_STANDING_FACILITIES' THEN v_workflow_name := 'w_ODS_LM_STANDING_FACILITY';
WHEN :new.workflow_name = 'w_ODS_TMS_LIMIT_ACCESS' THEN v_workflow_name := 'w_ODS_TMS_RAR_LIMITACCESS';
WHEN :new.workflow_name = 'w_ODS_TMS_PORTFOLIO_ACCESS' THEN v_workflow_name := 'w_ODS_TMS_RAR_PORTFOLIOACCESS';
WHEN :new.workflow_name = 'w_ODS_TMS_PORTFOLIO_TREE' THEN v_workflow_name := 'w_ODS_TMS_RAR_PORTFOLIOTREE';
WHEN :new.workflow_name = 'w_ODS_TMS_COLLATERAL_INVENTORY' THEN v_workflow_name := 'w_ODS_TMS_RAR_RARCOLLATERALINVENTORY';
WHEN :new.workflow_name = 'w_ODS_TOP_FULLBIDARRAY_COMPILED' THEN v_workflow_name := 'w_ODS_TOP_FULLBIDARRAY_COMPILED';
WHEN :new.workflow_name = 'w_ODS_TOP_ANNOUNCEMENT' THEN v_workflow_name := 'w_ODS_TOP_ANNOUNCEMENT';
WHEN :new.workflow_name = 'w_ODS_TOP_ALLOTMENT_MODIFICATIONS' THEN v_workflow_name := 'w_ODS_TOP_ALLOTMENT_MODIFICATIONS';
WHEN :new.workflow_name = 'w_ODS_TOP_ALLOTMENT' THEN v_workflow_name := 'w_ODS_TOP_ALLOTMENT';
WHEN :new.workflow_name = 'w_ODS_CEPH_PRICING' THEN v_workflow_name := 'w_ODS_CEPH_PRICING';
WHEN :new.workflow_name = 'w_ODS_C2D_MPEC' THEN v_workflow_name := 'w_ODS_C2D_MPEC';
ELSE
v_workflow_name := :new.workflow_name;
END IF;
BEGIN
v_wla_id := TO_NUMBER(:new.orchestration_run_id);
EXCEPTION
WHEN OTHERS THEN
NULL;
END;
INSERT INTO CT_ODS.A_LOAD_HISTORY (
a_etl_load_set_key, workflow_name, infa_run_id, load_start, load_end,
exdi_appl_req_id, exdi_correlation_id, load_successful, wla_run_id, dq_flag
END CASE;
BEGIN
v_wla_id := TO_NUMBER(:new.orchestration_run_id);
EXCEPTION WHEN OTHERS THEN NULL;
END;
INSERT INTO ct_ods.a_load_history (
a_etl_load_set_key, workflow_name, infa_run_id, load_start, load_end, exdi_appl_req_id, exdi_correlation_id, load_successful, wla_run_id, dq_flag
) VALUES (
:new.a_workflow_history_key, v_workflow_name, NULL, :new.workflow_start, :new.workflow_end,
NULL, NULL, :new.workflow_successful, v_wla_id, 'F'
:new.a_workflow_history_key, v_workflow_name, NULL, :new.workflow_start, :new.workflow_end, NULL, NULL, :new.workflow_successful, v_wla_id, 'F'
);
END IF;
END IF;

View File

@@ -91,19 +91,13 @@ PROMPT =========================================================================
PROMPT
PROMPT ============================================================================
PROMPT STEP 8: Clear A_WORKFLOW_HISTORY_KEY values from existing records
PROMPT ============================================================================
@@98_MARS_1409_rollback_existing_workflow_keys.sql
PROMPT
PROMPT ============================================================================
PROMPT STEP 9: Drop A_WORKFLOW_HISTORY_KEY column
PROMPT STEP 8: Drop A_WORKFLOW_HISTORY_KEY column
PROMPT ============================================================================
@@99_MARS_1409_rollback_workflow_history_key_column.sql
PROMPT
PROMPT ============================================================================
PROMPT STEP 10: Verify rollback
PROMPT STEP 9: Verify rollback
PROMPT ============================================================================
@@90_MARS_1409_verify_rollback.sql

View File

@@ -1,37 +1,50 @@
-- ====================================================================
-- TRG_A_WORKFLOW_HISTORY Trigger Definition (rollback version)
-- ====================================================================
-- Purpose: Restore trigger to pre-MARS-1409 state
-- Handles only CT_ODS.A_LOAD_HISTORY inserts for ODS workflows
-- ====================================================================
WHENEVER SQLERROR CONTINUE
GRANT SELECT, INSERT, UPDATE, DELETE ON ct_ods.a_load_history TO ct_mrds;
WHENEVER SQLERROR EXIT SQL.SQLCODE
CREATE OR REPLACE EDITIONABLE TRIGGER "CT_MRDS"."TRG_A_WORKFLOW_HISTORY"
AFTER INSERT OR UPDATE OF workflow_successful ON CT_MRDS.A_WORKFLOW_HISTORY
-- Drop new trigger (MARS-1409 name) before restoring old trigger name
WHENEVER SQLERROR CONTINUE
drop TRIGGER ct_mrds.TRG_A_WORKFLOW_HISTORY;
WHENEVER SQLERROR EXIT SQL.SQLCODE
create or replace TRIGGER ct_mrds.a_workflow_history
AFTER INSERT OR UPDATE OF workflow_successful ON ct_mrds.a_workflow_history
REFERENCING NEW AS new OLD AS old
FOR EACH ROW
DECLARE
v_workflow_name VARCHAR2(128);
v_wla_id NUMBER;
BEGIN
IF :new.workflow_name IN ('w_ODS_LM_STANDING_FACILITIES', 'w_ODS_CSDB_DEBT', 'w_ODS_CSDB_DEBT_DAILY', 'w_ODS_CSDB_RATINGS_FULL') AND :new.service_name = 'ODS' THEN
IF :new.workflow_successful <> :old.workflow_successful AND :new.workflow_successful = 'Y' THEN
IF :new.workflow_name = 'w_ODS_LM_STANDING_FACILITIES' THEN
v_workflow_name := 'w_ODS_LM_STANDING_FACILITY';
IF :new.service_name = 'ODS' AND :new.workflow_name IN (
'w_ODS_LM_STANDING_FACILITIES', 'w_ODS_CSDB_DEBT', 'w_ODS_CSDB_DEBT_DAILY', 'w_ODS_CSDB_RATINGS_FULL',
'w_ODS_TMS_LIMIT_ACCESS', 'w_ODS_TMS_PORTFOLIO_ACCESS', 'w_ODS_TMS_PORTFOLIO_TREE',
'w_ODS_TMS_COLLATERAL_INVENTORY', 'w_ODS_TOP_FULLBIDARRAY_COMPILED', 'w_ODS_TOP_ANNOUNCEMENT',
'w_ODS_TOP_ALLOTMENT_MODIFICATIONS', 'w_ODS_TOP_ALLOTMENT', 'w_ODS_CEPH_PRICING', 'w_ODS_C2D_MPEC'
) THEN
IF :new.workflow_successful = 'Y' AND :new.workflow_successful <> NVL(:old.workflow_successful, 'N') THEN
CASE
WHEN :new.workflow_name = 'w_ODS_LM_STANDING_FACILITIES' THEN v_workflow_name := 'w_ODS_LM_STANDING_FACILITY';
WHEN :new.workflow_name = 'w_ODS_TMS_LIMIT_ACCESS' THEN v_workflow_name := 'w_ODS_TMS_RAR_LIMITACCESS';
WHEN :new.workflow_name = 'w_ODS_TMS_PORTFOLIO_ACCESS' THEN v_workflow_name := 'w_ODS_TMS_RAR_PORTFOLIOACCESS';
WHEN :new.workflow_name = 'w_ODS_TMS_PORTFOLIO_TREE' THEN v_workflow_name := 'w_ODS_TMS_RAR_PORTFOLIOTREE';
WHEN :new.workflow_name = 'w_ODS_TMS_COLLATERAL_INVENTORY' THEN v_workflow_name := 'w_ODS_TMS_RAR_RARCOLLATERALINVENTORY';
WHEN :new.workflow_name = 'w_ODS_TOP_FULLBIDARRAY_COMPILED' THEN v_workflow_name := 'w_ODS_TOP_FULLBIDARRAY_COMPILED';
WHEN :new.workflow_name = 'w_ODS_TOP_ANNOUNCEMENT' THEN v_workflow_name := 'w_ODS_TOP_ANNOUNCEMENT';
WHEN :new.workflow_name = 'w_ODS_TOP_ALLOTMENT_MODIFICATIONS' THEN v_workflow_name := 'w_ODS_TOP_ALLOTMENT_MODIFICATIONS';
WHEN :new.workflow_name = 'w_ODS_TOP_ALLOTMENT' THEN v_workflow_name := 'w_ODS_TOP_ALLOTMENT';
WHEN :new.workflow_name = 'w_ODS_CEPH_PRICING' THEN v_workflow_name := 'w_ODS_CEPH_PRICING';
WHEN :new.workflow_name = 'w_ODS_C2D_MPEC' THEN v_workflow_name := 'w_ODS_C2D_MPEC';
ELSE
v_workflow_name := :new.workflow_name;
END IF;
BEGIN
v_wla_id := TO_NUMBER(:new.orchestration_run_id);
EXCEPTION
WHEN OTHERS THEN
NULL;
END;
INSERT INTO CT_ODS.A_LOAD_HISTORY (
a_etl_load_set_key, workflow_name, infa_run_id, load_start, load_end,
exdi_appl_req_id, exdi_correlation_id, load_successful, wla_run_id, dq_flag
END CASE;
BEGIN
v_wla_id := TO_NUMBER(:new.orchestration_run_id);
EXCEPTION WHEN OTHERS THEN NULL;
END;
INSERT INTO ct_ods.a_load_history (
a_etl_load_set_key, workflow_name, infa_run_id, load_start, load_end, exdi_appl_req_id, exdi_correlation_id, load_successful, wla_run_id, dq_flag
) VALUES (
:new.a_workflow_history_key, v_workflow_name, NULL, :new.workflow_start, :new.workflow_end,
NULL, NULL, :new.workflow_successful, v_wla_id, 'F'
:new.a_workflow_history_key, v_workflow_name, NULL, :new.workflow_start, :new.workflow_end, NULL, NULL, :new.workflow_successful, v_wla_id, 'F'
);
END IF;
END IF;