From 19925603fa5ba60a4c49ad73c60a1e85d6aa2c62 Mon Sep 17 00:00:00 2001 From: Grzegorz Michalski Date: Wed, 4 Feb 2026 14:41:30 +0100 Subject: [PATCH] aktualizacja definicji --- .../CT_MRDS/SCHEMA/packages/DATA_EXPORTER.pkb | 1196 ++++++++++++----- .../CT_MRDS/SCHEMA/packages/DATA_EXPORTER.pkg | 104 +- .../CT_MRDS/SCHEMA/packages/ENV_MANAGER.pkb | 6 + .../CT_MRDS/SCHEMA/packages/ENV_MANAGER.pkg | 18 +- .../CT_MRDS/SCHEMA/packages/FILE_ARCHIVER.pkb | 107 +- .../CT_MRDS/SCHEMA/packages/FILE_ARCHIVER.pkg | 30 +- 6 files changed, 1119 insertions(+), 342 deletions(-) diff --git a/MARS_Packages/mrds_elt-dev-database/mrds_elt-dev-database/database/CT_MRDS/SCHEMA/packages/DATA_EXPORTER.pkb b/MARS_Packages/mrds_elt-dev-database/mrds_elt-dev-database/database/CT_MRDS/SCHEMA/packages/DATA_EXPORTER.pkb index 797c7c8..6734d50 100644 --- a/MARS_Packages/mrds_elt-dev-database/mrds_elt-dev-database/database/CT_MRDS/SCHEMA/packages/DATA_EXPORTER.pkb +++ b/MARS_Packages/mrds_elt-dev-database/mrds_elt-dev-database/database/CT_MRDS/SCHEMA/packages/DATA_EXPORTER.pkb @@ -1,6 +1,192 @@ create or replace PACKAGE BODY CT_MRDS.DATA_EXPORTER AS + ---------------------------------------------------------------------------------------------------- + -- PRIVATE HELPER FUNCTIONS (USED BY MULTIPLE PROCEDURES) + ---------------------------------------------------------------------------------------------------- + + /** + * Sanitizes filename by replacing disallowed characters with underscores + **/ + FUNCTION sanitizeFilename(pFilename IN VARCHAR2) RETURN VARCHAR2 IS + vFilename VARCHAR2(1000); + BEGIN + vFilename := REGEXP_REPLACE(pFilename, '[^a-zA-Z0-9._-]', '_'); + RETURN vFilename; + END sanitizeFilename; + + ---------------------------------------------------------------------------------------------------- + + /** + * Deletes export file from OCI bucket if it exists (used for cleanup before retry) + * Silently ignores if file doesn't exist (ORA-20404) + **/ + PROCEDURE 
DELETE_FAILED_EXPORT_FILE( + pFileUri IN VARCHAR2, + pCredentialName IN VARCHAR2, + pParameters IN VARCHAR2 + ) IS + BEGIN + BEGIN + ENV_MANAGER.LOG_PROCESS_EVENT('Attempting to delete potentially corrupted file: ' || pFileUri, 'DEBUG', pParameters); + + DBMS_CLOUD.DELETE_OBJECT( + credential_name => pCredentialName, + object_uri => pFileUri + ); + + ENV_MANAGER.LOG_PROCESS_EVENT('Deleted existing file (cleanup before retry): ' || pFileUri, 'INFO', pParameters); + EXCEPTION + WHEN OTHERS THEN + -- Object not found is OK (file doesn't exist) + IF SQLCODE = -20404 THEN + ENV_MANAGER.LOG_PROCESS_EVENT('File does not exist (OK): ' || pFileUri, 'DEBUG', pParameters); + ELSE + -- Log but don't fail - export will attempt anyway + ENV_MANAGER.LOG_PROCESS_EVENT('Warning: Could not delete file (will retry export anyway): ' || SQLERRM, 'WARNING', pParameters); + END IF; + END; + END DELETE_FAILED_EXPORT_FILE; + + ---------------------------------------------------------------------------------------------------- + + /** + * Builds query with TO_CHAR for date/timestamp columns using per-column formats + * Retrieves format for each date column from FILE_MANAGER.GET_DATE_FORMAT + **/ + FUNCTION buildQueryWithDateFormats( + pColumnList IN VARCHAR2, + pTableName IN VARCHAR2, + pSchemaName IN VARCHAR2, + pKeyColumnName IN VARCHAR2, + pTemplateTableName IN VARCHAR2 + ) RETURN VARCHAR2 IS + vResult VARCHAR2(32767); + vColumns VARCHAR2(32767); + vPos PLS_INTEGER; + vNextPos PLS_INTEGER; + vCurrentCol VARCHAR2(128); + vAllCols VARCHAR2(32767); + vDataType VARCHAR2(30); + vDateFormat VARCHAR2(200); + vTemplateSchema VARCHAR2(128); + vTemplateTable VARCHAR2(128); + vColExists NUMBER; + BEGIN + -- Build column list if not provided + IF pColumnList IS NULL THEN + -- Use template table for column order when provided + -- Template defines which columns to export and in what order + IF pTemplateTableName IS NOT NULL THEN + -- Parse template table name (SCHEMA.TABLE or just TABLE) + IF 
INSTR(pTemplateTableName, '.') > 0 THEN + vTemplateSchema := SUBSTR(pTemplateTableName, 1, INSTR(pTemplateTableName, '.') - 1); + vTemplateTable := SUBSTR(pTemplateTableName, INSTR(pTemplateTableName, '.') + 1); + ELSE + vTemplateSchema := pSchemaName; + vTemplateTable := pTemplateTableName; + END IF; + + -- Get columns from TEMPLATE table in template column order + -- Template defines target CSV structure (column order and which columns to include) + SELECT LISTAGG(column_name, ', ') WITHIN GROUP (ORDER BY column_id) + INTO vAllCols + FROM all_tab_columns + WHERE table_name = vTemplateTable + AND owner = vTemplateSchema; + ELSE + -- Get columns from source table when no template + SELECT LISTAGG(column_name, ', ') WITHIN GROUP (ORDER BY column_id) + INTO vAllCols + FROM all_tab_columns + WHERE table_name = pTableName + AND owner = pSchemaName; + END IF; + ELSE + vAllCols := pColumnList; + END IF; + + -- Process each column + vColumns := UPPER(REPLACE(vAllCols, ' ', '')); + vPos := 1; + vResult := ''; + + WHILE vPos <= LENGTH(vColumns) LOOP + vNextPos := INSTR(vColumns, ',', vPos); + IF vNextPos = 0 THEN + vNextPos := LENGTH(vColumns) + 1; + END IF; + + vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos); + + -- When using template table, check if column exists in SOURCE table + -- Template defines target structure, source provides data + -- Skip template columns that don't exist in source (except A_WORKFLOW_HISTORY_KEY) + IF pTemplateTableName IS NOT NULL THEN + -- Check if template column exists in SOURCE table + SELECT COUNT(*) INTO vColExists + FROM all_tab_columns + WHERE table_name = pTableName + AND column_name = vCurrentCol + AND owner = pSchemaName; + + -- Skip columns that don't exist in source table + -- Exception: A_WORKFLOW_HISTORY_KEY is virtual (mapped from pKeyColumnName) + IF vColExists = 0 AND UPPER(vCurrentCol) != 'A_WORKFLOW_HISTORY_KEY' THEN + vPos := vNextPos + 1; + CONTINUE; + END IF; + END IF; + + -- Get column data type from appropriate 
table (template or source) + IF pTemplateTableName IS NOT NULL THEN + -- Get data type from template table + SELECT data_type INTO vDataType + FROM all_tab_columns + WHERE table_name = vTemplateTable + AND column_name = vCurrentCol + AND owner = vTemplateSchema; + ELSE + -- Get data type from source table + SELECT data_type INTO vDataType + FROM all_tab_columns + WHERE table_name = pTableName + AND column_name = vCurrentCol + AND owner = pSchemaName; + END IF; + + -- Handle key column alias (template table has A_WORKFLOW_HISTORY_KEY, source table has pKeyColumnName) + IF UPPER(vCurrentCol) = 'A_WORKFLOW_HISTORY_KEY' THEN + vResult := vResult || CASE WHEN vResult IS NOT NULL THEN ', ' ELSE '' END || + 'T.' || pKeyColumnName || ' AS A_WORKFLOW_HISTORY_KEY'; + + -- Convert DATE/TIMESTAMP columns to CHAR with specific format + ELSIF vDataType IN ('DATE', 'TIMESTAMP', 'TIMESTAMP WITH TIME ZONE', 'TIMESTAMP WITH LOCAL TIME ZONE') THEN + IF pTemplateTableName IS NOT NULL THEN + vDateFormat := CT_MRDS.FILE_MANAGER.GET_DATE_FORMAT( + pTemplateTableName => pTemplateTableName, + pColumnName => vCurrentCol + ); + ELSE + vDateFormat := ENV_MANAGER.gvDefaultDateFormat; + END IF; + vResult := vResult || CASE WHEN vResult IS NOT NULL THEN ', ' ELSE '' END || + 'TO_CHAR(T.' || vCurrentCol || ', ''' || vDateFormat || ''') AS ' || vCurrentCol; + + -- Other columns as-is with T. prefix + ELSE + vResult := vResult || CASE WHEN vResult IS NOT NULL THEN ', ' ELSE '' END || + 'T.' || vCurrentCol; + END IF; + + vPos := vNextPos + 1; + END LOOP; + + RETURN vResult; + END buildQueryWithDateFormats; + + ---------------------------------------------------------------------------------------------------- + -- Internal shared function to process column list with T. 
prefix and key column mapping FUNCTION processColumnList(pColumnList IN VARCHAR2, pTableName IN VARCHAR2, pSchemaName IN VARCHAR2, pKeyColumnName IN VARCHAR2) RETURN VARCHAR2 IS vResult VARCHAR2(32767); @@ -44,8 +230,6 @@ AS -- Check if this is the key column (e.g., A_ETL_LOAD_SET_KEY_FK) and add alias IF UPPER(vCurrentCol) = UPPER(pKeyColumnName) THEN vCurrentCol := 'T.' || pKeyColumnName || ' AS A_WORKFLOW_HISTORY_KEY'; - ELSIF UPPER(vCurrentCol) = 'A_ETL_LOAD_SET_KEY' THEN - vCurrentCol := 'T.A_ETL_LOAD_SET_KEY AS A_WORKFLOW_HISTORY_KEY'; ELSE -- Add T. prefix if not already present IF INSTR(vCurrentCol, '.') = 0 THEN @@ -67,6 +251,351 @@ AS ---------------------------------------------------------------------------------------------------- + /** + * Validates table existence, key column existence, and column list + **/ + PROCEDURE VALIDATE_TABLE_AND_COLUMNS ( + pSchemaName IN VARCHAR2, + pTableName IN VARCHAR2, + pKeyColumnName IN VARCHAR2, + pColumnList IN VARCHAR2, + pParameters IN VARCHAR2 + ) IS + vCount INTEGER; + vColumns VARCHAR2(32767); + vPos PLS_INTEGER; + vNextPos PLS_INTEGER; + vCurrentCol VARCHAR2(128); + BEGIN + -- Check if table exists + SELECT COUNT(*) INTO vCount + FROM all_tables + WHERE table_name = pTableName + AND owner = pSchemaName; + + IF vCount = 0 THEN + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, ENV_MANAGER.MSG_TABLE_NOT_EXISTS); + END IF; + + -- Check if key column exists + SELECT COUNT(*) INTO vCount + FROM all_tab_columns + WHERE table_name = pTableName + AND column_name = pKeyColumnName + AND owner = pSchemaName; + + IF vCount = 0 THEN + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS); + END IF; + + -- Validate pColumnList - check if all column names exist in the table + IF pColumnList IS NOT NULL THEN + vColumns := UPPER(REPLACE(pColumnList, ' ', '')); + vPos := 1; + + WHILE vPos <= LENGTH(vColumns) LOOP + vNextPos := INSTR(vColumns, ',', vPos); + IF vNextPos = 0 THEN 
+ vNextPos := LENGTH(vColumns) + 1; + END IF; + + vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos); + + -- Remove table alias prefix if present + IF INSTR(vCurrentCol, '.') > 0 THEN + vCurrentCol := SUBSTR(vCurrentCol, INSTR(vCurrentCol, '.') + 1); + END IF; + + -- Check if column exists + SELECT COUNT(*) INTO vCount + FROM all_tab_columns + WHERE table_name = pTableName + AND column_name = vCurrentCol + AND owner = pSchemaName; + + IF vCount = 0 THEN + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS); + END IF; + + vPos := vNextPos + 1; + END LOOP; + END IF; + END VALIDATE_TABLE_AND_COLUMNS; + + ---------------------------------------------------------------------------------------------------- + + /** + * Retrieves list of year/month partitions based on date range + **/ + FUNCTION GET_PARTITIONS ( + pSchemaName IN VARCHAR2, + pTableName IN VARCHAR2, + pKeyColumnName IN VARCHAR2, + pMinDate IN DATE, + pMaxDate IN DATE, + pParameters IN VARCHAR2 + ) RETURN partition_tab IS + vSql VARCHAR2(32000); + vPartitions partition_tab; + vKeyValuesYear DBMS_SQL.VARCHAR2_TABLE; + vKeyValuesMonth DBMS_SQL.VARCHAR2_TABLE; + vFullTableName VARCHAR2(200); + BEGIN + -- Build fully qualified table name if not already qualified + IF INSTR(pTableName, '.') > 0 THEN + vFullTableName := pTableName; -- Already fully qualified + ELSE + vFullTableName := pSchemaName || '.' || pTableName; + END IF; + + vSql := 'SELECT DISTINCT TO_CHAR(L.LOAD_START,''YYYY'') AS YR, TO_CHAR(L.LOAD_START,''MM'') AS MN + FROM ' || vFullTableName || ' T, CT_ODS.A_LOAD_HISTORY L + WHERE T.' 
|| pKeyColumnName || ' = L.A_ETL_LOAD_SET_KEY + AND L.LOAD_START >= :pMinDate + AND L.LOAD_START < :pMaxDate + ORDER BY YR, MN'; + + ENV_MANAGER.LOG_PROCESS_EVENT('Executing date range query: ' || vSql, 'DEBUG', pParameters); + EXECUTE IMMEDIATE vSql BULK COLLECT INTO vKeyValuesYear, vKeyValuesMonth USING pMinDate, pMaxDate; + + ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vKeyValuesYear.COUNT || ' year/month combinations to export', 'DEBUG', pParameters); + + -- Convert to partition_tab + vPartitions := partition_tab(); + vPartitions.EXTEND(vKeyValuesYear.COUNT); + FOR i IN 1 .. vKeyValuesYear.COUNT LOOP + vPartitions(i).year := vKeyValuesYear(i); + vPartitions(i).month := vKeyValuesMonth(i); + END LOOP; + + RETURN vPartitions; + END GET_PARTITIONS; + + ---------------------------------------------------------------------------------------------------- + + /** + * Exports single partition (year/month) to specified format (PARQUET or CSV) + * This is the core worker procedure that will be used for parallel processing in v2.3.0 + **/ + PROCEDURE EXPORT_SINGLE_PARTITION ( + pSchemaName IN VARCHAR2, + pTableName IN VARCHAR2, + pKeyColumnName IN VARCHAR2, + pYear IN VARCHAR2, + pMonth IN VARCHAR2, + pBucketUri IN VARCHAR2, + pFolderName IN VARCHAR2, + pProcessedColumns IN VARCHAR2, + pMinDate IN DATE, + pMaxDate IN DATE, + pCredentialName IN VARCHAR2, + pFormat IN VARCHAR2 DEFAULT 'PARQUET', + pFileBaseName IN VARCHAR2 DEFAULT NULL, + pMaxFileSize IN NUMBER DEFAULT 104857600, + pParameters IN VARCHAR2 + ) IS + vQuery VARCHAR2(32767); + vUri VARCHAR2(4000); + vFileName VARCHAR2(1000); + vFullTableName VARCHAR2(200); + BEGIN + -- Build fully qualified table name if not already qualified + IF INSTR(pTableName, '.') > 0 THEN + vFullTableName := pTableName; -- Already fully qualified + ELSE + vFullTableName := pSchemaName || '.' 
|| pTableName; + END IF; + + -- Construct the query to extract data for the current year/month + vQuery := 'SELECT ' || pProcessedColumns || ' + FROM ' || vFullTableName || ' T, CT_ODS.A_LOAD_HISTORY L + WHERE T.' || pKeyColumnName || ' = L.A_ETL_LOAD_SET_KEY + AND TO_CHAR(L.LOAD_START,''YYYY'') = ' || CHR(39) || pYear || CHR(39) || ' + AND TO_CHAR(L.LOAD_START,''MM'') = ' || CHR(39) || pMonth || CHR(39) || ' + AND L.LOAD_START >= TO_DATE(' || CHR(39) || TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'') + AND L.LOAD_START < TO_DATE(' || CHR(39) || TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'')'; + + -- Construct the URI based on format + IF pFormat = 'PARQUET' THEN + -- Parquet: Use Hive-style partitioning + -- Note: maxfilesize is NOT supported for Parquet format (Oracle limitation) + vUri := pBucketUri || + CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || + 'PARTITION_YEAR=' || sanitizeFilename(pYear) || '/' || + 'PARTITION_MONTH=' || sanitizeFilename(pMonth) || '/' || + sanitizeFilename(pYear) || sanitizeFilename(pMonth) || '.parquet'; + + ENV_MANAGER.LOG_PROCESS_EVENT('Parquet export URI: ' || vUri, 'DEBUG', pParameters); + + -- Delete potentially corrupted file from previous failed attempt + -- This prevents Oracle from creating _1 suffixed files on retry + DELETE_FAILED_EXPORT_FILE(vUri, pCredentialName, pParameters); + + DBMS_CLOUD.EXPORT_DATA( + credential_name => pCredentialName, + file_uri_list => vUri, + query => vQuery, + format => json_object('type' VALUE 'parquet') + ); + ELSIF pFormat = 'CSV' THEN + -- CSV: Flat file structure with year/month in filename + vFileName := NVL(pFileBaseName, UPPER(pTableName)) || '_' || pYear || pMonth || '.csv'; + vUri := pBucketUri || + CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || + sanitizeFilename(vFileName); + + ENV_MANAGER.LOG_PROCESS_EVENT('CSV export URI: ' || vUri, 'DEBUG', 
pParameters); + + -- Delete potentially corrupted file from previous failed attempt + -- This prevents Oracle from creating _1 suffixed files on retry + DELETE_FAILED_EXPORT_FILE(vUri, pCredentialName, pParameters); + + -- Use json_object() for CSV export with maxfilesize in bytes (Oracle requirement) + -- Oracle maxfilesize: min 10MB (10485760), max 1GB (1073741824), default 10MB + -- NOTE: maxfilesize must be NUMBER (bytes), not string like '1000M' + -- Using 100MB (104857600) to avoid PGA memory issues with large files + DBMS_CLOUD.EXPORT_DATA( + credential_name => pCredentialName, + file_uri_list => vUri, + query => vQuery, + format => json_object( + 'type' VALUE 'CSV', + 'header' VALUE true, + 'quote' VALUE CHR(34), + 'delimiter' VALUE ',', + 'escape' VALUE true, + 'recorddelimiter' VALUE CHR(13)||CHR(10), -- CRLF dla Windows + 'maxfilesize' VALUE pMaxFileSize -- Dynamic maxfilesize in bytes (e.g., 104857600 = 100MB) + ) + ); + ELSE + RAISE_APPLICATION_ERROR(-20001, 'Unsupported format: ' || pFormat || '. 
Use PARQUET or CSV.'); + END IF; + + ENV_MANAGER.LOG_PROCESS_EVENT('Processing Year/Month: ' || pYear || '/' || pMonth || ' (Format: ' || pFormat || ')', 'DEBUG', pParameters); + ENV_MANAGER.LOG_PROCESS_EVENT('Export query: ' || vQuery, 'DEBUG', pParameters); + END EXPORT_SINGLE_PARTITION; + + ---------------------------------------------------------------------------------------------------- + + /** + * Callback procedure for DBMS_PARALLEL_EXECUTE + * Processes single partition (year/month) chunk in parallel task + * Called by DBMS_PARALLEL_EXECUTE framework for each chunk + **/ + PROCEDURE EXPORT_PARTITION_PARALLEL ( + pStartId IN NUMBER, + pEndId IN NUMBER + ) IS + vYear VARCHAR2(4); + vMonth VARCHAR2(2); + vSchemaName VARCHAR2(128); + vTableName VARCHAR2(128); + vKeyColumnName VARCHAR2(128); + vBucketUri VARCHAR2(4000); + vFolderName VARCHAR2(1000); + vProcessedColumns VARCHAR2(32767); + vMinDate DATE; + vMaxDate DATE; + vCredentialName VARCHAR2(200); + vFormat VARCHAR2(20); + vFileBaseName VARCHAR2(1000); + vMaxFileSize NUMBER; + vParameters VARCHAR2(4000); + BEGIN + -- Retrieve chunk context from global temporary table + SELECT + YEAR_VALUE, + MONTH_VALUE, + SCHEMA_NAME, + TABLE_NAME, + KEY_COLUMN_NAME, + BUCKET_URI, + FOLDER_NAME, + PROCESSED_COLUMNS, + MIN_DATE, + MAX_DATE, + CREDENTIAL_NAME, + FORMAT_TYPE, + FILE_BASE_NAME, + MAX_FILE_SIZE + INTO + vYear, + vMonth, + vSchemaName, + vTableName, + vKeyColumnName, + vBucketUri, + vFolderName, + vProcessedColumns, + vMinDate, + vMaxDate, + vCredentialName, + vFormat, + vFileBaseName, + vMaxFileSize + FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS + WHERE CHUNK_ID = pStartId; + + vParameters := 'Parallel task - Year: ' || vYear || ', Month: ' || vMonth || ', ChunkID: ' || pStartId; + ENV_MANAGER.LOG_PROCESS_EVENT('Starting parallel export for partition ' || vYear || '/' || vMonth, 'DEBUG', vParameters); + + -- Mark chunk as PROCESSING + UPDATE CT_MRDS.A_PARALLEL_EXPORT_CHUNKS + SET STATUS = 'PROCESSING', + ERROR_MESSAGE 
= NULL + WHERE CHUNK_ID = pStartId; + COMMIT; + + -- Call the worker procedure + EXPORT_SINGLE_PARTITION( + pSchemaName => vSchemaName, + pTableName => vTableName, + pKeyColumnName => vKeyColumnName, + pYear => vYear, + pMonth => vMonth, + pBucketUri => vBucketUri, + pFolderName => vFolderName, + pProcessedColumns => vProcessedColumns, + pMinDate => vMinDate, + pMaxDate => vMaxDate, + pCredentialName => vCredentialName, + pFormat => vFormat, + pFileBaseName => vFileBaseName, + pMaxFileSize => vMaxFileSize, + pParameters => vParameters + ); + + -- Mark chunk as COMPLETED + UPDATE CT_MRDS.A_PARALLEL_EXPORT_CHUNKS + SET STATUS = 'COMPLETED', + EXPORT_TIMESTAMP = SYSTIMESTAMP, + ERROR_MESSAGE = NULL + WHERE CHUNK_ID = pStartId; + COMMIT; + + ENV_MANAGER.LOG_PROCESS_EVENT('Completed parallel export for partition ' || vYear || '/' || vMonth, 'DEBUG', vParameters); + EXCEPTION + WHEN OTHERS THEN + -- Capture error details in variable (SQLERRM cannot be used directly in SQL) + vgMsgTmp := 'Parallel task error for partition ' || vYear || '/' || vMonth || ' (ChunkID: ' || pStartId || '): ' || SQLERRM || cgBL || DBMS_UTILITY.FORMAT_ERROR_BACKTRACE; + ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); + + -- Mark chunk as FAILED with error message + -- Use vgMsgTmp variable instead of SQLERRM directly (Oracle limitation in SQL context) + UPDATE CT_MRDS.A_PARALLEL_EXPORT_CHUNKS + SET STATUS = 'FAILED', + ERROR_MESSAGE = SUBSTR(vgMsgTmp, 1, 4000) + WHERE CHUNK_ID = pStartId; + COMMIT; + + RAISE; + END EXPORT_PARTITION_PARALLEL; + + ---------------------------------------------------------------------------------------------------- + -- MAIN EXPORT PROCEDURES + ---------------------------------------------------------------------------------------------------- + PROCEDURE EXPORT_TABLE_DATA ( pSchemaName IN VARCHAR2, pTableName IN VARCHAR2, @@ -94,16 +623,6 @@ AS vCurrentCol VARCHAR2(128); vAllColumnsList VARCHAR2(32767); - - -- Function to sanitize file names - 
FUNCTION sanitizeFilename(pFilename IN VARCHAR2) RETURN VARCHAR2 IS - vFilename VARCHAR2(1000); - BEGIN - -- Replace any disallowed characters with underscores - vFilename := REGEXP_REPLACE(pFilename, '[^a-zA-Z0-9._-]', '_'); - RETURN vFilename; - END sanitizeFilename; - BEGIN vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||'''' ,'pTableName => '''||nvl(pTableName, 'NULL')||'''' @@ -151,18 +670,17 @@ AS AND column_name = vKeyColumnName AND owner = vSchemaName; - -- Build list of all columns for the table (excluding key column to avoid duplication) + -- Build list of all columns for the table (including key column for aliasing) SELECT LISTAGG(column_name, ', ') WITHIN GROUP (ORDER BY column_id) INTO vAllColumnsList FROM all_tab_columns WHERE table_name = vTableName - AND owner = vSchemaName - AND column_name != vKeyColumnName; + AND owner = vSchemaName; - -- Process column list to add T. prefix to each column + -- Process column list to add T. prefix and alias key column as A_WORKFLOW_HISTORY_KEY vProcessedColumnList := processColumnList(vAllColumnsList, vTableName, vSchemaName, vKeyColumnName); - ENV_MANAGER.LOG_PROCESS_EVENT('Dynamic column list built (excluding key): ' || vAllColumnsList, 'DEBUG', vParameters); + ENV_MANAGER.LOG_PROCESS_EVENT('Dynamic column list built: ' || vAllColumnsList, 'DEBUG', vParameters); ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list with T. prefix: ' || vProcessedColumnList, 'DEBUG', vParameters); vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' 
|| DBMS_ASSERT.simple_sql_name(vTableName); @@ -241,30 +759,19 @@ AS ---------------------------------------------------------------------------------------------------- PROCEDURE EXPORT_TABLE_DATA_BY_DATE ( - pSchemaName IN VARCHAR2, - pTableName IN VARCHAR2, - pKeyColumnName IN VARCHAR2, - pBucketArea IN VARCHAR2, - pFolderName IN VARCHAR2, - pColumnList IN VARCHAR2 default NULL, - pMinDate IN DATE default DATE '1900-01-01', - pMaxDate IN DATE default SYSDATE, - pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName + pSchemaName IN VARCHAR2, + pTableName IN VARCHAR2, + pKeyColumnName IN VARCHAR2, + pBucketArea IN VARCHAR2, + pFolderName IN VARCHAR2, + pColumnList IN VARCHAR2 default NULL, + pMinDate IN DATE default DATE '1900-01-01', + pMaxDate IN DATE default SYSDATE, + pParallelDegree IN NUMBER default 1, + pTemplateTableName IN VARCHAR2 default NULL, + pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName ) IS - -- Type definition for key values - TYPE key_value_tab IS TABLE OF VARCHAR2(4000); - - vKeyValuesYear key_value_tab; - vKeyValuesMonth key_value_tab; - - vCount INTEGER; - vSql VARCHAR2(32000); - vKeyValueYear VARCHAR2(4000); - vKeyValueMonth VARCHAR2(4000); - vQuery VARCHAR2(32767); - vUri VARCHAR2(4000); - vDataType VARCHAR2(30); vTableName VARCHAR2(128); vSchemaName VARCHAR2(128); vKeyColumnName VARCHAR2(128); @@ -272,15 +779,7 @@ AS vProcessedColumnList VARCHAR2(32767); vBucketUri VARCHAR2(4000); vCurrentCol VARCHAR2(128); - - -- Function to sanitize file names - FUNCTION sanitizeFilename(pFilename IN VARCHAR2) RETURN VARCHAR2 IS - vFilename VARCHAR2(1000); - BEGIN - -- Replace any disallowed characters with underscores - vFilename := REGEXP_REPLACE(pFilename, '[^a-zA-Z0-9._-]', '_'); - RETURN vFilename; - END sanitizeFilename; + vPartitions partition_tab; BEGIN vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||'''' @@ -291,6 +790,8 @@ AS ,'pColumnList => 
'''||nvl(pColumnList, 'NULL')||'''' ,'pMinDate => '''||nvl(TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||'''' ,'pMaxDate => '''||nvl(TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||'''' + ,'pParallelDegree => '''||nvl(TO_CHAR(pParallelDegree), 'NULL')||'''' + ,'pTemplateTableName => '''||nvl(pTemplateTableName, 'NULL')||'''' ,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||'''' )); ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters); @@ -303,123 +804,172 @@ AS vSchemaName := UPPER(pSchemaName); vKeyColumnName := UPPER(pKeyColumnName); - -- Check if table exists - SELECT COUNT(*) INTO vCount - FROM all_tables - WHERE table_name = vTableName - AND owner = vSchemaName; + -- Validate table, key column, and column list using shared procedure + VALIDATE_TABLE_AND_COLUMNS(vSchemaName, vTableName, vKeyColumnName, pColumnList, vParameters); - IF vCount = 0 THEN - RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, ENV_MANAGER.MSG_TABLE_NOT_EXISTS); - END IF; - - -- Check if key column exists - SELECT COUNT(*) INTO vCount - FROM all_tab_columns - WHERE table_name = vTableName - AND column_name = vKeyColumnName - AND owner = vSchemaName; - - IF vCount = 0 THEN - RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS); - END IF; - - -- Validate pColumnList - check if all column names exist in the table - IF pColumnList IS NOT NULL THEN - DECLARE - vColumnName VARCHAR2(128); - vColumns VARCHAR2(32767); - vPos PLS_INTEGER; - vNextPos PLS_INTEGER; - vCurrentCol VARCHAR2(128); - BEGIN - -- Remove spaces and convert to uppercase for processing - vColumns := UPPER(REPLACE(pColumnList, ' ', '')); - vPos := 1; - - -- Parse comma-separated column list - WHILE vPos <= LENGTH(vColumns) LOOP - vNextPos := INSTR(vColumns, ',', vPos); - IF vNextPos = 0 THEN - vNextPos := LENGTH(vColumns) + 1; - END IF; - - vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos); - - -- Remove table alias prefix if present (e.g., 
'T.COLUMN_NAME' -> 'COLUMN_NAME') - IF INSTR(vCurrentCol, '.') > 0 THEN - vCurrentCol := SUBSTR(vCurrentCol, INSTR(vCurrentCol, '.') + 1); - END IF; - - -- Check if column exists in the table - SELECT COUNT(*) INTO vCount - FROM all_tab_columns - WHERE table_name = vTableName - AND column_name = vCurrentCol - AND owner = vSchemaName; - - IF vCount = 0 THEN - RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS); - END IF; - - vPos := vNextPos + 1; - END LOOP; - END; - END IF; - - -- Process column list to add T. prefix to each column - vProcessedColumnList := processColumnList(pColumnList, vTableName, vSchemaName, vKeyColumnName); + -- Build query with TO_CHAR for date columns (per-column format support) + vProcessedColumnList := buildQueryWithDateFormats(pColumnList, vTableName, vSchemaName, vKeyColumnName, pTemplateTableName); ENV_MANAGER.LOG_PROCESS_EVENT('Input column list: ' || NVL(pColumnList, 'NULL (building dynamic list from table metadata)'), 'DEBUG', vParameters); - ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list: ' || vProcessedColumnList, 'DEBUG', vParameters); + ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list with TO_CHAR for date columns: ' || vProcessedColumnList, 'DEBUG', vParameters); + ENV_MANAGER.LOG_PROCESS_EVENT('Template table: ' || NVL(pTemplateTableName, 'NULL - using global default for all dates'), 'INFO', vParameters); vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName); - -- Fetch unique key values - vSql := 'SELECT DISTINCT TO_CHAR(L.LOAD_START,''YYYY'') AS YR, TO_CHAR(L.LOAD_START,''MM'') AS MN - FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L - WHERE T.' 
|| DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY - AND L.LOAD_START >= :pMinDate - AND L.LOAD_START < :pMaxDate - ' ; - ENV_MANAGER.LOG_PROCESS_EVENT('Executing date range query: ' || vSql, 'DEBUG', vParameters); - EXECUTE IMMEDIATE vSql BULK COLLECT INTO vKeyValuesYear, vKeyValuesMonth USING pMinDate, pMaxDate; - ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vKeyValuesYear.COUNT || ' year/month combinations to export', 'DEBUG', vParameters); + + -- Validate parallel degree parameter + IF pParallelDegree < 1 OR pParallelDegree > 16 THEN + vgMsgTmp := ENV_MANAGER.MSG_INVALID_PARALLEL_DEGREE || ': ' || pParallelDegree || '. Valid range: 1-16'; + ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_INVALID_PARALLEL_DEGREE, vgMsgTmp); + END IF; - -- Loop over each unique key value - FOR i IN 1 .. vKeyValuesYear.COUNT LOOP - vKeyValueYear := vKeyValuesYear(i); - vKeyValueMonth := vKeyValuesMonth(i); - - ENV_MANAGER.LOG_PROCESS_EVENT('Processing Year/Month: ' || vKeyValueYear || '/' || vKeyValueMonth || ' (' || i || '/' || vKeyValuesYear.COUNT || ')', 'DEBUG', vParameters); - -- Construct the query to extract data for the current key value - -- Note: processColumnList already handles A_WORKFLOW_HISTORY_KEY aliasing + -- Get partitions using shared function + vPartitions := GET_PARTITIONS(vSchemaName, vTableName, vKeyColumnName, pMinDate, pMaxDate, vParameters); - vQuery := 'SELECT ' || vProcessedColumnList || ' - FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L - WHERE T.' 
|| DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY - AND TO_CHAR(L.LOAD_START,''YYYY'') = ' || CHR(39) || vKeyValueYear || CHR(39) || ' - AND TO_CHAR(L.LOAD_START,''MM'') = ' || CHR(39) || vKeyValueMonth || CHR(39) || ' - AND L.LOAD_START >= TO_DATE(' || CHR(39) || TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'') - AND L.LOAD_START < TO_DATE(' || CHR(39) || TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'')'; + ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vPartitions.COUNT || ' partitions to export with parallel degree ' || pParallelDegree, 'INFO', vParameters); - -- Construct the URI for the file in OCI Object Storage - vUri := vBucketUri || - CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || - 'PARTITION_YEAR=' || sanitizeFilename(vKeyValueYear) || '/' || - 'PARTITION_MONTH=' || sanitizeFilename(vKeyValueMonth) || '/' || - sanitizeFilename(vKeyValueYear) || sanitizeFilename(vKeyValueMonth) || '.parquet'; - - ENV_MANAGER.LOG_PROCESS_EVENT('Export query: ' || vQuery, 'DEBUG', vParameters); - ENV_MANAGER.LOG_PROCESS_EVENT('Parquet export URI: ' || vUri, 'DEBUG', vParameters); - - -- Use DBMS_CLOUD package to export data to the URI - DBMS_CLOUD.EXPORT_DATA( - credential_name => pCredentialName, - file_uri_list => vUri, - query => vQuery, - format => json_object('type' VALUE 'parquet') - ); - END LOOP; + -- Sequential processing (parallel degree = 1) + IF pParallelDegree = 1 THEN + ENV_MANAGER.LOG_PROCESS_EVENT('Using sequential processing (pParallelDegree = 1)', 'DEBUG', vParameters); + + FOR i IN 1 .. 
vPartitions.COUNT LOOP + EXPORT_SINGLE_PARTITION( + pSchemaName => vSchemaName, + pTableName => vTableName, + pKeyColumnName => vKeyColumnName, + pYear => vPartitions(i).year, + pMonth => vPartitions(i).month, + pBucketUri => vBucketUri, + pFolderName => pFolderName, + pProcessedColumns => vProcessedColumnList, + pMinDate => pMinDate, + pMaxDate => pMaxDate, + pCredentialName => pCredentialName, + pFormat => 'PARQUET', + pFileBaseName => NULL, + pMaxFileSize => 104857600, + pParameters => vParameters + ); + END LOOP; + + -- Parallel processing (parallel degree > 1) + ELSE + -- Skip parallel processing if no partitions found + IF vPartitions.COUNT = 0 THEN + ENV_MANAGER.LOG_PROCESS_EVENT('No partitions to export - skipping parallel processing', 'INFO', vParameters); + ELSE + DECLARE + vTaskName VARCHAR2(128) := 'DATA_EXPORT_TASK_' || TO_CHAR(SYSTIMESTAMP, 'YYYYMMDDHH24MISSFF'); + vChunkId NUMBER; + BEGIN + ENV_MANAGER.LOG_PROCESS_EVENT('Using parallel processing with ' || pParallelDegree || ' threads', 'INFO', vParameters); + + -- Clean up old completed chunks (>24 hours) to prevent table bloat + -- CRITICAL: Do NOT delete chunks from other active sessions (same-day tasks) + -- This prevents race conditions when multiple exports run simultaneously + DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS + WHERE STATUS = 'COMPLETED' + AND CREATED_DATE < SYSTIMESTAMP - INTERVAL '1' DAY; + COMMIT; + + ENV_MANAGER.LOG_PROCESS_EVENT('Cleared old COMPLETED chunks (>24h). Active session chunks preserved.', 'DEBUG', vParameters); + -- This prevents re-exporting successfully completed partitions + DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'COMPLETED'; + COMMIT; + + ENV_MANAGER.LOG_PROCESS_EVENT('Cleared COMPLETED chunks. FAILED chunks retained for retry.', 'DEBUG', vParameters); + + -- Populate chunks table (insert new chunks, preserve FAILED chunks for retry) + FOR i IN 1 .. 
vPartitions.COUNT LOOP + MERGE INTO CT_MRDS.A_PARALLEL_EXPORT_CHUNKS t + USING (SELECT i AS chunk_id, vPartitions(i).year AS yr, vPartitions(i).month AS mn FROM DUAL) s + ON (t.CHUNK_ID = s.chunk_id) + WHEN NOT MATCHED THEN + INSERT (CHUNK_ID, TASK_NAME, YEAR_VALUE, MONTH_VALUE, SCHEMA_NAME, TABLE_NAME, KEY_COLUMN_NAME, + BUCKET_URI, FOLDER_NAME, PROCESSED_COLUMNS, MIN_DATE, MAX_DATE, + CREDENTIAL_NAME, FORMAT_TYPE, FILE_BASE_NAME, TEMPLATE_TABLE_NAME, MAX_FILE_SIZE, STATUS) + VALUES (i, vTaskName, vPartitions(i).year, vPartitions(i).month, vSchemaName, vTableName, vKeyColumnName, + vBucketUri, pFolderName, vProcessedColumnList, pMinDate, pMaxDate, + pCredentialName, 'PARQUET', NULL, pTemplateTableName, 104857600, 'PENDING') + WHEN MATCHED THEN + UPDATE SET TASK_NAME = vTaskName, + STATUS = CASE WHEN t.STATUS = 'FAILED' THEN 'PENDING' ELSE t.STATUS END, + ERROR_MESSAGE = CASE WHEN t.STATUS = 'FAILED' THEN NULL ELSE t.ERROR_MESSAGE END; + END LOOP; + COMMIT; + + -- Log chunk statistics + DECLARE + vPendingCount NUMBER; + vFailedCount NUMBER; + BEGIN + SELECT COUNT(*) INTO vPendingCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'PENDING'; + SELECT COUNT(*) INTO vFailedCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'FAILED'; + + ENV_MANAGER.LOG_PROCESS_EVENT('Chunk statistics: PENDING=' || vPendingCount || ', FAILED (retry)=' || vFailedCount, 'INFO', vParameters); + END; + + -- Create parallel task + DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => vTaskName); + + -- Define chunks by number range (1 to partition count) + DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_NUMBER_COL( + task_name => vTaskName, + table_owner => 'CT_MRDS', + table_name => 'A_PARALLEL_EXPORT_CHUNKS', + table_column => 'CHUNK_ID', + chunk_size => 1 -- Each partition is one chunk + ); + + -- Execute task in parallel + ENV_MANAGER.LOG_PROCESS_EVENT('Executing parallel task: ' || vTaskName, 'DEBUG', vParameters); + + DBMS_PARALLEL_EXECUTE.RUN_TASK( + task_name => vTaskName, + sql_stmt 
=> 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id); END;', + language_flag => DBMS_SQL.NATIVE, + parallel_level => pParallelDegree + ); + + -- Check for errors + DECLARE + vErrorCount NUMBER; + BEGIN + SELECT COUNT(*) INTO vErrorCount + FROM USER_PARALLEL_EXECUTE_CHUNKS + WHERE task_name = vTaskName AND status = 'PROCESSED_WITH_ERROR'; + + IF vErrorCount > 0 THEN + vgMsgTmp := 'Parallel execution completed with ' || vErrorCount || ' errors. Check USER_PARALLEL_EXECUTE_CHUNKS for details.'; + ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp); + END IF; + END; + + -- Clean up task + DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName); + + -- Clean up chunks for THIS specific task only (session-safe) + -- CRITICAL: Use TASK_NAME filter to avoid deleting chunks from other active sessions + DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE TASK_NAME = vTaskName; + COMMIT; + + ENV_MANAGER.LOG_PROCESS_EVENT('Parallel execution completed successfully', 'INFO', vParameters); + EXCEPTION + WHEN OTHERS THEN + -- Attempt to drop task on error + BEGIN + DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName); + EXCEPTION + WHEN OTHERS THEN NULL; -- Ignore drop errors + END; + + vgMsgTmp := ENV_MANAGER.MSG_PARALLEL_EXECUTION_FAILED || ': ' || SQLERRM || cgBL || DBMS_UTILITY.FORMAT_ERROR_BACKTRACE; + ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp); + END; + END IF; + END IF; ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters); EXCEPTION @@ -431,6 +981,10 @@ AS vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in pColumnList' ELSE '' END; ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); 
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp); + WHEN ENV_MANAGER.ERR_INVALID_PARALLEL_DEGREE THEN + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_INVALID_PARALLEL_DEGREE, vgMsgTmp); + WHEN ENV_MANAGER.ERR_PARALLEL_EXECUTION_FAILED THEN + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp); WHEN OTHERS THEN -- Log complete error details including full stack trace and backtrace ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER'); @@ -466,31 +1020,21 @@ AS * end; **/ PROCEDURE EXPORT_TABLE_DATA_TO_CSV_BY_DATE ( - pSchemaName IN VARCHAR2, - pTableName IN VARCHAR2, - pKeyColumnName IN VARCHAR2, - pBucketArea IN VARCHAR2, - pFolderName IN VARCHAR2, - pFileName IN VARCHAR2 DEFAULT NULL, - pColumnList IN VARCHAR2 default NULL, - pMinDate IN DATE default DATE '1900-01-01', - pMaxDate IN DATE default SYSDATE, - pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName + pSchemaName IN VARCHAR2, + pTableName IN VARCHAR2, + pKeyColumnName IN VARCHAR2, + pBucketArea IN VARCHAR2, + pFolderName IN VARCHAR2, + pFileName IN VARCHAR2 DEFAULT NULL, + pColumnList IN VARCHAR2 default NULL, + pMinDate IN DATE default DATE '1900-01-01', + pMaxDate IN DATE default SYSDATE, + pParallelDegree IN NUMBER default 1, + pTemplateTableName IN VARCHAR2 default NULL, + pMaxFileSize IN NUMBER default 104857600, + pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName ) IS - -- Type definition for key values - TYPE key_value_tab IS TABLE OF VARCHAR2(4000); - - vKeyValuesYear key_value_tab; - vKeyValuesMonth key_value_tab; - - vCount INTEGER; - vSql VARCHAR2(4000); - vKeyValueYear VARCHAR2(4000); - vKeyValueMonth VARCHAR2(4000); - vQuery VARCHAR2(32767); - vUri VARCHAR2(4000); - vDataType VARCHAR2(30); vTableName VARCHAR2(128); vSchemaName VARCHAR2(128); vKeyColumnName VARCHAR2(128); @@ -500,15 +1044,7 @@ AS vProcessedColumnList VARCHAR2(32767); vBucketUri VARCHAR2(4000); vCurrentCol VARCHAR2(128); - - 
-- Function to sanitize file names - FUNCTION sanitizeFilename(pFilename IN VARCHAR2) RETURN VARCHAR2 IS - vFilename VARCHAR2(1000); - BEGIN - -- Replace any disallowed characters with underscores - vFilename := REGEXP_REPLACE(pFilename, '[^a-zA-Z0-9._-]', '_'); - RETURN vFilename; - END sanitizeFilename; + vPartitions partition_tab; BEGIN vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||'''' @@ -520,6 +1056,9 @@ AS ,'pColumnList => '''||nvl(pColumnList, 'NULL')||'''' ,'pMinDate => '''||nvl(TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||'''' ,'pMaxDate => '''||nvl(TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||'''' + ,'pParallelDegree => '''||nvl(TO_CHAR(pParallelDegree), 'NULL')||'''' + ,'pTemplateTableName => '''||nvl(pTemplateTableName, 'NULL')||'''' + ,'pMaxFileSize => '''||nvl(TO_CHAR(pMaxFileSize), 'NULL')||'''' ,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||'''' )); ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters); @@ -543,139 +1082,176 @@ AS vFileExtension := '.csv'; END IF; ELSE - -- Construct default filename: TABLENAME.csv (without date range) + -- Construct default filename: TABLENAME (without extension, will be added by worker) vFileBaseName := UPPER(pTableName); vFileExtension := '.csv'; END IF; - -- Check if table exists - SELECT COUNT(*) INTO vCount - FROM all_tables - WHERE table_name = vTableName - AND owner = vSchemaName; + -- Validate table, key column, and column list using shared procedure + VALIDATE_TABLE_AND_COLUMNS(vSchemaName, vTableName, vKeyColumnName, pColumnList, vParameters); - IF vCount = 0 THEN - RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, ENV_MANAGER.MSG_TABLE_NOT_EXISTS); - END IF; - - -- Check if key column exists - SELECT COUNT(*) INTO vCount - FROM all_tab_columns - WHERE table_name = vTableName - AND column_name = vKeyColumnName - AND owner = vSchemaName; - - IF vCount = 0 THEN - 
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS); - END IF; - - -- Validate pColumnList - check if all column names exist in the table - IF pColumnList IS NOT NULL THEN - DECLARE - vColumnName VARCHAR2(128); - vColumns VARCHAR2(32767); - vPos PLS_INTEGER; - vNextPos PLS_INTEGER; - vCurrentCol VARCHAR2(128); - BEGIN - -- Remove spaces and convert to uppercase for processing - vColumns := UPPER(REPLACE(pColumnList, ' ', '')); - vPos := 1; - - -- Parse comma-separated column list - WHILE vPos <= LENGTH(vColumns) LOOP - vNextPos := INSTR(vColumns, ',', vPos); - IF vNextPos = 0 THEN - vNextPos := LENGTH(vColumns) + 1; - END IF; - - vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos); - - -- Remove table alias prefix if present (e.g., 'T.COLUMN_NAME' -> 'COLUMN_NAME') - IF INSTR(vCurrentCol, '.') > 0 THEN - vCurrentCol := SUBSTR(vCurrentCol, INSTR(vCurrentCol, '.') + 1); - END IF; - - -- Check if column exists in the table - SELECT COUNT(*) INTO vCount - FROM all_tab_columns - WHERE table_name = vTableName - AND column_name = vCurrentCol - AND owner = vSchemaName; - - IF vCount = 0 THEN - RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS); - END IF; - - vPos := vNextPos + 1; - END LOOP; - END; - END IF; - - -- Process column list to add T. 
prefix to each column - vProcessedColumnList := processColumnList(pColumnList, vTableName, vSchemaName, vKeyColumnName); + -- Build query with TO_CHAR for date columns (per-column format support) + vProcessedColumnList := buildQueryWithDateFormats(pColumnList, vTableName, vSchemaName, vKeyColumnName, pTemplateTableName); ENV_MANAGER.LOG_PROCESS_EVENT('Input column list: ' || NVL(pColumnList, 'NULL (using dynamic column list)'), 'DEBUG', vParameters); - ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list: ' || vProcessedColumnList, 'DEBUG', vParameters); - - -- Get the data type of the key column - SELECT data_type INTO vDataType - FROM all_tab_columns - WHERE table_name = vTableName - AND column_name = vKeyColumnName - AND owner = vSchemaName; + ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list with TO_CHAR for date columns: ' || vProcessedColumnList, 'DEBUG', vParameters); + ENV_MANAGER.LOG_PROCESS_EVENT('Template table: ' || NVL(pTemplateTableName, 'NULL - using global default for all dates'), 'INFO', vParameters); vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName); - -- Fetch unique year/month combinations - vSql := 'SELECT DISTINCT TO_CHAR(L.LOAD_START,''YYYY'') AS YR, TO_CHAR(L.LOAD_START,''MM'') AS MN - FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L - WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY - AND L.LOAD_START >= :pMinDate - AND L.LOAD_START < :pMaxDate - ' ; - ENV_MANAGER.LOG_PROCESS_EVENT('Executing date range query: ' || vSql, 'DEBUG', vParameters); - EXECUTE IMMEDIATE vSql BULK COLLECT INTO vKeyValuesYear, vKeyValuesMonth USING pMinDate, pMaxDate; + -- Validate parallel degree parameter (NULL is rejected explicitly: both range comparisons evaluate to NULL for it) + IF pParallelDegree IS NULL OR pParallelDegree < 1 OR pParallelDegree > 16 THEN + vgMsgTmp := ENV_MANAGER.MSG_INVALID_PARALLEL_DEGREE || ': ' || NVL(TO_CHAR(pParallelDegree), 'NULL') || '. 
Valid range: 1-16'; + ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_INVALID_PARALLEL_DEGREE, vgMsgTmp); + END IF; - ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vKeyValuesYear.COUNT || ' year/month combinations to export', 'INFO', vParameters); + -- Get partitions using shared function + vPartitions := GET_PARTITIONS(vSchemaName, vTableName, vKeyColumnName, pMinDate, pMaxDate, vParameters); + + ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vPartitions.COUNT || ' year/month combinations to export', 'INFO', vParameters); ENV_MANAGER.LOG_PROCESS_EVENT('Date range: ' || TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS') || ' to ' || TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'DEBUG', vParameters); + ENV_MANAGER.LOG_PROCESS_EVENT('Parallel degree: ' || pParallelDegree, 'INFO', vParameters); - -- Loop over each unique year/month combination - FOR i IN 1 .. vKeyValuesYear.COUNT LOOP - vKeyValueYear := vKeyValuesYear(i); - vKeyValueMonth := vKeyValuesMonth(i); + -- Sequential processing (parallel degree = 1) + IF pParallelDegree = 1 THEN + ENV_MANAGER.LOG_PROCESS_EVENT('Using sequential processing (pParallelDegree = 1)', 'DEBUG', vParameters); + + FOR i IN 1 .. 
vPartitions.COUNT LOOP + EXPORT_SINGLE_PARTITION( + pSchemaName => vSchemaName, + pTableName => vTableName, + pKeyColumnName => vKeyColumnName, + pYear => vPartitions(i).year, + pMonth => vPartitions(i).month, + pBucketUri => vBucketUri, + pFolderName => pFolderName, + pProcessedColumns => vProcessedColumnList, + pMinDate => pMinDate, + pMaxDate => pMaxDate, + pCredentialName => pCredentialName, + pFormat => 'CSV', + pFileBaseName => vFileBaseName, + pMaxFileSize => pMaxFileSize, + pParameters => vParameters + ); + END LOOP; + + -- Parallel processing (parallel degree > 1) + ELSE + -- Skip parallel processing if no partitions found + IF vPartitions.COUNT = 0 THEN + ENV_MANAGER.LOG_PROCESS_EVENT('No partitions to export - skipping parallel CSV processing', 'INFO', vParameters); + ELSE + DECLARE + vTaskName VARCHAR2(128) := 'DATA_CSV_EXPORT_TASK_' || TO_CHAR(SYSTIMESTAMP, 'YYYYMMDDHH24MISSFF'); + vChunkId NUMBER; + BEGIN + ENV_MANAGER.LOG_PROCESS_EVENT('Using parallel processing with ' || pParallelDegree || ' threads', 'INFO', vParameters); + + -- Clean up old completed chunks (>24 hours) to prevent table bloat + -- CRITICAL: Do NOT delete chunks from other active sessions (same-day tasks) + -- This prevents race conditions when multiple CSV exports run simultaneously + DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS + WHERE STATUS = 'COMPLETED' + AND CREATED_DATE < SYSTIMESTAMP - INTERVAL '1' DAY; + COMMIT; + + ENV_MANAGER.LOG_PROCESS_EVENT('Cleared old COMPLETED chunks (>24h). Active session chunks preserved.', 'DEBUG', vParameters); + + -- Populate chunks table (insert new chunks, preserve FAILED chunks for retry) + FOR i IN 1 .. 
vPartitions.COUNT LOOP + MERGE INTO CT_MRDS.A_PARALLEL_EXPORT_CHUNKS t + USING (SELECT i AS chunk_id, vPartitions(i).year AS yr, vPartitions(i).month AS mn FROM DUAL) s + ON (t.CHUNK_ID = s.chunk_id) + WHEN NOT MATCHED THEN + INSERT (CHUNK_ID, TASK_NAME, YEAR_VALUE, MONTH_VALUE, SCHEMA_NAME, TABLE_NAME, KEY_COLUMN_NAME, + BUCKET_URI, FOLDER_NAME, PROCESSED_COLUMNS, MIN_DATE, MAX_DATE, + CREDENTIAL_NAME, FORMAT_TYPE, FILE_BASE_NAME, TEMPLATE_TABLE_NAME, MAX_FILE_SIZE, STATUS) + VALUES (i, vTaskName, vPartitions(i).year, vPartitions(i).month, vSchemaName, vTableName, vKeyColumnName, + vBucketUri, pFolderName, vProcessedColumnList, pMinDate, pMaxDate, + pCredentialName, 'CSV', vFileBaseName, pTemplateTableName, pMaxFileSize, 'PENDING') + WHEN MATCHED THEN + UPDATE SET TASK_NAME = vTaskName, + STATUS = CASE WHEN t.STATUS = 'FAILED' THEN 'PENDING' ELSE t.STATUS END, + ERROR_MESSAGE = CASE WHEN t.STATUS = 'FAILED' THEN NULL ELSE t.ERROR_MESSAGE END; + END LOOP; + COMMIT; + + -- Log chunk statistics + DECLARE + vPendingCount NUMBER; + vFailedCount NUMBER; + BEGIN + SELECT COUNT(*) INTO vPendingCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'PENDING'; + SELECT COUNT(*) INTO vFailedCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'FAILED'; + + ENV_MANAGER.LOG_PROCESS_EVENT('Chunk statistics: PENDING=' || vPendingCount || ', FAILED (retry)=' || vFailedCount, 'INFO', vParameters); + END; + + -- Create parallel task + DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => vTaskName); + + -- Define chunks by number range (1 to partition count) + DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_NUMBER_COL( + task_name => vTaskName, + table_owner => 'CT_MRDS', + table_name => 'A_PARALLEL_EXPORT_CHUNKS', + table_column => 'CHUNK_ID', + chunk_size => 1 -- Each partition is one chunk + ); + + -- Execute task in parallel + ENV_MANAGER.LOG_PROCESS_EVENT('Executing parallel CSV export task: ' || vTaskName, 'DEBUG', vParameters); + + DBMS_PARALLEL_EXECUTE.RUN_TASK( + task_name => 
vTaskName, + sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id); END;', + language_flag => DBMS_SQL.NATIVE, + parallel_level => pParallelDegree + ); + + -- Check for errors + DECLARE + vErrorCount NUMBER; + BEGIN + SELECT COUNT(*) INTO vErrorCount + FROM USER_PARALLEL_EXECUTE_CHUNKS + WHERE task_name = vTaskName AND status = 'PROCESSED_WITH_ERROR'; + + IF vErrorCount > 0 THEN + vgMsgTmp := 'Parallel CSV export completed with ' || vErrorCount || ' errors. Check USER_PARALLEL_EXECUTE_CHUNKS for details.'; + ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp); + END IF; + END; + + -- Clean up task + DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName); + + -- Clean up chunks for THIS specific task only (session-safe) + -- CRITICAL: Use TASK_NAME filter to avoid deleting chunks from other active CSV sessions + DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE TASK_NAME = vTaskName; + COMMIT; + + ENV_MANAGER.LOG_PROCESS_EVENT('Parallel CSV execution completed successfully', 'INFO', vParameters); + EXCEPTION + WHEN OTHERS THEN + -- Attempt to drop task on error + BEGIN + DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName); + EXCEPTION + WHEN OTHERS THEN NULL; -- Ignore drop errors + END; + + vgMsgTmp := ENV_MANAGER.MSG_PARALLEL_EXECUTION_FAILED || ': ' || SQLERRM || cgBL || DBMS_UTILITY.FORMAT_ERROR_BACKTRACE; + ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp); + END; + END IF; + END IF; - -- Construct the query to extract data for the current year/month - vQuery := 'SELECT ' || vProcessedColumnList || ' - FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L - WHERE T.' 
|| DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY - AND TO_CHAR(L.LOAD_START,''YYYY'') = ' || CHR(39) || vKeyValueYear || CHR(39) || ' - AND TO_CHAR(L.LOAD_START,''MM'') = ' || CHR(39) || vKeyValueMonth || CHR(39) || ' - AND L.LOAD_START >= TO_DATE(' || CHR(39) || TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'') - AND L.LOAD_START < TO_DATE(' || CHR(39) || TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'')'; - - -- Construct the URI for the CSV file in OCI Object Storage - vUri := vBucketUri || - CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || - sanitizeFilename(vFileBaseName) || '_' || - sanitizeFilename(vKeyValueYear) || sanitizeFilename(vKeyValueMonth) || - vFileExtension; - - ENV_MANAGER.LOG_PROCESS_EVENT('Exporting to CSV file: ' || vUri, 'INFO', vParameters); - ENV_MANAGER.LOG_PROCESS_EVENT('Processing Year/Month: ' || vKeyValueYear || '/' || vKeyValueMonth || ' (' || i || '/' || vKeyValuesYear.COUNT || ')', 'DEBUG', vParameters); - ENV_MANAGER.LOG_PROCESS_EVENT('Export query: ' || vQuery, 'DEBUG', vParameters); - ENV_MANAGER.LOG_PROCESS_EVENT('File name pattern: ' || vFileBaseName || '_' || vKeyValueYear || vKeyValueMonth || vFileExtension, 'DEBUG', vParameters); - - -- Use DBMS_CLOUD package to export data to CSV file - DBMS_CLOUD.EXPORT_DATA( - credential_name => pCredentialName, - file_uri_list => vUri, - query => vQuery, - format => json_object('type' VALUE 'CSV', 'header' VALUE true) - ); - END LOOP; - - ENV_MANAGER.LOG_PROCESS_EVENT('Export completed successfully for ' || vKeyValuesYear.COUNT || ' files', 'INFO', vParameters); + ENV_MANAGER.LOG_PROCESS_EVENT('Export completed successfully for ' || vPartitions.COUNT || ' files', 'INFO', vParameters); ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters); EXCEPTION @@ -687,6 +1263,10 @@ AS vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || 
vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in pColumnList' ELSE '' END; ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp); + WHEN ENV_MANAGER.ERR_INVALID_PARALLEL_DEGREE THEN + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_INVALID_PARALLEL_DEGREE, vgMsgTmp); + WHEN ENV_MANAGER.ERR_PARALLEL_EXECUTION_FAILED THEN + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp); WHEN OTHERS THEN -- Log complete error details including full stack trace and backtrace ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER'); diff --git a/MARS_Packages/mrds_elt-dev-database/mrds_elt-dev-database/database/CT_MRDS/SCHEMA/packages/DATA_EXPORTER.pkg b/MARS_Packages/mrds_elt-dev-database/mrds_elt-dev-database/database/CT_MRDS/SCHEMA/packages/DATA_EXPORTER.pkg index 9de3821..1209d84 100644 --- a/MARS_Packages/mrds_elt-dev-database/mrds_elt-dev-database/database/CT_MRDS/SCHEMA/packages/DATA_EXPORTER.pkg +++ b/MARS_Packages/mrds_elt-dev-database/mrds_elt-dev-database/database/CT_MRDS/SCHEMA/packages/DATA_EXPORTER.pkg @@ -9,22 +9,65 @@ AS **/ -- Package Version Information - PACKAGE_VERSION CONSTANT VARCHAR2(10) := '2.1.1'; - PACKAGE_BUILD_DATE CONSTANT VARCHAR2(19) := '2025-12-04 13:10:00'; + PACKAGE_VERSION CONSTANT VARCHAR2(10) := '2.6.3'; + PACKAGE_BUILD_DATE CONSTANT VARCHAR2(19) := '2026-01-28 19:30:00'; PACKAGE_AUTHOR CONSTANT VARCHAR2(50) := 'MRDS Development Team'; -- Version History (last 3-5 changes) VERSION_HISTORY CONSTANT VARCHAR2(4000) := + 'v2.6.3 (2026-01-28): COMPILATION FIX - Resolved ORA-00904 error in EXPORT_PARTITION_PARALLEL. SQLERRM and DBMS_UTILITY.FORMAT_ERROR_BACKTRACE cannot be used directly in SQL UPDATE statements. Now properly assigned to vgMsgTmp variable before UPDATE.' 
|| CHR(10) || + 'v2.6.2 (2026-01-28): CRITICAL FIX - Race condition when multiple exports run simultaneously. Changed DELETE to filter by age (>24h) instead of deleting all COMPLETED chunks. Prevents concurrent sessions from deleting each other chunks. Session-safe cleanup with TASK_NAME filtering. Enables true parallel execution of multiple export jobs.' || CHR(10) || + 'v2.6.1 (2026-01-28): Added DELETE_FAILED_EXPORT_FILE procedure to clean up partial/corrupted files before retry. When partition fails mid-export, partial file is deleted before retry to prevent Oracle from creating _1 suffixed duplicates. Ensures clean retry without orphaned files in OCI bucket.' || CHR(10) || + 'v2.6.0 (2026-01-28): CRITICAL FIX - Added STATUS tracking to A_PARALLEL_EXPORT_CHUNKS table to prevent data duplication on retry. System now restarts ONLY failed partitions instead of re-exporting all data. Added ERROR_MESSAGE and EXPORT_TIMESTAMP columns for better error handling and monitoring. Prevents duplicate file creation when parallel tasks fail (e.g., 22 partitions with 16 threads, 3 failures no longer duplicates 19 successful exports).' || CHR(10) || + 'v2.5.0 (2026-01-26): Added recorddelimiter parameter with CRLF (CHR(13)||CHR(10)) for CSV exports to ensure Windows-compatible line endings. Improves cross-platform compatibility when CSV files are opened in Windows applications (Notepad, Excel).' || CHR(10) || + 'v2.4.0 (2026-01-11): Added pTemplateTableName parameter for per-column date format configuration. Implements dynamic query building with TO_CHAR for each date/timestamp column using FILE_MANAGER.GET_DATE_FORMAT. Supports 3-tier hierarchy: column-specific, template DEFAULT, global fallback. Eliminates single dateformat limitation of DBMS_CLOUD.EXPORT_DATA.' || CHR(10) || + 'v2.3.0 (2025-12-20): Added parallel partition processing using DBMS_PARALLEL_EXECUTE. 
New pParallelDegree parameter (1-16, default 1) for EXPORT_TABLE_DATA_BY_DATE and EXPORT_TABLE_DATA_TO_CSV_BY_DATE procedures. Each year/month partition processed in separate thread for improved performance.' || CHR(10) || + 'v2.2.0 (2025-12-19): DRY refactoring - extracted shared helper functions (sanitizeFilename, VALIDATE_TABLE_AND_COLUMNS, GET_PARTITIONS, EXPORT_SINGLE_PARTITION worker procedure). Reduced code duplication by ~400 lines. Prepared architecture for v2.3.0 parallel processing.' || CHR(10) || 'v2.1.1 (2025-12-04): Fixed JOIN column reference A_WORKFLOW_HISTORY_KEY -> A_ETL_LOAD_SET_KEY, added consistent column mapping and dynamic column list to EXPORT_TABLE_DATA procedure, enhanced DEBUG logging for all export operations' || CHR(10) || - 'v2.1.1 (2025-12-04): Fixed JOIN column reference A_WORKFLOW_HISTORY_KEY -> A_ETL_LOAD_SET_KEY' || CHR(10) || 'v2.1.0 (2025-10-22): Added version tracking and PARTITION_YEAR/PARTITION_MONTH support' || CHR(10) || - 'v2.0.0 (2025-10-01): Separated export functionality from FILE_MANAGER package' || CHR(10) || - 'v1.0.0 (2025-09-15): Initial implementation within FILE_MANAGER package' || CHR(10); + 'v2.0.0 (2025-10-01): Separated export functionality from FILE_MANAGER package' || CHR(10); cgBL CONSTANT VARCHAR2(2) := CHR(13)||CHR(10); vgMsgTmp VARCHAR2(32000); --------------------------------------------------------------------------------------------------------------------------- + -- TYPE DEFINITIONS FOR PARTITION HANDLING + --------------------------------------------------------------------------------------------------------------------------- + + /** + * Record type for year/month partition information + **/ + TYPE partition_rec IS RECORD ( + year VARCHAR2(4), + month VARCHAR2(2) + ); + + /** + * Table type for collection of partition records + **/ + TYPE partition_tab IS TABLE OF partition_rec; + + 
--------------------------------------------------------------------------------------------------------------------------- + -- INTERNAL PARALLEL PROCESSING CALLBACK + --------------------------------------------------------------------------------------------------------------------------- + + /** + * @name EXPORT_PARTITION_PARALLEL + * @desc Internal callback procedure for DBMS_PARALLEL_EXECUTE. + * Processes single partition (year/month) chunk in parallel task. + * Called by DBMS_PARALLEL_EXECUTE framework for each chunk. + * This procedure is PUBLIC because DBMS_PARALLEL_EXECUTE requires it, + * but should NOT be called directly by external code. + * @param pStartId - Chunk start ID (CHUNK_ID from A_PARALLEL_EXPORT_CHUNKS table) + * @param pEndId - Chunk end ID (same as pStartId for single-row chunks) + **/ + PROCEDURE EXPORT_PARTITION_PARALLEL ( + pStartId IN NUMBER, + pEndId IN NUMBER + ); + + --------------------------------------------------------------------------------------------------------------------------- + -- MAIN EXPORT PROCEDURES --------------------------------------------------------------------------------------------------------------------------- /** @@ -62,6 +105,7 @@ AS * Allows specifying custom column list or uses T.* if pColumnList is NULL. * Validates that all columns in pColumnList exist in the target table. * Automatically adds 'T.' prefix to column names in pColumnList. + * Supports parallel partition processing via pParallelDegree parameter (default 1, range 1-16). 
* pBucketArea parameter accepts: 'INBOX', 'ODS', 'DATA', 'ARCHIVE' * @example * begin @@ -73,20 +117,23 @@ AS * pFolderName => 'parquet_exports', * pColumnList => 'COLUMN1, COLUMN2, COLUMN3', -- Optional * pMinDate => DATE '2024-01-01', - * pMaxDate => SYSDATE + * pMaxDate => SYSDATE, + * pParallelDegree => 8 -- Optional, default 1, range 1-16 * ); * end; **/ PROCEDURE EXPORT_TABLE_DATA_BY_DATE ( - pSchemaName IN VARCHAR2, - pTableName IN VARCHAR2, - pKeyColumnName IN VARCHAR2, - pBucketArea IN VARCHAR2, - pFolderName IN VARCHAR2, - pColumnList IN VARCHAR2 default NULL, - pMinDate IN DATE default DATE '1900-01-01', - pMaxDate IN DATE default SYSDATE, - pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName + pSchemaName IN VARCHAR2, + pTableName IN VARCHAR2, + pKeyColumnName IN VARCHAR2, + pBucketArea IN VARCHAR2, + pFolderName IN VARCHAR2, + pColumnList IN VARCHAR2 default NULL, + pMinDate IN DATE default DATE '1900-01-01', + pMaxDate IN DATE default SYSDATE, + pParallelDegree IN NUMBER default 1, + pTemplateTableName IN VARCHAR2 default NULL, + pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName ); @@ -97,6 +144,7 @@ AS * Creates one CSV file for each year/month combination found in the data. * Uses the same date filtering mechanism with CT_ODS.A_LOAD_HISTORY as EXPORT_TABLE_DATA_BY_DATE, * but exports to CSV format instead of Parquet. + * Supports parallel partition processing via pParallelDegree parameter (1-16). 
* File naming pattern: {pFileName}_YYYYMM.csv or {TABLENAME}_YYYYMM.csv (if pFileName is NULL) * @example * begin @@ -109,7 +157,8 @@ AS * pFolderName => 'exports', * pFileName => 'my_export.csv', * pMinDate => DATE '2024-01-01', - * pMaxDate => SYSDATE + * pMaxDate => SYSDATE, + * pParallelDegree => 8 -- Optional, default 1, range 1-16 * ); * * -- With auto-generated filename (based on table name only) @@ -127,16 +176,19 @@ AS * end; **/ PROCEDURE EXPORT_TABLE_DATA_TO_CSV_BY_DATE ( - pSchemaName IN VARCHAR2, - pTableName IN VARCHAR2, - pKeyColumnName IN VARCHAR2, - pBucketArea IN VARCHAR2, - pFolderName IN VARCHAR2, - pFileName IN VARCHAR2 DEFAULT NULL, - pColumnList IN VARCHAR2 default NULL, - pMinDate IN DATE default DATE '1900-01-01', - pMaxDate IN DATE default SYSDATE, - pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName + pSchemaName IN VARCHAR2, + pTableName IN VARCHAR2, + pKeyColumnName IN VARCHAR2, + pBucketArea IN VARCHAR2, + pFolderName IN VARCHAR2, + pFileName IN VARCHAR2 DEFAULT NULL, + pColumnList IN VARCHAR2 default NULL, + pMinDate IN DATE default DATE '1900-01-01', + pMaxDate IN DATE default SYSDATE, + pParallelDegree IN NUMBER default 1, + pTemplateTableName IN VARCHAR2 default NULL, + pMaxFileSize IN NUMBER default 104857600, + pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName ); --------------------------------------------------------------------------------------------------------------------------- diff --git a/MARS_Packages/mrds_elt-dev-database/mrds_elt-dev-database/database/CT_MRDS/SCHEMA/packages/ENV_MANAGER.pkb b/MARS_Packages/mrds_elt-dev-database/mrds_elt-dev-database/database/CT_MRDS/SCHEMA/packages/ENV_MANAGER.pkb index 995177f..856d449 100644 --- a/MARS_Packages/mrds_elt-dev-database/mrds_elt-dev-database/database/CT_MRDS/SCHEMA/packages/ENV_MANAGER.pkb +++ b/MARS_Packages/mrds_elt-dev-database/mrds_elt-dev-database/database/CT_MRDS/SCHEMA/packages/ENV_MANAGER.pkb @@ -39,6 +39,8 @@ AS 
Errors(CODE_MOVE_FILE_TO_TRASH_FAILED) := Error_Record(CODE_MOVE_FILE_TO_TRASH_FAILED, MSG_MOVE_FILE_TO_TRASH_FAILED); -- -20032 Errors(CODE_DROP_EXPORTED_FILES_FAILED) := Error_Record(CODE_DROP_EXPORTED_FILES_FAILED, MSG_DROP_EXPORTED_FILES_FAILED); -- -20033 Errors(CODE_INVALID_BUCKET_AREA) := Error_Record(CODE_INVALID_BUCKET_AREA, MSG_INVALID_BUCKET_AREA); -- -20034 + Errors(CODE_INVALID_PARALLEL_DEGREE) := Error_Record(CODE_INVALID_PARALLEL_DEGREE, MSG_INVALID_PARALLEL_DEGREE); -- -20110 + Errors(CODE_PARALLEL_EXECUTION_FAILED) := Error_Record(CODE_PARALLEL_EXECUTION_FAILED, MSG_PARALLEL_EXECUTION_FAILED); -- -20111 Errors(CODE_UNKNOWN) := Error_Record(CODE_UNKNOWN, MSG_UNKNOWN); -- -20999 @@ -1163,3 +1165,7 @@ BEGIN INIT_VARIABLES(pEnv => gvEnv); END ENV_MANAGER; + +/ + +/ diff --git a/MARS_Packages/mrds_elt-dev-database/mrds_elt-dev-database/database/CT_MRDS/SCHEMA/packages/ENV_MANAGER.pkg b/MARS_Packages/mrds_elt-dev-database/mrds_elt-dev-database/database/CT_MRDS/SCHEMA/packages/ENV_MANAGER.pkg index d06b728..fded944 100644 --- a/MARS_Packages/mrds_elt-dev-database/mrds_elt-dev-database/database/CT_MRDS/SCHEMA/packages/ENV_MANAGER.pkg +++ b/MARS_Packages/mrds_elt-dev-database/mrds_elt-dev-database/database/CT_MRDS/SCHEMA/packages/ENV_MANAGER.pkg @@ -17,12 +17,13 @@ AS **/ -- Package Version Information (Semantic Versioning: MAJOR.MINOR.PATCH) - PACKAGE_VERSION CONSTANT VARCHAR2(10) := '3.1.0'; - PACKAGE_BUILD_DATE CONSTANT VARCHAR2(20) := '2025-10-22 20:57:00'; + PACKAGE_VERSION CONSTANT VARCHAR2(10) := '3.2.0'; + PACKAGE_BUILD_DATE CONSTANT VARCHAR2(20) := '2025-12-20 10:00:00'; PACKAGE_AUTHOR CONSTANT VARCHAR2(100) := 'Grzegorz Michalski'; -- Version History (Latest changes first) VERSION_HISTORY CONSTANT VARCHAR2(4000) := + '3.2.0 (2025-12-20): Added error codes for parallel execution support (CODE_INVALID_PARALLEL_DEGREE -20110, CODE_PARALLEL_EXECUTION_FAILED -20111)' || CHR(13)||CHR(10) || '3.1.0 (2025-10-22): Added package hash tracking and automatic 
change detection system (SHA256 hashing)' || CHR(13)||CHR(10) ||
         '3.0.0 (2025-10-22): Added package versioning system with centralized version management functions' || CHR(13)||CHR(10) ||
         '2.1.0 (2025-10-15): Added ANALYZE_VALIDATION_ERRORS function for comprehensive CSV validation analysis' || CHR(13)||CHR(10) ||
