539 lines
18 KiB
Python
539 lines
18 KiB
Python
from airflow import DAG
|
|
from airflow.decorators import dag
|
|
from airflow.operators.python import PythonOperator
|
|
from datetime import datetime, timedelta
|
|
|
|
import logging
|
|
from pytz import timezone
|
|
|
|
import mrds.utils.manage_files as mf
|
|
import mrds.utils.manage_runs as mr
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import traceback
|
|
import copy
|
|
import itertools
|
|
import pprint
|
|
|
|
import yaml
|
|
import sys, os
|
|
from pathlib import Path
|
|
|
|
|
|
# Directory that contains this DAG file. Falls back to the current working
# directory when __file__ is not defined (e.g. when the source is exec'd).
gScriptDir = Path(globals().get("__file__", "./_")).absolute().parent

# Per-DAG layout files: <dag>.yml parameter specs and <dag>.fkr TMSDBT layouts.
gDataDir = str(gScriptDir) + '/TMS-layouts/'

# Shared configuration directory (holds TMS.yml); note: no trailing slash.
gConfigDir = str(gScriptDir) + '/config'

# Location of the TMSDBT.py connector script.
gConnDir = '/opt/airflow/python/connectors/tms/'

# Output directory for the generated Oracle DDL (<table>.sql).
gTableDir = str(gScriptDir) + '/TMS-tables/'
# Default task arguments passed to every generated DAG via @dag(default_args=...).
# Individual operators may override them (retrieve_report sets its own
# 30-minute execution_timeout).
default_args = {
    "owner": "ecb",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "execution_timeout": timedelta(minutes=60),
    # Only takes effect if "retries" is raised above 0.
    "retry_delay": timedelta(minutes=5),
}
def create_dag(dag_name):
    """Build one TMS -> ODS extraction DAG and register it in this module.

    The DAG id is ``dag_name``; the ODS table name is ``dag_name`` with its
    leading ``w_ODS_`` removed. At DAG-parse time this reads the parameter
    spec ``<gDataDir>/<dag_name>.yml`` and the shared ``<gConfigDir>/TMS.yml``
    (TMS URL, credentials, destination bucket and prefixes). The DAG has a
    single task, ``retrieve_report``, which runs the TMSDBT connector once per
    parameter combination and loads the resulting CSV. The DAG object is
    stored in this module's globals under ``dag_name``.

    :param dag_name: DAG id, expected to look like ``w_ODS_<TABLE>``.
    :raises OSError: if either YAML file cannot be opened.
    :raises KeyError: if TMS.yml lacks one of the required keys.
    """
    ODS_TABLE = re.sub('^w_ODS_', '', dag_name)
    DATABASE_NAME = "ODS"
    DAG_NAME = dag_name
    WF_NAME = dag_name

    print("create_dag(" + dag_name + ")")

    # ------------------------------------------------------------------
    # Parse-time configuration
    # ------------------------------------------------------------------
    with open(gDataDir + DAG_NAME + '.yml', 'r') as file:
        # An empty YAML document loads as None; treat it as "no settings".
        report_desc = yaml.safe_load(file) or {}

    # NOTE(review): read but not honoured -- execute_report_task always
    # enumerates with sequential=False. Check that no layout relies on the
    # current cartesian behaviour before wiring it in (the sequential mode
    # also ignores derived ``column(...)`` entries).
    params_sequential = report_desc.get('parameters_sequential', False)

    # Parameter specs keyed by name. A parameter declared without a body
    # (``name:``) loads as None; normalise it to {} so every later
    # ``params_dict[p].get(...)`` lookup works instead of failing at parse time.
    params_dict = {
        name: (spec or {})
        for name, spec in (report_desc.get('parameters', {}) or {}).items()
    }

    # Default values, split into those passed to the DAG as its Airflow
    # ``params`` (visible) and those merged back in only at run time (hidden).
    params_visible = {}
    params_hidden = {}
    for p, spec in params_dict.items():
        if spec.get('hidden', False):
            params_hidden[p] = spec.get('value', None)
        else:
            params_visible[p] = spec.get('value', None)

    with open(gConfigDir + '/TMS.yml', 'r') as f:
        tms_config = yaml.safe_load(f)

    tms_url = tms_config['TMS-URL']
    # Destination prefix for the extracted CSV and the "bucket:prefix"
    # target handed to TMSDBT via -d.
    prefix = tms_config['dest-prefix'] + ODS_TABLE + '/' + ODS_TABLE + '/'
    # Prefix passed to mf.create_external_table when the table is first set up.
    data_prefix = tms_config['data-prefix'] + ODS_TABLE + '/'
    dest = tms_config['dest-bucket'] + ':' + prefix
    tms_user = tms_config['TMS-user']
    tms_pwd = tms_config['TMS-password']

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    def _redact(text):
        """Return ``text`` with the TMS password masked, for logs and errors."""
        return text.replace(str(tms_pwd), '****') if tms_pwd else text

    def _param_args(param_list):
        """Translate ``(name, value)`` pairs into TMSDBT command-line options.

        ``column(<col>)`` names become ``-c "<col>"="<value>"``; all other
        names become ``-p <name>="<value>"``. Values are converted with
        ``str`` and right-stripped. Falsy values other than ``""`` (``None``,
        ``0``, ``[]`` ...) are skipped. The embedded double quotes are meant
        for the shell, because the command is executed as one shell string.
        """
        args = []
        for name, value in param_list:
            if not (value or value == ""):
                continue
            text = str(value).rstrip()
            m = re.match(r'column\((.*)\)$', name)
            if m:
                args.extend(['-c', '"' + m.group(1) + '"' + '="%s"' % text])
            else:
                args.extend(['-p', name + '="%s"' % text])
        return args

    def enum_param_combinations_recursive(params, keys):
        """Enumerate the cartesian product of the parameter values.

        Each combination is a list of ``(name, value)`` tuples; tuples of
        later keys come first. A list value contributes one value per
        combination; a scalar counts as a one-element list. Parameters whose
        value is falsy (other than ``""``) are skipped. For each selected
        parameter, every ``column(...)`` entry of its YAML spec is a derived
        column whose values are aligned index-by-index with the parameter's
        values and emitted next to them.

        :param params: mapping of parameter name -> value (scalar or list).
        :param keys: names still to process; this list is consumed (popped).
        :returns: list of combinations; ``[[]]`` when no usable parameter is
            left, so a caller higher up still emits its own values.
        :raises IndexError: if a derived column has fewer values than its
            parameter.
        """
        # Advance to the next parameter that has a usable value.
        k = None
        v = None
        while keys:
            candidate = keys.pop(0)
            if params[candidate] or params[candidate] == "":
                k, v = candidate, params[candidate]
                break

        if k is None:
            # A single empty combination (not []): returning [] here would
            # collapse the whole product built by the callers.
            return [[]]

        values = v if isinstance(v, list) else [v]

        derived_columns = []
        spec = params_dict.get(k, {})  # run-conf keys may be absent from the YAML
        for c in list(spec):
            if re.match(r'column\(.*\)', c):
                vtmp = spec[c]
                derived_columns.append((c, vtmp if isinstance(vtmp, list) else [vtmp]))

        print("derived columns = " + str(derived_columns))

        tails = enum_param_combinations_recursive(params, keys) if keys else [[]]

        result = []
        for tail in tails:
            for i, value in enumerate(values):
                combination = [*tail, (k, value)]
                combination.extend((name, column_values[i]) for name, column_values in derived_columns)
                result.append(combination)
        return result

    def enum_param_combinations_sequential(params):
        """Pair list-valued parameters position by position.

        Combination ``i`` takes element ``i`` of every list-valued parameter;
        scalar parameters are repeated in every combination. With no
        list-valued parameter there is exactly one combination.

        :raises ValueError: if the list-valued parameters differ in length.
        """
        keys = list(params)
        lengths = {len(params[k]) for k in keys if isinstance(params[k], list)}
        if len(lengths) > 1:
            raise ValueError(
                "parameters_sequential requires list parameters of equal length, got %s" % sorted(lengths))
        count = lengths.pop() if lengths else 1
        return [
            [(k, params[k][i] if isinstance(params[k], list) else params[k]) for k in keys]
            for i in range(count)
        ]

    def enum_param_combinations(params, sequential = False):
        """Dispatch to the sequential (zip) or recursive (cartesian) enumeration."""
        if sequential:
            return enum_param_combinations_sequential(params)
        return enum_param_combinations_recursive(params, list(params))

    def select(table, expression, condition = "1 = 1"):
        """Return the first element of ``mr.select_ods_tab(table, expression, condition)``.

        Intended for ``select(...)`` parameter expressions. Only this DAG's
        ODS table, DUAL and CT_MRDS.A_WORKFLOW_HISTORY may be queried.
        NOTE(review): ``expression`` and ``condition`` are passed through
        unchanged; whether mr.select_ods_tab escapes them is not visible here.

        :raises Exception: for any other table.
        """
        allowed = (ODS_TABLE.upper(), 'DUAL', 'ct_mrds.a_workflow_history'.upper())
        if table.upper() not in allowed:
            raise Exception("Not allowed to select from %s" % table)
        res = mr.select_ods_tab(table, expression, condition)
        return res[0]

    def eval_param(v):
        """Evaluate a parameter value written as a functional expression.

        * ``select(...)`` strings are evaluated with only ``select`` in the
          globals (``eval`` still adds the builtins).
        * ``eval(...)`` strings are evaluated against this module's globals
          and this function's locals.
        * Any other value is returned unchanged.

        SECURITY NOTE(review): visible parameters are DAG params and so can
        presumably be overridden when the DAG is triggered; both branches then
        execute caller-supplied Python. Restrict trigger rights accordingly.
        """
        if re.match(r"\s*select\(.*\)", str(v)):
            v = eval(v, {'select': select}, {})
        elif re.match(r"\s*eval\(.*\)*$", str(v)):
            expr = re.match(r"\s*eval\(.*\)*$", str(v)).group(0)
            v = eval(expr)
        return v

    # ------------------------------------------------------------------
    # Task callables
    # ------------------------------------------------------------------
    def execute_report_task(**context):
        """Airflow callable of ``retrieve_report``.

        Merges the run's params with the hidden defaults, expands them into
        combinations, evaluates ``select(...)``/``eval(...)`` values, applies
        ``replace_parameter`` substitutions, drops ``virtual`` parameters and
        calls ``execute_report`` once per combination (or once with no
        parameters when there are none).
        """
        params = context['params']

        # Hidden parameters are not DAG params, so merge them back in.
        # ``{**a, **b}`` instead of ``a | b`` so any Mapping type works.
        # NOTE(review): ``parameters_sequential`` from the layout is not used.
        param_combinations = enum_param_combinations({**params, **params_hidden}, sequential = False)

        # 1) Evaluate functional expressions.
        param_combinations = [
            [(name, eval_param(value)) for name, value in dict(combination).items()]
            for combination in param_combinations
        ]

        # 2) Replace virtual symbolic parameters: occurrences of the
        #    parameter's name inside its ``replace_parameter`` target are
        #    replaced by its value, in combination order.
        substituted = []
        for combination in param_combinations:
            d = dict(combination)
            for p in list(d):
                target = params_dict.get(p, {}).get('replace_parameter', None)
                if target:
                    d[target] = d[target].replace(p, d[p])
            substituted.append(list(d.items()))
        param_combinations = substituted

        # 3) Keep only the parameters actually passed to the connector.
        param_combinations = [
            [(name, value) for name, value in combination
             if not params_dict.get(name, {}).get('virtual', False)]
            for combination in param_combinations
        ]

        run_id = context['dag_run'].run_id

        # Run once per combination; with none at all, run once without parameters.
        for combination in param_combinations or [[]]:
            execute_report(combination, run_id)

    def _create_ods_table(param_list):
        """First-time setup of ODS_TABLE.

        Registers the source-file config, generates the Oracle DDL with
        TMSDBT ``create-oracle-table`` into ``<gTableDir>/<table>.sql``,
        executes it, sets the default date format and creates the external
        table over ``data_prefix``.

        :raises subprocess.CalledProcessError: if the DDL generation fails
            (its command is redacted).
        """
        logger = logging.getLogger("airflow.task")
        logger.debug(f'creating table {ODS_TABLE}')
        mf.add_source_file_config('TMS', 'INPUT', DAG_NAME.strip(), ODS_TABLE, r'.*\.csv', ODS_TABLE,
                                  'CT_ET_TEMPLATES.' + ODS_TABLE)

        command = ['python', gConnDir + 'TMSDBT.py', 'create-oracle-table', '--name', ODS_TABLE,
                   '--url', tms_url,
                   '-U', tms_user, '--password', tms_pwd,
                   '--layoutfile', gDataDir + dag_name + '.fkr',
                   '-d', gTableDir + ODS_TABLE + '.sql']
        command += _param_args(param_list)
        command = [' '.join(command)]
        shown = [_redact(command[0])]
        logger.debug(f'running TMSDB with arguments: {shown}')

        res = subprocess.run(command, capture_output=True, check=False, shell=True)
        logger.debug(_redact('subprocess stdout =' + res.stdout.decode(errors='replace')))
        logger.debug(_redact('subprocess stderr =' + res.stderr.decode(errors='replace')))
        if res.returncode != 0:
            # Same exception type as check=True, raised after the output is
            # logged and without the password in its message.
            raise subprocess.CalledProcessError(res.returncode, shown, output=res.stdout, stderr=res.stderr)

        sql = Path(gTableDir + ODS_TABLE + '.sql').read_text()
        mf.execute(sql)
        mf.add_column_date_format('CT_ET_TEMPLATES.' + ODS_TABLE, 'DEFAULT', 'DD/MM/YYYY HH24:MI:SS')
        mf.create_external_table(ODS_TABLE, 'CT_ET_TEMPLATES.' + ODS_TABLE, data_prefix)

    def execute_report(param_list, airflow_run_id):
        """Run one TMS extraction for a single parameter combination.

        Opens a workflow record, records the parameters on it, runs TMSDBT
        ``retrieve`` to write ``<table>.<wf_key>.<timestamp>.csv`` to ``dest``,
        performs the first-time table setup when no CT_MRDS.A_SOURCE_FILE_CONFIG
        row exists for source 'TMS' and this table, then calls
        ``mf.process_source_file``. The workflow is finalised 'Y' on success
        and 'N' on any failure, after which the exception is re-raised.

        :param param_list: list of ``(name, value)`` tuples for this run.
        :param airflow_run_id: the Airflow ``dag_run.run_id``.
        :raises RuntimeError: if the connector exits with code 1 (no data).
        :raises Exception: for any other non-zero connector exit code.
        """
        print("execute_report " + str(param_list))

        logger = logging.getLogger("airflow.task")
        logger.setLevel(logging.DEBUG)

        ts = '{:%Y%m%d_%H%M%S}'.format(datetime.now(timezone('Europe/Berlin')))
        wf_key = mr.init_workflow(DATABASE_NAME, WF_NAME, airflow_run_id)
        file_name = ODS_TABLE + '.' + str(wf_key) + '.' + ts + '.csv'

        # NOTE(review): the command is joined into one string and run with
        # shell=True, so parameter values and credentials are interpreted by
        # the shell (a '"', '$' or '`' in a value breaks or alters the call).
        # Passing an argument list with shell=False would fix this but changes
        # how any shell-quoted values in TMS.yml are received -- verify first.
        command = ['python', gConnDir + 'TMSDBT.py', 'retrieve', '--name', ODS_TABLE,
                   '--url', tms_url,
                   '-U', tms_user, '--password', tms_pwd,
                   '--layoutfile', gDataDir + dag_name + '.fkr',
                   '-f', 'scsv', '--dataset', str(wf_key),
                   '-d', dest + file_name]
        command += _param_args(param_list)
        command = [' '.join(command)]
        shown = [_redact(command[0])]

        try:
            # Record every non-empty parameter on the workflow. This runs
            # inside the try so a failure here still finalises the run as 'N'.
            for name, value in param_list:
                if value or value == "":
                    logger.debug(f"workflow_property: {wf_key}, {DATABASE_NAME}, {name}, {str(value).rstrip()}")
                    mr.set_workflow_property(wf_key, DATABASE_NAME, name, str(value).rstrip())

            logger.debug(str(shown))
            logger.debug(f'running TMSDB with arguments: {shown}')
            res = subprocess.run(command, capture_output=True, check=False, shell=True)
            logger.debug(_redact('subprocess stdout =' + res.stdout.decode(errors='replace')))
            logger.debug(_redact('subprocess stderr =' + res.stderr.decode(errors='replace')))

            if res.returncode == 1:
                # Exit code 1 is the connector's "no data" case; fail the run
                # with an explicit error.
                logger.debug('No data returned')
                raise RuntimeError('No data returned')
            if res.returncode != 0:
                raise Exception('Other failure')

            # Exit code 0: data was written. Set the table up on first use.
            res = mf.execute_query(
                "select * from CT_MRDS.A_SOURCE_FILE_CONFIG where a_source_key = 'TMS' and table_id = '%s'" % ODS_TABLE)
            logger.debug(f'result of queryng source_file_config {res}')
            if not res:
                _create_ods_table(param_list)

            mf.process_source_file(prefix, file_name)
            mr.finalise_workflow(wf_key, 'Y')

        except BaseException:
            # BaseException, so the workflow is also closed as failed when the
            # task is interrupted by something not derived from Exception.
            ex_type, ex_value, ex_traceback = sys.exc_info()
            stack_trace = [
                "File : %s , Line : %d, Func.Name : %s, Message : %s" % (frame[0], frame[1], frame[2], frame[3])
                for frame in traceback.extract_tb(ex_traceback)
            ]
            logger.error("Exception type : %s " % ex_type.__name__)
            logger.error("Exception message : %s" % _redact(str(ex_value)))
            logger.error("Stack trace : %s" % stack_trace)
            mr.finalise_workflow(wf_key, 'N')
            raise

    # ------------------------------------------------------------------
    # DAG definition
    # ------------------------------------------------------------------
    @dag(
        DAG_NAME,
        default_args = default_args,
        description = DAG_NAME,
        schedule_interval = None,
        params = params_visible,
        start_date = datetime(2025, 1, 1),
        catchup = False,
        tags = [DAG_NAME],
    )
    def run_dag():
        PythonOperator(
            task_id = "retrieve_report",
            execution_timeout = timedelta(minutes = 30),
            python_callable = execute_report_task,
        )

    new_dag = run_dag()
    # Expose the DAG object at module level under its id.
    globals()[DAG_NAME] = new_dag
