# Source listing metadata (extraction residue from the repository browser):
#   mars-elt/python/connectors/tms/sample_DAG.py
#   Grzegorz Michalski 2c225d68ac init — 2026-03-02 09:47:35 +01:00
#   356 lines, 12 KiB, Python
"""
DAG: w_ODS_TMS_TRANSACTION (expanded example)
Purpose:
- Load layout+parameter metadata from TMS-layouts/w_ODS_TMS_TRANSACTION.yml
- Call connectors/tms/TMSDBT.py to retrieve data into CSV in object storage
- On first run, generate Oracle DDL and create an external table
- Process file and record status in MRDS workflow tables
Notes:
- This is an expanded, readable version of the factory-generated DAG.
- Replace paths/usernames/password references as appropriate.
"""
import copy
import itertools
import json
import logging
import os
import re
import subprocess
import sys
import traceback
from datetime import datetime, timedelta
from pathlib import Path
import yaml
from airflow import DAG
from airflow.operators.python import PythonOperator
from pytz import timezone
# --- Project-specific deps (must exist in your Airflow image) ---
from mrds.core import main # noqa: F401 # imported to mirror the factory env
import mrds.utils.manage_files as mf
import mrds.utils.manage_runs as mr
# ---------- Paths & constants ----------
# Base directory of this DAG file; the "./_" fallback keeps interactive
# sessions (no __file__) from crashing at import time.
gScriptDir = Path(globals().get("__file__", "./_")).absolute().parent
gDataDir = str(gScriptDir / "TMS-layouts") + "/"   # layout YAML / .fkr files
gConfigDir = str(gScriptDir / "config")            # TMS.yml connection config
gConnDir = "/opt/airflow/python/connectors/tms/"   # TMSDBT.py connector home
gTableDir = str(gScriptDir / "TMS-tables") + "/"   # generated DDL output

# The DAG, workflow and target ODS table all share one name by convention.
DAG_NAME = "w_ODS_TMS_TRANSACTION"
ODS_TABLE = DAG_NAME
DATABASE_NAME = "ODS"
WF_NAME = DAG_NAME

# Airflow task defaults: no retries, hard 60-minute cap per task.
default_args = {
    "owner": "ecb",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "execution_timeout": timedelta(minutes=60),
    "retry_delay": timedelta(minutes=5),
}
# ---------- Load YAML configs once on parse ----------
# Layout metadata for this report (columns, parameters, ...).
with open(gDataDir + DAG_NAME + ".yml", "r") as layout_file:
    report_desc = yaml.safe_load(layout_file) or {}
# Shared TMS connection / storage settings.
with open(gConfigDir + "/TMS.yml", "r") as config_file:
    tms_config = yaml.safe_load(config_file)

# TMS + storage config
tms_url = tms_config["TMS-URL"]
tms_user = tms_config["TMS-user"]
tms_pwd = tms_config["TMS-password"]
prefix = tms_config["dest-prefix"] + DAG_NAME + "/" + DAG_NAME + "/"
data_prefix = tms_config["data-prefix"] + DAG_NAME + "/"
dest = tms_config["dest-bucket"] + ":" + prefix

# Split layout parameters into visible (exposed as DAG params) and
# hidden (merged back in inside the task).
params_visible = {}
params_hidden = {}
params_dict = report_desc.get("parameters") or {}
for name, meta in params_dict.items():
    bucket = params_hidden if meta.get("hidden", False) else params_visible
    bucket[name] = meta.get("value", None)
# ---------- Helpers (parameter handling) ----------
def _enum_param_combinations_recursive(params, keys):
"""
Build all combinations of params (cartesian product), supporting
'column(<name>)' derived lists aligned by index.
"""
k = None
result = []
keys = list(keys) # safe copy
while keys:
k = keys.pop(0)
v = params[k]
if v or v == "":
break
if not k:
return []
v = v if isinstance(v, list) else [v]
# derived columns aligned with v (same length)
derived_columns = []
# params_dict[k] holds the definition, not just the value
pdef = params_dict.get(k, {})
for c in list(pdef):
if re.match(r"column\(.*\)$", c):
vtmp = pdef[c]
vtmp = vtmp if isinstance(vtmp, list) else [vtmp]
derived_columns.append((c, vtmp))
if not keys:
for i, value in enumerate(v):
row = [(k, value)]
for col_key, aligned_values in derived_columns:
row.append((col_key, aligned_values[i]))
result.append(row)
return result
combinations = _enum_param_combinations_recursive(params, keys)
for row in combinations:
for i, vtmp in enumerate(v):
new_row = copy.deepcopy(row)
new_row.append((k, vtmp))
for col_key, aligned_values in derived_columns:
new_row.append((col_key, aligned_values[i]))
result.append(new_row)
return result
def _enum_param_combinations(params, sequential=False):
    """Return all parameter combinations as lists of (key, value) pairs.

    The 'sequential' flag is accepted for signature compatibility with the
    factory code but ignored: the factory's sequential path was buggy and
    unused, so only the cartesian-product expansion is supported.
    """
    return _enum_param_combinations_recursive(params, list(params))
