Files
mars-elt/python/connectors/casper/casper_rqsd.py
Grzegorz Michalski 2c225d68ac init
2026-03-02 09:47:35 +01:00

202 lines
11 KiB
Python

import requests
import io
import zipfile
import pandas as pd
import os
from datetime import datetime
import oci
from mrds.utils.secrets import get_secret
import mrds.utils.manage_runs as runManager
import mrds.utils.manage_files as fileManager
import mrds.utils.sql_statements as sqls
import sys
import yaml
TASK_HISTORY_MULTIPLIER = 1_000_000_000
def initialize_task(workflow_context, task_name):
    """Register this task with the run manager and return its history key.

    Reads `run_id` and `a_workflow_history_key` from the workflow context;
    all bookkeeping is delegated to the shared run manager.
    """
    run_id = workflow_context["run_id"]
    workflow_key = workflow_context["a_workflow_history_key"]
    return runManager.init_task(task_name, run_id, workflow_key)
def rqsd_parser(fileName, bucket_path, file, bucket_name):
    """Dispatch a downloaded CASPER file to the annex parser matching its name.

    SCOPA/SCOPF files feed both annex 1.1 and 1.2; RQSDC files feed annex 2.
    Files matching neither pattern are silently ignored.
    """
    if ("SCOPA" in fileName) or ("SCOPF" in fileName):
        print("SCOP")
        annex_1_1(fileName, bucket_path, file, bucket_name)
        annex_1_2(fileName, bucket_path, file, bucket_name)
        return None
    if "RQSDC" in fileName:
        print("RQSDC")
        return annex_2(fileName, bucket_path, file, bucket_name)
    return None
def annex_1_1(fileName, bucket_path, file, bucket_name):
    """Parse the 'Counterparties in scope' sheet and upload it as CSV.

    Lineage metadata (version, exercise, NCB) is read from the
    underscore-separated file name — assumes the CASPER naming convention
    with at least seven '_'-separated fields; TODO confirm upstream.
    """
    name_parts = fileName.split("_")
    # Swap the 4-character extension for ".csv".
    csv_file_path = fileName[:-4] + ".csv"
    # The sheet's first three rows are headers/metadata, not data.
    frame = pd.read_excel(file, sheet_name="Counterparties in scope", skiprows=3)
    frame = frame.dropna(axis=1, how='all').dropna(axis=0, how='all')
    # Enrich with lineage columns before uploading.
    frame['file_name'] = os.path.basename(fileName)
    frame['ingestion_timestamp'] = datetime.now().isoformat()
    frame['version_number'] = name_parts[6]
    frame['ref_exercise'] = name_parts[2]
    frame['ncb'] = name_parts[4]
    # Authenticate via instance principals; the empty dict is an empty config.
    signer = oci.auth.signers.InstancePrincipalsSecurityTokenSigner()
    client = oci.object_storage.ObjectStorageClient({}, signer=signer)
    payload = frame.to_csv(index=False).encode('utf-8')
    client.put_object("frcnomajoc7v", bucket_name, bucket_path + "1_1/" + csv_file_path, payload)
    print("Finished uploading {}".format(csv_file_path))
    print(f"CSV saved to {csv_file_path}")
def annex_1_2(fileName, bucket_path, file, bucket_name):
    """Parse the 'Entities to which data relates' sheet and upload it as CSV.

    Lineage metadata (version, exercise, NCB) is read from the
    underscore-separated file name — assumes the CASPER naming convention
    with at least seven '_'-separated fields; TODO confirm upstream.
    """
    name_parts = fileName.split("_")
    # Swap the 4-character extension for ".csv".
    csv_file_path = fileName[:-4] + ".csv"
    # The sheet's first three rows are headers/metadata, not data.
    frame = pd.read_excel(file, sheet_name="Entities to which data relates", skiprows=3)
    frame = frame.dropna(axis=1, how='all').dropna(axis=0, how='all')
    # Enrich with lineage columns before uploading.
    frame['file_name'] = os.path.basename(fileName)
    frame['ingestion_timestamp'] = datetime.now().isoformat()
    frame['version_number'] = name_parts[6]
    frame['ref_exercise'] = name_parts[2]
    frame['ncb'] = name_parts[4]
    # Authenticate via instance principals; the empty dict is an empty config.
    signer = oci.auth.signers.InstancePrincipalsSecurityTokenSigner()
    client = oci.object_storage.ObjectStorageClient({}, signer=signer)
    payload = frame.to_csv(index=False).encode('utf-8')
    client.put_object("frcnomajoc7v", bucket_name, bucket_path + "1_2/" + csv_file_path, payload)
    print("Finished uploading {}".format(csv_file_path))
    print(f"CSV saved to {csv_file_path}")