@@ -296,6 +297,18 @@ AS
     PRAGMA EXCEPTION_INIT( ERR_INVALID_BUCKET_AREA
                           ,CODE_INVALID_BUCKET_AREA);

+    ERR_INVALID_PARALLEL_DEGREE EXCEPTION;
+    CODE_INVALID_PARALLEL_DEGREE CONSTANT PLS_INTEGER := -20110;
+    MSG_INVALID_PARALLEL_DEGREE VARCHAR2(4000) := 'Invalid parallel degree parameter. Must be between 1 and 16';
+    PRAGMA EXCEPTION_INIT( ERR_INVALID_PARALLEL_DEGREE
+                          ,CODE_INVALID_PARALLEL_DEGREE);
+
+    ERR_PARALLEL_EXECUTION_FAILED EXCEPTION;
+    CODE_PARALLEL_EXECUTION_FAILED CONSTANT PLS_INTEGER := -20111;
+    MSG_PARALLEL_EXECUTION_FAILED VARCHAR2(4000) := 'Parallel execution failed';
+    PRAGMA EXCEPTION_INIT( ERR_PARALLEL_EXECUTION_FAILED
+                          ,CODE_PARALLEL_EXECUTION_FAILED);
+
     ERR_UNKNOWN EXCEPTION;
     CODE_UNKNOWN CONSTANT PLS_INTEGER := -20999;
     MSG_UNKNOWN VARCHAR2(4000) := 'Unknown Error Occured';