def _allowed_select(table, expression, condition="1 = 1"):
    """
    Guarded select used by eval_params(select(...)).

    Whitelists tables so the 'select(...)' parameter-evaluation path
    cannot be used for arbitrary reads.

    :param table: table name; compared case-insensitively to the whitelist.
    :param expression: select expression forwarded to mr.select_ods_tab.
    :param condition: optional WHERE condition (defaults to a tautology).
    :returns: the first element of the helper's result.
    :raises PermissionError: when 'table' is not whitelisted.
    """
    allowed = frozenset((
        ODS_TABLE.upper(),
        "DUAL",
        "CT_MRDS.A_WORKFLOW_HISTORY",
    ))
    if table.upper() not in allowed:
        # PermissionError instead of bare Exception: precise about the
        # failure while remaining an Exception subclass, so any existing
        # 'except Exception' handlers keep working.
        raise PermissionError(f"Not allowed to select from {table}")
    res = mr.select_ods_tab(table, expression, condition)
    # NOTE(review): assumes the helper returns a non-empty sequence —
    # an empty result would raise IndexError here; confirm against
    # mr.select_ods_tab's contract.
    return res[0]
def _eval_param(v):
"""
Evaluate special functional values:
- select(...) => guarded DB helper above
- eval(...) => strongly discouraged; keep disabled or restricted
"""
s = str(v) if v is not None else ""
if re.match(r"\s*select\(.*\)", s):
# Expose only 'select' symbol to eval
return eval(s, {"select": _allowed_select}, {})
if re.match(r"\s*eval\(.*\)\s*$", s):
# If you really must support eval, strictly sandbox or remove this path.
raise ValueError("eval(...) not allowed in this hardened DAG.")
return v
def _finalize_param_list(param_list):
    """
    Apply replacements and drop virtual params according to YAML definitions.
    """
    resolved = dict(param_list)
    # Token replacement: a parameter may declare 'replace_parameter: <target>',
    # meaning occurrences of its *name* inside the target's string value are
    # substituted with its own value.
    for name, meta in params_dict.items():
        target = meta.get("replace_parameter")
        if not target:
            continue
        if target in resolved and name in resolved and isinstance(resolved[target], str):
            resolved[target] = resolved[target].replace(name, str(resolved[name]))
    # Prune 'virtual' params — they only drive the expansion/replacement above
    # and must not reach the connector command line.
    return [
        (name, value)
        for name, value in resolved.items()
        if not params_dict.get(name, {}).get("virtual", False)
    ]
# ---------- Core work ----------
def _param_cli_switches(param_list):
    """Translate finalized (key, value) pairs into TMSDBT.py CLI switches.

    Returns (switches, properties):
    - switches: flat argv fragment; 'column(<name>)' keys map to
      '-c <name>=<value>', plain keys map to '-p <key>=<value>'.
    - properties: (key, normalized_value) pairs for the plain (non-column)
      params, suitable for recording as workflow properties.

    None values are sent as empty strings and trailing whitespace is
    stripped, matching the factory DAG's normalization.
    """
    switches, properties = [], []
    for key, value in param_list:
        sval = "" if value is None else str(value).rstrip()
        m = re.match(r"column\((.*)\)$", key)
        if m:
            switches.extend(["-c", f"{m.group(1)}={sval}"])
        else:
            switches.extend(["-p", f"{key}={sval}"])
            # NOTE(review): mirrors the factory behavior of persisting only
            # plain params as workflow properties; column(...) switches are
            # not recorded — confirm this is intended.
            properties.append((key, sval))
    return switches, properties


