Files
mars/confluence/System_Migration_Informatica_to_Airflow_DBT.md
Grzegorz Michalski 11723f6c88 dokumentacja
2026-02-20 11:34:58 +01:00

16 KiB

System Migration: Informatica + WLA → Airflow + DBT

This document describes the migration from the legacy Informatica + WLA data processing system to the new Airflow + DBT architecture, including control table differences, data export strategies, and known limitations.

Migration Overview

The MRDS (Market Reference Data System) is undergoing a fundamental technology migration:

Legacy System (Informatica + WLA):

  • ETL Tool: Informatica PowerCenter
  • Workflow Orchestration: WLA (Workflow Automation)
  • Control Schema: CT_ODS (Operational Data Store Control)
  • Primary Control Table: CT_ODS.A_LOAD_HISTORY
  • Key Column: A_ETL_LOAD_SET_KEY

New System (Airflow + DBT):

  • Orchestration: Apache Airflow
  • Transformation: DBT (Data Build Tool)
  • Control Schema: CT_MRDS (MRDS Control)
  • Primary Control Tables: CT_MRDS.A_SOURCE_FILE_RECEIVED, CT_MRDS.A_WORKFLOW_HISTORY
  • Key Column: A_WORKFLOW_HISTORY_KEY

Control Table Architecture

Legacy System: CT_ODS.A_LOAD_HISTORY

Purpose: Tracks Informatica PowerCenter workflow executions

Structure:

DESC CT_ODS.A_LOAD_HISTORY;

Name                   Type
______________________ _____________________
A_ETL_LOAD_SET_KEY     NUMBER(38)         -- Primary key
WORKFLOW_NAME          VARCHAR2(255)       -- Informatica workflow name
INFA_RUN_ID            NUMBER(38)          -- Informatica run ID
LOAD_START             TIMESTAMP(6)        -- Workflow start time
LOAD_END               TIMESTAMP(6)        -- Workflow end time
EXDI_APPL_REQ_ID       VARCHAR2(255)
EXDI_CORRELATION_ID    VARCHAR2(255)
LOAD_SUCCESSFUL        CHAR(1)             -- Y/N success flag
WLA_RUN_ID             NUMBER(28)          -- WLA run ID
DQ_FLAG                VARCHAR2(5)         -- Data quality flag

Usage Pattern:

  • Created by Informatica workflows during ETL execution
  • Used for temporal partitioning in DATA_EXPORTER
  • Referenced via A_ETL_LOAD_SET_KEY_FK foreign key in data tables

New System: CT_MRDS Control Tables

1. A_SOURCE_FILE_RECEIVED

Purpose: Tracks individual file processing through the complete lifecycle

Key Columns:

A_SOURCE_FILE_RECEIVED_KEY   NUMBER        -- Primary key
SOURCE_FILE_NAME             VARCHAR2      -- Full OCI path
PROCESSING_STATUS            VARCHAR2      -- Status tracking
RECEPTION_DATE               DATE          -- File arrival timestamp
PARTITION_YEAR               VARCHAR2(4)   -- Archive partition (year)
PARTITION_MONTH              VARCHAR2(2)   -- Archive partition (month)
ARCH_FILE_NAME               VARCHAR2      -- Parquet archive file path

Status Workflow:

RECEIVED → VALIDATED → READY_FOR_INGESTION → INGESTED → ARCHIVED_AND_TRASHED → ARCHIVED_AND_PURGED (optional)

Note: Legacy ARCHIVED status maintained for backward compatibility

Usage Pattern:

  • Created by FILE_MANAGER.PROCESS_SOURCE_FILE during file validation
  • Updated throughout file lifecycle (validation, ingestion, archival)
  • Required by FILE_ARCHIVER for archival operations
  • Links to A_WORKFLOW_HISTORY via workflow execution

2. A_WORKFLOW_HISTORY

Purpose: Tracks Airflow + DBT workflow executions (similar role to A_LOAD_HISTORY)

Key Columns:

A_WORKFLOW_HISTORY_KEY       NUMBER        -- Primary key
WORKFLOW_NAME                VARCHAR2      -- Airflow DAG name
WORKFLOW_START               TIMESTAMP     -- Workflow start time
WORKFLOW_END                 TIMESTAMP     -- Workflow end time
WORKFLOW_SUCCESSFUL          CHAR(1)       -- Y/N success flag

Usage Pattern:

  • Created by Airflow + DBT workflows during data processing
  • Used for temporal partitioning decisions in FILE_ARCHIVER
  • Referenced via A_WORKFLOW_HISTORY_KEY_FK foreign key in data tables

Data Export Strategies

Scenario 1: Legacy Data Export (Informatica → ODS)

