Files
Grzegorz Michalski 2c225d68ac init
2026-03-02 09:47:35 +01:00

130 lines
3.8 KiB
Python

import os
import yaml
import datetime
import pandas as pd
from mrds.utils.secrets import get_secret
import mrds.utils.manage_runs as runManager
import mrds.utils.manage_files as fileManager
import mrds.utils.sql_statements as sqls
import oci
from impala.dbapi import (
connect,
ProgrammingError,
DatabaseError,
IntegrityError,
OperationalError,
)
from impala.error import HiveServer2Error
def get_impala_connection(hostname: str, user: str, secret: str):
    """Open an impyla connection to *hostname* and return it.

    The connection goes over HTTPS on port 443 using the HTTP transport with
    http_path "cliservice" and PLAIN authentication as *user* / *secret*.
    The caller is responsible for closing the returned connection.
    """
    connection_options = {
        "host": hostname,
        "port": 443,
        "auth_mechanism": "PLAIN",
        "user": user,
        "password": secret,
        "use_http_transport": True,
        "http_path": "cliservice",
        "use_ssl": True,
    }
    return connect(**connection_options)
def execute_query(query: str, user, hostname, password):
    """Run *query* on a freshly opened Impala connection.

    A new connection is opened via get_impala_connection for every call and
    handed to execute_devo_query, which closes it once the query is done.

    Returns:
        The (columns, result) pair produced by execute_devo_query.
    """
    connection = get_impala_connection(hostname, user, password)
    # Debug output: shows which connection object is being used.
    print(connection)
    columns, rows = execute_devo_query(query, connection)
    return columns, rows
class ImpalaQueryError(Exception):
    """Failure while running, or cleaning up after, an Impala statement.

    Attributes:
        status_code: HTTP-style code classifying the failure (400, 403 or 500).
        detail: Human-readable description; also used as the exception message.
    """

    def __init__(self, status_code: int, detail: str):
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail


def _close_resources(cursor, conn, raise_on_error: bool) -> None:
    """Close *cursor* and then *conn*, always attempting both.

    If a close fails and *raise_on_error* is true, raise ImpalaQueryError for
    the first failure. Otherwise the failure is dropped so it cannot replace an
    exception that is already propagating.
    """
    first_error = None
    for resource in (cursor, conn):
        if resource is None:
            continue
        try:
            resource.close()
        except Exception as exc:  # keep going so the connection is still closed
            if first_error is None:
                first_error = exc
    if raise_on_error and first_error is not None:
        raise ImpalaQueryError(
            500, "Failed to close the connection: " + str(first_error)
        ) from first_error


def execute_devo_query(query: str, conn):
    """Execute *query* on *conn* and return ``(None, cursor.rowcount)``.

    This function takes ownership of *conn*. The cursor and the connection are
    both closed before it returns, whether the statement succeeded or failed.

    Args:
        query: SQL statement to execute.
        conn: Open DB-API connection (as returned by get_impala_connection).

    Returns:
        ``(None, rowcount)``. No column metadata is collected; rowcount is
        whatever the driver reports for the statement.

    Raises:
        ImpalaQueryError: if the statement fails, or if closing the cursor or
            connection fails after a successful statement. ``status_code`` is:
            500 for OperationalError, 400 for ProgrammingError,
            403 for IntegrityError, 500 for any other DatabaseError,
            403 for HiveServer2Error, and 500 for anything else.
    """
    # TODO: impersonation via {"impala.doas.user": userid} is not wired in yet.
    cursor = None
    succeeded = False
    try:
        cursor = conn.cursor()
        print("executing query")
        cursor.execute(query)
        # DB-API rowcount; PEP 249 allows -1 when the count is not determined.
        rowcount = cursor.rowcount
        succeeded = True
        return None, rowcount
    except OperationalError as oe:
        raise ImpalaQueryError(500, "Failed to connect to Impala: " + str(oe)) from oe
    except ProgrammingError as pe:
        raise ImpalaQueryError(400, "Query syntax error: " + str(pe)) from pe
    except IntegrityError as ie:
        # NOTE(review): PEP 249 IntegrityError means a constraint violation, not
        # a permissions problem; the message is kept as originally written.
        raise ImpalaQueryError(403, "Insufficient permissions: " + str(ie)) from ie
    except DatabaseError as db_err:
        raise ImpalaQueryError(500, "Database error: " + str(db_err)) from db_err
    except HiveServer2Error as au_err:
        raise ImpalaQueryError(403, "HiveServer2Error error: " + str(au_err)) from au_err
    except Exception as e:
        raise ImpalaQueryError(500, "An unexpected error occurred: " + str(e)) from e
    finally:
        # On failure, close quietly so a cleanup error cannot hide the real one.
        _close_resources(cursor, conn, raise_on_error=succeeded)