@@ -609,3 +622,4 @@ AS
     ) RETURN VARCHAR2;

 END ENV_MANAGER;
+/
diff --git a/MARS_Packages/mrds_elt-dev-database/mrds_elt-dev-database/database/CT_MRDS/SCHEMA/packages/FILE_ARCHIVER.pkb b/MARS_Packages/mrds_elt-dev-database/mrds_elt-dev-database/database/CT_MRDS/SCHEMA/packages/FILE_ARCHIVER.pkb
index 83edcf3..8fe9867 100644
--- a/MARS_Packages/mrds_elt-dev-database/mrds_elt-dev-database/database/CT_MRDS/SCHEMA/packages/FILE_ARCHIVER.pkb
+++ b/MARS_Packages/mrds_elt-dev-database/mrds_elt-dev-database/database/CT_MRDS/SCHEMA/packages/FILE_ARCHIVER.pkb
@@ -1,6 +1,68 @@
 create or replace PACKAGE BODY CT_MRDS.FILE_ARCHIVER
 AS

+    ----------------------------------------------------------------------------------------------------
+    -- PRIVATE FUNCTION: GET_ARCHIVAL_WHERE_CLAUSE
+    ----------------------------------------------------------------------------------------------------
+    /**
+     * @name   GET_ARCHIVAL_WHERE_CLAUSE
+     * @desc   Private function that generates WHERE clause based on ARCHIVAL_STRATEGY configuration.
+     *         Supports four strategies: THRESHOLD_BASED, CURRENT_MONTH_ONLY, MINIMUM_AGE_MONTHS, HYBRID.
+     *         The returned text references a column named workflow_start and is concatenated into
+     *         dynamic SQL by the callers, so the configured numbers are checked before they are spliced in.
+     * @param  pSourceFileConfig - Source file configuration record with ARCHIVAL_STRATEGY
+     * @return VARCHAR2 - WHERE clause for filtering archival candidates
+     * @raises -20001 when the value required by the selected strategy is missing, or MINIMUM_AGE_MONTHS is negative
+     * @raises -20002 when ARCHIVAL_STRATEGY is NULL or not one of the four supported values
+     **/
+    FUNCTION GET_ARCHIVAL_WHERE_CLAUSE(
+        pSourceFileConfig IN CT_MRDS.A_SOURCE_FILE_CONFIG%ROWTYPE
+    ) RETURN VARCHAR2
+    IS
+        vWhereClause VARCHAR2(4000);
+        vAgeMonths   VARCHAR2(40);
+    BEGIN
+        -- MINIMUM_AGE_MONTHS is appended right after a literal '-'. NULL would leave ADD_MONTHS without an
+        -- offset, and a negative value would produce '--', which SQL reads as the start of a comment.
+        IF pSourceFileConfig.ARCHIVAL_STRATEGY IN ('MINIMUM_AGE_MONTHS', 'HYBRID') THEN
+            IF pSourceFileConfig.MINIMUM_AGE_MONTHS IS NULL THEN
+                RAISE_APPLICATION_ERROR(-20001, 'MINIMUM_AGE_MONTHS must be configured for ' || pSourceFileConfig.ARCHIVAL_STRATEGY || ' strategy');
+            ELSIF pSourceFileConfig.MINIMUM_AGE_MONTHS < 0 THEN
+                RAISE_APPLICATION_ERROR(-20001, 'MINIMUM_AGE_MONTHS must not be negative for ' || pSourceFileConfig.ARCHIVAL_STRATEGY || ' strategy');
+            END IF;
+            -- Render with a fixed '.' decimal separator so the generated SQL does not depend on session NLS settings
+            vAgeMonths := TO_CHAR(pSourceFileConfig.MINIMUM_AGE_MONTHS, 'TM9', 'NLS_NUMERIC_CHARACTERS=''.,''');
+        END IF;
+
+        CASE pSourceFileConfig.ARCHIVAL_STRATEGY
+            -- Legacy threshold-based strategy (backward compatible)
+            WHEN 'THRESHOLD_BASED' THEN
+                -- Without a threshold the comparison would have no right-hand operand
+                IF pSourceFileConfig.DAYS_FOR_ARCHIVE_THRESHOLD IS NULL THEN
+                    RAISE_APPLICATION_ERROR(-20001, 'DAYS_FOR_ARCHIVE_THRESHOLD must be configured for THRESHOLD_BASED strategy');
+                END IF;
+                vWhereClause := 'extract(day from (systimestamp - workflow_start)) > ' || pSourceFileConfig.DAYS_FOR_ARCHIVE_THRESHOLD;
+
+            -- Archive all data except current month
+            WHEN 'CURRENT_MONTH_ONLY' THEN
+                vWhereClause := 'TRUNC(workflow_start, ''MM'') < TRUNC(SYSDATE, ''MM'')';
+
+            -- Archive only data older than X months
+            WHEN 'MINIMUM_AGE_MONTHS' THEN
+                vWhereClause := 'workflow_start < ADD_MONTHS(TRUNC(SYSDATE, ''MM''), -' || vAgeMonths || ')';
+
+            -- Hybrid: Current month exclusion AND minimum age requirement
+            WHEN 'HYBRID' THEN
+                vWhereClause := 'TRUNC(workflow_start, ''MM'') < TRUNC(SYSDATE, ''MM'') ' ||
+                                'AND workflow_start < ADD_MONTHS(TRUNC(SYSDATE, ''MM''), -' || vAgeMonths || ')';
+
+            ELSE
+                RAISE_APPLICATION_ERROR(-20002, 'Invalid ARCHIVAL_STRATEGY: ' || pSourceFileConfig.ARCHIVAL_STRATEGY);
+        END CASE;
+
+        RETURN vWhereClause;
+    END GET_ARCHIVAL_WHERE_CLAUSE;
+
     ----------------------------------------------------------------------------------------------------

     FUNCTION GET_TABLE_STAT(pSourceFileConfigKey IN NUMBER)
