257 lines
9.1 KiB
Python
257 lines
9.1 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
import sys
|
|
import logging
|
|
import time
|
|
from datetime import timedelta
|
|
|
|
from airflow import DAG
|
|
from airflow.utils.dates import days_ago
|
|
from airflow.operators.python import PythonOperator
|
|
from airflow.models import Param
|
|
|
|
sys.path.append('/opt/airflow/python/connectors/devo')
|
|
sys.path.append('/opt/airflow/python/mrds_common')
|
|
|
|
from mrds.utils import oraconn
|
|
|
|
# Get MOPDB table options for dropdown
|
|
def get_mopdb_table_options():
    """Return the selectable MOPDB tables as ``OWNER.TABLE_NAME`` strings.

    Queries ``CT_MRDS.a_devo_replica_mgmt_mopdb`` through the ``MRDS_LOADER``
    connection. The lookup is best-effort: any failure is logged together
    with its traceback and an empty list is returned instead of raising.

    Returns:
        list[str]: ``OWNER.TABLE_NAME`` values ordered by owner and table
        name, or ``[]`` when the lookup fails.
    """
    oracle_conn = None
    cursor = None
    try:
        oracle_conn = oraconn.connect('MRDS_LOADER')
        cursor = oracle_conn.cursor()
        cursor.execute("SELECT OWNER || '.' || TABLE_NAME FROM CT_MRDS.a_devo_replica_mgmt_mopdb ORDER BY OWNER, TABLE_NAME")
        return [row[0] for row in cursor.fetchall()]
    except Exception as exc:
        # Deliberately swallowed: callers get an empty option list on failure.
        logging.exception("Error getting MOPDB table options: %s", exc)
        return []
    finally:
        # Release the cursor on the failure path too; a failing cursor close
        # must neither hide the result nor prevent closing the connection.
        try:
            if cursor is not None:
                cursor.close()
        except Exception:
            logging.warning("Could not close cursor after reading MOPDB table options", exc_info=True)
        finally:
            if oracle_conn:
                oracle_conn.close()
|
|
|
|
# Task-level defaults handed to the DAG below via ``default_args``.
default_args = dict(
    owner='devo',
    depends_on_past=False,
    start_date=days_ago(1),
    # No e-mail notifications on failure or retry.
    email_on_failure=False,
    email_on_retry=False,
    # A failed task is retried once, one minute later.
    retries=1,
    retry_delay=timedelta(minutes=1),
)
|
|
|
|
with DAG(
|
|
dag_id='devo_replicator_trigger_mopdb',
|
|
default_args=default_args,
|
|
description='External trigger DAG for MOPDB tables',
|
|
schedule=None,
|
|
catchup=False,
|
|
tags=['DevoReplicator', 'DevoReplicatorTrigger'],
|
|
params={
|
|
"owner_table": Param(
|
|
default=None,
|
|
type="string",
|
|
description="Select table in format OWNER.TABLE_NAME",
|
|
enum=get_mopdb_table_options()
|
|
)
|
|
}
|
|
) as dag:
|
|
|
|
# Init
|
|
def init_step(**context):
    """Validate the trigger configuration and publish run parameters to XCom.

    Reads ``owner_table`` from ``dag_run.conf`` and the environment name from
    the ``MRDS_ENV`` environment variable, validates both, and pushes every
    run parameter as its own XCom key on the current task instance
    (``env``, ``store``, ``table_owner``, ``table_name``, ``owner_table``,
    ``threshold``).

    Args:
        **context: Airflow task context. ``context["ti"]`` is required;
            a missing ``dag_run`` (or an empty ``conf``) is treated as an
            empty configuration.

    Raises:
        KeyError: If the context has no ``"ti"`` entry.
        ValueError: If ``MRDS_ENV`` is unset or empty, if ``owner_table`` is
            missing, not a string, or not ``OWNER.TABLE_NAME`` with both
            parts non-blank, or if the lower-cased environment is not one of
            ``dev``, ``tst``, ``acc``, ``prd``.
    """
    dag_run = context.get("dag_run")
    ti = context["ti"]
    conf = (dag_run.conf or {}) if dag_run else {}

    env = os.getenv("MRDS_ENV")
    if not env:
        raise ValueError("MRDS_ENV environment variable is required")
    env = env.lower()

    store = "mopdb"
    owner_table = conf.get("owner_table")

    if not owner_table:
        raise ValueError("owner_table parameter is required")
    if not isinstance(owner_table, str) or '.' not in owner_table:
        raise ValueError("owner_table must be in format 'OWNER.TABLE_NAME'")

    # Split on the first dot only, so the table part may itself contain dots.
    table_owner, table_name = owner_table.split('.', 1)
    # Reject values such as ".", "OWNER." or ".TABLE": they contain a dot but
    # would otherwise publish an empty owner or table name to later tasks.
    if not table_owner.strip() or not table_name.strip():
        raise ValueError("owner_table must be in format 'OWNER.TABLE_NAME'")

    if env not in {"dev", "tst", "acc", "prd"}:
        raise ValueError(f"Unsupported env '{env}'. Expected 'dev', 'tst', 'acc' or 'prd'.")

    logging.info("=== init_step === env=%s store=%s owner_table=%s",
                 env, store, owner_table)

    xcom = {
        "env": env,
        "store": store,
        "table_owner": table_owner,
        "table_name": table_name,
        "owner_table": owner_table,
        # Limit that check_and_trigger compares the running-thread total against.
        "threshold": 30,
    }

    for k, v in xcom.items():
        ti.xcom_push(key=k, value=v)
