From f42777f480aa32b0422c43bc6c6321a0e7cd51da Mon Sep 17 00:00:00 2001
From: Grzegorz Michalski
Date: Thu, 5 Feb 2026 10:54:48 +0100
Subject: [PATCH] documentation

---
 confluence/FILE_ARCHIVER_Guide.md              |  16 +-
 confluence/PROCESS_SOURCE_FILE_Guide.md        |   5 +
 ...em_Migration_Informatica_to_Airflow_DBT.md  | 446 ++++++++++++++++++
 3 files changed, 465 insertions(+), 2 deletions(-)
 create mode 100644 confluence/System_Migration_Informatica_to_Airflow_DBT.md

diff --git a/confluence/FILE_ARCHIVER_Guide.md b/confluence/FILE_ARCHIVER_Guide.md
index 039c83f..b184ef0 100644
--- a/confluence/FILE_ARCHIVER_Guide.md
+++ b/confluence/FILE_ARCHIVER_Guide.md
@@ -19,7 +19,18 @@ The FILE_ARCHIVER package provides flexible archival strategies that accommodate
 - **Schema**: CT_MRDS
 - **Package**: FILE_ARCHIVER
 - **Current Version**: 3.1.0
-- **Dependencies**: ENV_MANAGER, FILE_MANAGER, cloud_wrapper, A_SOURCE_FILE_CONFIG, A_LOAD_HISTORY
+- **Dependencies**: ENV_MANAGER, FILE_MANAGER, cloud_wrapper, A_SOURCE_FILE_CONFIG, A_SOURCE_FILE_RECEIVED, A_WORKFLOW_HISTORY
+
+### Critical Prerequisites
+
+⚠️ **IMPORTANT**: FILE_ARCHIVER requires data to be registered in the `CT_MRDS.A_SOURCE_FILE_RECEIVED` table. This table is populated automatically when files are processed through the modern Airflow + DBT workflow via `FILE_MANAGER.PROCESS_SOURCE_FILE`.
+
+**For legacy data migrated from the Informatica + WLA system:**
+- Legacy data exported using `DATA_EXPORTER` does NOT automatically create `A_SOURCE_FILE_RECEIVED` records
+- Without these records, FILE_ARCHIVER **CANNOT** archive the data
+- See the [System Migration Guide](System_Migration_Informatica_to_Airflow_DBT.md) for workaround strategies
+
+**Recommendation for legacy data**: Export directly to the ARCHIVE bucket using `DATA_EXPORTER.EXPORT_TABLE_DATA_BY_DATE` with `pBucketArea => 'ARCHIVE'` to bypass this requirement.
 
 ## Archival Strategies
@@ -542,7 +553,8 @@ WHERE object_name = 'FILE_ARCHIVER';
 ### Database Objects
 
 - **Table**: CT_MRDS.A_SOURCE_FILE_CONFIG - Configuration storage
-- **Table**: CT_ODS.A_LOAD_HISTORY - Workflow tracking
+- **Table**: CT_MRDS.A_SOURCE_FILE_RECEIVED - File processing tracking
+- **Table**: CT_MRDS.A_WORKFLOW_HISTORY - Workflow execution tracking (Airflow + DBT)
 - **Trigger**: TRG_BI_A_SRC_FILE_CFG_ARCH_VAL - Configuration validation
 - **Credential**: DEF_CRED_ARN - OCI bucket access
 
diff --git a/confluence/PROCESS_SOURCE_FILE_Guide.md b/confluence/PROCESS_SOURCE_FILE_Guide.md
index f138ca4..c8909f4 100644
--- a/confluence/PROCESS_SOURCE_FILE_Guide.md
+++ b/confluence/PROCESS_SOURCE_FILE_Guide.md
@@ -15,6 +15,11 @@ This document provides comprehensive documentation for the `FILE_MANAGER.PROCESS_SOURCE_FILE
 - **Error Resilient**: Comprehensive error handling and logging for validation and file operations
 - **Status Tracking**: Updates file processing status throughout validation and preparation workflow
+
+**Migration Context:**
+- This procedure is part of the **modern Airflow + DBT system** architecture
+- Creates records in `CT_MRDS.A_SOURCE_FILE_RECEIVED` (modern control table)
+- For legacy Informatica + WLA data migration, see the [System Migration Guide](System_Migration_Informatica_to_Airflow_DBT.md)
 
 ## Procedure Signatures
 
 The procedure is available in two variants:
diff --git a/confluence/System_Migration_Informatica_to_Airflow_DBT.md b/confluence/System_Migration_Informatica_to_Airflow_DBT.md
new file mode 100644
index 0000000..d354ed2
--- /dev/null
+++ b/confluence/System_Migration_Informatica_to_Airflow_DBT.md
@@ -0,0 +1,446 @@

# System Migration: Informatica + WLA → Airflow + DBT

This document describes the migration from the legacy Informatica + WLA data processing system to the modern Airflow + DBT architecture, including control table differences, data export strategies, and known limitations.

## Migration Overview

The MRDS (Market Reference Data System) is undergoing a fundamental technology migration:

**Legacy System (Informatica + WLA):**
- ETL Tool: Informatica PowerCenter
- Workflow Orchestration: WLA (Workflow Automation)
- Control Schema: `CT_ODS` (Operational Data Store Control)
- Primary Control Table: `CT_ODS.A_LOAD_HISTORY`
- Key Column: `A_ETL_LOAD_SET_KEY`

**Modern System (Airflow + DBT):**
- Orchestration: Apache Airflow
- Transformation: DBT (Data Build Tool)
- Control Schema: `CT_MRDS` (MRDS Control)
- Primary Control Tables: `CT_MRDS.A_SOURCE_FILE_RECEIVED`, `CT_MRDS.A_WORKFLOW_HISTORY`
- Key Column: `A_WORKFLOW_HISTORY_KEY`

## Control Table Architecture

### Legacy System: CT_ODS.A_LOAD_HISTORY

**Purpose**: Tracks Informatica PowerCenter workflow executions

**Structure**:
```sql
DESC CT_ODS.A_LOAD_HISTORY;

Name                   Type
______________________ _____________________
A_ETL_LOAD_SET_KEY     NUMBER(38)       -- Primary key
WORKFLOW_NAME          VARCHAR2(255)    -- Informatica workflow name
INFA_RUN_ID            NUMBER(38)       -- Informatica run ID
LOAD_START             TIMESTAMP(6)     -- Workflow start time
LOAD_END               TIMESTAMP(6)     -- Workflow end time
EXDI_APPL_REQ_ID       VARCHAR2(255)
EXDI_CORRELATION_ID    VARCHAR2(255)
LOAD_SUCCESSFUL        CHAR(1)          -- Y/N success flag
WLA_RUN_ID             NUMBER(28)       -- WLA run ID
DQ_FLAG                VARCHAR2(5)      -- Data quality flag
```

**Usage Pattern**:
- Created by Informatica workflows during ETL execution
- Used for temporal partitioning in DATA_EXPORTER
- Referenced via the `A_ETL_LOAD_SET_KEY_FK` foreign key in data tables

### Modern System: CT_MRDS Control Tables

#### 1. A_SOURCE_FILE_RECEIVED

**Purpose**: Tracks individual file processing through the complete lifecycle

**Key Columns**:
```sql
A_SOURCE_FILE_RECEIVED_KEY   NUMBER        -- Primary key
SOURCE_FILE_NAME             VARCHAR2      -- Full OCI path
PROCESSING_STATUS            VARCHAR2      -- Status tracking
RECEPTION_DATE               DATE          -- File arrival timestamp
PARTITION_YEAR               VARCHAR2(4)   -- Archive partition (year)
PARTITION_MONTH              VARCHAR2(2)   -- Archive partition (month)
ARCH_FILE_NAME               VARCHAR2      -- Parquet archive file path
```

**Status Workflow**:
```
RECEIVED → VALIDATED → READY_FOR_INGESTION → INGESTED → ARCHIVED
```

**Usage Pattern**:
- Created by FILE_MANAGER.PROCESS_SOURCE_FILE during file validation
- Updated throughout the file lifecycle (validation, ingestion, archival)
- Required by FILE_ARCHIVER for archival operations
- Links to A_WORKFLOW_HISTORY via workflow execution
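
As a quick illustration of how this lifecycle can be inspected, the sketch below counts files per status for one source configuration. It is a minimal example, using only the columns listed above and the `A_SOURCE_FILE_CONFIG` join key that appears elsewhere in this document; `'AGGREGATED_ALLOTMENT'` is just the sample source used throughout.

```sql
-- Sketch: where do the files of one source stand in the lifecycle?
SELECT r.PROCESSING_STATUS,
       COUNT(*)              AS file_count,
       MIN(r.RECEPTION_DATE) AS oldest_file,
       MAX(r.RECEPTION_DATE) AS newest_file
FROM   CT_MRDS.A_SOURCE_FILE_RECEIVED r
JOIN   CT_MRDS.A_SOURCE_FILE_CONFIG c
       ON c.A_SOURCE_FILE_CONFIG_KEY = r.A_SOURCE_FILE_CONFIG_KEY
WHERE  c.SOURCE_FILE_ID = 'AGGREGATED_ALLOTMENT'   -- example source
GROUP  BY r.PROCESSING_STATUS
ORDER  BY r.PROCESSING_STATUS;
```

Files that never reach `INGESTED` (or that have no row here at all) are exactly the ones FILE_ARCHIVER cannot pick up.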

#### 2. A_WORKFLOW_HISTORY

**Purpose**: Tracks Airflow + DBT workflow executions (similar role to A_LOAD_HISTORY)

**Key Columns**:
```sql
A_WORKFLOW_HISTORY_KEY   NUMBER      -- Primary key
WORKFLOW_NAME            VARCHAR2    -- Airflow DAG name
WORKFLOW_START           TIMESTAMP   -- Workflow start time
WORKFLOW_END             TIMESTAMP   -- Workflow end time
WORKFLOW_SUCCESSFUL      CHAR(1)     -- Y/N success flag
```

**Usage Pattern**:
- Created by Airflow + DBT workflows during data processing
- Used for temporal partitioning decisions in FILE_ARCHIVER
- Referenced via the `A_WORKFLOW_HISTORY_KEY_FK` foreign key in data tables

## Data Export Strategies

### Scenario 1: Legacy Data Export (Informatica → ODS)

**Use Case**: One-time migration of historical data loaded by Informatica

**Package**: `DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE`

**Control Table**: Uses `CT_ODS.A_LOAD_HISTORY` for temporal partitioning

**Example**:
```sql
-- Export historical AGGREGATED_ALLOTMENT data (loaded by Informatica)
BEGIN
  CT_MRDS.DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
    pSchemaName    => 'OU_TOP',
    pTableName     => 'AGGREGATED_ALLOTMENT',
    pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',   -- Links to A_LOAD_HISTORY
    pBucketArea    => 'DATA',
    pFolderName    => 'legacy_migration',
    pMinDate       => DATE '2020-01-01',
    pMaxDate       => DATE '2024-12-31'
  );
END;
/
```

**Result**: CSV files in the ODS bucket (DATA area), partitioned by LOAD_START from A_LOAD_HISTORY
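
A quick way to confirm what the export actually produced is to list the generated objects with `cloud_wrapper.list_objects`, the same call used in Strategy 1 later in this document. The bucket URI below is environment-specific and is only an example value copied from that strategy; adjust it to the target environment.

```sql
-- Sketch: verify the legacy export produced the expected CSV files
SELECT object_name, bytes, time_created
FROM   TABLE(MRDS_LOADER.cloud_wrapper.list_objects(
         credential_name => 'DEF_CRED_ARN',
         location_uri    => 'https://objectstorage.eu-frankfurt-1.oraclecloud.com/n/frtgjxu7zl7c/b/data/o/'
       ))
WHERE  object_name LIKE 'ODS/legacy_migration/AGGREGATED_ALLOTMENT_%'
ORDER  BY object_name;
```

Note that these files exist only in the bucket listing: no `A_SOURCE_FILE_RECEIVED` rows are created for them, which is the gap discussed under Critical Gap below.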

### Scenario 2: Modern System Data (Airflow + DBT → ODS → ARCHIVE)

**Use Case**: Ongoing processing with the new Airflow + DBT system

**Workflow**:
1. **File Arrival**: source files → INBOX bucket
2. **Validation**: FILE_MANAGER.PROCESS_SOURCE_FILE → creates A_SOURCE_FILE_RECEIVED
3. **Processing**: Airflow + DBT → creates A_WORKFLOW_HISTORY
4. **Export**: DATA_EXPORTER → moves to ODS bucket (DATA area)
5. **Archival**: FILE_ARCHIVER → moves to ARCHIVE bucket (Parquet with Hive partitioning)

**Control Tables**: Uses both A_SOURCE_FILE_RECEIVED and A_WORKFLOW_HISTORY

**Example**:
```sql
-- Archival of data processed by Airflow + DBT
BEGIN
  CT_MRDS.FILE_ARCHIVER.ARCHIVE_TABLE_DATA(
    pSourceFileConfig => vConfig   -- Requires A_SOURCE_FILE_RECEIVED records
  );
END;
/
```

## Critical Gap: Legacy Data Archival

### Problem Statement

**Scenario**: Historical data exported using DATA_EXPORTER from Informatica-loaded tables

**Issue**: FILE_ARCHIVER requires records in `A_SOURCE_FILE_RECEIVED`, but legacy exports do not create them

**Impact**: Legacy data exported to the ODS/DATA bucket **CANNOT** be archived to the ARCHIVE bucket using FILE_ARCHIVER

### Technical Analysis

**DATA_EXPORTER Behavior**:
```sql
-- Uses A_LOAD_HISTORY for partitioning (Informatica workflows)
SELECT DISTINCT TO_CHAR(L.LOAD_START,'YYYY') AS YR,
                TO_CHAR(L.LOAD_START,'MM')   AS MN
FROM   OU_TOP.AGGREGATED_ALLOTMENT T, CT_ODS.A_LOAD_HISTORY L
WHERE  T.A_ETL_LOAD_SET_KEY_FK = L.A_ETL_LOAD_SET_KEY
AND    L.LOAD_START >= :pMinDate
AND    L.LOAD_START <  :pMaxDate;

-- Creates CSV files: ODS/legacy_migration/AGGREGATED_ALLOTMENT_YYYYMM.csv
-- Does NOT create A_SOURCE_FILE_RECEIVED records
```

**FILE_ARCHIVER Requirement**:
```sql
-- Joins A_SOURCE_FILE_RECEIVED with A_WORKFLOW_HISTORY
JOIN CT_MRDS.A_SOURCE_FILE_RECEIVED r
  ON r.A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfig.A_SOURCE_FILE_CONFIG_KEY
 AND r.PROCESSING_STATUS = 'INGESTED';

-- Without A_SOURCE_FILE_RECEIVED records, archival CANNOT proceed
```

### Workaround Strategies

#### Strategy 1: Manual Registration (Recommended for Small Datasets)

Manually create `A_SOURCE_FILE_RECEIVED` records for legacy exported files:

```sql
-- Step 1: Export legacy data to ODS/DATA
BEGIN
  DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
    pSchemaName    => 'OU_TOP',
    pTableName     => 'AGGREGATED_ALLOTMENT',
    pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',
    pBucketArea    => 'DATA',
    pFolderName    => 'legacy_export',
    pMinDate       => DATE '2024-01-01',
    pMaxDate       => DATE '2024-12-31'
  );
END;
/

-- Step 2: List exported CSV files
SELECT object_name, time_created, bytes
FROM   TABLE(MRDS_LOADER.cloud_wrapper.list_objects(
         credential_name => 'DEF_CRED_ARN',
         location_uri    => 'https://objectstorage.eu-frankfurt-1.oraclecloud.com/n/frtgjxu7zl7c/b/data/o/'
       ))
WHERE  object_name LIKE 'ODS/legacy_export/AGGREGATED_ALLOTMENT_%';

-- Step 3: Manually register each file in A_SOURCE_FILE_RECEIVED
-- (Requires the source configuration for AGGREGATED_ALLOTMENT to exist)
INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
  A_SOURCE_FILE_RECEIVED_KEY,
  A_SOURCE_FILE_CONFIG_KEY,
  SOURCE_FILE_NAME,
  PROCESSING_STATUS,
  RECEPTION_DATE,
  BYTES,
  CHECKSUM,
  EXTERNAL_TABLE_NAME
) VALUES (
  A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL,
  (SELECT A_SOURCE_FILE_CONFIG_KEY FROM CT_MRDS.A_SOURCE_FILE_CONFIG
   WHERE SOURCE_FILE_ID = 'AGGREGATED_ALLOTMENT' AND SOURCE_FILE_TYPE = 'INPUT'),
  'ODS/legacy_export/AGGREGATED_ALLOTMENT_202401.csv',
  'INGESTED',              -- Skip validation, mark as already ingested
  DATE '2024-01-15',
  1048576,                 -- File size in bytes
  'manual_registration',
  NULL                     -- No external table needed
);
-- Repeat for all exported CSV files
COMMIT;

-- Step 4: Now FILE_ARCHIVER can process these files
BEGIN
  FILE_ARCHIVER.ARCHIVE_TABLE_DATA(pSourceFileConfig => vConfig);
END;
/
```
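
Before running Step 4 above, it is worth confirming that the manual registration actually produced rows FILE_ARCHIVER can use. A minimal check, reusing only the columns shown earlier in this document:

```sql
-- Sketch: confirm the manually registered files are visible to FILE_ARCHIVER
SELECT COUNT(*) AS ingested_files
FROM   CT_MRDS.A_SOURCE_FILE_RECEIVED r
JOIN   CT_MRDS.A_SOURCE_FILE_CONFIG c
       ON c.A_SOURCE_FILE_CONFIG_KEY = r.A_SOURCE_FILE_CONFIG_KEY
WHERE  c.SOURCE_FILE_ID    = 'AGGREGATED_ALLOTMENT'
AND    r.PROCESSING_STATUS = 'INGESTED';

-- A count of 0 means the INSERT statements were missed or not committed,
-- and FILE_ARCHIVER.ARCHIVE_TABLE_DATA will find nothing to archive.
```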

#### Strategy 2: Direct Archive Export (Bypass ODS)

Skip the ODS/DATA bucket entirely and export directly to the ARCHIVE bucket in Parquet format:

```sql
-- Export legacy data directly to the ARCHIVE bucket
BEGIN
  DATA_EXPORTER.EXPORT_TABLE_DATA_BY_DATE(
    pSchemaName    => 'OU_TOP',
    pTableName     => 'AGGREGATED_ALLOTMENT',
    pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',
    pBucketArea    => 'ARCHIVE',      -- Direct to archive
    pFolderName    => 'legacy_direct',
    pMinDate       => DATE '2020-01-01',
    pMaxDate       => DATE '2024-12-31'
  );
END;
/
-- Result: Parquet files with Hive partitioning in the ARCHIVE bucket
-- Files: ARCHIVE/legacy_direct/PARTITION_YEAR=2024/PARTITION_MONTH=01/*.parquet
-- No A_SOURCE_FILE_RECEIVED records needed (archival already complete)
```

**Pros**:
- Simple, no manual registration
- Data already in final archive format (Parquet)
- Hive-style partitioning automatically applied

**Cons**:
- No record in A_SOURCE_FILE_RECEIVED (tracking gap)
- Cannot use FILE_ARCHIVER features (archival strategies, status tracking)
- Mixed folder structure (legacy_direct vs. standard source/table paths)

#### Strategy 3: Hybrid Approach (Recommended for Large Datasets)

Use DATA_EXPORTER for the initial export, then create minimal A_SOURCE_FILE_RECEIVED records programmatically:

```sql
-- Create helper procedure to register legacy exports
CREATE OR REPLACE PROCEDURE REGISTER_LEGACY_EXPORT (
  pSourceFileId  VARCHAR2,
  pBucketArea    VARCHAR2,
  pFolderName    VARCHAR2,
  pFilePattern   VARCHAR2,
  pReceptionDate DATE DEFAULT SYSDATE
) AS
  vConfigKey  NUMBER;
  vBucketUri  VARCHAR2(500);
  vPrefix     VARCHAR2(500);
  vRegistered PLS_INTEGER := 0;
BEGIN
  -- Get source configuration
  SELECT A_SOURCE_FILE_CONFIG_KEY
  INTO   vConfigKey
  FROM   CT_MRDS.A_SOURCE_FILE_CONFIG
  WHERE  SOURCE_FILE_ID   = pSourceFileId
  AND    SOURCE_FILE_TYPE = 'INPUT';

  -- Get bucket URI
  vBucketUri := CT_MRDS.ENV_MANAGER.GET_BUCKET_URI(pBucketArea);
  vPrefix    := 'ODS/' || pFolderName || '/';

  -- Register all matching files
  FOR rec IN (
    SELECT object_name, bytes, etag
    FROM   TABLE(MRDS_LOADER.cloud_wrapper.list_objects(
             credential_name => 'DEF_CRED_ARN',
             location_uri    => vBucketUri
           ))
    WHERE  object_name LIKE vPrefix || pFilePattern
  ) LOOP
    INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
      A_SOURCE_FILE_RECEIVED_KEY,
      A_SOURCE_FILE_CONFIG_KEY,
      SOURCE_FILE_NAME,
      PROCESSING_STATUS,
      RECEPTION_DATE,
      BYTES,
      CHECKSUM
    ) VALUES (
      A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL,
      vConfigKey,
      rec.object_name,
      'INGESTED',
      pReceptionDate,
      rec.bytes,
      rec.etag
    );
    vRegistered := vRegistered + 1;  -- SQL%ROWCOUNT only reflects the last statement, so count explicitly
  END LOOP;

  COMMIT;
  DBMS_OUTPUT.PUT_LINE('Registered ' || vRegistered || ' legacy files');
END;
/

-- Usage:
BEGIN
  -- After DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE completes
  REGISTER_LEGACY_EXPORT(
    pSourceFileId  => 'AGGREGATED_ALLOTMENT',
    pBucketArea    => 'DATA',
    pFolderName    => 'legacy_export',
    pFilePattern   => 'AGGREGATED_ALLOTMENT_2024%.csv',
    pReceptionDate => DATE '2024-12-31'
  );

  -- Now FILE_ARCHIVER can process
  FILE_ARCHIVER.ARCHIVE_TABLE_DATA(pSourceFileConfig => vConfig);
END;
/
```

## Migration Timeline and Coexistence

### Phase 1: Legacy System Only (Before Migration)
- All data loaded via Informatica + WLA
- Control table: `CT_ODS.A_LOAD_HISTORY`
- No `A_SOURCE_FILE_RECEIVED` records

### Phase 2: Parallel Operation (During Migration)
- **Old data**: Continue using Informatica + WLA → A_LOAD_HISTORY
- **New data**: Start using Airflow + DBT → A_SOURCE_FILE_RECEIVED + A_WORKFLOW_HISTORY
- **Challenge**: Different control tables for different data vintages (see the sketch below)
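
One practical question during this phase is which system loaded a given slice of data. If a transitional data table exposes both foreign keys described earlier (`A_ETL_LOAD_SET_KEY_FK` from the legacy system and `A_WORKFLOW_HISTORY_KEY_FK` from the modern one; whether a given table actually carries both columns is an assumption to verify per table), the split can be measured directly:

```sql
-- Sketch: classify rows of a transitional table by the control system that loaded them
-- (assumes the table has both FK columns; verify against the actual data model)
SELECT COUNT(CASE WHEN t.A_ETL_LOAD_SET_KEY_FK     IS NOT NULL THEN 1 END) AS legacy_rows,
       COUNT(CASE WHEN t.A_WORKFLOW_HISTORY_KEY_FK IS NOT NULL THEN 1 END) AS modern_rows,
       COUNT(*)                                                            AS total_rows
FROM   OU_TOP.AGGREGATED_ALLOTMENT t;
```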

### Phase 3: New System Only (After Migration)
- All new data via Airflow + DBT
- Legacy data archived (one-time export using DATA_EXPORTER)
- Control tables: `CT_MRDS.A_SOURCE_FILE_RECEIVED`, `CT_MRDS.A_WORKFLOW_HISTORY`

## Recommendations

### For New Data (Airflow + DBT)
✅ Use the standard workflow:
1. FILE_MANAGER.PROCESS_SOURCE_FILE (creates A_SOURCE_FILE_RECEIVED)
2. Airflow + DBT processing (creates A_WORKFLOW_HISTORY)
3. FILE_ARCHIVER.ARCHIVE_TABLE_DATA (uses both control tables)

### For Legacy Data Migration
✅ **Small Datasets (<1000 files)**: Strategy 1 (Manual Registration)
✅ **Large Datasets (>1000 files)**: Strategy 2 (Direct to ARCHIVE) or Strategy 3 (Hybrid)
❌ **Avoid**: Exporting to ODS/DATA without registration (orphaned files, cannot archive)

### Configuration Requirements

Before archiving legacy data, ensure the source configuration exists:

```sql
-- Check if the configuration exists
SELECT A_SOURCE_FILE_CONFIG_KEY, SOURCE_FILE_ID, TABLE_ID, ARCHIVAL_STRATEGY
FROM CT_MRDS.A_SOURCE_FILE_CONFIG
WHERE SOURCE_FILE_ID = 'YOUR_SOURCE_FILE_ID';

-- If missing, create the configuration
CALL FILE_MANAGER.ADD_SOURCE_FILE_CONFIG(
  pSourceKey             => 'YOUR_SOURCE',
  pSourceFileType        => 'INPUT',
  pSourceFileId          => 'YOUR_SOURCE_FILE_ID',
  pSourceFileDesc        => 'Legacy migrated data',
  pSourceFileNamePattern => 'pattern_*.csv',
  pTableId               => 'YOUR_TABLE_ID',
  pTemplateTableName     => 'CT_ET_TEMPLATES.YOUR_TEMPLATE'
);
```

## Known Limitations

### 1. No Retroactive A_SOURCE_FILE_RECEIVED Creation
DATA_EXPORTER does not automatically create A_SOURCE_FILE_RECEIVED records when exporting legacy data. This is by design: DATA_EXPORTER is a one-time export tool, not a file-tracking system.

### 2. FILE_ARCHIVER Requires A_SOURCE_FILE_RECEIVED
FILE_ARCHIVER cannot archive data without corresponding A_SOURCE_FILE_RECEIVED records. This prevents archiving of:
- Legacy Informatica-loaded data exported via DATA_EXPORTER
- Manually uploaded files not processed through FILE_MANAGER.PROCESS_SOURCE_FILE

### 3. Mixed Control Table References
During the migration period, some procedures reference A_LOAD_HISTORY (DATA_EXPORTER) while others reference A_WORKFLOW_HISTORY (FILE_ARCHIVER). This is intentional but requires a careful understanding of data lineage.

### 4. A_WORKFLOW_HISTORY vs A_LOAD_HISTORY Column Mismatch
The control tables have different schemas:
- **A_LOAD_HISTORY**: `LOAD_START`, `A_ETL_LOAD_SET_KEY`
- **A_WORKFLOW_HISTORY**: `WORKFLOW_START`, `A_WORKFLOW_HISTORY_KEY`

Test scripts must be aware of which table is being used; the sketch below shows the same temporal check phrased for each system.
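
As a concrete illustration of the mismatch, the same "runs started in 2024" check has to be written once per system. Both column sets come straight from the table structures shown earlier; this is a sketch for test scripts, not a shared utility:

```sql
-- Legacy control table (Informatica + WLA): runs started in 2024
SELECT COUNT(*) AS runs_2024
FROM   CT_ODS.A_LOAD_HISTORY
WHERE  LOAD_START >= DATE '2024-01-01'
AND    LOAD_START <  DATE '2025-01-01';

-- Modern control table (Airflow + DBT): the equivalent check
SELECT COUNT(*) AS runs_2024
FROM   CT_MRDS.A_WORKFLOW_HISTORY
WHERE  WORKFLOW_START >= DATE '2024-01-01'
AND    WORKFLOW_START <  DATE '2025-01-01';
```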

## Related Documentation

- [PROCESS_SOURCE_FILE Guide](PROCESS_SOURCE_FILE_Guide.md) - File validation and ingestion workflow
- [FILE_ARCHIVER Guide](FILE_ARCHIVER_Guide.md) - Archival strategies and configuration
- [FILE_MANAGER Configuration Guide](FILE_MANAGER_Configuration_Guide.md) - System configuration
- [Package Deployment Guide](Package_Deployment_Guide.md) - Deployment procedures

## Summary

The migration from Informatica + WLA to Airflow + DBT introduces new control tables (`A_SOURCE_FILE_RECEIVED`, `A_WORKFLOW_HISTORY`) while maintaining compatibility with the legacy control table (`A_LOAD_HISTORY`). Understanding the relationship between these tables is critical for:

- **Data Lineage**: Tracking which system processed which data
- **Export Operations**: Choosing the appropriate DATA_EXPORTER procedures
- **Archival Operations**: Ensuring FILE_ARCHIVER has the required metadata
- **Testing**: Using the correct control tables in test scenarios

The recommended approach for legacy data migration is **Strategy 2 (Direct to ARCHIVE)** for large datasets, as it avoids the complexity of manual A_SOURCE_FILE_RECEIVED registration while still achieving the goal of moving historical data to long-term archival storage.