MARS-1409

This commit is contained in:
Grzegorz Michalski
2026-03-02 10:15:22 +01:00
parent a13a9d415f
commit 7db10725a0
36 changed files with 1059 additions and 99 deletions

View File

@@ -16,10 +16,12 @@ CREATE TABLE CT_MRDS.A_SOURCE_FILE_RECEIVED (
EXTERNAL_TABLE_NAME VARCHAR2(200),
PARTITION_YEAR VARCHAR2(4),
PARTITION_MONTH VARCHAR2(2),
ARCH_FILE_NAME VARCHAR2(1000),
ARCH_PATH VARCHAR2(1000),
PROCESS_NAME VARCHAR2(200),
A_WORKFLOW_HISTORY_KEY NUMBER,
CONSTRAINT A_SOURCE_FILE_RECEIVED_PK PRIMARY KEY (A_SOURCE_FILE_RECEIVED_KEY),
CONSTRAINT ASFR_A_SOURCE_FILE_CONFIG_KEY_FK FOREIGN KEY(A_SOURCE_FILE_CONFIG_KEY) REFERENCES CT_MRDS.A_SOURCE_FILE_CONFIG(A_SOURCE_FILE_CONFIG_KEY),
CONSTRAINT A_SOURCE_FILE_RECEIVED_CHK CHECK (PROCESSING_STATUS IN ('RECEIVED', 'VALIDATED', 'READY_FOR_INGESTION', 'INGESTED', 'ARCHIVED', 'ARCHIVED_AND_TRASHED', 'ARCHIVED_AND_PURGED'))
CONSTRAINT A_SOURCE_FILE_RECEIVED_CHK CHECK (PROCESSING_STATUS IN ('RECEIVED', 'VALIDATION_FAILED', 'VALIDATED', 'READY_FOR_INGESTION', 'INGESTED', 'ARCHIVED', 'ARCHIVED_AND_TRASHED', 'ARCHIVED_AND_PURGED'))
) TABLESPACE "DATA";
-- Unique index for file identification (workaround for TIMESTAMP WITH TIMEZONE constraint limitation)
@@ -49,7 +51,7 @@ COMMENT ON COLUMN CT_MRDS.A_SOURCE_FILE_RECEIVED.RECEPTION_DATE IS
'Date when file was registered in the system (extracted from CREATED timestamp)';
COMMENT ON COLUMN CT_MRDS.A_SOURCE_FILE_RECEIVED.PROCESSING_STATUS IS
'Current processing status: RECEIVED → VALIDATED → READY_FOR_INGESTION → INGESTED → ARCHIVED_AND_TRASHED → ARCHIVED_AND_PURGED';
'Current processing status: RECEIVED → VALIDATED (or VALIDATION_FAILED if errors) → READY_FOR_INGESTION → INGESTED → ARCHIVED → ARCHIVED_AND_TRASHED → ARCHIVED_AND_PURGED';
COMMENT ON COLUMN CT_MRDS.A_SOURCE_FILE_RECEIVED.EXTERNAL_TABLE_NAME IS
'Name of temporary external table created for file validation (dropped after validation)';
@@ -60,7 +62,13 @@ COMMENT ON COLUMN CT_MRDS.A_SOURCE_FILE_RECEIVED.PARTITION_YEAR IS
COMMENT ON COLUMN CT_MRDS.A_SOURCE_FILE_RECEIVED.PARTITION_MONTH IS
'Month partition value (MM format) when file was archived to ARCHIVE bucket with Hive-style partitioning';
COMMENT ON COLUMN CT_MRDS.A_SOURCE_FILE_RECEIVED.ARCH_FILE_NAME IS
COMMENT ON COLUMN CT_MRDS.A_SOURCE_FILE_RECEIVED.ARCH_PATH IS
'Archive directory prefix in ARCHIVE bucket containing archived Parquet files (supports multiple files from parallel DBMS_CLOUD.EXPORT_DATA)';
COMMENT ON COLUMN CT_MRDS.A_SOURCE_FILE_RECEIVED.PROCESS_NAME IS
'Name of the process or DBT model that ingested this file (populated during ingestion workflow)';
COMMENT ON COLUMN CT_MRDS.A_SOURCE_FILE_RECEIVED.A_WORKFLOW_HISTORY_KEY IS
'Direct link to workflow history - each file has exactly one workflow execution. Populated during VALIDATE_SOURCE_FILE_RECEIVED (MARS-1409)';
GRANT SELECT, INSERT, UPDATE, DELETE ON CT_MRDS.A_SOURCE_FILE_RECEIVED TO MRDS_LOADER_ROLE;

