Files
mars-elt/python/connectors/devo/devo_connector.py
Grzegorz Michalski 2c225d68ac init
2026-03-02 09:47:35 +01:00

260 lines
10 KiB
Python

# devo_connector.py — export a DEVO (Impala) query result to OCI Object Storage as CSV.
import os
import io
import yaml
import datetime
import logging
from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
from mrds.utils.secrets import get_secret
import mrds.utils.manage_runs as runManager
import mrds.utils.objectstore as objectstore
import oci
from impala.dbapi import (
connect,
ProgrammingError,
DatabaseError,
IntegrityError,
OperationalError,
)
from impala.error import HiveServer2Error
# Scale factor for synthetic A_KEY values: task_history_key * multiplier + row index.
TASK_HISTORY_MULTIPLIER = 10 ** 9
class DevoConnector:
    """
    Export the result of an Impala (DEVO) query to OCI Object Storage as CSV,
    while recording task run metadata via mrds.runManager.

    Usage:
        connector = DevoConnector(
            flow_config_path="/path/to/flow.yaml",
            env_config_path="/path/to/env.yaml",
            env="dev",
            logger=my_logger,              # optional
            oci_client=my_object_storage,  # optional ObjectStorageClient used for uploads
            oci_signer=my_signer,          # optional; stored on the instance, not used here
        )
        rowcount = connector.run({"run_id": 34, "a_workflow_history_key": 6})

    Flow config keys read: TASK_NAME, DEVO_QUERY, OUTPUT_TABLE, ODS_PREFIX.
    Environment config keys read (inside the selected ``env`` section):
    DEVO_SECRET, DEVO_USERNAME, DEVO_HOSTNAME, BUCKET. BUCKET_NAMESPACE is
    taken from the process environment (see ``__init__``).
    """

    def __init__(
        self,
        flow_config_path: str,
        env_config_path: str,
        env: str,
        logger: Optional[logging.Logger] = None,
        oci_client: Optional[oci.object_storage.ObjectStorageClient] = None,
        oci_signer: Optional[Any] = None,
    ) -> None:
        """Load the flow and environment configs and set up logging / OCI handles.

        Args:
            flow_config_path: YAML file describing the task (query, output table, prefix).
            env_config_path: YAML file whose top-level keys are environment names.
            env: the environment section to use from ``env_config_path``.
            logger: optional logger; a stream logger is created when omitted.
            oci_client: optional object-storage client used for uploads.
            oci_signer: optional signer; only stored on the instance.

        Raises:
            FileNotFoundError: if either config file does not exist.
            KeyError: if ``env`` is not a top-level key of the environment config.
        """
        self.flow_info = self._initialize_config(flow_config_path)
        envs_info = self._initialize_config(env_config_path)
        if env not in envs_info:
            raise KeyError(f"Environment '{env}' not found in {env_config_path}")
        self.environment_info = envs_info[env]
        # The namespace always comes from the process environment (with a fixed
        # fallback) and overrides whatever the YAML section may contain.
        self.environment_info["BUCKET_NAMESPACE"] = os.getenv("BUCKET_NAMESPACE", "frcnomajoc7v")
        self.env = env
        # logging
        self.logger = logger or self._default_logger(self.flow_info.get("TASK_NAME", "devo_task"))
        # OCI client/signer
        self.oci_client = oci_client
        self.oci_signer = oci_signer

    # -------------------------
    # Public API
    # -------------------------
    def run(self, workflow_context: Dict[str, Any]) -> int:
        """Execute the query, upload the CSV and finalise the task record.

        Args:
            workflow_context: mapping containing ``run_id`` and
                ``a_workflow_history_key`` (passed to ``runManager.init_task``).

        Returns:
            The number of rows produced by the query. When it is not positive,
            nothing is uploaded and 0 is returned; the task is still finalised "Y".

        Raises:
            Re-raises whatever failed, after attempting ``finalise_task(..., "N")``.
        """
        task_name = self.flow_info["TASK_NAME"]
        a_task_history_key = self._initialize_task(workflow_context, task_name)
        try:
            # credentials
            password = get_secret(self.environment_info["DEVO_SECRET"])
            self.logger.info("Retrieved secret for DEVO connection.")
            # query
            columns, data, rowcount = self._execute_query(
                query=self.flow_info["DEVO_QUERY"],
                user=self.environment_info["DEVO_USERNAME"],
                hostname=self.environment_info["DEVO_HOSTNAME"],
                password=password,
            )
            df = self._tuple_to_dataframe((columns, data))
            self.logger.info("Query executed and DataFrame created with %d rows.", len(df))
            # upload
            if rowcount > 0:
                csv_name = f"{self.flow_info['OUTPUT_TABLE']}.csv"
                file_path = self._compose_object_path(self.flow_info["ODS_PREFIX"], csv_name)
                self._upload_dataframe_to_oci(df, csv_name, file_path)
                self.logger.info("Finished uploading %s to %s.", csv_name, file_path)
            else:
                # Nothing to export. Still finalise below so the task opened by
                # _initialize_task is not left without a finalise_task call.
                self.logger.info("Query returned no rows; nothing uploaded for task %s.", task_name)
                rowcount = 0
            # success
            runManager.finalise_task(a_task_history_key, "Y")
            self.logger.info("Task %s finalized successfully.", task_name)
            return rowcount
        except Exception as e:
            # failure
            self.logger.exception("Run failed: %s", e)
            try:
                runManager.finalise_task(a_task_history_key, "N")
            except Exception:
                # Don't let a bookkeeping failure replace the original error.
                self.logger.exception("Failed to finalise task %s as failed.", task_name)
            # re-raise the original error for upstream handling
            raise

    # -------------------------
    # Impala / DEVO
    # -------------------------
    @staticmethod
    def _get_impala_connection(hostname: str, user: str, secret: str):
        """Open an Impala DB-API connection over HTTPS (port 443, PLAIN auth)."""
        return connect(
            host=hostname,
            port=443,
            auth_mechanism="PLAIN",
            user=user,
            password=secret,
            use_http_transport=True,
            http_path="cliservice",
            use_ssl=True,
        )

    def _execute_query(
        self, query: str, user: str, hostname: str, password: str
    ) -> Tuple[List[str], List[Any], int]:
        """Run ``query`` on Impala and return ``(columns, rows, rowcount)``.

        If the statement produced a result set (``cursor.description`` is set),
        all rows are fetched and ``rowcount`` is the number of fetched rows.
        Otherwise ``columns`` is empty, ``rows`` is ``[[cursor.rowcount]]`` and
        ``rowcount`` is ``cursor.rowcount`` (PEP 249 allows -1 when unknown).

        Raises:
            RuntimeError: operational/connection errors, other database errors,
                or anything unexpected.
            ValueError: ``ProgrammingError`` (e.g. query syntax).
            PermissionError: ``IntegrityError`` or ``HiveServer2Error``.
        """
        conn = None
        cursor = None
        self.logger.info("Executing Impala query against host '%s' as user '%s'.", hostname, user)
        try:
            # Connect inside the try so connection failures get the same translation.
            conn = self._get_impala_connection(hostname, user, password)
            cursor = conn.cursor()
            cursor.execute(query)
            if cursor.description is None:
                # No result set (DDL/DML): report the driver's rowcount instead.
                return [], [[cursor.rowcount]], cursor.rowcount
            rows = cursor.fetchall()
            columns = [col[0] for col in cursor.description]
            # len(rows) is exact; cursor.rowcount may be -1 for SELECTs.
            return columns, rows, len(rows)
        except OperationalError as oe:
            raise RuntimeError("Failed to connect to Impala: " + str(oe)) from oe
        except ProgrammingError as pe:
            raise ValueError("Query syntax error: " + str(pe)) from pe
        except IntegrityError as ie:
            raise PermissionError("Insufficient permissions: " + str(ie)) from ie
        except DatabaseError as db_err:
            raise RuntimeError("Database error: " + str(db_err)) from db_err
        except HiveServer2Error as au_err:
            raise PermissionError("HiveServer2Error error: " + str(au_err)) from au_err
        except Exception as e:
            raise RuntimeError("An unexpected error occurred: " + str(e)) from e
        finally:
            self._close_quietly(cursor, "cursor")
            self._close_quietly(conn, "connection")

    def _close_quietly(self, resource: Any, label: str) -> None:
        """Close ``resource`` if it exists, logging (never raising) any failure."""
        if resource is None:
            return
        try:
            resource.close()
        except Exception:
            # log but don't mask the original exception or result
            self.logger.warning("Failed to close Impala %s cleanly.", label, exc_info=True)

    # -------------------------
    # OCI Upload
    # -------------------------
    def _upload_dataframe_to_oci(self, df: pd.DataFrame, csv_name: str, object_path: str) -> None:
        """Serialize ``df`` to UTF-8 CSV (no index) and put it at ``object_path``.

        Uses the client passed to the constructor when provided, otherwise
        ``objectstore.get_client()``.
        """
        namespace = self.environment_info["BUCKET_NAMESPACE"]
        bucket = self.environment_info["BUCKET"]
        # convert DataFrame to CSV bytes without index
        csv_bytes = df.to_csv(index=False).encode("utf-8")
        client = self.oci_client if self.oci_client is not None else objectstore.get_client()
        client.put_object(namespace, bucket, object_path, csv_bytes)
        self.logger.info("CSV '%s' uploaded to bucket '%s' (ns: '%s', key: '%s').", csv_name, bucket, namespace, object_path)

    # -------------------------
    # Utilities
    # -------------------------
    @staticmethod
    def _tuple_to_dataframe(data_tuple: Tuple[List[str], List[Any]]) -> pd.DataFrame:
        """Build a DataFrame from ``(columns, rows)``.

        An empty ``columns`` list means a non-result-set statement; the single
        value row is then labelled ``rowcount``.
        """
        columns, data = data_tuple
        if not columns:
            return pd.DataFrame(data, columns=["rowcount"])
        return pd.DataFrame(data, columns=columns)

    @staticmethod
    def _initialize_config(config_file_path: str) -> Dict[str, Any]:
        """Load a YAML config file, returning ``{}`` for an empty document.

        Raises:
            FileNotFoundError: if the file does not exist.
        """
        if not os.path.exists(config_file_path):
            raise FileNotFoundError(f"Configuration file {config_file_path} not found.")
        with open(config_file_path, "r", encoding="utf-8") as f:
            # safe_load returns None for an empty file; normalise to a mapping.
            return yaml.safe_load(f) or {}

    @staticmethod
    def _initialize_task(workflow_context: Dict[str, Any], task_name: str) -> int:
        """Register the task run via runManager and return its task-history key."""
        return runManager.init_task(
            task_name,
            workflow_context["run_id"],
            workflow_context["a_workflow_history_key"],
        )

    @staticmethod
    def add_a_key_column(headers: List[str], data_rows: List[List[Any]], task_history_key: int) -> None:
        """Optionally add an A_KEY column (kept for parity with original script).

        Mutates ``headers`` and each row in place (rows must be lists). The
        value for the i-th row (1-based) is
        ``task_history_key * TASK_HISTORY_MULTIPLIER + i``, stored as a string.
        """
        headers.insert(0, "A_KEY")
        for i, row in enumerate(data_rows, start=1):
            a_key_value = int(task_history_key) * TASK_HISTORY_MULTIPLIER + i
            row.insert(0, str(a_key_value))

    @staticmethod
    def add_workflow_key_column(headers: List[str], data_rows: List[List[Any]], workflow_key: int) -> None:
        """Optionally add the workflow key column right after A_KEY if present, otherwise at position 0.

        Mutates ``headers`` and each row in place (rows must be lists).
        """
        insert_idx = 1 if headers and headers[0] == "A_KEY" else 0
        headers.insert(insert_idx, "A_WORKFLOW_HISTORY_KEY")
        for row in data_rows:
            row.insert(insert_idx, workflow_key)

    @staticmethod
    def _compose_object_path(prefix: str, filename: str) -> str:
        """Join an object-storage prefix and a file name with exactly one '/'.

        An empty prefix yields just ``filename`` (no leading '/').
        """
        if not prefix:
            return filename
        if prefix.endswith("/"):
            return f"{prefix}{filename}"
        return f"{prefix}/{filename}"

    @staticmethod
    def _default_logger(task_name: str) -> logging.Logger:
        """Return ``<task_name>_logger``, attaching an INFO stream handler once."""
        logger = logging.getLogger(f"{task_name}_logger")
        if not logger.handlers:
            logger.setLevel(logging.INFO)
            handler = logging.StreamHandler()
            fmt = logging.Formatter(f"%(asctime)s [{task_name}] %(levelname)s: %(message)s")
            handler.setFormatter(fmt)
            logger.addHandler(handler)
        return logger
# Optional: quick-run convenience if you ever want to execute this module directly.
if __name__ == "__main__":
    # Example only. Defaults reproduce the original hard-coded developer setup;
    # override them with the CLI flags instead of editing this block.
    import argparse

    parser = argparse.ArgumentParser(description="Export a DEVO (Impala) query result to OCI as CSV.")
    parser.add_argument(
        "--flow-config",
        default="/home/dbt/Marco/mrds_elt/airflow/ods/rqsd/rqsd_process/config/yaml/m_ODS_RQSD_OBSERVATIONS.yaml",
        help="Path to the flow YAML config.",
    )
    parser.add_argument(
        "--env-config",
        default="/home/dbt/Marco/mrds_elt/python/connectors/devo/config/env_config_rqsd.yaml",
        help="Path to the environment YAML config.",
    )
    parser.add_argument("--env", default="dev", help="Environment section to use.")
    parser.add_argument("--run-id", type=int, default=34)
    parser.add_argument("--workflow-history-key", type=int, default=6)
    args = parser.parse_args()

    exporter = DevoConnector(
        flow_config_path=args.flow_config,
        env_config_path=args.env_config,
        env=args.env,
    )
    exporter.run({"run_id": args.run_id, "a_workflow_history_key": args.workflow_history_key})