158 lines
5.6 KiB
Python
158 lines
5.6 KiB
Python
import os
|
||
import sys
|
||
import logging
|
||
|
||
from airflow.decorators import dag
|
||
from airflow.operators.python import PythonOperator
|
||
from airflow.utils.dates import days_ago
|
||
from airflow.utils.trigger_rule import TriggerRule
|
||
from airflow import DAG
|
||
from airflow.decorators import task
|
||
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
|
||
from datetime import datetime, timedelta
|
||
from airflow.operators.python import BranchPythonOperator
|
||
from airflow.operators.empty import EmptyOperator
|
||
|
||
# Project-local package roots. They must be on sys.path *before* the
# `mrds` import below, otherwise they cannot help resolve it.
for _extra_path in ('/opt/airflow/python/connectors/devo', '/opt/airflow/python/mrds_common'):
    # Avoid piling up duplicate entries if this module is executed more than
    # once in the same interpreter.
    if _extra_path not in sys.path:
        sys.path.append(_extra_path)

from mrds.utils import oraconn  # noqa: E402  (requires the sys.path entries above)

# Id of this scheduler DAG.
# NOTE(review): starts with "dev_" while the target starts with "devo_";
# renaming it would change the DAG id (and its run history) — confirm intent.
DAG_NAME = "dev_replicator_scheduler_rar"

# DAG triggered once per table whose precondition evaluates to 'Y'.
TARGET_DAG_ID = "devo_replicator_trigger_rar"
|
||
|
||
def get_devo_replica_table_options():
    """Return every table registered for RAR Devo replication.

    Queries ``CT_MRDS.a_devo_replica_mgmt_rar`` through the ``MRDS_LOADER``
    Oracle connection and returns the tables as ``"OWNER.TABLE_NAME"``
    strings, ordered by owner and then table name.

    Returns:
        list[str]: Fully-qualified table names. If anything fails, the error
        is logged with its traceback and an empty list is returned. The DAG
        then evaluates no tables and takes its "nothing to do" branch
        instead of failing the task.
    """
    oracle_conn = None
    try:
        oracle_conn = oraconn.connect('MRDS_LOADER')
        cursor = oracle_conn.cursor()
        cursor.execute(
            "SELECT OWNER || '.' || TABLE_NAME FROM CT_MRDS.a_devo_replica_mgmt_rar ORDER BY OWNER, TABLE_NAME"
        )
        options = [row[0] for row in cursor.fetchall()]
        cursor.close()
        return options
    except Exception as e:
        # logging.exception keeps the traceback; the message names the table
        # actually being read (it previously said "MOPDB").
        logging.exception("Error getting DEVO replica table options: %s", e)
        return []
    finally:
        # Always release the connection, on both the success and error paths.
        if oracle_conn:
            oracle_conn.close()
|
||
|
||
def check_table_precondition(table_full_name):
    """Decide whether the Devo replica of one table needs refreshing.

    Compares the latest successful Oracle load of the table (from
    ``CT_RAR.A_RAR_FOR_DISC_MONITORING``) with the last Devo replication
    recorded in ``CT_MRDS.a_devo_replica_mgmt_rar``. A replication whose
    ``last_status`` is not ``'FINISHED'`` counts as having ended on
    1999-01-01, so any successful Oracle load makes the table eligible again.

    Args:
        table_full_name: Table name formatted as ``"OWNER.TABLE_NAME"``.

    Returns:
        dict: ``{"table": table_full_name, "trigger": status}``. ``status`` is
        one of:

        - ``'Y'`` when the Oracle load is newer than the Devo replica.
        - ``'N'`` otherwise, including when either timestamp is missing,
          because a comparison with NULL is not true in the CASE.
        - ``'ERROR'`` if the check raised; the error is logged, not propagated.
    """
    oracle_conn = None
    try:
        oracle_conn = oraconn.connect('MRDS_LOADER')
        cursor = oracle_conn.cursor()
        # NOTE(review): the Oracle-side lookup compares names case-insensitively,
        # the Devo-side lookup compares them exactly. Names produced by
        # get_devo_replica_table_options come from the Devo table, so they match.
        sql = """
            WITH LAST_UPDATE_ORACLE AS (
                SELECT max(process_end) as process_end
                FROM CT_RAR.A_RAR_FOR_DISC_MONITORING
                WHERE upper(owner||'.'||TARGET_TABLE_NAME) = upper(:table_name)
                AND PROCESS_END is not null AND PROCESS_SUCCESSFUL='Y'
            ),
            LAST_UPDATE_DEVO AS (
                SELECT CASE WHEN last_status = 'FINISHED' THEN LAST_END_TIME ELSE TO_DATE('01-JAN-1999', 'DD-MON-YYYY') END as process_end
                FROM CT_MRDS.a_devo_replica_mgmt_rar
                WHERE OWNER || '.' || TABLE_NAME = :table_name
            )
            SELECT CASE WHEN (SELECT process_end FROM LAST_UPDATE_ORACLE) > (SELECT process_end FROM LAST_UPDATE_DEVO)
            THEN 'Y' ELSE 'N' END AS TRIGGER_DEVO_REPLICATOR FROM dual
        """
        cursor.execute(sql, table_name=table_full_name)
        result = cursor.fetchone()
        status = result[0] if result else 'N'
        logging.info(f"Precondition for {table_full_name}: {status}")
        cursor.close()
        return {"table": table_full_name, "trigger": status}
    except Exception as e:
        # Best effort: check_all_tables loops over every table, so one failing
        # table must not abort the rest. Keep the traceback for diagnosis.
        logging.exception("Error checking precondition for %s: %s", table_full_name, e)
        return {"table": table_full_name, "trigger": 'ERROR'}
    finally:
        if oracle_conn:
            oracle_conn.close()