def annex_2(fileName, bucket_path, file, bucket_name):
    """Parse the 'Data collection template' sheet of an RQSDC file and upload it as CSV.

    Args:
        fileName: CASPER file name; underscore-separated fields carry the
            reference exercise (index 2), NCB (index 4) and version (index 6).
        bucket_path: key prefix inside the bucket; the CSV lands under "<prefix>2/".
        file: io.BytesIO holding the workbook content.
        bucket_name: target OCI Object Storage bucket.
    """
    fileData = fileName.split("_")
    # Parameters extracted from the file name
    version_number = fileData[6]
    ref_exercise = fileData[2]
    ncb = fileData[4]
    # Read the first sheet, skip the metadata rows.
    # BUGFIX: pass the BytesIO buffer itself (consistent with annex_1_1/annex_1_2);
    # the original passed raw bytes via .getvalue(), which pandas has deprecated.
    df = pd.read_excel(file, sheet_name="Data collection template", skiprows=6)
    # Clean empty rows/columns
    df = df.dropna(axis=1, how='all').dropna(axis=0, how='all')
    # Add lineage metadata columns
    df['file_name'] = os.path.basename(fileName)
    df['ingestion_timestamp'] = datetime.now().isoformat()
    df['version_number'] = version_number
    df['ref_exercise'] = ref_exercise
    df['ncb'] = ncb
    # BUGFIX: the original built "<name>csv" (missing dot); align with annex_1_1/1_2.
    csvName = fileName[:-4] + ".csv"
    # Save to CSV in Object Storage (instance-principal auth; empty dict = empty config)
    signer = oci.auth.signers.InstancePrincipalsSecurityTokenSigner()
    client = oci.object_storage.ObjectStorageClient({}, signer=signer)
    client.put_object("frcnomajoc7v", bucket_name, bucket_path + "2/" + csvName,
                      bytes(df.to_csv(index=False), encoding='utf-8'))
    print("Finished uploading {}".format(csvName))
    print(f"CSV saved to {csvName}")
def rqsd_preflow(secret_crt_id, secret_key_id, casper_api_url, collection_id):
    """Write the client cert/key to disk and list downloadable CASPER files.

    Returns the JSON listing of the filevault collection, filtered to
    completed, non-test files newer than the last ingested processing end
    time (when one exists). Raises Exception (with the cause chained) on any
    secret-retrieval or HTTP failure; on the secret-retrieval failure path
    the partially written cert files are removed.
    """
    crt_path = os.getcwd() + "/rqsd_tst.crt"
    key_path = os.getcwd() + "/rqsd_tst.key.pem"
    try:
        with open(key_path, "w") as keyfile:
            keyfile.write(get_secret(secret_key_id))
        with open(crt_path, "w") as crtfile:
            crtfile.write(get_secret(secret_crt_id))
    except Exception as exc:
        print("Failed to retrieve certificates from secrets")
        # Best-effort cleanup: either file may not have been created yet,
        # and an os.remove failure here would mask the real error.
        for path in (crt_path, key_path):
            if os.path.exists(path):
                os.remove(path)
        raise Exception from exc
    protected_resource_url = casper_api_url + "/casper-api/filevault/"
    try:
        max_date = fileManager.execute_query("SELECT to_char(max(processing_end_time),'YYYY-MM-DD HH24:mi:ss') as MAX_PROCESSING_END_TIME FROM ct_ods.a_casper_filevault")
        # BUGFIX: `max_date is not []` compared identity against a fresh list
        # literal and was always True. Use truthiness, and also guard against
        # a NULL max() (empty table) which yields a row containing None.
        if max_date and max_date[0]:
            # Only the date part of the timestamp is used in the filter.
            filterString = 'isTest eq False and processingStatus eq "PS_COMPLETED" and processingEndTime gt ' + max_date[0].split(' ')[0]
        else:
            filterString = 'isTest eq False and processingStatus eq "PS_COMPLETED"'
        # NOTE(review): verify=False disables TLS certificate verification —
        # confirm this is intentional for the target environment.
        response = requests.get(protected_resource_url + "files/" + collection_id,
                                headers={"accept": "application/json"},
                                cert=(crt_path, key_path), verify=False,
                                params={"filter": filterString})
        print(response.text)
        files = response.json()
    except Exception as exc:
        print("Failed to retrieve ACC metadata, error during connection or request")
        raise Exception from exc
    return files
