This commit is contained in:
Grzegorz Michalski
2026-02-02 11:13:24 +01:00
parent ecd833f682
commit ffcb288afa
62 changed files with 8463 additions and 2845 deletions

View File

@@ -43,6 +43,9 @@ CREATE TABLE CT_MRDS.A_PARALLEL_EXPORT_CHUNKS (
FILE_BASE_NAME VARCHAR2(1000),
TEMPLATE_TABLE_NAME VARCHAR2(200),
MAX_FILE_SIZE NUMBER DEFAULT 104857600 NOT NULL,
STATUS VARCHAR2(30) DEFAULT 'PENDING' NOT NULL,
ERROR_MESSAGE VARCHAR2(4000),
EXPORT_TIMESTAMP TIMESTAMP,
CREATED_DATE TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL
);
@@ -66,4 +69,7 @@ COMMENT ON COLUMN CT_MRDS.A_PARALLEL_EXPORT_CHUNKS.FORMAT_TYPE IS 'Export format
-- Column-level documentation for CT_MRDS.A_PARALLEL_EXPORT_CHUNKS.
-- STATUS / ERROR_MESSAGE / EXPORT_TIMESTAMP track per-chunk progress so that
-- only FAILED partitions are retried instead of re-exporting everything.
COMMENT ON COLUMN CT_MRDS.A_PARALLEL_EXPORT_CHUNKS.FILE_BASE_NAME IS 'Base filename for CSV exports (NULL for Parquet)';
COMMENT ON COLUMN CT_MRDS.A_PARALLEL_EXPORT_CHUNKS.TEMPLATE_TABLE_NAME IS 'Template table name for per-column date format configuration (e.g., CT_ET_TEMPLATES.TABLE_NAME)';
COMMENT ON COLUMN CT_MRDS.A_PARALLEL_EXPORT_CHUNKS.MAX_FILE_SIZE IS 'Maximum file size in bytes for CSV exports only (e.g., 104857600 = 100MB, 1073741824 = 1GB) - default 100MB (104857600). NOTE: Not applicable for PARQUET format (Oracle limitation)';
COMMENT ON COLUMN CT_MRDS.A_PARALLEL_EXPORT_CHUNKS.STATUS IS 'Chunk processing status: PENDING (not started), PROCESSING (in progress), COMPLETED (success), FAILED (error) - allows retry of failed partitions only';
COMMENT ON COLUMN CT_MRDS.A_PARALLEL_EXPORT_CHUNKS.ERROR_MESSAGE IS 'Error message if chunk processing failed (STATUS = FAILED)';
COMMENT ON COLUMN CT_MRDS.A_PARALLEL_EXPORT_CHUNKS.EXPORT_TIMESTAMP IS 'Timestamp when chunk export was completed (STATUS = COMPLETED)';
COMMENT ON COLUMN CT_MRDS.A_PARALLEL_EXPORT_CHUNKS.CREATED_DATE IS 'Timestamp when chunk was created';

View File

@@ -17,6 +17,39 @@ AS
----------------------------------------------------------------------------------------------------
/**
 * Deletes an export file from the OCI bucket if it exists (used for cleanup before retry).
 * Prevents Oracle from creating "_1"-suffixed duplicate files when a failed export is re-run.
 *
 * @param pFileUri        Full object URI of the file to delete
 * @param pCredentialName DBMS_CLOUD credential name used for the delete call
 * @param pParameters     Context string passed through to ENV_MANAGER.LOG_PROCESS_EVENT
 *
 * Never propagates an error: a missing object (ORA-20404) is expected and logged at
 * DEBUG level; any other delete failure is logged as WARNING and the caller's export
 * proceeds anyway (best-effort cleanup).
 **/
PROCEDURE DELETE_FAILED_EXPORT_FILE(
    pFileUri        IN VARCHAR2,
    pCredentialName IN VARCHAR2,
    pParameters     IN VARCHAR2
) IS
BEGIN
    -- Redundant inner BEGIN...END removed: the exception section is attached
    -- directly to the procedure block, which has identical semantics here.
    ENV_MANAGER.LOG_PROCESS_EVENT('Attempting to delete potentially corrupted file: ' || pFileUri, 'DEBUG', pParameters);
    DBMS_CLOUD.DELETE_OBJECT(
        credential_name => pCredentialName,
        object_uri      => pFileUri
    );
    ENV_MANAGER.LOG_PROCESS_EVENT('Deleted existing file (cleanup before retry): ' || pFileUri, 'INFO', pParameters);
