System Migration: Informatica + WLA → Airflow + DBT

This document describes the migration from the legacy Informatica + WLA data processing system to the modern Airflow + DBT architecture, including control table differences, data export strategies, and known limitations.

Migration Overview

The MRDS (Market Reference Data System) is undergoing a fundamental technology migration:

Legacy System (Informatica + WLA):

  • ETL Tool: Informatica PowerCenter
  • Workflow Orchestration: WLA (Workflow Automation)
  • Control Schema: CT_ODS (Operational Data Store Control)
  • Primary Control Table: CT_ODS.A_LOAD_HISTORY
  • Key Column: A_ETL_LOAD_SET_KEY

Modern System (Airflow + DBT):

  • Orchestration: Apache Airflow
  • Transformation: DBT (Data Build Tool)
  • Control Schema: CT_MRDS (MRDS Control)
  • Primary Control Tables: CT_MRDS.A_SOURCE_FILE_RECEIVED, CT_MRDS.A_WORKFLOW_HISTORY
  • Key Column: A_WORKFLOW_HISTORY_KEY

Control Table Architecture

Legacy System: CT_ODS.A_LOAD_HISTORY

Purpose: Tracks Informatica PowerCenter workflow executions

Structure:

DESC CT_ODS.A_LOAD_HISTORY;

Name                   Type
______________________ _____________________
A_ETL_LOAD_SET_KEY     NUMBER(38)          -- Primary key
WORKFLOW_NAME          VARCHAR2(255)       -- Informatica workflow name
INFA_RUN_ID            NUMBER(38)          -- Informatica run ID
LOAD_START             TIMESTAMP(6)        -- Workflow start time
LOAD_END               TIMESTAMP(6)        -- Workflow end time
EXDI_APPL_REQ_ID       VARCHAR2(255)
EXDI_CORRELATION_ID    VARCHAR2(255)
LOAD_SUCCESSFUL        CHAR(1)             -- Y/N success flag
WLA_RUN_ID             NUMBER(28)          -- WLA run ID
DQ_FLAG                VARCHAR2(5)         -- Data quality flag

Usage Pattern:

  • Created by Informatica workflows during ETL execution
  • Used for temporal partitioning in DATA_EXPORTER
  • Referenced via A_ETL_LOAD_SET_KEY_FK foreign key in data tables

Modern System: CT_MRDS Control Tables

1. A_SOURCE_FILE_RECEIVED

Purpose: Tracks individual file processing through the complete lifecycle

Key Columns:

A_SOURCE_FILE_RECEIVED_KEY   NUMBER        -- Primary key
SOURCE_FILE_NAME             VARCHAR2      -- Full OCI path
PROCESSING_STATUS            VARCHAR2      -- Status tracking
RECEPTION_DATE               DATE          -- File arrival timestamp
PARTITION_YEAR               VARCHAR2(4)   -- Archive partition (year)
PARTITION_MONTH              VARCHAR2(2)   -- Archive partition (month)
ARCH_FILE_NAME               VARCHAR2      -- Parquet archive file path

Status Workflow:

RECEIVED → VALIDATED → READY_FOR_INGESTION → INGESTED → ARCHIVED_AND_TRASHED → ARCHIVED_AND_PURGED (optional)

Note: The legacy ARCHIVED status is retained for backward compatibility.
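
The lifecycle above can be read as a small state machine. The following Python sketch is illustrative only: the status names come from this document, while the transition table, the helper names, and the choice to treat the legacy ARCHIVED status as equivalent to the newer archived statuses are assumptions, not FILE_MANAGER code.

```python
# Illustrative sketch: status names are taken from the lifecycle above;
# the transition table and helper names are assumptions for this example.
ALLOWED_TRANSITIONS = {
    "RECEIVED": {"VALIDATED"},
    "VALIDATED": {"READY_FOR_INGESTION"},
    "READY_FOR_INGESTION": {"INGESTED"},
    "INGESTED": {"ARCHIVED_AND_TRASHED"},
    "ARCHIVED_AND_TRASHED": {"ARCHIVED_AND_PURGED"},  # optional final step
    "ARCHIVED_AND_PURGED": set(),
}

# Assumption: legacy ARCHIVED rows are treated like the newer archived statuses.
ARCHIVED_STATUSES = {"ARCHIVED", "ARCHIVED_AND_TRASHED", "ARCHIVED_AND_PURGED"}


def can_transition(current: str, target: str) -> bool:
    """Return True if `target` directly follows `current` in the lifecycle."""
    return target in ALLOWED_TRANSITIONS.get(current, set())


def is_archived(status: str) -> bool:
    """Return True for any status that means the file has been archived."""
    return status in ARCHIVED_STATUSES


if __name__ == "__main__":
    print(can_transition("RECEIVED", "VALIDATED"))  # True
    print(can_transition("RECEIVED", "INGESTED"))   # False: steps cannot be skipped
    print(is_archived("ARCHIVED"))                  # True: legacy status
```

A check like this could guard status updates in test fixtures so that a file never jumps from RECEIVED straight to INGESTED by accident.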

Usage Pattern:

  • Created by FILE_MANAGER.PROCESS_SOURCE_FILE during file validation
  • Updated throughout file lifecycle (validation, ingestion, archival)
  • Required by FILE_ARCHIVER for archival operations
  • Links to A_WORKFLOW_HISTORY via workflow execution

2. A_WORKFLOW_HISTORY

Purpose: Tracks Airflow + DBT workflow executions (similar role to A_LOAD_HISTORY)

Key Columns:

A_WORKFLOW_HISTORY_KEY       NUMBER        -- Primary key
WORKFLOW_NAME                VARCHAR2      -- Airflow DAG name
WORKFLOW_START               TIMESTAMP     -- Workflow start time
WORKFLOW_END                 TIMESTAMP     -- Workflow end time
WORKFLOW_SUCCESSFUL          CHAR(1)       -- Y/N success flag

Usage Pattern:

  • Created by Airflow + DBT workflows during data processing
  • Used for temporal partitioning decisions in FILE_ARCHIVER
  • Referenced via A_WORKFLOW_HISTORY_KEY_FK foreign key in data tables

Data Export Strategies

Scenario 1: Legacy Data Export (Informatica → ODS)

Use Case: One-time migration of historical data loaded by Informatica

Package: DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE

Control Table: Uses CT_ODS.A_LOAD_HISTORY for temporal partitioning

Example:

-- Export historical AGGREGATED_ALLOTMENT data (loaded by Informatica)
BEGIN
    CT_MRDS.DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
        pSchemaName     => 'OU_TOP',
        pTableName      => 'AGGREGATED_ALLOTMENT',
        pKeyColumnName  => 'A_ETL_LOAD_SET_KEY_FK',  -- Links to A_LOAD_HISTORY
        pBucketArea     => 'DATA',
        pFolderName     => 'legacy_migration',
        pMinDate        => DATE '2020-01-01',
        pMaxDate        => DATE '2024-12-31'   -- Exclusive (LOAD_START < pMaxDate); use DATE '2025-01-01' to include 2024-12-31
    );
END;
/

Result: CSV files in ODS bucket (DATA area), partitioned by LOAD_START from A_LOAD_HISTORY

Scenario 2: Modern System Data (Airflow + DBT → ODS → ARCHIVE)

Use Case: Ongoing processing with new Airflow + DBT system

Workflow:

  1. File Arrival: FILES → INBOX bucket
  2. Validation: FILE_MANAGER.PROCESS_SOURCE_FILE → creates A_SOURCE_FILE_RECEIVED
  3. Processing: Airflow + DBT → creates A_WORKFLOW_HISTORY
  4. Export: DATA_EXPORTER → moves to ODS bucket (DATA area)
  5. Archival: FILE_ARCHIVER → moves to ARCHIVE bucket (Parquet with Hive partitioning)

Control Tables: Uses both A_SOURCE_FILE_RECEIVED and A_WORKFLOW_HISTORY

Example:

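-- Archival of data processed by Airflow + DBT
DECLARE
    -- Assumption: pSourceFileConfig is a row of CT_MRDS.A_SOURCE_FILE_CONFIG
    vConfig CT_MRDS.A_SOURCE_FILE_CONFIG%ROWTYPE;
BEGIN
    SELECT * INTO vConfig
    FROM CT_MRDS.A_SOURCE_FILE_CONFIG
    WHERE SOURCE_FILE_ID = 'AGGREGATED_ALLOTMENT' AND SOURCE_FILE_TYPE = 'INPUT';

    CT_MRDS.FILE_ARCHIVER.ARCHIVE_TABLE_DATA(
        pSourceFileConfig => vConfig  -- Requires A_SOURCE_FILE_RECEIVED records
    );
END;
/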

Critical Gap: Legacy Data Archival

Problem Statement

Scenario: Historical data exported using DATA_EXPORTER from Informatica-loaded tables

Issue: FILE_ARCHIVER requires records in A_SOURCE_FILE_RECEIVED, but legacy exports don't create them

Impact: Legacy data exported to ODS/DATA bucket CANNOT be archived to ARCHIVE bucket using FILE_ARCHIVER

Technical Analysis

DATA_EXPORTER Behavior:

-- Uses A_LOAD_HISTORY for partitioning (Informatica workflows)
SELECT DISTINCT TO_CHAR(L.LOAD_START,'YYYY') AS YR, 
                TO_CHAR(L.LOAD_START,'MM') AS MN
FROM OU_TOP.AGGREGATED_ALLOTMENT T, CT_ODS.A_LOAD_HISTORY L  
WHERE T.A_ETL_LOAD_SET_KEY_FK = L.A_ETL_LOAD_SET_KEY
  AND L.LOAD_START >= :pMinDate 
  AND L.LOAD_START < :pMaxDate;

-- Creates CSV files: ODS/legacy_migration/AGGREGATED_ALLOTMENT_YYYYMM.csv
-- Does NOT create A_SOURCE_FILE_RECEIVED records
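
To make the partitioning rule concrete, here is a minimal Python sketch of the same logic: group load timestamps by year and month inside a half-open date window and derive one CSV name per month. The function name and the sample timestamps are made up for illustration; the file naming and the exclusive upper bound follow the query and comment above.

```python
from datetime import datetime


def export_partitions(load_starts, min_date, max_date, table_name):
    """Return sorted CSV names, one per (year, month) with a load in [min_date, max_date)."""
    months = {
        (ts.year, ts.month)
        for ts in load_starts
        if min_date <= ts < max_date  # upper bound is exclusive, as in the query
    }
    return [f"{table_name}_{year:04d}{month:02d}.csv" for year, month in sorted(months)]


if __name__ == "__main__":
    # Hypothetical LOAD_START values, for illustration only.
    loads = [
        datetime(2024, 1, 15, 8, 0),
        datetime(2024, 1, 20, 9, 30),
        datetime(2024, 12, 31, 6, 0),
    ]
    # pMaxDate = 2024-12-31 drops the load that ran on 31 December.
    print(export_partitions(loads, datetime(2024, 1, 1), datetime(2024, 12, 31),
                            "AGGREGATED_ALLOTMENT"))
    # ['AGGREGATED_ALLOTMENT_202401.csv']

    # Moving the bound to 2025-01-01 includes it.
    print(export_partitions(loads, datetime(2024, 1, 1), datetime(2025, 1, 1),
                            "AGGREGATED_ALLOTMENT"))
    # ['AGGREGATED_ALLOTMENT_202401.csv', 'AGGREGATED_ALLOTMENT_202412.csv']
```

The second call shows why a full-year export is usually expressed with the first day of the following year as the upper bound.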

FILE_ARCHIVER Requirement:

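-- Fragment of the archival query (which also joins A_WORKFLOW_HISTORY):
-- only A_SOURCE_FILE_RECEIVED rows for the given configuration in status INGESTED qualify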
JOIN CT_MRDS.A_SOURCE_FILE_RECEIVED r
  ON r.A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfig.A_SOURCE_FILE_CONFIG_KEY
 AND r.PROCESSING_STATUS = 'INGESTED';

-- Without A_SOURCE_FILE_RECEIVED records, archival CANNOT proceed
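
The effect of this requirement can be sketched in a few lines of Python. The record class and function below are hypothetical stand-ins for the control table and the join condition shown above; they only illustrate why an export with no registered rows yields nothing to archive.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ReceivedFile:
    """Hypothetical in-memory stand-in for one A_SOURCE_FILE_RECEIVED row."""
    config_key: int
    source_file_name: str
    processing_status: str


def archivable_files(received, config_key):
    """Mimic the join condition: same configuration key and status INGESTED."""
    return [
        row for row in received
        if row.config_key == config_key and row.processing_status == "INGESTED"
    ]


if __name__ == "__main__":
    tracked = [
        ReceivedFile(7, "INBOX/allotment_202501.csv", "INGESTED"),
        ReceivedFile(7, "INBOX/allotment_202502.csv", "VALIDATED"),
    ]
    print(len(archivable_files(tracked, 7)))  # 1: only the INGESTED row qualifies

    # A legacy export writes CSV files but registers no rows at all.
    legacy_rows = []
    print(len(archivable_files(legacy_rows, 7)))  # 0: nothing for FILE_ARCHIVER to pick up
```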

Workaround Strategies

Strategy 1: Manual Registration

Manually create A_SOURCE_FILE_RECEIVED records for the legacy exported files:

-- Step 1: Export legacy data to ODS/DATA
BEGIN
    DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
        pSchemaName => 'OU_TOP',
        pTableName => 'AGGREGATED_ALLOTMENT',
        pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',
        pBucketArea => 'DATA',
        pFolderName => 'legacy_export',
        pMinDate => DATE '2024-01-01',
        pMaxDate => DATE '2024-12-31'  -- Exclusive (LOAD_START < pMaxDate); use DATE '2025-01-01' to include 2024-12-31
    );
END;
/

-- Step 2: List exported CSV files
SELECT object_name, time_created, bytes
FROM TABLE(MRDS_LOADER.cloud_wrapper.list_objects(
    credential_name => 'DEF_CRED_ARN',
    location_uri => 'https://objectstorage.eu-frankfurt-1.oraclecloud.com/n/frtgjxu7zl7c/b/data/o/'
)) WHERE object_name LIKE 'ODS/legacy_export/AGGREGATED_ALLOTMENT_%';

-- Step 3: Manually register each file in A_SOURCE_FILE_RECEIVED
-- (Requires source configuration for AGGREGATED_ALLOTMENT to exist)
INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
    A_SOURCE_FILE_RECEIVED_KEY,
    A_SOURCE_FILE_CONFIG_KEY,
    SOURCE_FILE_NAME,
    PROCESSING_STATUS,
    RECEPTION_DATE,
    BYTES,
    CHECKSUM,
    EXTERNAL_TABLE_NAME
) VALUES (
    A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL,
    (SELECT A_SOURCE_FILE_CONFIG_KEY FROM CT_MRDS.A_SOURCE_FILE_CONFIG
     WHERE SOURCE_FILE_ID = 'AGGREGATED_ALLOTMENT' AND SOURCE_FILE_TYPE = 'INPUT'),
    'ODS/legacy_export/AGGREGATED_ALLOTMENT_202401.csv',
    'INGESTED',  -- Skip validation, mark as already ingested
    DATE '2024-01-15',
    1048576,  -- File size in bytes
    'manual_registration',
    NULL  -- No external table needed
);
-- Repeat for all exported CSV files
COMMIT;

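-- Step 4: Now FILE_ARCHIVER can process these files
DECLARE
    -- Assumption: pSourceFileConfig is a row of CT_MRDS.A_SOURCE_FILE_CONFIG
    vConfig CT_MRDS.A_SOURCE_FILE_CONFIG%ROWTYPE;
BEGIN
    SELECT * INTO vConfig
    FROM CT_MRDS.A_SOURCE_FILE_CONFIG
    WHERE SOURCE_FILE_ID = 'AGGREGATED_ALLOTMENT' AND SOURCE_FILE_TYPE = 'INPUT';

    FILE_ARCHIVER.ARCHIVE_TABLE_DATA(pSourceFileConfig => vConfig);
END;
/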

Strategy 2: Direct Archive Export (Bypass ODS)

Skip the ODS/DATA bucket entirely and export directly to the ARCHIVE bucket in Parquet format:

-- Export legacy data directly to ARCHIVE bucket
BEGIN
    DATA_EXPORTER.EXPORT_TABLE_DATA_BY_DATE(
        pSchemaName => 'OU_TOP',
        pTableName => 'AGGREGATED_ALLOTMENT',
        pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',
        pBucketArea => 'ARCHIVE',  -- Direct to archive
        pFolderName => 'legacy_direct',
        pMinDate => DATE '2020-01-01',
        pMaxDate => DATE '2024-12-31'
    );
END;
/
-- Result: Parquet files with Hive partitioning in ARCHIVE bucket
-- Files: ARCHIVE/legacy_direct/PARTITION_YEAR=2024/PARTITION_MONTH=01/*.parquet
-- No A_SOURCE_FILE_RECEIVED records needed (archival already complete)

Pros:

  • Simple, no manual registration
  • Data already in final archive format (Parquet)
  • Hive-style partitioning automatically applied

Cons:

  • No record in A_SOURCE_FILE_RECEIVED (tracking gap)
  • Cannot use FILE_ARCHIVER features (archival strategies, status tracking)
  • Mixed folder structure (legacy_direct vs. standard source/table paths)
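
The Hive-style layout used in the ARCHIVE bucket can be illustrated with a short Python sketch. The helper names are hypothetical; the path format follows the example path shown for this strategy, and the parser shows how year and month can be recovered from an object name.

```python
import re
from datetime import date


def hive_partition_prefix(folder: str, load_date: date) -> str:
    """Build the ARCHIVE prefix for the month that contains `load_date`."""
    return (
        f"ARCHIVE/{folder}/"
        f"PARTITION_YEAR={load_date.year:04d}/"
        f"PARTITION_MONTH={load_date.month:02d}/"
    )


_PARTITION_RE = re.compile(r"PARTITION_YEAR=(\d{4})/PARTITION_MONTH=(\d{2})/")


def parse_partition(object_name: str):
    """Return (year, month) from a Hive-partitioned object name, or None if absent."""
    match = _PARTITION_RE.search(object_name)
    if match is None:
        return None
    return int(match.group(1)), int(match.group(2))


if __name__ == "__main__":
    prefix = hive_partition_prefix("legacy_direct", date(2024, 1, 15))
    print(prefix)  # ARCHIVE/legacy_direct/PARTITION_YEAR=2024/PARTITION_MONTH=01/
    print(parse_partition(prefix + "part-0001.parquet"))  # (2024, 1)
```

Zero-padding the month keeps object names sorted in calendar order when listed lexicographically.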

Strategy 3: Hybrid (Programmatic Registration)

Use DATA_EXPORTER for the initial export, then create minimal A_SOURCE_FILE_RECEIVED records programmatically:

-- Create helper procedure to register legacy exports
CREATE OR REPLACE PROCEDURE REGISTER_LEGACY_EXPORT (
    pSourceFileId     VARCHAR2,
    pBucketArea       VARCHAR2,
    pFolderName       VARCHAR2,
    pFilePattern      VARCHAR2,
    pReceptionDate    DATE DEFAULT SYSDATE
) AS
    vConfigKey  NUMBER;
    vBucketUri  VARCHAR2(500);
    vPrefix     VARCHAR2(500);
    vRegistered PLS_INTEGER := 0;  -- Number of files registered by this call
BEGIN
    -- Get source configuration
    SELECT A_SOURCE_FILE_CONFIG_KEY 
    INTO vConfigKey
    FROM CT_MRDS.A_SOURCE_FILE_CONFIG
    WHERE SOURCE_FILE_ID = pSourceFileId 
      AND SOURCE_FILE_TYPE = 'INPUT';
    
    -- Get bucket URI
    vBucketUri := CT_MRDS.ENV_MANAGER.GET_BUCKET_URI(pBucketArea);
    vPrefix := 'ODS/' || pFolderName || '/';
    
    -- Register all matching files
    FOR rec IN (
        SELECT object_name, bytes, etag
        FROM TABLE(MRDS_LOADER.cloud_wrapper.list_objects(
            credential_name => 'DEF_CRED_ARN',
            location_uri => vBucketUri
        ))
        WHERE object_name LIKE vPrefix || pFilePattern
    ) LOOP
        INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
            A_SOURCE_FILE_RECEIVED_KEY,
            A_SOURCE_FILE_CONFIG_KEY,
            SOURCE_FILE_NAME,
            PROCESSING_STATUS,
            RECEPTION_DATE,
            BYTES,
            CHECKSUM
        ) VALUES (
            A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL,
            vConfigKey,
            rec.object_name,
            'INGESTED',
            pReceptionDate,
            rec.bytes,
            rec.etag
        );
        vRegistered := vRegistered + 1;
    END LOOP;
    
    COMMIT;
    -- SQL%ROWCOUNT only reflects the most recent SQL statement, so report the explicit counter
    DBMS_OUTPUT.PUT_LINE('Registered ' || vRegistered || ' legacy files');
END;
/

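-- Usage:
DECLARE
    -- Assumption: pSourceFileConfig is a row of CT_MRDS.A_SOURCE_FILE_CONFIG
    vConfig CT_MRDS.A_SOURCE_FILE_CONFIG%ROWTYPE;
BEGIN
    -- After DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE completes
    REGISTER_LEGACY_EXPORT(
        pSourceFileId => 'AGGREGATED_ALLOTMENT',
        pBucketArea => 'DATA',
        pFolderName => 'legacy_export',
        pFilePattern => 'AGGREGATED_ALLOTMENT_2024%.csv',
        pReceptionDate => DATE '2024-12-31'
    );
    
    SELECT * INTO vConfig
    FROM CT_MRDS.A_SOURCE_FILE_CONFIG
    WHERE SOURCE_FILE_ID = 'AGGREGATED_ALLOTMENT' AND SOURCE_FILE_TYPE = 'INPUT';
    
    -- Now FILE_ARCHIVER can process
    FILE_ARCHIVER.ARCHIVE_TABLE_DATA(pSourceFileConfig => vConfig);
END;
/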
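
One detail of the file pattern is easy to miss: in SQL LIKE, `%` matches any run of characters and `_` matches exactly one arbitrary character, so the underscores in a pattern such as AGGREGATED_ALLOTMENT_2024%.csv are wildcards too. The Python sketch below emulates LIKE matching (including an optional escape character, as in `LIKE ... ESCAPE '\'`) to show the effect. The function names are made up for this example, and the second object name is a hypothetical near miss.

```python
import re
from typing import Optional


def like_to_regex(pattern: str, escape: Optional[str] = None) -> "re.Pattern[str]":
    """Translate a SQL LIKE pattern into an equivalent compiled regular expression."""
    parts = []
    i = 0
    while i < len(pattern):
        ch = pattern[i]
        if escape is not None and ch == escape and i + 1 < len(pattern):
            parts.append(re.escape(pattern[i + 1]))  # escaped char is literal
            i += 2
            continue
        if ch == "%":
            parts.append(".*")   # any run of characters
        elif ch == "_":
            parts.append(".")    # exactly one arbitrary character
        else:
            parts.append(re.escape(ch))
        i += 1
    return re.compile("".join(parts), re.DOTALL)


def like(value: str, pattern: str, escape: Optional[str] = None) -> bool:
    """Return True if `value` matches the LIKE `pattern` in full."""
    return like_to_regex(pattern, escape).fullmatch(value) is not None


if __name__ == "__main__":
    pattern = "ODS/legacy_export/AGGREGATED_ALLOTMENT_2024%.csv"
    print(like("ODS/legacy_export/AGGREGATED_ALLOTMENT_202401.csv", pattern))  # True
    # Hypothetical near miss: '_' in the pattern also matches the 'X'.
    print(like("ODS/legacy_export/AGGREGATEDXALLOTMENT_202401.csv", pattern))  # True

    strict = "ODS/legacy\\_export/AGGREGATED\\_ALLOTMENT\\_2024%.csv"
    print(like("ODS/legacy_export/AGGREGATEDXALLOTMENT_202401.csv", strict, escape="\\"))  # False
```

In practice the loose match rarely matters for a dedicated export folder, but escaping the underscores is a cheap safeguard when several similarly named tables share a prefix.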

Migration Timeline and Coexistence

Phase 1: Legacy System Only (Before Migration)

  • All data loaded via Informatica + WLA
  • Control table: CT_ODS.A_LOAD_HISTORY
  • No A_SOURCE_FILE_RECEIVED records

Phase 2: Parallel Operation (During Migration)

  • Old data: Continue using Informatica + WLA → A_LOAD_HISTORY
  • New data: Start using Airflow + DBT → A_SOURCE_FILE_RECEIVED + A_WORKFLOW_HISTORY
  • Challenge: Different control tables for different data vintages

Phase 3: New System Only (After Migration)

  • All new data via Airflow + DBT
  • Legacy data archived (one-time export using DATA_EXPORTER)
  • Control tables: CT_MRDS.A_SOURCE_FILE_RECEIVED, CT_MRDS.A_WORKFLOW_HISTORY

Recommendations

For New Data (Airflow + DBT)

Use standard workflow:

  1. FILE_MANAGER.PROCESS_SOURCE_FILE (creates A_SOURCE_FILE_RECEIVED)
  2. Airflow + DBT processing (creates A_WORKFLOW_HISTORY)
  3. FILE_ARCHIVER.ARCHIVE_TABLE_DATA (uses both control tables)

For Legacy Data Migration

  • Small Datasets (<1000 files): Strategy 1 (Manual Registration)
  • Large Datasets (>1000 files): Strategy 2 (Direct to ARCHIVE) or Strategy 3 (Hybrid)
  • Avoid: Exporting to ODS/DATA without registration (orphaned files, cannot archive)
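
As a rough decision aid, the guidance above can be written as a small Python function. This is a sketch under stated assumptions: the document does not say how to treat exactly 1000 files (treated as large here), and the `needs_tracking` flag is a hypothetical input that separates Strategy 3 from Strategy 2 based on whether A_SOURCE_FILE_RECEIVED records are required.

```python
def choose_strategy(file_count: int, needs_tracking: bool) -> str:
    """Pick a legacy-migration strategy from the file count and tracking needs."""
    if file_count < 0:
        raise ValueError("file_count must not be negative")
    if file_count < 1000:
        return "Strategy 1 (Manual Registration)"
    # Assumption: exactly 1000 files is handled like a large dataset.
    if needs_tracking:
        return "Strategy 3 (Hybrid)"  # registers rows in A_SOURCE_FILE_RECEIVED
    return "Strategy 2 (Direct to ARCHIVE)"  # no tracking rows are created


if __name__ == "__main__":
    print(choose_strategy(12, needs_tracking=True))      # Strategy 1 (Manual Registration)
    print(choose_strategy(5000, needs_tracking=True))    # Strategy 3 (Hybrid)
    print(choose_strategy(5000, needs_tracking=False))   # Strategy 2 (Direct to ARCHIVE)
```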

Configuration Requirements

Before archiving legacy data, ensure source configuration exists:

-- Check if configuration exists
SELECT A_SOURCE_FILE_CONFIG_KEY, SOURCE_FILE_ID, TABLE_ID, ARCHIVAL_STRATEGY
FROM CT_MRDS.A_SOURCE_FILE_CONFIG
WHERE SOURCE_FILE_ID = 'YOUR_SOURCE_FILE_ID';

-- If missing, create configuration
BEGIN
    FILE_MANAGER.ADD_SOURCE_FILE_CONFIG(
        pSourceKey => 'YOUR_SOURCE',
        pSourceFileType => 'INPUT',
        pSourceFileId => 'YOUR_SOURCE_FILE_ID',
        pSourceFileDesc => 'Legacy migrated data',
        pSourceFileNamePattern => 'pattern_*.csv',
        pTableId => 'YOUR_TABLE_ID',
        pTemplateTableName => 'CT_ET_TEMPLATES.YOUR_TEMPLATE'
    );
END;
/

Known Limitations

1. No Retroactive A_SOURCE_FILE_RECEIVED Creation

DATA_EXPORTER does not automatically create A_SOURCE_FILE_RECEIVED records when exporting legacy data. This is by design: DATA_EXPORTER is a one-time export tool, not a file tracking system.

2. FILE_ARCHIVER Requires A_SOURCE_FILE_RECEIVED

FILE_ARCHIVER cannot archive data without corresponding A_SOURCE_FILE_RECEIVED records. This prevents archiving of:

  • Legacy Informatica-loaded data exported via DATA_EXPORTER
  • Manually uploaded files not processed through FILE_MANAGER.PROCESS_SOURCE_FILE

3. Mixed Control Table References

During the migration period, some procedures reference A_LOAD_HISTORY (DATA_EXPORTER) while others reference A_WORKFLOW_HISTORY (FILE_ARCHIVER). This is intentional, but it means the correct control table depends on which system loaded the data.

4. A_WORKFLOW_HISTORY vs A_LOAD_HISTORY Column Mismatch

The control tables have different schemas:

  • A_LOAD_HISTORY: LOAD_START, A_ETL_LOAD_SET_KEY
  • A_WORKFLOW_HISTORY: WORKFLOW_START, A_WORKFLOW_HISTORY_KEY

Test scripts must join to the control table that matches the system that loaded the data under test.
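
One way to keep test scripts honest about this difference is to hold the per-system names in one place and generate the join from them. The Python sketch below uses only table and column names that appear in this document; the `ControlTable` class, the `legacy`/`modern` keys, and the `partition_query` helper are hypothetical. The query text is built by string formatting, which is acceptable here only because every identifier comes from a fixed, trusted mapping.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ControlTable:
    """Names needed to join a data table to its control table."""
    table: str
    key_column: str
    start_column: str
    fk_column: str


CONTROL_TABLES = {
    "legacy": ControlTable(
        "CT_ODS.A_LOAD_HISTORY", "A_ETL_LOAD_SET_KEY",
        "LOAD_START", "A_ETL_LOAD_SET_KEY_FK",
    ),
    "modern": ControlTable(
        "CT_MRDS.A_WORKFLOW_HISTORY", "A_WORKFLOW_HISTORY_KEY",
        "WORKFLOW_START", "A_WORKFLOW_HISTORY_KEY_FK",
    ),
}


def partition_query(data_table: str, system: str) -> str:
    """Build the year/month partition query for the given system's control table."""
    ct = CONTROL_TABLES[system]  # raises KeyError for an unknown system
    return (
        f"SELECT DISTINCT TO_CHAR(c.{ct.start_column}, 'YYYY') AS yr, "
        f"TO_CHAR(c.{ct.start_column}, 'MM') AS mn "
        f"FROM {data_table} t JOIN {ct.table} c "
        f"ON t.{ct.fk_column} = c.{ct.key_column}"
    )


if __name__ == "__main__":
    print(partition_query("OU_TOP.AGGREGATED_ALLOTMENT", "legacy"))
    print(partition_query("OU_TOP.AGGREGATED_ALLOTMENT", "modern"))
```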

Summary

The migration from Informatica + WLA to Airflow + DBT introduces new control tables (A_SOURCE_FILE_RECEIVED, A_WORKFLOW_HISTORY) while maintaining compatibility with the legacy control table (A_LOAD_HISTORY). Understanding the relationship between these tables is critical for:

  • Data Lineage: Tracking which system processed which data
  • Export Operations: Choosing appropriate DATA_EXPORTER procedures
  • Archival Operations: Ensuring FILE_ARCHIVER has required metadata
  • Testing: Using correct control tables in test scenarios

The recommended approach for legacy data migration is Strategy 2 (Direct to ARCHIVE) for large datasets, as it avoids the complexity of manual A_SOURCE_FILE_RECEIVED registration while achieving the goal of moving historical data to long-term archival storage.