# System Migration: Informatica + WLA → Airflow + DBT

This document describes the migration from the legacy Informatica + WLA data processing system to the new Airflow + DBT architecture, including control table differences, data export strategies, and known limitations.

## Migration Overview

The MRDS (Market Reference Data System) is undergoing a fundamental technology migration:

**Legacy System (Informatica + WLA):**
- ETL Tool: Informatica PowerCenter
- Workflow Orchestration: WLA (Workflow Automation)
- Control Schema: `CT_ODS` (Operational Data Store Control)
- Primary Control Table: `CT_ODS.A_LOAD_HISTORY`
- Key Column: `A_ETL_LOAD_SET_KEY`

**New System (Airflow + DBT):**
- Orchestration: Apache Airflow
- Transformation: DBT (Data Build Tool)
- Control Schema: `CT_MRDS` (MRDS Control)
- Primary Control Tables: `CT_MRDS.A_SOURCE_FILE_RECEIVED`, `CT_MRDS.A_WORKFLOW_HISTORY`
- Key Column: `A_WORKFLOW_HISTORY_KEY`

## Control Table Architecture

### Legacy System: CT_ODS.A_LOAD_HISTORY

**Purpose**: Tracks Informatica PowerCenter workflow executions

**Structure**:

```sql
DESC CT_ODS.A_LOAD_HISTORY;

Name                    Type
______________________  _____________________
A_ETL_LOAD_SET_KEY      NUMBER(38)      -- Primary key
WORKFLOW_NAME           VARCHAR2(255)   -- Informatica workflow name
INFA_RUN_ID             NUMBER(38)      -- Informatica run ID
LOAD_START              TIMESTAMP(6)    -- Workflow start time
LOAD_END                TIMESTAMP(6)    -- Workflow end time
EXDI_APPL_REQ_ID        VARCHAR2(255)
EXDI_CORRELATION_ID     VARCHAR2(255)
LOAD_SUCCESSFUL         CHAR(1)         -- Y/N success flag
WLA_RUN_ID              NUMBER(28)      -- WLA run ID
DQ_FLAG                 VARCHAR2(5)     -- Data quality flag
```

**Usage Pattern**:
- Created by Informatica workflows during ETL execution
- Used for temporal partitioning in DATA_EXPORTER
- Referenced via `A_ETL_LOAD_SET_KEY_FK` foreign key in data tables

### New System: CT_MRDS Control Tables

#### 1. A_SOURCE_FILE_RECEIVED

**Purpose**: Tracks individual file processing through the complete lifecycle

**Key Columns**:

```sql
A_SOURCE_FILE_RECEIVED_KEY  NUMBER        -- Primary key
SOURCE_FILE_NAME            VARCHAR2      -- Full OCI path
PROCESSING_STATUS           VARCHAR2      -- Status tracking
RECEPTION_DATE              DATE          -- File arrival timestamp
PARTITION_YEAR              VARCHAR2(4)   -- Archive partition (year)
PARTITION_MONTH             VARCHAR2(2)   -- Archive partition (month)
ARCH_FILE_NAME              VARCHAR2      -- Parquet archive file path
```

**Status Workflow**:

```
RECEIVED → VALIDATED → READY_FOR_INGESTION → INGESTED
    → ARCHIVED_AND_TRASHED → ARCHIVED_AND_PURGED (optional)

Note: Legacy ARCHIVED status maintained for backward compatibility
```

**Usage Pattern**:
- Created by FILE_MANAGER.PROCESS_SOURCE_FILE during file validation
- Updated throughout the file lifecycle (validation, ingestion, archival)
- Required by FILE_ARCHIVER for archival operations
- Links to A_WORKFLOW_HISTORY via workflow execution

#### 2. A_WORKFLOW_HISTORY

**Purpose**: Tracks Airflow + DBT workflow executions (similar role to A_LOAD_HISTORY)

**Key Columns**:

```sql
A_WORKFLOW_HISTORY_KEY  NUMBER      -- Primary key
WORKFLOW_NAME           VARCHAR2    -- Airflow DAG name
WORKFLOW_START          TIMESTAMP   -- Workflow start time
WORKFLOW_END            TIMESTAMP   -- Workflow end time
WORKFLOW_SUCCESSFUL     CHAR(1)     -- Y/N success flag
```

**Usage Pattern**:
- Created by Airflow + DBT workflows during data processing
- Used for temporal partitioning decisions in FILE_ARCHIVER
- Referenced via `A_WORKFLOW_HISTORY_KEY_FK` foreign key in data tables

## Data Export Strategies

### Scenario 1: Legacy Data Export (Informatica → ODS)

**Use Case**: One-time migration of historical data loaded by Informatica

**Package**: `DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE`

**Control Table**: Uses `CT_ODS.A_LOAD_HISTORY` for temporal partitioning

**Example**:

```sql
-- Export historical AGGREGATED_ALLOTMENT data (loaded by Informatica)
BEGIN
  CT_MRDS.DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
    pSchemaName    => 'OU_TOP',
    pTableName     => 'AGGREGATED_ALLOTMENT',
    pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',  -- Links to A_LOAD_HISTORY
    pBucketArea    => 'DATA',
    pFolderName    => 'legacy_migration',
    pMinDate       => DATE '2020-01-01',
    pMaxDate       => DATE '2024-12-31'
  );
END;
/
```

**Result**: CSV files in the ODS bucket (DATA area), partitioned by LOAD_START from A_LOAD_HISTORY

### Scenario 2: New System Data (Airflow + DBT → ODS → ARCHIVE)

**Use Case**: Ongoing processing with the new Airflow + DBT system

**Workflow**:
1. **File Arrival**: Files → INBOX bucket
2. **Validation**: FILE_MANAGER.PROCESS_SOURCE_FILE → creates A_SOURCE_FILE_RECEIVED
3. **Processing**: Airflow + DBT → creates A_WORKFLOW_HISTORY
4. **Export**: DATA_EXPORTER → moves data to the ODS bucket (DATA area)
5. **Archival**: FILE_ARCHIVER → moves data to the ARCHIVE bucket (Parquet with Hive partitioning)

**Control Tables**: Uses both A_SOURCE_FILE_RECEIVED and A_WORKFLOW_HISTORY

**Example**:

```sql
-- Archival of data processed by Airflow + DBT
BEGIN
  CT_MRDS.FILE_ARCHIVER.ARCHIVE_TABLE_DATA(
    pSourceFileConfig => vConfig  -- Requires A_SOURCE_FILE_RECEIVED records
  );
END;
/
```

## Legacy Data Archival

### FILE_ARCHIVER Requirement

⚠️ **IMPORTANT**: FILE_ARCHIVER requires records in the `A_SOURCE_FILE_RECEIVED` table to track and manage the archival lifecycle.

**For new system data (Airflow + DBT)**:
- Records automatically created by `FILE_MANAGER.PROCESS_SOURCE_FILE`
- No additional steps needed

**For legacy data (Informatica + WLA)**:
- Historical data requires registration in `A_SOURCE_FILE_RECEIVED`
- ✅ **SOLUTION**: Use DATA_EXPORTER v2.9.0+ with the `pRegisterExport => TRUE` parameter
- Automatically registers exported files with proper metadata (size, checksum, location)

### Export Strategies for Legacy Data

#### Strategy 1: Automatic Registration (Recommended) ✅

**DATA_EXPORTER v2.9.0+** supports automatic file registration via the `pRegisterExport` parameter.
**Benefits**:
- Simple, one-step export with automatic registration
- Files tracked in `A_SOURCE_FILE_RECEIVED` (enables FILE_ARCHIVER processing)
- Proper metadata capture (file size, checksum, location, timestamps)
- Standard workflow integration (archival strategies, status tracking)

**Example - CSV Export with Registration**:

```sql
-- Export with automatic registration (DATA_EXPORTER v2.9.0+)
BEGIN
  CT_MRDS.DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
    pSchemaName     => 'OU_TOP',
    pTableName      => 'AGGREGATED_ALLOTMENT',
    pKeyColumnName  => 'A_ETL_LOAD_SET_KEY_FK',
    pBucketArea     => 'DATA',
    pFolderName     => 'legacy_export',
    pMinDate        => DATE '2024-01-01',
    pMaxDate        => DATE '2024-12-31',
    pRegisterExport => TRUE,  -- ✓ Automatically registers files
    pProcessName    => 'LEGACY_MIGRATION'
  );
END;
/

-- Files now registered in A_SOURCE_FILE_RECEIVED with:
--   - SOURCE_FILE_NAME:  Full OCI path
--   - PROCESSING_STATUS: 'INGESTED'
--   - BYTES:             Actual file size
--   - CHECKSUM:          File ETag from OCI
--   - PROCESS_NAME:      'LEGACY_MIGRATION'

-- Now FILE_ARCHIVER can process these files
BEGIN
  CT_MRDS.FILE_ARCHIVER.ARCHIVE_TABLE_DATA(
    pSourceFileConfigKey => vConfigKey
  );
END;
/
```

**Example - Single CSV Export with Registration**:

```sql
-- For a single-file export (not partitioned by date)
BEGIN
  CT_MRDS.DATA_EXPORTER.EXPORT_TABLE_DATA(
    pSchemaName        => 'CT_MRDS',
    pTableName         => 'MY_TABLE',
    pKeyColumnName     => 'A_ETL_LOAD_SET_KEY_FK',
    pBucketArea        => 'DATA',
    pFolderName        => 'legacy_export',
    pFileName          => 'my_table_export.csv',
    pTemplateTableName => 'CT_ET_TEMPLATES.MY_TEMPLATE',
    pRegisterExport    => TRUE,  -- ✓ Registers file
    pProcessName       => 'LEGACY_MIGRATION'
  );
END;
/
```

#### Strategy 2: Direct Archive Export (Bypass ODS) ⚠️

**Use when**: You want to skip the ODS bucket entirely and go straight to ARCHIVE.

Export directly to the ARCHIVE bucket in Parquet format, bypassing the ODS/DATA bucket:

```sql
-- Export legacy data directly to the ARCHIVE bucket
BEGIN
  DATA_EXPORTER.EXPORT_TABLE_DATA_BY_DATE(
    pSchemaName    => 'OU_TOP',
    pTableName     => 'AGGREGATED_ALLOTMENT',
    pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',
    pBucketArea    => 'ARCHIVE',  -- Direct to archive
    pFolderName    => 'legacy_direct',
    pMinDate       => DATE '2020-01-01',
    pMaxDate       => DATE '2024-12-31'
  );
END;
/

-- Result: Parquet files with Hive partitioning in the ARCHIVE bucket
-- Files:  ARCHIVE/legacy_direct/PARTITION_YEAR=2024/PARTITION_MONTH=01/*.parquet
-- No A_SOURCE_FILE_RECEIVED records needed (archival already complete)
```

**Pros**:
- Simple, no manual registration
- Data already in final archive format (Parquet)
- Hive-style partitioning automatically applied

**Cons**:
- No record in A_SOURCE_FILE_RECEIVED (tracking gap)
- Cannot use FILE_ARCHIVER features (archival strategies, status tracking)
- Mixed folder structure (legacy_direct vs. standard source/table paths)

#### Strategy 3: Hybrid Approach (Recommended for Large Datasets)

Use DATA_EXPORTER for the initial export, then create minimal A_SOURCE_FILE_RECEIVED records programmatically:

```sql
-- Create helper procedure to register legacy exports
CREATE OR REPLACE PROCEDURE REGISTER_LEGACY_EXPORT (
  pSourceFileId  VARCHAR2,
  pBucketArea    VARCHAR2,
  pFolderName    VARCHAR2,
  pFilePattern   VARCHAR2,
  pReceptionDate DATE DEFAULT SYSDATE
) AS
  vConfigKey NUMBER;
  vBucketUri VARCHAR2(500);
  vPrefix    VARCHAR2(500);
  vCount     NUMBER := 0;
BEGIN
  -- Get source configuration
  SELECT A_SOURCE_FILE_CONFIG_KEY
    INTO vConfigKey
    FROM CT_MRDS.A_SOURCE_FILE_CONFIG
   WHERE SOURCE_FILE_ID = pSourceFileId
     AND SOURCE_FILE_TYPE = 'INPUT';

  -- Get bucket URI
  vBucketUri := CT_MRDS.ENV_MANAGER.GET_BUCKET_URI(pBucketArea);
  vPrefix    := 'ODS/' || pFolderName || '/';

  -- Register all matching files
  FOR rec IN (
    SELECT object_name, bytes, etag
      FROM TABLE(MRDS_LOADER.cloud_wrapper.list_objects(
             credential_name => 'DEF_CRED_ARN',
             location_uri    => vBucketUri
           ))
     WHERE object_name LIKE vPrefix || pFilePattern
  ) LOOP
    INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
      A_SOURCE_FILE_RECEIVED_KEY,
      A_SOURCE_FILE_CONFIG_KEY,
      SOURCE_FILE_NAME,
      PROCESSING_STATUS,
      RECEPTION_DATE,
      BYTES,
      CHECKSUM
    ) VALUES (
      A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL,
      vConfigKey,
      rec.object_name,
      'INGESTED',
      pReceptionDate,
      rec.bytes,
      rec.etag
    );
    vCount := vCount + 1;
  END LOOP;

  COMMIT;
  -- Use a local counter: SQL%ROWCOUNT reflects only the last statement
  DBMS_OUTPUT.PUT_LINE('Registered ' || vCount || ' legacy files');
END;
/

-- Usage:
BEGIN
  -- After DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE completes
  REGISTER_LEGACY_EXPORT(
    pSourceFileId  => 'AGGREGATED_ALLOTMENT',
    pBucketArea    => 'DATA',
    pFolderName    => 'legacy_export',
    pFilePattern   => 'AGGREGATED_ALLOTMENT_2024%.csv',
    pReceptionDate => DATE '2024-12-31'
  );

  -- Now FILE_ARCHIVER can process
  FILE_ARCHIVER.ARCHIVE_TABLE_DATA(pSourceFileConfig => vConfig);
END;
/
```

## Migration Timeline and Coexistence

### Phase 1: Legacy System Only (Before Migration)
- All data loaded via Informatica + WLA
- Control table: `CT_ODS.A_LOAD_HISTORY`
- No `A_SOURCE_FILE_RECEIVED` records

### Phase 2: Parallel Operation (During Migration)
- **Old data**: Continue using Informatica + WLA → A_LOAD_HISTORY
- **New data**: Start using Airflow + DBT → A_SOURCE_FILE_RECEIVED + A_WORKFLOW_HISTORY
- **Challenge**: Different control tables for different data vintages

### Phase 3: New System Only (After Migration)
- All new data via Airflow + DBT
- Legacy data archived (one-time export using DATA_EXPORTER)
- Control tables: `CT_MRDS.A_SOURCE_FILE_RECEIVED`, `CT_MRDS.A_WORKFLOW_HISTORY`

## Recommendations

### For New Data (Airflow + DBT)

✅ Use the standard workflow:
1. FILE_MANAGER.PROCESS_SOURCE_FILE (creates A_SOURCE_FILE_RECEIVED)
2. Airflow + DBT processing (creates A_WORKFLOW_HISTORY)
3. FILE_ARCHIVER.ARCHIVE_TABLE_DATA (uses both control tables)

### For Legacy Data Migration

- ✅ **Small Datasets (<1000 files)**: Strategy 1 (Automatic Registration)
- ✅ **Large Datasets (>1000 files)**: Strategy 2 (Direct to ARCHIVE) or Strategy 3 (Hybrid)
- ❌ **Avoid**: Exporting to ODS/DATA without registration (orphaned files that cannot be archived)

### Configuration Requirements

Before archiving legacy data, ensure the source configuration exists:

```sql
-- Check if configuration exists
SELECT A_SOURCE_FILE_CONFIG_KEY, SOURCE_FILE_ID, TABLE_ID, ARCHIVAL_STRATEGY
  FROM CT_MRDS.A_SOURCE_FILE_CONFIG
 WHERE SOURCE_FILE_ID = 'YOUR_SOURCE_FILE_ID';

-- If missing, create configuration
CALL FILE_MANAGER.ADD_SOURCE_FILE_CONFIG(
  pSourceKey             => 'YOUR_SOURCE',
  pSourceFileType        => 'INPUT',
  pSourceFileId          => 'YOUR_SOURCE_FILE_ID',
  pSourceFileDesc        => 'Legacy migrated data',
  pSourceFileNamePattern => 'pattern_*.csv',
  pTableId               => 'YOUR_TABLE_ID',
  pTemplateTableName     => 'CT_ET_TEMPLATES.YOUR_TEMPLATE'
);
```

## Known Limitations

### 1. FILE_ARCHIVER Requires A_SOURCE_FILE_RECEIVED

FILE_ARCHIVER cannot archive data without corresponding A_SOURCE_FILE_RECEIVED records.

**Solutions**:
- ✅ **New system data**: Automatically registered via `FILE_MANAGER.PROCESS_SOURCE_FILE`
- ✅ **Legacy data exports**: Use `DATA_EXPORTER` with `pRegisterExport => TRUE` (v2.9.0+)
- ⚠️ **Manual uploads**: Must be registered via `FILE_MANAGER.PROCESS_SOURCE_FILE` or a manual INSERT

### 2. Mixed Control Table References

During the migration period, some procedures reference A_LOAD_HISTORY (DATA_EXPORTER) while others reference A_WORKFLOW_HISTORY (FILE_ARCHIVER). This is intentional, but it requires a careful understanding of data lineage.

### 3. A_WORKFLOW_HISTORY vs A_LOAD_HISTORY Column Mismatch

The control tables have different schemas:
- **A_LOAD_HISTORY**: `LOAD_START`, `A_ETL_LOAD_SET_KEY`
- **A_WORKFLOW_HISTORY**: `WORKFLOW_START`, `A_WORKFLOW_HISTORY_KEY`

Test scripts must be aware of which table is being used.
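Because of this column mismatch, lineage queries cannot be written once and run against both eras. One possible mitigation, sketched here from the column definitions documented above, is a view that aligns the two control tables; note that `V_RUN_HISTORY_ALL` is a hypothetical name for illustration, not an existing MRDS object:

```sql
-- Hypothetical view unifying both control tables for test scripts.
-- Column names are taken from the schemas documented above; the view
-- itself (V_RUN_HISTORY_ALL) is illustrative, not part of MRDS.
CREATE OR REPLACE VIEW V_RUN_HISTORY_ALL AS
SELECT 'LEGACY'           AS source_system,
       A_ETL_LOAD_SET_KEY AS run_key,
       WORKFLOW_NAME,
       LOAD_START         AS run_start,
       LOAD_END           AS run_end,
       LOAD_SUCCESSFUL    AS run_successful
  FROM CT_ODS.A_LOAD_HISTORY
UNION ALL
SELECT 'NEW'                  AS source_system,
       A_WORKFLOW_HISTORY_KEY AS run_key,
       WORKFLOW_NAME,
       WORKFLOW_START         AS run_start,
       WORKFLOW_END           AS run_end,
       WORKFLOW_SUCCESSFUL    AS run_successful
  FROM CT_MRDS.A_WORKFLOW_HISTORY;
```

With such a view, test scripts could filter on `source_system` instead of hard-coding a control table per data vintage.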
## Related Documentation

- [PROCESS_SOURCE_FILE Guide](PROCESS_SOURCE_FILE_Guide.md) - File validation and ingestion workflow
- [FILE_ARCHIVER Guide](FILE_ARCHIVER_Guide.md) - Archival strategies and configuration
- [FILE_MANAGER Configuration Guide](FILE_MANAGER_Configuration_Guide.md) - System configuration
- [Package Deployment Guide](Package_Deployment_Guide.md) - Deployment procedures

## Summary

The migration from Informatica + WLA to Airflow + DBT introduces new control tables (`A_SOURCE_FILE_RECEIVED`, `A_WORKFLOW_HISTORY`) while maintaining compatibility with the legacy control table (`A_LOAD_HISTORY`). Understanding the relationship between these tables is critical for:

- **Data Lineage**: Tracking which system processed which data
- **Export Operations**: Choosing appropriate DATA_EXPORTER procedures
- **Archival Operations**: Ensuring FILE_ARCHIVER has the required metadata
- **Testing**: Using the correct control tables in test scenarios

**Recommended Approach for Legacy Data Migration**:
1. ✅ **Strategy 1 (Automatic Registration)** - Use `DATA_EXPORTER` with `pRegisterExport => TRUE` to automatically register files in `A_SOURCE_FILE_RECEIVED`, enabling the full FILE_ARCHIVER workflow (archival strategies, status tracking, rollback capabilities)
2. ⚠️ **Strategy 2 (Direct to ARCHIVE)** - Export directly to the ARCHIVE bucket to bypass the ODS bucket entirely and avoid registration requirements (use when tracking is not needed)