|
||
|
||
def get_tables_to_trigger(precondition_results):
    """Build the ``conf`` payloads for the tables that must be replicated.

    Args:
        precondition_results: Iterable of ``{"table": ..., "trigger": ...}``
            dicts, as produced by ``check_table_precondition``.

    Returns:
        list[dict]: One ``{"owner_table": <table>}`` dict per entry whose
        ``trigger`` is exactly ``"Y"``, in input order.
    """
    triggered_tables = []
    for result in precondition_results:
        if result["trigger"] == "Y":
            triggered_tables.append(result["table"])

    logging.info(f"Tables meeting precondition: {triggered_tables}")

    return [dict(owner_table=name) for name in triggered_tables]
|
||
|
||
def branch_on_tables(ti):
    """Pick the downstream task id for the ``branch_trigger_check`` branch.

    Args:
        ti: The running task instance. Used only to pull the XCom returned by
            the ``check_all_tables`` task: a list of
            ``{"table": ..., "trigger": ...}`` dicts.

    Returns:
        str: ``"trigger_devo_replicators"`` when at least one table has
        ``trigger == "Y"``, otherwise ``"no_table_updated"``.
    """
    # xcom_pull may return None when nothing was pushed; treat that as
    # "no tables" instead of crashing on iteration.
    precondition_results = ti.xcom_pull(task_ids='check_all_tables') or []

    # any() stops at the first qualifying table; no list needs to be built.
    if any(r["trigger"] == "Y" for r in precondition_results):
        return "trigger_devo_replicators"
    return "no_table_updated"
|
||
|
||
# Task-level defaults applied to every task of the DAG defined below.
# NOTE(review): days_ago() is deprecated in newer Airflow releases, which
# recommend a static start_date instead — confirm before changing.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=days_ago(1),
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=2),
)
|
||
|
||
with DAG(
    dag_id=DAG_NAME,
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
    tags=['DevoScheduler', 'DevoReplicatorTrigger'],
) as dag:

    @task()
    def fetch_tables():
        """Return the "OWNER.TABLE_NAME" list of tables to evaluate."""
        return get_devo_replica_table_options()

    @task()
    def check_all_tables(table_list):
        """Evaluate the replication precondition per table and log a summary."""
        results = [check_table_precondition(tbl) for tbl in table_list]
        count_y = sum(1 for r in results if r["trigger"] == "Y")
        count_n = sum(1 for r in results if r["trigger"] == "N")
        count_error = sum(1 for r in results if r["trigger"] == "ERROR")
        logging.info(f"Precondition results: {results}")
        logging.info(f"Tables with trigger = 'Y': {count_y}")
        logging.info(f"Tables with trigger = 'N': {count_n}")
        if count_error:
            # check_table_precondition swallows per-table errors; make them
            # visible in the summary instead of silently skipping those tables.
            logging.warning(f"Tables whose precondition check failed (trigger = 'ERROR'): {count_error}")
        return results

    @task()
    def output_tables_to_trigger(precondition_results):
        """Return one TriggerDagRunOperator ``conf`` dict per table flagged 'Y'."""
        return get_tables_to_trigger(precondition_results)

    # The task context is passed automatically in Airflow 2, so the former
    # provide_context=True argument was only a deprecated no-op.
    branch_task = BranchPythonOperator(
        task_id="branch_trigger_check",
        python_callable=branch_on_tables,
    )

    no_table_updated = EmptyOperator(task_id="no_table_updated")

    tables = fetch_tables()
    precondition_results = check_all_tables(tables)
    tables_to_trigger = output_tables_to_trigger(precondition_results)

    # One mapped trigger per qualifying table, each with its own conf.
    # execution_date is deliberately left unset. A fixed "{{ ds }}" gave every
    # mapped instance the same logical date, and therefore the same run_id,
    # in the target DAG. All but the first trigger then failed with
    # DagRunAlreadyExists. Left unset, each trigger uses the current time.
    trigger_dag = TriggerDagRunOperator.partial(
        task_id="trigger_devo_replicators",
        trigger_dag_id=TARGET_DAG_ID,
    ).expand(conf=tables_to_trigger)

    # Dependencies for branching
    tables >> precondition_results >> tables_to_trigger >> branch_task
    branch_task >> [trigger_dag, no_table_updated]
|
||
|
||
|
||
|
||
|
||
"""
|
||
1. fetch_tables reads the list of tables (as OWNER.TABLE_NAME) from CT_MRDS.a_devo_replica_mgmt_rar.
|
||
2. check_all_tables checks each table’s trigger status and logs counts.
|
||
3. output_tables_to_trigger prepares the mapped parameter list for triggering downstream DAGs.
|
||
4. branch_on_tables decides which path to take:
   - "trigger_devo_replicators" if at least one table has trigger = 'Y';
   - "no_table_updated" otherwise.
|
||
5. BranchPythonOperator implements the conditional branching.
|
||
6. TriggerDagRunOperator dynamically triggers a run of devo_replicator_trigger_rar per qualifying table.
|
||
7. EmptyOperator represents the "no tables to trigger" branch.
|
||
|
||
""" |