Add DATA_EXPORTER package and update installation scripts for export registration

- Created new package CT_MRDS.DATA_EXPORTER (v2.6.3) for comprehensive data export capabilities, including CSV and Parquet formats with OCI integration.
- Implemented version history tracking and enhanced error handling for export processes.
- Updated installation scripts to include pRegisterExport parameter for registering exports in A_WORKFLOW_HISTORY and A_SOURCE_FILE_RECEIVED.
This commit is contained in:
Grzegorz Michalski
2026-02-10 09:33:46 +01:00
parent 293f2873b7
commit 6c8b22eac9
5 changed files with 1685 additions and 24 deletions

View File

@@ -1004,6 +1004,9 @@ AS
* Allows specifying custom column list or uses T.* if pColumnList is NULL.
* Validates that all columns in pColumnList exist in the target table.
* Automatically adds 'T.' prefix to column names in pColumnList.
* When pRegisterExport=TRUE, successfully exported files are registered in:
* - CT_MRDS.A_WORKFLOW_HISTORY (one record per YEAR/MONTH with export timestamp)
* - CT_MRDS.A_SOURCE_FILE_RECEIVED (tracks file location and partition info)
* @example
* begin
* DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
@@ -1015,7 +1018,8 @@ AS
* pFileName => 'my_export.csv',
* pColumnList => 'COLUMN1, COLUMN2, COLUMN3', -- Optional
* pMinDate => DATE '2024-01-01',
* pMaxDate => SYSDATE
* pMaxDate => SYSDATE,
* pRegisterExport => TRUE -- Registers exports to tracking tables
* );
* end;
**/
@@ -1032,6 +1036,7 @@ AS
pParallelDegree IN NUMBER default 1,
pTemplateTableName IN VARCHAR2 default NULL,
pMaxFileSize IN NUMBER default 104857600,
pRegisterExport IN BOOLEAN default FALSE,
pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
)
IS
@@ -1045,6 +1050,10 @@ AS
vBucketUri VARCHAR2(4000);
vCurrentCol VARCHAR2(128);
vPartitions partition_tab;
vWorkflowHistoryKey NUMBER;
vSourceFileReceivedKey NUMBER;
vFileName VARCHAR2(1000);
vFileUri VARCHAR2(4000);
BEGIN
vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||''''
@@ -1059,6 +1068,7 @@ AS
,'pParallelDegree => '''||nvl(TO_CHAR(pParallelDegree), 'NULL')||''''
,'pTemplateTableName => '''||nvl(pTemplateTableName, 'NULL')||''''
,'pMaxFileSize => '''||nvl(TO_CHAR(pMaxFileSize), 'NULL')||''''
,'pRegisterExport => '''||CASE WHEN pRegisterExport THEN 'TRUE' ELSE 'FALSE' END||''''
,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
));
ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
@@ -1135,6 +1145,58 @@ AS
pMaxFileSize => pMaxFileSize,
pParameters => vParameters
);
-- Register export if requested
IF pRegisterExport THEN
-- Construct filename and URI for this partition
vFileName := NVL(vFileBaseName, UPPER(REPLACE(vTableName, vSchemaName || '.', ''))) || '_' || vPartitions(i).year || vPartitions(i).month || '.csv';
vFileUri := vBucketUri || CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || sanitizeFilename(vFileName);
-- Create A_WORKFLOW_HISTORY record for this export (one per year/month)
vWorkflowHistoryKey := CT_MRDS.A_WORKFLOW_HISTORY_KEY_SEQ.NEXTVAL;
INSERT INTO CT_MRDS.A_WORKFLOW_HISTORY (
A_WORKFLOW_HISTORY_KEY,
SERVICE_NAME,
ORCHESTRATION_RUN_ID,
WORKFLOW_NAME,
WORKFLOW_START,
WORKFLOW_END,
WORKFLOW_SUCCESSFUL
) VALUES (
vWorkflowHistoryKey,
'DATA_EXPORTER',
'CSV_EXPORT_' || TO_CHAR(SYSTIMESTAMP, 'YYYYMMDDHH24MISSFF'),
'DATA_EXPORT_' || UPPER(REPLACE(vTableName, vSchemaName || '.', '')) || '_' || vPartitions(i).year || vPartitions(i).month,
SYSTIMESTAMP,
SYSTIMESTAMP,
'Y'
);
-- Create A_SOURCE_FILE_RECEIVED record for this export
vSourceFileReceivedKey := CT_MRDS.A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL;
INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
A_SOURCE_FILE_RECEIVED_KEY,
A_SOURCE_FILE_CONFIG_KEY,
SOURCE_FILE_NAME,
RECEPTION_DATE,
PROCESSING_STATUS,
PARTITION_YEAR,
PARTITION_MONTH,
ARCH_FILE_NAME
) VALUES (
vSourceFileReceivedKey,
-1, -- Special marker for exported files (no config)
vFileUri,
SYSDATE,
'ARCHIVED',
vPartitions(i).year,
vPartitions(i).month,
vFileName
);
COMMIT;
ENV_MANAGER.LOG_PROCESS_EVENT('Registered export: WorkflowKey=' || vWorkflowHistoryKey || ', FileReceivedKey=' || vSourceFileReceivedKey || ', File=' || vFileName, 'INFO', vParameters);
END IF;
END LOOP;
-- Parallel processing (parallel degree > 1)
@@ -1229,6 +1291,64 @@ AS
-- Clean up task
DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName);
-- Register exports if requested (after successful parallel processing)
IF pRegisterExport THEN
ENV_MANAGER.LOG_PROCESS_EVENT('Registering ' || vPartitions.COUNT || ' parallel exports to A_WORKFLOW_HISTORY and A_SOURCE_FILE_RECEIVED', 'INFO', vParameters);
FOR i IN 1 .. vPartitions.COUNT LOOP
-- Construct filename and URI for this partition
vFileName := NVL(vFileBaseName, UPPER(REPLACE(vTableName, vSchemaName || '.', ''))) || '_' || vPartitions(i).year || vPartitions(i).month || '.csv';
vFileUri := vBucketUri || CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || sanitizeFilename(vFileName);
-- Create A_WORKFLOW_HISTORY record for this export (one per year/month)
vWorkflowHistoryKey := CT_MRDS.A_WORKFLOW_HISTORY_KEY_SEQ.NEXTVAL;
INSERT INTO CT_MRDS.A_WORKFLOW_HISTORY (
A_WORKFLOW_HISTORY_KEY,
SERVICE_NAME,
ORCHESTRATION_RUN_ID,
WORKFLOW_NAME,
WORKFLOW_START,
WORKFLOW_END,
WORKFLOW_SUCCESSFUL
) VALUES (
vWorkflowHistoryKey,
'DATA_EXPORTER',
'CSV_EXPORT_' || TO_CHAR(SYSTIMESTAMP, 'YYYYMMDDHH24MISSFF'),
'DATA_EXPORT_' || UPPER(REPLACE(vTableName, vSchemaName || '.', '')) || '_' || vPartitions(i).year || vPartitions(i).month,
SYSTIMESTAMP,
SYSTIMESTAMP,
'Y'
);
-- Create A_SOURCE_FILE_RECEIVED record for this export
vSourceFileReceivedKey := CT_MRDS.A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL;
INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
A_SOURCE_FILE_RECEIVED_KEY,
A_SOURCE_FILE_CONFIG_KEY,
SOURCE_FILE_NAME,
RECEPTION_DATE,
PROCESSING_STATUS,
PARTITION_YEAR,
PARTITION_MONTH,
ARCH_FILE_NAME
) VALUES (
vSourceFileReceivedKey,
-1, -- Special marker for exported files (no config)
vFileUri,
SYSDATE,
'ARCHIVED',
vPartitions(i).year,
vPartitions(i).month,
vFileName
);
ENV_MANAGER.LOG_PROCESS_EVENT('Registered parallel export: WorkflowKey=' || vWorkflowHistoryKey || ', FileReceivedKey=' || vSourceFileReceivedKey || ', File=' || vFileName, 'DEBUG', vParameters);
END LOOP;
COMMIT;
ENV_MANAGER.LOG_PROCESS_EVENT('Successfully registered all ' || vPartitions.COUNT || ' exports', 'INFO', vParameters);
END IF;
-- Clean up chunks for THIS specific task only (session-safe)
-- CRITICAL: Use TASK_NAME filter to avoid deleting chunks from other active CSV sessions
DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE TASK_NAME = vTaskName;