"""Task processing pipeline base class.

Defines :class:`TaskProcessor`, a template-method ABC that drives a task
through extract -> enrich -> upload -> remote-process -> finalize.
Concrete subclasses supply the extraction step via :meth:`_extract`.
"""

import csv
import logging
import os
from abc import ABC, abstractmethod

from mrds.utils.utils import parse_output_columns
from mrds.utils import (
    manage_files,
    manage_runs,
    objectstore,
    static_vars,
)

# Naming pattern for the CSV a task run produces.
OUTPUT_FILENAME_TEMPLATE = "{output_table}-{task_history_key}.csv"
STATUS_SUCCESS = static_vars.status_success  # duplicated needs to be moved #TODO


class TaskProcessor(ABC):
    """Template-method base for tasks that produce, enrich and publish a CSV.

    Lifecycle (driven by :meth:`process`):
      1. ``_extract``        -- subclass hook; must populate ``self.output_filepath``.
      2. ``_enrich``         -- add static/key columns and reorder per config.
      3. ``_upload``         -- push the CSV to the object store.
      4. ``_process_remote`` -- trigger server-side ingestion of the file.
      5. ``_finalize``       -- mark the task run successful.
    """

    def __init__(self, global_config, task_conf, client, workflow_context):
        """Store collaborators, then run the common and subclass init hooks.

        Args:
            global_config: Global settings (``tmpdir``, ``encoding_type``,
                ``bucket``, ``bucket_namespace``, ...).
            task_conf: Per-task config (``task_name``, ``output_table``,
                ``output_columns``, ``ods_prefix``, ...).
            client: Object-store client used for upload/delete.
            workflow_context: Mapping with at least ``run_id`` and
                ``a_workflow_history_key``.
        """
        self.global_config = global_config
        self.task_conf = task_conf
        self.client = client
        self.workflow_context = workflow_context
        self._init_common()
        self._post_init()

    def _init_common(self):
        """Register the task run, derive output paths, and parse column config."""
        # Initialize task: records this run and yields its history key.
        self.a_task_history_key = manage_runs.init_task(
            self.task_conf.task_name,
            self.workflow_context["run_id"],
            self.workflow_context["a_workflow_history_key"],
        )
        logging.info(f"Task initialized with history key: {self.a_task_history_key}")

        # Define output file paths (in the configured temp directory).
        self.output_filename = OUTPUT_FILENAME_TEMPLATE.format(
            output_table=self.task_conf.output_table,
            task_history_key=self.a_task_history_key,
        )
        self.output_filepath = os.path.join(
            self.global_config.tmpdir, self.output_filename
        )

        # Parse the output_columns config into its per-kind entry lists plus
        # the desired final column order.
        (
            self.xpath_entries,
            self.csv_entries,
            self.static_entries,
            self.a_key_entries,
            self.workflow_key_entries,
            self.xml_position_entries,
            self.column_order,
        ) = parse_output_columns(self.task_conf.output_columns)

    def _post_init(self):
        """Optional hook for subclasses to override."""
        pass

    @abstractmethod
    def _extract(self):
        """Non-optional hook for subclasses: produce the raw CSV at
        ``self.output_filepath``."""
        pass

    def _enrich(self):
        """Stream-enrich the extracted CSV in place.

        Reads one row at a time, appends static / A-key / workflow-key
        columns, reorders columns to ``self.column_order``, and writes to a
        temp file that atomically replaces the original on success.
        """
        # A-key values are namespaced per task run: key * multiplier + row no.
        TASK_HISTORY_MULTIPLIER = 1_000_000_000
        logging.info(f"Enriching CSV file at '{self.output_filepath}'")
        temp_output = self.output_filepath + ".tmp"
        encoding = self.global_config.encoding_type
        try:
            with open(self.output_filepath, newline="", encoding=encoding) as inf, open(
                temp_output, newline="", encoding=encoding, mode="w"
            ) as outf:
                reader = csv.reader(inf)
                writer = csv.writer(outf, quoting=csv.QUOTE_ALL)

                # Read the original header.
                original_headers = next(reader)

                # Full header set: original plus any enrichment columns that
                # are not already present.
                headers = list(original_headers)
                for col_name, _ in self.static_entries:
                    if col_name not in headers:
                        headers.append(col_name)
                for col_name in self.a_key_entries:
                    if col_name not in headers:
                        headers.append(col_name)
                for col_name in self.workflow_key_entries:
                    if col_name not in headers:
                        headers.append(col_name)

                # Rearrange headers to the desired output order; names listed
                # in column_order but absent from the data are skipped.
                header_to_index = {h: i for i, h in enumerate(headers)}
                out_indices = [
                    header_to_index[h]
                    for h in self.column_order
                    if h in header_to_index
                ]

                # Write the new header.
                writer.writerow([headers[i] for i in out_indices])

                # Stream each row, enrich in place, reorder, and write.
                row_count = 0
                base_task_history = (
                    int(self.a_task_history_key) * TASK_HISTORY_MULTIPLIER
                )
                # Loop invariant: the same workflow key applies to every row.
                wf_val = self.workflow_context["a_workflow_history_key"]
                for i, in_row in enumerate(reader, start=1):
                    # Working list in `headers` order; copy existing columns,
                    # padding with '' if the row is short (ragged CSV) rather
                    # than raising IndexError.
                    work_row = [None] * len(headers)
                    for j, h in enumerate(original_headers):
                        work_row[header_to_index[h]] = (
                            in_row[j] if j < len(in_row) else ""
                        )

                    # Fill static columns.
                    for col_name, value in self.static_entries:
                        work_row[header_to_index[col_name]] = value

                    # Fill A-key columns: unique per row within this task run.
                    for col_name in self.a_key_entries:
                        work_row[header_to_index[col_name]] = str(
                            base_task_history + i
                        )

                    # Fill workflow key columns.
                    for col_name in self.workflow_key_entries:
                        work_row[header_to_index[col_name]] = wf_val

                    # Reorder to output order and write.
                    writer.writerow([work_row[j] for j in out_indices])
                    row_count += 1
        except Exception:
            # Don't leave a half-written temp file behind on failure.
            if os.path.exists(temp_output):
                os.remove(temp_output)
            raise

        # Atomically replace the original with the enriched file.
        os.replace(temp_output, self.output_filepath)
        logging.info(
            f"CSV file enriched at '{self.output_filepath}', {row_count} rows generated"
        )

    def _upload(self):
        """Upload the enriched CSV to the object store."""
        logging.info(
            f"Uploading CSV file to '{self.global_config.bucket}/{self.task_conf.ods_prefix}/{self.output_filename}'"
        )
        objectstore.upload_file(
            self.client,
            self.output_filepath,
            self.global_config.bucket_namespace,
            self.global_config.bucket,
            self.task_conf.ods_prefix,
            self.output_filename,
        )
        logging.info(
            f"CSV file uploaded to '{self.global_config.bucket}/{self.task_conf.ods_prefix}/{self.output_filename}'"
        )

    def _process_remote(self):
        """Trigger server-side processing of the uploaded file.

        On failure the uploaded object is deleted (so no orphan remains in
        the bucket) and the original exception is re-raised.
        """
        logging.info(f"Processing source file '{self.output_filename}' with CT_MRDS.FILE_MANAGER.PROCESS_SOURCE_FILE database function.")
        try:
            manage_files.process_source_file(
                self.task_conf.ods_prefix, self.output_filename
            )
        except Exception as e:
            logging.error(
                f"Processing source file '{self.output_filename}' failed. Cleaning up..."
            )
            objectstore.delete_file(
                self.client,
                self.output_filename,
                self.global_config.bucket_namespace,
                self.global_config.bucket,
                self.task_conf.ods_prefix,
            )
            logging.error(
                f"CSV file '{self.global_config.bucket}/{self.task_conf.ods_prefix}/{self.output_filename}' deleted."
            )
            raise
        else:
            logging.info(f"Source file '{self.output_filename}' processed")

    def _finalize(self):
        """Mark the task run as successfully completed."""
        manage_runs.finalise_task(self.a_task_history_key, STATUS_SUCCESS)
        logging.info(f"Task '{self.task_conf.task_name}' completed successfully")

    def process(self):
        """Main entry point: run the full pipeline in order."""
        self._extract()
        self._enrich()
        self._upload()
        self._process_remote()
        self._finalize()