Add JOB_CLASS parameter to DATA_EXPORTER for Oracle Scheduler support

This commit is contained in:
Grzegorz Michalski
2026-02-18 12:58:06 +01:00
parent 6ade77c3c0
commit b6cf54f103
3 changed files with 55 additions and 25 deletions

View File

@@ -501,6 +501,7 @@ AS
vFormat VARCHAR2(20);
vFileBaseName VARCHAR2(1000);
vMaxFileSize NUMBER;
vJobClass VARCHAR2(128);
vParameters VARCHAR2(4000);
BEGIN
-- Retrieve chunk context from global temporary table
@@ -518,7 +519,8 @@ AS
CREDENTIAL_NAME,
FORMAT_TYPE,
FILE_BASE_NAME,
MAX_FILE_SIZE
MAX_FILE_SIZE,
JOB_CLASS
INTO
vYear,
vMonth,
@@ -533,7 +535,8 @@ AS
vCredentialName,
vFormat,
vFileBaseName,
vMaxFileSize
vMaxFileSize,
vJobClass
FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS
WHERE CHUNK_ID = pStartId;
@@ -953,6 +956,7 @@ AS
pMaxDate IN DATE default SYSDATE,
pParallelDegree IN NUMBER default 1,
pTemplateTableName IN VARCHAR2 default NULL,
pJobClass IN VARCHAR2 default NULL,
pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
)
IS
@@ -976,6 +980,7 @@ AS
,'pMaxDate => '''||nvl(TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
,'pParallelDegree => '''||nvl(TO_CHAR(pParallelDegree), 'NULL')||''''
,'pTemplateTableName => '''||nvl(pTemplateTableName, 'NULL')||''''
,'pJobClass => '''||nvl(pJobClass, 'NULL')||''''
,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
));
ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
@@ -1056,10 +1061,10 @@ AS
WHEN NOT MATCHED THEN
INSERT (CHUNK_ID, TASK_NAME, YEAR_VALUE, MONTH_VALUE, SCHEMA_NAME, TABLE_NAME, KEY_COLUMN_NAME,
BUCKET_URI, FOLDER_NAME, PROCESSED_COLUMNS, MIN_DATE, MAX_DATE,
CREDENTIAL_NAME, FORMAT_TYPE, FILE_BASE_NAME, TEMPLATE_TABLE_NAME, MAX_FILE_SIZE, STATUS)
CREDENTIAL_NAME, FORMAT_TYPE, FILE_BASE_NAME, TEMPLATE_TABLE_NAME, MAX_FILE_SIZE, JOB_CLASS, STATUS)
VALUES (i, vTaskName, vPartitions(i).year, vPartitions(i).month, vSchemaName, vTableName, vKeyColumnName,
vBucketUri, pFolderName, vProcessedColumnList, pMinDate, pMaxDate,
pCredentialName, 'PARQUET', NULL, pTemplateTableName, 104857600, 'PENDING')
pCredentialName, 'PARQUET', NULL, pTemplateTableName, 104857600, pJobClass, 'PENDING')
WHEN MATCHED THEN
UPDATE SET TASK_NAME = vTaskName,
STATUS = CASE WHEN t.STATUS = 'FAILED' THEN 'PENDING' ELSE t.STATUS END,
@@ -1091,14 +1096,24 @@ AS
);
-- Execute task in parallel
ENV_MANAGER.LOG_PROCESS_EVENT('Executing parallel task: ' || vTaskName, 'DEBUG', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('Executing parallel task: ' || vTaskName || CASE WHEN pJobClass IS NOT NULL THEN ' with job class: ' || pJobClass ELSE '' END, 'DEBUG', vParameters);
DBMS_PARALLEL_EXECUTE.RUN_TASK(
task_name => vTaskName,
sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id); END;',
language_flag => DBMS_SQL.NATIVE,
parallel_level => pParallelDegree
);
IF pJobClass IS NOT NULL THEN
DBMS_PARALLEL_EXECUTE.RUN_TASK(
task_name => vTaskName,
sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id); END;',
language_flag => DBMS_SQL.NATIVE,
parallel_level => pParallelDegree,
job_class => pJobClass
);
ELSE
DBMS_PARALLEL_EXECUTE.RUN_TASK(
task_name => vTaskName,
sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id); END;',
language_flag => DBMS_SQL.NATIVE,
parallel_level => pParallelDegree
);
END IF;
-- Check for errors
DECLARE
@@ -1203,6 +1218,7 @@ AS
pMaxFileSize IN NUMBER default 104857600,
pRegisterExport IN BOOLEAN default FALSE,
pProcessName IN VARCHAR2 default 'DATA_EXPORTER',
pJobClass IN VARCHAR2 default NULL,
pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
)
IS
@@ -1240,6 +1256,7 @@ AS
,'pTemplateTableName => '''||nvl(pTemplateTableName, 'NULL')||''''
,'pMaxFileSize => '''||nvl(TO_CHAR(pMaxFileSize), 'NULL')||''''
,'pRegisterExport => '''||CASE WHEN pRegisterExport THEN 'TRUE' ELSE 'FALSE' END||''''
,'pJobClass => '''||nvl(pJobClass, 'NULL')||''''
,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
));
ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
@@ -1348,10 +1365,10 @@ AS
WHEN NOT MATCHED THEN
INSERT (CHUNK_ID, TASK_NAME, YEAR_VALUE, MONTH_VALUE, SCHEMA_NAME, TABLE_NAME, KEY_COLUMN_NAME,
BUCKET_URI, FOLDER_NAME, PROCESSED_COLUMNS, MIN_DATE, MAX_DATE,
CREDENTIAL_NAME, FORMAT_TYPE, FILE_BASE_NAME, TEMPLATE_TABLE_NAME, MAX_FILE_SIZE, STATUS)
CREDENTIAL_NAME, FORMAT_TYPE, FILE_BASE_NAME, TEMPLATE_TABLE_NAME, MAX_FILE_SIZE, JOB_CLASS, STATUS)
VALUES (i, vTaskName, vPartitions(i).year, vPartitions(i).month, vSchemaName, vTableName, vKeyColumnName,
vBucketUri, pFolderName, vProcessedColumnList, pMinDate, pMaxDate,
pCredentialName, 'CSV', vFileBaseName, pTemplateTableName, pMaxFileSize, 'PENDING')
pCredentialName, 'CSV', vFileBaseName, pTemplateTableName, pMaxFileSize, pJobClass, 'PENDING')
WHEN MATCHED THEN
UPDATE SET TASK_NAME = vTaskName,
STATUS = CASE WHEN t.STATUS = 'FAILED' THEN 'PENDING' ELSE t.STATUS END,
@@ -1383,14 +1400,24 @@ AS
);
-- Execute task in parallel
ENV_MANAGER.LOG_PROCESS_EVENT('Executing parallel CSV export task: ' || vTaskName, 'DEBUG', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('Executing parallel CSV export task: ' || vTaskName || CASE WHEN pJobClass IS NOT NULL THEN ' with job class: ' || pJobClass ELSE '' END, 'DEBUG', vParameters);
DBMS_PARALLEL_EXECUTE.RUN_TASK(
task_name => vTaskName,
sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id); END;',
language_flag => DBMS_SQL.NATIVE,
parallel_level => pParallelDegree
);
IF pJobClass IS NOT NULL THEN
DBMS_PARALLEL_EXECUTE.RUN_TASK(
task_name => vTaskName,
sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id); END;',
language_flag => DBMS_SQL.NATIVE,
parallel_level => pParallelDegree,
job_class => pJobClass
);
ELSE
DBMS_PARALLEL_EXECUTE.RUN_TASK(
task_name => vTaskName,
sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id); END;',
language_flag => DBMS_SQL.NATIVE,
parallel_level => pParallelDegree
);
END IF;
-- Check for errors
DECLARE