def execute_report(**context):
    """
    For each parameter combination:
    - create workflow key
    - call TMSDBT.py retrieve to land CSV
    - if first time, create Oracle table from generated DDL
    - process file, finalize workflow Y/N

    :param context: Airflow task context; only dag_run.run_id is read.
    :raises RuntimeError: when the connector exits with an error code.
    """
    logger = logging.getLogger("airflow.task")
    logger.setLevel(logging.DEBUG)
    run_id = context["dag_run"].run_id
    all_params = {**params_visible, **params_hidden}
    # 1) Compute combinations of all (visible + hidden) params.
    combos = _enum_param_combinations(all_params)
    # 2) Evaluate select(...) etc, then apply replacements/pruning.
    evaluated = []
    for combo in combos or [[]]:
        pair_list = [(k, _eval_param(v)) for k, v in combo]
        evaluated.append(_finalize_param_list(pair_list))
    # If there were no combos at all, still run exactly once.
    if not evaluated:
        evaluated = [[]]
    # One timestamp per task run so all landed files sort together.
    ts = "{:%Y%m%d_%H%M%S}".format(datetime.now(timezone("Europe/Berlin")))
    for param_list in evaluated:
        wf_key = mr.init_workflow(DATABASE_NAME, WF_NAME, run_id)
        file_name = f"{WF_NAME}.{wf_key}.{ts}.csv"
        try:
            # Build connector command safely (argv list, no shell quoting).
            cmd = [
                sys.executable,  # 'python'
                os.path.join(gConnDir, "TMSDBT.py"),
                "retrieve",
                "--name", WF_NAME,
                "--url", tms_url,
                "-U", tms_user,
                # NOTE(review): the password travels on the command line and
                # is visible in process listings — consider env var / stdin.
                "--password", tms_pwd,
                "--layoutfile", gDataDir + DAG_NAME + ".fkr",
                "-f", "scsv",
                "--dataset", str(wf_key),
                "-d", dest + file_name,
            ]
            switches, properties = _param_cli_switches(param_list)
            cmd.extend(switches)
            # Record the effective parameters on the workflow for audit.
            for key, sval in properties:
                mr.set_workflow_property(wf_key, DATABASE_NAME, key, sval)
            logger.debug("Running connector: %s", json.dumps(cmd))
            res = subprocess.run(cmd, capture_output=True, check=False)
            logger.debug("stdout: %s", res.stdout.decode(errors="ignore"))
            logger.debug("stderr: %s", res.stderr.decode(errors="ignore"))
            # rc=1 is the connector's "no data" signal, not an error.
            if res.returncode == 1:
                logger.info("No data returned for wf_key=%s (continuing)", wf_key)
                mr.finalise_workflow(wf_key, "Y")
                continue
            if res.returncode != 0:
                raise RuntimeError(f"Connector failed (rc={res.returncode})")
            # Data landed -> ensure source config exists, bootstrap table if
            # needed. ODS_TABLE is a module constant (not user input), so the
            # f-string interpolation below is not an injection vector.
            cfg = mf.execute_query(
                "select * from CT_MRDS.A_SOURCE_FILE_CONFIG "
                f"where a_source_key = 'TMS' and table_id = '{ODS_TABLE}'"
            )
            if not cfg:
                # First run ever: generate the DDL file via the connector.
                ddl_cmd = [
                    sys.executable,
                    os.path.join(gConnDir, "TMSDBT.py"),
                    "create-oracle-table",
                    "--name", WF_NAME,
                    "--url", tms_url,
                    "-U", tms_user,
                    "--password", tms_pwd,
                    "--layoutfile", gDataDir + DAG_NAME + ".fkr",
                    "-d", gTableDir + WF_NAME + ".sql",
                ]
                ddl_switches, _ = _param_cli_switches(param_list)
                ddl_cmd.extend(ddl_switches)
                logger.debug("Generating DDL: %s", json.dumps(ddl_cmd))
                # check=True: a failed DDL generation raises and is handled
                # by the except below (workflow marked 'N').
                ddl_res = subprocess.run(ddl_cmd, capture_output=True, check=True)
                logger.debug("DDL stdout: %s", ddl_res.stdout.decode(errors="ignore"))
                logger.debug("DDL stderr: %s", ddl_res.stderr.decode(errors="ignore"))
                # Execute DDL, then register the external table + file config.
                sql = Path(gTableDir + WF_NAME + ".sql").read_text()
                mf.execute(sql)
                mf.add_column_date_format(
                    f"CT_ET_TEMPLATES.{ODS_TABLE}", "DEFAULT", "DD/MM/YYYY HH24:MI:SS"
                )
                mf.create_external_table(ODS_TABLE, f"CT_ET_TEMPLATES.{ODS_TABLE}", data_prefix)
                mf.add_source_file_config(
                    "TMS",
                    "INPUT",
                    DAG_NAME,
                    DAG_NAME,
                    r".*\.csv",
                    ODS_TABLE,
                    f"CT_ET_TEMPLATES.{ODS_TABLE}",
                )
            # Process landed file (register, move, etc. as per your mf impl).
            mf.process_source_file(prefix, file_name)
            mr.finalise_workflow(wf_key, "Y")
        except BaseException:
            # BaseException on purpose: even a task kill must flip the
            # workflow to 'N' before the exception propagates.
            # logger.exception logs type, message and full traceback via the
            # task logger (the factory code rebuilt all of this by hand on
            # the root logger).
            logger.exception("Workflow %s failed", wf_key)
            mr.finalise_workflow(wf_key, "N")
            raise
# ---------- DAG definition ----------
with DAG(
    dag_id=DAG_NAME,
    description=DAG_NAME,
    default_args=default_args,
    schedule_interval=None,  # manual trigger only
    start_date=datetime(2025, 1, 1),
    catchup=False,
    # Only visible params are exposed in the UI; hidden ones are merged
    # back in inside execute_report.
    params=params_visible,
    tags=[DAG_NAME],
) as dag:
    # Single task: retrieve + land + (bootstrap) + process.
    retrieve_report = PythonOperator(
        task_id="retrieve_report",
        python_callable=execute_report,
        execution_timeout=timedelta(minutes=30),
    )