# System Migration: Informatica + WLA → Airflow + DBT

This document describes the migration from the legacy Informatica + WLA data processing system to the modern Airflow + DBT architecture, including control table differences, data export strategies, and known limitations.

## Migration Overview

The MRDS (Market Reference Data System) is undergoing a fundamental technology migration:

**Legacy System (Informatica + WLA):**
- ETL Tool: Informatica PowerCenter
- Workflow Orchestration: WLA (Workflow Automation)
- Control Schema: `CT_ODS` (Operational Data Store Control)
- Primary Control Table: `CT_ODS.A_LOAD_HISTORY`
- Key Column: `A_ETL_LOAD_SET_KEY`

**Modern System (Airflow + DBT):**
- Orchestration: Apache Airflow
- Transformation: DBT (Data Build Tool)
- Control Schema: `CT_MRDS` (MRDS Control)
- Primary Control Tables: `CT_MRDS.A_SOURCE_FILE_RECEIVED`, `CT_MRDS.A_WORKFLOW_HISTORY`
- Key Column: `A_WORKFLOW_HISTORY_KEY`

## Control Table Architecture

### Legacy System: CT_ODS.A_LOAD_HISTORY

**Purpose**: Tracks Informatica PowerCenter workflow executions

**Structure**:
```sql
DESC CT_ODS.A_LOAD_HISTORY;

Name                    Type
______________________  _____________________
A_ETL_LOAD_SET_KEY      NUMBER(38)       -- Primary key
WORKFLOW_NAME           VARCHAR2(255)    -- Informatica workflow name
INFA_RUN_ID             NUMBER(38)       -- Informatica run ID
LOAD_START              TIMESTAMP(6)     -- Workflow start time
LOAD_END                TIMESTAMP(6)     -- Workflow end time
EXDI_APPL_REQ_ID        VARCHAR2(255)
EXDI_CORRELATION_ID     VARCHAR2(255)
LOAD_SUCCESSFUL         CHAR(1)          -- Y/N success flag
WLA_RUN_ID              NUMBER(28)       -- WLA run ID
DQ_FLAG                 VARCHAR2(5)      -- Data quality flag
```

**Usage Pattern**:
- Created by Informatica workflows during ETL execution
- Used for temporal partitioning in DATA_EXPORTER
- Referenced via `A_ETL_LOAD_SET_KEY_FK` foreign key in data tables

### Modern System: CT_MRDS Control Tables

#### 1. A_SOURCE_FILE_RECEIVED

**Purpose**: Tracks individual file processing through the complete lifecycle

**Key Columns**:
```sql
A_SOURCE_FILE_RECEIVED_KEY   NUMBER        -- Primary key
SOURCE_FILE_NAME             VARCHAR2      -- Full OCI path
PROCESSING_STATUS            VARCHAR2      -- Status tracking
RECEPTION_DATE               DATE          -- File arrival timestamp
PARTITION_YEAR               VARCHAR2(4)   -- Archive partition (year)
PARTITION_MONTH              VARCHAR2(2)   -- Archive partition (month)
ARCH_FILE_NAME               VARCHAR2      -- Parquet archive file path
```

**Status Workflow**:
```
RECEIVED → VALIDATED → READY_FOR_INGESTION → INGESTED → ARCHIVED_AND_TRASHED → ARCHIVED_AND_PURGED (optional)

Note: Legacy ARCHIVED status maintained for backward compatibility
```

**Usage Pattern**:
- Created by FILE_MANAGER.PROCESS_SOURCE_FILE during file validation
- Updated throughout the file lifecycle (validation, ingestion, archival)
- Required by FILE_ARCHIVER for archival operations
- Links to A_WORKFLOW_HISTORY via workflow execution
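For quick monitoring of the lifecycle above, a status breakdown can be queried directly from the control table. This is a sketch only; the one-month window is an arbitrary example, and only the columns documented above are used:

```sql
-- Illustrative only: how many files sit in each lifecycle status for a recent window.
-- Adjust the date filter to the period of interest.
SELECT PROCESSING_STATUS,
       COUNT(*)            AS FILE_COUNT,
       MIN(RECEPTION_DATE) AS OLDEST_FILE,
       MAX(RECEPTION_DATE) AS NEWEST_FILE
  FROM CT_MRDS.A_SOURCE_FILE_RECEIVED
 WHERE RECEPTION_DATE >= ADD_MONTHS(SYSDATE, -1)
 GROUP BY PROCESSING_STATUS
 ORDER BY PROCESSING_STATUS;
```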
#### 2. A_WORKFLOW_HISTORY

**Purpose**: Tracks Airflow + DBT workflow executions (similar role to A_LOAD_HISTORY)

**Key Columns**:
```sql
A_WORKFLOW_HISTORY_KEY   NUMBER      -- Primary key
WORKFLOW_NAME            VARCHAR2    -- Airflow DAG name
WORKFLOW_START           TIMESTAMP   -- Workflow start time
WORKFLOW_END             TIMESTAMP   -- Workflow end time
WORKFLOW_SUCCESSFUL      CHAR(1)     -- Y/N success flag
```

**Usage Pattern**:
- Created by Airflow + DBT workflows during data processing
- Used for temporal partitioning decisions in FILE_ARCHIVER
- Referenced via `A_WORKFLOW_HISTORY_KEY_FK` foreign key in data tables

## Data Export Strategies

### Scenario 1: Legacy Data Export (Informatica → ODS)

**Use Case**: One-time migration of historical data loaded by Informatica

**Package**: `DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE`

**Control Table**: Uses `CT_ODS.A_LOAD_HISTORY` for temporal partitioning

**Example**:
```sql
-- Export historical AGGREGATED_ALLOTMENT data (loaded by Informatica)
BEGIN
  CT_MRDS.DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
    pSchemaName    => 'OU_TOP',
    pTableName     => 'AGGREGATED_ALLOTMENT',
    pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',  -- Links to A_LOAD_HISTORY
    pBucketArea    => 'DATA',
    pFolderName    => 'legacy_migration',
    pMinDate       => DATE '2020-01-01',
    pMaxDate       => DATE '2024-12-31'
  );
END;
/
```

**Result**: CSV files in the ODS bucket (DATA area), partitioned by LOAD_START from A_LOAD_HISTORY

### Scenario 2: Modern System Data (Airflow + DBT → ODS → ARCHIVE)

**Use Case**: Ongoing processing with the new Airflow + DBT system

**Workflow**:
1. **File Arrival**: Files → INBOX bucket
2. **Validation**: FILE_MANAGER.PROCESS_SOURCE_FILE → creates A_SOURCE_FILE_RECEIVED
3. **Processing**: Airflow + DBT → creates A_WORKFLOW_HISTORY
4. **Export**: DATA_EXPORTER → moves data to the ODS bucket (DATA area)
5. **Archival**: FILE_ARCHIVER → moves data to the ARCHIVE bucket (Parquet with Hive partitioning)

**Control Tables**: Uses both A_SOURCE_FILE_RECEIVED and A_WORKFLOW_HISTORY

**Example**:
```sql
-- Archival of data processed by Airflow + DBT
BEGIN
  CT_MRDS.FILE_ARCHIVER.ARCHIVE_TABLE_DATA(
    pSourceFileConfig => vConfig  -- Requires A_SOURCE_FILE_RECEIVED records
  );
END;
/
```

## Critical Gap: Legacy Data Archival

### Problem Statement

**Scenario**: Historical data exported using DATA_EXPORTER from Informatica-loaded tables

**Issue**: FILE_ARCHIVER requires records in `A_SOURCE_FILE_RECEIVED`, but legacy exports don't create them

**Impact**: Legacy data exported to the ODS/DATA bucket **CANNOT** be archived to the ARCHIVE bucket using FILE_ARCHIVER

### Technical Analysis

**DATA_EXPORTER Behavior**:
```sql
-- Uses A_LOAD_HISTORY for partitioning (Informatica workflows)
SELECT DISTINCT
       TO_CHAR(L.LOAD_START, 'YYYY') AS YR,
       TO_CHAR(L.LOAD_START, 'MM')   AS MN
  FROM OU_TOP.AGGREGATED_ALLOTMENT T,
       CT_ODS.A_LOAD_HISTORY L
 WHERE T.A_ETL_LOAD_SET_KEY_FK = L.A_ETL_LOAD_SET_KEY
   AND L.LOAD_START >= :pMinDate
   AND L.LOAD_START <  :pMaxDate;

-- Creates CSV files: ODS/legacy_migration/AGGREGATED_ALLOTMENT_YYYYMM.csv
-- Does NOT create A_SOURCE_FILE_RECEIVED records
```

**FILE_ARCHIVER Requirement**:
```sql
-- Joins A_SOURCE_FILE_RECEIVED with A_WORKFLOW_HISTORY
JOIN CT_MRDS.A_SOURCE_FILE_RECEIVED r
  ON r.A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfig.A_SOURCE_FILE_CONFIG_KEY
 AND r.PROCESSING_STATUS = 'INGESTED';

-- Without A_SOURCE_FILE_RECEIVED records, archival CANNOT proceed
```
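To see the gap concretely before choosing a workaround, orphaned exports can be listed by comparing bucket contents against the control table. This is a sketch only, reusing the `list_objects` call and the example bucket URI from Strategy 1 below; it assumes `SOURCE_FILE_NAME` stores the same object path that `list_objects` returns:

```sql
-- Illustrative only: exported legacy CSVs with no A_SOURCE_FILE_RECEIVED record,
-- i.e. files that FILE_ARCHIVER cannot see.
-- The bucket URI and folder are the example values used in this guide.
SELECT o.object_name
  FROM TABLE(MRDS_LOADER.cloud_wrapper.list_objects(
         credential_name => 'DEF_CRED_ARN',
         location_uri    => 'https://objectstorage.eu-frankfurt-1.oraclecloud.com/n/frtgjxu7zl7c/b/data/o/'
       )) o
  LEFT JOIN CT_MRDS.A_SOURCE_FILE_RECEIVED r
         ON r.SOURCE_FILE_NAME = o.object_name
 WHERE o.object_name LIKE 'ODS/legacy_migration/%'
   AND r.A_SOURCE_FILE_RECEIVED_KEY IS NULL;
```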
### Workaround Strategies

#### Strategy 1: Manual Registration (Recommended for Small Datasets)

Manually create `A_SOURCE_FILE_RECEIVED` records for the exported legacy files:

```sql
-- Step 1: Export legacy data to ODS/DATA
BEGIN
  DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
    pSchemaName    => 'OU_TOP',
    pTableName     => 'AGGREGATED_ALLOTMENT',
    pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',
    pBucketArea    => 'DATA',
    pFolderName    => 'legacy_export',
    pMinDate       => DATE '2024-01-01',
    pMaxDate       => DATE '2024-12-31'
  );
END;
/

-- Step 2: List exported CSV files
SELECT object_name, time_created, bytes
  FROM TABLE(MRDS_LOADER.cloud_wrapper.list_objects(
         credential_name => 'DEF_CRED_ARN',
         location_uri    => 'https://objectstorage.eu-frankfurt-1.oraclecloud.com/n/frtgjxu7zl7c/b/data/o/'
       ))
 WHERE object_name LIKE 'ODS/legacy_export/AGGREGATED_ALLOTMENT_%';

-- Step 3: Manually register each file in A_SOURCE_FILE_RECEIVED
-- (Requires the source configuration for AGGREGATED_ALLOTMENT to exist)
INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
  A_SOURCE_FILE_RECEIVED_KEY,
  A_SOURCE_FILE_CONFIG_KEY,
  SOURCE_FILE_NAME,
  PROCESSING_STATUS,
  RECEPTION_DATE,
  BYTES,
  CHECKSUM,
  EXTERNAL_TABLE_NAME
) VALUES (
  A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL,
  (SELECT A_SOURCE_FILE_CONFIG_KEY
     FROM CT_MRDS.A_SOURCE_FILE_CONFIG
    WHERE SOURCE_FILE_ID = 'AGGREGATED_ALLOTMENT'
      AND SOURCE_FILE_TYPE = 'INPUT'),
  'ODS/legacy_export/AGGREGATED_ALLOTMENT_202401.csv',
  'INGESTED',             -- Skip validation, mark as already ingested
  DATE '2024-01-15',
  1048576,                -- File size in bytes
  'manual_registration',
  NULL                    -- No external table needed
);

-- Repeat for all exported CSV files
COMMIT;

-- Step 4: Now FILE_ARCHIVER can process these files
BEGIN
  FILE_ARCHIVER.ARCHIVE_TABLE_DATA(pSourceFileConfig => vConfig);
END;
/
```

#### Strategy 2: Direct Archive Export (Bypass ODS)

Skip the ODS/DATA bucket entirely and export directly to the ARCHIVE bucket in Parquet format:

```sql
-- Export legacy data directly to the ARCHIVE bucket
BEGIN
  DATA_EXPORTER.EXPORT_TABLE_DATA_BY_DATE(
    pSchemaName    => 'OU_TOP',
    pTableName     => 'AGGREGATED_ALLOTMENT',
    pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',
    pBucketArea    => 'ARCHIVE',  -- Direct to archive
    pFolderName    => 'legacy_direct',
    pMinDate       => DATE '2020-01-01',
    pMaxDate       => DATE '2024-12-31'
  );
END;
/

-- Result: Parquet files with Hive partitioning in the ARCHIVE bucket
-- Files:  ARCHIVE/legacy_direct/PARTITION_YEAR=2024/PARTITION_MONTH=01/*.parquet
-- No A_SOURCE_FILE_RECEIVED records needed (archival already complete)
```

**Pros**:
- Simple, no manual registration
- Data already in the final archive format (Parquet)
- Hive-style partitioning applied automatically

**Cons**:
- No record in A_SOURCE_FILE_RECEIVED (tracking gap)
- Cannot use FILE_ARCHIVER features (archival strategies, status tracking)
- Mixed folder structure (`legacy_direct` vs. standard source/table paths)
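After a direct export, the Parquet output can be verified by listing the ARCHIVE bucket. This is a sketch only; it assumes `ENV_MANAGER.GET_BUCKET_URI('ARCHIVE')` resolves the archive bucket URI (following the pattern used in Strategy 3 below), and the object-name prefix may differ in your environment:

```sql
-- Illustrative only: list the Parquet files produced by a direct ARCHIVE export.
-- Adjust the LIKE prefix to match how objects are keyed under the returned bucket URI.
DECLARE
  vBucketUri VARCHAR2(500);
BEGIN
  vBucketUri := CT_MRDS.ENV_MANAGER.GET_BUCKET_URI('ARCHIVE');
  FOR rec IN (
    SELECT object_name, bytes
      FROM TABLE(MRDS_LOADER.cloud_wrapper.list_objects(
             credential_name => 'DEF_CRED_ARN',
             location_uri    => vBucketUri))
     WHERE object_name LIKE 'ARCHIVE/legacy_direct/PARTITION_YEAR=%'
  ) LOOP
    DBMS_OUTPUT.PUT_LINE(rec.object_name || ' (' || rec.bytes || ' bytes)');
  END LOOP;
END;
/
```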
#### Strategy 3: Hybrid Approach (Recommended for Large Datasets)

Use DATA_EXPORTER for the initial export, then create minimal A_SOURCE_FILE_RECEIVED records programmatically:

```sql
-- Create a helper procedure to register legacy exports
CREATE OR REPLACE PROCEDURE REGISTER_LEGACY_EXPORT (
  pSourceFileId   VARCHAR2,
  pBucketArea     VARCHAR2,
  pFolderName     VARCHAR2,
  pFilePattern    VARCHAR2,
  pReceptionDate  DATE DEFAULT SYSDATE
) AS
  vConfigKey  NUMBER;
  vBucketUri  VARCHAR2(500);
  vPrefix     VARCHAR2(500);
  vCount      PLS_INTEGER := 0;
BEGIN
  -- Get the source configuration
  SELECT A_SOURCE_FILE_CONFIG_KEY
    INTO vConfigKey
    FROM CT_MRDS.A_SOURCE_FILE_CONFIG
   WHERE SOURCE_FILE_ID = pSourceFileId
     AND SOURCE_FILE_TYPE = 'INPUT';

  -- Get the bucket URI
  vBucketUri := CT_MRDS.ENV_MANAGER.GET_BUCKET_URI(pBucketArea);
  vPrefix    := 'ODS/' || pFolderName || '/';

  -- Register all matching files
  FOR rec IN (
    SELECT object_name, bytes, etag
      FROM TABLE(MRDS_LOADER.cloud_wrapper.list_objects(
             credential_name => 'DEF_CRED_ARN',
             location_uri    => vBucketUri))
     WHERE object_name LIKE vPrefix || pFilePattern
  ) LOOP
    INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
      A_SOURCE_FILE_RECEIVED_KEY,
      A_SOURCE_FILE_CONFIG_KEY,
      SOURCE_FILE_NAME,
      PROCESSING_STATUS,
      RECEPTION_DATE,
      BYTES,
      CHECKSUM
    ) VALUES (
      A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL,
      vConfigKey,
      rec.object_name,
      'INGESTED',
      pReceptionDate,
      rec.bytes,
      rec.etag
    );
    vCount := vCount + 1;  -- SQL%ROWCOUNT would only reflect the last statement
  END LOOP;

  COMMIT;
  DBMS_OUTPUT.PUT_LINE('Registered ' || vCount || ' legacy files');
END;
/

-- Usage:
BEGIN
  -- After DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE completes
  REGISTER_LEGACY_EXPORT(
    pSourceFileId  => 'AGGREGATED_ALLOTMENT',
    pBucketArea    => 'DATA',
    pFolderName    => 'legacy_export',
    pFilePattern   => 'AGGREGATED_ALLOTMENT_2024%.csv',
    pReceptionDate => DATE '2024-12-31'
  );

  -- Now FILE_ARCHIVER can process the registered files
  FILE_ARCHIVER.ARCHIVE_TABLE_DATA(pSourceFileConfig => vConfig);
END;
/
```

## Migration Timeline and Coexistence

### Phase 1: Legacy System Only (Before Migration)
- All data loaded via Informatica + WLA
- Control table: `CT_ODS.A_LOAD_HISTORY`
- No `A_SOURCE_FILE_RECEIVED` records

### Phase 2: Parallel Operation (During Migration)
- **Old data**: Continue using Informatica + WLA → A_LOAD_HISTORY
- **New data**: Start using Airflow + DBT → A_SOURCE_FILE_RECEIVED + A_WORKFLOW_HISTORY
- **Challenge**: Different control tables for different data vintages (see the lineage sketch below)

### Phase 3: New System Only (After Migration)
- All new data via Airflow + DBT
- Legacy data archived (one-time export using DATA_EXPORTER)
- Control tables: `CT_MRDS.A_SOURCE_FILE_RECEIVED`, `CT_MRDS.A_WORKFLOW_HISTORY`
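During Phase 2, it can be useful to check which control table a given data table's rows actually reference. The sketch below assumes a coexisting table carries both the legacy `A_ETL_LOAD_SET_KEY_FK` and the modern `A_WORKFLOW_HISTORY_KEY_FK` columns; if a table has only one of them, query the corresponding control table directly:

```sql
-- Illustrative only: rough lineage split for a table that may hold both vintages.
-- Assumes both foreign key columns exist on the table during coexistence.
SELECT COUNT(CASE WHEN A_ETL_LOAD_SET_KEY_FK     IS NOT NULL THEN 1 END) AS LEGACY_ROWS,
       COUNT(CASE WHEN A_WORKFLOW_HISTORY_KEY_FK IS NOT NULL THEN 1 END) AS MODERN_ROWS
  FROM OU_TOP.AGGREGATED_ALLOTMENT;
```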
## Recommendations

### For New Data (Airflow + DBT)

✅ Use the standard workflow:
1. FILE_MANAGER.PROCESS_SOURCE_FILE (creates A_SOURCE_FILE_RECEIVED)
2. Airflow + DBT processing (creates A_WORKFLOW_HISTORY)
3. FILE_ARCHIVER.ARCHIVE_TABLE_DATA (uses both control tables)

### For Legacy Data Migration

✅ **Small Datasets (<1000 files)**: Strategy 1 (Manual Registration)

✅ **Large Datasets (>1000 files)**: Strategy 2 (Direct to ARCHIVE) or Strategy 3 (Hybrid)

❌ **Avoid**: Exporting to ODS/DATA without registration (orphaned files that cannot be archived)

### Configuration Requirements

Before archiving legacy data, ensure the source configuration exists:

```sql
-- Check whether the configuration exists
SELECT A_SOURCE_FILE_CONFIG_KEY, SOURCE_FILE_ID, TABLE_ID, ARCHIVAL_STRATEGY
  FROM CT_MRDS.A_SOURCE_FILE_CONFIG
 WHERE SOURCE_FILE_ID = 'YOUR_SOURCE_FILE_ID';

-- If missing, create the configuration
CALL FILE_MANAGER.ADD_SOURCE_FILE_CONFIG(
  pSourceKey             => 'YOUR_SOURCE',
  pSourceFileType        => 'INPUT',
  pSourceFileId          => 'YOUR_SOURCE_FILE_ID',
  pSourceFileDesc        => 'Legacy migrated data',
  pSourceFileNamePattern => 'pattern_*.csv',
  pTableId               => 'YOUR_TABLE_ID',
  pTemplateTableName     => 'CT_ET_TEMPLATES.YOUR_TEMPLATE'
);
```

## Known Limitations

### 1. No Retroactive A_SOURCE_FILE_RECEIVED Creation

DATA_EXPORTER does not automatically create A_SOURCE_FILE_RECEIVED records when exporting legacy data. This is by design: it is a one-time export tool, not a file tracking system.

### 2. FILE_ARCHIVER Requires A_SOURCE_FILE_RECEIVED

FILE_ARCHIVER cannot archive data without corresponding A_SOURCE_FILE_RECEIVED records. This prevents archiving of:
- Legacy Informatica-loaded data exported via DATA_EXPORTER
- Manually uploaded files not processed through FILE_MANAGER.PROCESS_SOURCE_FILE

### 3. Mixed Control Table References

During the migration period, some procedures reference A_LOAD_HISTORY (DATA_EXPORTER) while others reference A_WORKFLOW_HISTORY (FILE_ARCHIVER). This is intentional but requires a careful understanding of data lineage.

### 4. A_WORKFLOW_HISTORY vs A_LOAD_HISTORY Column Mismatch

The control tables have different schemas:
- **A_LOAD_HISTORY**: `LOAD_START`, `A_ETL_LOAD_SET_KEY`
- **A_WORKFLOW_HISTORY**: `WORKFLOW_START`, `A_WORKFLOW_HISTORY_KEY`

Test scripts must be aware of which table is being used.

## Related Documentation

- [PROCESS_SOURCE_FILE Guide](PROCESS_SOURCE_FILE_Guide.md) - File validation and ingestion workflow
- [FILE_ARCHIVER Guide](FILE_ARCHIVER_Guide.md) - Archival strategies and configuration
- [FILE_MANAGER Configuration Guide](FILE_MANAGER_Configuration_Guide.md) - System configuration
- [Package Deployment Guide](Package_Deployment_Guide.md) - Deployment procedures

## Summary

The migration from Informatica + WLA to Airflow + DBT introduces new control tables (`A_SOURCE_FILE_RECEIVED`, `A_WORKFLOW_HISTORY`) while maintaining compatibility with the legacy control table (`A_LOAD_HISTORY`). Understanding the relationship between these tables is critical for:

- **Data Lineage**: Tracking which system processed which data
- **Export Operations**: Choosing the appropriate DATA_EXPORTER procedures
- **Archival Operations**: Ensuring FILE_ARCHIVER has the required metadata
- **Testing**: Using the correct control tables in test scenarios

The recommended approach for large legacy datasets is **Strategy 2 (Direct to ARCHIVE)**, as it avoids the complexity of manual A_SOURCE_FILE_RECEIVED registration while still achieving the goal of moving historical data to long-term archival storage.