"""
|
|
DAG: w_ODS_TMS_TRANSACTION (expanded example)
|
|
Purpose:
|
|
- Load layout+parameter metadata from TMS-layouts/w_ODS_TMS_TRANSACTION.yml
|
|
- Call connectors/tms/TMSDBT.py to retrieve data into CSV in object storage
|
|
- On first run, generate Oracle DDL and create an external table
|
|
- Process file and record status in MRDS workflow tables
|
|
Notes:
|
|
- This is an expanded, readable version of the factory-generated DAG.
|
|
- Replace paths/usernames/password references as appropriate.
|
|
"""
|
|
|
|
import copy
|
|
import itertools
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import traceback
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
import yaml
|
|
from airflow import DAG
|
|
from airflow.operators.python import PythonOperator
|
|
from pytz import timezone
|
|
|
|
# --- Project-specific deps (must exist in your Airflow image) ---
|
|
from mrds.core import main # noqa: F401 # imported to mirror the factory env
|
|
import mrds.utils.manage_files as mf
|
|
import mrds.utils.manage_runs as mr
|
|
|
|
# ---------- Paths & constants ----------
# Directory layout is resolved relative to this DAG file so the factory can
# deploy the DAG together with its layout/config/table folders.
gScriptDir = Path(globals().get("__file__", "./_")).absolute().parent
gDataDir = str(gScriptDir / "TMS-layouts") + "/"
gConfigDir = str(gScriptDir / "config")
gConnDir = "/opt/airflow/python/connectors/tms/"  # fixed connector path in the image
gTableDir = str(gScriptDir / "TMS-tables") + "/"

DAG_NAME = "w_ODS_TMS_TRANSACTION"
ODS_TABLE = DAG_NAME      # target ODS table shares the DAG name
DATABASE_NAME = "ODS"
WF_NAME = DAG_NAME        # workflow name shares the DAG name

default_args = {
    "owner": "ecb",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "execution_timeout": timedelta(minutes=60),
    "retry_delay": timedelta(minutes=5),
}

# ---------- Load YAML configs once on parse ----------
# Layout + parameter metadata for this report. An empty file is normalized
# to {} so the .get("parameters") lookup below is safe.
with open(gDataDir + DAG_NAME + ".yml", "r", encoding="utf-8") as f:
    report_desc = yaml.safe_load(f) or {}

# Shared TMS connection/storage settings. Normalized to {} as well, so a
# missing key fails with a clear KeyError instead of a TypeError on
# subscripting None when the file is empty.
with open(gConfigDir + "/TMS.yml", "r", encoding="utf-8") as f:
    tms_config = yaml.safe_load(f) or {}

# TMS + storage config
tms_url = tms_config["TMS-URL"]
tms_user = tms_config["TMS-user"]
tms_pwd = tms_config["TMS-password"]
prefix = tms_config["dest-prefix"] + DAG_NAME + "/" + DAG_NAME + "/"
data_prefix = tms_config["data-prefix"] + DAG_NAME + "/"
dest = tms_config["dest-bucket"] + ":" + prefix

# Visible vs hidden params (from layout YAML): visible ones are surfaced as
# DAG params in the UI; hidden ones are merged back in inside the task.
params_visible = {}
params_hidden = {}
params_dict = report_desc.get("parameters") or {}
for p, meta in params_dict.items():
    val = meta.get("value", None)
    if not meta.get("hidden", False):
        params_visible[p] = val
    else:
        params_hidden[p] = val
