260 lines
10 KiB
Python
260 lines
10 KiB
Python
# devo_impala_exporter.py
|
|
|
|
import os
|
|
import io
|
|
import yaml
|
|
import datetime
|
|
import logging
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
import pandas as pd
|
|
from mrds.utils.secrets import get_secret
|
|
import mrds.utils.manage_runs as runManager
|
|
import mrds.utils.objectstore as objectstore
|
|
|
|
import oci
|
|
|
|
from impala.dbapi import (
|
|
connect,
|
|
ProgrammingError,
|
|
DatabaseError,
|
|
IntegrityError,
|
|
OperationalError,
|
|
)
|
|
from impala.error import HiveServer2Error
|
|
|
|
# Offset multiplier for building A_KEY values: a_key = task_history_key *
# TASK_HISTORY_MULTIPLIER + row_index, so keys from different task runs
# cannot collide as long as a run has fewer than 1e9 rows
# (see DevoConnector.add_a_key_column).
TASK_HISTORY_MULTIPLIER = 1_000_000_000
|
|
|
|
|
|
class DevoConnector:
    """
    Export the result of an Impala (DEVO) query to OCI Object Storage as CSV,
    while recording task run metadata via mrds.runManager.

    Usage:
        exporter = DevoConnector(
            flow_config_path="/path/to/flow.yaml",
            env_config_path="/path/to/env.yaml",
            env="dev",
            logger=my_logger,              # optional
            oci_client=my_object_storage,  # optional ObjectStorageClient
            oci_signer=my_signer,          # optional signer (used if client not provided)
        )
        exporter.run({"run_id": 34, "a_workflow_history_key": 6})
    """

    def __init__(
        self,
        flow_config_path: str,
        env_config_path: str,
        env: str,
        logger: Optional[logging.Logger] = None,
        # Annotation quoted so defining this class does not require the
        # (heavy) oci package to be importable at class-definition time.
        oci_client: Optional["oci.object_storage.ObjectStorageClient"] = None,
        oci_signer: Optional[Any] = None,
    ) -> None:
        """Load flow/environment YAML config and prepare logging.

        Raises:
            FileNotFoundError: if either config file is missing.
            KeyError: if ``env`` is not a key in the environment config.
        """
        self.flow_info = self._initialize_config(flow_config_path)
        envs_info = self._initialize_config(env_config_path)
        bucket_namespace = os.getenv("BUCKET_NAMESPACE", "frcnomajoc7v")

        if env not in envs_info:
            raise KeyError(f"Environment '{env}' not found in {env_config_path}")
        self.environment_info = envs_info[env]
        self.environment_info["BUCKET_NAMESPACE"] = bucket_namespace
        self.env = env

        # logging
        self.logger = logger or self._default_logger(self.flow_info.get("TASK_NAME", "devo_task"))

        # OCI client/signer, kept for API compatibility.  NOTE(review): the
        # upload path currently always uses objectstore.get_client() and
        # ignores these -- confirm whether injected clients should be honored.
        self.oci_client = oci_client
        self.oci_signer = oci_signer

    # -------------------------
    # Public API
    # -------------------------

    def run(self, workflow_context: Dict[str, Any]) -> int:
        """Execute the configured DEVO query, upload the CSV, finalize the task.

        Args:
            workflow_context: must contain "run_id" and "a_workflow_history_key".

        Returns:
            Number of rows exported (0 when the query returned no rows).

        Raises:
            Re-raises any underlying failure after finalizing the task
            with status "N".
        """
        task_name = self.flow_info["TASK_NAME"]
        a_task_history_key = self._initialize_task(workflow_context, task_name)

        try:
            # credentials
            devo_secret_name = self.environment_info["DEVO_SECRET"]
            password = get_secret(devo_secret_name)
            self.logger.info("Retrieved secret for DEVO connection.")

            # query
            query = self.flow_info["DEVO_QUERY"]
            user = self.environment_info["DEVO_USERNAME"]
            host = self.environment_info["DEVO_HOSTNAME"]

            columns, data, rowcount = self._execute_query(
                query=query, user=user, hostname=host, password=password
            )
            df = self._tuple_to_dataframe((columns, data))
            self.logger.info("Query executed and DataFrame created with %d rows.", len(df))

            # upload
            if rowcount > 0:
                csv_name = f"{self.flow_info['OUTPUT_TABLE']}.csv"
                file_path = self._compose_object_path(self.flow_info["ODS_PREFIX"], csv_name)
                self._upload_dataframe_to_oci(df, csv_name, file_path)
                self.logger.info("Finished uploading %s to %s.", csv_name, file_path)
            else:
                # Nothing to upload, but still close out the task record so it
                # is not left open in the run history (the previous code
                # returned here without finalizing).
                runManager.finalise_task(a_task_history_key, "Y")
                self.logger.info("No rows returned; task %s finalized without upload.", task_name)
                return 0

            # success
            runManager.finalise_task(a_task_history_key, "Y")
            self.logger.info("Task %s finalized successfully.", task_name)
            return rowcount

        except Exception as e:
            # failure: record it, then re-raise for upstream handling
            self.logger.exception("Run failed: %s", e)
            try:
                runManager.finalise_task(a_task_history_key, "N")
            finally:
                raise

    # -------------------------
    # Impala / DEVO
    # -------------------------

    @staticmethod
    def _get_impala_connection(hostname: str, user: str, secret: str):
        """Open an HTTPS (HiveServer2-over-HTTP) connection to Impala."""
        return connect(
            host=hostname,
            port=443,
            auth_mechanism="PLAIN",
            user=user,
            password=secret,
            use_http_transport=True,
            http_path="cliservice",
            use_ssl=True,
        )

    def _execute_query(
        self, query: str, user: str, hostname: str, password: str
    ) -> Tuple[List[str], List[List[Any]], int]:
        """Run ``query`` against Impala and return ``(columns, rows, rowcount)``.

        For statements that produce a result set, ``rowcount`` is
        ``len(rows)`` -- DB-API cursors commonly report ``rowcount == -1``
        for SELECTs, which would wrongly skip the upload in :meth:`run`.
        For non-result statements an empty column list is returned and a
        single pseudo-row carries the cursor's rowcount.

        Raises:
            RuntimeError / ValueError / PermissionError wrapping the
            underlying impala driver errors.
        """
        conn = self._get_impala_connection(hostname, user, password)
        cursor = None
        self.logger.info("Executing Impala query against host '%s' as user '%s'.", hostname, user)
        try:
            cursor = conn.cursor()
            cursor.execute(query)

            # DB-API: cursor.description is None when the statement produced
            # no result set.  This is more reliable than sniffing the SQL
            # text for "select" (which misclassifies e.g. INSERT ... SELECT).
            if cursor.description is not None:
                rows = cursor.fetchall()
                columns = [col[0] for col in cursor.description]
                return columns, rows, len(rows)

            # Non-SELECT: return rowcount (still return a columns list for
            # consistency with the result-set shape).
            return [], [[cursor.rowcount]], cursor.rowcount

        except OperationalError as oe:
            raise RuntimeError("Failed to connect to Impala: " + str(oe)) from oe
        except ProgrammingError as pe:
            raise ValueError("Query syntax error: " + str(pe)) from pe
        except IntegrityError as ie:
            raise PermissionError("Insufficient permissions: " + str(ie)) from ie
        except DatabaseError as db_err:
            raise RuntimeError("Database error: " + str(db_err)) from db_err
        except HiveServer2Error as au_err:
            raise PermissionError("HiveServer2Error error: " + str(au_err)) from au_err
        except Exception as e:
            raise RuntimeError("An unexpected error occurred: " + str(e)) from e
        finally:
            try:
                if cursor:
                    cursor.close()
            finally:
                try:
                    conn.close()
                except Exception:
                    # log but don't mask the original exception
                    self.logger.warning("Failed to close Impala connection cleanly.", exc_info=True)

    # -------------------------
    # OCI Upload
    # -------------------------

    def _upload_dataframe_to_oci(self, df: pd.DataFrame, csv_name: str, object_path: str) -> None:
        """Serialize ``df`` to CSV (no index) and put it at ``object_path``
        in the configured bucket via mrds.utils.objectstore."""
        namespace = self.environment_info["BUCKET_NAMESPACE"]
        bucket = self.environment_info["BUCKET"]
        # convert DataFrame to CSV bytes without index
        csv_bytes = df.to_csv(index=False).encode("utf-8")
        client = objectstore.get_client()
        client.put_object(namespace, bucket, object_path, csv_bytes)
        self.logger.info("CSV '%s' uploaded to bucket '%s' (ns: '%s', key: '%s').", csv_name, bucket, namespace, object_path)

    # -------------------------
    # Utilities
    # -------------------------

    @staticmethod
    def _tuple_to_dataframe(data_tuple: Tuple[List[str], List[List[Any]]]) -> pd.DataFrame:
        """Build a DataFrame from ``(columns, rows)``.

        An empty column list marks a non-SELECT result, where the single
        pseudo-row holds the statement's rowcount.
        """
        columns, data = data_tuple
        if not columns:
            # for non-SELECT queries we returned rowcount; represent it in a DataFrame
            return pd.DataFrame(data, columns=["rowcount"])
        return pd.DataFrame(data, columns=columns)

    @staticmethod
    def _initialize_config(config_file_path: str) -> Dict[str, Any]:
        """Load a YAML config file; raise FileNotFoundError if absent.

        NOTE(review): yaml.safe_load returns None for an empty file --
        confirm callers never pass an empty config.
        """
        if not os.path.exists(config_file_path):
            raise FileNotFoundError(f"Configuration file {config_file_path} not found.")
        with open(config_file_path, "r") as f:
            return yaml.safe_load(f)

    @staticmethod
    def _initialize_task(workflow_context: Dict[str, Any], task_name: str) -> int:
        """Register the task with runManager and return its history key."""
        return runManager.init_task(
            task_name,
            workflow_context["run_id"],
            workflow_context["a_workflow_history_key"],
        )

    @staticmethod
    def add_a_key_column(headers: List[str], data_rows: List[List[Any]], task_history_key: int) -> None:
        """Optionally add an A_KEY column (kept for parity with original script).

        Mutates ``headers`` and ``data_rows`` in place; keys are
        ``task_history_key * TASK_HISTORY_MULTIPLIER + row_index`` (1-based).
        """
        headers.insert(0, "A_KEY")
        for i, row in enumerate(data_rows, start=1):
            a_key_value = int(task_history_key) * TASK_HISTORY_MULTIPLIER + i
            row.insert(0, str(a_key_value))

    @staticmethod
    def add_workflow_key_column(headers: List[str], data_rows: List[List[Any]], workflow_key: int) -> None:
        """Optionally add the workflow key column right after A_KEY if present, otherwise at position 0.

        Mutates ``headers`` and ``data_rows`` in place.
        """
        insert_idx = 1 if headers and headers[0] == "A_KEY" else 0
        headers.insert(insert_idx, "A_WORKFLOW_HISTORY_KEY")
        for row in data_rows:
            row.insert(insert_idx, workflow_key)

    @staticmethod
    def _compose_object_path(prefix: str, filename: str) -> str:
        """Join ``prefix`` and ``filename`` with exactly one '/' between them.

        (The previous version ignored ``filename`` and emitted a literal
        "(unknown)" placeholder.)
        """
        if prefix.endswith("/"):
            return f"{prefix}{filename}"
        return f"{prefix}/{filename}"

    @staticmethod
    def _default_logger(task_name: str) -> logging.Logger:
        """Build an INFO-level stream logger named after the task (idempotent:
        handlers are only attached once per logger name)."""
        logger = logging.getLogger(f"{task_name}_logger")
        if not logger.handlers:
            logger.setLevel(logging.INFO)
            handler = logging.StreamHandler()
            fmt = logging.Formatter(f"%(asctime)s [{task_name}] %(levelname)s: %(message)s")
            handler.setFormatter(fmt)
            logger.addHandler(handler)
        return logger
|
|
|
|
|
|
# Optional convenience entry point for running this module directly.
if __name__ == "__main__":
    # Example invocation only -- edit the paths/env/context below (or drop
    # this block entirely) before using in anger.
    connector = DevoConnector(
        flow_config_path="/home/dbt/Marco/mrds_elt/airflow/ods/rqsd/rqsd_process/config/yaml/m_ODS_RQSD_OBSERVATIONS.yaml",
        env_config_path="/home/dbt/Marco/mrds_elt/python/connectors/devo/config/env_config_rqsd.yaml",
        env="dev",
    )
    context = {"run_id": 34, "a_workflow_history_key": 6}
    connector.run(context)
|