# dags/m_ODS_LM_BALANCESHEET.py # Idempotent, per-object mtime tracking import sys import os import json import logging from pathlib import Path from datetime import timedelta, datetime, timezone from email.utils import parsedate_to_datetime from airflow import DAG from airflow.models import Variable from airflow.decorators import task as af_task from airflow.operators.python import PythonOperator from airflow.utils.dates import days_ago from airflow.utils.trigger_rule import TriggerRule from airflow.operators.trigger_dagrun import TriggerDagRunOperator from airflow.operators.empty import EmptyOperator try: from airflow.exceptions import AirflowFailException, AirflowSkipException except Exception: from airflow.exceptions import AirflowException as AirflowFailException from airflow.exceptions import AirflowSkipException # Import libs sys.path.append('/opt/airflow/python/mrds_common') sys.path.append('/opt/airflow/src/airflow/dags/ods/exdi') from mrds.utils.manage_runs import init_workflow as mrds_init_workflow, finalise_workflow as mrds_finalise_workflow from mrds.core import main as mrds_main dag_id = Path(__file__).stem default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': days_ago(1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } WORKFLOW_CONFIG = { "database_name": "ODS", "workflow_name": dag_id, } # OCI settings OCI_NAMESPACE = os.getenv("BUCKET_NAMESPACE") OCI_BUCKET = os.getenv("INBOX_BUCKET") # Config YAML (single config for all files) CONFIG_YAML = os.getenv( "EXDI_SINGLE_CONFIG_YAML", "/opt/airflow/src/airflow/dags/ods/lm/balancesheet/config/m_ODS_LM_BALANCESHEET_PARSE.yaml", ) logging.info("Using EXDI_SINGLE_CONFIG_YAML=%s", CONFIG_YAML) # Idempotency controls REPROCESS = (os.getenv("EXDI_REPROCESS", "false").lower() in ("1", "true", "yes")) LAST_TS_VAR = f"{dag_id}__last_seen_ts" # legacy watermark (kept for observability) PROCESSED_SET_VAR = f"{dag_id}__processed_objects" 
# legacy: list of keys (back-compat only)
PROCESSED_TS_VAR = f"{dag_id}__processed_objects_ts"  # NEW: map key -> last processed mtime (epoch float)


# Helpers
def _oci_client():
    """
    Create an OCI Object Storage client.
    Order: Resource Principals -> Instance Principals.

    Raises:
        RuntimeError: if neither signer can be created.
    """
    import oci
    region = os.getenv("OCI_REGION") or os.getenv("OCI_RESOURCE_PRINCIPAL_REGION") or "eu-frankfurt-1"
    # RP (preferred inside OKE / functions)
    try:
        rp_signer = oci.auth.signers.get_resource_principals_signer()
        cfg = {"region": region} if region else {}
        logging.info("Using OCI Resource Principals signer (region=%s).", cfg.get("region"))
        return oci.object_storage.ObjectStorageClient(cfg, signer=rp_signer)
    except Exception as e:
        logging.info("RP not available: %s", e)
    # IP (compute-instance fallback)
    try:
        ip_signer = oci.auth.signers.InstancePrincipalsSecurityTokenSigner()
        cfg = {"region": region} if region else {}
        logging.info("Using OCI Instance Principals signer (region=%s).", cfg.get("region"))
        return oci.object_storage.ObjectStorageClient(cfg, signer=ip_signer)
    except Exception as e:
        logging.info("IP not available: %s", e)
    logging.error("Neither Resource Principals nor Instance Principals authentication found.")
    raise RuntimeError("Failed to create OCI client")


def _load_yaml(cfg_path: str) -> dict:
    """Load a YAML file and return its content as a dict ({} if empty)."""
    import yaml
    p = Path(cfg_path)
    if not p.exists():
        raise FileNotFoundError(f"Config YAML not found: {cfg_path}")
    return yaml.safe_load(p.read_text()) or {}


# Build config-derived constants directly from YAML.
# OBJECT_PREFIX is resolved at import time; a broken/missing YAML leaves it
# None and the poll task fails fast with a clear message instead of the DAG
# failing to import.
try:
    CONFIG_DATA = _load_yaml(CONFIG_YAML)
    OBJECT_PREFIX = CONFIG_DATA.get("inbox_prefix")
    if not (isinstance(OBJECT_PREFIX, str) and OBJECT_PREFIX.strip()):
        raise AirflowFailException("YAML must define 'inbox_prefix' for OBJECT_PREFIX.")
    OBJECT_PREFIX = OBJECT_PREFIX.strip()
    logging.info("YAML inbox_prefix -> OBJECT_PREFIX: %s", OBJECT_PREFIX)