View File

@@ -39,6 +39,8 @@ AS
Errors(CODE_MOVE_FILE_TO_TRASH_FAILED) := Error_Record(CODE_MOVE_FILE_TO_TRASH_FAILED, MSG_MOVE_FILE_TO_TRASH_FAILED); -- -20032
Errors(CODE_DROP_EXPORTED_FILES_FAILED) := Error_Record(CODE_DROP_EXPORTED_FILES_FAILED, MSG_DROP_EXPORTED_FILES_FAILED); -- -20033
Errors(CODE_INVALID_BUCKET_AREA) := Error_Record(CODE_INVALID_BUCKET_AREA, MSG_INVALID_BUCKET_AREA); -- -20034
Errors(CODE_WORKFLOW_KEY_NULL) := Error_Record(CODE_WORKFLOW_KEY_NULL, MSG_WORKFLOW_KEY_NULL); -- -20035
Errors(CODE_MULTIPLE_WORKFLOW_KEYS) := Error_Record(CODE_MULTIPLE_WORKFLOW_KEYS, MSG_MULTIPLE_WORKFLOW_KEYS); -- -20036
Errors(CODE_INVALID_PARALLEL_DEGREE) := Error_Record(CODE_INVALID_PARALLEL_DEGREE, MSG_INVALID_PARALLEL_DEGREE); -- -20110
Errors(CODE_PARALLEL_EXECUTION_FAILED) := Error_Record(CODE_PARALLEL_EXECUTION_FAILED, MSG_PARALLEL_EXECUTION_FAILED); -- -20111

View File

@@ -17,12 +17,13 @@ AS
**/
-- Package Version Information (Semantic Versioning: MAJOR.MINOR.PATCH)
PACKAGE_VERSION CONSTANT VARCHAR2(10) := '3.2.0';
PACKAGE_BUILD_DATE CONSTANT VARCHAR2(20) := '2025-12-20 10:00:00';
PACKAGE_VERSION CONSTANT VARCHAR2(10) := '3.3.0';
PACKAGE_BUILD_DATE CONSTANT VARCHAR2(20) := '2026-02-27 09:00:00';
PACKAGE_AUTHOR CONSTANT VARCHAR2(100) := 'Grzegorz Michalski';
-- Version History (Latest changes first)
VERSION_HISTORY CONSTANT VARCHAR2(4000) :=
'3.3.0 (2026-02-27): MARS-1409 - Added error codes for A_WORKFLOW_HISTORY_KEY validation (CODE_WORKFLOW_KEY_NULL -20035, CODE_MULTIPLE_WORKFLOW_KEYS -20036)' || CHR(13)||CHR(10) ||
'3.2.0 (2025-12-20): Added error codes for parallel execution support (CODE_INVALID_PARALLEL_DEGREE -20110, CODE_PARALLEL_EXECUTION_FAILED -20111)' || CHR(13)||CHR(10) ||
'3.1.0 (2025-10-22): Added package hash tracking and automatic change detection system (SHA256 hashing)' || CHR(13)||CHR(10) ||
'3.0.0 (2025-10-22): Added package versioning system with centralized version management functions' || CHR(13)||CHR(10) ||
@@ -297,6 +298,18 @@ AS
PRAGMA EXCEPTION_INIT( ERR_INVALID_BUCKET_AREA
,CODE_INVALID_BUCKET_AREA);
ERR_WORKFLOW_KEY_NULL EXCEPTION;
CODE_WORKFLOW_KEY_NULL CONSTANT PLS_INTEGER := -20035;
MSG_WORKFLOW_KEY_NULL VARCHAR2(4000) := 'File validation failed: A_WORKFLOW_HISTORY_KEY column contains NULL value';
PRAGMA EXCEPTION_INIT( ERR_WORKFLOW_KEY_NULL
,CODE_WORKFLOW_KEY_NULL);
ERR_MULTIPLE_WORKFLOW_KEYS EXCEPTION;
CODE_MULTIPLE_WORKFLOW_KEYS CONSTANT PLS_INTEGER := -20036;
MSG_MULTIPLE_WORKFLOW_KEYS VARCHAR2(4000) := 'File validation failed: Multiple distinct A_WORKFLOW_HISTORY_KEY values found in file. Each file must contain exactly one workflow execution key';
PRAGMA EXCEPTION_INIT( ERR_MULTIPLE_WORKFLOW_KEYS
,CODE_MULTIPLE_WORKFLOW_KEYS);
ERR_INVALID_PARALLEL_DEGREE EXCEPTION;
CODE_INVALID_PARALLEL_DEGREE CONSTANT PLS_INTEGER := -20110;
MSG_INVALID_PARALLEL_DEGREE VARCHAR2(4000) := 'Invalid parallel degree parameter. Must be between 1 and 16';

