from airflow import DAG
# from airflow.providers.oracle.operators.oracle import SQLExecuteQueryOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule
from datetime import timedelta
import logging
import os
import sys

# Importing custom modules
sys.path.append('/opt/airflow/python/connectors/devo')
sys.path.append('/opt/airflow/python/mrds_common')
sys.path.append('/opt/airflow/src/airflow/dags/ods/rqsd')

# Import your functions (init_workflow/finalise_workflow are not wired in yet)
from mrds.utils.manage_runs import init_workflow, finalise_workflow
from devo_replicator.data_replicator.impala_refresher import main as impala_main

# step 5) Devo replication
## DEVO REPLICATOR WITH SQLOperator
### check the oracle connection, fixed params --> test cnx
### pick it from a file
# TASK:
# - retrieve the params {0} and {1} directly from the config file, based on dev/test
# - they need to be passed from Informatica (WLA call) to the DAGs
# - WLA to Airflow, cnx done

# Environment / table configuration. These were previously locals inside
# init_step, which left the SQL tasks below referencing undefined names;
# they live at module level so every task can see them.
# TODO: parse the config YAML and filter by dev/test and mopdb/rar.
env = os.getenv("MRDS_ENV")
config_path = "/opt/airflow/python/devo_replicator/config/env_config.yaml"
# corporate store is 'crp_mopdb' for mopdb and 'crp_rar' for rar
corporate_store = 'crp_mopdb'
p_service_name = 'MOPDB'
p_table_owner = 'MPEC'
p_table_name = 'T_MPEC'
# subject to change as appropriate
p_objectstore_uri = ('https://devo-crp-ffppyd8q.bucket.vpce-040b28f5818b670c1'
                     '-owicl3ow.s3.eu-central-1.vpce.amazonaws.com/mopdb/db')

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='rqsd_devo_replicator_2',
    default_args=default_args,
    description='Run Devo replicator workflow',
    schedule_interval=None,
    catchup=False,
    tags=['Devo', 'RQSD', 'Replicator'],
) as dag:

    def init_step(**context):
        # Capture the run id and share it with downstream tasks via XCom.
        p_run_id = str(context['ti'].run_id)
        print(f"=== DEBUG INFO : {p_run_id} ===")
        context['ti'].xcom_push(key='p_run_id', value=p_run_id)

    t0 = PythonOperator(
        task_id='init_step',
        python_callable=init_step,
        # provide_context is gone in Airflow 2: the context is always
        # passed to the callable's **kwargs.
    )

    t1 = SQLExecuteQueryOperator(
        task_id='start_log_table',
        # the connection had failed here: SQLExecuteQueryOperator expects
        # conn_id, not oracle_conn_id
        conn_id='oracle_default',
        sql="BEGIN MRDS_LOADER.DATA_REPLICATOR.start_log_table(:p_run_id, :p_service_name, :p_table_owner, :p_table_name); END;",
        parameters={
            # parameters is a templated field, so the run id pushed by
            # init_step can be pulled from XCom at render time.
            'p_run_id': "{{ ti.xcom_pull(task_ids='init_step', key='p_run_id') }}",
            'p_service_name': p_service_name,
            'p_table_owner': p_table_owner,
            'p_table_name': p_table_name,
        },
    )

    t2 = SQLExecuteQueryOperator(
        task_id='export_table',
        conn_id='oracle_default',
        sql="BEGIN MRDS_LOADER.DATA_REPLICATOR.export_table(:p_service_name, :p_table_owner, :p_table_name, :p_objectstore_uri); END;",
        parameters={
            'p_service_name': p_service_name,
            'p_table_owner': p_table_owner,
            'p_table_name': p_table_name,
            'p_objectstore_uri': p_objectstore_uri,
        },
    )

    # Leaving the Devo/Impyla task as a PythonOperator (placeholder).
    def devo_impyla_task(**context):
        # get details-data from Impala (still pending); the argument order
        # follows the original draft call (config path, env, table,
        # corporate store)
        status = impala_main(config_path, env, p_table_name, corporate_store)
        logging.info("Impyla (Devo) task placeholder ran. Please implement.")

    t3 = PythonOperator(
        task_id='devo_impyla',
        python_callable=devo_impyla_task,
    )

    # push to S3: we need to call the proc
    t4 = SQLExecuteQueryOperator(
        task_id='end_log_table',
        conn_id='oracle_default',
        sql="BEGIN MRDS_LOADER.DATA_REPLICATOR.end_log_table(:p_service_name, :p_table_owner, :p_table_name); END;",
        parameters={
            'p_service_name': p_service_name,
            'p_table_owner': p_table_owner,
            'p_table_name': p_table_name,
        },
        # t4 has to run whether the upstream tasks succeed or fail (if t1
        # fails, go directly to t4): ALL_DONE fires once every upstream
        # task has finished, regardless of its state.
        trigger_rule=TriggerRule.ALL_DONE,
    )

    # TODO t5: check whether any of the previous tasks failed and mark the
    # run accordingly.

    t0 >> t1 >> t2 >> t3 >> t4