From 6c8b22eac93b689fd4395bb21f9bad1e9ee1e710 Mon Sep 17 00:00:00 2001 From: Grzegorz Michalski Date: Tue, 10 Feb 2026 09:33:46 +0100 Subject: [PATCH] Add DATA_EXPORTER package and update installation scripts for export registration - Created new package CT_MRDS.DATA_EXPORTER (v2.6.3) for comprehensive data export capabilities, including CSV and Parquet formats with OCI integration. - Implemented version history tracking and enhanced error handling for export processes. - Updated installation scripts to include pRegisterExport parameter for registering exports in A_WORKFLOW_HISTORY and A_SOURCE_FILE_RECEIVED. --- .../new_version/DATA_EXPORTER.pkb | 122 +- .../new_version/DATA_EXPORTER.pkg | 50 +- .../v.2.6.3/DATA_EXPORTER.pkb | 1313 +++++++++++++++++ .../v.2.6.3/DATA_EXPORTER.pkg | 218 +++ .../MARS-835/01_MARS_835_install_step1.sql | 6 +- 5 files changed, 1685 insertions(+), 24 deletions(-) create mode 100644 MARS_Packages/REL01_ADDITIONS/MARS-835-PREHOOK/rollback_version/v.2.6.3/DATA_EXPORTER.pkb create mode 100644 MARS_Packages/REL01_ADDITIONS/MARS-835-PREHOOK/rollback_version/v.2.6.3/DATA_EXPORTER.pkg diff --git a/MARS_Packages/REL01_ADDITIONS/MARS-835-PREHOOK/new_version/DATA_EXPORTER.pkb b/MARS_Packages/REL01_ADDITIONS/MARS-835-PREHOOK/new_version/DATA_EXPORTER.pkb index 6734d50..9bfd735 100644 --- a/MARS_Packages/REL01_ADDITIONS/MARS-835-PREHOOK/new_version/DATA_EXPORTER.pkb +++ b/MARS_Packages/REL01_ADDITIONS/MARS-835-PREHOOK/new_version/DATA_EXPORTER.pkb @@ -1004,6 +1004,9 @@ AS * Allows specifying custom column list or uses T.* if pColumnList is NULL. * Validates that all columns in pColumnList exist in the target table. * Automatically adds 'T.' prefix to column names in pColumnList. + * When pRegisterExport=TRUE, successfully exported files are registered in: + * - CT_MRDS.A_WORKFLOW_HISTORY (one record per YEAR/MONTH with export timestamp) + * - CT_MRDS.A_SOURCE_FILE_RECEIVED (tracks file location and partition info) * @example * begin * DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE( @@ -1015,7 +1018,8 @@ AS * pFileName => 'my_export.csv', * pColumnList => 'COLUMN1, COLUMN2, COLUMN3', -- Optional * pMinDate => DATE '2024-01-01', - * pMaxDate => SYSDATE + * pMaxDate => SYSDATE, + * pRegisterExport => TRUE -- Registers exports to tracking tables * ); * end; **/ @@ -1032,6 +1036,7 @@ AS pParallelDegree IN NUMBER default 1, pTemplateTableName IN VARCHAR2 default NULL, pMaxFileSize IN NUMBER default 104857600, + pRegisterExport IN BOOLEAN default FALSE, pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName ) IS @@ -1045,6 +1050,10 @@ AS vBucketUri VARCHAR2(4000); vCurrentCol VARCHAR2(128); vPartitions partition_tab; + vWorkflowHistoryKey NUMBER; + vSourceFileReceivedKey NUMBER; + vFileName VARCHAR2(1000); + vFileUri VARCHAR2(4000); BEGIN vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||'''' @@ -1059,6 +1068,7 @@ AS ,'pParallelDegree => '''||nvl(TO_CHAR(pParallelDegree), 'NULL')||'''' ,'pTemplateTableName => '''||nvl(pTemplateTableName, 'NULL')||'''' ,'pMaxFileSize => '''||nvl(TO_CHAR(pMaxFileSize), 'NULL')||'''' + ,'pRegisterExport => '''||CASE WHEN pRegisterExport THEN 'TRUE' ELSE 'FALSE' END||'''' ,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||'''' )); ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters); @@ -1135,6 +1145,58 @@ AS pMaxFileSize => pMaxFileSize, pParameters => vParameters ); + + -- Register export if requested + IF pRegisterExport THEN + -- Construct filename and URI for this partition + vFileName := NVL(vFileBaseName, UPPER(REPLACE(vTableName, vSchemaName || '.', ''))) || '_' || vPartitions(i).year || vPartitions(i).month || '.csv'; + vFileUri := vBucketUri || CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || sanitizeFilename(vFileName); + + -- Create A_WORKFLOW_HISTORY record for this export (one per year/month) + vWorkflowHistoryKey := CT_MRDS.A_WORKFLOW_HISTORY_KEY_SEQ.NEXTVAL; + INSERT INTO CT_MRDS.A_WORKFLOW_HISTORY ( + A_WORKFLOW_HISTORY_KEY, + SERVICE_NAME, + ORCHESTRATION_RUN_ID, + WORKFLOW_NAME, + WORKFLOW_START, + WORKFLOW_END, + WORKFLOW_SUCCESSFUL + ) VALUES ( + vWorkflowHistoryKey, + 'DATA_EXPORTER', + 'CSV_EXPORT_' || TO_CHAR(SYSTIMESTAMP, 'YYYYMMDDHH24MISSFF'), + 'DATA_EXPORT_' || UPPER(REPLACE(vTableName, vSchemaName || '.', '')) || '_' || vPartitions(i).year || vPartitions(i).month, + SYSTIMESTAMP, + SYSTIMESTAMP, + 'Y' + ); + + -- Create A_SOURCE_FILE_RECEIVED record for this export + vSourceFileReceivedKey := CT_MRDS.A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL; + INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED ( + A_SOURCE_FILE_RECEIVED_KEY, + A_SOURCE_FILE_CONFIG_KEY, + SOURCE_FILE_NAME, + RECEPTION_DATE, + PROCESSING_STATUS, + PARTITION_YEAR, + PARTITION_MONTH, + ARCH_FILE_NAME + ) VALUES ( + vSourceFileReceivedKey, + -1, -- Special marker for exported files (no config) + vFileUri, + SYSDATE, + 'ARCHIVED', + vPartitions(i).year, + vPartitions(i).month, + vFileName + ); + + COMMIT; + ENV_MANAGER.LOG_PROCESS_EVENT('Registered export: WorkflowKey=' || vWorkflowHistoryKey || ', FileReceivedKey=' || vSourceFileReceivedKey || ', File=' || vFileName, 'INFO', vParameters); + END IF; END LOOP; -- Parallel processing (parallel degree > 1) @@ -1229,6 +1291,64 @@ AS -- Clean up task DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName); + -- Register exports if requested (after successful parallel processing) + IF pRegisterExport THEN + ENV_MANAGER.LOG_PROCESS_EVENT('Registering ' || vPartitions.COUNT || ' parallel exports to A_WORKFLOW_HISTORY and A_SOURCE_FILE_RECEIVED', 'INFO', vParameters); + + FOR i IN 1 .. vPartitions.COUNT LOOP + -- Construct filename and URI for this partition + vFileName := NVL(vFileBaseName, UPPER(REPLACE(vTableName, vSchemaName || '.', ''))) || '_' || vPartitions(i).year || vPartitions(i).month || '.csv'; + vFileUri := vBucketUri || CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || sanitizeFilename(vFileName); + + -- Create A_WORKFLOW_HISTORY record for this export (one per year/month) + vWorkflowHistoryKey := CT_MRDS.A_WORKFLOW_HISTORY_KEY_SEQ.NEXTVAL; + INSERT INTO CT_MRDS.A_WORKFLOW_HISTORY ( + A_WORKFLOW_HISTORY_KEY, + SERVICE_NAME, + ORCHESTRATION_RUN_ID, + WORKFLOW_NAME, + WORKFLOW_START, + WORKFLOW_END, + WORKFLOW_SUCCESSFUL + ) VALUES ( + vWorkflowHistoryKey, + 'DATA_EXPORTER', + 'CSV_EXPORT_' || TO_CHAR(SYSTIMESTAMP, 'YYYYMMDDHH24MISSFF'), + 'DATA_EXPORT_' || UPPER(REPLACE(vTableName, vSchemaName || '.', '')) || '_' || vPartitions(i).year || vPartitions(i).month, + SYSTIMESTAMP, + SYSTIMESTAMP, + 'Y' + ); + + -- Create A_SOURCE_FILE_RECEIVED record for this export + vSourceFileReceivedKey := CT_MRDS.A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL; + INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED ( + A_SOURCE_FILE_RECEIVED_KEY, + A_SOURCE_FILE_CONFIG_KEY, + SOURCE_FILE_NAME, + RECEPTION_DATE, + PROCESSING_STATUS, + PARTITION_YEAR, + PARTITION_MONTH, + ARCH_FILE_NAME + ) VALUES ( + vSourceFileReceivedKey, + -1, -- Special marker for exported files (no config) + vFileUri, + SYSDATE, + 'ARCHIVED', + vPartitions(i).year, + vPartitions(i).month, + vFileName + ); + + ENV_MANAGER.LOG_PROCESS_EVENT('Registered parallel export: WorkflowKey=' || vWorkflowHistoryKey || ', FileReceivedKey=' || vSourceFileReceivedKey || ', File=' || vFileName, 'DEBUG', vParameters); + END LOOP; + + COMMIT; + ENV_MANAGER.LOG_PROCESS_EVENT('Successfully registered all ' || vPartitions.COUNT || ' exports', 'INFO', vParameters); + END IF; + -- Clean up chunks for THIS specific task only (session-safe) -- CRITICAL: Use TASK_NAME filter to avoid deleting chunks from other active CSV sessions DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE TASK_NAME = vTaskName; diff --git a/MARS_Packages/REL01_ADDITIONS/MARS-835-PREHOOK/new_version/DATA_EXPORTER.pkg b/MARS_Packages/REL01_ADDITIONS/MARS-835-PREHOOK/new_version/DATA_EXPORTER.pkg index 1209d84..866f74a 100644 --- a/MARS_Packages/REL01_ADDITIONS/MARS-835-PREHOOK/new_version/DATA_EXPORTER.pkg +++ b/MARS_Packages/REL01_ADDITIONS/MARS-835-PREHOOK/new_version/DATA_EXPORTER.pkg @@ -8,26 +8,28 @@ AS * which returns documentation text for confluence page (to Copy-Paste it). **/ - -- Package Version Information - PACKAGE_VERSION CONSTANT VARCHAR2(10) := '2.6.3'; - PACKAGE_BUILD_DATE CONSTANT VARCHAR2(19) := '2026-01-28 19:30:00'; - PACKAGE_AUTHOR CONSTANT VARCHAR2(50) := 'MRDS Development Team'; - - -- Version History (last 3-5 changes) - VERSION_HISTORY CONSTANT VARCHAR2(4000) := - 'v2.6.3 (2026-01-28): COMPILATION FIX - Resolved ORA-00904 error in EXPORT_PARTITION_PARALLEL. SQLERRM and DBMS_UTILITY.FORMAT_ERROR_BACKTRACE cannot be used directly in SQL UPDATE statements. Now properly assigned to vgMsgTmp variable before UPDATE.' || CHR(10) || - 'v2.6.2 (2026-01-28): CRITICAL FIX - Race condition when multiple exports run simultaneously. Changed DELETE to filter by age (>24h) instead of deleting all COMPLETED chunks. Prevents concurrent sessions from deleting each other chunks. Session-safe cleanup with TASK_NAME filtering. Enables true parallel execution of multiple export jobs.' || CHR(10) || - 'v2.6.1 (2026-01-28): Added DELETE_FAILED_EXPORT_FILE procedure to clean up partial/corrupted files before retry. When partition fails mid-export, partial file is deleted before retry to prevent Oracle from creating _1 suffixed duplicates. Ensures clean retry without orphaned files in OCI bucket.' || CHR(10) || - 'v2.6.0 (2026-01-28): CRITICAL FIX - Added STATUS tracking to A_PARALLEL_EXPORT_CHUNKS table to prevent data duplication on retry. System now restarts ONLY failed partitions instead of re-exporting all data. Added ERROR_MESSAGE and EXPORT_TIMESTAMP columns for better error handling and monitoring. Prevents duplicate file creation when parallel tasks fail (e.g., 22 partitions with 16 threads, 3 failures no longer duplicates 19 successful exports).' || CHR(10) || - 'v2.5.0 (2026-01-26): Added recorddelimiter parameter with CRLF (CHR(13)||CHR(10)) for CSV exports to ensure Windows-compatible line endings. Improves cross-platform compatibility when CSV files are opened in Windows applications (Notepad, Excel).' || CHR(10) || - 'v2.4.0 (2026-01-11): Added pTemplateTableName parameter for per-column date format configuration. Implements dynamic query building with TO_CHAR for each date/timestamp column using FILE_MANAGER.GET_DATE_FORMAT. Supports 3-tier hierarchy: column-specific, template DEFAULT, global fallback. Eliminates single dateformat limitation of DBMS_CLOUD.EXPORT_DATA.' || CHR(10) || - 'v2.3.0 (2025-12-20): Added parallel partition processing using DBMS_PARALLEL_EXECUTE. New pParallelDegree parameter (1-16, default 1) for EXPORT_TABLE_DATA_BY_DATE and EXPORT_TABLE_DATA_TO_CSV_BY_DATE procedures. Each year/month partition processed in separate thread for improved performance.' || CHR(10) || - 'v2.2.0 (2025-12-19): DRY refactoring - extracted shared helper functions (sanitizeFilename, VALIDATE_TABLE_AND_COLUMNS, GET_PARTITIONS, EXPORT_SINGLE_PARTITION worker procedure). Reduced code duplication by ~400 lines. Prepared architecture for v2.3.0 parallel processing.' || CHR(10) || - 'v2.1.1 (2025-12-04): Fixed JOIN column reference A_WORKFLOW_HISTORY_KEY -> A_ETL_LOAD_SET_KEY, added consistent column mapping and dynamic column list to EXPORT_TABLE_DATA procedure, enhanced DEBUG logging for all export operations' || CHR(10) || - 'v2.1.0 (2025-10-22): Added version tracking and PARTITION_YEAR/PARTITION_MONTH support' || CHR(10) || - 'v2.0.0 (2025-10-01): Separated export functionality from FILE_MANAGER package' || CHR(10); - + -- Package Version Information (Semantic Versioning: MAJOR.MINOR.PATCH) + PACKAGE_VERSION CONSTANT VARCHAR2(10) := '2.7.0'; + PACKAGE_BUILD_DATE CONSTANT VARCHAR2(20) := '2026-02-09 20:00:00'; + PACKAGE_AUTHOR CONSTANT VARCHAR2(100) := 'Grzegorz Michalski'; + cgBL CONSTANT VARCHAR2(2) := CHR(13)||CHR(10); + + -- Version History (Latest changes first) + VERSION_HISTORY CONSTANT VARCHAR2(4000) := + 'v2.7.0 (2026-02-09): NEW FEATURE - Added pRegisterExport parameter to EXPORT_TABLE_DATA_TO_CSV_BY_DATE. When TRUE, successfully exported files are registered in A_WORKFLOW_HISTORY (one record per YEAR/MONTH) and A_SOURCE_FILE_RECEIVED tables for tracking and audit purposes.' || cgBL || + 'v2.6.3 (2026-01-28): COMPILATION FIX - Resolved ORA-00904 error in EXPORT_PARTITION_PARALLEL. SQLERRM and DBMS_UTILITY.FORMAT_ERROR_BACKTRACE cannot be used directly in SQL UPDATE statements. Now properly assigned to vgMsgTmp variable before UPDATE.' || cgBL || + 'v2.6.2 (2026-01-28): CRITICAL FIX - Race condition when multiple exports run simultaneously. Changed DELETE to filter by age (>24h) instead of deleting all COMPLETED chunks. Prevents concurrent sessions from deleting each other chunks. Session-safe cleanup with TASK_NAME filtering. Enables true parallel execution of multiple export jobs.' || cgBL || + 'v2.6.1 (2026-01-28): Added DELETE_FAILED_EXPORT_FILE procedure to clean up partial/corrupted files before retry. When partition fails mid-export, partial file is deleted before retry to prevent Oracle from creating _1 suffixed duplicates. Ensures clean retry without orphaned files in OCI bucket.' || cgBL || + 'v2.6.0 (2026-01-28): CRITICAL FIX - Added STATUS tracking to A_PARALLEL_EXPORT_CHUNKS table to prevent data duplication on retry. System now restarts ONLY failed partitions instead of re-exporting all data. Added ERROR_MESSAGE and EXPORT_TIMESTAMP columns for better error handling and monitoring. Prevents duplicate file creation when parallel tasks fail (e.g., 22 partitions with 16 threads, 3 failures no longer duplicates 19 successful exports).' || cgBL || + 'v2.5.0 (2026-01-26): Added recorddelimiter parameter with CRLF (CHR(13)||CHR(10)) for CSV exports to ensure Windows-compatible line endings. Improves cross-platform compatibility when CSV files are opened in Windows applications (Notepad, Excel).' || cgBL || + 'v2.4.0 (2026-01-11): Added pTemplateTableName parameter for per-column date format configuration. Implements dynamic query building with TO_CHAR for each date/timestamp column using FILE_MANAGER.GET_DATE_FORMAT. Supports 3-tier hierarchy: column-specific, template DEFAULT, global fallback. Eliminates single dateformat limitation of DBMS_CLOUD.EXPORT_DATA.' || cgBL || + 'v2.3.0 (2025-12-20): Added parallel partition processing using DBMS_PARALLEL_EXECUTE. New pParallelDegree parameter (1-16, default 1) for EXPORT_TABLE_DATA_BY_DATE and EXPORT_TABLE_DATA_TO_CSV_BY_DATE procedures. Each year/month partition processed in separate thread for improved performance.' || cgBL || + 'v2.2.0 (2025-12-19): DRY refactoring - extracted shared helper functions (sanitizeFilename, VALIDATE_TABLE_AND_COLUMNS, GET_PARTITIONS, EXPORT_SINGLE_PARTITION worker procedure). Reduced code duplication by ~400 lines. Prepared architecture for v2.3.0 parallel processing.' || cgBL || + 'v2.1.1 (2025-12-04): Fixed JOIN column reference A_WORKFLOW_HISTORY_KEY -> A_ETL_LOAD_SET_KEY, added consistent column mapping and dynamic column list to EXPORT_TABLE_DATA procedure, enhanced DEBUG logging for all export operations' || cgBL || + 'v2.1.0 (2025-10-22): Added version tracking and PARTITION_YEAR/PARTITION_MONTH support' || cgBL || + 'v2.0.0 (2025-10-01): Separated export functionality from FILE_MANAGER package'; + vgMsgTmp VARCHAR2(32000); --------------------------------------------------------------------------------------------------------------------------- @@ -146,6 +148,9 @@ AS * but exports to CSV format instead of Parquet. * Supports parallel partition processing via pParallelDegree parameter (1-16). * File naming pattern: {pFileName}_YYYYMM.csv or {TABLENAME}_YYYYMM.csv (if pFileName is NULL) + * When pRegisterExport=TRUE, successfully exported files are registered in: + * - CT_MRDS.A_WORKFLOW_HISTORY (one record per YEAR/MONTH with export timestamp) + * - CT_MRDS.A_SOURCE_FILE_RECEIVED (tracks file location and partition info) * @example * begin * -- With custom filename @@ -158,7 +163,8 @@ AS * pFileName => 'my_export.csv', * pMinDate => DATE '2024-01-01', * pMaxDate => SYSDATE, - * pParallelDegree => 8 -- Optional, default 1, range 1-16 + * pParallelDegree => 8, -- Optional, default 1, range 1-16 + * pRegisterExport => TRUE -- Optional, default FALSE, registers to A_WORKFLOW_HISTORY and A_SOURCE_FILE_RECEIVED * ); * * -- With auto-generated filename (based on table name only) @@ -169,7 +175,8 @@ AS * pBucketArea => 'ARCHIVE', * pFolderName => 'exports', * pMinDate => DATE '2025-09-01', - * pMaxDate => DATE '2025-09-17' + * pMaxDate => DATE '2025-09-17', + * pRegisterExport => TRUE -- Registers each export to tracking tables * ); * -- This will create files like: AGGREGATED_ALLOTMENT_202509.csv, etc. * pBucketArea parameter accepts: 'INBOX', 'ODS', 'DATA', 'ARCHIVE' @@ -188,6 +195,7 @@ AS pParallelDegree IN NUMBER default 1, pTemplateTableName IN VARCHAR2 default NULL, pMaxFileSize IN NUMBER default 104857600, + pRegisterExport IN BOOLEAN default FALSE, pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName ); diff --git a/MARS_Packages/REL01_ADDITIONS/MARS-835-PREHOOK/rollback_version/v.2.6.3/DATA_EXPORTER.pkb b/MARS_Packages/REL01_ADDITIONS/MARS-835-PREHOOK/rollback_version/v.2.6.3/DATA_EXPORTER.pkb new file mode 100644 index 0000000..6734d50 --- /dev/null +++ b/MARS_Packages/REL01_ADDITIONS/MARS-835-PREHOOK/rollback_version/v.2.6.3/DATA_EXPORTER.pkb @@ -0,0 +1,1313 @@ +create or replace PACKAGE BODY CT_MRDS.DATA_EXPORTER +AS + + ---------------------------------------------------------------------------------------------------- + -- PRIVATE HELPER FUNCTIONS (USED BY MULTIPLE PROCEDURES) + ---------------------------------------------------------------------------------------------------- + + /** + * Sanitizes filename by replacing disallowed characters with underscores + **/ + FUNCTION sanitizeFilename(pFilename IN VARCHAR2) RETURN VARCHAR2 IS + vFilename VARCHAR2(1000); + BEGIN + vFilename := REGEXP_REPLACE(pFilename, '[^a-zA-Z0-9._-]', '_'); + RETURN vFilename; + END sanitizeFilename; + + ---------------------------------------------------------------------------------------------------- + + /** + * Deletes export file from OCI bucket if it exists (used for cleanup before retry) + * Silently ignores if file doesn't exist (ORA-20404) + **/ + PROCEDURE DELETE_FAILED_EXPORT_FILE( + pFileUri IN VARCHAR2, + pCredentialName IN VARCHAR2, + pParameters IN VARCHAR2 + ) IS + BEGIN + BEGIN + ENV_MANAGER.LOG_PROCESS_EVENT('Attempting to delete potentially corrupted file: ' || pFileUri, 'DEBUG', pParameters); + + DBMS_CLOUD.DELETE_OBJECT( + credential_name => pCredentialName, + object_uri => pFileUri + ); + + ENV_MANAGER.LOG_PROCESS_EVENT('Deleted existing file (cleanup before retry): ' || pFileUri, 'INFO', pParameters); + EXCEPTION + WHEN OTHERS THEN + -- Object not found is OK (file doesn't exist) + IF SQLCODE = -20404 THEN + ENV_MANAGER.LOG_PROCESS_EVENT('File does not exist (OK): ' || pFileUri, 'DEBUG', pParameters); + ELSE + -- Log but don't fail - export will attempt anyway + ENV_MANAGER.LOG_PROCESS_EVENT('Warning: Could not delete file (will retry export anyway): ' || SQLERRM, 'WARNING', pParameters); + END IF; + END; + END DELETE_FAILED_EXPORT_FILE; + + ---------------------------------------------------------------------------------------------------- + + /** + * Builds query with TO_CHAR for date/timestamp columns using per-column formats + * Retrieves format for each date column from FILE_MANAGER.GET_DATE_FORMAT + **/ + FUNCTION buildQueryWithDateFormats( + pColumnList IN VARCHAR2, + pTableName IN VARCHAR2, + pSchemaName IN VARCHAR2, + pKeyColumnName IN VARCHAR2, + pTemplateTableName IN VARCHAR2 + ) RETURN VARCHAR2 IS + vResult VARCHAR2(32767); + vColumns VARCHAR2(32767); + vPos PLS_INTEGER; + vNextPos PLS_INTEGER; + vCurrentCol VARCHAR2(128); + vAllCols VARCHAR2(32767); + vDataType VARCHAR2(30); + vDateFormat VARCHAR2(200); + vTemplateSchema VARCHAR2(128); + vTemplateTable VARCHAR2(128); + vColExists NUMBER; + BEGIN + -- Build column list if not provided + IF pColumnList IS NULL THEN + -- Use template table for column order when provided + -- Template defines which columns to export and in what order + IF pTemplateTableName IS NOT NULL THEN + -- Parse template table name (SCHEMA.TABLE or just TABLE) + IF INSTR(pTemplateTableName, '.') > 0 THEN + vTemplateSchema := SUBSTR(pTemplateTableName, 1, INSTR(pTemplateTableName, '.') - 1); + vTemplateTable := SUBSTR(pTemplateTableName, INSTR(pTemplateTableName, '.') + 1); + ELSE + vTemplateSchema := pSchemaName; + vTemplateTable := pTemplateTableName; + END IF; + + -- Get columns from TEMPLATE table in template column order + -- Template defines target CSV structure (column order and which columns to include) + SELECT LISTAGG(column_name, ', ') WITHIN GROUP (ORDER BY column_id) + INTO vAllCols + FROM all_tab_columns + WHERE table_name = vTemplateTable + AND owner = vTemplateSchema; + ELSE + -- Get columns from source table when no template + SELECT LISTAGG(column_name, ', ') WITHIN GROUP (ORDER BY column_id) + INTO vAllCols + FROM all_tab_columns + WHERE table_name = pTableName + AND owner = pSchemaName; + END IF; + ELSE + vAllCols := pColumnList; + END IF; + + -- Process each column + vColumns := UPPER(REPLACE(vAllCols, ' ', '')); + vPos := 1; + vResult := ''; + + WHILE vPos <= LENGTH(vColumns) LOOP + vNextPos := INSTR(vColumns, ',', vPos); + IF vNextPos = 0 THEN + vNextPos := LENGTH(vColumns) + 1; + END IF; + + vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos); + + -- When using template table, check if column exists in SOURCE table + -- Template defines target structure, source provides data + -- Skip template columns that don't exist in source (except A_WORKFLOW_HISTORY_KEY) + IF pTemplateTableName IS NOT NULL THEN + -- Check if template column exists in SOURCE table + SELECT COUNT(*) INTO vColExists + FROM all_tab_columns + WHERE table_name = pTableName + AND column_name = vCurrentCol + AND owner = pSchemaName; + + -- Skip columns that don't exist in source table + -- Exception: A_WORKFLOW_HISTORY_KEY is virtual (mapped from pKeyColumnName) + IF vColExists = 0 AND UPPER(vCurrentCol) != 'A_WORKFLOW_HISTORY_KEY' THEN + vPos := vNextPos + 1; + CONTINUE; + END IF; + END IF; + + -- Get column data type from appropriate table (template or source) + IF pTemplateTableName IS NOT NULL THEN + -- Get data type from template table + SELECT data_type INTO vDataType + FROM all_tab_columns + WHERE table_name = vTemplateTable + AND column_name = vCurrentCol + AND owner = vTemplateSchema; + ELSE + -- Get data type from source table + SELECT data_type INTO vDataType + FROM all_tab_columns + WHERE table_name = pTableName + AND column_name = vCurrentCol + AND owner = pSchemaName; + END IF; + + -- Handle key column alias (template table has A_WORKFLOW_HISTORY_KEY, source table has pKeyColumnName) + IF UPPER(vCurrentCol) = 'A_WORKFLOW_HISTORY_KEY' THEN + vResult := vResult || CASE WHEN vResult IS NOT NULL THEN ', ' ELSE '' END || + 'T.' || pKeyColumnName || ' AS A_WORKFLOW_HISTORY_KEY'; + + -- Convert DATE/TIMESTAMP columns to CHAR with specific format + ELSIF vDataType IN ('DATE', 'TIMESTAMP', 'TIMESTAMP WITH TIME ZONE', 'TIMESTAMP WITH LOCAL TIME ZONE') THEN + IF pTemplateTableName IS NOT NULL THEN + vDateFormat := CT_MRDS.FILE_MANAGER.GET_DATE_FORMAT( + pTemplateTableName => pTemplateTableName, + pColumnName => vCurrentCol + ); + ELSE + vDateFormat := ENV_MANAGER.gvDefaultDateFormat; + END IF; + vResult := vResult || CASE WHEN vResult IS NOT NULL THEN ', ' ELSE '' END || + 'TO_CHAR(T.' || vCurrentCol || ', ''' || vDateFormat || ''') AS ' || vCurrentCol; + + -- Other columns as-is with T. prefix + ELSE + vResult := vResult || CASE WHEN vResult IS NOT NULL THEN ', ' ELSE '' END || + 'T.' || vCurrentCol; + END IF; + + vPos := vNextPos + 1; + END LOOP; + + RETURN vResult; + END buildQueryWithDateFormats; + + ---------------------------------------------------------------------------------------------------- + + -- Internal shared function to process column list with T. prefix and key column mapping + FUNCTION processColumnList(pColumnList IN VARCHAR2, pTableName IN VARCHAR2, pSchemaName IN VARCHAR2, pKeyColumnName IN VARCHAR2) RETURN VARCHAR2 IS + vResult VARCHAR2(32767); + vColumns VARCHAR2(32767); + vPos PLS_INTEGER; + vNextPos PLS_INTEGER; + vCurrentCol VARCHAR2(128); + vAllCols VARCHAR2(32767); + BEGIN + IF pColumnList IS NULL THEN + -- Build list of all columns + SELECT LISTAGG(column_name, ', ') WITHIN GROUP (ORDER BY column_id) + INTO vAllCols + FROM all_tab_columns + WHERE table_name = pTableName + AND owner = pSchemaName; + + -- Add T. prefix to all columns + vResult := 'T.' || REPLACE(vAllCols, ', ', ', T.'); + + -- Replace key column with aliased version (e.g., T.A_ETL_LOAD_SET_KEY_FK AS A_WORKFLOW_HISTORY_KEY) + vResult := REPLACE(vResult, 'T.' || pKeyColumnName, 'T.' || pKeyColumnName || ' AS A_WORKFLOW_HISTORY_KEY'); + + RETURN vResult; + END IF; + + -- Remove extra spaces and convert to uppercase + vColumns := UPPER(REPLACE(pColumnList, ' ', '')); + vPos := 1; + vResult := ''; + + -- Parse comma-separated column list and add T. prefix + WHILE vPos <= LENGTH(vColumns) LOOP + vNextPos := INSTR(vColumns, ',', vPos); + IF vNextPos = 0 THEN + vNextPos := LENGTH(vColumns) + 1; + END IF; + + vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos); + + -- Check if this is the key column (e.g., A_ETL_LOAD_SET_KEY_FK) and add alias + IF UPPER(vCurrentCol) = UPPER(pKeyColumnName) THEN + vCurrentCol := 'T.' || pKeyColumnName || ' AS A_WORKFLOW_HISTORY_KEY'; + ELSE + -- Add T. prefix if not already present + IF INSTR(vCurrentCol, '.') = 0 THEN + vCurrentCol := 'T.' || vCurrentCol; + END IF; + END IF; + + -- Add to result with comma separator + IF vResult IS NOT NULL THEN + vResult := vResult || ', '; + END IF; + vResult := vResult || vCurrentCol; + + vPos := vNextPos + 1; + END LOOP; + + RETURN vResult; + END processColumnList; + + ---------------------------------------------------------------------------------------------------- + + /** + * Validates table existence, key column existence, and column list + **/ + PROCEDURE VALIDATE_TABLE_AND_COLUMNS ( + pSchemaName IN VARCHAR2, + pTableName IN VARCHAR2, + pKeyColumnName IN VARCHAR2, + pColumnList IN VARCHAR2, + pParameters IN VARCHAR2 + ) IS + vCount INTEGER; + vColumns VARCHAR2(32767); + vPos PLS_INTEGER; + vNextPos PLS_INTEGER; + vCurrentCol VARCHAR2(128); + BEGIN + -- Check if table exists + SELECT COUNT(*) INTO vCount + FROM all_tables + WHERE table_name = pTableName + AND owner = pSchemaName; + + IF vCount = 0 THEN + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, ENV_MANAGER.MSG_TABLE_NOT_EXISTS); + END IF; + + -- Check if key column exists + SELECT COUNT(*) INTO vCount + FROM all_tab_columns + WHERE table_name = pTableName + AND column_name = pKeyColumnName + AND owner = pSchemaName; + + IF vCount = 0 THEN + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS); + END IF; + + -- Validate pColumnList - check if all column names exist in the table + IF pColumnList IS NOT NULL THEN + vColumns := UPPER(REPLACE(pColumnList, ' ', '')); + vPos := 1; + + WHILE vPos <= LENGTH(vColumns) LOOP + vNextPos := INSTR(vColumns, ',', vPos); + IF vNextPos = 0 THEN + vNextPos := LENGTH(vColumns) + 1; + END IF; + + vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos); + + -- Remove table alias prefix if present + IF INSTR(vCurrentCol, '.') > 0 THEN + vCurrentCol := SUBSTR(vCurrentCol, INSTR(vCurrentCol, '.') + 1); + END IF; + + -- Check if column exists + SELECT COUNT(*) INTO vCount + FROM all_tab_columns + WHERE table_name = pTableName + AND column_name = vCurrentCol + AND owner = pSchemaName; + + IF vCount = 0 THEN + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS); + END IF; + + vPos := vNextPos + 1; + END LOOP; + END IF; + END VALIDATE_TABLE_AND_COLUMNS; + + ---------------------------------------------------------------------------------------------------- + + /** + * Retrieves list of year/month partitions based on date range + **/ + FUNCTION GET_PARTITIONS ( + pSchemaName IN VARCHAR2, + pTableName IN VARCHAR2, + pKeyColumnName IN VARCHAR2, + pMinDate IN DATE, + pMaxDate IN DATE, + pParameters IN VARCHAR2 + ) RETURN partition_tab IS + vSql VARCHAR2(32000); + vPartitions partition_tab; + vKeyValuesYear DBMS_SQL.VARCHAR2_TABLE; + vKeyValuesMonth DBMS_SQL.VARCHAR2_TABLE; + vFullTableName VARCHAR2(200); + BEGIN + -- Build fully qualified table name if not already qualified + IF INSTR(pTableName, '.') > 0 THEN + vFullTableName := pTableName; -- Already fully qualified + ELSE + vFullTableName := pSchemaName || '.' || pTableName; + END IF; + + vSql := 'SELECT DISTINCT TO_CHAR(L.LOAD_START,''YYYY'') AS YR, TO_CHAR(L.LOAD_START,''MM'') AS MN + FROM ' || vFullTableName || ' T, CT_ODS.A_LOAD_HISTORY L + WHERE T.' || pKeyColumnName || ' = L.A_ETL_LOAD_SET_KEY + AND L.LOAD_START >= :pMinDate + AND L.LOAD_START < :pMaxDate + ORDER BY YR, MN'; + + ENV_MANAGER.LOG_PROCESS_EVENT('Executing date range query: ' || vSql, 'DEBUG', pParameters); + EXECUTE IMMEDIATE vSql BULK COLLECT INTO vKeyValuesYear, vKeyValuesMonth USING pMinDate, pMaxDate; + + ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vKeyValuesYear.COUNT || ' year/month combinations to export', 'DEBUG', pParameters); + + -- Convert to partition_tab + vPartitions := partition_tab(); + vPartitions.EXTEND(vKeyValuesYear.COUNT); + FOR i IN 1 .. vKeyValuesYear.COUNT LOOP + vPartitions(i).year := vKeyValuesYear(i); + vPartitions(i).month := vKeyValuesMonth(i); + END LOOP; + + RETURN vPartitions; + END GET_PARTITIONS; + + ---------------------------------------------------------------------------------------------------- + + /** + * Exports single partition (year/month) to specified format (PARQUET or CSV) + * This is the core worker procedure that will be used for parallel processing in v2.3.0 + **/ + PROCEDURE EXPORT_SINGLE_PARTITION ( + pSchemaName IN VARCHAR2, + pTableName IN VARCHAR2, + pKeyColumnName IN VARCHAR2, + pYear IN VARCHAR2, + pMonth IN VARCHAR2, + pBucketUri IN VARCHAR2, + pFolderName IN VARCHAR2, + pProcessedColumns IN VARCHAR2, + pMinDate IN DATE, + pMaxDate IN DATE, + pCredentialName IN VARCHAR2, + pFormat IN VARCHAR2 DEFAULT 'PARQUET', + pFileBaseName IN VARCHAR2 DEFAULT NULL, + pMaxFileSize IN NUMBER DEFAULT 104857600, + pParameters IN VARCHAR2 + ) IS + vQuery VARCHAR2(32767); + vUri VARCHAR2(4000); + vFileName VARCHAR2(1000); + vFullTableName VARCHAR2(200); + BEGIN + -- Build fully qualified table name if not already qualified + IF INSTR(pTableName, '.') > 0 THEN + vFullTableName := pTableName; -- Already fully qualified + ELSE + vFullTableName := pSchemaName || '.' || pTableName; + END IF; + + -- Construct the query to extract data for the current year/month + vQuery := 'SELECT ' || pProcessedColumns || ' + FROM ' || vFullTableName || ' T, CT_ODS.A_LOAD_HISTORY L + WHERE T.' || pKeyColumnName || ' = L.A_ETL_LOAD_SET_KEY + AND TO_CHAR(L.LOAD_START,''YYYY'') = ' || CHR(39) || pYear || CHR(39) || ' + AND TO_CHAR(L.LOAD_START,''MM'') = ' || CHR(39) || pMonth || CHR(39) || ' + AND L.LOAD_START >= TO_DATE(' || CHR(39) || TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'') + AND L.LOAD_START < TO_DATE(' || CHR(39) || TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'')'; + + -- Construct the URI based on format + IF pFormat = 'PARQUET' THEN + -- Parquet: Use Hive-style partitioning + -- Note: maxfilesize is NOT supported for Parquet format (Oracle limitation) + vUri := pBucketUri || + CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || + 'PARTITION_YEAR=' || sanitizeFilename(pYear) || '/' || + 'PARTITION_MONTH=' || sanitizeFilename(pMonth) || '/' || + sanitizeFilename(pYear) || sanitizeFilename(pMonth) || '.parquet'; + + ENV_MANAGER.LOG_PROCESS_EVENT('Parquet export URI: ' || vUri, 'DEBUG', pParameters); + + -- Delete potentially corrupted file from previous failed attempt + -- This prevents Oracle from creating _1 suffixed files on retry + DELETE_FAILED_EXPORT_FILE(vUri, pCredentialName, pParameters); + + DBMS_CLOUD.EXPORT_DATA( + credential_name => pCredentialName, + file_uri_list => vUri, + query => vQuery, + format => json_object('type' VALUE 'parquet') + ); + ELSIF pFormat = 'CSV' THEN + -- CSV: Flat file structure with year/month in filename + vFileName := NVL(pFileBaseName, UPPER(pTableName)) || '_' || pYear || pMonth || '.csv'; + vUri := pBucketUri || + CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || + sanitizeFilename(vFileName); + + ENV_MANAGER.LOG_PROCESS_EVENT('CSV export URI: ' || vUri, 'DEBUG', pParameters); + + -- Delete potentially corrupted file from previous failed attempt + -- This prevents Oracle from creating _1 suffixed files on retry + DELETE_FAILED_EXPORT_FILE(vUri, pCredentialName, pParameters); + + -- Use json_object() for CSV export with maxfilesize in bytes (Oracle requirement) + -- Oracle maxfilesize: min 10MB (10485760), max 1GB (1073741824), default 10MB + -- NOTE: maxfilesize must be NUMBER (bytes), not string like '1000M' + -- Using 100MB (104857600) to avoid PGA memory issues with large files + DBMS_CLOUD.EXPORT_DATA( + credential_name => pCredentialName, + file_uri_list => vUri, + query => vQuery, + format => json_object( + 'type' VALUE 'CSV', + 'header' VALUE true, + 'quote' VALUE CHR(34), + 'delimiter' VALUE ',', + 'escape' VALUE true, + 'recorddelimiter' VALUE CHR(13)||CHR(10), -- CRLF dla Windows + 'maxfilesize' VALUE pMaxFileSize -- Dynamic maxfilesize in bytes (e.g., 104857600 = 100MB) + ) + ); + ELSE + RAISE_APPLICATION_ERROR(-20001, 'Unsupported format: ' || pFormat || '. Use PARQUET or CSV.'); + END IF; + + ENV_MANAGER.LOG_PROCESS_EVENT('Processing Year/Month: ' || pYear || '/' || pMonth || ' (Format: ' || pFormat || ')', 'DEBUG', pParameters); + ENV_MANAGER.LOG_PROCESS_EVENT('Export query: ' || vQuery, 'DEBUG', pParameters); + END EXPORT_SINGLE_PARTITION; + + ---------------------------------------------------------------------------------------------------- + + /** + * Callback procedure for DBMS_PARALLEL_EXECUTE + * Processes single partition (year/month) chunk in parallel task + * Called by DBMS_PARALLEL_EXECUTE framework for each chunk + **/ + PROCEDURE EXPORT_PARTITION_PARALLEL ( + pStartId IN NUMBER, + pEndId IN NUMBER + ) IS + vYear VARCHAR2(4); + vMonth VARCHAR2(2); + vSchemaName VARCHAR2(128); + vTableName VARCHAR2(128); + vKeyColumnName VARCHAR2(128); + vBucketUri VARCHAR2(4000); + vFolderName VARCHAR2(1000); + vProcessedColumns VARCHAR2(32767); + vMinDate DATE; + vMaxDate DATE; + vCredentialName VARCHAR2(200); + vFormat VARCHAR2(20); + vFileBaseName VARCHAR2(1000); + vMaxFileSize NUMBER; + vParameters VARCHAR2(4000); + BEGIN + -- Retrieve chunk context from global temporary table + SELECT + YEAR_VALUE, + MONTH_VALUE, + SCHEMA_NAME, + TABLE_NAME, + KEY_COLUMN_NAME, + BUCKET_URI, + FOLDER_NAME, + PROCESSED_COLUMNS, + MIN_DATE, + MAX_DATE, + CREDENTIAL_NAME, + FORMAT_TYPE, + FILE_BASE_NAME, + MAX_FILE_SIZE + INTO + vYear, + vMonth, + vSchemaName, + vTableName, + vKeyColumnName, + vBucketUri, + vFolderName, + vProcessedColumns, + vMinDate, + vMaxDate, + vCredentialName, + vFormat, + vFileBaseName, + vMaxFileSize + FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS + WHERE CHUNK_ID = pStartId; + + vParameters := 'Parallel task - Year: ' || vYear || ', Month: ' || vMonth || ', ChunkID: ' || pStartId; + ENV_MANAGER.LOG_PROCESS_EVENT('Starting parallel export for partition ' || vYear || '/' || vMonth, 'DEBUG', vParameters); + + -- Mark chunk as PROCESSING + UPDATE CT_MRDS.A_PARALLEL_EXPORT_CHUNKS + SET STATUS = 'PROCESSING', + ERROR_MESSAGE = NULL + WHERE CHUNK_ID = pStartId; + COMMIT; + + -- Call the worker procedure + EXPORT_SINGLE_PARTITION( + pSchemaName => vSchemaName, + pTableName => vTableName, + pKeyColumnName => vKeyColumnName, + pYear => vYear, + pMonth => vMonth, + pBucketUri => vBucketUri, + pFolderName => vFolderName, + pProcessedColumns => vProcessedColumns, + pMinDate => vMinDate, + pMaxDate => vMaxDate, + pCredentialName => vCredentialName, + pFormat => vFormat, + pFileBaseName => vFileBaseName, + pMaxFileSize => vMaxFileSize, + pParameters => vParameters + ); + + -- Mark chunk as COMPLETED + UPDATE CT_MRDS.A_PARALLEL_EXPORT_CHUNKS + SET STATUS = 'COMPLETED', + EXPORT_TIMESTAMP = SYSTIMESTAMP, + ERROR_MESSAGE = NULL + WHERE CHUNK_ID = pStartId; + COMMIT; + + ENV_MANAGER.LOG_PROCESS_EVENT('Completed parallel export for partition ' || vYear || '/' || vMonth, 'DEBUG', vParameters); + EXCEPTION + WHEN OTHERS THEN + -- Capture error details in variable (SQLERRM cannot be used directly in SQL) + vgMsgTmp := 'Parallel task error for partition ' || vYear || '/' || vMonth || ' (ChunkID: ' || pStartId || '): ' || SQLERRM || cgBL || DBMS_UTILITY.FORMAT_ERROR_BACKTRACE; + ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); + + -- Mark chunk as FAILED with error message + -- Use vgMsgTmp variable instead of SQLERRM directly (Oracle limitation in SQL context) + UPDATE CT_MRDS.A_PARALLEL_EXPORT_CHUNKS + SET STATUS = 'FAILED', + ERROR_MESSAGE = SUBSTR(vgMsgTmp, 1, 4000) + WHERE CHUNK_ID = pStartId; + COMMIT; + + RAISE; + END EXPORT_PARTITION_PARALLEL; + + ---------------------------------------------------------------------------------------------------- + -- MAIN EXPORT PROCEDURES + ---------------------------------------------------------------------------------------------------- + + PROCEDURE EXPORT_TABLE_DATA ( + pSchemaName IN VARCHAR2, + pTableName IN VARCHAR2, + pKeyColumnName IN VARCHAR2, + pBucketArea IN VARCHAR2, + pFolderName IN VARCHAR2, + pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName + ) + IS + -- Type definition for key values + TYPE key_value_tab IS TABLE OF VARCHAR2(4000); + vKeyValues key_value_tab; + vCount INTEGER; + vSql VARCHAR2(4000); + vKeyValue VARCHAR2(4000); + vQuery VARCHAR2(32767); + vUri VARCHAR2(4000); + vDataType VARCHAR2(30); + vTableName VARCHAR2(128); + vSchemaName VARCHAR2(128); + vKeyColumnName VARCHAR2(128); + vParameters VARCHAR2(4000); + vBucketUri VARCHAR2(4000); + vProcessedColumnList VARCHAR2(32767); + vCurrentCol VARCHAR2(128); + vAllColumnsList VARCHAR2(32767); + + BEGIN + vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||'''' + ,'pTableName => '''||nvl(pTableName, 'NULL')||'''' + ,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||'''' + ,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||'''' + ,'pFolderName => '''||nvl(pFolderName, 'NULL')||'''' + ,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||'''' + )); + ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters); + + -- Get bucket URI based on bucket area using FILE_MANAGER function + vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea); + + -- Convert table and column names to uppercase to match data dictionary + vTableName := UPPER(pTableName); + vSchemaName := UPPER(pSchemaName); + vKeyColumnName := UPPER(pKeyColumnName); + + -- Check if table exists + SELECT COUNT(*) INTO vCount + FROM all_tables + WHERE table_name = vTableName + AND owner = vSchemaName; + + IF vCount = 0 THEN + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, ENV_MANAGER.MSG_TABLE_NOT_EXISTS); + END IF; + + -- Check if key column exists + SELECT COUNT(*) INTO vCount + FROM all_tab_columns + WHERE table_name = vTableName + AND column_name = vKeyColumnName + AND owner = vSchemaName; + + IF vCount = 0 THEN + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS); + + END IF; + + -- Get the data type of the key column + SELECT data_type INTO vDataType + FROM all_tab_columns + WHERE table_name = vTableName + AND column_name = vKeyColumnName + AND owner = vSchemaName; + + -- Build list of all columns for the table (including key column for aliasing) + SELECT LISTAGG(column_name, ', ') WITHIN GROUP (ORDER BY column_id) + INTO vAllColumnsList + FROM all_tab_columns + WHERE table_name = vTableName + AND owner = vSchemaName; + + -- Process column list to add T. prefix and alias key column as A_WORKFLOW_HISTORY_KEY + vProcessedColumnList := processColumnList(vAllColumnsList, vTableName, vSchemaName, vKeyColumnName); + + ENV_MANAGER.LOG_PROCESS_EVENT('Dynamic column list built: ' || vAllColumnsList, 'DEBUG', vParameters); + ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list with T. prefix: ' || vProcessedColumnList, 'DEBUG', vParameters); + + vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName); + -- Fetch unique key values from A_LOAD_HISTORY + vSql := 'SELECT DISTINCT L.A_ETL_LOAD_SET_KEY' || + ' FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L' || + ' WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY'; + + ENV_MANAGER.LOG_PROCESS_EVENT('Executing key values query: ' || vSql, 'DEBUG', vParameters); + EXECUTE IMMEDIATE vSql BULK COLLECT INTO vKeyValues; + ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vKeyValues.COUNT || ' unique key values to process', 'DEBUG', vParameters); + + -- Loop over each unique key value + FOR i IN 1 .. vKeyValues.COUNT LOOP + vKeyValue := vKeyValues(i); + + -- Construct the query to extract data for the current key value with A_WORKFLOW_HISTORY_KEY mapping + IF vDataType IN ('VARCHAR2', 'CHAR', 'NCHAR', 'NVARCHAR2') THEN + vQuery := 'SELECT ' || vProcessedColumnList || + ' FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L' || + ' WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY' || + ' AND L.A_ETL_LOAD_SET_KEY = ' || CHR(39) || vKeyValue || CHR(39); + ELSIF vDataType IN ('NUMBER', 'FLOAT', 'BINARY_FLOAT', 'BINARY_DOUBLE') THEN + vQuery := 'SELECT ' || vProcessedColumnList || + ' FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L' || + ' WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY' || + ' AND L.A_ETL_LOAD_SET_KEY = ' || vKeyValue; + ELSIF vDataType LIKE 'TIMESTAMP%' OR vDataType = 'DATE' THEN + vQuery := 'SELECT ' || vProcessedColumnList || + ' FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L' || + ' WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY' || + ' AND L.A_ETL_LOAD_SET_KEY = TO_TIMESTAMP(' || CHR(39) || vKeyValue || CHR(39) ||', ''YYYY-MM-DD HH24:MI:SS.FF'')'; + ELSE + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNSUPPORTED_DATA_TYPE, ENV_MANAGER.MSG_UNSUPPORTED_DATA_TYPE); + END IF; + + -- Construct the URI for the file in OCI Object Storage + vUri := vBucketUri || + CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || + sanitizeFilename(vKeyValue) || '.csv'; + + ENV_MANAGER.LOG_PROCESS_EVENT('Processing key value: ' || vKeyValue || ' (' || (i) || '/' || vKeyValues.COUNT || ')', 'DEBUG', vParameters); + ENV_MANAGER.LOG_PROCESS_EVENT('Export query: ' || vQuery, 'DEBUG', vParameters); + ENV_MANAGER.LOG_PROCESS_EVENT('Export URI: ' || vUri, 'DEBUG', vParameters); + + -- Use DBMS_CLOUD package to export data to the URI + DBMS_CLOUD.EXPORT_DATA( + credential_name => pCredentialName, + file_uri_list => vUri, + query => vQuery, + format => json_object('type' VALUE 'CSV', 'header' VALUE true) + ); + END LOOP; + ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters); + EXCEPTION + WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN + vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName; + ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp); + WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN + vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in column list' ELSE '' END; + ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp); + WHEN ENV_MANAGER.ERR_UNSUPPORTED_DATA_TYPE THEN + vgMsgTmp := ENV_MANAGER.MSG_UNSUPPORTED_DATA_TYPE || ' vDataType: '||vDataType; + ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNSUPPORTED_DATA_TYPE, vgMsgTmp); + WHEN OTHERS THEN + -- Log complete error details including full stack trace and backtrace + ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER'); + ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE)); + + END EXPORT_TABLE_DATA; + + ---------------------------------------------------------------------------------------------------- + + PROCEDURE EXPORT_TABLE_DATA_BY_DATE ( + pSchemaName IN VARCHAR2, + pTableName IN VARCHAR2, + pKeyColumnName IN VARCHAR2, + pBucketArea IN VARCHAR2, + pFolderName IN VARCHAR2, + pColumnList IN VARCHAR2 default NULL, + pMinDate IN DATE default DATE '1900-01-01', + pMaxDate IN DATE default SYSDATE, + pParallelDegree IN NUMBER default 1, + pTemplateTableName IN VARCHAR2 default NULL, + pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName + ) + IS + vTableName VARCHAR2(128); + vSchemaName VARCHAR2(128); + vKeyColumnName VARCHAR2(128); + vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE; + vProcessedColumnList VARCHAR2(32767); + vBucketUri VARCHAR2(4000); + vCurrentCol VARCHAR2(128); + vPartitions partition_tab; + + BEGIN + vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||'''' + ,'pTableName => '''||nvl(pTableName, 'NULL')||'''' + ,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||'''' + ,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||'''' + ,'pFolderName => '''||nvl(pFolderName, 'NULL')||'''' + ,'pColumnList => '''||nvl(pColumnList, 'NULL')||'''' + ,'pMinDate => '''||nvl(TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||'''' + ,'pMaxDate => '''||nvl(TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||'''' + ,'pParallelDegree => '''||nvl(TO_CHAR(pParallelDegree), 'NULL')||'''' + ,'pTemplateTableName => '''||nvl(pTemplateTableName, 'NULL')||'''' + ,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||'''' + )); + ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters); + + -- Get bucket URI based on bucket area using FILE_MANAGER function + vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea); + + -- Convert table and column names to uppercase to match data dictionary + vTableName := UPPER(pTableName); + vSchemaName := UPPER(pSchemaName); + vKeyColumnName := UPPER(pKeyColumnName); + + -- Validate table, key column, and column list using shared procedure + VALIDATE_TABLE_AND_COLUMNS(vSchemaName, vTableName, vKeyColumnName, pColumnList, vParameters); + + -- Build query with TO_CHAR for date columns (per-column format support) + vProcessedColumnList := buildQueryWithDateFormats(pColumnList, vTableName, vSchemaName, vKeyColumnName, pTemplateTableName); + + ENV_MANAGER.LOG_PROCESS_EVENT('Input column list: ' || NVL(pColumnList, 'NULL (building dynamic list from table metadata)'), 'DEBUG', vParameters); + ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list with TO_CHAR for date columns: ' || vProcessedColumnList, 'DEBUG', vParameters); + ENV_MANAGER.LOG_PROCESS_EVENT('Template table: ' || NVL(pTemplateTableName, 'NULL - using global default for all dates'), 'INFO', vParameters); + + vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName); + + -- Validate parallel degree parameter + IF pParallelDegree < 1 OR pParallelDegree > 16 THEN + vgMsgTmp := ENV_MANAGER.MSG_INVALID_PARALLEL_DEGREE || ': ' || pParallelDegree || '. Valid range: 1-16'; + ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_INVALID_PARALLEL_DEGREE, vgMsgTmp); + END IF; + + -- Get partitions using shared function + vPartitions := GET_PARTITIONS(vSchemaName, vTableName, vKeyColumnName, pMinDate, pMaxDate, vParameters); + + ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vPartitions.COUNT || ' partitions to export with parallel degree ' || pParallelDegree, 'INFO', vParameters); + + -- Sequential processing (parallel degree = 1) + IF pParallelDegree = 1 THEN + ENV_MANAGER.LOG_PROCESS_EVENT('Using sequential processing (pParallelDegree = 1)', 'DEBUG', vParameters); + + FOR i IN 1 .. vPartitions.COUNT LOOP + EXPORT_SINGLE_PARTITION( + pSchemaName => vSchemaName, + pTableName => vTableName, + pKeyColumnName => vKeyColumnName, + pYear => vPartitions(i).year, + pMonth => vPartitions(i).month, + pBucketUri => vBucketUri, + pFolderName => pFolderName, + pProcessedColumns => vProcessedColumnList, + pMinDate => pMinDate, + pMaxDate => pMaxDate, + pCredentialName => pCredentialName, + pFormat => 'PARQUET', + pFileBaseName => NULL, + pMaxFileSize => 104857600, + pParameters => vParameters + ); + END LOOP; + + -- Parallel processing (parallel degree > 1) + ELSE + -- Skip parallel processing if no partitions found + IF vPartitions.COUNT = 0 THEN + ENV_MANAGER.LOG_PROCESS_EVENT('No partitions to export - skipping parallel processing', 'INFO', vParameters); + ELSE + DECLARE + vTaskName VARCHAR2(128) := 'DATA_EXPORT_TASK_' || TO_CHAR(SYSTIMESTAMP, 'YYYYMMDDHH24MISSFF'); + vChunkId NUMBER; + BEGIN + ENV_MANAGER.LOG_PROCESS_EVENT('Using parallel processing with ' || pParallelDegree || ' threads', 'INFO', vParameters); + + -- Clean up old completed chunks (>24 hours) to prevent table bloat + -- CRITICAL: Do NOT delete chunks from other active sessions (same-day tasks) + -- This prevents race conditions when multiple exports run simultaneously + DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS + WHERE STATUS = 'COMPLETED' + AND CREATED_DATE < SYSTIMESTAMP - INTERVAL '1' DAY; + COMMIT; + + ENV_MANAGER.LOG_PROCESS_EVENT('Cleared old COMPLETED chunks (>24h). Active session chunks preserved.', 'DEBUG', vParameters); + -- This prevents re-exporting successfully completed partitions + DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'COMPLETED'; + COMMIT; + + ENV_MANAGER.LOG_PROCESS_EVENT('Cleared COMPLETED chunks. FAILED chunks retained for retry.', 'DEBUG', vParameters); + + -- Populate chunks table (insert new chunks, preserve FAILED chunks for retry) + FOR i IN 1 .. vPartitions.COUNT LOOP + MERGE INTO CT_MRDS.A_PARALLEL_EXPORT_CHUNKS t + USING (SELECT i AS chunk_id, vPartitions(i).year AS yr, vPartitions(i).month AS mn FROM DUAL) s + ON (t.CHUNK_ID = s.chunk_id) + WHEN NOT MATCHED THEN + INSERT (CHUNK_ID, TASK_NAME, YEAR_VALUE, MONTH_VALUE, SCHEMA_NAME, TABLE_NAME, KEY_COLUMN_NAME, + BUCKET_URI, FOLDER_NAME, PROCESSED_COLUMNS, MIN_DATE, MAX_DATE, + CREDENTIAL_NAME, FORMAT_TYPE, FILE_BASE_NAME, TEMPLATE_TABLE_NAME, MAX_FILE_SIZE, STATUS) + VALUES (i, vTaskName, vPartitions(i).year, vPartitions(i).month, vSchemaName, vTableName, vKeyColumnName, + vBucketUri, pFolderName, vProcessedColumnList, pMinDate, pMaxDate, + pCredentialName, 'PARQUET', NULL, pTemplateTableName, 104857600, 'PENDING') + WHEN MATCHED THEN + UPDATE SET TASK_NAME = vTaskName, + STATUS = CASE WHEN t.STATUS = 'FAILED' THEN 'PENDING' ELSE t.STATUS END, + ERROR_MESSAGE = CASE WHEN t.STATUS = 'FAILED' THEN NULL ELSE t.ERROR_MESSAGE END; + END LOOP; + COMMIT; + + -- Log chunk statistics + DECLARE + vPendingCount NUMBER; + vFailedCount NUMBER; + BEGIN + SELECT COUNT(*) INTO vPendingCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'PENDING'; + SELECT COUNT(*) INTO vFailedCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'FAILED'; + + ENV_MANAGER.LOG_PROCESS_EVENT('Chunk statistics: PENDING=' || vPendingCount || ', FAILED (retry)=' || vFailedCount, 'INFO', vParameters); + END; + + -- Create parallel task + DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => vTaskName); + + -- Define chunks by number range (1 to partition count) + DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_NUMBER_COL( + task_name => vTaskName, + table_owner => 'CT_MRDS', + table_name => 'A_PARALLEL_EXPORT_CHUNKS', + table_column => 'CHUNK_ID', + chunk_size => 1 -- Each partition is one chunk + ); + + -- Execute task in parallel + ENV_MANAGER.LOG_PROCESS_EVENT('Executing parallel task: ' || vTaskName, 'DEBUG', vParameters); + + DBMS_PARALLEL_EXECUTE.RUN_TASK( + task_name => vTaskName, + sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id); END;', + language_flag => DBMS_SQL.NATIVE, + parallel_level => pParallelDegree + ); + + -- Check for errors + DECLARE + vErrorCount NUMBER; + BEGIN + SELECT COUNT(*) INTO vErrorCount + FROM USER_PARALLEL_EXECUTE_CHUNKS + WHERE task_name = vTaskName AND status = 'PROCESSED_WITH_ERROR'; + + IF vErrorCount > 0 THEN + vgMsgTmp := 'Parallel execution completed with ' || vErrorCount || ' errors. Check USER_PARALLEL_EXECUTE_CHUNKS for details.'; + ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp); + END IF; + END; + + -- Clean up task + DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName); + + -- Clean up chunks for THIS specific task only (session-safe) + -- CRITICAL: Use TASK_NAME filter to avoid deleting chunks from other active sessions + DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE TASK_NAME = vTaskName; + COMMIT; + + ENV_MANAGER.LOG_PROCESS_EVENT('Parallel execution completed successfully', 'INFO', vParameters); + EXCEPTION + WHEN OTHERS THEN + -- Attempt to drop task on error + BEGIN + DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName); + EXCEPTION + WHEN OTHERS THEN NULL; -- Ignore drop errors + END; + + vgMsgTmp := ENV_MANAGER.MSG_PARALLEL_EXECUTION_FAILED || ': ' || SQLERRM || cgBL || DBMS_UTILITY.FORMAT_ERROR_BACKTRACE; + ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp); + END; + END IF; + END IF; + + ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters); + EXCEPTION + WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN + vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName; + ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp); + WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN + vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in pColumnList' ELSE '' END; + ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp); + WHEN ENV_MANAGER.ERR_INVALID_PARALLEL_DEGREE THEN + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_INVALID_PARALLEL_DEGREE, vgMsgTmp); + WHEN ENV_MANAGER.ERR_PARALLEL_EXECUTION_FAILED THEN + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp); + WHEN OTHERS THEN + -- Log complete error details including full stack trace and backtrace + ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER'); + ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE)); + + END EXPORT_TABLE_DATA_BY_DATE; + + ---------------------------------------------------------------------------------------------------- + + /** + * @name EXPORT_TABLE_DATA_TO_CSV_BY_DATE + * @desc Exports data to a single CSV file with date filtering. + * Unlike EXPORT_TABLE_DATA_BY_DATE, this procedure creates one CSV file + * instead of multiple Parquet files partitioned by year/month. + * Uses the same date filtering mechanism with CT_ODS.A_LOAD_HISTORY. + * Allows specifying custom column list or uses T.* if pColumnList is NULL. + * Validates that all columns in pColumnList exist in the target table. + * Automatically adds 'T.' prefix to column names in pColumnList. + * @example + * begin + * DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE( + * pSchemaName => 'CT_MRDS', + * pTableName => 'MY_TABLE', + * pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK', + * pBucketArea => 'DATA', + * pFolderName => 'exports', + * pFileName => 'my_export.csv', + * pColumnList => 'COLUMN1, COLUMN2, COLUMN3', -- Optional + * pMinDate => DATE '2024-01-01', + * pMaxDate => SYSDATE + * ); + * end; + **/ + PROCEDURE EXPORT_TABLE_DATA_TO_CSV_BY_DATE ( + pSchemaName IN VARCHAR2, + pTableName IN VARCHAR2, + pKeyColumnName IN VARCHAR2, + pBucketArea IN VARCHAR2, + pFolderName IN VARCHAR2, + pFileName IN VARCHAR2 DEFAULT NULL, + pColumnList IN VARCHAR2 default NULL, + pMinDate IN DATE default DATE '1900-01-01', + pMaxDate IN DATE default SYSDATE, + pParallelDegree IN NUMBER default 1, + pTemplateTableName IN VARCHAR2 default NULL, + pMaxFileSize IN NUMBER default 104857600, + pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName + ) + IS + vTableName VARCHAR2(128); + vSchemaName VARCHAR2(128); + vKeyColumnName VARCHAR2(128); + vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE; + vFileBaseName VARCHAR2(4000); + vFileExtension VARCHAR2(10); + vProcessedColumnList VARCHAR2(32767); + vBucketUri VARCHAR2(4000); + vCurrentCol VARCHAR2(128); + vPartitions partition_tab; + + BEGIN + vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||'''' + ,'pTableName => '''||nvl(pTableName, 'NULL')||'''' + ,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||'''' + ,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||'''' + ,'pFolderName => '''||nvl(pFolderName, 'NULL')||'''' + ,'pFileName => '''||nvl(pFileName, 'NULL')||'''' + ,'pColumnList => '''||nvl(pColumnList, 'NULL')||'''' + ,'pMinDate => '''||nvl(TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||'''' + ,'pMaxDate => '''||nvl(TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||'''' + ,'pParallelDegree => '''||nvl(TO_CHAR(pParallelDegree), 'NULL')||'''' + ,'pTemplateTableName => '''||nvl(pTemplateTableName, 'NULL')||'''' + ,'pMaxFileSize => '''||nvl(TO_CHAR(pMaxFileSize), 'NULL')||'''' + ,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||'''' + )); + ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters); + + -- Get bucket URI based on bucket area using FILE_MANAGER function + vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea); + + -- Convert table and column names to uppercase to match data dictionary + vTableName := UPPER(pTableName); + vSchemaName := UPPER(pSchemaName); + vKeyColumnName := UPPER(pKeyColumnName); + + -- Extract base filename and extension or construct default filename + IF pFileName IS NOT NULL THEN + -- Use provided filename + IF INSTR(pFileName, '.') > 0 THEN + vFileBaseName := SUBSTR(pFileName, 1, INSTR(pFileName, '.', -1) - 1); + vFileExtension := SUBSTR(pFileName, INSTR(pFileName, '.', -1)); + ELSE + vFileBaseName := pFileName; + vFileExtension := '.csv'; + END IF; + ELSE + -- Construct default filename: TABLENAME (without extension, will be added by worker) + vFileBaseName := UPPER(pTableName); + vFileExtension := '.csv'; + END IF; + + -- Validate table, key column, and column list using shared procedure + VALIDATE_TABLE_AND_COLUMNS(vSchemaName, vTableName, vKeyColumnName, pColumnList, vParameters); + + -- Build query with TO_CHAR for date columns (per-column format support) + vProcessedColumnList := buildQueryWithDateFormats(pColumnList, vTableName, vSchemaName, vKeyColumnName, pTemplateTableName); + + ENV_MANAGER.LOG_PROCESS_EVENT('Input column list: ' || NVL(pColumnList, 'NULL (using dynamic column list)'), 'DEBUG', vParameters); + ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list with TO_CHAR for date columns: ' || vProcessedColumnList, 'DEBUG', vParameters); + ENV_MANAGER.LOG_PROCESS_EVENT('Template table: ' || NVL(pTemplateTableName, 'NULL - using global default for all dates'), 'INFO', vParameters); + + vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName); + + -- Validate parallel degree parameter + IF pParallelDegree < 1 OR pParallelDegree > 16 THEN + vgMsgTmp := ENV_MANAGER.MSG_INVALID_PARALLEL_DEGREE || ': ' || pParallelDegree || '. Valid range: 1-16'; + ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_INVALID_PARALLEL_DEGREE, vgMsgTmp); + END IF; + + -- Get partitions using shared function + vPartitions := GET_PARTITIONS(vSchemaName, vTableName, vKeyColumnName, pMinDate, pMaxDate, vParameters); + + ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vPartitions.COUNT || ' year/month combinations to export', 'INFO', vParameters); + ENV_MANAGER.LOG_PROCESS_EVENT('Date range: ' || TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS') || ' to ' || TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'DEBUG', vParameters); + ENV_MANAGER.LOG_PROCESS_EVENT('Parallel degree: ' || pParallelDegree, 'INFO', vParameters); + + -- Sequential processing (parallel degree = 1) + IF pParallelDegree = 1 THEN + ENV_MANAGER.LOG_PROCESS_EVENT('Using sequential processing (pParallelDegree = 1)', 'DEBUG', vParameters); + + FOR i IN 1 .. vPartitions.COUNT LOOP + EXPORT_SINGLE_PARTITION( + pSchemaName => vSchemaName, + pTableName => vTableName, + pKeyColumnName => vKeyColumnName, + pYear => vPartitions(i).year, + pMonth => vPartitions(i).month, + pBucketUri => vBucketUri, + pFolderName => pFolderName, + pProcessedColumns => vProcessedColumnList, + pMinDate => pMinDate, + pMaxDate => pMaxDate, + pCredentialName => pCredentialName, + pFormat => 'CSV', + pFileBaseName => vFileBaseName, + pMaxFileSize => pMaxFileSize, + pParameters => vParameters + ); + END LOOP; + + -- Parallel processing (parallel degree > 1) + ELSE + -- Skip parallel processing if no partitions found + IF vPartitions.COUNT = 0 THEN + ENV_MANAGER.LOG_PROCESS_EVENT('No partitions to export - skipping parallel CSV processing', 'INFO', vParameters); + ELSE + DECLARE + vTaskName VARCHAR2(128) := 'DATA_CSV_EXPORT_TASK_' || TO_CHAR(SYSTIMESTAMP, 'YYYYMMDDHH24MISSFF'); + vChunkId NUMBER; + BEGIN + ENV_MANAGER.LOG_PROCESS_EVENT('Using parallel processing with ' || pParallelDegree || ' threads', 'INFO', vParameters); + + -- Clean up old completed chunks (>24 hours) to prevent table bloat + -- CRITICAL: Do NOT delete chunks from other active sessions (same-day tasks) + -- This prevents race conditions when multiple CSV exports run simultaneously + DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS + WHERE STATUS = 'COMPLETED' + AND CREATED_DATE < SYSTIMESTAMP - INTERVAL '1' DAY; + COMMIT; + + ENV_MANAGER.LOG_PROCESS_EVENT('Cleared old COMPLETED chunks (>24h). Active session chunks preserved.', 'DEBUG', vParameters); + + -- Populate chunks table (insert new chunks, preserve FAILED chunks for retry) + FOR i IN 1 .. vPartitions.COUNT LOOP + MERGE INTO CT_MRDS.A_PARALLEL_EXPORT_CHUNKS t + USING (SELECT i AS chunk_id, vPartitions(i).year AS yr, vPartitions(i).month AS mn FROM DUAL) s + ON (t.CHUNK_ID = s.chunk_id) + WHEN NOT MATCHED THEN + INSERT (CHUNK_ID, TASK_NAME, YEAR_VALUE, MONTH_VALUE, SCHEMA_NAME, TABLE_NAME, KEY_COLUMN_NAME, + BUCKET_URI, FOLDER_NAME, PROCESSED_COLUMNS, MIN_DATE, MAX_DATE, + CREDENTIAL_NAME, FORMAT_TYPE, FILE_BASE_NAME, TEMPLATE_TABLE_NAME, MAX_FILE_SIZE, STATUS) + VALUES (i, vTaskName, vPartitions(i).year, vPartitions(i).month, vSchemaName, vTableName, vKeyColumnName, + vBucketUri, pFolderName, vProcessedColumnList, pMinDate, pMaxDate, + pCredentialName, 'CSV', vFileBaseName, pTemplateTableName, pMaxFileSize, 'PENDING') + WHEN MATCHED THEN + UPDATE SET TASK_NAME = vTaskName, + STATUS = CASE WHEN t.STATUS = 'FAILED' THEN 'PENDING' ELSE t.STATUS END, + ERROR_MESSAGE = CASE WHEN t.STATUS = 'FAILED' THEN NULL ELSE t.ERROR_MESSAGE END; + END LOOP; + COMMIT; + + -- Log chunk statistics + DECLARE + vPendingCount NUMBER; + vFailedCount NUMBER; + BEGIN + SELECT COUNT(*) INTO vPendingCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'PENDING'; + SELECT COUNT(*) INTO vFailedCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'FAILED'; + + ENV_MANAGER.LOG_PROCESS_EVENT('Chunk statistics: PENDING=' || vPendingCount || ', FAILED (retry)=' || vFailedCount, 'INFO', vParameters); + END; + + -- Create parallel task + DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => vTaskName); + + -- Define chunks by number range (1 to partition count) + DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_NUMBER_COL( + task_name => vTaskName, + table_owner => 'CT_MRDS', + table_name => 'A_PARALLEL_EXPORT_CHUNKS', + table_column => 'CHUNK_ID', + chunk_size => 1 -- Each partition is one chunk + ); + + -- Execute task in parallel + ENV_MANAGER.LOG_PROCESS_EVENT('Executing parallel CSV export task: ' || vTaskName, 'DEBUG', vParameters); + + DBMS_PARALLEL_EXECUTE.RUN_TASK( + task_name => vTaskName, + sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id); END;', + language_flag => DBMS_SQL.NATIVE, + parallel_level => pParallelDegree + ); + + -- Check for errors + DECLARE + vErrorCount NUMBER; + BEGIN + SELECT COUNT(*) INTO vErrorCount + FROM USER_PARALLEL_EXECUTE_CHUNKS + WHERE task_name = vTaskName AND status = 'PROCESSED_WITH_ERROR'; + + IF vErrorCount > 0 THEN + vgMsgTmp := 'Parallel CSV export completed with ' || vErrorCount || ' errors. Check USER_PARALLEL_EXECUTE_CHUNKS for details.'; + ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp); + END IF; + END; + + -- Clean up task + DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName); + + -- Clean up chunks for THIS specific task only (session-safe) + -- CRITICAL: Use TASK_NAME filter to avoid deleting chunks from other active CSV sessions + DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE TASK_NAME = vTaskName; + COMMIT; + + ENV_MANAGER.LOG_PROCESS_EVENT('Parallel CSV execution completed successfully', 'INFO', vParameters); + EXCEPTION + WHEN OTHERS THEN + -- Attempt to drop task on error + BEGIN + DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName); + EXCEPTION + WHEN OTHERS THEN NULL; -- Ignore drop errors + END; + + vgMsgTmp := ENV_MANAGER.MSG_PARALLEL_EXECUTION_FAILED || ': ' || SQLERRM || cgBL || DBMS_UTILITY.FORMAT_ERROR_BACKTRACE; + ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp); + END; + END IF; + END IF; + + ENV_MANAGER.LOG_PROCESS_EVENT('Export completed successfully for ' || vPartitions.COUNT || ' files', 'INFO', vParameters); + ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters); + + EXCEPTION + WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN + vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName; + ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp); + WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN + vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in pColumnList' ELSE '' END; + ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp); + WHEN ENV_MANAGER.ERR_INVALID_PARALLEL_DEGREE THEN + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_INVALID_PARALLEL_DEGREE, vgMsgTmp); + WHEN ENV_MANAGER.ERR_PARALLEL_EXECUTION_FAILED THEN + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp); + WHEN OTHERS THEN + -- Log complete error details including full stack trace and backtrace + ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER'); + ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); + RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE)); + + END EXPORT_TABLE_DATA_TO_CSV_BY_DATE; + + ---------------------------------------------------------------------------------------------------- + -- VERSION MANAGEMENT FUNCTIONS + ---------------------------------------------------------------------------------------------------- + + FUNCTION GET_VERSION RETURN VARCHAR2 IS + BEGIN + RETURN PACKAGE_VERSION; + END GET_VERSION; + + ---------------------------------------------------------------------------------------------------- + + FUNCTION GET_BUILD_INFO RETURN VARCHAR2 IS + BEGIN + RETURN ENV_MANAGER.GET_PACKAGE_VERSION_INFO( + pPackageName => 'DATA_EXPORTER', + pVersion => PACKAGE_VERSION, + pBuildDate => PACKAGE_BUILD_DATE, + pAuthor => PACKAGE_AUTHOR + ); + END GET_BUILD_INFO; + + ---------------------------------------------------------------------------------------------------- + + FUNCTION GET_VERSION_HISTORY RETURN VARCHAR2 IS + BEGIN + RETURN ENV_MANAGER.FORMAT_VERSION_HISTORY( + pPackageName => 'DATA_EXPORTER', + pVersionHistory => VERSION_HISTORY + ); + END GET_VERSION_HISTORY; + + ---------------------------------------------------------------------------------------------------- + +END; + +/ diff --git a/MARS_Packages/REL01_ADDITIONS/MARS-835-PREHOOK/rollback_version/v.2.6.3/DATA_EXPORTER.pkg b/MARS_Packages/REL01_ADDITIONS/MARS-835-PREHOOK/rollback_version/v.2.6.3/DATA_EXPORTER.pkg new file mode 100644 index 0000000..1209d84 --- /dev/null +++ b/MARS_Packages/REL01_ADDITIONS/MARS-835-PREHOOK/rollback_version/v.2.6.3/DATA_EXPORTER.pkg @@ -0,0 +1,218 @@ +create or replace PACKAGE CT_MRDS.DATA_EXPORTER +AUTHID CURRENT_USER +AS + /** + * Data Export Package: Provides comprehensive data export capabilities to various formats (CSV, Parquet) + * with support for cloud storage integration via Oracle Cloud Infrastructure (OCI). + * The structure of comment is used by GET_PACKAGE_DOCUMENTATION function + * which returns documentation text for confluence page (to Copy-Paste it). + **/ + + -- Package Version Information + PACKAGE_VERSION CONSTANT VARCHAR2(10) := '2.6.3'; + PACKAGE_BUILD_DATE CONSTANT VARCHAR2(19) := '2026-01-28 19:30:00'; + PACKAGE_AUTHOR CONSTANT VARCHAR2(50) := 'MRDS Development Team'; + + -- Version History (last 3-5 changes) + VERSION_HISTORY CONSTANT VARCHAR2(4000) := + 'v2.6.3 (2026-01-28): COMPILATION FIX - Resolved ORA-00904 error in EXPORT_PARTITION_PARALLEL. SQLERRM and DBMS_UTILITY.FORMAT_ERROR_BACKTRACE cannot be used directly in SQL UPDATE statements. Now properly assigned to vgMsgTmp variable before UPDATE.' || CHR(10) || + 'v2.6.2 (2026-01-28): CRITICAL FIX - Race condition when multiple exports run simultaneously. Changed DELETE to filter by age (>24h) instead of deleting all COMPLETED chunks. Prevents concurrent sessions from deleting each other chunks. Session-safe cleanup with TASK_NAME filtering. Enables true parallel execution of multiple export jobs.' || CHR(10) || + 'v2.6.1 (2026-01-28): Added DELETE_FAILED_EXPORT_FILE procedure to clean up partial/corrupted files before retry. When partition fails mid-export, partial file is deleted before retry to prevent Oracle from creating _1 suffixed duplicates. Ensures clean retry without orphaned files in OCI bucket.' || CHR(10) || + 'v2.6.0 (2026-01-28): CRITICAL FIX - Added STATUS tracking to A_PARALLEL_EXPORT_CHUNKS table to prevent data duplication on retry. System now restarts ONLY failed partitions instead of re-exporting all data. Added ERROR_MESSAGE and EXPORT_TIMESTAMP columns for better error handling and monitoring. Prevents duplicate file creation when parallel tasks fail (e.g., 22 partitions with 16 threads, 3 failures no longer duplicates 19 successful exports).' || CHR(10) || + 'v2.5.0 (2026-01-26): Added recorddelimiter parameter with CRLF (CHR(13)||CHR(10)) for CSV exports to ensure Windows-compatible line endings. Improves cross-platform compatibility when CSV files are opened in Windows applications (Notepad, Excel).' || CHR(10) || + 'v2.4.0 (2026-01-11): Added pTemplateTableName parameter for per-column date format configuration. Implements dynamic query building with TO_CHAR for each date/timestamp column using FILE_MANAGER.GET_DATE_FORMAT. Supports 3-tier hierarchy: column-specific, template DEFAULT, global fallback. Eliminates single dateformat limitation of DBMS_CLOUD.EXPORT_DATA.' || CHR(10) || + 'v2.3.0 (2025-12-20): Added parallel partition processing using DBMS_PARALLEL_EXECUTE. New pParallelDegree parameter (1-16, default 1) for EXPORT_TABLE_DATA_BY_DATE and EXPORT_TABLE_DATA_TO_CSV_BY_DATE procedures. Each year/month partition processed in separate thread for improved performance.' || CHR(10) || + 'v2.2.0 (2025-12-19): DRY refactoring - extracted shared helper functions (sanitizeFilename, VALIDATE_TABLE_AND_COLUMNS, GET_PARTITIONS, EXPORT_SINGLE_PARTITION worker procedure). Reduced code duplication by ~400 lines. Prepared architecture for v2.3.0 parallel processing.' || CHR(10) || + 'v2.1.1 (2025-12-04): Fixed JOIN column reference A_WORKFLOW_HISTORY_KEY -> A_ETL_LOAD_SET_KEY, added consistent column mapping and dynamic column list to EXPORT_TABLE_DATA procedure, enhanced DEBUG logging for all export operations' || CHR(10) || + 'v2.1.0 (2025-10-22): Added version tracking and PARTITION_YEAR/PARTITION_MONTH support' || CHR(10) || + 'v2.0.0 (2025-10-01): Separated export functionality from FILE_MANAGER package' || CHR(10); + + cgBL CONSTANT VARCHAR2(2) := CHR(13)||CHR(10); + vgMsgTmp VARCHAR2(32000); + + --------------------------------------------------------------------------------------------------------------------------- + -- TYPE DEFINITIONS FOR PARTITION HANDLING + --------------------------------------------------------------------------------------------------------------------------- + + /** + * Record type for year/month partition information + **/ + TYPE partition_rec IS RECORD ( + year VARCHAR2(4), + month VARCHAR2(2) + ); + + /** + * Table type for collection of partition records + **/ + TYPE partition_tab IS TABLE OF partition_rec; + + --------------------------------------------------------------------------------------------------------------------------- + -- INTERNAL PARALLEL PROCESSING CALLBACK + --------------------------------------------------------------------------------------------------------------------------- + + /** + * @name EXPORT_PARTITION_PARALLEL + * @desc Internal callback procedure for DBMS_PARALLEL_EXECUTE. + * Processes single partition (year/month) chunk in parallel task. + * Called by DBMS_PARALLEL_EXECUTE framework for each chunk. + * This procedure is PUBLIC because DBMS_PARALLEL_EXECUTE requires it, + * but should NOT be called directly by external code. + * @param pStartId - Chunk start ID (CHUNK_ID from A_PARALLEL_EXPORT_CHUNKS table) + * @param pEndId - Chunk end ID (same as pStartId for single-row chunks) + **/ + PROCEDURE EXPORT_PARTITION_PARALLEL ( + pStartId IN NUMBER, + pEndId IN NUMBER + ); + + --------------------------------------------------------------------------------------------------------------------------- + -- MAIN EXPORT PROCEDURES + --------------------------------------------------------------------------------------------------------------------------- + + /** + * @name EXPORT_TABLE_DATA + * @desc Wrapper procedure for DBMS_CLOUD.EXPORT_DATA. + * Exports data into CSV file on OCI infrustructure. + * pBucketArea parameter accepts: 'INBOX', 'ODS', 'DATA', 'ARCHIVE' + * @example + * begin + * DATA_EXPORTER.EXPORT_TABLE_DATA( + * pSchemaName => 'CT_MRDS', + * pTableName => 'MY_TABLE', + * pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK', + * pBucketArea => 'DATA', + * pFolderName => 'csv_exports' + * ); + * end; + **/ + PROCEDURE EXPORT_TABLE_DATA ( + pSchemaName IN VARCHAR2, + pTableName IN VARCHAR2, + pKeyColumnName IN VARCHAR2, + pBucketArea IN VARCHAR2, + pFolderName IN VARCHAR2, + pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName + ); + + + + /** + * @name EXPORT_TABLE_DATA_BY_DATE + * @desc Wrapper procedure for DBMS_CLOUD.EXPORT_DATA. + * Exports data into PARQUET files on OCI infrustructure. + * Each YEAR_MONTH pair goes to seperate file (implicit partitioning). + * Allows specifying custom column list or uses T.* if pColumnList is NULL. + * Validates that all columns in pColumnList exist in the target table. + * Automatically adds 'T.' prefix to column names in pColumnList. + * Supports parallel partition processing via pParallelDegree parameter (default 1, range 1-16). + * pBucketArea parameter accepts: 'INBOX', 'ODS', 'DATA', 'ARCHIVE' + * @example + * begin + * DATA_EXPORTER.EXPORT_TABLE_DATA_BY_DATE( + * pSchemaName => 'CT_MRDS', + * pTableName => 'MY_TABLE', + * pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK', + * pBucketArea => 'DATA', + * pFolderName => 'parquet_exports', + * pColumnList => 'COLUMN1, COLUMN2, COLUMN3', -- Optional + * pMinDate => DATE '2024-01-01', + * pMaxDate => SYSDATE, + * pParallelDegree => 8 -- Optional, default 1, range 1-16 + * ); + * end; + **/ + PROCEDURE EXPORT_TABLE_DATA_BY_DATE ( + pSchemaName IN VARCHAR2, + pTableName IN VARCHAR2, + pKeyColumnName IN VARCHAR2, + pBucketArea IN VARCHAR2, + pFolderName IN VARCHAR2, + pColumnList IN VARCHAR2 default NULL, + pMinDate IN DATE default DATE '1900-01-01', + pMaxDate IN DATE default SYSDATE, + pParallelDegree IN NUMBER default 1, + pTemplateTableName IN VARCHAR2 default NULL, + pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName + ); + + + + /** + * @name EXPORT_TABLE_DATA_TO_CSV_BY_DATE + * @desc Exports data to separate CSV files partitioned by year and month. + * Creates one CSV file for each year/month combination found in the data. + * Uses the same date filtering mechanism with CT_ODS.A_LOAD_HISTORY as EXPORT_TABLE_DATA_BY_DATE, + * but exports to CSV format instead of Parquet. + * Supports parallel partition processing via pParallelDegree parameter (1-16). + * File naming pattern: {pFileName}_YYYYMM.csv or {TABLENAME}_YYYYMM.csv (if pFileName is NULL) + * @example + * begin + * -- With custom filename + * DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE( + * pSchemaName => 'CT_MRDS', + * pTableName => 'MY_TABLE', + * pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK', + * pBucketArea => 'DATA', + * pFolderName => 'exports', + * pFileName => 'my_export.csv', + * pMinDate => DATE '2024-01-01', + * pMaxDate => SYSDATE, + * pParallelDegree => 8 -- Optional, default 1, range 1-16 + * ); + * + * -- With auto-generated filename (based on table name only) + * DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE( + * pSchemaName => 'OU_TOP', + * pTableName => 'AGGREGATED_ALLOTMENT', + * pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK', + * pBucketArea => 'ARCHIVE', + * pFolderName => 'exports', + * pMinDate => DATE '2025-09-01', + * pMaxDate => DATE '2025-09-17' + * ); + * -- This will create files like: AGGREGATED_ALLOTMENT_202509.csv, etc. + * pBucketArea parameter accepts: 'INBOX', 'ODS', 'DATA', 'ARCHIVE' + * end; + **/ + PROCEDURE EXPORT_TABLE_DATA_TO_CSV_BY_DATE ( + pSchemaName IN VARCHAR2, + pTableName IN VARCHAR2, + pKeyColumnName IN VARCHAR2, + pBucketArea IN VARCHAR2, + pFolderName IN VARCHAR2, + pFileName IN VARCHAR2 DEFAULT NULL, + pColumnList IN VARCHAR2 default NULL, + pMinDate IN DATE default DATE '1900-01-01', + pMaxDate IN DATE default SYSDATE, + pParallelDegree IN NUMBER default 1, + pTemplateTableName IN VARCHAR2 default NULL, + pMaxFileSize IN NUMBER default 104857600, + pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName + ); + + --------------------------------------------------------------------------------------------------------------------------- + -- VERSION MANAGEMENT FUNCTIONS + --------------------------------------------------------------------------------------------------------------------------- + + /** + * Returns the current package version number + * return: Version string in format X.Y.Z (e.g., '2.1.0') + **/ + FUNCTION GET_VERSION RETURN VARCHAR2; + + /** + * Returns comprehensive build information including version, date, and author + * return: Formatted string with complete build details + **/ + FUNCTION GET_BUILD_INFO RETURN VARCHAR2; + + /** + * Returns the version history with recent changes + * return: Multi-line string with version history + **/ + FUNCTION GET_VERSION_HISTORY RETURN VARCHAR2; + +END; + +/ diff --git a/MARS_Packages/REL01_ADDITIONS/MARS-835/01_MARS_835_install_step1.sql b/MARS_Packages/REL01_ADDITIONS/MARS-835/01_MARS_835_install_step1.sql index 0a16f07..2db2fd4 100644 --- a/MARS_Packages/REL01_ADDITIONS/MARS-835/01_MARS_835_install_step1.sql +++ b/MARS_Packages/REL01_ADDITIONS/MARS-835/01_MARS_835_install_step1.sql @@ -106,7 +106,8 @@ BEGIN pMaxDate => SYSDATE, pParallelDegree => 16, pTemplateTableName => 'CT_ET_TEMPLATES.CSDB_DEBT', - pMaxFileSize => 104857600 -- 100MB in bytes (safe for parallel execution, avoids ORA-04036) + pMaxFileSize => 104857600, -- 100MB in bytes (safe for parallel execution, avoids ORA-04036) + pRegisterExport => TRUE -- Register exports in A_WORKFLOW_HISTORY and A_SOURCE_FILE_RECEIVED ); DBMS_OUTPUT.PUT_LINE('SUCCESS: LEGACY_DEBT exported to DATA bucket with template column order'); @@ -225,7 +226,8 @@ BEGIN pMaxDate => SYSDATE, pParallelDegree => 16, pTemplateTableName => 'CT_ET_TEMPLATES.CSDB_DEBT_DAILY', - pMaxFileSize => 104857600 -- 100MB in bytes (safe for parallel execution, avoids ORA-04036) + pMaxFileSize => 104857600, -- 100MB in bytes (safe for parallel execution, avoids ORA-04036) + pRegisterExport => TRUE -- Register exports in A_WORKFLOW_HISTORY and A_SOURCE_FILE_RECEIVED ); DBMS_OUTPUT.PUT_LINE('SUCCESS: LEGACY_DEBT_DAILY exported to DATA bucket with template column order');