@@ -85,6 +133,8 @@ AS
         if LENGTH(vArchivalTriggeredBy)>0 THEN
             ENV_MANAGER.LOG_PROCESS_EVENT('Archival Triggered By: '||vArchivalTriggeredBy,'INFO');
             vTableName := DBMS_ASSERT.SCHEMA_NAME(vSourceFileConfig.ODS_SCHEMA_NAME) || '.'||vSourceFileConfig.A_SOURCE_KEY||'_'||DBMS_ASSERT.simple_sql_name(vSourceFileConfig.TABLE_ID)||'_ODS';
+
+            -- Use strategy-based WHERE clause (MARS-828)
             vQuery := '
                 select t_filename(
                     file$name
@@ -96,7 +146,7 @@ AS
                 from '||vTableName||' s
                 join CT_MRDS.a_workflow_history h
                     on s.a_workflow_history_key = h.a_workflow_history_key
-                where extract(day from (systimestamp - workflow_start)) > '||vSourceFileConfig.DAYS_FOR_ARCHIVE_THRESHOLD
+                where ' || GET_ARCHIVAL_WHERE_CLAUSE(vSourceFileConfig)
             ;

             -- Get all files that will be archived into "vfiles" collection ("regular data files")
@@ -337,6 +387,7 @@ AS
         vSourceFileConfig CT_MRDS.A_SOURCE_FILE_CONFIG%ROWTYPE;
         vTableName VARCHAR2(200);
         vQuery VARCHAR2(32000);
+        vWhereClause VARCHAR2(4000);
     BEGIN
         vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST('pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL')));
         ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