|
|
|
|
# Task that runs init_step.
init = PythonOperator(task_id="init_step", python_callable=init_step)
|
|
|
|
# Get table list
|
|
def get_table_list(**context):
    """Build the list of ``(owner, table_name)`` pairs to process.

    When ``init_step`` published an ``owner_table`` value, the list holds
    just that table and no database connection is opened. Otherwise all rows
    of ``CT_MRDS.a_devo_replica_mgmt_mopdb`` are read through the
    ``MRDS_LOADER`` connection. The result is pushed to XCom under
    ``tables_to_process`` and also returned.

    Args:
        **context: Airflow task context; ``context["ti"]`` is required.

    Returns:
        list: ``(owner, table_name)`` pairs.

    Raises:
        Exception: Any error raised while building the list is logged with
            its traceback and re-raised unchanged.
    """
    ti = context["ti"]
    owner_table = ti.xcom_pull(task_ids='init_step', key='owner_table')

    oracle_conn = None
    cursor = None
    try:
        if owner_table:
            # Single-table path: nothing to look up, so skip the connection.
            table_owner, table_name = owner_table.split('.', 1)
            tables = [(table_owner, table_name)]
            logging.info("Processing specific table: %s", owner_table)
        else:
            oracle_conn = oraconn.connect('MRDS_LOADER')
            cursor = oracle_conn.cursor()
            cursor.execute("SELECT OWNER, TABLE_NAME FROM CT_MRDS.a_devo_replica_mgmt_mopdb ORDER BY OWNER, TABLE_NAME")
            tables = cursor.fetchall()
            logging.info("Found %d tables for MOPDB", len(tables))

        ti.xcom_push(key='tables_to_process', value=tables)
        return tables
    except Exception as exc:
        logging.exception("Error in get_table_list: %s", exc)
        raise
    finally:
        # Close the cursor first; the nested finally guarantees the
        # connection is closed even if closing the cursor fails.
        try:
            if cursor is not None:
                cursor.close()
        finally:
            if oracle_conn:
                oracle_conn.close()
|
|
|
|
# Task that runs get_table_list.
t1 = PythonOperator(task_id="get_table_list", python_callable=get_table_list)
|
|
|
|
# Check and trigger core DAG
|
|
def _fetch_running_counts(oracle_conn, sql_query, table_owner, table_name):
    """Run the load-check query once and return ``(total_running, table_running)``."""
    cursor = oracle_conn.cursor()
    try:
        # Owner and table name are passed as bind variables, never spliced
        # into the SQL text.
        # NOTE(review): assumes oraconn hands back a cx_Oracle / python-oracledb
        # style connection that accepts named binds as a dict — confirm.
        cursor.execute(sql_query, {"p_owner": table_owner, "p_table_name": table_name})
        result = cursor.fetchone()
    finally:
        cursor.close()
    return result[0] or 0, result[1] or 0