def rqsd_process(files, casper_api_url, bucket_path, bucket_name):
    """Download each listed filevault file and parse/upload it to the bucket.

    Expects the client cert/key written by rqsd_preflow to exist in cwd.
    The key material is always removed from disk when processing ends
    (the original only removed it on the failure path, leaving secrets on
    disk after a successful run). Raises Exception (cause chained) on the
    first failing file.
    """
    crt_path = os.getcwd() + "/rqsd_tst.crt"
    key_path = os.getcwd() + "/rqsd_tst.key.pem"
    try:
        for downloadable in files:
            try:
                print("\n\n")
                download_url = (casper_api_url + "/casper-api/filevault/download/"
                                + str(downloadable["dcId"]) + '/' + str(downloadable["fileID"]))
                # NOTE(review): verify=False disables TLS verification — confirm intentional.
                response = requests.get(download_url,
                                        headers={"accept": "application/json"},
                                        cert=(crt_path, key_path), verify=False)
                rqsd_parser(downloadable["fileName"], bucket_path,
                            io.BytesIO(response.content), bucket_name)
            except Exception as exc:
                print(f"Failed to upload file into target bucket, files saved locally in {os.getcwd()}")
                raise Exception from exc
    finally:
        # Always scrub the certificate and key from disk, success or failure.
        for path in (crt_path, key_path):
            if os.path.exists(path):
                os.remove(path)
def add_a_key_column(headers, data_rows, task_history_key):
    """Prepend an A_KEY column derived from the task history key.

    Each row gets key = task_history_key * TASK_HISTORY_MULTIPLIER + ordinal
    (1-based), stored as a string. Mutates `headers` and every row in place.
    """
    headers.insert(0, 'A_KEY')
    base = int(task_history_key) * TASK_HISTORY_MULTIPLIER
    ordinal = 0
    for row in data_rows:
        ordinal += 1
        row.insert(0, str(base + ordinal))
def add_workflow_key_column(headers, data_rows, workflow_key):
    """Insert the workflow-history key as the second column (after A_KEY).

    Mutates `headers` and every row of `data_rows` in place.
    BUGFIX: the original inserted the header at index 1 but the data at
    index 0, misaligning every row against the header row.
    """
    headers.insert(1, 'A_WORKFLOW_HISTORY_KEY')
    for row in data_rows:
        row.insert(1, workflow_key)
def initialize_config(config_file_path):
    """Load and return the YAML configuration at `config_file_path`.

    Raises FileNotFoundError when the path does not exist.
    """
    if not os.path.exists(config_file_path):
        raise FileNotFoundError(f"Configuration file {config_file_path} not found.")
    with open(config_file_path, "r") as handle:
        return yaml.safe_load(handle)
def main(workflow_context, flow_config_path, env_config_path, env):
    """Entry point: load configs, register the task, pull and load RQSD files.

    Args:
        workflow_context: dict with `run_id` and `a_workflow_history_key`.
        flow_config_path: YAML with TASK_NAME, COLLECTION_ID, ODS_PREFIX.
        env_config_path: YAML keyed by environment name.
        env: environment key to select from the env config.

    Raises Exception (cause chained) if file listing or processing fails;
    the task is only finalised on success.
    """
    # Init setup
    flow_info = initialize_config(flow_config_path)
    envs_info = initialize_config(env_config_path)
    environment_info = envs_info[env]
    a_task_history_key = initialize_task(workflow_context, flow_info['TASK_NAME'])
    # Get the list of files, then download/parse/upload each one.
    try:
        files = rqsd_preflow(
            environment_info["CERTIFICATE_FILE"],
            environment_info["CERTIFICATE_KEY"],
            environment_info["CASPER_URL"],
            flow_info["COLLECTION_ID"],
        )
        rqsd_process(files, environment_info["CASPER_URL"],
                     flow_info["ODS_PREFIX"], environment_info["BUCKET"])
    except Exception as exc:
        # NOTE(review): message says DEVO but this connector targets CASPER —
        # confirm the intended wording.
        print("Failed to retrieve DEVO data, error during connection or request")
        raise Exception from exc
    # Finalize task
    runManager.finalise_task(a_task_history_key, 'Y')