@@ -344,6 +395,12 @@ AS

         vTableName := DBMS_ASSERT.SCHEMA_NAME(vSourceFileConfig.ODS_SCHEMA_NAME) || '.'||vSourceFileConfig.A_SOURCE_KEY||'_'||DBMS_ASSERT.simple_sql_name(vSourceFileConfig.TABLE_ID)||'_ODS';
         ENV_MANAGER.LOG_PROCESS_EVENT('vTableName','DEBUG',vTableName);
+
+        -- Get WHERE clause based on archival strategy (MARS-828)
+        vWhereClause := GET_ARCHIVAL_WHERE_CLAUSE(vSourceFileConfig);
+        ENV_MANAGER.LOG_PROCESS_EVENT('vWhereClause','DEBUG',vWhereClause);
+
+        -- Use strategy-based WHERE clause for statistics (MARS-828)
         vQuery := 'with
tmp as (
                 select
@@ -367,11 +424,11 @@ AS
                 ,'||pSourceFileConfigKey||' as A_SOURCE_FILE_CONFIG_KEY
                 ,'''||vTableName||''' as TABLE_NAME
                 ,count(*) as FILE_COUNT
-                ,sum(case when extract(day from (systimestamp - workflow_start)) > '||vSourceFileConfig.DAYS_FOR_ARCHIVE_THRESHOLD||' then 1 else 0 end) as OLD_FILE_COUNT
+                ,sum(case when ' || vWhereClause || ' then 1 else 0 end) as OLD_FILE_COUNT
                 ,sum (row_count_per_file) as ROW_COUNT
-                ,sum(case when extract(day from (systimestamp - workflow_start)) > '||vSourceFileConfig.DAYS_FOR_ARCHIVE_THRESHOLD||' then row_count_per_file else 0 end) as OLD_ROW_COUNT
+                ,sum(case when ' || vWhereClause || ' then row_count_per_file else 0 end) as OLD_ROW_COUNT
                 ,sum(r.bytes) as BYTES
-                ,sum(case when extract(day from (systimestamp - workflow_start)) > '||vSourceFileConfig.DAYS_FOR_ARCHIVE_THRESHOLD||' then r.bytes else 0 end) as OLD_BYTES
+                ,sum(case when ' || vWhereClause || ' then r.bytes else 0 end) as OLD_BYTES
-                ,'||vSourceFileConfig.DAYS_FOR_ARCHIVE_THRESHOLD||' as DAYS_FOR_ARCHIVE_THRESHOLD
+                ,'||NVL(TO_CHAR(vSourceFileConfig.DAYS_FOR_ARCHIVE_THRESHOLD), 'to_number(null)')||' as DAYS_FOR_ARCHIVE_THRESHOLD
                 ,systimestamp as CREATED
             from tmp_gr t
@@ -438,6 +495,48 @@ AS

     ----------------------------------------------------------------------------------------------------

+    FUNCTION ARCHIVE_TABLE_DATA (
+        pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE
+    ) RETURN PLS_INTEGER  -- 0 on success, otherwise the SQLCODE of the logged exception
+    IS
+        vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
+    BEGIN
+        vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST('pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL')));
+        ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
+        -- Statement call: resolves to the ARCHIVE_TABLE_DATA procedure overload, not to this function
+        ARCHIVE_TABLE_DATA(pSourceFileConfigKey => pSourceFileConfigKey);
+        ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
+        -- Literal 0 on success: SQLCODE can still carry the code of an exception being handled further up the call stack
+        RETURN 0;
+    EXCEPTION
+        WHEN OTHERS THEN  -- errors are logged and reported through the return value instead of being re-raised
+            ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
+            RETURN SQLCODE;
+    END ARCHIVE_TABLE_DATA;
+
+
----------------------------------------------------------------------------------------------------
+
+    FUNCTION GATHER_TABLE_STAT (
+        pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE
+    ) RETURN PLS_INTEGER  -- 0 on success, otherwise the SQLCODE of the logged exception
+    IS
+        vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
+    BEGIN
+        vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST('pSourceFileConfigKey => '||nvl(to_char(pSourceFileConfigKey), 'NULL')));
+        ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
+        -- Statement call: resolves to the GATHER_TABLE_STAT procedure overload, not to this function
+        GATHER_TABLE_STAT(pSourceFileConfigKey => pSourceFileConfigKey);
+        ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
+        -- Literal 0 on success: SQLCODE can still carry the code of an exception being handled further up the call stack
+        RETURN 0;
+    EXCEPTION
+        WHEN OTHERS THEN  -- errors are logged and reported through the return value instead of being re-raised
+            ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
+            RETURN SQLCODE;
+    END GATHER_TABLE_STAT;
+
+    ----------------------------------------------------------------------------------------------------
+
 END;
 /