View File

@@ -683,6 +683,64 @@ AS
FROM USER_LOAD_OPERATIONS
WHERE ID = vOperationId;
-- MARS-1409: Extract and validate A_WORKFLOW_HISTORY_KEY from external table
DECLARE
vWorkflowHistoryKey NUMBER;
vWorkflowKeyCount NUMBER;
vWorkflowKeyDistinct NUMBER;
vDynamicSQL VARCHAR2(1000);
BEGIN
-- Build dynamic SQL to count distinct A_WORKFLOW_HISTORY_KEY values
vDynamicSQL := 'SELECT COUNT(*), COUNT(DISTINCT A_WORKFLOW_HISTORY_KEY) FROM ' || vSourceFileReceived.EXTERNAL_TABLE_NAME;
ENV_MANAGER.LOG_PROCESS_EVENT('MARS-1409: Extracting A_WORKFLOW_HISTORY_KEY from external table', 'DEBUG', vParameters);
-- Count total rows and distinct workflow keys
EXECUTE IMMEDIATE vDynamicSQL INTO vWorkflowKeyCount, vWorkflowKeyDistinct;
ENV_MANAGER.LOG_PROCESS_EVENT('MARS-1409: Total rows: ' || vWorkflowKeyCount || ', Distinct A_WORKFLOW_HISTORY_KEY values: ' || vWorkflowKeyDistinct, 'DEBUG', vParameters);
-- Validate workflow key presence and uniqueness
IF vWorkflowKeyDistinct = 0 OR vWorkflowKeyDistinct IS NULL THEN
-- No A_WORKFLOW_HISTORY_KEY found or all values are NULL
vgMsgTmp := ENV_MANAGER.MSG_WORKFLOW_KEY_NULL || ' [File: ' || vSourceFileReceived.SOURCE_FILE_NAME || ']';
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_WORKFLOW_KEY_NULL, vgMsgTmp);
ELSIF vWorkflowKeyDistinct > 1 THEN
-- Multiple different A_WORKFLOW_HISTORY_KEY values found
vgMsgTmp := ENV_MANAGER.MSG_MULTIPLE_WORKFLOW_KEYS || ' [Found: ' || vWorkflowKeyDistinct || ' distinct values in file: ' || vSourceFileReceived.SOURCE_FILE_NAME || ']';
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_MULTIPLE_WORKFLOW_KEYS, vgMsgTmp);
ELSE
-- Exactly one A_WORKFLOW_HISTORY_KEY value - fetch and save it
vDynamicSQL := 'SELECT DISTINCT A_WORKFLOW_HISTORY_KEY FROM ' || vSourceFileReceived.EXTERNAL_TABLE_NAME;
EXECUTE IMMEDIATE vDynamicSQL INTO vWorkflowHistoryKey;
ENV_MANAGER.LOG_PROCESS_EVENT('MARS-1409: Extracted A_WORKFLOW_HISTORY_KEY: ' || vWorkflowHistoryKey, 'DEBUG', vParameters);
-- Update A_SOURCE_FILE_RECEIVED with workflow history key
UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED
SET A_WORKFLOW_HISTORY_KEY = vWorkflowHistoryKey
WHERE A_SOURCE_FILE_RECEIVED_KEY = pSourceFileReceivedKey;
ENV_MANAGER.LOG_PROCESS_EVENT('MARS-1409: Updated A_SOURCE_FILE_RECEIVED with A_WORKFLOW_HISTORY_KEY: ' || vWorkflowHistoryKey, 'INFO', vParameters);
END IF;
EXCEPTION
WHEN ENV_MANAGER.ERR_WORKFLOW_KEY_NULL THEN
SET_SOURCE_FILE_RECEIVED_STATUS(pSourceFileReceivedKey => pSourceFileReceivedKey, pStatus => 'VALIDATION_FAILED');
RAISE;
WHEN ENV_MANAGER.ERR_MULTIPLE_WORKFLOW_KEYS THEN
SET_SOURCE_FILE_RECEIVED_STATUS(pSourceFileReceivedKey => pSourceFileReceivedKey, pStatus => 'VALIDATION_FAILED');
RAISE;
WHEN OTHERS THEN
vgMsgTmp := 'MARS-1409: Error extracting A_WORKFLOW_HISTORY_KEY: ' || SQLERRM;
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
SET_SOURCE_FILE_RECEIVED_STATUS(pSourceFileReceivedKey => pSourceFileReceivedKey, pStatus => 'VALIDATION_FAILED');
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_FILE_VALIDATION_FAILED, vgMsgTmp);
END;
-- DBMS_OUTPUT.PUT_LINE(vStatus);
SET_SOURCE_FILE_RECEIVED_STATUS(pSourceFileReceivedKey => pSourceFileReceivedKey, pStatus => 'VALIDATED');
ENV_MANAGER.LOG_PROCESS_EVENT('File status changed to VALIDATED','DEBUG', vParameters);
@@ -700,6 +758,18 @@ AS
SET_SOURCE_FILE_RECEIVED_STATUS(pSourceFileReceivedKey => pSourceFileReceivedKey, pStatus => 'VALIDATION_FAILED');
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_FILE_VALIDATION_FAILED, vgMsgTmp);
WHEN ENV_MANAGER.ERR_WORKFLOW_KEY_NULL THEN
vgMsgTmp := ENV_MANAGER.MSG_WORKFLOW_KEY_NULL;
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
SET_SOURCE_FILE_RECEIVED_STATUS(pSourceFileReceivedKey => pSourceFileReceivedKey, pStatus => 'VALIDATION_FAILED');
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_WORKFLOW_KEY_NULL, vgMsgTmp);
WHEN ENV_MANAGER.ERR_MULTIPLE_WORKFLOW_KEYS THEN
vgMsgTmp := ENV_MANAGER.MSG_MULTIPLE_WORKFLOW_KEYS;
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
SET_SOURCE_FILE_RECEIVED_STATUS(pSourceFileReceivedKey => pSourceFileReceivedKey, pStatus => 'VALIDATION_FAILED');
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_MULTIPLE_WORKFLOW_KEYS, vgMsgTmp);
WHEN OTHERS THEN
IF SQLCODE = -20404 THEN
vgMsgTmp := ENV_MANAGER.MSG_FILE_NOT_FOUND_ON_CLOUD||cgBL||SQLERRM;

