# dags/dev_replicator_scheduler_rar.py
# (182 lines, 5.7 KiB, Python)
|
|
from __future__ import annotations
|
|
|
|
import sys
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
|
|
from airflow import DAG
|
|
from airflow.decorators import task
|
|
from airflow.operators.python import BranchPythonOperator
|
|
from airflow.operators.empty import EmptyOperator
|
|
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
|
|
from datetime import datetime, timedelta
|
|
|
|
# Extend the import path BEFORE importing project helpers. The original
# imported `mrds.utils` first and appended these directories afterwards, so the
# appended paths could not help resolve that import.
# NOTE(review): presumably the `mrds` package lives under one of these
# directories -- confirm against the deployment layout.
sys.path.append('/opt/airflow/python/connectors/devo')
sys.path.append('/opt/airflow/python/mrds_common')

from mrds.utils import oraconn  # noqa: E402 -- Oracle connection helper; must follow the sys.path setup
|
|
|
|
# dag_id under which this scheduler DAG is registered.
DAG_NAME = "devo_replicator_scheduler_rar"
# dag_id of the DAG started by the TriggerDagRunOperator in this file.
# NOTE(review): the DAG description and the trigger task_id both mention
# "devo_replicator_trigger_rar" -- confirm that this ID (without "_rar") is
# the intended target.
TARGET_DAG_ID = "devo_replicator_trigger"
# Connection name passed to oraconn.connect().
ORACLE_CONN_NAME = "MRDS_LOADER"
|
|
|
|
PRECONDITION_SQL = """
|
|
WITH LAST_UPDATE_ORACLE AS (
|
|
SELECT MAX(process_end) AS process_end
|
|
FROM CT_RAR.A_RAR_FOR_DISC_MONITORING
|
|
WHERE UPPER(owner || '.' || target_table_name) = UPPER(:table_name)
|
|
AND process_end IS NOT NULL
|
|
AND process_successful = 'Y'
|
|
),
|
|
LAST_UPDATE_DEVO AS (
|
|
SELECT CASE
|
|
WHEN last_status = 'FINISHED' THEN last_end_time
|
|
ELSE TO_DATE('01-JAN-1999', 'DD-MON-YYYY')
|
|
END AS process_end
|
|
FROM CT_MRDS.a_devo_replica_mgmt_rar
|
|
WHERE owner || '.' || table_name = :table_name
|
|
)
|
|
SELECT CASE
|
|
WHEN (SELECT process_end FROM LAST_UPDATE_ORACLE) >
|
|
(SELECT process_end FROM LAST_UPDATE_DEVO)
|
|
THEN 'Y' ELSE 'N'
|
|
END AS trigger_devo_replicator
|
|
FROM dual
|
|
"""
|
|
|
|
def _get_conn():
    """Open a connection via the ``oraconn`` helper for ``ORACLE_CONN_NAME``.

    The caller owns the returned connection and is responsible for closing it.
    """
    connection = oraconn.connect(ORACLE_CONN_NAME)
    return connection
|
|
|
|
def get_devo_replica_table_options() -> list[str]:
    """Return every ``OWNER.TABLE_NAME`` listed in the DEVO replica mgmt table.

    Reads ``CT_MRDS.a_devo_replica_mgmt_rar`` ordered by owner and table name.

    Returns:
        The list of ``"OWNER.TABLE_NAME"`` strings, or an empty list when the
        table has no rows or when any error occurs (the error is logged, not
        raised).
    """
    conn = None
    cur = None
    try:
        conn = _get_conn()
        cur = conn.cursor()
        cur.execute("""
            SELECT OWNER || '.' || TABLE_NAME
            FROM CT_MRDS.a_devo_replica_mgmt_rar
            ORDER BY OWNER, TABLE_NAME
        """)
        # `or []` keeps the original tolerance for a driver returning None.
        tables = [row[0] for row in (cur.fetchall() or [])]
        logging.info("Fetched %d table(s) from replica mgmt.", len(tables))
        return tables
    except Exception:
        # Best-effort by design: callers treat "no tables" as "nothing to do".
        logging.exception("Error getting DEVO replica table options")
        return []
    finally:
        # Close cursor first, then connection. Each close is guarded so that a
        # failure during cleanup is logged instead of replacing the value
        # being returned from the try/except above.
        for label, resource in (("cursor", cur), ("connection", conn)):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception:
                logging.warning("Failed to close Oracle %s", label, exc_info=True)