diff --git a/MARS_Packages/mrds_elt-dev-database/mrds_elt-dev-database/database/CT_MRDS/SCHEMA/packages/FILE_ARCHIVER.pkg b/MARS_Packages/mrds_elt-dev-database/mrds_elt-dev-database/database/CT_MRDS/SCHEMA/packages/FILE_ARCHIVER.pkg
index 5185456..2ff8ff2 100644
--- a/MARS_Packages/mrds_elt-dev-database/mrds_elt-dev-database/database/CT_MRDS/SCHEMA/packages/FILE_ARCHIVER.pkg
+++ b/MARS_Packages/mrds_elt-dev-database/mrds_elt-dev-database/database/CT_MRDS/SCHEMA/packages/FILE_ARCHIVER.pkg
@@ -17,12 +17,14 @@ AS
     **/

     -- Package Version Information (Semantic Versioning: MAJOR.MINOR.PATCH)
-    PACKAGE_VERSION CONSTANT VARCHAR2(10) := '2.0.0';
-    PACKAGE_BUILD_DATE CONSTANT VARCHAR2(20) := '2025-10-22 16:45:00';
+    PACKAGE_VERSION CONSTANT VARCHAR2(10) := '3.1.0';
+    PACKAGE_BUILD_DATE CONSTANT VARCHAR2(20) := '2026-01-29 21:00:00';
     PACKAGE_AUTHOR CONSTANT VARCHAR2(100) := 'Grzegorz Michalski';

     -- Version History (Latest changes first)
     VERSION_HISTORY CONSTANT VARCHAR2(4000) :=
