import hashlib
from typing import Dict, List

import xmlschema
from lxml import etree


def validate_xml(xml_file, xsd_file):
    """Validate an XML file against an XSD schema.

    Parameters:
    - xml_file: The XML source handed to ``XMLSchema.validate``.
    - xsd_file: The schema source handed to ``xmlschema.XMLSchema``.

    Returns:
    - tuple: ``(is_valid, message)``. ``is_valid`` is True only when the
      schema loads and validation raises nothing; otherwise it is False and
      ``message`` carries the text of the exception that was raised.
    """
    try:
        # Create an XMLSchema instance with strict validation
        schema = xmlschema.XMLSchema(xsd_file, validation="strict")
        # Validate the XML file
        schema.validate(xml_file)
        return True, "XML file is valid against the provided XSD schema."
    except xmlschema.validators.exceptions.XMLSchemaValidationError as e:
        return False, f"XML validation error: {str(e)}"
    except xmlschema.validators.exceptions.XMLSchemaException as e:
        return False, f"XML schema error: {str(e)}"
    except Exception as e:
        # Deliberate catch-all: this function reports failures through its
        # return value instead of raising.
        return False, f"An error occurred during XML validation: {str(e)}"


def _node_text(node):
    """Return the stripped text of an XPath result item ("" for None)."""
    if node is None:
        return ""
    if isinstance(node, etree._Element):
        return (node.text or "").strip()
    return str(node).strip()


def _find_self_or_ancestor(start, candidates):
    """Return ``start`` or its nearest ancestor found in ``candidates``, else None."""
    current = start
    while current is not None:
        if current in candidates:
            return current
        current = current.getparent()
    return None


def _indexed_path(elem):
    """Build a ``/tag[i]/tag[j]/...`` path from the root down to ``elem``."""
    segments = []
    node = elem
    while node is not None:
        # 1-based position among preceding siblings that share the same tag.
        position = 1 + sum(
            1 for sibling in node.itersiblings(preceding=True)
            if sibling.tag == node.tag
        )
        segments.append(f"{node.tag}[{position}]")
        node = node.getparent()
    return "/" + "/".join(reversed(segments))


def extract_data(
    filename,
    xpath_columns,         # List[(expr, header, is_key)]
    xml_position_columns,  # List[(expr, header)]
    namespaces,
    workflow_context,
    encoding_type="utf-8",
) -> Dict[str, List]:
    """
    Parses an XML file using XPath expressions and extracts data.

    One row is produced per result of the non-key XPath expressions (the row
    count is the longest non-key result list; shorter lists are padded with
    empty values). Key columns contribute the text of their first match,
    repeated on every row. Each xml_position column contributes a hash of the
    indexed path of the nearest matching element at or above the row's
    first non-key element, or "" when there is none.

    Parameters:
    - filename (str): The path to the XML file to parse.
    - xpath_columns (list): A list of tuples, each containing:
        - XPath expression (str)
        - CSV column header (str)
        - Indicator if the field is a key ('Y' or 'N'); entries with any
          other indicator are ignored.
    - xml_position_columns (list): A list of (XPath expression, header)
      tuples selecting the elements whose position is hashed.
    - namespaces (dict): Namespace mapping needed for lxml's xpath()
    - workflow_context (dict): Must provide "a_workflow_history_key", used as
      the hash salt; it is only read when a position is actually hashed.
    - encoding_type (str): Currently unused; kept for interface
      compatibility.

    Returns:
    - dict: A dictionary containing headers and rows with extracted data.
      Column order is non-key columns, key columns, xml_position columns.
    """
    # NOTE(review): the parser uses lxml's default entity handling; confirm
    # that inputs are trusted or harden the parser at the call site.
    parser = etree.XMLParser(remove_blank_text=True)
    tree = etree.parse(filename, parser)
    root = tree.getroot()

    # Separate out key vs non-key columns
    key_cols = [(expr, h) for expr, h, k in xpath_columns if k == "Y"]
    nonkey_cols = [(expr, h) for expr, h, k in xpath_columns if k == "N"]

    # Evaluate every non-key XPath and keep the result nodes, by header
    nonkey_elements = {
        header: root.xpath(expr, namespaces=namespaces)
        for expr, header in nonkey_cols
    }

    # The number of rows is the maximum length of any of the non-key lists
    row_count = max(
        (len(nodes) for nodes in nonkey_elements.values()), default=0
    )

    # Pad every non-key list up to row_count with None
    for nodes in nonkey_elements.values():
        if len(nodes) < row_count:
            nodes.extend([None] * (row_count - len(nodes)))

    # Key columns: text of the first match, or "" when nothing matches
    key_values = []
    for expr, _header in key_cols:
        nodes = root.xpath(expr, namespaces=namespaces)
        key_values.append(_node_text(nodes[0]) if nodes else "")

    # xml_position columns: candidate elements per header
    xml_positions = {
        header: root.xpath(expr, namespaces=namespaces)
        for expr, header in xml_position_columns
    }

    headers = (
        [h for _, h in nonkey_cols]
        + [h for _, h in key_cols]
        + [h for _, h in xml_position_columns]
    )

    # Positions are anchored on the first non-key column. Rows only exist
    # when there is at least one non-key column, so the anchor list is
    # always populated inside the loop below.
    anchor_nodes = nonkey_elements[nonkey_cols[0][1]] if nonkey_cols else []

    rows = []
    for i in range(row_count):
        row = [_node_text(nonkey_elements[header][i]) for _, header in nonkey_cols]
        row.extend(key_values)

        data_elem = anchor_nodes[i]
        for _expr, header in xml_position_columns:
            if data_elem is None:
                row.append("")
                continue

            match = _find_self_or_ancestor(data_elem, xml_positions[header])
            # Test the type explicitly: an lxml element without children is
            # falsy, so a truthiness check would discard leaf matches.
            if not isinstance(match, etree._Element):
                row.append("")
                continue

            row.append(
                _xml_pos_hasher(
                    _indexed_path(match),
                    workflow_context["a_workflow_history_key"],
                )
            )

        rows.append(row)

    return {"headers": headers, "rows": rows}


def _xml_pos_hasher(input_string, salt, hash_length=15) -> int:
    """
    Helps hashing xml positions.

    Parameters:
        input_string (str): The string to hash.
        salt: Value prefixed to the input (as ``"{salt}:{input_string}"``)
            so the result is deterministic for a given salt.
        hash_length (int): The desired number of decimal digits in the
            result (default is 15).

    Returns:
        int: The leading ``hash_length`` decimal digits of the SHA-256 digest
        interpreted as an integer. No padding is applied, so the result has
        fewer digits when ``hash_length`` exceeds the digest's decimal length.

    Raises:
        ValueError: If ``hash_length`` is not positive.
    """
    # Ensure the hash length is valid
    if hash_length <= 0:
        raise ValueError("Hash length must be a positive integer.")

    # Combine the input string with the salt to create a deterministic input
    salted_input = f"{salt}:{input_string}"

    # Generate a SHA-256 hash of the salted input and read it as an integer
    full_hash = hashlib.sha256(salted_input.encode()).hexdigest()
    hash_integer = int(full_hash, 16)

    # Keep only the leading decimal digits
    return int(str(hash_integer)[:hash_length])