# devo_impala_exporter.py import os import io import yaml import datetime import logging from typing import Any, Dict, List, Optional, Tuple import pandas as pd from mrds.utils.secrets import get_secret import mrds.utils.manage_runs as runManager import mrds.utils.objectstore as objectstore import oci from impala.dbapi import ( connect, ProgrammingError, DatabaseError, IntegrityError, OperationalError, ) from impala.error import HiveServer2Error TASK_HISTORY_MULTIPLIER = 1_000_000_000 class DevoConnector: """ Export the result of an Impala (DEVO) query to OCI Object Storage as CSV, while recording task run metadata via mrds.runManager. Usage: exporter = DevoImpalaExporter( flow_config_path="/path/to/flow.yaml", env_config_path="/path/to/env.yaml", env="dev", logger=my_logger, # optional oci_client=my_object_storage, # optional ObjectStorageClient oci_signer=my_signer, # optional signer (used if client not provided) ) exporter.run({"run_id": 34, "a_workflow_history_key": 6}) """ def __init__( self, flow_config_path: str, env_config_path: str, env: str, logger: Optional[logging.Logger] = None, oci_client: Optional[oci.object_storage.ObjectStorageClient] = None, oci_signer: Optional[Any] = None, ) -> None: self.flow_info = self._initialize_config(flow_config_path) envs_info = self._initialize_config(env_config_path) BUCKET_NAMESPACE = os.getenv("BUCKET_NAMESPACE", "frcnomajoc7v") if env not in envs_info: raise KeyError(f"Environment '{env}' not found in {env_config_path}") self.environment_info = envs_info[env] self.environment_info["BUCKET_NAMESPACE"]=BUCKET_NAMESPACE self.env = env # logging self.logger = logger or self._default_logger(self.flow_info.get("TASK_NAME", "devo_task")) # OCI client/signer self.oci_client = oci_client self.oci_signer = oci_signer # ------------------------- # Public API # ------------------------- def run(self, workflow_context: Dict[str, Any]) -> None: """Main entry point; executes query, uploads CSV, and finalizes task.""" task_name = self.flow_info["TASK_NAME"] a_task_history_key = self._initialize_task(workflow_context, task_name) try: # credentials devo_secret_name = self.environment_info["DEVO_SECRET"] password = get_secret(devo_secret_name) self.logger.info("Retrieved secret for DEVO connection.") # query query = self.flow_info["DEVO_QUERY"] user = self.environment_info["DEVO_USERNAME"] host = self.environment_info["DEVO_HOSTNAME"] columns, data, rowcount = self._execute_query(query=query, user=user, hostname=host, password=password) df = self._tuple_to_dataframe((columns, data)) self.logger.info("Query executed and DataFrame created with %d rows.", len(df)) # upload if rowcount > 0: csv_name = f"{self.flow_info['OUTPUT_TABLE']}.csv" file_path = self._compose_object_path(self.flow_info["ODS_PREFIX"], csv_name) self._upload_dataframe_to_oci(df, csv_name, file_path) self.logger.info("Finished uploading %s to %s.", csv_name, file_path) else: return 0 # success runManager.finalise_task(a_task_history_key, "Y") self.logger.info("Task %s finalized successfully.", task_name) return rowcount except Exception as e: # failure self.logger.exception("Run failed: %s", e) try: runManager.finalise_task(a_task_history_key, "N") finally: # re-raise for upstream handling if used as a library raise # ------------------------- # Impala / DEVO # ------------------------- @staticmethod def _get_impala_connection(hostname: str, user: str, secret: str): return connect( host=hostname, port=443, auth_mechanism="PLAIN", user=user, password=secret, use_http_transport=True, http_path="cliservice", use_ssl=True, ) def _execute_query(self, query: str, user: str, hostname: str, password: str) -> Tuple[List[str], List[List[Any]]]: conn = self._get_impala_connection(hostname, user, password) cursor = None self.logger.info("Executing Impala query against host '%s' as user '%s'.", hostname, user) try: cursor = conn.cursor() cursor.execute(query) if query.strip().lower().startswith("select") or "select" in query.strip().lower() : rows = cursor.fetchall() columns = [col[0] for col in cursor.description] return columns, rows, cursor.rowcount else: # Non-SELECT: return rowcount (still return a columns list for consistency) return [], [[cursor.rowcount]] except OperationalError as oe: raise RuntimeError("Failed to connect to Impala: " + str(oe)) from oe except ProgrammingError as pe: raise ValueError("Query syntax error: " + str(pe)) from pe except IntegrityError as ie: raise PermissionError("Insufficient permissions: " + str(ie)) from ie except DatabaseError as db_err: raise RuntimeError("Database error: " + str(db_err)) from db_err except HiveServer2Error as au_err: raise PermissionError("HiveServer2Error error: " + str(au_err)) from au_err except Exception as e: raise RuntimeError("An unexpected error occurred: " + str(e)) from e finally: try: if cursor: cursor.close() finally: try: conn.close() except Exception: # log but don't mask the original exception self.logger.warning("Failed to close Impala connection cleanly.", exc_info=True) # ------------------------- # OCI Upload # ------------------------- def _upload_dataframe_to_oci(self, df: pd.DataFrame, csv_name: str, object_path: str) -> None: namespace = self.environment_info["BUCKET_NAMESPACE"] bucket = self.environment_info["BUCKET"] # convert DataFrame to CSV bytes without index csv_bytes = df.to_csv(index=False).encode("utf-8") client=objectstore.get_client() client.put_object(namespace, bucket, object_path, csv_bytes) self.logger.info("CSV '%s' uploaded to bucket '%s' (ns: '%s', key: '%s').", csv_name, bucket, namespace, object_path) # ------------------------- # Utilities # ------------------------- @staticmethod def _tuple_to_dataframe(data_tuple: Tuple[List[str], List[List[Any]]]) -> pd.DataFrame: columns, data = data_tuple if not columns: # for non-SELECT queries we returned rowcount; represent it in a DataFrame return pd.DataFrame(data, columns=["rowcount"]) return pd.DataFrame(data, columns=columns) @staticmethod def _initialize_config(config_file_path: str) -> Dict[str, Any]: if not os.path.exists(config_file_path): raise FileNotFoundError(f"Configuration file {config_file_path} not found.") with open(config_file_path, "r") as f: return yaml.safe_load(f) @staticmethod def _initialize_task(workflow_context: Dict[str, Any], task_name: str) -> int: return runManager.init_task( task_name, workflow_context["run_id"], workflow_context["a_workflow_history_key"], ) @staticmethod def add_a_key_column(headers: List[str], data_rows: List[List[Any]], task_history_key: int) -> None: """Optionally add an A_KEY column (kept for parity with original script).""" headers.insert(0, "A_KEY") for i, row in enumerate(data_rows, start=1): a_key_value = int(task_history_key) * TASK_HISTORY_MULTIPLIER + i row.insert(0, str(a_key_value)) @staticmethod def add_workflow_key_column(headers: List[str], data_rows: List[List[Any]], workflow_key: int) -> None: """Optionally add the workflow key column right after A_KEY if present, otherwise at position 0.""" insert_idx = 1 if headers and headers[0] == "A_KEY" else 0 headers.insert(insert_idx, "A_WORKFLOW_HISTORY_KEY") for row in data_rows: row.insert(insert_idx, workflow_key) @staticmethod def _compose_object_path(prefix: str, filename: str) -> str: if prefix.endswith("/"): return f"{prefix}{filename}" return f"{prefix}/{filename}" @staticmethod def _default_logger(task_name: str) -> logging.Logger: logger = logging.getLogger(f"{task_name}_logger") if not logger.handlers: logger.setLevel(logging.INFO) handler = logging.StreamHandler() ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") fmt = logging.Formatter(f"%(asctime)s [{task_name}] %(levelname)s: %(message)s") handler.setFormatter(fmt) logger.addHandler(handler) return logger # Optional: quick-run convenience if you ever want to execute this module directly. if __name__ == "__main__": # Example only—adjust paths/env/context as needed or remove this block. exporter = DevoConnector( flow_config_path="/home/dbt/Marco/mrds_elt/airflow/ods/rqsd/rqsd_process/config/yaml/m_ODS_RQSD_OBSERVATIONS.yaml", env_config_path="/home/dbt/Marco/mrds_elt/python/connectors/devo/config/env_config_rqsd.yaml", env="dev", ) exporter.run({"run_id": 34, "a_workflow_history_key": 6})