|
|
# ---------- Helpers (parameter handling) ----------
def _enum_param_combinations_recursive(params, keys):
    """
    Build all combinations of params (cartesian product), supporting
    'column(<name>)' derived lists aligned by index.

    Parameters
    ----------
    params : dict
        Parameter name -> value; a value may be a scalar or a list of values.
    keys : iterable
        Parameter names still to expand; consumed from the front.

    Returns
    -------
    list
        A list of combinations; each combination is a list of (name, value)
        tuples, possibly extended with ('column(<col>)', value) tuples taken
        from the parameter's definition in the module-level ``params_dict``.
    """
    k = None
    result = []
    keys = list(keys)  # safe copy; popped from below

    # Advance to the first parameter with a usable value. Falsy values
    # (None, [], 0, False) are skipped, EXCEPT the empty string, which is
    # treated as usable.
    # NOTE(review): if every remaining value is falsy, the loop exhausts the
    # keys and the last (falsy) value is still expanded below -- presumably
    # mirrors the factory behavior; confirm before changing.
    while keys:
        k = keys.pop(0)
        v = params[k]
        if v or v == "":
            break

    # k stays None only when `keys` was empty on entry: nothing to combine.
    if not k:
        return []

    # Normalize a scalar value to a one-element list.
    v = v if isinstance(v, list) else [v]

    # Derived 'column(<name>)' entries from this parameter's YAML definition;
    # their value lists are assumed index-aligned with v (same length --
    # an IndexError below would indicate a malformed layout file).
    derived_columns = []
    # params_dict[k] holds the definition, not just the value
    pdef = params_dict.get(k, {})
    for c in list(pdef):
        if re.match(r"column\(.*\)$", c):
            vtmp = pdef[c]
            vtmp = vtmp if isinstance(vtmp, list) else [vtmp]
            derived_columns.append((c, vtmp))

    # Base case: this was the last usable parameter -> one row per value.
    if not keys:
        for i, value in enumerate(v):
            row = [(k, value)]
            for col_key, aligned_values in derived_columns:
                row.append((col_key, aligned_values[i]))
            result.append(row)
        return result

    # Recursive case: extend every combination of the remaining keys with
    # each value of the current parameter (plus its derived columns).
    combinations = _enum_param_combinations_recursive(params, keys)
    for row in combinations:
        for i, vtmp in enumerate(v):
            new_row = copy.deepcopy(row)
            new_row.append((k, vtmp))
            for col_key, aligned_values in derived_columns:
                new_row.append((col_key, aligned_values[i]))
            result.append(new_row)

    return result
|
|
|
|
|
def _enum_param_combinations(params, sequential=False):
    """Return every (name, value) combination for *params*.

    The ``sequential`` flag is accepted for factory compatibility but has
    no effect: the sequential path was omitted (buggy in the factory and
    unused there as well).
    """
    key_order = list(params)
    return _enum_param_combinations_recursive(params, key_order)
|
|
|
|
|
def _allowed_select(table, expression, condition="1 = 1"):
    """
    Guarded select used by eval_params(select(...)).

    Whitelists tables to avoid arbitrary reads through the YAML-driven
    ``select(...)`` parameter mechanism.

    Parameters
    ----------
    table : str
        Table name; compared case-insensitively against the whitelist.
    expression : str
        Select-list expression passed through to the query helper.
    condition : str
        WHERE condition; defaults to a tautology (no filtering).

    Returns
    -------
    The first row returned by ``mr.select_ods_tab``.

    Raises
    ------
    ValueError
        If the table is not whitelisted, or the query returned no rows.
        (ValueError subclasses Exception, so existing broad handlers still
        apply.)
    """
    allowed = {
        ODS_TABLE.upper(),
        "DUAL",
        "CT_MRDS.A_WORKFLOW_HISTORY",
    }
    if table.upper() not in allowed:
        raise ValueError(f"Not allowed to select from {table}")
    res = mr.select_ods_tab(table, expression, condition)
    if not res:
        # Fail with a clear message instead of an opaque IndexError.
        raise ValueError(f"select from {table} returned no rows")
    return res[0]
|
|
|
|
|
def _eval_param(v):
|
|
"""
|
|
Evaluate special functional values:
|
|
- select(...) => guarded DB helper above
|
|
- eval(...) => strongly discouraged; keep disabled or restricted
|
|
"""
|
|
s = str(v) if v is not None else ""
|
|
if re.match(r"\s*select\(.*\)", s):
|
|
# Expose only 'select' symbol to eval
|
|
return eval(s, {"select": _allowed_select}, {})
|
|
if re.match(r"\s*eval\(.*\)\s*$", s):
|
|
# If you really must support eval, strictly sandbox or remove this path.
|
|
raise ValueError("eval(...) not allowed in this hardened DAG.")
|
|
return v
|
|
|
|
|
|
def _finalize_param_list(param_list):
    """
    Post-process one combination of (name, value) pairs.

    Applies 'replace_parameter' token substitution and removes parameters
    flagged as 'virtual' in the layout YAML definitions.
    """
    resolved = dict(param_list)

    # Substitute occurrences of a parameter's *name* inside the string value
    # of its declared target parameter.
    for name, definition in params_dict.items():
        target = definition.get("replace_parameter")
        if not target:
            continue
        if target in resolved and name in resolved and isinstance(resolved[target], str):
            resolved[target] = resolved[target].replace(name, str(resolved[name]))

    # Keep only non-virtual parameters, as (name, value) pairs.
    return [
        (name, value)
        for name, value in resolved.items()
        if not params_dict.get(name, {}).get("virtual", False)
    ]
