# Source: mars-elt/airflow/TestDags/ods_exdi_multi_task_processor.py
# Author: Grzegorz Michalski, commit 2c225d68ac ("init"), 2026-03-02 09:47:35 +01:00
# (repository file-view metadata: 245 lines, 9.3 KiB, Python)

import sys
import os
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta
import logging
# Compatibility shim: older Airflow releases do not export
# AirflowFailException, so fall back to the generic AirflowException under
# the same local name — the rest of the module can then raise
# AirflowFailException unconditionally.
try:
    from airflow.exceptions import AirflowFailException, AirflowSkipException
except Exception:
    from airflow.exceptions import AirflowException as AirflowFailException
    from airflow.exceptions import AirflowSkipException
# Make the shared MRDS helpers and the EXDI DAG package importable
# (custom modules deployed alongside Airflow, not pip-installed).
sys.path.append('/opt/airflow/python/mrds_common')
sys.path.append('/opt/airflow/src/airflow/dags/ods/exdi')
from mrds.utils.manage_runs import init_workflow as mrds_init_workflow, finalise_workflow as mrds_finalise_workflow
from mrds.core import main as mrds_main
# Configuration dictionary for the EXDI PARSE tasks.
# Workflow shape: Init → First Task (sequential) → Parallel Tasks → Finalize
# The FIRST key is mandatory and runs sequentially right after init; every
# remaining key runs in parallel after it (see the dependency wiring below).
#
# Each entry maps a logical task name to:
#   source_filename - CSV expected to already be in the bucket (staged by
#                     the upstream EXDI flow)
#   config_file     - absolute path to that task's *_PARSE.yaml MRDS config
TASK_CONFIGS = {
    "m_ODS_EXDI_TASK1": {
        "source_filename": "EXDI_TASK1.csv",
        "config_file": "/opt/airflow/src/airflow/dags/ods/exdi/exdi_process/config/yaml/m_ODS_EXDI_TASK1_PARSE.yaml"
    },
    "m_ODS_EXDI_TASK2": {
        "source_filename": "EXDI_TASK2.csv",
        "config_file": "/opt/airflow/src/airflow/dags/ods/exdi/exdi_process/config/yaml/m_ODS_EXDI_TASK2_PARSE.yaml"
    },
    "m_ODS_EXDI_TASK3": {
        "source_filename": "EXDI_TASK3.csv",
        "config_file": "/opt/airflow/src/airflow/dags/ods/exdi/exdi_process/config/yaml/m_ODS_EXDI_TASK3_PARSE.yaml"
    }
}
# Default operator arguments applied to every task in this DAG.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
# DAG id is derived from this file's basename, so renaming the file
# renames the DAG (and the MRDS workflow registered under it).
dag_id = os.path.splitext(os.path.basename(__file__))[0]
# Identifies the MRDS workflow that init_workflow_task registers.
WORKFLOW_CONFIG = {
    "database_name": "ODS",
    "workflow_name": dag_id
}
with DAG(
    dag_id=dag_id,
    default_args=default_args,
    description='Run EXDI data processing workflow with MRDS - Multi-task',
    schedule_interval=None,  # triggered externally/manually only
    catchup=False,
    tags=["EXDI", "MRDS", "Multi-Task", "ODS"]
) as dag:

    def init_workflow_task(**context):
        """Register the workflow run with MRDS and share its context via XCom.

        Validates the loader DB credentials from the environment, calls
        ``mrds_init_workflow`` and pushes the resulting history key and
        workflow context for the downstream PARSE / finalize tasks.

        Raises:
            ValueError: if any required MRDS_LOADER_DB_* variable is unset.
        """
        try:
            database_name = WORKFLOW_CONFIG["database_name"]
            workflow_name = WORKFLOW_CONFIG["workflow_name"]
            env = os.getenv("MRDS_ENV", "dev")
            username = os.getenv("MRDS_LOADER_DB_USER")
            password = os.getenv("MRDS_LOADER_DB_PASS")
            tnsalias = os.getenv("MRDS_LOADER_DB_TNS")
            # Fail fast with a message naming every missing variable.
            missing_vars = [name for name, value in (
                ("MRDS_LOADER_DB_USER", username),
                ("MRDS_LOADER_DB_PASS", password),
                ("MRDS_LOADER_DB_TNS", tnsalias),
            ) if not value]
            if missing_vars:
                raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")
            workflow_run_id = str(context['ti'].run_id)
            a_workflow_history_key = mrds_init_workflow(database_name, workflow_name, workflow_run_id)
            workflow_context = {
                "run_id": workflow_run_id,
                "a_workflow_history_key": a_workflow_history_key
            }
            # Push context to XCom for downstream tasks.
            ti = context['ti']
            ti.xcom_push(key='workflow_history_key', value=a_workflow_history_key)
            ti.xcom_push(key='workflow_context', value=workflow_context)
            ti.xcom_push(key='env', value=env)
            logging.info("Workflow initialization completed successfully")
        except Exception as e:
            logging.error(f"Error initializing workflow: {e}", exc_info=True)
            raise

    def run_mrds_task(**context):
        """Run the MRDS PARSE step for one EXDI task.

        The operator's task id (``<task_name>_PARSE``) selects the entry in
        TASK_CONFIGS. The source file is expected to already be in the
        bucket from the upstream EXDI flow.

        Raises:
            ValueError: when the task id has no TASK_CONFIGS entry, or no
                workflow context was pushed by ``init_workflow``.
            FileNotFoundError: when the PARSE yaml config is missing.
        """
        ti = context['ti']
        try:
            task_id = context['task'].task_id
            # Strip only the trailing '_PARSE' suffix (str.replace would
            # also remove the substring mid-name) to recover the config key.
            task_name = task_id[:-len('_PARSE')] if task_id.endswith('_PARSE') else task_id
            task_config = TASK_CONFIGS.get(task_name)
            if not task_config:
                raise ValueError(f"No configuration found for task: {task_name}")
            source_filename = task_config["source_filename"]
            config_file = task_config["config_file"]
            # Context produced by the init task.
            workflow_context = ti.xcom_pull(key='workflow_context', task_ids='init_workflow')
            if not workflow_context:
                raise ValueError("No workflow_context from init task")
            # Verify the PARSE config exists before launching MRDS.
            if not os.path.exists(config_file):
                raise FileNotFoundError(f"PARSE config file not found: {config_file}")
            logging.info(f"Processing EXDI MRDS with source_filename: {source_filename}, config_file: {config_file}")
            # Run MRDS - file should already be in bucket from EXDI flow.
            mrds_main(workflow_context, source_filename, config_file, generate_workflow_context=False)
            logging.info(f"EXDI MRDS task completed successfully for {task_name}")
            # Push success status to XCom for the finalizer.
            ti.xcom_push(key='mrds_success', value=True)
            ti.xcom_push(key='task_status', value='SUCCESS')
            return "SUCCESS"
        except Exception as e:
            logging.error(f"Error running EXDI MRDS task: {e}", exc_info=True)
            # Record the failure detail so finalise_workflow_task can report it.
            ti.xcom_push(key='mrds_success', value=False)
            ti.xcom_push(key='task_status', value='FAILED')
            ti.xcom_push(key='error_message', value=str(e))
            raise

    def finalise_workflow_task(**context):
        """Close out the MRDS workflow according to the PARSE task outcomes.

        Runs with trigger_rule=ALL_DONE so it executes even when PARSE
        tasks fail. Marks the workflow "Y" only when every PARSE task
        succeeded, otherwise marks it "N" and raises AirflowFailException
        so the DAG run itself is marked failed.
        """
        try:
            ti = context['ti']
            dag_run = context['dag_run']
            a_workflow_history_key = ti.xcom_pull(key='workflow_history_key', task_ids='init_workflow')
            if a_workflow_history_key is None:
                raise ValueError("No workflow history key found in XCom; cannot finalise workflow")
            # Inspect every PARSE task. 'upstream_failed' counts as a
            # failure too (a parallel task that never ran because the first
            # task died). get_task_instance may return None; guard it.
            workflow_success = True
            failure_reasons = []
            for task_name in TASK_CONFIGS.keys():
                mrds_task_id = f'{task_name}_PARSE'
                mrds_task = dag_run.get_task_instance(mrds_task_id)
                state = mrds_task.state if mrds_task is not None else None
                if state in ('failed', 'upstream_failed'):
                    workflow_success = False
                    # xcom_pull returns None (it does not raise) when the
                    # task pushed nothing before dying.
                    error_msg = ti.xcom_pull(key='error_message', task_ids=mrds_task_id)
                    if error_msg:
                        failure_reasons.append(f"{task_name}: MRDS task failed - {error_msg}")
                    else:
                        failure_reasons.append(f"{task_name}: MRDS task failed")
            # Finalize workflow in MRDS with the aggregate result.
            if workflow_success:
                mrds_finalise_workflow(a_workflow_history_key, "Y")
                logging.info(f"Finalised EXDI workflow with history key {a_workflow_history_key} as SUCCESS")
            else:
                mrds_finalise_workflow(a_workflow_history_key, "N")
                logging.error(f"Finalised EXDI workflow with history key {a_workflow_history_key} as FAILED")
                raise AirflowFailException(f"EXDI Workflow failed: {', '.join(failure_reasons)}")
        except AirflowFailException:
            raise
        except Exception as e:
            logging.error(f"Error finalizing EXDI workflow: {e}", exc_info=True)
            # Best effort: still record the run as failed in MRDS before
            # surfacing the finalization error to Airflow.
            try:
                if 'a_workflow_history_key' in locals() and a_workflow_history_key:
                    mrds_finalise_workflow(a_workflow_history_key, "N")
            except Exception:
                logging.warning("Could not mark workflow as failed in MRDS", exc_info=True)
            raise AirflowFailException(f"EXDI Workflow finalization failed: {e}")

    # --- Task creation and dependency wiring ---------------------------
    init_workflow = PythonOperator(
        task_id='init_workflow',
        python_callable=init_workflow_task,
        provide_context=True,
    )
    # ALL_DONE: the finalizer must run even when PARSE tasks fail so the
    # MRDS run is always closed out.
    finalize_workflow = PythonOperator(
        task_id='finalize_workflow',
        python_callable=finalise_workflow_task,
        provide_context=True,
        trigger_rule=TriggerRule.ALL_DONE,
    )
    # First configured task runs sequentially; the rest run in parallel.
    task_names = list(TASK_CONFIGS.keys())
    first_task_name = task_names[0]
    parallel_task_names = task_names[1:]
    first_mrds_task = PythonOperator(
        task_id=f'{first_task_name}_PARSE',
        python_callable=run_mrds_task,
        provide_context=True,
    )
    init_workflow >> first_mrds_task >> finalize_workflow
    for task_name in parallel_task_names:
        mrds_task = PythonOperator(
            task_id=f'{task_name}_PARSE',
            python_callable=run_mrds_task,
            provide_context=True,
        )
        # Parallel tasks start only after the first task completes.
        first_mrds_task >> mrds_task >> finalize_workflow

logging.info(f"EXDI DAG created with {len(TASK_CONFIGS)} tasks: {list(TASK_CONFIGS.keys())}")