def initialize_task(workflow_context, task_name):
    """Initialise *task_name* through runManager.init_task and return its key.

    Args:
        workflow_context: Mapping that must contain "run_id" and
            "a_workflow_history_key".
        task_name: Name of the task to initialise.

    Returns:
        The task-history key returned by runManager.init_task.
    """
    run_id = workflow_context["run_id"]
    workflow_history_key = workflow_context["a_workflow_history_key"]
    return runManager.init_task(task_name, run_id, workflow_history_key)
def initialize_config(config_file_path):
    """Load and return the YAML configuration stored at *config_file_path*.

    Args:
        config_file_path: Path to a YAML file.

    Returns:
        Whatever yaml.safe_load produces for the document (None for an empty
        file).

    Raises:
        FileNotFoundError: if the file, or a directory on its path, does not
            exist.
    """
    # Open directly rather than testing os.path.exists first: this avoids a
    # check-then-open race. Read as UTF-8 instead of the platform default.
    try:
        config_file = open(config_file_path, "r", encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError) as err:
        raise FileNotFoundError(
            f"Configuration file {config_file_path} not found."
        ) from err
    with config_file:
        return yaml.safe_load(config_file)
def main(env_config_path, env, table, corporate_store):
    """Refresh Impala metadata and statistics for ``corporate_store.table``.

    Steps:
        1. Load the environment configuration from *env_config_path* and select
           the *env* section.
        2. Fetch the DEVO password from the secret named by DEVO_SECRET.
        3. Run ``INVALIDATE METADATA`` and then ``COMPUTE STATS`` on the table.
           Each statement opens and closes its own connection.

    Args:
        env_config_path: Path to the YAML file of per-environment settings.
        env: Key of the environment section to use.
        table: Table name.
        corporate_store: Database/schema that holds *table*.

    Returns:
        True once both statements have completed.

    Raises:
        Errors from loading the config or selecting *env* propagate unchanged.
        Errors from the secret lookup or the Impala statements are re-raised
        unchanged after a message is printed.
    """
    # init setup
    envs_info = initialize_config(env_config_path)
    environment_info = envs_info[env]

    try:
        devo_secret_name = environment_info["DEVO_SECRET"]
        password = get_secret(devo_secret_name)
    except Exception:
        print("Failed to retrieve credentials from secrets")
        # Re-raise the original error so its type and message reach the caller.
        raise

    # Refresh metadata and stats in DEVO.
    try:
        user = environment_info["DEVO_USERNAME"]
        hostname = environment_info["IMPALA_HOSTNAME"]
        # NOTE(review): identifiers are interpolated into SQL unescaped; make
        # sure corporate_store/table come from trusted configuration.
        qualified_table = f"{corporate_store}.{table}"
        for statement in ("INVALIDATE METADATA", "COMPUTE STATS"):
            execute_query(f"{statement} {qualified_table}", user, hostname, password)
    except Exception:
        print("Failed to retrieve DEVO data, error during connection or request")
        raise
    return True