except Exception as e:
    logging.error("Failed to resolve OBJECT_PREFIX from YAML %s: %s", CONFIG_YAML, e)
    OBJECT_PREFIX = None


# New idempotency map (key -> last_processed_ts)
def _load_processed_map() -> dict[str, float]:
    """
    Returns {object_key: last_processed_ts}.
    Back-compat: if old set variable exists (list), treat those keys as ts=0.
    """
    try:
        raw = Variable.get(PROCESSED_TS_VAR, default_var="{}")
        m = json.loads(raw) or {}
        if isinstance(m, dict):
            return {k: float(v) for k, v in m.items()}
    except Exception:
        pass
    # Back-compat: migrate old set/list
    try:
        old = json.loads(Variable.get(PROCESSED_SET_VAR, default_var="[]"))
        if isinstance(old, list):
            return {k: 0.0 for k in old}
    except Exception:
        pass
    return {}


def _save_processed_map(m: dict[str, float]) -> None:
    """Persist the processed map as JSON in an Airflow Variable."""
    Variable.set(PROCESSED_TS_VAR, json.dumps(m))


def _mark_processed_ts(objs: list[tuple[str, float]]):
    """
    Update processed map with list of (object_key, mtime).
    Only advances an entry — never moves a key's timestamp backwards.
    No-op when REPROCESS is set (so a forced re-run does not pollute state).
    """
    if REPROCESS or not objs:
        return
    m = _load_processed_map()
    for key, ts in objs:
        try:
            ts = float(ts)
        except Exception:
            continue
        prev = float(m.get(key, 0.0))
        if ts > prev:
            m[key] = ts
    _save_processed_map(m)
    logging.info("Processed map updated; size=%d", len(m))


# Object listing (per-key mtime)
def _list_new_xml_objects(prefix: str) -> list[dict]:
    """
    List .xml objects and decide inclusion per-object:
      include if REPROCESS or object_mtime > processed_map.get(object_key, 0.0)
    Returns: [{"name": "", "base": "", "mtime": }] sorted by mtime ascending.

    Raises:
        AirflowFailException: when bucket env vars are missing.
    """
    if not OCI_NAMESPACE or not OCI_BUCKET:
        raise AirflowFailException("BUCKET_NAMESPACE and INBOX_BUCKET must be set")
    client = _oci_client()
    processed_map = _load_processed_map()
    try:
        last_seen = float(Variable.get(LAST_TS_VAR, default_var="0"))
    except Exception:
        last_seen = 0.0
    logging.info("Watermark last_seen=%s; processed_map_count=%d; prefix=%s", last_seen, len(processed_map), prefix)
    # FIX: page through the listing. list_objects returns at most one page
    # (<=1000 keys) per call; without following next_start_with, objects
    # beyond the first page would never be processed.
    # fields="timeCreated" asks the service to populate time_created so the
    # per-object HEAD fallback below is rarely needed.
    all_objects = []
    next_start = None
    while True:
        resp = client.list_objects(
            OCI_NAMESPACE, OCI_BUCKET, prefix=prefix,
            start=next_start, fields="timeCreated",
        )
        all_objects.extend(resp.data.objects or [])
        next_start = getattr(resp.data, "next_start_with", None)
        if not next_start:
            break
    new_items: list[dict] = []
    newest_ts = last_seen
    for o in all_objects:
        name = (o.name or "").strip()
        base = name.rsplit("/", 1)[-1] if name else ""
        logging.info("Processing object: %s", base)
        # Skip folder markers / empty keys
        if not name or name.endswith('/') or not base:
            logging.debug("Skip: folder marker or empty key: %r", name)
            continue
        if not base.lower().endswith(".xml"):
            logging.debug("Skip: not .xml: %r", name)
            continue
        # Resolve mtime: prefer time_created from the listing; the float/1000
        # branch covers a millisecond-epoch value — TODO confirm whether the
        # SDK can actually return a bare number here.
        ts = None
        t = getattr(o, "time_created", None)
        if t:
            try:
                ts = t.timestamp() if hasattr(t, "timestamp") else float(t) / 1000.0
            except Exception:
                ts = None
        # Fallback 1: HEAD the object and parse the Last-Modified header.
        if ts is None:
            try:
                head = client.head_object(OCI_NAMESPACE, OCI_BUCKET, name)
                lm = head.headers.get("last-modified") or head.headers.get("Last-Modified")
                if lm:
                    dt = parsedate_to_datetime(lm)
                    if dt.tzinfo is None:
                        dt = dt.replace(tzinfo=timezone.utc)
                    ts = dt.timestamp()
                    logging.debug("Resolved ts via HEAD Last-Modified for %s: %s", name, ts)
            except Exception as e:
                logging.warning("head_object failed for %s: %s", name, e)
        # Fallback 2: current time — guarantees the object is picked up once.
        if ts is None:
            ts = datetime.now(timezone.utc).timestamp()
            logging.warning("Object %s missing timestamp; falling back to now=%s", name, ts)
        last_proc_ts = float(processed_map.get(name, 0.0))
        include = REPROCESS or (ts > last_proc_ts)
        logging.info(
            "Decision for %s: obj_ts=%s, last_proc_ts=%s, REPROCESS=%s -> include=%s",
            name, ts, last_proc_ts, REPROCESS, include
        )
        if not include:
            continue
        item = {"name": name, "base": base, "mtime": ts}
        new_items.append(item)
        if ts > newest_ts:
            newest_ts = ts
    # Watermark advanced for visibility (optional; per-object map is the
    # authoritative idempotency state)
    if not REPROCESS and new_items and newest_ts > last_seen:
        Variable.set(LAST_TS_VAR, str(newest_ts))
        logging.info("Advanced watermark from %s to %s", last_seen, newest_ts)
    new_items.sort(key=lambda x: x["mtime"])  # ascending
    logging.info("Found %d candidate .xml object(s) under prefix %s", len(new_items), prefix)
    return new_items