View File

@@ -17,12 +17,13 @@ AS
**/
-- Package Version Information (Semantic Versioning: MAJOR.MINOR.PATCH)
PACKAGE_VERSION CONSTANT VARCHAR2(10) := '3.5.1';
PACKAGE_BUILD_DATE CONSTANT VARCHAR2(20) := '2026-02-24 13:35:00';
PACKAGE_VERSION CONSTANT VARCHAR2(10) := '3.6.0';
PACKAGE_BUILD_DATE CONSTANT VARCHAR2(20) := '2026-02-27 09:00:00';
PACKAGE_AUTHOR CONSTANT VARCHAR2(100) := 'Grzegorz Michalski';
-- Version History (Latest changes first)
VERSION_HISTORY CONSTANT VARCHAR2(4000) :=
'3.6.0 (2026-02-27): MARS-1409 - Added A_WORKFLOW_HISTORY_KEY tracking in A_SOURCE_FILE_RECEIVED. Each file now stores its workflow execution key extracted during VALIDATE_SOURCE_FILE_RECEIVED' || CHR(13)||CHR(10) ||
'3.5.1 (2026-02-24): Fixed TIMESTAMP field syntax in GENERATE_EXTERNAL_TABLE_PARAMS for SQL*Loader compatibility (CHAR(35) DATE_FORMAT TIMESTAMP MASK format)' || CHR(13)||CHR(10) ||
'3.3.2 (2026-02-20): MARS-828 - Fixed threshold column names in GET_DET_SOURCE_FILE_CONFIG_INFO for MARS-828 compatibility' || CHR(13)||CHR(10) ||
'3.3.1 (2025-11-27): MARS-1046 - Fixed ISO 8601 datetime format parsing with milliseconds and timezone (e.g., 2012-03-02T14:16:23.798+01:00)' || CHR(13)||CHR(10) ||

