<!-- mars-elt/airflow/ods/tms/README.md (Grzegorz Michalski, 2026-03-02) -->
# DAG Factory for TMS Data Ingestion
## Overview
This repository contains a **DAG factory** that generates multiple Apache Airflow DAGs to ingest data from a **Treasury Management System (TMS)** into the data warehouse.
The factory dynamically creates one DAG per TMS dataset, using **YAML-based layouts** to define parameters and metadata. Each DAG:
- Calls the **TMSDB CLI connector** (`TMSDB.py`) to retrieve data in CSV format.
- Loads the data into object storage.
- Creates or refreshes **Oracle external tables** if needed.
- Registers workflow metadata in MRDS tables.
- Processes the landed file for downstream use.
---
## Components
### 1. DAG Factory (`create_dag`)
- **Purpose**: Auto-generates DAGs for each TMS dataset.
- **Inputs**:
- `TMS-layouts/<DAG_NAME>.yml`: defines report parameters, visible/hidden flags, virtual/replacement parameters.
- `config/TMS.yml`: holds system-wide TMS connection info and storage prefixes.
- **Outputs**:
- Airflow DAG objects named like `w_ODS_TMS_<ENTITY>`.
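The factory loop can be sketched as follows. The body of `create_dag` is elided, and the module-level registration is the standard Airflow discovery idiom; everything here is illustrative rather than the actual implementation:

```python
# Minimal sketch of the DAG-factory pattern, assuming the YAML layouts and
# naming convention described above. The real create_dag builds the full
# retrieve_report logic; here it returns a placeholder.
from pathlib import Path

LAYOUT_DIR = Path("TMS-layouts")

def create_dag(dag_name):
    """Build one DAG object for the dataset described by <dag_name>.yml."""
    layout_path = LAYOUT_DIR / f"{dag_name}.yml"
    # ... parse the layout, define the DAG and its retrieve_report task ...
    return dag_name  # placeholder: the real factory returns an airflow.DAG

# Register one DAG per layout file at module level so the Airflow
# scheduler can discover it when parsing the generator script.
for layout in sorted(LAYOUT_DIR.glob("w_ODS_TMS_*.yml")):
    globals()[layout.stem] = create_dag(layout.stem)
```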
### 2. TMSDB Connector (`TMSDB.py`)
- **Purpose**: CLI tool that interacts with the TMS service.
- **Commands**:
- `retrieve`: fetch rows from TMS into CSV, spool to storage, return exit codes (`0 = data`, `1 = no data`).
- `create-oracle-table`: generate an Oracle DDL file based on dataset metadata.
- `create-model`: generate dbt models for dataset integration.
- **Behavior**:
- Adds synthetic columns (`A_KEY`, `A_WORKFLOW_HISTORY_KEY`).
- Supports additional columns via `-c`.
- Uploads to object storage if `bucket:path/file.csv` is given.
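A DAG task might invoke the connector roughly as below, using only the conventions documented here (the `retrieve` command, extra columns via `-c`, a `bucket:path/file.csv` target, exit codes `0`/`1`); the column name and storage path are illustrative assumptions:

```python
# Sketch of calling the TMSDB.py connector from a task. Flag usage beyond
# the documented `retrieve`, `-c`, and bucket:path target is assumed.
import subprocess
import sys

cmd = [
    sys.executable, "TMSDB.py", "retrieve",
    "-c", "A_LOAD_DATE",                    # additional column (assumption)
    "landing:tms/transaction/2025-01.csv",  # bucket:path/file.csv target
]
result = subprocess.run(cmd, capture_output=True, text=True)  # shell=False
if result.returncode == 0:
    print("data retrieved")
elif result.returncode == 1:
    print("no data for the given parameters")
else:
    print(f"retrieve failed: {result.stderr}")
```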
### 3. Manage Files (`mf`)
Utilities for file-level operations:
- `execute_query(sql)`
- `add_source_file_config(...)`
- `process_source_file(prefix, file)`
- `create_external_table(table, source, prefix)`
- `add_column_date_format(...)`
### 4. Manage Runs (`mr`)
Utilities for workflow tracking:
- `init_workflow(db, wf_name, run_id)`
- `set_workflow_property(key, db, name, value)`
- `finalise_workflow(key, status)`
- `select_ods_tab(table, expr, cond)`
---
## How a DAG Works
### DAG Structure
Each DAG has a single task:
- `retrieve_report`: a `PythonOperator` that orchestrates all steps internally.
### Task Flow
1. **Read YAML configs**
- Parameters split into visible (exposed in Airflow UI) and hidden.
- System config (URL, creds, bucket/prefix) loaded from `config/TMS.yml`.
2. **Parameter processing**
- Cartesian product of parameter lists.
- Support for:
- `column(...)` aligned columns.
- `select(...)` SQL evaluation (restricted tables only).
- Virtual parameters (dropped later).
- Replace-parameter logic.
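The parameter expansion in step 2 amounts to a cartesian product over every parameter's value list; a sketch with `itertools.product` (parameter names and values are illustrative):

```python
# Sketch of step 2: each visible/hidden parameter holds a list of values,
# and one retrieve call is made per combination (cartesian product).
from itertools import product

params = {
    "entity": ["TRANSACTION"],
    "date_from": ["2025-01-01", "2025-02-01"],
    "currency": ["EUR", "USD"],
}

combinations = [
    dict(zip(params, values)) for values in product(*params.values())
]
# 1 * 2 * 2 = 4 combinations, hence 4 retrieve calls
```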
3. **Workflow init**
- `mr.init_workflow` creates a workflow key.
4. **Data retrieval**
- Build a `TMSDB.py retrieve` command.
- Run via subprocess.
- Handle return codes:
- `0`: data returned.
- `1`: no data → workflow finalized as success.
- `!=0`: error → workflow finalized as failure.
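The return-code contract in step 4 can be sketched as follows, with a trivial stand-in command in place of the real `TMSDB.py` invocation:

```python
# Sketch of step 4's exit-code handling: 0 = data, 1 = no data (still a
# success), anything else = failure. The command below is a stand-in.
import subprocess
import sys

def handle_retrieve(cmd):
    """Map TMSDB.py exit codes to a workflow outcome."""
    rc = subprocess.run(cmd).returncode
    if rc == 0:
        return "process_file"        # data landed: continue the pipeline
    if rc == 1:
        return "finalise_success"    # no data is not an error
    return "finalise_failure"        # anything else is a hard failure

# Stand-in command that exits with the documented "no data" code:
outcome = handle_retrieve([sys.executable, "-c", "import sys; sys.exit(1)"])
```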
5. **First-run bootstrap**
- If no config exists for the dataset:
- Run `TMSDB.py create-oracle-table` to generate SQL.
- Execute SQL via `mf.execute_query`.
- Add date formats and external table with `mf.create_external_table`.
- Register config with `mf.add_source_file_config`.
6. **File processing**
- `mf.process_source_file(prefix, filename)` ingests the CSV.
7. **Workflow finalization**
- `mr.finalise_workflow(wf_key, 'Y' | 'N')`.
---
## Example DAG
Example: `w_ODS_TMS_TRANSACTION`
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="w_ODS_TMS_TRANSACTION",
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2025, 1, 1),
    catchup=False,
    params={"date_from": "2025-01-01", "date_to": "2025-01-31"},
) as dag:
    retrieve_report = PythonOperator(
        task_id="retrieve_report",
        python_callable=execute_report,
        execution_timeout=timedelta(minutes=30),
    )
```
---
## Repository Layout
```
tms/
├─ generate_tm_ods_dags.py   # DAG generator script (calls create_dag many times)
├─ TMS-layouts/
│  ├─ w_ODS_TMS_TRANSACTION.yml
│  └─ ...
├─ config/
│  └─ TMS.yml
└─ TMS-tables/               # CREATE TABLE SQL scripts
```
---
## Security Considerations
- **`eval()` is dangerous.**
Only `select(...)` is allowed, and it is whitelisted to safe tables.
- **No raw shell commands.**
Use `subprocess.run([...], shell=False)` for safety.
- **Secrets in config.**
TMS username/password are currently stored in `TMS.yml`; they are better kept in Airflow Connections or a secrets manager.
- **Exit codes matter.**
Workflow correctness relies on `TMSDB.py` returning the right codes (`0`, `1`, other).
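One way to move the credentials out of `TMS.yml` is Airflow's `AIRFLOW_CONN_<CONN_ID>` environment-variable convention, where the connection is a URI. A stdlib-only sketch of resolving such a URI (the connection id `tms_default` and the URI values are assumptions; inside a task you would normally use `BaseHook.get_connection` instead of parsing it yourself):

```python
# Sketch: resolve TMS credentials from an AIRFLOW_CONN_<CONN_ID> env var
# holding a connection URI, instead of keeping them in TMS.yml.
import os
from urllib.parse import unquote, urlsplit

# Illustrative value; in practice this is set in the Airflow environment.
os.environ["AIRFLOW_CONN_TMS_DEFAULT"] = (
    "https://tms_user:s3cret@tms.example.com:8443/api"
)

def tms_credentials(conn_id="tms_default"):
    uri = os.environ[f"AIRFLOW_CONN_{conn_id.upper()}"]
    parts = urlsplit(uri)
    return unquote(parts.username), unquote(parts.password), parts.hostname

user, password, host = tms_credentials()
```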
---
## Extending the Factory
### Add a new dataset
1. Create a YAML layout in `TMS-layouts/`, e.g.:
```yaml
parameters:
  date_from:
    value: "2025-01-01"
  date_to:
    value: "2025-01-31"
```
2. Add a line in `generate_tm_ods_dags.py`:
```python
create_dag("w_ODS_TMS_NEWENTITY")
```
3. Deploy the DAG file to Airflow.
### Run a DAG manually
In the Airflow UI:
1. Find DAG `w_ODS_TMS_<ENTITY>`.
2. Trigger DAG → optionally override visible parameters.
3. Monitor logs for `retrieve_report`.
---
## Diagram
**DAG Factory Flow**
```mermaid
flowchart LR
subgraph DAGFactory["DAG Factory"]
direction TB
B["Load TMS config (TMS.yml)"] --> C["Load dataset layout (YAML)"]
C --> D["Extract visible & hidden parameters"]
D --> E["Define Airflow DAG with retrieve_report task"]
E --> F["Register DAG globally"]
F --> G["Repeat for each dataset name"]
G --> H["All DAGs available in Airflow"]
end
A["Airflow parses generate_tm_ods_dags.py"] --> DAGFactory
```
**Sample DAG Execution Flow**
```mermaid
flowchart LR
subgraph ExampleDAG
direction TB
B[Read YAML configs] --> C[Build parameter combinations]
C --> D["Evaluate select(...) and replace virtual params"]
D --> E["Init workflow (mr.init_workflow)"]
E --> F["Run TMSDB.py retrieve (subprocess)"]
%% Branches on return codes
F --> |rc=1: No data| G[Finalise workflow success]
F --> |rc=0: Data returned| H[Check if source file config exists]
F --> |rc!=0: Error| M[Finalise workflow failure]
%% Config missing branch
H --> |"Config missing (first run)"| I[Run TMSDB.py create-oracle-table → Generate DDL]
I --> J[Execute DDL via mf.execute_query → Create Oracle external table]
J --> K["Register file source config (mf.add_source_file_config)"]
K --> L["Process landed file (mf.process_source_file)"]
L --> N[Finalise workflow success]
%% Config exists branch
H --> |Config exists| P["Process landed file (mf.process_source_file)"]
P --> N[Finalise workflow success]
end
```
---
## Dependencies
- **Airflow 2.x**
- **Python 3.9+**
- **mrds** package (providing `utils.manage_files` and `utils.manage_runs`)
- **Oracle client / Impala client** (for table creation & querying)
- **Object storage client** (for uploading CSVs)
---
## Summary
The DAG factory is a scalable way to create **dozens of ingestion DAGs** for TMS datasets with minimal boilerplate. It leverages:
- **YAML configs** for parameters,
- **TMSDB CLI** for data retrieval and DDL generation,
- **MRDS utilities** for workflow tracking and file handling.
It standardizes ingestion while keeping each dataset's DAG lightweight and uniform.