Use Case: One-time migration of historical data loaded by Informatica

Package: DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE

Control Table: Uses CT_ODS.A_LOAD_HISTORY for temporal partitioning

Example:

-- Export historical AGGREGATED_ALLOTMENT data (loaded by Informatica)
BEGIN
    CT_MRDS.DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
        pSchemaName     => 'OU_TOP',
        pTableName      => 'AGGREGATED_ALLOTMENT',
        pKeyColumnName  => 'A_ETL_LOAD_SET_KEY_FK',  -- Links to A_LOAD_HISTORY
        pBucketArea     => 'DATA',
        pFolderName     => 'legacy_migration',
        pMinDate        => DATE '2020-01-01',
        pMaxDate        => DATE '2024-12-31'
    );
END;
/

Result: CSV files in ODS bucket (DATA area), partitioned by LOAD_START from A_LOAD_HISTORY

Scenario 2: New System Data (Airflow + DBT → ODS → ARCHIVE)

Use Case: Ongoing processing with new Airflow + DBT system

Workflow:

  1. File Arrival: FILES → INBOX bucket
  2. Validation: FILE_MANAGER.PROCESS_SOURCE_FILE → creates A_SOURCE_FILE_RECEIVED
  3. Processing: Airflow + DBT → creates A_WORKFLOW_HISTORY
  4. Export: DATA_EXPORTER → moves to ODS bucket (DATA area)
  5. Archival: FILE_ARCHIVER → moves to ARCHIVE bucket (Parquet with Hive partitioning)

Control Tables: Uses both A_SOURCE_FILE_RECEIVED and A_WORKFLOW_HISTORY

Example:

-- Archival of data processed by Airflow + DBT
BEGIN
    CT_MRDS.FILE_ARCHIVER.ARCHIVE_TABLE_DATA(
        pSourceFileConfig => vConfig  -- Requires A_SOURCE_FILE_RECEIVED records
    );
END;
/

Legacy Data Archival

FILE_ARCHIVER Requirement

⚠️ IMPORTANT: FILE_ARCHIVER requires records in A_SOURCE_FILE_RECEIVED table to track and manage archival lifecycle.

For new system data (Airflow + DBT):

  • Records automatically created by FILE_MANAGER.PROCESS_SOURCE_FILE
  • No additional steps needed

For legacy data (Informatica + WLA):

  • Historical data requires registration in A_SOURCE_FILE_RECEIVED
  • SOLUTION: Use DATA_EXPORTER v2.9.0+ with pRegisterExport => TRUE parameter
  • Automatically registers exported files with proper metadata (size, checksum, location)

Export Strategies for Legacy Data

DATA_EXPORTER v2.9.0+ supports automatic file registration via pRegisterExport parameter.

Benefits:

  • Simple, one-step export with automatic registration
  • Files tracked in A_SOURCE_FILE_RECEIVED (enables FILE_ARCHIVER processing)
  • Proper metadata capture (file size, checksum, location, timestamps)
  • Standard workflow integration (archival strategies, status tracking)

Example - CSV Export with Registration:

-- Export with automatic registration (DATA_EXPORTER v2.9.0+)
BEGIN
    CT_MRDS.DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
        pSchemaName     => 'OU_TOP',
        pTableName      => 'AGGREGATED_ALLOTMENT',
        pKeyColumnName  => 'A_ETL_LOAD_SET_KEY_FK',
        pBucketArea     => 'DATA',
        pFolderName     => 'legacy_export',
        pMinDate        => DATE '2024-01-01',
        pMaxDate        => DATE '2024-12-31',
        pRegisterExport => TRUE,  -- ✓ Automatically registers files
        pProcessName    => 'LEGACY_MIGRATION'
    );
END;
/

-- Files now registered in A_SOURCE_FILE_RECEIVED with:
-- - SOURCE_FILE_NAME: Full OCI path
-- - PROCESSING_STATUS: 'INGESTED'
-- - BYTES: Actual file size
-- - CHECKSUM: File ETag from OCI
-- - PROCESS_NAME: 'LEGACY_MIGRATION'

-- Now FILE_ARCHIVER can process these files
BEGIN
    CT_MRDS.FILE_ARCHIVER.ARCHIVE_TABLE_DATA(
        pSourceFileConfigKey => vConfigKey
    );
END;
/

Example - Single CSV Export with Registration:

-- For single file export (not partitioned by date)
BEGIN
    CT_MRDS.DATA_EXPORTER.EXPORT_TABLE_DATA(
        pSchemaName        => 'CT_MRDS',
        pTableName         => 'MY_TABLE',
        pKeyColumnName     => 'A_ETL_LOAD_SET_KEY_FK',
        pBucketArea        => 'DATA',
        pFolderName        => 'legacy_export',
        pFileName          => 'my_table_export.csv',
        pTemplateTableName => 'CT_ET_TEMPLATES.MY_TEMPLATE',
        pRegisterExport    => TRUE,  -- ✓ Registers file
        pProcessName       => 'LEGACY_MIGRATION'
    );
