Files
mars-elt/airflow/TestDags/t_MOPDB_RQSD_DEVO_OBSERVATIONS.py
Grzegorz Michalski 2c225d68ac init
2026-03-02 09:47:35 +01:00

117 lines
4.5 KiB
Python

import sys
import os
import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

### DEVO CONNECTOR WITH DYNAMIC WORKFLOW CONTEXT & HISTORY KEY
# Make the custom project modules importable: the Devo connector package,
# the shared mrds utilities, and the RQSD DAG config directory.
# (The connector path was previously appended twice; once is enough.)
sys.path.append('/opt/airflow/python/connectors/devo')
sys.path.append('/opt/airflow/python/mrds_common')
sys.path.append('/opt/airflow/src/airflow/dags/ods/rqsd')

# Workflow bookkeeping helpers and the Devo connector entry point.
from mrds.utils.manage_runs import init_workflow, finalise_workflow
from devo_connector import main as devo_main
# Default task arguments applied to every task in this DAG.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

# Derive the DAG id from this file's basename (extension stripped) so the
# DAG id always matches the filename.
dag_file_name = os.path.basename(__file__)
dag_id, _ = os.path.splitext(dag_file_name)
with DAG(
    dag_id=dag_id,
    default_args=default_args,
    description='Run devo RQSD data ingestion workflow',
    schedule_interval=None,  # manually triggered only
    catchup=False,
    tags=["Devo", "RQSD", "Connector"],
) as dag:

    def run_devo_connector_rqsd(**context):
        """Initialise workflow bookkeeping and run the Devo RQSD connector.

        Reads the environment name and loader DB credentials from environment
        variables, registers a workflow run via ``init_workflow``, invokes the
        Devo connector entry point, and pushes the resulting workflow history
        key to XCom for the downstream finalise task.

        Raises:
            ValueError: if any required credential env var is missing.
            Exception: any connector error is re-raised after the workflow is
                finalised as FAILED (when it was successfully initialised).
        """
        try:
            env = os.getenv("MRDS_ENV")
            username = os.getenv("MRDS_LOADER_DB_USER")
            password = os.getenv("MRDS_LOADER_DB_PASS")
            tnsalias = os.getenv("MRDS_LOADER_DB_TNS")
            # Fail fast if any required credential is missing.
            if not all([username, password, tnsalias]):
                raise ValueError(
                    "Missing one or more required environment variables: "
                    "MRDS_LOADER_DB_USER, MRDS_LOADER_DB_PASS, MRDS_LOADER_DB_TNS"
                )
            logging.info(f"Starting Casper RQSD workflow from Airflow DAG for env '{env}'")
            database_name = 'MOPDB'
            workflow_name = 'w_MOPDB_RQSD_PROCESS'
            # Use the Airflow run id as the external workflow run identifier.
            workflow_run_id = str(context['ti'].run_id)
            a_workflow_history_key = init_workflow(database_name, workflow_name, workflow_run_id)
            logging.info(f"Initialized workflow with history key: {a_workflow_history_key}")
            workflow_context = {
                "run_id": workflow_run_id,
                "a_workflow_history_key": a_workflow_history_key
            }
            flow_config_path = "/opt/airflow/src/airflow/dags/ods/rqsd/rqsd_process/config/yaml/flow_config_rqsd_observations.yaml"
            env_config_path = "/opt/airflow/python/connectors/devo/config/env_config_rqsd.yaml"
            logging.info("Starting Devo RQSD workflow from Airflow DAG")
            devo_main(workflow_context, flow_config_path, env_config_path, env)
            logging.info("Devo RQSD workflow completed successfully")
            # Push the workflow history key to XCom for downstream tasks
            context['ti'].xcom_push(key='workflow_history_key', value=a_workflow_history_key)
        except Exception as e:
            logging.error(f"Error running Devo RQSD workflow: {e}", exc_info=True)
            # If init_workflow succeeded but the run failed afterwards,
            # finalise the workflow record with FAILED status (best effort).
            if 'a_workflow_history_key' in locals():
                try:
                    finalise_workflow(a_workflow_history_key, "FAILED")
                except Exception as finalise_error:
                    logging.error(f"Failed to finalise workflow after error: {finalise_error}")
            # Re-raise so the Airflow task is marked failed (and retried).
            raise

    def finalise_workflow_task(**context):
        """Finalise the workflow as SUCCESS using the key pushed by the main task.

        Raises:
            ValueError: if no workflow history key was found in XCom.
        """
        ti = context['ti']
        a_workflow_history_key = ti.xcom_pull(key='workflow_history_key', task_ids='run_devo_connector_rqsd')
        if a_workflow_history_key is None:
            raise ValueError("No workflow history key found in XCom; cannot finalise workflow")
        finalise_workflow(a_workflow_history_key, "SUCCESS")
        logging.info(f"Finalised workflow with history key {a_workflow_history_key} as SUCCESS")

    # NOTE: `provide_context=True` removed — it is an Airflow 1.x parameter;
    # the Airflow 2 PythonOperator (airflow.operators.python) passes the task
    # context to the callable automatically.
    run_devo = PythonOperator(
        task_id='run_devo_connector_rqsd',
        python_callable=run_devo_connector_rqsd,
    )
    finalize = PythonOperator(
        task_id='finalise_workflow',
        python_callable=finalise_workflow_task,
    )

    # Finalise as SUCCESS only after the connector task has succeeded.
    run_devo >> finalize