+        '3.1.0 (2026-01-29): Added
function overloads for ARCHIVE_TABLE_DATA and GATHER_TABLE_STAT returning SQLCODE for Python library integration' || CHR(13)||CHR(10) ||
+        '3.0.0 (2026-01-27): MARS-828 - Added flexible archival strategies (CURRENT_MONTH_ONLY, MINIMUM_AGE_MONTHS, HYBRID) via ARCHIVAL_STRATEGY configuration' || CHR(13)||CHR(10) ||
         '2.0.0 (2025-10-22): Added package versioning system using centralized ENV_MANAGER functions' || CHR(13)||CHR(10) ||
         '1.5.0 (2025-10-18): Enhanced ARCHIVE_TABLE_DATA with Hive-style partitioning support' || CHR(13)||CHR(10) ||
         '1.0.0 (2025-09-15): Initial release with table archival and statistics gathering';
@@ -39,6 +41,18 @@ AS
         pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE
     );

+    /**
+     * @name ARCHIVE_TABLE_DATA
+     * @desc Function overload for ARCHIVE_TABLE_DATA procedure, intended for Python library integration.
+     *       Calls the main ARCHIVE_TABLE_DATA procedure; an error is logged and returned as its SQLCODE instead of being raised.
+     * @example vCode := FILE_ARCHIVER.ARCHIVE_TABLE_DATA(pSourceFileConfigKey => 123);  -- from PL/SQL
+     * @example SELECT FILE_ARCHIVER.ARCHIVE_TABLE_DATA(pSourceFileConfigKey => 123) FROM DUAL;
+     *          NOTE(review): the SELECT form presumably fails with ORA-14551/ORA-14552 if the procedure performs DML or COMMIT outside an autonomous transaction - confirm
+     * @ex_rslt 0 (success) or the SQLCODE of the logged error
+     **/
+    FUNCTION ARCHIVE_TABLE_DATA (
+        pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE
+    ) RETURN PLS_INTEGER;
+


     /**
@@ -50,6 +64,18 @@ AS
         pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE
     );

+    /**
+     * @name GATHER_TABLE_STAT
+     * @desc Function overload for GATHER_TABLE_STAT procedure, intended for Python library integration.
+     *       Calls the main GATHER_TABLE_STAT procedure; an error is logged and returned as its SQLCODE instead of being raised.
+     * @example vCode := FILE_ARCHIVER.GATHER_TABLE_STAT(pSourceFileConfigKey => 123);  -- from PL/SQL
+     * @example SELECT FILE_ARCHIVER.GATHER_TABLE_STAT(pSourceFileConfigKey => 123) FROM DUAL;
+     *          NOTE(review): the SELECT form presumably fails with ORA-14551/ORA-14552 if the procedure performs DML or COMMIT outside an autonomous transaction - confirm
+     * @ex_rslt 0 (success) or the SQLCODE of the logged error
+     **/
+    FUNCTION GATHER_TABLE_STAT (
+        pSourceFileConfigKey IN CT_MRDS.A_SOURCE_FILE_CONFIG.A_SOURCE_FILE_CONFIG_KEY%TYPE
+    ) RETURN PLS_INTEGER;
+
     ---------------------------------------------------------------------------------------------------------------------------
     -- PACKAGE VERSION MANAGEMENT FUNCTIONS
     ---------------------------------------------------------------------------------------------------------------------------