DAG Factory for TMS Data Ingestion

Overview

This repository contains a DAG factory that generates multiple Apache Airflow DAGs to ingest data from a Treasury Management System (TMS) into the data warehouse.

The factory dynamically creates one DAG per TMS dataset, using YAML-based layouts to define parameters and metadata. Each DAG:

  • Calls the TMSDB CLI connector (TMSDB.py) to retrieve data in CSV format.
  • Loads the data into object storage.
  • Creates or refreshes Oracle external tables if needed.
  • Registers workflow metadata in MRDS tables.
  • Processes the landed file for downstream use.

Components

1. DAG Factory (create_dag)

  • Purpose: Auto-generates DAGs for each TMS dataset.
  • Inputs:
    • TMS-layouts/<DAG_NAME>.yml: defines report parameters, visible/hidden flags, virtual/replacement parameters.
    • config/TMS.yml: holds system-wide TMS connection info and storage prefixes.
  • Outputs:
    • Airflow DAG objects named like w_ODS_TMS_<ENTITY>.
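
A minimal sketch of how the factory might split a parsed layout into its visible and hidden parameter sets. The `visible` flag name is an assumption for illustration; the real layouts in TMS-layouts/ define their own keys.

```python
# Sketch of create_dag's input handling. The "visible" flag name is an
# assumption; this dict stands in for a parsed TMS-layouts/<DAG_NAME>.yml.
layout = {
    "parameters": {
        "date_from": {"value": "2025-01-01", "visible": True},
        "date_to":   {"value": "2025-01-31", "visible": True},
        "batch_id":  {"value": "42",         "visible": False},
    }
}

# Visible parameters are exposed in the Airflow UI; hidden ones are not.
visible = {k: v for k, v in layout["parameters"].items() if v.get("visible")}
hidden = {k: v for k, v in layout["parameters"].items() if not v.get("visible")}
```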

2. TMSDB Connector (TMSDB.py)

  • Purpose: CLI tool that interacts with the TMS service.
  • Commands:
    • retrieve: fetch rows from TMS into CSV, spool to storage, return exit codes (0 = data, 1 = no data).
    • create-oracle-table: generate an Oracle DDL file based on dataset metadata.
    • create-model: generate dbt models for dataset integration.
  • Behavior:
    • Adds synthetic columns (A_KEY, A_WORKFLOW_HISTORY_KEY).
    • Supports additional columns via -c.
    • Uploads to object storage if bucket:path/file.csv is given.
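
A hedged sketch of composing a retrieve invocation. Only the `retrieve` command, the `-c` flag, and the `bucket:path/file.csv` target come from this README; every other detail (flag order, dataset argument) is an illustrative assumption.

```python
# Compose a TMSDB.py retrieve command line. Flag spellings other than
# -c (additional columns) and the bucket:path/file.csv upload target
# are assumptions, not the connector's documented interface.
def build_retrieve_cmd(dataset, extra_columns, target):
    cmd = ["python", "TMSDB.py", "retrieve", dataset]
    for col in extra_columns:
        cmd += ["-c", col]          # -c adds an additional column (per README)
    cmd.append(target)              # bucket:path/file.csv triggers the upload
    return cmd

cmd = build_retrieve_cmd("TRANSACTION", ["LOAD_DATE"],
                         "landing:tms/transaction.csv")
# run with: subprocess.run(cmd, shell=False)  -> rc 0 = data, 1 = no data
```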

3. Manage Files (mf)

Utilities for file-level operations:

  • execute_query(sql)
  • add_source_file_config(...)
  • process_source_file(prefix, file)
  • create_external_table(table, source, prefix)
  • add_column_date_format(...)

4. Manage Runs (mr)

Utilities for workflow tracking:

  • init_workflow(db, wf_name, run_id)
  • set_workflow_property(key, db, name, value)
  • finalise_workflow(key, status)
  • select_ods_tab(table, expr, cond)
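
An in-memory stand-in for the lifecycle these helpers implement. The real functions live in `utils.manage_runs` and write to MRDS tables; this stub only shows the call order and the `'Y'`/`'N'` status codes, with signatures mirroring the list above.

```python
# Illustrative stand-in for the manage-runs lifecycle (not the real mrds code).
class ManageRunsStub:
    def __init__(self):
        self.runs = {}

    def init_workflow(self, db, wf_name, run_id):
        key = f"{db}:{wf_name}:{run_id}"
        self.runs[key] = {"props": {}, "status": None}
        return key

    def set_workflow_property(self, key, db, name, value):
        self.runs[key]["props"][name] = value

    def finalise_workflow(self, key, status):
        self.runs[key]["status"] = status  # 'Y' = success, 'N' = failure

mr = ManageRunsStub()
wf_key = mr.init_workflow("ODS", "w_ODS_TMS_TRANSACTION", "manual__2025-01-01")
mr.set_workflow_property(wf_key, "ODS", "row_count", 1234)
mr.finalise_workflow(wf_key, "Y")
```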

How a DAG Works

DAG Structure

Each DAG has a single task:

  • retrieve_report: a PythonOperator that orchestrates all steps internally.

Task Flow

  1. Read YAML configs

    • Parameters split into visible (exposed in Airflow UI) and hidden.
    • System config (URL, creds, bucket/prefix) loaded from config/TMS.yml.
  2. Parameter processing

    • Cartesian product of parameter lists.
    • Support for:
      • column(...) aligned columns.
      • select(...) SQL evaluation (restricted tables only).
      • Virtual parameters (dropped later).
      • Replace-parameter logic.
  3. Workflow init

    • mr.init_workflow creates a workflow key.
  4. Data retrieval

    • Build a TMSDB.py retrieve command.
    • Run via subprocess.
    • Handle return codes:
      • 0: data returned.
      • 1: no data → workflow finalized as success.
      • !=0: error → workflow finalized as failure.
  5. First-run bootstrap

    • If no config exists for the dataset:
      • Run TMSDB.py create-oracle-table to generate SQL.
      • Execute the generated DDL via mf.execute_query.
      • Add date formats and external table with mf.create_external_table.
      • Register config with mf.add_source_file_config.
  6. File processing

    • mf.process_source_file(prefix, filename) ingests the CSV.
  7. Workflow finalization

    • mr.finalise_workflow(wf_key, 'Y' | 'N').
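
Step 2's expansion can be sketched with itertools.product; the parameter names and the virtual set below are made up for illustration.

```python
# Sketch of the cartesian-product parameter expansion, with virtual
# parameters dropped afterwards. Names here are illustrative only.
from itertools import product

params = {"portfolio": ["P1", "P2"], "currency": ["EUR", "USD"]}
virtual = {"portfolio"}  # drives expansion only; dropped before retrieve

combos = [dict(zip(params, values)) for values in product(*params.values())]
final = [{k: v for k, v in c.items() if k not in virtual} for c in combos]
```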

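The return-code contract in steps 4 and 7 can be sketched as follows, where `finalise` stands in for mr.finalise_workflow and the subprocess call is replaced by a stubbed return code.

```python
# Sketch of the retrieve return-code branching. A real run would read
# subprocess.run([...], shell=False).returncode; here rc is passed in.
def handle_retrieve(rc, finalise):
    if rc == 0:
        return "process_file"      # data landed -> continue to processing
    if rc == 1:
        finalise("Y")              # no data still counts as a successful run
        return "done"
    finalise("N")                  # any other code -> failure
    return "failed"

outcomes = {}
results = [handle_retrieve(rc, lambda s, rc=rc: outcomes.__setitem__(rc, s))
           for rc in (0, 1, 2)]
```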
Example DAG

Example: w_ODS_TMS_TRANSACTION

# Assumes the standard Airflow imports; execute_report and default_args
# are supplied by the factory (create_dag).
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="w_ODS_TMS_TRANSACTION",
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2025, 1, 1),
    catchup=False,
    params={"date_from": "2025-01-01", "date_to": "2025-01-31"},
) as dag:

    retrieve_report = PythonOperator(
        task_id="retrieve_report",
        python_callable=execute_report,
        execution_timeout=timedelta(minutes=30),
    )

Repository Layout

 tms/
    ├─ generate_tm_ods_dags.py           # DAG generator script (calls create_dag many times)
    ├─ TMS-layouts/
    │    ├─ w_ODS_TMS_TRANSACTION.yml
    │    └─ ...
    ├─ config/
    │    └─ TMS.yml
    └─ TMS-tables/              # Create table SQL scripts

Security Considerations

  • eval() is dangerous.
    Only the select(...) form is evaluated, and it is whitelisted to safe tables.
  • No raw shell commands.
    Use subprocess.run([...], shell=False) for safety.
  • Secrets in config.
    The TMS username/password are stored in TMS.yml; they are better kept in Airflow Connections or a secrets manager.
  • Exit codes matter.
    Workflow correctness relies on TMSDB.py returning the right codes (0, 1, other).
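
One way the select(...) restriction could be enforced, sketched with a regex and a table whitelist. Both the whitelist contents and the pattern are assumptions for illustration, not the factory's actual code.

```python
import re

# Hypothetical guard for select(...) evaluation: extract the FROM table
# and refuse anything outside a whitelist. Whitelist contents assumed.
SAFE_TABLES = {"ODS_TMS_TRANSACTION", "ODS_TMS_POSITION"}

def safe_select(expr):
    m = re.fullmatch(r"select\((select\s.+?\sfrom\s+(\w+).*)\)", expr,
                     re.IGNORECASE | re.DOTALL)
    if not m:
        raise ValueError("not a select(...) expression")
    if m.group(2).upper() not in SAFE_TABLES:
        raise ValueError(f"table {m.group(2)} is not whitelisted")
    return m.group(1)  # inner SQL, now safe to hand to mf.execute_query

sql = safe_select("select(select max(run_dt) from ODS_TMS_TRANSACTION where id > 0)")
```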

Extending the Factory

Add a new dataset

  1. Create a YAML layout in TMS-layouts/, e.g.:

    parameters:
      date_from:
        value: "2025-01-01"
      date_to:
        value: "2025-01-31"
    
  2. Add a line in generate_tm_ods_dags.py:

    create_dag("w_ODS_TMS_NEWENTITY")
    
  3. Deploy the DAG file to Airflow.
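
The factory depends on Airflow's module-level discovery: create_dag must bind each generated DAG to a global name for the scheduler to find it. A toy sketch of that registration pattern, with a string standing in for the real airflow.DAG object:

```python
# Sketch of the globals() registration pattern. In the real factory the
# object assigned here is an airflow.DAG built from the YAML layout.
def create_dag(dag_name):
    dag = f"<DAG {dag_name}>"        # placeholder for the airflow.DAG instance
    globals()[dag_name] = dag        # module-level binding Airflow discovers
    return dag

for name in ("w_ODS_TMS_TRANSACTION", "w_ODS_TMS_NEWENTITY"):
    create_dag(name)
```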

Run a DAG manually

In the Airflow UI:

  1. Find DAG w_ODS_TMS_<ENTITY>.
  2. Trigger DAG → optionally override visible parameters.
  3. Monitor logs for retrieve_report.

Diagram

DAG Factory Flow

flowchart LR
  subgraph DAGFactory["Dag Factory"]
    direction TB
      B["Load TMS config (TMS.yml)"] --> C["Load dataset layout (YAML)"]
      C --> D["Extract visible & hidden parameters"]
      D --> E["Define Airflow DAG with retrieve_report task"]
      E --> F["Register DAG globally"]
      F --> G["Repeat for each dataset name"]
      G --> H["All DAGs available in Airflow"]
  end
  A["Airflow parses dag_factory.py"] --> DAGFactory

Sample DAG Execution Flow

flowchart LR
  subgraph ExampleDAG
    direction TB
      B[Read YAML configs] --> C[Build parameter combinations]
      C --> D["Evaluate select(...) and replace virtual params"]
      D --> E["Init workflow (mr.init_workflow)"]
      E --> F["Run TMSDB.py retrieve (subprocess)"]

      %% Branches on return codes
      F --> |rc=1: No data| G[Finalise workflow success]
      F --> |rc=0: Data returned| H[Check if source file config exists]
      F --> |rc!=0: Error| M[Finalise workflow failure]

      %% Config missing branch
      H --> |"Config missing (first run)"| I[Run TMSDB.py create-oracle-table → Generate DDL]
      I --> J[Execute DDL via mf.execute_query → Create Oracle external table]
      J --> K["Register file source config (mf.add_source_file_config)"]
      K --> L["Process landed file (mf.process_source_file)"]
      L --> N[Finalise workflow success]

      %% Config exists branch
      H --> |Config exists| P["Process landed file (mf.process_source_file)"]
      P --> N
  end

Dependencies

  • Airflow 2.x
  • Python 3.9+
  • mrds package (providing utils.manage_files and utils.manage_runs)
  • Oracle client / Impala client (for table creation & querying)
  • Object storage client (for uploading CSVs)

Summary

The DAG factory is a scalable way to create dozens of ingestion DAGs for TMS datasets with minimal boilerplate. It leverages:

  • YAML configs for parameters,
  • TMSDB CLI for data retrieval and DDL generation,
  • MRDS utilities for workflow tracking and file handling.

It standardizes ingestion while keeping each dataset's DAG lightweight and uniform.