# DAG
with DAG(
    dag_id=dag_id,
    default_args=default_args,
    description='EXDI workflow (polling): single YAML config for all XML files in OCI',
    schedule_interval=None,  # no schedule: runs are triggered externally/manually
    catchup=False,
    max_active_runs=1,
    render_template_as_native_obj=True,
    tags=["EXDI", "MRDS", "ODS", "OCI", "BALANCESHEET"],
) as dag:

    @af_task(task_id="poll_oci_for_xml")
    def poll_oci_for_xml():
        """
        Lists new .xml objects and prepares a workload list.
        Returns {"workload": [{"object": "", "base": "", "mtime": } ...]}
        """
        if not OBJECT_PREFIX:
            raise AirflowFailException("No OCI object prefix configured. Check YAML 'inbox_prefix'.")
        new_objs = _list_new_xml_objects(OBJECT_PREFIX)
        logging.info("New .xml objects found: %s", json.dumps(new_objs, indent=2))
        print("New .xml objects found:", json.dumps(new_objs, indent=2))
        # already contains base + mtime
        workload = [{"object": it["name"], "base": it["base"], "mtime": it["mtime"]} for it in new_objs]
        logging.info("Prepared workload items: %d", len(workload))
        print("Prepared workload:", json.dumps(workload, indent=2))
        return {"workload": workload}

    @af_task(task_id="init_workflow")
    def init_workflow(polled: dict):
        """Initialize workflow; start MRDS workflow; build per-file task configs."""
        database_name = WORKFLOW_CONFIG["database_name"]
        workflow_name = WORKFLOW_CONFIG["workflow_name"]
        env = os.getenv("MRDS_ENV", "dev")
        # Fail fast (with the complete list of missing vars) before touching MRDS.
        username = os.getenv("MRDS_LOADER_DB_USER")
        password = os.getenv("MRDS_LOADER_DB_PASS")
        tnsalias = os.getenv("MRDS_LOADER_DB_TNS")
        if not all([username, password, tnsalias]):
            missing = []
            if not username:
                missing.append("MRDS_LOADER_DB_USER")
            if not password:
                missing.append("MRDS_LOADER_DB_PASS")
            if not tnsalias:
                missing.append("MRDS_LOADER_DB_TNS")
            raise AirflowFailException(f"Missing required env vars: {', '.join(missing)}")
        workload = (polled or {}).get("workload") or []
        # Airflow context for run_id
        from airflow.operators.python import get_current_context
        ctx = get_current_context()
        run_id = str(ctx['ti'].run_id)
        a_workflow_history_key = mrds_init_workflow(database_name, workflow_name, run_id)
        workflow_context = {
            "run_id": run_id,
            "a_workflow_history_key": a_workflow_history_key
        }
        # Build TASK_CONFIGS dynamically: one per file, sequential numbering
        task_base_name = "m_ODS_LM_BALANCESHEET"
        task_configs = []
        for idx, w in enumerate(workload, start=1):
            task_configs.append({
                "task_name": f"{task_base_name}_{idx}",
                "source_filename": w["base"],  # pass basename to MRDS (adjust if you need full key)
                "config_file": CONFIG_YAML,
            })
        bundle = {
            "workflow_history_key": a_workflow_history_key,
            "workflow_context": workflow_context,
            "workload": workload,  # includes object + mtime
            "task_configs": task_configs,  # list-of-dicts for mapping
            "env": env,
        }
        logging.info("Init complete; workload=%d, tasks=%d", len(workload), len(task_configs))
        return bundle

    @af_task(task_id="get_task_configs")
    def get_task_configs(init_bundle: dict):
        """Extract the mapping input (one dict per file) from the init bundle."""
        return init_bundle["task_configs"]

    def run_mrds_task(task_name: str, source_filename: str, config_file: str, **context):
        """Run MRDS for a single file (sequential via mapped task with max_active_tis_per_dag=1)."""
        ti = context['ti']
        if not os.path.exists(config_file):
            raise FileNotFoundError(f"Config file not found: {config_file}")
        init_bundle = ti.xcom_pull(task_ids='init_workflow') or {}
        workflow_context = init_bundle.get('workflow_context')
        workload = init_bundle.get('workload') or []
        if not workflow_context:
            raise AirflowFailException("No workflow_context from init_workflow")
        # resolve full object key + mtime by matching base name from workload
        full_object_key, object_mtime = None, None
        for w in workload:
            if w.get('base') == source_filename:
                full_object_key = w.get('object')
                object_mtime = w.get('mtime')
                break
        # Print/log the file being processed
        logging.info("%s: picking file %s (object=%s, mtime=%s)", task_name, source_filename, full_object_key or source_filename, object_mtime)
        print(f"{task_name}: picking file {source_filename} (object={full_object_key or source_filename}, mtime={object_mtime})")
        try:
            # NOTE: if MRDS expects full URI, change 'source_filename' to 'full_object_key'
            mrds_main(
                workflow_context,
                source_filename,  # or full_object_key if required in your env
                config_file,
                generate_workflow_context=False
            )
        except Exception:
            logging.exception("%s: MRDS failed on %s", task_name, source_filename)
            raise
        # Mark processed with the mtime we saw during poll.
        # FIX: test mtime against None (a falsy-but-valid 0.0 mtime would
        # previously skip the bookkeeping and cause endless reprocessing).
        if full_object_key and object_mtime is not None:
            _mark_processed_ts([(full_object_key, object_mtime)])
        ti.xcom_push(key='task_status', value='SUCCESS')
        logging.info("%s: success", task_name)
        return "SUCCESS"

    def finalise_workflow_task(**context):
        """Finalize workflow across all per-file tasks (mapped)."""
        from airflow.utils.state import State
        ti = context['ti']
        dag_run = context['dag_run']
        init_bundle = ti.xcom_pull(task_ids='init_workflow') or {}
        a_workflow_history_key = init_bundle.get('workflow_history_key')
        if a_workflow_history_key is None:
            raise AirflowFailException("No workflow history key; cannot finalise workflow")
        mapped_task_id = "m_ODS_LM_BALANCESHEET"
        tis = [t for t in dag_run.get_task_instances() if t.task_id == mapped_task_id]
        # No mapped instances at all (empty workload) counts as success.
        if not tis:
            mrds_finalise_workflow(a_workflow_history_key, "Y")
            logging.info("Finalised workflow %s as SUCCESS (no files)", a_workflow_history_key)
            return
        any_failed = any(ti_i.state in {State.FAILED, State.UPSTREAM_FAILED} for ti_i in tis)
        if not any_failed:
            mrds_finalise_workflow(a_workflow_history_key, "Y")
            logging.info("Finalised workflow %s as SUCCESS", a_workflow_history_key)
            return
        # At least one mapped file failed: finalise as FAILED and surface the
        # failing map indexes in the task failure.
        failed_idxs = [getattr(ti_i, "map_index", None) for ti_i in tis if ti_i.state in {State.FAILED, State.UPSTREAM_FAILED}]
        mrds_finalise_workflow(a_workflow_history_key, "N")
        logging.error("Finalised workflow %s as FAILED (failed map indexes=%s)", a_workflow_history_key, failed_idxs)
        raise AirflowFailException(f"Workflow failed for mapped indexes: {failed_idxs}")

    def check_success_for_mopdb(**context):
        """Check if all processing tasks succeeded before triggering MOPDB.

        Raises AirflowSkipException (never a hard failure) whenever MOPDB
        should not be triggered, so the downstream trigger is skipped cleanly.
        """
        from airflow.utils.state import State
        try:
            ti = context['ti']
            dag_run = context['dag_run']
            has_failures = False
            failure_reasons = []
            # Check finalize_workflow task
            finalize_task = dag_run.get_task_instance('finalize_workflow')
            if finalize_task.state == State.FAILED:
                has_failures = True
                failure_reasons.append("finalize_workflow failed")
            # Check all mapped tasks (per-file processing)
            mapped_task_id = "m_ODS_LM_BALANCESHEET"
            mapped_tasks = [t for t in dag_run.get_task_instances() if t.task_id == mapped_task_id]
            for task_instance in mapped_tasks:
                if task_instance.state in {State.FAILED, State.UPSTREAM_FAILED}:
                    has_failures = True
                    map_idx = getattr(task_instance, 'map_index', 'unknown')
                    failure_reasons.append(f"Processing task failed at index {map_idx}")
            if has_failures:
                error_msg = f"Tasks failed - skipping MOPDB trigger: {', '.join(failure_reasons)}"
                logging.info(error_msg)
                raise AirflowSkipException(error_msg)
            # Check if all mapped tasks were skipped (no files to process)
            all_skipped = all(t.state == State.SKIPPED for t in mapped_tasks) if mapped_tasks else True
            if all_skipped or not mapped_tasks:
                error_msg = "All processing tasks were skipped (no files to process) - skipping MOPDB trigger"
                logging.info(error_msg)
                raise AirflowSkipException(error_msg)
            logging.info("All tasks completed successfully - proceeding to trigger MOPDB")
            return "SUCCESS"
        except AirflowSkipException:
            raise
        except Exception as e:
            # Any unexpected error (e.g. a missing task instance) degrades to
            # a skip rather than failing the run.
            logging.error(f"Error checking success for MOPDB: {e}", exc_info=True)
            raise AirflowSkipException(f"Error checking success - skipping MOPDB trigger: {e}")

    # Operators & Dependencies
    poll_task = poll_oci_for_xml()
    init_out = init_workflow(poll_task)
    task_cfgs = get_task_configs(init_out)

    # Mapped task: one instance per file; max_active_tis_per_dag=1 keeps the
    # files strictly sequential.
    @af_task(task_id="m_ODS_LM_BALANCESHEET", max_active_tis_per_dag=1)
    def mapped_run(task_name: str, source_filename: str, config_file: str, **context):
        return run_mrds_task(task_name=task_name, source_filename=source_filename, config_file=config_file, **context)

    per_file = mapped_run.expand_kwargs(task_cfgs)

    finalize_workflow = PythonOperator(
        task_id='finalize_workflow',
        python_callable=finalise_workflow_task,
        provide_context=True,
        trigger_rule=TriggerRule.ALL_DONE,
        retries=0,
    )

    check_mopdb = PythonOperator(
        task_id='check_success_for_mopdb',
        python_callable=check_success_for_mopdb,
        provide_context=True,
        trigger_rule=TriggerRule.ALL_DONE,
        retries=0,
    )

    trigger_mopdb = TriggerDagRunOperator(
        task_id="Trigger_w_MOPDB_LM_BALANCESHEET",
        trigger_dag_id="w_MOPDB_LM_BALANCESHEET",
        conf={
            "source_dag": dag_id,
            "upstream_run_id": "{{ run_id }}",
            "objects": "{{ (ti.xcom_pull(task_ids='poll_oci_for_xml')['workload'] | map(attribute='object') | list) if ti.xcom_pull(task_ids='poll_oci_for_xml') else [] }}",
            "workflow_history_key": "{{ (ti.xcom_pull(task_ids='init_workflow')['workflow_history_key']) if ti.xcom_pull(task_ids='init_workflow') else None }}"
        },
        wait_for_completion=False,  # CHANGED: Don't wait for completion
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,  # CHANGED: Only trigger if check succeeds
        retries=0,
    )

    all_good = EmptyOperator(
        task_id="All_went_well",
        trigger_rule=TriggerRule.ALL_DONE,  # CHANGED: Always run to mark end
    )

    # CHANGED: Chain with check task before trigger
    poll_task >> init_out >> task_cfgs >> per_file >> finalize_workflow >> check_mopdb >> trigger_mopdb >> all_good

logging.info(
    "EXDI DAG ready: inbox_prefix=%s; using per-object processed ts map %s.",
    OBJECT_PREFIX, PROCESSED_TS_VAR
)