# DAG Factory for TMS Data Ingestion

## Overview
This repository contains a DAG factory that generates multiple Apache Airflow DAGs to ingest data from a Treasury Management System (TMS) into the data warehouse.
The factory dynamically creates one DAG per TMS dataset, using YAML-based layouts to define parameters and metadata. Each DAG:
- Calls the TMSDB CLI connector (`TMSDB.py`) to retrieve data in CSV format.
- Loads the data into object storage.
- Creates or refreshes Oracle external tables if needed.
- Registers workflow metadata in MRDS tables.
- Processes the landed file for downstream use.
## Components

### 1. DAG Factory (`create_dag`)

- Purpose: auto-generates DAGs for each TMS dataset.
- Inputs:
  - `TMS-layouts/<DAG_NAME>.yml`: defines report parameters, visible/hidden flags, and virtual/replacement parameters.
  - `config/TMS.yml`: holds system-wide TMS connection info and storage prefixes.
- Outputs:
  - Airflow DAG objects named like `w_ODS_TMS_<ENTITY>`.
### 2. TMSDB Connector (`TMSDB.py`)

- Purpose: CLI tool that interacts with the TMS service.
- Commands:
  - `retrieve`: fetch rows from TMS into CSV, spool to storage, and return exit codes (0 = data, 1 = no data).
  - `create-oracle-table`: generate an Oracle DDL file based on dataset metadata.
  - `create-model`: generate dbt models for dataset integration.
- Behavior:
  - Adds synthetic columns (`A_KEY`, `A_WORKFLOW_HISTORY_KEY`).
  - Supports additional columns via `-c`.
  - Uploads to object storage if `bucket:path/file.csv` is given.
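As an illustration of how a DAG might assemble the retrieve call before running it via subprocess, here is a minimal sketch. The helper name and the `-p` parameter flag are assumptions for illustration, not `TMSDB.py`'s documented interface:

```python
# Hypothetical sketch: assemble the argv list for a TMSDB.py retrieve call.
# Passing a list with shell=False (subprocess.run's default) avoids shell injection.
def build_retrieve_command(script, dataset, params, target):
    """Build the argv list for subprocess.run; `-p key=value` is an assumed flag."""
    cmd = [script, "retrieve", dataset]
    for key, value in params.items():
        cmd += ["-p", f"{key}={value}"]
    cmd.append(target)  # e.g. "bucket:path/file.csv" triggers the object-storage upload
    return cmd

cmd = build_retrieve_command(
    "TMSDB.py", "TRANSACTION",
    {"date_from": "2025-01-01"}, "bucket:tms/transaction.csv",
)
```

The resulting list would be handed to `subprocess.run(cmd)` and the exit code inspected as described above.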
### 3. Manage Files (`mf`)

Utilities for file-level operations:

- `execute_query(sql)`
- `add_source_file_config(...)`
- `process_source_file(prefix, file)`
- `create_external_table(table, source, prefix)`
- `add_column_date_format(...)`
### 4. Manage Runs (`mr`)

Utilities for workflow tracking:

- `init_workflow(db, wf_name, run_id)`
- `set_workflow_property(key, db, name, value)`
- `finalise_workflow(key, status)`
- `select_ods_tab(table, expr, cond)`
## How a DAG Works

### DAG Structure

Each DAG has a single task:

- `retrieve_report`: a `PythonOperator` that orchestrates all steps internally.
### Task Flow

1. Read YAML configs
   - Parameters are split into visible (exposed in the Airflow UI) and hidden.
   - System config (URL, creds, bucket/prefix) is loaded from `config/TMS.yml`.
2. Parameter processing
   - Cartesian product of parameter lists.
   - Support for:
     - `column(...)` aligned columns.
     - `select(...)` SQL evaluation (restricted tables only).
     - Virtual parameters (dropped later).
     - Replace-parameter logic.
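The Cartesian-product expansion of parameter lists can be sketched with `itertools.product`; the parameter names below are illustrative, not taken from an actual layout:

```python
# Expand each parameter's list of candidate values into one dict per run.
from itertools import product

params = {
    "entity": ["EU", "US"],
    "date_from": ["2025-01-01"],
}
keys = list(params)
combinations = [dict(zip(keys, values)) for values in product(*params.values())]
# Each dict describes one retrieval, e.g. {"entity": "EU", "date_from": "2025-01-01"}
```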
3. Workflow init
   - `mr.init_workflow` creates a workflow key.
4. Data retrieval
   - Build a `TMSDB.py retrieve` command.
   - Run it via subprocess.
   - Handle return codes:
     - `0`: data returned.
     - `1`: no data → workflow finalized as success.
     - `!= 0`: error → workflow finalized as failure.
5. First-run bootstrap
   - If no config exists for the dataset:
     - Run `TMSDB.py create-oracle-table` to generate SQL.
     - Execute the SQL via `mf.execute`.
     - Add date formats and the external table with `mf.create_external_table`.
     - Register the config with `mf.add_source_file_config`.
6. File processing
   - `mf.process_source_file(prefix, filename)` ingests the CSV.
7. Workflow finalization
   - `mr.finalise_workflow(wf_key, 'Y' | 'N')`.
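The task flow above, in particular the branching on exit codes, can be sketched as a minimal `execute_report` callable. The `mr`/`mf` helpers and the subprocess call are injected as plain callables here so the sketch is self-contained; the names and signatures are assumptions, not the repository's actual code:

```python
# Hedged sketch of the retrieve_report control flow (first-run bootstrap omitted).
def execute_report(run_retrieve, init_workflow, finalise_workflow, process_file):
    wf_key = init_workflow()           # stands in for mr.init_workflow(...)
    rc = run_retrieve()                # stands in for the TMSDB.py subprocess call
    if rc == 1:                        # no data: still a successful run
        finalise_workflow(wf_key, "Y")
        return "no-data"
    if rc != 0:                        # any other non-zero code is an error
        finalise_workflow(wf_key, "N")
        return "error"
    process_file()                     # rc == 0: data landed, ingest the CSV
    finalise_workflow(wf_key, "Y")
    return "ok"
```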
## Example DAG

Example: `w_ODS_TMS_TRANSACTION`

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="w_ODS_TMS_TRANSACTION",
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2025, 1, 1),
    catchup=False,
    params={"date_from": "2025-01-01", "date_to": "2025-01-31"},
) as dag:
    retrieve_report = PythonOperator(
        task_id="retrieve_report",
        python_callable=execute_report,
        execution_timeout=timedelta(minutes=30),
    )
```
## Repository Layout

```
tms/
├─ generate_tm_ods_dags.py   # DAG generator script (calls create_dag many times)
├─ TMS-layouts/
│  ├─ w_ODS_TMS_TRANSACTION.yml
│  └─ ...
├─ config/
│  └─ TMS.yml
└─ TMS-tables/               # Create-table SQL scripts
```
## Security Considerations

- `eval()` is dangerous. Only `select(...)` is allowed, and it's whitelisted to safe tables.
- No raw shell commands. Use `subprocess.run([...], shell=False)` for safety.
- Secrets in config. The TMS username/password are stored in `TMS.yml`; they are best stored in Airflow Connections or a Secrets Manager.
- Exit codes matter. Workflow correctness relies on `TMSDB.py` returning the right codes (`0`, `1`, other).
## Extending the Factory

### Add a new dataset

1. Create a YAML layout in `TMS-layouts/`, e.g.:

   ```yaml
   parameters:
     date_from:
       value: "2025-01-01"
     date_to:
       value: "2025-01-31"
   ```

2. Add a line in `dag_factory.py`:

   ```python
   create_dag("w_ODS_TMS_NEWENTITY")
   ```

3. Deploy the DAG file to Airflow.
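As a sketch of the registration step, a generator script typically places each generated DAG in the module's global namespace so the Airflow parser discovers it. The `register_dags` helper below is illustrative, not this repository's actual code:

```python
# Hypothetical sketch: register one DAG object per dataset name in a namespace.
def register_dags(dataset_names, create_dag, namespace):
    """Place each generated DAG in `namespace` so Airflow's DAG parser finds it."""
    for name in dataset_names:
        namespace[name] = create_dag(name)

# Inside the generator module this might be invoked as:
# register_dags(["w_ODS_TMS_NEWENTITY"], create_dag, globals())
```

Airflow only schedules DAG objects reachable at module level, which is why the factory writes into `globals()` rather than keeping the DAGs in a local list.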
### Run a DAG manually

In the Airflow UI:

1. Find the DAG `w_ODS_TMS_<ENTITY>`.
2. Trigger DAG → optionally override visible parameters.
3. Monitor the logs for `retrieve_report`.
## Diagram

### DAG Factory Flow

```mermaid
flowchart LR
    subgraph DAGFactory["DAG Factory"]
        direction TB
        B["Load TMS config (TMS.yml)"] --> C["Load dataset layout (YAML)"]
        C --> D["Extract visible & hidden parameters"]
        D --> E["Define Airflow DAG with retrieve_report task"]
        E --> F["Register DAG globally"]
        F --> G["Repeat for each dataset name"]
        G --> H["All DAGs available in Airflow"]
    end
    A["Airflow parses dag_factory.py"] --> DAGFactory
```
### Sample DAG Execution Flow

```mermaid
flowchart LR
    subgraph ExampleDAG
        direction TB
        B[Read YAML configs] --> C[Build parameter combinations]
        C --> D["Evaluate select(...) and replace virtual params"]
        D --> E["Init workflow (mr.init_workflow)"]
        E --> F["Run TMSDB.py retrieve (subprocess)"]
        %% Branches on return codes
        F --> |rc=1: No data| G[Finalise workflow success]
        F --> |rc=0: Data returned| H[Check if source file config exists]
        F --> |rc!=0: Error| M[Finalise workflow failure]
        %% Config missing branch
        H --> |"Config missing (first run)"| I[Run TMSDB.py create-oracle-table → Generate DDL]
        I --> J[Execute DDL via mf.execute → Create Oracle external table]
        J --> K["Register file source config (mf.add_source_file_config)"]
        K --> L["Process landed file (mf.process_source_file)"]
        L --> N[Finalise workflow success]
        %% Config exists branch
        H --> |Config exists| P["Process landed file (mf.process_source_file)"]
        P --> N
    end
```
## Dependencies

- Airflow 2.x
- Python 3.9+
- mrds package (providing `utils.manage_files` and `utils.manage_runs`)
- Oracle client / Impala client (for table creation & querying)
- Object storage client (for uploading CSVs)
## Summary
The DAG factory is a scalable way to create dozens of ingestion DAGs for TMS datasets with minimal boilerplate. It leverages:
- YAML configs for parameters,
- TMSDB CLI for data retrieval and DDL generation,
- MRDS utilities for workflow tracking and file handling.
It standardizes ingestion while keeping each dataset’s DAG lightweight and uniform.