"""Helpers that record workflow/task runs via CT_MRDS.WORKFLOW_MANAGER."""

from . import oraconn
from . import sql_statements
from . import static_vars
from . import manage_files


def init_workflow(database_name: str, workflow_name: str, workflow_run_id: str) -> int:
    """Register a workflow run and return its history key.

    Calls ``CT_MRDS.WORKFLOW_MANAGER.INIT_WORKFLOW`` on the ``MRDS_LOADER``
    connection and commits.

    Args:
        database_name: Passed as the first argument of the database function.
        workflow_name: Passed as the third argument of the database function.
        workflow_run_id: Passed as the second argument of the database function.

    Returns:
        The value returned by ``INIT_WORKFLOW`` (requested as ``int``).
    """
    # Connect *before* the try block: if connect() itself fails there is no
    # connection to close, and the original error must propagate instead of
    # being masked by an UnboundLocalError raised from the finally clause.
    conn = oraconn.connect("MRDS_LOADER")
    try:
        # NOTE: the database function takes (database, run_id, name), which is
        # a different order from this function's own parameters.
        a_workflow_history_key = oraconn.run_func(
            conn,
            "CT_MRDS.WORKFLOW_MANAGER.INIT_WORKFLOW",
            int,
            [database_name, workflow_run_id, workflow_name],
        )
        conn.commit()
    finally:
        conn.close()
    return a_workflow_history_key


def finalise_workflow(a_workflow_history_key: int, workflow_status: str):
    """Finalise a workflow run with the given status.

    Calls ``CT_MRDS.WORKFLOW_MANAGER.FINALISE_WORKFLOW`` on the
    ``MRDS_LOADER`` connection and commits.

    Args:
        a_workflow_history_key: Key of the workflow history entry to finalise.
        workflow_status: Status value forwarded to the database procedure.
    """
    # Connect outside the try so a failed connect is not masked (see
    # the finally clause, which needs a bound ``conn``).
    conn = oraconn.connect("MRDS_LOADER")
    try:
        oraconn.run_proc(
            conn,
            "CT_MRDS.WORKFLOW_MANAGER.FINALISE_WORKFLOW",
            [a_workflow_history_key, workflow_status],
        )
        conn.commit()
    finally:
        conn.close()


def init_task(task_name: str, task_run_id: str, a_workflow_history_key: int) -> int:
    """Register a task run under a workflow and return its history key.

    Calls ``CT_MRDS.WORKFLOW_MANAGER.INIT_TASK`` on the ``MRDS_LOADER``
    connection and commits.

    Args:
        task_name: Passed as the second argument of the database function.
        task_run_id: Passed as the first argument of the database function.
        a_workflow_history_key: Key of the owning workflow history entry.

    Returns:
        The value returned by ``INIT_TASK`` (requested as ``int``).
    """
    # Connect outside the try so a failed connect is not masked by an
    # UnboundLocalError from ``conn.close()`` in the finally clause.
    conn = oraconn.connect("MRDS_LOADER")
    try:
        # NOTE: the database function takes (run_id, name, workflow_key).
        a_task_history_key = oraconn.run_func(
            conn,
            "CT_MRDS.WORKFLOW_MANAGER.INIT_TASK",
            int,
            [task_run_id, task_name, a_workflow_history_key],
        )
        conn.commit()
    finally:
        conn.close()
    return a_task_history_key


def finalise_task(a_task_history_key: int, task_status: str):
    """Finalise a task run with the given status.

    Executes the ``finalise_task`` statement obtained from
    ``sql_statements.get_sql`` on the ``MRDS_LOADER`` connection and commits.

    Args:
        a_task_history_key: Key of the task history entry; second bind value.
        task_status: Status value; first bind value.
    """
    # Connect outside the try so a failed connect is not masked by an
    # UnboundLocalError from ``conn.close()`` in the finally clause.
    conn = oraconn.connect("MRDS_LOADER")
    try:
        curs = conn.cursor()
        curs.execute(
            sql_statements.get_sql("finalise_task"),
            [task_status, a_task_history_key],
        )
        conn.commit()
    finally:
        conn.close()


def set_workflow_property(
    wf_history_key: int, service_name: str, property: str, value: str
):
    """Set a property on a workflow history entry.

    Calls ``CT_MRDS.WORKFLOW_MANAGER.SET_WORKFLOW_PROPERTY`` on the
    ``MRDS_LOADER`` connection and commits.

    Args:
        wf_history_key: Key of the workflow history entry.
        service_name: Forwarded to the database procedure.
        property: Name of the property (parameter name kept for backward
            compatibility even though it shadows the builtin).
        value: Value forwarded to the database procedure.

    Returns:
        Whatever ``oraconn.run_proc`` returns for this call.
    """
    # Connect outside the try so a failed connect is not masked by an
    # UnboundLocalError from ``conn.close()`` in the finally clause.
    conn = oraconn.connect("MRDS_LOADER")
    try:
        ret = oraconn.run_proc(
            conn,
            "CT_MRDS.WORKFLOW_MANAGER.SET_WORKFLOW_PROPERTY",
            [wf_history_key, service_name, property, value],
        )
        conn.commit()
    finally:
        conn.close()
    return ret


def select_ods_tab(table_name: str, value: str, condition="1 = 1"):
    """Run ``select <value> from <table_name> where <condition>``.

    The query is printed to stdout and then executed through
    ``manage_files.execute_query`` with the ``ODS_LOADER`` account alias.

    WARNING: all three arguments are interpolated directly into the SQL text.
    Pass only trusted, program-controlled fragments; never user input.

    Args:
        table_name: Table expression placed after ``from``.
        value: Select list placed after ``select``.
        condition: Predicate placed after ``where``; defaults to ``"1 = 1"``.

    Returns:
        The result of ``manage_files.execute_query``.
    """
    # NOTE(security): string-built SQL. Identifiers cannot be bound as
    # parameters, so callers are responsible for supplying safe fragments.
    query = "select %s from %s where %s" % (value, table_name, condition)
    print("query = |%s|" % query)
    return manage_files.execute_query(query=query, account_alias="ODS_LOADER")