"""Airflow DAG that runs the Casper RQSD data ingestion workflow.

The DAG contains a single PythonOperator task, ``run_casper_rqsd``, which
registers a workflow run via ``init_workflow`` and then hands a workflow
context (Airflow run id + workflow history key) to ``casper_rqsd.main``.
"""
import logging
import os
import sys
from datetime import datetime, timedelta  # noqa: F401  (datetime kept for a fixed start_date)

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

### CASPER CONNECTOR WITH DYNAMIC WORKFLOW CONTEXT & HISTORY KEY

# Directories holding the custom modules. They are added to sys.path BEFORE
# the project imports below, so those imports can resolve even when the
# packages are only reachable through these directories. The membership check
# keeps sys.path from growing each time the scheduler re-parses this file.
for _module_dir in (
    '/opt/airflow/python/connectors/casper',
    '/opt/airflow/python/mrds_common',
    '/opt/airflow/src/airflow/dags/ods/rqsd',
):
    if _module_dir not in sys.path:
        sys.path.append(_module_dir)

# NOTE(review): finalise_workflow is imported but never called in this file,
# so the workflow history entry created by init_workflow is never closed
# here. Its signature is not visible from this module - confirm whether
# casper_main finalises the run or whether a call must be added below.
from mrds.utils.manage_runs import init_workflow, finalise_workflow  # noqa: E402,F401

# Importing the main function from casper script
from casper_rqsd import main as casper_main  # noqa: E402

# Configuration files handed to casper_main.
FLOW_CONFIG_PATH = "/opt/airflow/src/airflow/dags/ods/rqsd/rqsd_process/config/yaml/flow_config_casper.yaml"
ENV_CONFIG_PATH = "/opt/airflow/python/connectors/casper/config/env_config.yaml"

# NOTE(review): the original task passed `database_name` and `workflow_name`
# to init_workflow() without defining them anywhere in this file, so every
# run failed with NameError. They are now read from the environment.
# The two variable names below are placeholders - TODO confirm the intended
# source and values for these arguments.
DATABASE_NAME_ENV_VAR = "MRDS_DATABASE_NAME"
WORKFLOW_NAME_ENV_VAR = "MRDS_WORKFLOW_NAME"

# Default DAG arguments
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}


def run_casper_rqsd(**context):
    """Run the Casper RQSD workflow for the current Airflow task instance.

    Steps:
      1. Check that the required environment variables are present.
      2. Register the run with ``init_workflow`` to obtain a workflow
         history key.
      3. Call ``casper_main`` with the workflow context, both config paths
         and the environment name taken from ``MRDS_ENV``.

    Args:
        **context: Airflow task context; ``context['ti'].run_id`` is used
            as the workflow run id.

    Raises:
        ValueError: if a required environment variable is missing or empty.
        Exception: any error raised by ``init_workflow`` or ``casper_main``
            is logged with its traceback and re-raised, so Airflow marks
            the task as failed and applies its retry policy.
    """
    try:
        # Validate configuration first so that a misconfigured worker fails
        # before a workflow run is registered.
        username = os.getenv("MRDS_LOADER_DB_USER")
        password = os.getenv("MRDS_LOADER_DB_PASS")
        tnsalias = os.getenv("MRDS_LOADER_DB_TNS")
        if not all([username, password, tnsalias]):
            raise ValueError(
                "Missing one or more required environment variables: "
                "MRDS_LOADER_DB_USER, MRDS_LOADER_DB_PASS, MRDS_LOADER_DB_TNS"
            )

        database_name = os.getenv(DATABASE_NAME_ENV_VAR)
        workflow_name = os.getenv(WORKFLOW_NAME_ENV_VAR)
        if not database_name or not workflow_name:
            raise ValueError(
                "Missing one or more required environment variables: "
                f"{DATABASE_NAME_ENV_VAR}, {WORKFLOW_NAME_ENV_VAR}"
            )

        # Pick env from ENV variables. It is passed through unchanged, so it
        # is None when MRDS_ENV is not set.
        env = os.getenv("MRDS_ENV")

        workflow_run_id = str(context['ti'].run_id)
        a_workflow_history_key = init_workflow(
            database_name, workflow_name, workflow_run_id
        )
        logging.info(
            "Initialized workflow with history key: %s", a_workflow_history_key
        )

        workflow_context = {
            "run_id": workflow_run_id,
            "a_workflow_history_key": a_workflow_history_key,
        }

        logging.info(
            "Starting Casper RQSD workflow from Airflow DAG for env '%s'", env
        )

        # Calling main()
        casper_main(workflow_context, FLOW_CONFIG_PATH, ENV_CONFIG_PATH, env)

        logging.info("Casper RQSD workflow completed successfully")

    except Exception as e:
        # Task boundary: log with traceback, then re-raise so the task fails.
        logging.error("Error running Casper RQSD workflow: %s", e, exc_info=True)
        raise


with DAG(
    dag_id='rqsd_casper_connector_test',
    default_args=default_args,
    description='Run Casper RQSD data ingestion workflow',
    schedule_interval=None,  # we can set later
    catchup=False,
    tags=['Casper', 'RQSD', 'Connector'],
) as dag:

    # provide_context is intentionally not passed: with the Airflow 2
    # PythonOperator the context is always supplied to callables that accept
    # **kwargs, and the argument is deprecated.
    run_casper = PythonOperator(
        task_id='run_casper_rqsd',
        python_callable=run_casper_rqsd,
    )