# Active TMS DAGs, registered in this order.
for _dag_id in (
    'w_ODS_TMS_RAR_PORTFOLIOACCESS',
    'w_ODS_TMS_CLIENTACCOUNT',
    'w_ODS_TMS_BRANCH',
    'w_ODS_TMS_RAR_ECBINSTRUMENTS',
    'w_ODS_TMS_CALENDAR',
    'w_ODS_TMS_CLIENTINSTRUCTION',
    'w_ODS_TMS_HISTORY_LOG',
    'w_ODS_TMS_ECBINSTRUMENTBONDCASHFLOW',
    'w_ODS_TMS_PARAMETER',
    'w_ODS_TMS_RAR_FRM_HOLIDAYS',
    'w_ODS_TMS_RAR_LIMITACCESS',
    'w_ODS_TMS_RAR_PORTFOLIOTREE',
    'w_ODS_TMS_RAR_RARCOLLATERALINVENTORY',
    'w_ODS_TMS_RULES',
    'w_ODS_TMS_SDM_ENTITY_STATE',
    'w_ODS_TMS_USER_ACCOUNT',
):
    create_dag(_dag_id)
del _dag_id  # keep the module namespace free of the loop variable

# Full catalogue of TMS layouts (not executed). Add a name to the tuple above
# to enable its DAG.
# create_dag('w_ODS_TMS_ACMCURRENCYFLOW')
# create_dag('w_ODS_TMS_ACMENTRYSTATELEDGERGROUP')
# create_dag('w_ODS_TMS_ACTIVITY')
# create_dag('w_ODS_TMS_ACTIVITY_LOG')
# create_dag('w_ODS_TMS_ACTIVITYLOGDUE')
# create_dag('w_ODS_TMS_BALANCE')
# create_dag('w_ODS_TMS_BLACKOUT_LOG')
# create_dag('w_ODS_TMS_BRANCH')
# create_dag('w_ODS_TMS_CALENDAR')
# create_dag('w_ODS_TMS_CASHFLOW')
# create_dag('w_ODS_TMS_CLIENTACCOUNT')
# create_dag('w_ODS_TMS_CLIENTINSTRUCTION')
# create_dag('w_ODS_TMS_CPBLOCKEDISSUERS')
# create_dag('w_ODS_TMS_CUSTODYBALANCE')
# create_dag('w_ODS_TMS_ECBINSTRUMENTBONDCASHFLOW')
# create_dag('w_ODS_TMS_EFFECTIVEROLEPROFILE')
# create_dag('w_ODS_TMS_FINMESSAGELOG')
# create_dag('w_ODS_TMS_HISTORY_LOG')
# create_dag('w_ODS_TMS_INSTRUMENT_BOND_SCHEDULE')
# create_dag('w_ODS_TMS_INSTRUMENT_REPORT')
# create_dag('w_ODS_TMS_INSTRUMENTBONDCASHFLOW')
# create_dag('w_ODS_TMS_MOPDBPRICES')
# create_dag('w_ODS_TMS_PARAMETER')
# create_dag('w_ODS_TMS_POS_PERF_ACTIVITY_STATUS')
# create_dag('w_ODS_TMS_PRICE_RATE_REPORT')
# create_dag('w_ODS_TMS_PROPERTY')
# create_dag('w_ODS_TMS_RAR_CLIENT')
# create_dag('w_ODS_TMS_RAR_CLIENTGROUPMAP')
# create_dag('w_ODS_TMS_RAR_ECBINSTRUMENTS')
# create_dag('w_ODS_TMS_RAR_FRM_HOLIDAYS')
# create_dag('w_ODS_TMS_RAR_FUTURE_BONDS')
# create_dag('w_ODS_TMS_RAR_INSTRUMENT_REPORT')
# create_dag('w_ODS_TMS_RAR_LIMIT')
# create_dag('w_ODS_TMS_RAR_LIMITACCESS')
# create_dag('w_ODS_TMS_RAR_LIMITLOG')
# create_dag('w_ODS_TMS_RAR_MARKETINFO')
# create_dag('w_ODS_TMS_RAR_PORTFOLIO')
# create_dag('w_ODS_TMS_RAR_PORTFOLIOACCESS')
# create_dag('w_ODS_TMS_RAR_PORTFOLIOTREE')
# create_dag('w_ODS_TMS_RAR_PRICES')
# create_dag('w_ODS_TMS_RAR_RARCOLLATERALINVENTORY')
# create_dag('w_ODS_TMS_RAR_RARTRANSACTIONPROPERTY')
# create_dag('w_ODS_TMS_RAR_RARTRANSLIMITCOND')
# create_dag('w_ODS_TMS_RAR_SUBLIMIT')
# create_dag('w_ODS_TMS_RAR_TRANSACTIONSLOG')
# create_dag('w_ODS_TMS_RAR_UMICREDITCLIENT')
# create_dag('w_ODS_TMS_RAR_UMISECURITYCODE')
# create_dag('w_ODS_TMS_RAR_UMISECURITYINFO')
# create_dag('w_ODS_TMS_RECONCILIATION')
# create_dag('w_ODS_TMS_ROLEPORTFOLIOPROFILE')
# create_dag('w_ODS_TMS_RULES')
# create_dag('w_ODS_TMS_SDM_ENTITY_STATE')
# create_dag('w_ODS_TMS_SECURITYPOSITION')
# create_dag('w_ODS_TMS_SETTLEMENTCASHFLOW')
# create_dag('w_ODS_TMS_SETTLEMENTLOG')
# create_dag('w_ODS_TMS_THIRD_PRICE_RATE_REPORT_CHECK')
# create_dag('w_ODS_TMS_TRANSACTION')
# create_dag('w_ODS_TMS_USER_ACCOUNT')
# create_dag('w_ODS_TMS_USERINFORMATION')
#
# create_dag('w_ODS_TMS_EFFECTIVEPERMISSIONS')
#
# create_dag('w_ODS_TMS_ECBPOSITIONDATA_GROUPING_0')
# create_dag('w_ODS_TMS_ECBPOSITIONDATA_GROUPING_4')
# create_dag('w_ODS_TMS_ECBPOSITIONDATA_GROUPING_5')
# create_dag('w_ODS_TMS_ECBPOSITIONDATA_GROUPING_32')
# create_dag('w_ODS_TMS_ECBPOSITIONDATA_GROUPING_36')
# create_dag('w_ODS_TMS_ECBPOSITIONDATA_GROUPING_96')
# create_dag('w_ODS_TMS_ECBPOSITIONDATA_GROUPING_100')
# create_dag('w_ODS_TMS_ECBPOSITIONDATA_GROUPING_128')
# create_dag('w_ODS_TMS_ECBPOSITIONDATA_GROUPING_132')
# create_dag('w_ODS_TMS_ECBPOSITIONDATA_GROUPING_256')
# create_dag('w_ODS_TMS_ECBPOSITIONDATA_GROUPING_1093')
#
# create_dag('w_ODS_TMS_ECBPERFORMANCEDATA_GROUPING_4')
# create_dag('w_ODS_TMS_ECBPERFORMANCEDATA_GROUPING_5')
# create_dag('w_ODS_TMS_ECBPERFORMANCEDATA_GROUPING_68')
# create_dag('w_ODS_TMS_ECBPERFORMANCEDATA_GROUPING_69')