View File

@@ -0,0 +1,52 @@
-- ====================================================================
-- TRG_A_WORKFLOW_HISTORY Trigger Definition
-- ====================================================================
-- Purpose: Trigger to:
-- 1. Insert workflow completion data to CT_ODS.A_LOAD_HISTORY
-- 2. MARS-1409: Mark linked A_SOURCE_FILE_RECEIVED records as INGESTED
-- ====================================================================
CREATE OR REPLACE EDITIONABLE TRIGGER "CT_MRDS"."TRG_A_WORKFLOW_HISTORY"
AFTER INSERT OR UPDATE OF workflow_successful ON CT_MRDS.A_WORKFLOW_HISTORY
REFERENCING NEW AS new OLD AS old
FOR EACH ROW
DECLARE
v_workflow_name VARCHAR2(128);
v_wla_id NUMBER;
BEGIN
-- Original logic: Insert into CT_ODS.A_LOAD_HISTORY for specific ODS workflows
IF :new.workflow_name IN ('w_ODS_LM_STANDING_FACILITIES', 'w_ODS_CSDB_DEBT', 'w_ODS_CSDB_DEBT_DAILY', 'w_ODS_CSDB_RATINGS_FULL') AND :new.service_name = 'ODS' THEN
IF :new.workflow_successful <> :old.workflow_successful AND :new.workflow_successful = 'Y' THEN
IF :new.workflow_name = 'w_ODS_LM_STANDING_FACILITIES' THEN
v_workflow_name := 'w_ODS_LM_STANDING_FACILITY';
ELSE
v_workflow_name := :new.workflow_name;
END IF;
BEGIN
v_wla_id := TO_NUMBER(:new.orchestration_run_id);
EXCEPTION
WHEN OTHERS THEN
NULL;
END;
INSERT INTO CT_ODS.A_LOAD_HISTORY (
a_etl_load_set_key, workflow_name, infa_run_id, load_start, load_end,
exdi_appl_req_id, exdi_correlation_id, load_successful, wla_run_id, dq_flag
) VALUES (
:new.a_workflow_history_key, v_workflow_name, NULL, :new.workflow_start, :new.workflow_end,
NULL, NULL, :new.workflow_successful, v_wla_id, 'F'
);
END IF;
END IF;
-- MARS-1409: When workflow completes successfully, mark linked files as INGESTED
IF :new.workflow_successful = 'Y' THEN
IF INSERTING OR (UPDATING AND (:old.workflow_successful IS NULL OR :old.workflow_successful != 'Y')) THEN
UPDATE CT_MRDS.A_SOURCE_FILE_RECEIVED
SET PROCESSING_STATUS = 'INGESTED',
PROCESS_NAME = :new.service_name
WHERE A_WORKFLOW_HISTORY_KEY = :new.a_workflow_history_key;
END IF;
END IF;
END;
/