feat(MARS-835): Enhance EXPORT_PARTITION_PARALLEL with pTaskName parameter for session isolation and optimize chunk retrieval logic

This commit is contained in:
Grzegorz Michalski
2026-02-25 09:49:25 +01:00
parent 04d4f6ac02
commit c68d5bfe2c
3 changed files with 74 additions and 55 deletions

View File

@@ -558,7 +558,8 @@ AS
**/
PROCEDURE EXPORT_PARTITION_PARALLEL (
pStartId IN NUMBER,
pEndId IN NUMBER
pEndId IN NUMBER,
pTaskName IN VARCHAR2 DEFAULT NULL
) IS
vYear VARCHAR2(4);
vMonth VARCHAR2(2);
@@ -575,9 +576,12 @@ AS
vFileBaseName VARCHAR2(1000);
vMaxFileSize NUMBER;
vJobClass VARCHAR2(128);
vTaskName VARCHAR2(128);
vParameters VARCHAR2(4000);
BEGIN
-- Retrieve chunk context from global temporary table
-- Retrieve chunk context from A_PARALLEL_EXPORT_CHUNKS table
-- CRITICAL: Filter by CHUNK_ID and TASK_NAME for precise session isolation
-- pTaskName parameter passed from RUN_TASK ensures deterministic single-row retrieval
SELECT
YEAR_VALUE,
MONTH_VALUE,
@@ -593,7 +597,8 @@ AS
FORMAT_TYPE,
FILE_BASE_NAME,
MAX_FILE_SIZE,
JOB_CLASS
JOB_CLASS,
TASK_NAME
INTO
vYear,
vMonth,
@@ -609,18 +614,22 @@ AS
vFormat,
vFileBaseName,
vMaxFileSize,
vJobClass
vJobClass,
vTaskName
FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS
WHERE CHUNK_ID = pStartId;
WHERE CHUNK_ID = pStartId
AND TASK_NAME = pTaskName;
vParameters := 'Parallel task - Year: ' || vYear || ', Month: ' || vMonth || ', ChunkID: ' || pStartId;
vParameters := 'Parallel task - Year: ' || vYear || ', Month: ' || vMonth || ', ChunkID: ' || pStartId || ', TaskName: ' || vTaskName;
ENV_MANAGER.LOG_PROCESS_EVENT('Starting parallel export for partition ' || vYear || '/' || vMonth, 'DEBUG', vParameters);
-- Mark chunk as PROCESSING
-- CRITICAL: Use both CHUNK_ID AND TASK_NAME for session isolation
UPDATE CT_MRDS.A_PARALLEL_EXPORT_CHUNKS
SET STATUS = 'PROCESSING',
ERROR_MESSAGE = NULL
WHERE CHUNK_ID = pStartId;
WHERE CHUNK_ID = pStartId
AND TASK_NAME = vTaskName;
COMMIT;
-- Call the worker procedure
@@ -643,26 +652,30 @@ AS
);
-- Mark chunk as COMPLETED
-- CRITICAL: Use both CHUNK_ID AND TASK_NAME for session isolation
UPDATE CT_MRDS.A_PARALLEL_EXPORT_CHUNKS
SET STATUS = 'COMPLETED',
EXPORT_TIMESTAMP = SYSTIMESTAMP,
ERROR_MESSAGE = NULL
WHERE CHUNK_ID = pStartId;
WHERE CHUNK_ID = pStartId
AND TASK_NAME = vTaskName;
COMMIT;
ENV_MANAGER.LOG_PROCESS_EVENT('Completed parallel export for partition ' || vYear || '/' || vMonth, 'DEBUG', vParameters);
EXCEPTION
WHEN OTHERS THEN
-- Capture error details in variable (SQLERRM cannot be used directly in SQL)
vgMsgTmp := 'Parallel task error for partition ' || vYear || '/' || vMonth || ' (ChunkID: ' || pStartId || '): ' || SQLERRM || cgBL || DBMS_UTILITY.FORMAT_ERROR_BACKTRACE;
vgMsgTmp := 'Parallel task error for partition ' || vYear || '/' || vMonth || ' (ChunkID: ' || pStartId || ', TaskName: ' || vTaskName || '): ' || SQLERRM || cgBL || DBMS_UTILITY.FORMAT_ERROR_BACKTRACE;
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
-- Mark chunk as FAILED with error message
-- CRITICAL: Use both CHUNK_ID AND TASK_NAME for session isolation
-- Use vgMsgTmp variable instead of SQLERRM directly (Oracle limitation in SQL context)
UPDATE CT_MRDS.A_PARALLEL_EXPORT_CHUNKS
SET STATUS = 'FAILED',
ERROR_MESSAGE = SUBSTR(vgMsgTmp, 1, 4000)
WHERE CHUNK_ID = pStartId;
WHERE CHUNK_ID = pStartId
AND TASK_NAME = vTaskName;
COMMIT;
RAISE;
@@ -1129,8 +1142,8 @@ AS
-- Populate chunks table (insert new chunks, preserve FAILED chunks for retry)
FOR i IN 1 .. vPartitions.COUNT LOOP
MERGE INTO CT_MRDS.A_PARALLEL_EXPORT_CHUNKS t
USING (SELECT i AS chunk_id, vPartitions(i).year AS yr, vPartitions(i).month AS mn FROM DUAL) s
ON (t.CHUNK_ID = s.chunk_id)
USING (SELECT i AS chunk_id, vTaskName AS task_name, vPartitions(i).year AS yr, vPartitions(i).month AS mn FROM DUAL) s
ON (t.CHUNK_ID = s.chunk_id AND t.TASK_NAME = s.task_name)
WHEN NOT MATCHED THEN
INSERT (CHUNK_ID, TASK_NAME, YEAR_VALUE, MONTH_VALUE, SCHEMA_NAME, TABLE_NAME, KEY_COLUMN_NAME,
BUCKET_URI, FOLDER_NAME, PROCESSED_COLUMNS, MIN_DATE, MAX_DATE,
@@ -1139,33 +1152,34 @@ AS
vBucketUri, pFolderName, vProcessedColumnList, pMinDate, pMaxDate,
pCredentialName, 'PARQUET', NULL, pTemplateTableName, 104857600, pJobClass, 'PENDING')
WHEN MATCHED THEN
UPDATE SET TASK_NAME = vTaskName,
STATUS = CASE WHEN t.STATUS = 'FAILED' THEN 'PENDING' ELSE t.STATUS END,
-- Match found: chunk exists for SAME task (composite PK: TASK_NAME, CHUNK_ID)
-- This handles retry scenario: reset FAILED chunks to PENDING for re-processing
UPDATE SET STATUS = CASE WHEN t.STATUS = 'FAILED' THEN 'PENDING' ELSE t.STATUS END,
ERROR_MESSAGE = CASE WHEN t.STATUS = 'FAILED' THEN NULL ELSE t.ERROR_MESSAGE END;
END LOOP;
COMMIT;
-- Log chunk statistics
-- Log chunk statistics (session-safe: only count chunks for THIS task)
DECLARE
vPendingCount NUMBER;
vFailedCount NUMBER;
BEGIN
SELECT COUNT(*) INTO vPendingCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'PENDING';
SELECT COUNT(*) INTO vFailedCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'FAILED';
SELECT COUNT(*) INTO vPendingCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'PENDING' AND TASK_NAME = vTaskName;
SELECT COUNT(*) INTO vFailedCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'FAILED' AND TASK_NAME = vTaskName;
ENV_MANAGER.LOG_PROCESS_EVENT('Chunk statistics: PENDING=' || vPendingCount || ', FAILED (retry)=' || vFailedCount, 'INFO', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('Chunk statistics for task ' || vTaskName || ': PENDING=' || vPendingCount || ', FAILED (retry)=' || vFailedCount, 'INFO', vParameters);
END;
-- Create parallel task
DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => vTaskName);
-- Define chunks by number range (1 to partition count)
DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_NUMBER_COL(
-- Define chunks using SQL query to ensure TASK_NAME isolation
-- CRITICAL: Filter by TASK_NAME to avoid selecting chunks from other concurrent sessions
-- CRITICAL: Use START_ID and END_ID aliases to avoid ORA-00960 ambiguous column naming
DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_SQL(
task_name => vTaskName,
table_owner => 'CT_MRDS',
table_name => 'A_PARALLEL_EXPORT_CHUNKS',
table_column => 'CHUNK_ID',
chunk_size => 1 -- Each partition is one chunk
sql_stmt => 'SELECT CHUNK_ID AS START_ID, CHUNK_ID AS END_ID FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE TASK_NAME = ''' || vTaskName || ''' ORDER BY CHUNK_ID',
by_rowid => FALSE
);
-- Execute task in parallel
@@ -1174,7 +1188,7 @@ AS
IF pJobClass IS NOT NULL THEN
DBMS_PARALLEL_EXECUTE.RUN_TASK(
task_name => vTaskName,
sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id); END;',
sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id, ''' || vTaskName || '''); END;',
language_flag => DBMS_SQL.NATIVE,
parallel_level => pParallelDegree,
job_class => pJobClass
@@ -1182,7 +1196,7 @@ AS
ELSE
DBMS_PARALLEL_EXECUTE.RUN_TASK(
task_name => vTaskName,
sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id); END;',
sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id, ''' || vTaskName || '''); END;',
language_flag => DBMS_SQL.NATIVE,
parallel_level => pParallelDegree
);
@@ -1433,8 +1447,8 @@ AS
-- Populate chunks table (insert new chunks, preserve FAILED chunks for retry)
FOR i IN 1 .. vPartitions.COUNT LOOP
MERGE INTO CT_MRDS.A_PARALLEL_EXPORT_CHUNKS t
USING (SELECT i AS chunk_id, vPartitions(i).year AS yr, vPartitions(i).month AS mn FROM DUAL) s
ON (t.CHUNK_ID = s.chunk_id)
USING (SELECT i AS chunk_id, vTaskName AS task_name, vPartitions(i).year AS yr, vPartitions(i).month AS mn FROM DUAL) s
ON (t.CHUNK_ID = s.chunk_id AND t.TASK_NAME = s.task_name)
WHEN NOT MATCHED THEN
INSERT (CHUNK_ID, TASK_NAME, YEAR_VALUE, MONTH_VALUE, SCHEMA_NAME, TABLE_NAME, KEY_COLUMN_NAME,
BUCKET_URI, FOLDER_NAME, PROCESSED_COLUMNS, MIN_DATE, MAX_DATE,
@@ -1443,33 +1457,34 @@ AS
vBucketUri, pFolderName, vProcessedColumnList, pMinDate, pMaxDate,
pCredentialName, 'CSV', vFileBaseName, pTemplateTableName, pMaxFileSize, pJobClass, 'PENDING')
WHEN MATCHED THEN
UPDATE SET TASK_NAME = vTaskName,
STATUS = CASE WHEN t.STATUS = 'FAILED' THEN 'PENDING' ELSE t.STATUS END,
-- Match found: chunk exists for SAME task (composite PK: TASK_NAME, CHUNK_ID)
-- This handles retry scenario: reset FAILED chunks to PENDING for re-processing
UPDATE SET STATUS = CASE WHEN t.STATUS = 'FAILED' THEN 'PENDING' ELSE t.STATUS END,
ERROR_MESSAGE = CASE WHEN t.STATUS = 'FAILED' THEN NULL ELSE t.ERROR_MESSAGE END;
END LOOP;
COMMIT;
-- Log chunk statistics
-- Log chunk statistics (session-safe: only count chunks for THIS task)
DECLARE
vPendingCount NUMBER;
vFailedCount NUMBER;
BEGIN
SELECT COUNT(*) INTO vPendingCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'PENDING';
SELECT COUNT(*) INTO vFailedCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'FAILED';
SELECT COUNT(*) INTO vPendingCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'PENDING' AND TASK_NAME = vTaskName;
SELECT COUNT(*) INTO vFailedCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'FAILED' AND TASK_NAME = vTaskName;
ENV_MANAGER.LOG_PROCESS_EVENT('Chunk statistics: PENDING=' || vPendingCount || ', FAILED (retry)=' || vFailedCount, 'INFO', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('Chunk statistics for task ' || vTaskName || ': PENDING=' || vPendingCount || ', FAILED (retry)=' || vFailedCount, 'INFO', vParameters);
END;
-- Create parallel task
DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => vTaskName);
-- Define chunks by number range (1 to partition count)
DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_NUMBER_COL(
-- Define chunks using SQL query to ensure TASK_NAME isolation
-- CRITICAL: Filter by TASK_NAME to avoid selecting chunks from other concurrent sessions
-- CRITICAL: Use START_ID and END_ID aliases to avoid ORA-00960 ambiguous column naming
DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_SQL(
task_name => vTaskName,
table_owner => 'CT_MRDS',
table_name => 'A_PARALLEL_EXPORT_CHUNKS',
table_column => 'CHUNK_ID',
chunk_size => 1 -- Each partition is one chunk
sql_stmt => 'SELECT CHUNK_ID AS START_ID, CHUNK_ID AS END_ID FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE TASK_NAME = ''' || vTaskName || ''' ORDER BY CHUNK_ID',
by_rowid => FALSE
);
-- Execute task in parallel
@@ -1478,7 +1493,7 @@ AS
IF pJobClass IS NOT NULL THEN
DBMS_PARALLEL_EXECUTE.RUN_TASK(
task_name => vTaskName,
sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id); END;',
sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id, ''' || vTaskName || '''); END;',
language_flag => DBMS_SQL.NATIVE,
parallel_level => pParallelDegree,
job_class => pJobClass
@@ -1486,7 +1501,7 @@ AS
ELSE
DBMS_PARALLEL_EXECUTE.RUN_TASK(
task_name => vTaskName,
sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id); END;',
sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id, ''' || vTaskName || '''); END;',
language_flag => DBMS_SQL.NATIVE,
parallel_level => pParallelDegree
);