END;
/

Strategy 2: Direct Archive Export (Bypass ODS)

⚠️ Use when: You want to skip the ODS bucket entirely and go straight to ARCHIVE

Skip ODS/DATA bucket entirely - export directly to ARCHIVE bucket in Parquet format:

-- Export legacy data directly to ARCHIVE bucket
BEGIN
    DATA_EXPORTER.EXPORT_TABLE_DATA_BY_DATE(
        pSchemaName => 'OU_TOP',
        pTableName => 'AGGREGATED_ALLOTMENT',
        pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',
        pBucketArea => 'ARCHIVE',  -- Direct to archive
        pFolderName => 'legacy_direct',
        pMinDate => DATE '2020-01-01',
        pMaxDate => DATE '2024-12-31'
    );
END;
/
-- Result: Parquet files with Hive partitioning in ARCHIVE bucket
-- Files: ARCHIVE/legacy_direct/PARTITION_YEAR=2024/PARTITION_MONTH=01/*.parquet
-- No A_SOURCE_FILE_RECEIVED records needed (archival already complete)

Pros:

  • Simple, no manual registration
  • Data already in final archive format (Parquet)
  • Hive-style partitioning automatically applied

Cons:

  • No record in A_SOURCE_FILE_RECEIVED (tracking gap)
  • Cannot use FILE_ARCHIVER features (archival strategies, status tracking)
  • Mixed folder structure (legacy_direct vs. standard source/table paths)

Use DATA_EXPORTER for initial export, create minimal A_SOURCE_FILE_RECEIVED records programmatically:

-- Create helper procedure to register legacy exports
CREATE OR REPLACE PROCEDURE REGISTER_LEGACY_EXPORT (
    pSourceFileId     VARCHAR2,
    pBucketArea       VARCHAR2,
    pFolderName       VARCHAR2,
    pFilePattern      VARCHAR2,
    pReceptionDate    DATE DEFAULT SYSDATE
) AS
    vConfigKey NUMBER;
    vBucketUri VARCHAR2(500);
    vPrefix    VARCHAR2(500);
BEGIN
    -- Get source configuration
    SELECT A_SOURCE_FILE_CONFIG_KEY 
    INTO vConfigKey
    FROM CT_MRDS.A_SOURCE_FILE_CONFIG
    WHERE SOURCE_FILE_ID = pSourceFileId 
      AND SOURCE_FILE_TYPE = 'INPUT';
    
    -- Get bucket URI
    vBucketUri := CT_MRDS.ENV_MANAGER.GET_BUCKET_URI(pBucketArea);
    vPrefix := 'ODS/' || pFolderName || '/';
    
    -- Register all matching files
    FOR rec IN (
        SELECT object_name, bytes, etag
        FROM TABLE(MRDS_LOADER.cloud_wrapper.list_objects(
            credential_name => 'DEF_CRED_ARN',
            location_uri => vBucketUri
        ))
        WHERE object_name LIKE vPrefix || pFilePattern
    ) LOOP
        INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
            A_SOURCE_FILE_RECEIVED_KEY,
            A_SOURCE_FILE_CONFIG_KEY,
            SOURCE_FILE_NAME,
            PROCESSING_STATUS,
            RECEPTION_DATE,
            BYTES,
            CHECKSUM
        ) VALUES (
            A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL,
            vConfigKey,
            rec.object_name,
            'INGESTED',
            pReceptionDate,
            rec.bytes,
            rec.etag
        );
    END LOOP;
    
    COMMIT;
    DBMS_OUTPUT.PUT_LINE('Registered ' || SQL%ROWCOUNT || ' legacy files');
END;
/

-- Usage:
BEGIN
    -- After DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE completes
    REGISTER_LEGACY_EXPORT(
        pSourceFileId => 'AGGREGATED_ALLOTMENT',
        pBucketArea => 'DATA',
        pFolderName => 'legacy_export',
        pFilePattern => 'AGGREGATED_ALLOTMENT_2024%.csv',
        pReceptionDate => DATE '2024-12-31'
    );
    
    -- Now FILE_ARCHIVER can process
    FILE_ARCHIVER.ARCHIVE_TABLE_DATA(pSourceFileConfig => vConfig);
END;
/

Migration Timeline and Coexistence

Phase 1: Legacy System Only (Before Migration)

  • All data loaded via Informatica + WLA
  • Control table: CT_ODS.A_LOAD_HISTORY
  • No A_SOURCE_FILE_RECEIVED records

Phase 2: Parallel Operation (During Migration)

  • Old data: Continue using Informatica + WLA → A_LOAD_HISTORY
  • New data: Start using Airflow + DBT → A_SOURCE_FILE_RECEIVED + A_WORKFLOW_HISTORY
  • Challenge: Different control tables for different data vintages

Phase 3: New System Only (After Migration)

  • All new data via Airflow + DBT
  • Legacy data archived (one-time export using DATA_EXPORTER)
  • Control tables: CT_MRDS.A_SOURCE_FILE_RECEIVED, CT_MRDS.A_WORKFLOW_HISTORY

Recommendations

For New Data (Airflow + DBT)

Use standard workflow:

  1. FILE_MANAGER.PROCESS_SOURCE_FILE (creates A_SOURCE_FILE_RECEIVED)
  2. Airflow + DBT processing (creates A_WORKFLOW_HISTORY)
  3. FILE_ARCHIVER.ARCHIVE_TABLE_DATA (uses both control tables)

For Legacy Data Migration

Small Datasets (<1000 files): Strategy 1 (Manual Registration) Large Datasets (>1000 files): Strategy 2 (Direct to ARCHIVE) or Strategy 3 (Hybrid) Avoid: Exporting to ODS/DATA without registration (orphaned files, cannot archive)

Configuration Requirements

Before archiving legacy data, ensure source configuration exists:

-- Check if configuration exists
SELECT A_SOURCE_FILE_CONFIG_KEY, SOURCE_FILE_ID, TABLE_ID, ARCHIVAL_STRATEGY
FROM CT_MRDS.A_SOURCE_FILE_CONFIG
WHERE SOURCE_FILE_ID = 'YOUR_SOURCE_FILE_ID';

-- If missing, create configuration
CALL FILE_MANAGER.ADD_SOURCE_FILE_CONFIG(
    pSourceKey => 'YOUR_SOURCE',
    pSourceFileType => 'INPUT',
    pSourceFileId => 'YOUR_SOURCE_FILE_ID',
    pSourceFileDesc => 'Legacy migrated data',
    pSourceFileNamePattern => 'pattern_*.csv',
    pTableId => 'YOUR_TABLE_ID',
    pTemplateTableName => 'CT_ET_TEMPLATES.YOUR_TEMPLATE'
);

Known Limitations

1. FILE_ARCHIVER Requires A_SOURCE_FILE_RECEIVED

FILE_ARCHIVER cannot archive data without corresponding A_SOURCE_FILE_RECEIVED records.

Solutions:

  • New system data: Automatically registered via FILE_MANAGER.PROCESS_SOURCE_FILE
  • Legacy data exports: Use DATA_EXPORTER with pRegisterExport => TRUE (v2.9.0+)
  • ⚠️ Manual uploads: Must be registered via FILE_MANAGER.PROCESS_SOURCE_FILE or manual INSERT

2. Mixed Control Table References

During migration period, some procedures reference A_LOAD_HISTORY (DATA_EXPORTER) while others reference A_WORKFLOW_HISTORY (FILE_ARCHIVER). This is intentional but requires careful understanding of data lineage.

3. A_WORKFLOW_HISTORY vs A_LOAD_HISTORY Column Mismatch

The control tables have different schemas:

  • A_LOAD_HISTORY: LOAD_START, A_ETL_LOAD_SET_KEY
  • A_WORKFLOW_HISTORY: WORKFLOW_START, A_WORKFLOW_HISTORY_KEY

Test scripts must be aware of which table is being used.

Summary

The migration from Informatica + WLA to Airflow + DBT introduces new control tables (A_SOURCE_FILE_RECEIVED, A_WORKFLOW_HISTORY) while maintaining compatibility with legacy control tables (A_LOAD_HISTORY). Understanding the relationship between these tables is critical for:

  • Data Lineage: Tracking which system processed which data
  • Export Operations: Choosing appropriate DATA_EXPORTER procedures
  • Archival Operations: Ensuring FILE_ARCHIVER has required metadata
  • Testing: Using correct control tables in test scenarios

Recommended Approach for Legacy Data Migration:

  1. Strategy 1 (Automatic Registration) - Use DATA_EXPORTER with pRegisterExport => TRUE to automatically register files in A_SOURCE_FILE_RECEIVED, enabling full FILE_ARCHIVER workflow (archival strategies, status tracking, rollback capabilities)

  2. ⚠️ Strategy 2 (Direct to ARCHIVE) - Export directly to ARCHIVE bucket to bypass ODS bucket entirely and avoid registration requirements (use when tracking is not needed)