|
|
|
|
def check_table_precondition(table_full_name: str) -> dict:
    """Evaluate the replication precondition for one table.

    Runs ``PRECONDITION_SQL`` with ``table_full_name`` bound to
    ``:table_name`` and reads the single flag column of the first row.

    Args:
        table_full_name: Table identifier in ``OWNER.TABLE`` form.

    Returns:
        ``{"table": <OWNER.TABLE>, "trigger": "Y"|"N"}``. A missing row, a
        NULL/empty flag, or any error yields ``"N"`` (fail closed); errors are
        logged, not raised.
    """
    conn = None
    cur = None
    try:
        conn = _get_conn()
        cur = conn.cursor()
        cur.execute(PRECONDITION_SQL, {"table_name": table_full_name})
        row = cur.fetchone()
        # No row, or a NULL/empty flag, both count as "do not trigger".
        status = (row[0] if row else 'N') or 'N'
        logging.info("Precondition for %s: %s", table_full_name, status)
        return {"table": table_full_name, "trigger": status}
    except Exception:
        logging.exception("Error checking precondition for %s", table_full_name)
        return {"table": table_full_name, "trigger": "N"}  # fail closed
    finally:
        # Close cursor first, then connection. Each close is guarded so that a
        # cleanup failure is logged instead of replacing the result being
        # returned, which would break the fail-closed contract.
        for label, resource in (("cursor", cur), ("connection", conn)):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception:
                logging.warning("Failed to close Oracle %s", label, exc_info=True)
|
|
|
|
# Defaults applied to every task of the DAG defined in this file.
default_args = {
    "owner": "devo",
    "depends_on_past": False,
    # No e-mail notifications on failure or retry.
    "email_on_failure": False,
    "email_on_retry": False,
    # A single retry, one minute after the failed attempt.
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
}
|
|
|
|
# Scheduler DAG: enumerate the replica tables, check each table's precondition
# in a dynamically mapped task, and trigger TARGET_DAG_ID once when at least
# one table reports 'Y'.
with DAG(
    dag_id=DAG_NAME,
    description="Checks DEVO replica preconditions; triggers devo_replicator_trigger_rar once if any table is stale",
    default_args=default_args,
    # Static start_date: a moving `datetime.now() - ...` value is re-evaluated
    # on every DAG parse, which the Airflow docs advise against. With
    # catchup=False no historical runs are back-filled from this date.
    start_date=datetime(2024, 1, 1),
    # Exactly one scheduling argument. The original passed both
    # `schedule=None` and the deprecated `schedule_interval='*/10 * * * *'`;
    # the two conflict and, depending on the Airflow version, can leave the
    # DAG unscheduled or make it fail at parse time.
    schedule="*/10 * * * *",  # every 10 minutes
    catchup=False,
    max_active_runs=1,
    tags=["DevoScheduler", "DevoReplicatorTrigger"],
) as dag:

    @task
    def fetch_tables() -> list[str]:
        """Return the OWNER.TABLE names to check (may be empty)."""
        tables = get_devo_replica_table_options()
        if not tables:
            logging.warning("No tables returned from enumeration.")
        return tables

    @task
    def check_one(table_name: str) -> dict:
        """Check one table; returns {"table": ..., "trigger": "Y"|"N"}."""
        return check_table_precondition(table_name)

    @task
    def summarize(results: list[dict]) -> dict:
        """Collapse the per-table results into a single decision payload.

        Returns {"any_true": bool, "y_tables": [tables flagged 'Y']}.
        """
        y_tables = [r["table"] for r in results if r and r.get("trigger") == "Y"]
        n_tables = [r["table"] for r in results if r and r.get("trigger") == "N"]

        logging.info("Precondition summary -> Y: %d, N: %d", len(y_tables), len(n_tables))
        if y_tables:
            logging.info("Tables needing replication: %s", ", ".join(y_tables))
        else:
            logging.info("No tables are updated/stale; nothing to trigger.")

        return {"any_true": bool(y_tables), "y_tables": y_tables}

    def decide_branch(summary: dict) -> str:
        """Return the EXACT downstream task_id to follow."""
        return "prepare_trigger_conf" if summary.get("any_true") else "no_updates"

    @task
    def prepare_trigger_conf(summary: dict) -> dict:
        """Single conf payload for the downstream DAG."""
        return {"tables_to_replicate": summary.get("y_tables", [])}

    no_updates = EmptyOperator(task_id="no_updates")

    # Graph
    tables = fetch_tables()
    results = check_one.expand(table_name=tables)  # dynamic mapping across tables
    summary = summarize(results)

    branch = BranchPythonOperator(
        task_id="branch_on_any",
        python_callable=decide_branch,
        op_args=[summary],  # XComArg from summarize
    )

    conf_payload = prepare_trigger_conf(summary)

    trigger_devo = TriggerDagRunOperator(
        task_id="trigger_devo_replicator_rar",
        trigger_dag_id=TARGET_DAG_ID,
        wait_for_completion=True,
        reset_dag_run=True,
        conf=conf_payload,
    )

    # Wire branching -- only ONE instance of prepare_trigger_conf is referenced
    summary >> branch
    branch >> no_updates
    branch >> conf_payload >> trigger_devo
|