def check_and_trigger(**context):
    """Trigger ``devo_replicator_core`` for each listed table, throttled by load.

    For every ``(owner, table_name)`` pair published by ``get_table_list``:

    * query the summed ``MAX_THREADS`` of all ``RUNNING`` rows in the MOPDB
      and RAR management tables, and whether this table is already
      ``RUNNING`` in the management table of the current store;
    * while the total is above the threshold, sleep five minutes and query
      again (this function sets no upper bound on the waiting time);
    * skip the table if it is already running, otherwise trigger
      ``devo_replicator_core`` with ``store`` and ``owner_table`` in its conf.

    The number of triggered runs is pushed to XCom as ``triggered_count``.
    When there are no tables to process, ``0`` is pushed without opening a
    database connection.

    Args:
        **context: Airflow task context; ``context["ti"]`` is required.

    Raises:
        ValueError: If the ``store`` value from XCom is not a plain ASCII
            identifier (it becomes part of a table name in the SQL text).
        Exception: Any other error is logged with its traceback and
            re-raised unchanged.
    """
    ti = context["ti"]
    store = ti.xcom_pull(task_ids='init_step', key='store')
    threshold = ti.xcom_pull(task_ids='init_step', key='threshold')
    tables = ti.xcom_pull(task_ids='get_table_list', key='tables_to_process')

    triggered_count = 0

    if not tables:
        # Missing or empty table list: nothing to check or trigger.
        logging.info("Total DAGs triggered: %d", triggered_count)
        ti.xcom_push(key='triggered_count', value=triggered_count)
        return

    oracle_conn = None
    try:
        # The store selects the management table and cannot be a bind
        # variable, so it must be a harmless identifier before interpolation.
        if not isinstance(store, str) or not (store.isascii() and store.isidentifier()):
            raise ValueError(f"Invalid store value {store!r}")
        service_name = store.upper()

        # Imported here, once per call, instead of on every polling iteration.
        from airflow.api.common.trigger_dag import trigger_dag

        # Built once: only the bind values differ between tables.
        sql_query = f"""
            SELECT (SELECT CASE WHEN SUM(MAX_THREADS) IS NULL THEN 0 ELSE SUM(MAX_THREADS) END AS RUNNING_THREADS
                    FROM CT_MRDS.A_DEVO_REPLICA_MGMT_MOPDB
                    WHERE LAST_STATUS = 'RUNNING') +
                   (SELECT CASE WHEN SUM(MAX_THREADS) IS NULL THEN 0 ELSE SUM(MAX_THREADS) END AS RUNNING_THREADS
                    FROM CT_MRDS.A_DEVO_REPLICA_MGMT_RAR
                    WHERE LAST_STATUS = 'RUNNING')
                   AS TOTAL_RUNNING_THREADS_NOW,
                   (SELECT COUNT(*) FROM CT_MRDS.A_DEVO_REPLICA_MGMT_{service_name}
                    WHERE OWNER = :p_owner AND TABLE_NAME = :p_table_name AND LAST_STATUS = 'RUNNING') AS TABLE_IS_ALREADY_RUNNING
            FROM DUAL
        """

        oracle_conn = oraconn.connect('MRDS_LOADER')

        for table_owner, table_name in tables:
            logging.info("Processing table: %s.%s", table_owner, table_name)

            while True:
                total_running_val, table_running_val = _fetch_running_counts(
                    oracle_conn, sql_query, table_owner, table_name
                )

                logging.info("Total running: %d, threshold: %d, table running: %d",
                             total_running_val, threshold, table_running_val)

                if total_running_val > threshold:
                    logging.info("Threshold exceeded. Waiting 5 minutes...")
                    time.sleep(300)
                    continue

                if table_running_val >= 1:
                    logging.info("Table %s.%s is already running. Skipping.", table_owner, table_name)
                    break

                # Trigger core DAG
                conf = {
                    "store": store,
                    "owner_table": f"{table_owner}.{table_name}",
                }

                trigger_dag(
                    dag_id='devo_replicator_core',
                    conf=conf,
                    execution_date=None,
                    replace_microseconds=False,
                )

                triggered_count += 1
                logging.info("Triggered core DAG for table %s.%s", table_owner, table_name)
                break

        logging.info("Total DAGs triggered: %d", triggered_count)
        ti.xcom_push(key='triggered_count', value=triggered_count)

    except Exception as exc:
        logging.exception("Error in check_and_trigger: %s", exc)
        raise
    finally:
        if oracle_conn:
            oracle_conn.close()
|
|
|
|
# Task that runs check_and_trigger.
t2 = PythonOperator(task_id="check_and_trigger", python_callable=check_and_trigger)

# Dependencies: init_step, then get_table_list, then check_and_trigger.
init >> t1 >> t2
|
|
|
|
"""
|
|
MOPDB Trigger DAG
|
|
1) init_step
|
|
- Gets environment from MRDS_ENV environment variable
|
|
- Reads owner_table parameter from DAG configuration
|
|
- Validates owner_table format (must be OWNER.TABLE_NAME)
|
|
- Sets store to "mopdb" (fixed for this DAG)
|
|
- Sets threshold to 30 (max concurrent running threads)
|
|
- Pushes parameters to XCom
|
|
2) get_table_list
|
|
- Connects to Oracle database (MRDS_LOADER)
|
|
- If specific owner_table provided: creates single table list
|
|
- If no owner_table: queries all tables from CT_MRDS.a_devo_replica_mgmt_mopdb
|
|
- Returns list of (owner, table_name) tuples to process
|
|
- Pushes table list to XCom
|
|
3) check_and_trigger
|
|
- Loops through each table from the table list
|
|
- For each table, enters monitoring loop:
|
|
- Executes SQL query to check total running threads across MOPDB+RAR
|
|
- Checks if current table is already running
|
|
- If total threads > threshold (30): waits 5 minutes and rechecks
|
|
- If table already running: skips to next table
|
|
- If conditions met: triggers core DAG with table parameters
|
|
- Counts and logs total number of DAGs triggered
|
|
- Ensures system doesn't exceed concurrent processing limits
|
|
""" |