System Migration: Informatica + WLA → Airflow + DBT
This document describes the migration from the legacy Informatica + WLA data processing system to the new Airflow + DBT architecture, including control table differences, data export strategies, and known limitations.
Migration Overview
The MRDS (Market Reference Data System) is undergoing a fundamental technology migration:
Legacy System (Informatica + WLA):
- ETL Tool: Informatica PowerCenter
- Workflow Orchestration: WLA (Workflow Automation)
- Control Schema: CT_ODS (Operational Data Store Control)
- Primary Control Table: CT_ODS.A_LOAD_HISTORY
- Key Column: A_ETL_LOAD_SET_KEY
New System (Airflow + DBT):
- Orchestration: Apache Airflow
- Transformation: DBT (Data Build Tool)
- Control Schema: CT_MRDS (MRDS Control)
- Primary Control Tables: CT_MRDS.A_SOURCE_FILE_RECEIVED, CT_MRDS.A_WORKFLOW_HISTORY
- Key Column: A_WORKFLOW_HISTORY_KEY
Control Table Architecture
Legacy System: CT_ODS.A_LOAD_HISTORY
Purpose: Tracks Informatica PowerCenter workflow executions
Structure:
DESC CT_ODS.A_LOAD_HISTORY;
Name Type
______________________ _____________________
A_ETL_LOAD_SET_KEY NUMBER(38) -- Primary key
WORKFLOW_NAME VARCHAR2(255) -- Informatica workflow name
INFA_RUN_ID NUMBER(38) -- Informatica run ID
LOAD_START TIMESTAMP(6) -- Workflow start time
LOAD_END TIMESTAMP(6) -- Workflow end time
EXDI_APPL_REQ_ID VARCHAR2(255)
EXDI_CORRELATION_ID VARCHAR2(255)
LOAD_SUCCESSFUL CHAR(1) -- Y/N success flag
WLA_RUN_ID NUMBER(28) -- WLA run ID
DQ_FLAG VARCHAR2(5) -- Data quality flag
Usage Pattern:
- Created by Informatica workflows during ETL execution
- Used for temporal partitioning in DATA_EXPORTER
- Referenced via the A_ETL_LOAD_SET_KEY_FK foreign key in data tables
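The temporal-partitioning lookup described above can be sketched as a join between a data table and the control table. This is an illustrative query only, using OU_TOP.AGGREGATED_ALLOTMENT (the example table from the export scenarios in this document) and assuming it carries the A_ETL_LOAD_SET_KEY_FK column:

```sql
-- Sketch: select rows loaded by successful Informatica runs in a date window
SELECT d.*
FROM   OU_TOP.AGGREGATED_ALLOTMENT d
JOIN   CT_ODS.A_LOAD_HISTORY lh
       ON lh.A_ETL_LOAD_SET_KEY = d.A_ETL_LOAD_SET_KEY_FK
WHERE  lh.LOAD_SUCCESSFUL = 'Y'
AND    lh.LOAD_START >= DATE '2024-01-01'
AND    lh.LOAD_START <  DATE '2025-01-01';
```

DATA_EXPORTER applies essentially this kind of LOAD_START filtering when partitioning legacy exports by date.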
New System: CT_MRDS Control Tables
1. A_SOURCE_FILE_RECEIVED
Purpose: Tracks individual file processing through the complete lifecycle
Key Columns:
A_SOURCE_FILE_RECEIVED_KEY NUMBER -- Primary key
SOURCE_FILE_NAME VARCHAR2 -- Full OCI path
PROCESSING_STATUS VARCHAR2 -- Status tracking
RECEPTION_DATE DATE -- File arrival timestamp
PARTITION_YEAR VARCHAR2(4) -- Archive partition (year)
PARTITION_MONTH VARCHAR2(2) -- Archive partition (month)
ARCH_FILE_NAME VARCHAR2 -- Parquet archive file path
Status Workflow:
RECEIVED → VALIDATED → READY_FOR_INGESTION → INGESTED → ARCHIVED_AND_TRASHED → ARCHIVED_AND_PURGED (optional)
Note: Legacy ARCHIVED status maintained for backward compatibility
Usage Pattern:
- Created by FILE_MANAGER.PROCESS_SOURCE_FILE during file validation
- Updated throughout file lifecycle (validation, ingestion, archival)
- Required by FILE_ARCHIVER for archival operations
- Links to A_WORKFLOW_HISTORY via workflow execution
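Because the file lifecycle is driven entirely by PROCESSING_STATUS, a simple aggregation makes a useful monitoring sketch. The query below is illustrative; column names match the key columns listed above:

```sql
-- Sketch: count files per lifecycle status for the last 7 days
SELECT PROCESSING_STATUS, COUNT(*) AS FILE_COUNT
FROM   CT_MRDS.A_SOURCE_FILE_RECEIVED
WHERE  RECEPTION_DATE >= TRUNC(SYSDATE) - 7
GROUP  BY PROCESSING_STATUS
ORDER  BY FILE_COUNT DESC;
```

Files stuck in RECEIVED or VALIDATED for long periods usually indicate a stalled ingestion step.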
2. A_WORKFLOW_HISTORY
Purpose: Tracks Airflow + DBT workflow executions (similar role to A_LOAD_HISTORY)
Key Columns:
A_WORKFLOW_HISTORY_KEY NUMBER -- Primary key
WORKFLOW_NAME VARCHAR2 -- Airflow DAG name
WORKFLOW_START TIMESTAMP -- Workflow start time
WORKFLOW_END TIMESTAMP -- Workflow end time
WORKFLOW_SUCCESSFUL CHAR(1) -- Y/N success flag
Usage Pattern:
- Created by Airflow + DBT workflows during data processing
- Used for temporal partitioning decisions in FILE_ARCHIVER
- Referenced via the A_WORKFLOW_HISTORY_KEY_FK foreign key in data tables
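Since A_WORKFLOW_HISTORY plays the same role for Airflow + DBT that A_LOAD_HISTORY plays for Informatica, a common operational check is the latest successful run per DAG. A minimal sketch, using only the key columns listed above:

```sql
-- Sketch: latest successful run per Airflow DAG
SELECT WORKFLOW_NAME,
       MAX(WORKFLOW_END) AS LAST_SUCCESSFUL_RUN
FROM   CT_MRDS.A_WORKFLOW_HISTORY
WHERE  WORKFLOW_SUCCESSFUL = 'Y'
GROUP  BY WORKFLOW_NAME;
```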
Data Export Strategies
Scenario 1: Legacy Data Export (Informatica → ODS)
Use Case: One-time migration of historical data loaded by Informatica
Package: DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE
Control Table: Uses CT_ODS.A_LOAD_HISTORY for temporal partitioning
Example:
-- Export historical AGGREGATED_ALLOTMENT data (loaded by Informatica)
BEGIN
CT_MRDS.DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
pSchemaName => 'OU_TOP',
pTableName => 'AGGREGATED_ALLOTMENT',
pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK', -- Links to A_LOAD_HISTORY
pBucketArea => 'DATA',
pFolderName => 'legacy_migration',
pMinDate => DATE '2020-01-01',
pMaxDate => DATE '2024-12-31'
);
END;
/
Result: CSV files in ODS bucket (DATA area), partitioned by LOAD_START from A_LOAD_HISTORY
Scenario 2: New System Data (Airflow + DBT → ODS → ARCHIVE)
Use Case: Ongoing processing with new Airflow + DBT system
Workflow:
- File Arrival: FILES → INBOX bucket
- Validation: FILE_MANAGER.PROCESS_SOURCE_FILE → creates A_SOURCE_FILE_RECEIVED
- Processing: Airflow + DBT → creates A_WORKFLOW_HISTORY
- Export: DATA_EXPORTER → moves to ODS bucket (DATA area)
- Archival: FILE_ARCHIVER → moves to ARCHIVE bucket (Parquet with Hive partitioning)
Control Tables: Uses both A_SOURCE_FILE_RECEIVED and A_WORKFLOW_HISTORY
Example:
-- Archival of data processed by Airflow + DBT
BEGIN
CT_MRDS.FILE_ARCHIVER.ARCHIVE_TABLE_DATA(
pSourceFileConfig => vConfig -- Requires A_SOURCE_FILE_RECEIVED records
);
END;
/
Legacy Data Archival
FILE_ARCHIVER Requirement
⚠️ IMPORTANT: FILE_ARCHIVER requires records in A_SOURCE_FILE_RECEIVED table to track and manage archival lifecycle.
For new system data (Airflow + DBT):
- Records automatically created by FILE_MANAGER.PROCESS_SOURCE_FILE
- No additional steps needed
For legacy data (Informatica + WLA):
- Historical data requires registration in A_SOURCE_FILE_RECEIVED
- ✅ SOLUTION: Use DATA_EXPORTER v2.9.0+ with the pRegisterExport => TRUE parameter
- Automatically registers exported files with proper metadata (size, checksum, location)
Export Strategies for Legacy Data
Strategy 1: Automatic Registration (Recommended)
✅ DATA_EXPORTER v2.9.0+ supports automatic file registration via pRegisterExport parameter.
Benefits:
- Simple, one-step export with automatic registration
- Files tracked in A_SOURCE_FILE_RECEIVED (enables FILE_ARCHIVER processing)
- Proper metadata capture (file size, checksum, location, timestamps)
- Standard workflow integration (archival strategies, status tracking)
Example - CSV Export with Registration:
-- Export with automatic registration (DATA_EXPORTER v2.9.0+)
BEGIN
CT_MRDS.DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
pSchemaName => 'OU_TOP',
pTableName => 'AGGREGATED_ALLOTMENT',
pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',
pBucketArea => 'DATA',
pFolderName => 'legacy_export',
pMinDate => DATE '2024-01-01',
pMaxDate => DATE '2024-12-31',
pRegisterExport => TRUE, -- ✓ Automatically registers files
pProcessName => 'LEGACY_MIGRATION'
);
END;
/
-- Files now registered in A_SOURCE_FILE_RECEIVED with:
-- - SOURCE_FILE_NAME: Full OCI path
-- - PROCESSING_STATUS: 'INGESTED'
-- - BYTES: Actual file size
-- - CHECKSUM: File ETag from OCI
-- - PROCESS_NAME: 'LEGACY_MIGRATION'
-- Now FILE_ARCHIVER can process these files
BEGIN
CT_MRDS.FILE_ARCHIVER.ARCHIVE_TABLE_DATA(
pSourceFileConfigKey => vConfigKey
);
END;
/
Example - Single CSV Export with Registration:
-- For single file export (not partitioned by date)
BEGIN
CT_MRDS.DATA_EXPORTER.EXPORT_TABLE_DATA(
pSchemaName => 'CT_MRDS',
pTableName => 'MY_TABLE',
pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',
pBucketArea => 'DATA',
pFolderName => 'legacy_export',
pFileName => 'my_table_export.csv',
pTemplateTableName => 'CT_ET_TEMPLATES.MY_TEMPLATE',
pRegisterExport => TRUE, -- ✓ Registers file
pProcessName => 'LEGACY_MIGRATION'
);
END;
/
Strategy 2: Direct Archive Export (Bypass ODS)
⚠️ Use when: You want to skip the ODS bucket entirely and go straight to ARCHIVE
Skip ODS/DATA bucket entirely - export directly to ARCHIVE bucket in Parquet format:
-- Export legacy data directly to ARCHIVE bucket
BEGIN
DATA_EXPORTER.EXPORT_TABLE_DATA_BY_DATE(
pSchemaName => 'OU_TOP',
pTableName => 'AGGREGATED_ALLOTMENT',
pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',
pBucketArea => 'ARCHIVE', -- Direct to archive
pFolderName => 'legacy_direct',
pMinDate => DATE '2020-01-01',
pMaxDate => DATE '2024-12-31'
);
END;
/
-- Result: Parquet files with Hive partitioning in ARCHIVE bucket
-- Files: ARCHIVE/legacy_direct/PARTITION_YEAR=2024/PARTITION_MONTH=01/*.parquet
-- No A_SOURCE_FILE_RECEIVED records needed (archival already complete)
Pros:
- Simple, no manual registration
- Data already in final archive format (Parquet)
- Hive-style partitioning automatically applied
Cons:
- No record in A_SOURCE_FILE_RECEIVED (tracking gap)
- Cannot use FILE_ARCHIVER features (archival strategies, status tracking)
- Mixed folder structure (legacy_direct vs. standard source/table paths)
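The tracking gap can be audited. The sketch below is hypothetical: it reuses the MRDS_LOADER.cloud_wrapper.list_objects call and DEF_CRED_ARN credential shown in Strategy 3, and assumes the listing returns an object_name column that matches SOURCE_FILE_NAME as stored at registration time:

```sql
-- Sketch (hypothetical): exported objects in the DATA bucket with no
-- A_SOURCE_FILE_RECEIVED record, i.e. files FILE_ARCHIVER cannot manage
SELECT o.object_name
FROM   TABLE(MRDS_LOADER.cloud_wrapper.list_objects(
         credential_name => 'DEF_CRED_ARN',
         location_uri    => CT_MRDS.ENV_MANAGER.GET_BUCKET_URI('DATA')
       )) o
WHERE  NOT EXISTS (
         SELECT 1
         FROM   CT_MRDS.A_SOURCE_FILE_RECEIVED r
         WHERE  r.SOURCE_FILE_NAME = o.object_name
       );
```

Any rows returned are orphaned exports that would need registration (Strategy 1 or 3) before archival features apply.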
Strategy 3: Hybrid Approach (Recommended for Large Datasets)
Use DATA_EXPORTER for initial export, create minimal A_SOURCE_FILE_RECEIVED records programmatically:
-- Create helper procedure to register legacy exports
CREATE OR REPLACE PROCEDURE REGISTER_LEGACY_EXPORT (
pSourceFileId VARCHAR2,
pBucketArea VARCHAR2,
pFolderName VARCHAR2,
pFilePattern VARCHAR2,
pReceptionDate DATE DEFAULT SYSDATE
) AS
  vConfigKey NUMBER;
  vBucketUri VARCHAR2(500);
  vPrefix    VARCHAR2(500);
  vCount     NUMBER := 0;
BEGIN
  -- Get source configuration
  SELECT A_SOURCE_FILE_CONFIG_KEY
  INTO   vConfigKey
  FROM   CT_MRDS.A_SOURCE_FILE_CONFIG
  WHERE  SOURCE_FILE_ID = pSourceFileId
  AND    SOURCE_FILE_TYPE = 'INPUT';
  -- Get bucket URI
  vBucketUri := CT_MRDS.ENV_MANAGER.GET_BUCKET_URI(pBucketArea);
  vPrefix := 'ODS/' || pFolderName || '/';
  -- Register all matching files
  FOR rec IN (
    SELECT object_name, bytes, etag
    FROM   TABLE(MRDS_LOADER.cloud_wrapper.list_objects(
             credential_name => 'DEF_CRED_ARN',
             location_uri    => vBucketUri
           ))
    WHERE  object_name LIKE vPrefix || pFilePattern
  ) LOOP
    INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
      A_SOURCE_FILE_RECEIVED_KEY,
      A_SOURCE_FILE_CONFIG_KEY,
      SOURCE_FILE_NAME,
      PROCESSING_STATUS,
      RECEPTION_DATE,
      BYTES,
      CHECKSUM
    ) VALUES (
      A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL,
      vConfigKey,
      rec.object_name,
      'INGESTED',
      pReceptionDate,
      rec.bytes,
      rec.etag
    );
    vCount := vCount + 1;
  END LOOP;
  -- SQL%ROWCOUNT would only reflect the last INSERT (and is reset by COMMIT),
  -- so count the registered files explicitly
  DBMS_OUTPUT.PUT_LINE('Registered ' || vCount || ' legacy files');
  COMMIT;
END;
/
-- Usage:
BEGIN
-- After DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE completes
REGISTER_LEGACY_EXPORT(
pSourceFileId => 'AGGREGATED_ALLOTMENT',
pBucketArea => 'DATA',
pFolderName => 'legacy_export',
pFilePattern => 'AGGREGATED_ALLOTMENT_2024%.csv',
pReceptionDate => DATE '2024-12-31'
);
-- Now FILE_ARCHIVER can process
FILE_ARCHIVER.ARCHIVE_TABLE_DATA(pSourceFileConfig => vConfig); -- vConfig: source file configuration looked up beforehand
END;
/
Migration Timeline and Coexistence
Phase 1: Legacy System Only (Before Migration)
- All data loaded via Informatica + WLA
- Control table: CT_ODS.A_LOAD_HISTORY
- No A_SOURCE_FILE_RECEIVED records
Phase 2: Parallel Operation (During Migration)
- Old data: Continue using Informatica + WLA → A_LOAD_HISTORY
- New data: Start using Airflow + DBT → A_SOURCE_FILE_RECEIVED + A_WORKFLOW_HISTORY
- Challenge: Different control tables for different data vintages
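During the coexistence phase, lineage questions span both control tables. A unified view can be sketched with a UNION ALL over the two schemas described earlier (the SOURCE_SYSTEM label is illustrative):

```sql
-- Sketch: one lineage view across legacy and new workflow runs
SELECT 'INFORMATICA_WLA' AS SOURCE_SYSTEM,
       WORKFLOW_NAME,
       LOAD_START        AS RUN_START,
       LOAD_END          AS RUN_END,
       LOAD_SUCCESSFUL   AS SUCCESSFUL
FROM   CT_ODS.A_LOAD_HISTORY
UNION ALL
SELECT 'AIRFLOW_DBT',
       WORKFLOW_NAME,
       WORKFLOW_START,
       WORKFLOW_END,
       WORKFLOW_SUCCESSFUL
FROM   CT_MRDS.A_WORKFLOW_HISTORY;
```

This keeps operational reporting in one place without touching either system's control schema.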
Phase 3: New System Only (After Migration)
- All new data via Airflow + DBT
- Legacy data archived (one-time export using DATA_EXPORTER)
- Control tables: CT_MRDS.A_SOURCE_FILE_RECEIVED, CT_MRDS.A_WORKFLOW_HISTORY
Recommendations
For New Data (Airflow + DBT)
✅ Use standard workflow:
- FILE_MANAGER.PROCESS_SOURCE_FILE (creates A_SOURCE_FILE_RECEIVED)
- Airflow + DBT processing (creates A_WORKFLOW_HISTORY)
- FILE_ARCHIVER.ARCHIVE_TABLE_DATA (uses both control tables)
For Legacy Data Migration
✅ Small Datasets (<1000 files): Strategy 1 (Automatic Registration)
✅ Large Datasets (>1000 files): Strategy 2 (Direct to ARCHIVE) or Strategy 3 (Hybrid)
❌ Avoid: Exporting to ODS/DATA without registration (orphaned files that cannot be archived)
Configuration Requirements
Before archiving legacy data, ensure source configuration exists:
-- Check if configuration exists
SELECT A_SOURCE_FILE_CONFIG_KEY, SOURCE_FILE_ID, TABLE_ID, ARCHIVAL_STRATEGY
FROM CT_MRDS.A_SOURCE_FILE_CONFIG
WHERE SOURCE_FILE_ID = 'YOUR_SOURCE_FILE_ID';
-- If missing, create configuration
CALL FILE_MANAGER.ADD_SOURCE_FILE_CONFIG(
pSourceKey => 'YOUR_SOURCE',
pSourceFileType => 'INPUT',
pSourceFileId => 'YOUR_SOURCE_FILE_ID',
pSourceFileDesc => 'Legacy migrated data',
pSourceFileNamePattern => 'pattern_*.csv',
pTableId => 'YOUR_TABLE_ID',
pTemplateTableName => 'CT_ET_TEMPLATES.YOUR_TEMPLATE'
);
Known Limitations
1. FILE_ARCHIVER Requires A_SOURCE_FILE_RECEIVED
FILE_ARCHIVER cannot archive data without corresponding A_SOURCE_FILE_RECEIVED records.
Solutions:
- ✅ New system data: Automatically registered via FILE_MANAGER.PROCESS_SOURCE_FILE
- ✅ Legacy data exports: Use DATA_EXPORTER with pRegisterExport => TRUE (v2.9.0+)
- ⚠️ Manual uploads: Must be registered via FILE_MANAGER.PROCESS_SOURCE_FILE or a manual INSERT
2. Mixed Control Table References
During migration period, some procedures reference A_LOAD_HISTORY (DATA_EXPORTER) while others reference A_WORKFLOW_HISTORY (FILE_ARCHIVER). This is intentional but requires careful understanding of data lineage.
3. A_WORKFLOW_HISTORY vs A_LOAD_HISTORY Column Mismatch
The control tables have different schemas:
- A_LOAD_HISTORY: LOAD_START, A_ETL_LOAD_SET_KEY
- A_WORKFLOW_HISTORY: WORKFLOW_START, A_WORKFLOW_HISTORY_KEY
Test scripts must be aware of which table is being used.
Related Documentation
- PROCESS_SOURCE_FILE Guide - File validation and ingestion workflow
- FILE_ARCHIVER Guide - Archival strategies and configuration
- FILE_MANAGER Configuration Guide - System configuration
- Package Deployment Guide - Deployment procedures
Summary
The migration from Informatica + WLA to Airflow + DBT introduces new control tables (A_SOURCE_FILE_RECEIVED, A_WORKFLOW_HISTORY) while maintaining compatibility with legacy control tables (A_LOAD_HISTORY). Understanding the relationship between these tables is critical for:
- Data Lineage: Tracking which system processed which data
- Export Operations: Choosing appropriate DATA_EXPORTER procedures
- Archival Operations: Ensuring FILE_ARCHIVER has required metadata
- Testing: Using correct control tables in test scenarios
Recommended Approach for Legacy Data Migration:
- ✅ Strategy 1 (Automatic Registration): Use DATA_EXPORTER with pRegisterExport => TRUE to automatically register files in A_SOURCE_FILE_RECEIVED, enabling the full FILE_ARCHIVER workflow (archival strategies, status tracking, rollback capabilities)
- ⚠️ Strategy 2 (Direct to ARCHIVE): Export directly to the ARCHIVE bucket to bypass the ODS bucket entirely and avoid registration requirements (use when tracking is not needed)