@@ -19,7 +19,18 @@ The FILE_ARCHIVER package provides flexible archival strategies that accommodate

- **Schema**: CT_MRDS
- **Package**: FILE_ARCHIVER
- **Current Version**: 3.1.0
- **Dependencies**: ENV_MANAGER, FILE_MANAGER, cloud_wrapper, A_SOURCE_FILE_CONFIG, A_SOURCE_FILE_RECEIVED, A_WORKFLOW_HISTORY

### Critical Prerequisites

⚠️ **IMPORTANT**: FILE_ARCHIVER requires data to be registered in the `CT_MRDS.A_SOURCE_FILE_RECEIVED` table. This table is populated automatically when files are processed through the modern Airflow + DBT workflow via `FILE_MANAGER.PROCESS_SOURCE_FILE`.

**For legacy data migrated from the Informatica + WLA system:**

- Legacy data exported using `DATA_EXPORTER` does NOT automatically create `A_SOURCE_FILE_RECEIVED` records
- Without these records, FILE_ARCHIVER **CANNOT** archive the data
- See the [System Migration Guide](System_Migration_Informatica_to_Airflow_DBT.md) for workaround strategies

**Recommendation for legacy data**: Export directly to the ARCHIVE bucket using `DATA_EXPORTER.EXPORT_TABLE_DATA_BY_DATE` with `pBucketArea => 'ARCHIVE'` to bypass this requirement.
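A quick way to confirm this prerequisite before invoking the archiver is to count registered files by status (an illustrative query; `YOUR_SOURCE_FILE_ID` is a placeholder for the configured source file ID):

```sql
-- Count registered files per status for a given source configuration.
-- No rows in status 'INGESTED' means FILE_ARCHIVER has nothing it can archive.
SELECT r.PROCESSING_STATUS, COUNT(*) AS file_count
FROM   CT_MRDS.A_SOURCE_FILE_RECEIVED r
JOIN   CT_MRDS.A_SOURCE_FILE_CONFIG c
       ON c.A_SOURCE_FILE_CONFIG_KEY = r.A_SOURCE_FILE_CONFIG_KEY
WHERE  c.SOURCE_FILE_ID = 'YOUR_SOURCE_FILE_ID'
GROUP  BY r.PROCESSING_STATUS;
```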
## Archival Strategies
@@ -542,7 +553,8 @@ WHERE object_name = 'FILE_ARCHIVER';

### Database Objects

- **Table**: CT_MRDS.A_SOURCE_FILE_CONFIG - Configuration storage
- **Table**: CT_MRDS.A_SOURCE_FILE_RECEIVED - File processing tracking
- **Table**: CT_MRDS.A_WORKFLOW_HISTORY - Workflow execution tracking (Airflow + DBT)
- **Trigger**: TRG_BI_A_SRC_FILE_CFG_ARCH_VAL - Configuration validation
- **Credential**: DEF_CRED_ARN - OCI bucket access
@@ -15,6 +15,11 @@ This document provides comprehensive documentation for the `FILE_MANAGER.PROCESS

- **Error Resilient**: Comprehensive error handling and logging for validation and file operations
- **Status Tracking**: Updates file processing status throughout the validation and preparation workflow

**Migration Context:**

- This procedure is part of the **modern Airflow + DBT system** architecture
- Creates records in `CT_MRDS.A_SOURCE_FILE_RECEIVED` (modern control table)
- For legacy Informatica + WLA data migration, see the [System Migration Guide](System_Migration_Informatica_to_Airflow_DBT.md)

## Procedure Signatures

The procedure is available in two variants:
confluence/System_Migration_Informatica_to_Airflow_DBT.md (new file, 446 lines)
@@ -0,0 +1,446 @@
# System Migration: Informatica + WLA → Airflow + DBT

This document describes the migration from the legacy Informatica + WLA data processing system to the modern Airflow + DBT architecture, including control table differences, data export strategies, and known limitations.

## Migration Overview

The MRDS (Market Reference Data System) is undergoing a fundamental technology migration:

**Legacy System (Informatica + WLA):**

- ETL Tool: Informatica PowerCenter
- Workflow Orchestration: WLA (Workflow Automation)
- Control Schema: `CT_ODS` (Operational Data Store Control)
- Primary Control Table: `CT_ODS.A_LOAD_HISTORY`
- Key Column: `A_ETL_LOAD_SET_KEY`

**Modern System (Airflow + DBT):**

- Orchestration: Apache Airflow
- Transformation: DBT (Data Build Tool)
- Control Schema: `CT_MRDS` (MRDS Control)
- Primary Control Tables: `CT_MRDS.A_SOURCE_FILE_RECEIVED`, `CT_MRDS.A_WORKFLOW_HISTORY`
- Key Column: `A_WORKFLOW_HISTORY_KEY`
## Control Table Architecture

### Legacy System: CT_ODS.A_LOAD_HISTORY

**Purpose**: Tracks Informatica PowerCenter workflow executions

**Structure**:

```sql
DESC CT_ODS.A_LOAD_HISTORY;

Name                 Type
____________________ _____________________
A_ETL_LOAD_SET_KEY   NUMBER(38)      -- Primary key
WORKFLOW_NAME        VARCHAR2(255)   -- Informatica workflow name
INFA_RUN_ID          NUMBER(38)      -- Informatica run ID
LOAD_START           TIMESTAMP(6)    -- Workflow start time
LOAD_END             TIMESTAMP(6)    -- Workflow end time
EXDI_APPL_REQ_ID     VARCHAR2(255)
EXDI_CORRELATION_ID  VARCHAR2(255)
LOAD_SUCCESSFUL      CHAR(1)         -- Y/N success flag
WLA_RUN_ID           NUMBER(28)      -- WLA run ID
DQ_FLAG              VARCHAR2(5)     -- Data quality flag
```
**Usage Pattern**:

- Created by Informatica workflows during ETL execution
- Used for temporal partitioning in DATA_EXPORTER
- Referenced via the `A_ETL_LOAD_SET_KEY_FK` foreign key in data tables
### Modern System: CT_MRDS Control Tables

#### 1. A_SOURCE_FILE_RECEIVED

**Purpose**: Tracks individual file processing through the complete lifecycle

**Key Columns**:

```sql
A_SOURCE_FILE_RECEIVED_KEY  NUMBER        -- Primary key
SOURCE_FILE_NAME            VARCHAR2      -- Full OCI path
PROCESSING_STATUS           VARCHAR2      -- Status tracking
RECEPTION_DATE              DATE          -- File arrival timestamp
PARTITION_YEAR              VARCHAR2(4)   -- Archive partition (year)
PARTITION_MONTH             VARCHAR2(2)   -- Archive partition (month)
ARCH_FILE_NAME              VARCHAR2      -- Parquet archive file path
```
**Status Workflow**:

```
RECEIVED → VALIDATED → READY_FOR_INGESTION → INGESTED → ARCHIVED
```
**Usage Pattern**:

- Created by FILE_MANAGER.PROCESS_SOURCE_FILE during file validation
- Updated throughout the file lifecycle (validation, ingestion, archival)
- Required by FILE_ARCHIVER for archival operations
- Links to A_WORKFLOW_HISTORY via workflow execution
#### 2. A_WORKFLOW_HISTORY

**Purpose**: Tracks Airflow + DBT workflow executions (fills the same role as A_LOAD_HISTORY)

**Key Columns**:

```sql
A_WORKFLOW_HISTORY_KEY  NUMBER     -- Primary key
WORKFLOW_NAME           VARCHAR2   -- Airflow DAG name
WORKFLOW_START          TIMESTAMP  -- Workflow start time
WORKFLOW_END            TIMESTAMP  -- Workflow end time
WORKFLOW_SUCCESSFUL     CHAR(1)    -- Y/N success flag
```
**Usage Pattern**:

- Created by Airflow + DBT workflows during data processing
- Used for temporal partitioning decisions in FILE_ARCHIVER
- Referenced via the `A_WORKFLOW_HISTORY_KEY_FK` foreign key in data tables
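For comparison with the `A_LOAD_HISTORY`-based partitioning query used by DATA_EXPORTER (shown under Technical Analysis below), temporal partitioning against the modern control table would take roughly this shape. This is a sketch only; the exact join FILE_ARCHIVER performs internally may differ:

```sql
-- Sketch: deriving year/month partitions from A_WORKFLOW_HISTORY,
-- mirroring the A_LOAD_HISTORY pattern used on the legacy side.
SELECT DISTINCT TO_CHAR(W.WORKFLOW_START, 'YYYY') AS YR,
                TO_CHAR(W.WORKFLOW_START, 'MM')   AS MN
FROM   OU_TOP.AGGREGATED_ALLOTMENT T
JOIN   CT_MRDS.A_WORKFLOW_HISTORY W
       ON T.A_WORKFLOW_HISTORY_KEY_FK = W.A_WORKFLOW_HISTORY_KEY
WHERE  W.WORKFLOW_SUCCESSFUL = 'Y';
```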
## Data Export Strategies

### Scenario 1: Legacy Data Export (Informatica → ODS)

**Use Case**: One-time migration of historical data loaded by Informatica

**Package**: `DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE`

**Control Table**: Uses `CT_ODS.A_LOAD_HISTORY` for temporal partitioning

**Example**:

```sql
-- Export historical AGGREGATED_ALLOTMENT data (loaded by Informatica)
BEGIN
  CT_MRDS.DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
    pSchemaName    => 'OU_TOP',
    pTableName     => 'AGGREGATED_ALLOTMENT',
    pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',  -- Links to A_LOAD_HISTORY
    pBucketArea    => 'DATA',
    pFolderName    => 'legacy_migration',
    pMinDate       => DATE '2020-01-01',
    pMaxDate       => DATE '2024-12-31'
  );
END;
/
```

**Result**: CSV files in the ODS bucket (DATA area), partitioned by LOAD_START from A_LOAD_HISTORY
### Scenario 2: Modern System Data (Airflow + DBT → ODS → ARCHIVE)

**Use Case**: Ongoing processing with the new Airflow + DBT system

**Workflow**:

1. **File Arrival**: Files → INBOX bucket
2. **Validation**: FILE_MANAGER.PROCESS_SOURCE_FILE → creates A_SOURCE_FILE_RECEIVED
3. **Processing**: Airflow + DBT → creates A_WORKFLOW_HISTORY
4. **Export**: DATA_EXPORTER → moves to ODS bucket (DATA area)
5. **Archival**: FILE_ARCHIVER → moves to ARCHIVE bucket (Parquet with Hive partitioning)

**Control Tables**: Uses both A_SOURCE_FILE_RECEIVED and A_WORKFLOW_HISTORY

**Example**:

```sql
-- Archival of data processed by Airflow + DBT
BEGIN
  CT_MRDS.FILE_ARCHIVER.ARCHIVE_TABLE_DATA(
    pSourceFileConfig => vConfig  -- Requires A_SOURCE_FILE_RECEIVED records
  );
END;
/
```
## Critical Gap: Legacy Data Archival

### Problem Statement

**Scenario**: Historical data exported using DATA_EXPORTER from Informatica-loaded tables

**Issue**: FILE_ARCHIVER requires records in `A_SOURCE_FILE_RECEIVED`, but legacy exports do not create them

**Impact**: Legacy data exported to the ODS/DATA bucket **CANNOT** be archived to the ARCHIVE bucket using FILE_ARCHIVER
### Technical Analysis

**DATA_EXPORTER Behavior**:

```sql
-- Uses A_LOAD_HISTORY for partitioning (Informatica workflows)
SELECT DISTINCT TO_CHAR(L.LOAD_START, 'YYYY') AS YR,
                TO_CHAR(L.LOAD_START, 'MM')   AS MN
FROM   OU_TOP.AGGREGATED_ALLOTMENT T, CT_ODS.A_LOAD_HISTORY L
WHERE  T.A_ETL_LOAD_SET_KEY_FK = L.A_ETL_LOAD_SET_KEY
AND    L.LOAD_START >= :pMinDate
AND    L.LOAD_START <  :pMaxDate;

-- Creates CSV files: ODS/legacy_migration/AGGREGATED_ALLOTMENT_YYYYMM.csv
-- Does NOT create A_SOURCE_FILE_RECEIVED records
```

**FILE_ARCHIVER Requirement**:

```sql
-- Joins A_SOURCE_FILE_RECEIVED with A_WORKFLOW_HISTORY
JOIN CT_MRDS.A_SOURCE_FILE_RECEIVED r
  ON  r.A_SOURCE_FILE_CONFIG_KEY = pSourceFileConfig.A_SOURCE_FILE_CONFIG_KEY
  AND r.PROCESSING_STATUS = 'INGESTED';

-- Without A_SOURCE_FILE_RECEIVED records, archival CANNOT proceed
```
### Workaround Strategies

#### Strategy 1: Manual Registration (Recommended for Small Datasets)

Manually create `A_SOURCE_FILE_RECEIVED` records for legacy exported files:

```sql
-- Step 1: Export legacy data to ODS/DATA
BEGIN
  DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
    pSchemaName    => 'OU_TOP',
    pTableName     => 'AGGREGATED_ALLOTMENT',
    pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',
    pBucketArea    => 'DATA',
    pFolderName    => 'legacy_export',
    pMinDate       => DATE '2024-01-01',
    pMaxDate       => DATE '2024-12-31'
  );
END;
/

-- Step 2: List exported CSV files
SELECT object_name, time_created, bytes
FROM   TABLE(MRDS_LOADER.cloud_wrapper.list_objects(
         credential_name => 'DEF_CRED_ARN',
         location_uri    => 'https://objectstorage.eu-frankfurt-1.oraclecloud.com/n/frtgjxu7zl7c/b/data/o/'
       ))
WHERE  object_name LIKE 'ODS/legacy_export/AGGREGATED_ALLOTMENT_%';

-- Step 3: Manually register each file in A_SOURCE_FILE_RECEIVED
-- (Requires source configuration for AGGREGATED_ALLOTMENT to exist)
INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
  A_SOURCE_FILE_RECEIVED_KEY,
  A_SOURCE_FILE_CONFIG_KEY,
  SOURCE_FILE_NAME,
  PROCESSING_STATUS,
  RECEPTION_DATE,
  BYTES,
  CHECKSUM,
  EXTERNAL_TABLE_NAME
) VALUES (
  A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL,
  (SELECT A_SOURCE_FILE_CONFIG_KEY FROM A_SOURCE_FILE_CONFIG
   WHERE SOURCE_FILE_ID = 'AGGREGATED_ALLOTMENT' AND SOURCE_FILE_TYPE = 'INPUT'),
  'ODS/legacy_export/AGGREGATED_ALLOTMENT_202401.csv',
  'INGESTED',            -- Skip validation, mark as already ingested
  DATE '2024-01-15',
  1048576,               -- File size in bytes
  'manual_registration',
  NULL                   -- No external table needed
);
-- Repeat for all exported CSV files
COMMIT;

-- Step 4: Now FILE_ARCHIVER can process these files
BEGIN
  FILE_ARCHIVER.ARCHIVE_TABLE_DATA(pSourceFileConfig => vConfig);
END;
/
```
#### Strategy 2: Direct Archive Export (Bypass ODS)

Skip the ODS/DATA bucket entirely and export directly to the ARCHIVE bucket in Parquet format:

```sql
-- Export legacy data directly to ARCHIVE bucket
BEGIN
  DATA_EXPORTER.EXPORT_TABLE_DATA_BY_DATE(
    pSchemaName    => 'OU_TOP',
    pTableName     => 'AGGREGATED_ALLOTMENT',
    pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',
    pBucketArea    => 'ARCHIVE',  -- Direct to archive
    pFolderName    => 'legacy_direct',
    pMinDate       => DATE '2020-01-01',
    pMaxDate       => DATE '2024-12-31'
  );
END;
/
-- Result: Parquet files with Hive partitioning in the ARCHIVE bucket
-- Files:  ARCHIVE/legacy_direct/PARTITION_YEAR=2024/PARTITION_MONTH=01/*.parquet
-- No A_SOURCE_FILE_RECEIVED records needed (archival already complete)
```

**Pros**:

- Simple, no manual registration
- Data already in final archive format (Parquet)
- Hive-style partitioning applied automatically

**Cons**:

- No record in A_SOURCE_FILE_RECEIVED (tracking gap)
- Cannot use FILE_ARCHIVER features (archival strategies, status tracking)
- Mixed folder structure (legacy_direct vs. standard source/table paths)
#### Strategy 3: Hybrid Approach (Recommended for Large Datasets)

Use DATA_EXPORTER for the initial export, then create minimal A_SOURCE_FILE_RECEIVED records programmatically:

```sql
-- Create helper procedure to register legacy exports
CREATE OR REPLACE PROCEDURE REGISTER_LEGACY_EXPORT (
  pSourceFileId  VARCHAR2,
  pBucketArea    VARCHAR2,
  pFolderName    VARCHAR2,
  pFilePattern   VARCHAR2,
  pReceptionDate DATE DEFAULT SYSDATE
) AS
  vConfigKey NUMBER;
  vBucketUri VARCHAR2(500);
  vPrefix    VARCHAR2(500);
  vCount     PLS_INTEGER := 0;
BEGIN
  -- Get source configuration (raises NO_DATA_FOUND if missing)
  SELECT A_SOURCE_FILE_CONFIG_KEY
  INTO   vConfigKey
  FROM   CT_MRDS.A_SOURCE_FILE_CONFIG
  WHERE  SOURCE_FILE_ID = pSourceFileId
  AND    SOURCE_FILE_TYPE = 'INPUT';

  -- Get bucket URI
  vBucketUri := CT_MRDS.ENV_MANAGER.GET_BUCKET_URI(pBucketArea);
  vPrefix    := 'ODS/' || pFolderName || '/';

  -- Register all matching files
  FOR rec IN (
    SELECT object_name, bytes, etag
    FROM   TABLE(MRDS_LOADER.cloud_wrapper.list_objects(
             credential_name => 'DEF_CRED_ARN',
             location_uri    => vBucketUri
           ))
    WHERE  object_name LIKE vPrefix || pFilePattern
  ) LOOP
    INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
      A_SOURCE_FILE_RECEIVED_KEY,
      A_SOURCE_FILE_CONFIG_KEY,
      SOURCE_FILE_NAME,
      PROCESSING_STATUS,
      RECEPTION_DATE,
      BYTES,
      CHECKSUM
    ) VALUES (
      A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL,
      vConfigKey,
      rec.object_name,
      'INGESTED',
      pReceptionDate,
      rec.bytes,
      rec.etag
    );
    vCount := vCount + 1;
  END LOOP;

  COMMIT;
  -- Count explicitly: SQL%ROWCOUNT would only report the last INSERT in the loop
  DBMS_OUTPUT.PUT_LINE('Registered ' || vCount || ' legacy files');
END;
/

-- Usage (vConfig must already hold the A_SOURCE_FILE_CONFIG record):
BEGIN
  -- After DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE completes
  REGISTER_LEGACY_EXPORT(
    pSourceFileId  => 'AGGREGATED_ALLOTMENT',
    pBucketArea    => 'DATA',
    pFolderName    => 'legacy_export',
    pFilePattern   => 'AGGREGATED_ALLOTMENT_2024%.csv',
    pReceptionDate => DATE '2024-12-31'
  );

  -- Now FILE_ARCHIVER can process
  FILE_ARCHIVER.ARCHIVE_TABLE_DATA(pSourceFileConfig => vConfig);
END;
/
```
## Migration Timeline and Coexistence

### Phase 1: Legacy System Only (Before Migration)

- All data loaded via Informatica + WLA
- Control table: `CT_ODS.A_LOAD_HISTORY`
- No `A_SOURCE_FILE_RECEIVED` records

### Phase 2: Parallel Operation (During Migration)

- **Old data**: Continues through Informatica + WLA → A_LOAD_HISTORY
- **New data**: Starts through Airflow + DBT → A_SOURCE_FILE_RECEIVED + A_WORKFLOW_HISTORY
- **Challenge**: Different control tables for different data vintages

### Phase 3: New System Only (After Migration)

- All new data via Airflow + DBT
- Legacy data archived (one-time export using DATA_EXPORTER)
- Control tables: `CT_MRDS.A_SOURCE_FILE_RECEIVED`, `CT_MRDS.A_WORKFLOW_HISTORY`
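The Phase 2 challenge can be made concrete: during parallel operation, which system loaded a given row can be inferred from which foreign key is populated. The sketch below assumes a data table carries both FK columns during coexistence, with exactly one populated per row (an assumption; verify against the actual table definition):

```sql
-- Sketch: classify rows by originating system during parallel operation
SELECT CASE
         WHEN A_ETL_LOAD_SET_KEY_FK     IS NOT NULL THEN 'LEGACY (Informatica + WLA)'
         WHEN A_WORKFLOW_HISTORY_KEY_FK IS NOT NULL THEN 'MODERN (Airflow + DBT)'
         ELSE 'UNKNOWN'
       END      AS load_system,
       COUNT(*) AS row_count
FROM   OU_TOP.AGGREGATED_ALLOTMENT
GROUP  BY CASE
         WHEN A_ETL_LOAD_SET_KEY_FK     IS NOT NULL THEN 'LEGACY (Informatica + WLA)'
         WHEN A_WORKFLOW_HISTORY_KEY_FK IS NOT NULL THEN 'MODERN (Airflow + DBT)'
         ELSE 'UNKNOWN'
       END;
```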
## Recommendations

### For New Data (Airflow + DBT)

✅ Use the standard workflow:

1. FILE_MANAGER.PROCESS_SOURCE_FILE (creates A_SOURCE_FILE_RECEIVED)
2. Airflow + DBT processing (creates A_WORKFLOW_HISTORY)
3. FILE_ARCHIVER.ARCHIVE_TABLE_DATA (uses both control tables)

### For Legacy Data Migration

- ✅ **Small Datasets (<1000 files)**: Strategy 1 (Manual Registration)
- ✅ **Large Datasets (>1000 files)**: Strategy 2 (Direct to ARCHIVE) or Strategy 3 (Hybrid)
- ❌ **Avoid**: Exporting to ODS/DATA without registration (orphaned files that cannot be archived)
### Configuration Requirements

Before archiving legacy data, ensure the source configuration exists:

```sql
-- Check if configuration exists
SELECT A_SOURCE_FILE_CONFIG_KEY, SOURCE_FILE_ID, TABLE_ID, ARCHIVAL_STRATEGY
FROM   CT_MRDS.A_SOURCE_FILE_CONFIG
WHERE  SOURCE_FILE_ID = 'YOUR_SOURCE_FILE_ID';

-- If missing, create the configuration
CALL FILE_MANAGER.ADD_SOURCE_FILE_CONFIG(
  pSourceKey             => 'YOUR_SOURCE',
  pSourceFileType        => 'INPUT',
  pSourceFileId          => 'YOUR_SOURCE_FILE_ID',
  pSourceFileDesc        => 'Legacy migrated data',
  pSourceFileNamePattern => 'pattern_*.csv',
  pTableId               => 'YOUR_TABLE_ID',
  pTemplateTableName     => 'CT_ET_TEMPLATES.YOUR_TEMPLATE'
);
```
## Known Limitations

### 1. No Retroactive A_SOURCE_FILE_RECEIVED Creation

DATA_EXPORTER does not automatically create A_SOURCE_FILE_RECEIVED records when exporting legacy data. This is by design: it is a one-time export tool, not a file tracking system.

### 2. FILE_ARCHIVER Requires A_SOURCE_FILE_RECEIVED

FILE_ARCHIVER cannot archive data without corresponding A_SOURCE_FILE_RECEIVED records. This prevents archiving of:

- Legacy Informatica-loaded data exported via DATA_EXPORTER
- Manually uploaded files not processed through FILE_MANAGER.PROCESS_SOURCE_FILE

### 3. Mixed Control Table References

During the migration period, some procedures reference A_LOAD_HISTORY (DATA_EXPORTER) while others reference A_WORKFLOW_HISTORY (FILE_ARCHIVER). This is intentional but requires a careful understanding of data lineage.

### 4. A_WORKFLOW_HISTORY vs A_LOAD_HISTORY Column Mismatch

The control tables have different schemas:

- **A_LOAD_HISTORY**: `LOAD_START`, `A_ETL_LOAD_SET_KEY`
- **A_WORKFLOW_HISTORY**: `WORKFLOW_START`, `A_WORKFLOW_HISTORY_KEY`

Test scripts must be aware of which table is being used.
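The column mismatch means any date-bounded check has to be written once per control table. A minimal illustration (the date window is a placeholder):

```sql
-- Legacy control table: filter on LOAD_START
SELECT COUNT(*) FROM CT_ODS.A_LOAD_HISTORY
WHERE  LOAD_START >= DATE '2024-01-01' AND LOAD_START < DATE '2024-02-01';

-- Modern control table: same window, different column names
SELECT COUNT(*) FROM CT_MRDS.A_WORKFLOW_HISTORY
WHERE  WORKFLOW_START >= DATE '2024-01-01' AND WORKFLOW_START < DATE '2024-02-01';
```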
## Related Documentation

- [PROCESS_SOURCE_FILE Guide](PROCESS_SOURCE_FILE_Guide.md) - File validation and ingestion workflow
- [FILE_ARCHIVER Guide](FILE_ARCHIVER_Guide.md) - Archival strategies and configuration
- [FILE_MANAGER Configuration Guide](FILE_MANAGER_Configuration_Guide.md) - System configuration
- [Package Deployment Guide](Package_Deployment_Guide.md) - Deployment procedures
## Summary

The migration from Informatica + WLA to Airflow + DBT introduces new control tables (`A_SOURCE_FILE_RECEIVED`, `A_WORKFLOW_HISTORY`) while maintaining compatibility with the legacy control table (`A_LOAD_HISTORY`). Understanding the relationship between these tables is critical for:

- **Data Lineage**: Tracking which system processed which data
- **Export Operations**: Choosing the appropriate DATA_EXPORTER procedure
- **Archival Operations**: Ensuring FILE_ARCHIVER has the required metadata
- **Testing**: Using the correct control tables in test scenarios

The recommended approach for legacy data migration is **Strategy 2 (Direct to ARCHIVE)** for large datasets, as it avoids the complexity of manual A_SOURCE_FILE_RECEIVED registration while still achieving the goal of moving historical data to long-term archival storage.