mars-elt/python/connectors/tms/TMSDBT.py

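# TMSDBT: command-line helper around TMSQuery.XMLQuery that, depending on
# the chosen command, generates Oracle staging-table DDL, scaffolds a dbt
# ODS model (.yml and .sql), or retrieves query results as CSV and writes
# them to stdout, a local file, or object storage.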
import argparse
import csv
import os
import os.path
import re
import sys
import tempfile
from io import StringIO
from pathlib import Path

import psutil

import mrds.utils.objectstore
from TMSQuery import XMLQuery
namespace = os.getenv("BUCKET_NAMESPACE", "frcnomajoc7v")


def memory_usage():
    # Return the resident set size of the current process in GiB.
    # (Diagnostic helper; not called elsewhere in this script.)
    process = psutil.Process(os.getpid())
    return process.memory_info().rss / (1024 * 1024 * 1024)
def protect_keyword(s):
    # Normalize a column name: lower-case it, replace spaces with
    # underscores, and suffix Oracle reserved words with '_' so the
    # generated DDL does not need quoted identifiers.
    s = s.lower().replace(' ', '_')
    match s:
        case 'comment':
            return 'comment_'
        case 'date':
            return 'date_'
        case 'number':
            return 'number_'
        case _:
            return s
# Directory where generated dbt ODS models are written.
cModelsDir = sys.path[0] + '/../dbt/mrds/models/ods/'
# Used to derive synthetic surrogate keys:
# a_key = dataset_id * cDatasetMultiplier + row_index.
cDatasetMultiplier = 10000000
parser = argparse.ArgumentParser()
parser.add_argument("command", choices=['create-ods-model', 'create-oracle-table', 'retrieve'],
                    help="action to perform")
parser.add_argument("-n", "--name", help="model/table name")
parser.add_argument("-u", "--url", required=True, help="URL of TMS service")
parser.add_argument("-U", "--user", required=True, help="TMS user")
parser.add_argument("-P", "--password", required=True, help="TMS password")
parser.add_argument("-x", "--xmlfile", help="XML query file")
parser.add_argument("-l", "--layoutfile", help="layout file")
parser.add_argument("-f", "--format", help="output format")
parser.add_argument("-p", "--parameter", action="append", help="query parameter as name=value (repeatable)")
parser.add_argument("-c", "--column", action="append", help="additional column as name[:type]=value (repeatable)")
parser.add_argument("-d", "--destination", help="destination: '-' for stdout, a local path, or bucket:path")
parser.add_argument("-s", "--dataset", help="data set ID", type=int)
parser.add_argument("-v", "--version", help="data model version", type=int, default=1)
args = parser.parse_args()
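# Example invocation (illustrative values only; URL, credentials and file
# names below are placeholders, not real endpoints):
#
#   python TMSDBT.py retrieve \
#       -u https://tms.example.com/api -U tmsuser -P secret \
#       -x query.xml -f standard_csv -s 42 \
#       -c source:varchar2(64)=TMS \
#       -d mybucket:INBOX/TMS/ORDERS/orders.csv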
query = XMLQuery()
if args.xmlfile:
    with open(args.xmlfile) as f:
        query.xml = f.read()
if args.layoutfile:
    with open(args.layoutfile) as f:
        query.layout = f.read()
if args.format:
    query.format = args.format
if args.parameter:
    for p in args.parameter:
        name, value = p.split('=', 1)
        query.parameter[name] = value

# Additional columns are given as name<sep>type=value where <sep> is one
# of '|', '/' or ':'; the type defaults to varchar2(255) when omitted.
additional_columns = []
if args.column:
    for p in args.column:
        name, value = p.split('=', 1)
        t = re.split(r'[|/:]', name, maxsplit=2)
        name = t[0]
        col_type = t[1] if len(t) == 2 and t[1] else 'varchar2(255)'
        additional_columns.append((name, col_type, value))
query.normalize_output()
# Debug aid: dump the assembled query document to a temporary file.
Path('/tmp/kurt.xml').write_text(str(query))
if args.command == 'create-oracle-table':
    d = query.describe(args.url, args.user, args.password)
    # Surrogate key columns first, then user-supplied extra columns, then
    # the columns reported by the TMS query description.
    columns = [" a_key number(38, 0)", "a_workflow_history_key number(38, 0)"]
    for c in additional_columns:
        columns.append("%s %s" % (c[0], c[1]))
    for col in d:
        name = protect_keyword(col[0])
        # Map TMS data types to Oracle column types.
        match col[1]:
            case 'text':
                columns.append(name + " varchar2(512 char)")
            case 'int':
                columns.append(name + " number(38,0)")
            case 'money':
                columns.append(name + " number(19,4)")
            case 'floating':
                columns.append(name + " binary_double")
            case 'datetime':
                columns.append(name + " date")
            case 'integer':
                columns.append(name + " number(12, 0)")
    sql = "create table ct_et_templates." + args.name + " (\n"
    sql = sql + ",\n ".join(columns)
    sql = sql + "\n)\n"
    if not args.destination or args.destination == '-':
        print(sql)
    else:
        with open(args.destination, 'w') as f:
            f.write(sql)
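    # For illustration, a description of [('Order Date', 'datetime'),
    # ('Amount', 'money')] with no additional columns would yield DDL
    # along these lines (hypothetical column names, shown only to make
    # the type mapping concrete):
    #
    #   create table ct_et_templates.<name> (
    #    a_key number(38, 0),
    #    a_workflow_history_key number(38, 0),
    #    order_date date,
    #    amount number(19,4)
    #   )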
elif args.command == 'create-ods-model':
    d = query.describe(args.url, args.user, args.password)
    # Write the dbt schema file describing the model's columns.
    file_name = cModelsDir + args.name + '.yml'
    with open(file_name, 'w') as f:
        f.write('version: %d\n' % args.version)
        f.write('models:\n')
        f.write('  - name: ' + args.name + '_dbt\n')
        f.write('    description: "A starter dbt model"\n')
        f.write('    columns:\n')
        for col in d:
            f.write('      - name: ' + col[0] + '\n')
            f.write('        data_type: ' + col[1] + '\n')
    # Derive the object-store prefix for the external table from the
    # destination, falling back to INBOX/TMS/<NAME>/.
    file_name = cModelsDir + args.name + '.sql'
    if args.destination and args.destination != '-':
        if ':' in args.destination:
            dest = args.destination.split(':', 2)
            path = dest[1]
        else:
            path = args.destination
        prefix = os.path.dirname(path)
    else:
        prefix = 'INBOX/TMS/' + args.name.upper() + '/'
    pars = "ptablename => '%s', ptemplatetablename => 'ou_tms.%s', pprefix => '%s'" % (args.name, args.name, prefix)
    print(f"creating table {args.name}")
    with open(file_name, 'w') as f:
        f.write('{{\n config(\n  post_hook = "call ct_mrds.file_manager.create_external_table(%s)"\n )\n}}\n\n' % pars)
        f.write("{{ config(materialized='table') }}\n")
        f.write('with source_data as (\n')
        # A single dummy row fixes the column names and Oracle data types
        # of the model; the post-hook external table supplies real data.
        columns = []
        columns.append("cast (1 as number(38,0)) as a_key")
        columns.append("cast (1 as number(38,0)) as a_workflow_history_key")
        for col in d:
            name = protect_keyword(col[0])
            match col[1]:
                case 'text':
                    columns.append("cast ('x' as varchar2(255 char)) as " + name)
                case 'int':
                    columns.append("cast (1 as number(38, 0)) as " + name)
                case 'money':
                    columns.append("cast (1.0 as number(19,4)) as " + name)
                case 'floating':
                    columns.append("cast (1.0 as binary_double) as " + name)
                case 'datetime':
                    columns.append("cast (sysdate as date) as " + name)
                case 'integer':
                    columns.append("cast (1 as number(12, 0)) as " + name)
        f.write(' select\n  ' + ',\n  '.join(columns) + '\n')
        f.write(')\nselect * from source_data\n')
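    # The generated <name>.sql is a dbt model of roughly this shape
    # (column list abbreviated; names depend on the query description):
    #
    #   {{ config(post_hook = "call ct_mrds.file_manager.create_external_table(...)") }}
    #   {{ config(materialized='table') }}
    #   with source_data as (
    #       select
    #           cast (1 as number(38,0)) as a_key,
    #           ...
    #   )
    #   select * from source_data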
elif args.command == 'retrieve':
    ret = query.execute(args.url, args.user, args.password)
    if query.format in ('scsv', 'standard_csv') and args.dataset:
        # Save the result to a temporary spooled file for further
        # processing instead of keeping several copies in memory.
        f = tempfile.SpooledTemporaryFile(mode='w+', max_size=200 * 1024 * 1024)
        f.write(ret)
        del ret
        f.seek(0)
        # Replace newlines embedded in field values with '<br/>'.
        reader = csv.reader(f)
        sio = StringIO()
        writer = csv.writer(sio)
        for l in reader:
            writer.writerow([s.replace('\n', '<br/>') for s in l])
        f.close()
        # Read the data back as an array of lines for further processing.
        sio.seek(0)
        lines_tmp = sio.readlines()
        del sio
        if not lines_tmp:
            ret = ""
        else:
            # Prepend the artificial A_KEY and A_WORKFLOW_HISTORY_KEY
            # columns plus any user-supplied additional columns.
            additional_headers = [t[0] for t in additional_columns]
            additional_values = [t[2] for t in additional_columns]
            headers = (['A_KEY', 'A_WORKFLOW_HISTORY_KEY'] + additional_headers
                       + [protect_keyword(h) for h in lines_tmp[0].split(',')])
            lines = [','.join(headers)]
            # A_KEY is unique across datasets: the dataset ID times the
            # multiplier plus the row index within this extract.
            for i, l in enumerate(lines_tmp[1:]):
                lines.append(str(args.dataset * cDatasetMultiplier + i) + ','
                             + str(args.dataset) + ',' + ','.join(additional_values + [l]))
            del lines_tmp
            # Spool to a temporary file again to avoid duplicating memory needs.
            f = tempfile.SpooledTemporaryFile(mode='w+', max_size=200 * 1024 * 1024)
            f.writelines(lines)
            del lines
            f.seek(0)
            ret = f.read()
            f.close()
    if not args.destination or args.destination == '-':
        print(ret, end='')
    elif ':' not in args.destination:
        with open(args.destination, 'w') as f:
            f.write(ret)
    else:
        # 'bucket:path' destinations are uploaded to object storage via a
        # named temporary file.
        f = tempfile.NamedTemporaryFile(delete=False, mode='w', prefix='TMSDBT-', suffix='.csv')
        f.write(ret)
        f.close()
        dest = args.destination.split(':', 2)
        bucket = dest[0]
        dirname = os.path.dirname(dest[1])
        filename = os.path.basename(dest[1])
        client = mrds.utils.objectstore.get_client()
        # Debug aid: echo the uploaded payload.
        with open(f.name, "r") as file:
            print(file.read())
        mrds.utils.objectstore.upload_file(client, f.name, namespace, bucket, dirname, filename)
        os.remove(f.name)
    # Signal failure when the retrieved payload is empty.
    if ret:
        sys.exit(0)
    else:
        sys.exit(1)
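    # Destination handling recap: no -d (or '-') writes to stdout, a plain
    # path writes a local file, and 'bucket:path' uploads the CSV via
    # mrds.utils.objectstore. The exit code is 0 when the retrieved
    # payload is non-empty and 1 otherwise.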