EXCEPTION
    WHEN OTHERS THEN
        -- ORA-20404 = object not found: the file simply does not exist, which is fine
        IF SQLCODE = -20404 THEN
            ENV_MANAGER.LOG_PROCESS_EVENT('File does not exist (OK): ' || pFileUri, 'DEBUG', pParameters);
        ELSE
            -- Log but don't fail - export will attempt anyway
            ENV_MANAGER.LOG_PROCESS_EVENT('Warning: Could not delete file (will retry export anyway): ' || SQLERRM, 'WARNING', pParameters);
        END IF;
END DELETE_FAILED_EXPORT_FILE;
----------------------------------------------------------------------------------------------------
/**
* Builds query with TO_CHAR for date/timestamp columns using per-column formats
* Retrieves format for each date column from FILE_MANAGER.GET_DATE_FORMAT
@@ -394,6 +427,10 @@ AS
ENV_MANAGER.LOG_PROCESS_EVENT('Parquet export URI: ' || vUri, 'DEBUG', pParameters);
-- Delete potentially corrupted file from previous failed attempt
-- This prevents Oracle from creating _1 suffixed files on retry
DELETE_FAILED_EXPORT_FILE(vUri, pCredentialName, pParameters);
DBMS_CLOUD.EXPORT_DATA(
credential_name => pCredentialName,
file_uri_list => vUri,
@@ -409,6 +446,10 @@ AS
ENV_MANAGER.LOG_PROCESS_EVENT('CSV export URI: ' || vUri, 'DEBUG', pParameters);
-- Delete potentially corrupted file from previous failed attempt
-- This prevents Oracle from creating _1 suffixed files on retry
DELETE_FAILED_EXPORT_FILE(vUri, pCredentialName, pParameters);
-- Use json_object() for CSV export with maxfilesize in bytes (Oracle requirement)
-- Oracle maxfilesize: min 10MB (10485760), max 1GB (1073741824), default 10MB
-- NOTE: maxfilesize must be NUMBER (bytes), not string like '1000M'
@@ -499,6 +540,13 @@ AS
vParameters := 'Parallel task - Year: ' || vYear || ', Month: ' || vMonth || ', ChunkID: ' || pStartId;
ENV_MANAGER.LOG_PROCESS_EVENT('Starting parallel export for partition ' || vYear || '/' || vMonth, 'DEBUG', vParameters);
-- Mark chunk as PROCESSING
UPDATE CT_MRDS.A_PARALLEL_EXPORT_CHUNKS
SET STATUS = 'PROCESSING',
ERROR_MESSAGE = NULL
WHERE CHUNK_ID = pStartId;
COMMIT;
-- Call the worker procedure
EXPORT_SINGLE_PARTITION(
pSchemaName => vSchemaName,
@@ -518,11 +566,29 @@ AS
pParameters => vParameters
);
-- Mark chunk as COMPLETED
UPDATE CT_MRDS.A_PARALLEL_EXPORT_CHUNKS
SET STATUS = 'COMPLETED',
EXPORT_TIMESTAMP = SYSTIMESTAMP,
ERROR_MESSAGE = NULL
WHERE CHUNK_ID = pStartId;
COMMIT;
ENV_MANAGER.LOG_PROCESS_EVENT('Completed parallel export for partition ' || vYear || '/' || vMonth, 'DEBUG', vParameters);
EXCEPTION
WHEN OTHERS THEN
-- Capture error details in variable (SQLERRM cannot be used directly in SQL)
vgMsgTmp := 'Parallel task error for partition ' || vYear || '/' || vMonth || ' (ChunkID: ' || pStartId || '): ' || SQLERRM || cgBL || DBMS_UTILITY.FORMAT_ERROR_BACKTRACE;
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
-- Mark chunk as FAILED with error message
-- Use vgMsgTmp variable instead of SQLERRM directly (Oracle limitation in SQL context)
UPDATE CT_MRDS.A_PARALLEL_EXPORT_CHUNKS
SET STATUS = 'FAILED',
ERROR_MESSAGE = SUBSTR(vgMsgTmp, 1, 4000)
WHERE CHUNK_ID = pStartId;
COMMIT;
RAISE;
END EXPORT_PARTITION_PARALLEL;
@@ -798,23 +864,50 @@ AS
BEGIN
ENV_MANAGER.LOG_PROCESS_EVENT('Using parallel processing with ' || pParallelDegree || ' threads', 'INFO', vParameters);
-- Clear any existing chunks from previous runs (TRUNCATE to avoid PK violations)
EXECUTE IMMEDIATE 'TRUNCATE TABLE CT_MRDS.A_PARALLEL_EXPORT_CHUNKS';
-- Clean up old completed chunks (>24 hours) to prevent table bloat
-- CRITICAL: Do NOT delete chunks from other active sessions (same-day tasks)
-- This prevents race conditions when multiple exports run simultaneously
DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS
WHERE STATUS = 'COMPLETED'
AND CREATED_DATE < SYSTIMESTAMP - INTERVAL '1' DAY;
COMMIT;
-- Populate chunks table
ENV_MANAGER.LOG_PROCESS_EVENT('Cleared old COMPLETED chunks (>24h). Active session chunks preserved.', 'DEBUG', vParameters);
-- This prevents re-exporting successfully completed partitions
DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'COMPLETED';
COMMIT;
ENV_MANAGER.LOG_PROCESS_EVENT('Cleared COMPLETED chunks. FAILED chunks retained for retry.', 'DEBUG', vParameters);
-- Populate chunks table (insert new chunks, preserve FAILED chunks for retry).
-- FAILED chunks are reset to PENDING so only failed partitions are re-exported;
-- COMPLETED chunks keep their status and are skipped.
FOR i IN 1 .. vPartitions.COUNT LOOP
    MERGE INTO CT_MRDS.A_PARALLEL_EXPORT_CHUNKS t
    USING (SELECT i AS chunk_id, vPartitions(i).year AS yr, vPartitions(i).month AS mn FROM DUAL) s
    ON (t.CHUNK_ID = s.chunk_id)
    WHEN NOT MATCHED THEN
        INSERT (CHUNK_ID, TASK_NAME, YEAR_VALUE, MONTH_VALUE, SCHEMA_NAME, TABLE_NAME, KEY_COLUMN_NAME,
                BUCKET_URI, FOLDER_NAME, PROCESSED_COLUMNS, MIN_DATE, MAX_DATE,
                CREDENTIAL_NAME, FORMAT_TYPE, FILE_BASE_NAME, TEMPLATE_TABLE_NAME, MAX_FILE_SIZE, STATUS)
        VALUES (s.chunk_id, vTaskName, s.yr, s.mn, vSchemaName, vTableName, vKeyColumnName,
                vBucketUri, pFolderName, vProcessedColumnList, pMinDate, pMaxDate,
                pCredentialName, 'PARQUET', NULL, pTemplateTableName, 104857600, 'PENDING')
    WHEN MATCHED THEN
        -- BUGFIX: also refresh YEAR_VALUE/MONTH_VALUE from the current partition list.
        -- Previously s.yr/s.mn were selected but never used, so a retained FAILED chunk
        -- kept a stale year/month and a retry could export the wrong partition when the
        -- partition list changed between runs.
        -- NOTE(review): a retained COMPLETED chunk whose partition mapping changed is
        -- still skipped - confirm partition lists are stable across retry runs.
        UPDATE SET t.TASK_NAME     = vTaskName,
                   t.YEAR_VALUE    = s.yr,
                   t.MONTH_VALUE   = s.mn,
                   t.STATUS        = CASE WHEN t.STATUS = 'FAILED' THEN 'PENDING' ELSE t.STATUS END,
                   t.ERROR_MESSAGE = CASE WHEN t.STATUS = 'FAILED' THEN NULL ELSE t.ERROR_MESSAGE END;
END LOOP;
COMMIT;
ENV_MANAGER.LOG_PROCESS_EVENT('Populated ' || vPartitions.COUNT || ' chunks for parallel export', 'DEBUG', vParameters);
-- Log chunk statistics
DECLARE
    -- Counters for the pre-run chunk report written to the process log
    vPendingCount NUMBER;
    vFailedCount  NUMBER;
BEGIN
    -- One table scan with conditional aggregation instead of two separate
    -- COUNT queries; yields the same figures from a single consistent snapshot.
    -- NOTE(review): counts are table-wide, not filtered by TASK_NAME - with
    -- concurrent export sessions these figures include other sessions' chunks;
    -- confirm this is intended.
    SELECT COUNT(CASE WHEN STATUS = 'PENDING' THEN 1 END),
           COUNT(CASE WHEN STATUS = 'FAILED' THEN 1 END)
      INTO vPendingCount, vFailedCount
      FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS;
    ENV_MANAGER.LOG_PROCESS_EVENT('Chunk statistics: PENDING=' || vPendingCount || ', FAILED (retry)=' || vFailedCount, 'INFO', vParameters);
END;
-- Create parallel task
DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => vTaskName);
@@ -856,7 +949,8 @@ AS
-- Clean up task
DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName);
-- Clean up chunks for this task
-- Clean up chunks for THIS specific task only (session-safe)
-- CRITICAL: Use TASK_NAME filter to avoid deleting chunks from other active sessions
DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE TASK_NAME = vTaskName;
COMMIT;
@@ -1055,23 +1149,45 @@ AS
BEGIN
ENV_MANAGER.LOG_PROCESS_EVENT('Using parallel processing with ' || pParallelDegree || ' threads', 'INFO', vParameters);
-- Clear any existing chunks from previous runs (TRUNCATE to avoid PK violations)
EXECUTE IMMEDIATE 'TRUNCATE TABLE CT_MRDS.A_PARALLEL_EXPORT_CHUNKS';
-- Clean up old completed chunks (>24 hours) to prevent table bloat
-- CRITICAL: Do NOT delete chunks from other active sessions (same-day tasks)
-- This prevents race conditions when multiple CSV exports run simultaneously
DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS
WHERE STATUS = 'COMPLETED'
AND CREATED_DATE < SYSTIMESTAMP - INTERVAL '1' DAY;
COMMIT;
-- Populate chunks table
ENV_MANAGER.LOG_PROCESS_EVENT('Cleared old COMPLETED chunks (>24h). Active session chunks preserved.', 'DEBUG', vParameters);
-- Populate chunks table (insert new chunks, preserve FAILED chunks for retry).
-- FAILED chunks are reset to PENDING so only failed partitions are re-exported;
-- COMPLETED chunks keep their status and are skipped.
FOR i IN 1 .. vPartitions.COUNT LOOP
    MERGE INTO CT_MRDS.A_PARALLEL_EXPORT_CHUNKS t
    USING (SELECT i AS chunk_id, vPartitions(i).year AS yr, vPartitions(i).month AS mn FROM DUAL) s
    ON (t.CHUNK_ID = s.chunk_id)
    WHEN NOT MATCHED THEN
        INSERT (CHUNK_ID, TASK_NAME, YEAR_VALUE, MONTH_VALUE, SCHEMA_NAME, TABLE_NAME, KEY_COLUMN_NAME,
                BUCKET_URI, FOLDER_NAME, PROCESSED_COLUMNS, MIN_DATE, MAX_DATE,
                CREDENTIAL_NAME, FORMAT_TYPE, FILE_BASE_NAME, TEMPLATE_TABLE_NAME, MAX_FILE_SIZE, STATUS)
        VALUES (s.chunk_id, vTaskName, s.yr, s.mn, vSchemaName, vTableName, vKeyColumnName,
                vBucketUri, pFolderName, vProcessedColumnList, pMinDate, pMaxDate,
                pCredentialName, 'CSV', vFileBaseName, pTemplateTableName, pMaxFileSize, 'PENDING')
    WHEN MATCHED THEN
        -- BUGFIX: also refresh YEAR_VALUE/MONTH_VALUE from the current partition list.
        -- Previously s.yr/s.mn were selected but never used, so a retained FAILED chunk
        -- kept a stale year/month and a retry could export the wrong partition when the
        -- partition list changed between runs.
        -- NOTE(review): a retained COMPLETED chunk whose partition mapping changed is
        -- still skipped - confirm partition lists are stable across retry runs.
        UPDATE SET t.TASK_NAME     = vTaskName,
                   t.YEAR_VALUE    = s.yr,
                   t.MONTH_VALUE   = s.mn,
                   t.STATUS        = CASE WHEN t.STATUS = 'FAILED' THEN 'PENDING' ELSE t.STATUS END,
                   t.ERROR_MESSAGE = CASE WHEN t.STATUS = 'FAILED' THEN NULL ELSE t.ERROR_MESSAGE END;
END LOOP;
COMMIT;
ENV_MANAGER.LOG_PROCESS_EVENT('Populated ' || vPartitions.COUNT || ' chunks for parallel CSV export', 'DEBUG', vParameters);
-- Log chunk statistics
DECLARE
    -- Counters for the pre-run chunk report written to the process log
    vPendingCount NUMBER;
    vFailedCount  NUMBER;
BEGIN
    -- One table scan with conditional aggregation instead of two separate
    -- COUNT queries; yields the same figures from a single consistent snapshot.
    -- NOTE(review): counts are table-wide, not filtered by TASK_NAME - with
    -- concurrent export sessions these figures include other sessions' chunks;
    -- confirm this is intended.
    SELECT COUNT(CASE WHEN STATUS = 'PENDING' THEN 1 END),
           COUNT(CASE WHEN STATUS = 'FAILED' THEN 1 END)
      INTO vPendingCount, vFailedCount
      FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS;
    ENV_MANAGER.LOG_PROCESS_EVENT('Chunk statistics: PENDING=' || vPendingCount || ', FAILED (retry)=' || vFailedCount, 'INFO', vParameters);
END;
-- Create parallel task
DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => vTaskName);
@@ -1113,7 +1229,8 @@ AS
-- Clean up task
DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName);
-- Clean up chunks for this task
-- Clean up chunks for THIS specific task only (session-safe)
-- CRITICAL: Use TASK_NAME filter to avoid deleting chunks from other active CSV sessions
DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE TASK_NAME = vTaskName;
COMMIT;

View File

@@ -9,12 +9,16 @@ AS
**/
-- Package Version Information
PACKAGE_VERSION CONSTANT VARCHAR2(10) := '2.5.0';
PACKAGE_BUILD_DATE CONSTANT VARCHAR2(19) := '2026-01-26 13:30:00';
PACKAGE_VERSION CONSTANT VARCHAR2(10) := '2.6.3';
PACKAGE_BUILD_DATE CONSTANT VARCHAR2(19) := '2026-01-28 19:30:00';
PACKAGE_AUTHOR CONSTANT VARCHAR2(50) := 'MRDS Development Team';
-- Version History (last 3-5 changes)
VERSION_HISTORY CONSTANT VARCHAR2(4000) :=
'v2.6.3 (2026-01-28): COMPILATION FIX - Resolved ORA-00904 error in EXPORT_PARTITION_PARALLEL. SQLERRM and DBMS_UTILITY.FORMAT_ERROR_BACKTRACE cannot be used directly in SQL UPDATE statements. Now properly assigned to vgMsgTmp variable before UPDATE.' || CHR(10) ||
'v2.6.2 (2026-01-28): CRITICAL FIX - Race condition when multiple exports run simultaneously. Changed DELETE to filter by age (>24h) instead of deleting all COMPLETED chunks. Prevents concurrent sessions from deleting each other chunks. Session-safe cleanup with TASK_NAME filtering. Enables true parallel execution of multiple export jobs.' || CHR(10) ||
'v2.6.1 (2026-01-28): Added DELETE_FAILED_EXPORT_FILE procedure to clean up partial/corrupted files before retry. When partition fails mid-export, partial file is deleted before retry to prevent Oracle from creating _1 suffixed duplicates. Ensures clean retry without orphaned files in OCI bucket.' || CHR(10) ||
'v2.6.0 (2026-01-28): CRITICAL FIX - Added STATUS tracking to A_PARALLEL_EXPORT_CHUNKS table to prevent data duplication on retry. System now restarts ONLY failed partitions instead of re-exporting all data. Added ERROR_MESSAGE and EXPORT_TIMESTAMP columns for better error handling and monitoring. Prevents duplicate file creation when parallel tasks fail (e.g., 22 partitions with 16 threads, 3 failures no longer duplicates 19 successful exports).' || CHR(10) ||
'v2.5.0 (2026-01-26): Added recorddelimiter parameter with CRLF (CHR(13)||CHR(10)) for CSV exports to ensure Windows-compatible line endings. Improves cross-platform compatibility when CSV files are opened in Windows applications (Notepad, Excel).' || CHR(10) ||
'v2.4.0 (2026-01-11): Added pTemplateTableName parameter for per-column date format configuration. Implements dynamic query building with TO_CHAR for each date/timestamp column using FILE_MANAGER.GET_DATE_FORMAT. Supports 3-tier hierarchy: column-specific, template DEFAULT, global fallback. Eliminates single dateformat limitation of DBMS_CLOUD.EXPORT_DATA.' || CHR(10) ||
'v2.3.0 (2025-12-20): Added parallel partition processing using DBMS_PARALLEL_EXECUTE. New pParallelDegree parameter (1-16, default 1) for EXPORT_TABLE_DATA_BY_DATE and EXPORT_TABLE_DATA_TO_CSV_BY_DATE procedures. Each year/month partition processed in separate thread for improved performance.' || CHR(10) ||