|
|
|
|
|
# ---------- Core work ----------
def execute_report(**context):
    """
    For each parameter combination:
      - create workflow key
      - call TMSDBT.py retrieve to land CSV
      - if first time, create Oracle table from generated DDL
      - process file, finalize workflow Y/N

    Parameters
    ----------
    **context
        Airflow task context; only ``context["dag_run"].run_id`` is read.

    Raises
    ------
    RuntimeError
        If the connector subprocess fails or returns no status.
    Any exception from the DB/file helpers is logged, the workflow is
    finalized with 'N', and the exception is re-raised to fail the task.
    """
    logger = logging.getLogger("airflow.task")
    logger.setLevel(logging.DEBUG)

    run_id = context["dag_run"].run_id
    # Hidden params override visible ones on key collision (dict merge order).
    all_params = {**params_visible, **params_hidden}

    # 1) Compute combinations
    combos = _enum_param_combinations(all_params)

    # 2) Evaluate select(...) etc and finalize
    evaluated = []
    for combo in combos or [[]]:
        # first pass: special evaluations
        pair_list = []
        for k, v in combo:
            pair_list.append((k, _eval_param(v)))
        # second pass: replacements + pruning
        evaluated.append(_finalize_param_list(pair_list))

    # if no combos at all, ensure we run once
    if not evaluated:
        evaluated = [[]]

    # Timing + workflow: one timestamp shared by all files in this run.
    ts = "{:%Y%m%d_%H%M%S}".format(datetime.now(timezone("Europe/Berlin")))

    # NOTE: idx is currently unused in the loop body.
    for idx, param_list in enumerate(evaluated, start=1):
        # Each combination gets its own workflow record and output file.
        wf_key = mr.init_workflow(DATABASE_NAME, WF_NAME, run_id)
        file_name = f"{WF_NAME}.{wf_key}.{ts}.csv"

        try:
            # Build connector command safely (no shell quoting games)
            cmd = [
                sys.executable,  # 'python'
                os.path.join(gConnDir, "TMSDBT.py"),
                "retrieve",
                "--name", WF_NAME,
                "--url", tms_url,
                "-U", tms_user,
                "--password", tms_pwd,
                "--layoutfile", gDataDir + DAG_NAME + ".fkr",
                "-f", "scsv",
                "--dataset", str(wf_key),
                "-d", dest + file_name,
            ]

            # Map params to -p or -c switches
            for k, v in param_list:
                sval = "" if v is None else str(v).rstrip()
                m = re.match(r"column\((.*)\)$", k)
                if m:
                    cmd.extend(["-c", f'{m.group(1)}={sval}'])
                else:
                    cmd.extend(["-p", f"{k}={sval}"])
                # Record every parameter against the workflow.
                # NOTE(review): source indentation was ambiguous here -- the
                # factory may have recorded only non-column params (inside the
                # else branch); confirm against the factory-generated DAG.
                mr.set_workflow_property(wf_key, DATABASE_NAME, k, sval)

            logger.debug("Running connector: %s", json.dumps(cmd))
            res = subprocess.run(cmd, capture_output=True, check=False)
            logger.debug("stdout: %s", res.stdout.decode(errors="ignore"))
            logger.debug("stderr: %s", res.stderr.decode(errors="ignore"))

            # Connector exit-code protocol: 0 = data landed, 1 = no data
            # (not an error), anything else = failure.
            if res.returncode is None:
                raise RuntimeError("Connector returned no status")
            if res.returncode == 1:
                logger.info("No data returned for wf_key=%s (continuing)", wf_key)
                mr.finalise_workflow(wf_key, "Y")
                continue
            if res.returncode != 0:
                raise RuntimeError(f"Connector failed (rc={res.returncode})")

            # Data landed -> ensure source config exists, bootstrap table if needed
            # NOTE(review): ODS_TABLE is a module constant, not user input, so
            # the f-string interpolation here is not injectable in practice;
            # still, a parameterized query would be preferable.
            cfg = mf.execute_query(
                "select * from CT_MRDS.A_SOURCE_FILE_CONFIG "
                f"where a_source_key = 'TMS' and table_id = '{ODS_TABLE}'"
            )

            if not cfg:
                # First run for this table: generate DDL file via the connector.
                ddl_cmd = [
                    sys.executable,
                    os.path.join(gConnDir, "TMSDBT.py"),
                    "create-oracle-table",
                    "--name", WF_NAME,
                    "--url", tms_url,
                    "-U", tms_user,
                    "--password", tms_pwd,
                    "--layoutfile", gDataDir + DAG_NAME + ".fkr",
                    "-d", gTableDir + WF_NAME + ".sql",
                ]
                # Same -p/-c mapping as the retrieve command above.
                for k, v in param_list:
                    sval = "" if v is None else str(v).rstrip()
                    m = re.match(r"column\((.*)\)$", k)
                    if m:
                        ddl_cmd.extend(["-c", f'{m.group(1)}={sval}'])
                    else:
                        ddl_cmd.extend(["-p", f"{k}={sval}"])

                logger.debug("Generating DDL: %s", json.dumps(ddl_cmd))
                # check=True: a DDL-generation failure raises CalledProcessError
                # and is handled by the except block below.
                ddl_res = subprocess.run(ddl_cmd, capture_output=True, check=True)
                logger.debug("DDL stdout: %s", ddl_res.stdout.decode(errors="ignore"))
                logger.debug("DDL stderr: %s", ddl_res.stderr.decode(errors="ignore"))

                # Execute DDL and create external table
                sql = Path(gTableDir + WF_NAME + ".sql").read_text()
                mf.execute(sql)
                mf.add_column_date_format(
                    f"CT_ET_TEMPLATES.{ODS_TABLE}", "DEFAULT", "DD/MM/YYYY HH24:MI:SS"
                )
                mf.create_external_table(ODS_TABLE, f"CT_ET_TEMPLATES.{ODS_TABLE}", data_prefix)
                # Register the source-file config so subsequent runs skip this
                # bootstrap branch.
                mf.add_source_file_config(
                    "TMS",
                    "INPUT",
                    DAG_NAME,
                    DAG_NAME,
                    r".*\.csv",
                    ODS_TABLE,
                    f"CT_ET_TEMPLATES.{ODS_TABLE}",
                )

            # Process landed file (register, move, etc. as per your mf impl)
            mf.process_source_file(prefix, file_name)
            mr.finalise_workflow(wf_key, "Y")

        except BaseException as ex:
            # rich error logging, then mark workflow failed and re-raise
            # (BaseException also catches KeyboardInterrupt/SystemExit so the
            # workflow record is still finalized with 'N' on interruption).
            ex_type, ex_value, ex_tb = sys.exc_info()
            tb = traceback.extract_tb(ex_tb)
            stack = [
                f"File: {t[0]}, Line: {t[1]}, Func: {t[2]}, Code: {t[3]}"
                for t in tb
            ]
            # NOTE(review): these use the root logger, not the task logger
            # configured above -- presumably unintentional; confirm.
            logging.error("Exception type: %s", ex_type.__name__)
            logging.error("Exception message: %s", ex_value)
            logging.error("Stack trace: %s", stack)
            mr.finalise_workflow(wf_key, "N")
            raise
|
|
|
# ---------- DAG definition ----------
with DAG(
    dag_id=DAG_NAME,
    description=DAG_NAME,
    default_args=default_args,
    start_date=datetime(2025, 1, 1),
    schedule_interval=None,  # never scheduled; trigger manually
    catchup=False,  # no backfill for ad-hoc loads
    params=params_visible,  # hidden params are merged back in inside the task
    tags=[DAG_NAME],
) as dag:
    # Single task: retrieve the TMS report and load it into ODS.
    retrieve_report = PythonOperator(
        task_id="retrieve_report",
        python_callable=execute_report,
        execution_timeout=timedelta(minutes=30),
    )