pRegisterExport
This commit is contained in:
@@ -1004,9 +1004,6 @@ AS
|
||||
* Allows specifying custom column list or uses T.* if pColumnList is NULL.
|
||||
* Validates that all columns in pColumnList exist in the target table.
|
||||
* Automatically adds 'T.' prefix to column names in pColumnList.
|
||||
* When pRegisterExport=TRUE, successfully exported files are registered in:
|
||||
* - CT_MRDS.A_WORKFLOW_HISTORY (one record per YEAR/MONTH with export timestamp)
|
||||
* - CT_MRDS.A_SOURCE_FILE_RECEIVED (tracks file location and partition info)
|
||||
* @example
|
||||
* begin
|
||||
* DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
|
||||
@@ -1018,8 +1015,7 @@ AS
|
||||
* pFileName => 'my_export.csv',
|
||||
* pColumnList => 'COLUMN1, COLUMN2, COLUMN3', -- Optional
|
||||
* pMinDate => DATE '2024-01-01',
|
||||
* pMaxDate => SYSDATE,
|
||||
* pRegisterExport => TRUE -- Registers exports to tracking tables
|
||||
* pMaxDate => SYSDATE
|
||||
* );
|
||||
* end;
|
||||
**/
|
||||
@@ -1050,10 +1046,16 @@ AS
|
||||
vBucketUri VARCHAR2(4000);
|
||||
vCurrentCol VARCHAR2(128);
|
||||
vPartitions partition_tab;
|
||||
vWorkflowHistoryKey NUMBER;
|
||||
vSourceFileReceivedKey NUMBER;
|
||||
vFileName VARCHAR2(1000);
|
||||
vFileUri VARCHAR2(4000);
|
||||
-- Variables for A_SOURCE_FILE_CONFIG lookup
|
||||
vSourceKey VARCHAR2(100);
|
||||
vTableId VARCHAR2(200);
|
||||
vConfigKey NUMBER := -1;
|
||||
vSlashPos1 NUMBER;
|
||||
vSlashPos2 NUMBER;
|
||||
vFileUri VARCHAR2(4000);
|
||||
|
||||
BEGIN
|
||||
vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||''''
|
||||
@@ -1068,7 +1070,7 @@ AS
|
||||
,'pParallelDegree => '''||nvl(TO_CHAR(pParallelDegree), 'NULL')||''''
|
||||
,'pTemplateTableName => '''||nvl(pTemplateTableName, 'NULL')||''''
|
||||
,'pMaxFileSize => '''||nvl(TO_CHAR(pMaxFileSize), 'NULL')||''''
|
||||
,'pRegisterExport => '''||CASE WHEN pRegisterExport THEN 'TRUE' ELSE 'FALSE' END||''''
|
||||
,'pRegisterExport => '''||CASE WHEN pRegisterExport THEN 'TRUE' ELSE 'FALSE' END||''''
|
||||
,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
|
||||
));
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
|
||||
@@ -1145,58 +1147,6 @@ AS
|
||||
pMaxFileSize => pMaxFileSize,
|
||||
pParameters => vParameters
|
||||
);
|
||||
|
||||
-- Register export if requested
|
||||
IF pRegisterExport THEN
|
||||
-- Construct filename and URI for this partition
|
||||
vFileName := NVL(vFileBaseName, UPPER(REPLACE(vTableName, vSchemaName || '.', ''))) || '_' || vPartitions(i).year || vPartitions(i).month || '.csv';
|
||||
vFileUri := vBucketUri || CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || sanitizeFilename(vFileName);
|
||||
|
||||
-- Create A_WORKFLOW_HISTORY record for this export (one per year/month)
|
||||
vWorkflowHistoryKey := CT_MRDS.A_WORKFLOW_HISTORY_KEY_SEQ.NEXTVAL;
|
||||
INSERT INTO CT_MRDS.A_WORKFLOW_HISTORY (
|
||||
A_WORKFLOW_HISTORY_KEY,
|
||||
SERVICE_NAME,
|
||||
ORCHESTRATION_RUN_ID,
|
||||
WORKFLOW_NAME,
|
||||
WORKFLOW_START,
|
||||
WORKFLOW_END,
|
||||
WORKFLOW_SUCCESSFUL
|
||||
) VALUES (
|
||||
vWorkflowHistoryKey,
|
||||
'DATA_EXPORTER',
|
||||
'CSV_EXPORT_' || TO_CHAR(SYSTIMESTAMP, 'YYYYMMDDHH24MISSFF'),
|
||||
'DATA_EXPORT_' || UPPER(REPLACE(vTableName, vSchemaName || '.', '')) || '_' || vPartitions(i).year || vPartitions(i).month,
|
||||
SYSTIMESTAMP,
|
||||
SYSTIMESTAMP,
|
||||
'Y'
|
||||
);
|
||||
|
||||
-- Create A_SOURCE_FILE_RECEIVED record for this export
|
||||
vSourceFileReceivedKey := CT_MRDS.A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL;
|
||||
INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
|
||||
A_SOURCE_FILE_RECEIVED_KEY,
|
||||
A_SOURCE_FILE_CONFIG_KEY,
|
||||
SOURCE_FILE_NAME,
|
||||
RECEPTION_DATE,
|
||||
PROCESSING_STATUS,
|
||||
PARTITION_YEAR,
|
||||
PARTITION_MONTH,
|
||||
ARCH_FILE_NAME
|
||||
) VALUES (
|
||||
vSourceFileReceivedKey,
|
||||
-1, -- Special marker for exported files (no config)
|
||||
vFileUri,
|
||||
SYSDATE,
|
||||
'ARCHIVED',
|
||||
vPartitions(i).year,
|
||||
vPartitions(i).month,
|
||||
vFileName
|
||||
);
|
||||
|
||||
COMMIT;
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('Registered export: WorkflowKey=' || vWorkflowHistoryKey || ', FileReceivedKey=' || vSourceFileReceivedKey || ', File=' || vFileName, 'INFO', vParameters);
|
||||
END IF;
|
||||
END LOOP;
|
||||
|
||||
-- Parallel processing (parallel degree > 1)
|
||||
@@ -1291,64 +1241,6 @@ AS
|
||||
-- Clean up task
|
||||
DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName);
|
||||
|
||||
-- Register exports if requested (after successful parallel processing)
|
||||
IF pRegisterExport THEN
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('Registering ' || vPartitions.COUNT || ' parallel exports to A_WORKFLOW_HISTORY and A_SOURCE_FILE_RECEIVED', 'INFO', vParameters);
|
||||
|
||||
FOR i IN 1 .. vPartitions.COUNT LOOP
|
||||
-- Construct filename and URI for this partition
|
||||
vFileName := NVL(vFileBaseName, UPPER(REPLACE(vTableName, vSchemaName || '.', ''))) || '_' || vPartitions(i).year || vPartitions(i).month || '.csv';
|
||||
vFileUri := vBucketUri || CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || sanitizeFilename(vFileName);
|
||||
|
||||
-- Create A_WORKFLOW_HISTORY record for this export (one per year/month)
|
||||
vWorkflowHistoryKey := CT_MRDS.A_WORKFLOW_HISTORY_KEY_SEQ.NEXTVAL;
|
||||
INSERT INTO CT_MRDS.A_WORKFLOW_HISTORY (
|
||||
A_WORKFLOW_HISTORY_KEY,
|
||||
SERVICE_NAME,
|
||||
ORCHESTRATION_RUN_ID,
|
||||
WORKFLOW_NAME,
|
||||
WORKFLOW_START,
|
||||
WORKFLOW_END,
|
||||
WORKFLOW_SUCCESSFUL
|
||||
) VALUES (
|
||||
vWorkflowHistoryKey,
|
||||
'DATA_EXPORTER',
|
||||
'CSV_EXPORT_' || TO_CHAR(SYSTIMESTAMP, 'YYYYMMDDHH24MISSFF'),
|
||||
'DATA_EXPORT_' || UPPER(REPLACE(vTableName, vSchemaName || '.', '')) || '_' || vPartitions(i).year || vPartitions(i).month,
|
||||
SYSTIMESTAMP,
|
||||
SYSTIMESTAMP,
|
||||
'Y'
|
||||
);
|
||||
|
||||
-- Create A_SOURCE_FILE_RECEIVED record for this export
|
||||
vSourceFileReceivedKey := CT_MRDS.A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL;
|
||||
INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
|
||||
A_SOURCE_FILE_RECEIVED_KEY,
|
||||
A_SOURCE_FILE_CONFIG_KEY,
|
||||
SOURCE_FILE_NAME,
|
||||
RECEPTION_DATE,
|
||||
PROCESSING_STATUS,
|
||||
PARTITION_YEAR,
|
||||
PARTITION_MONTH,
|
||||
ARCH_FILE_NAME
|
||||
) VALUES (
|
||||
vSourceFileReceivedKey,
|
||||
-1, -- Special marker for exported files (no config)
|
||||
vFileUri,
|
||||
SYSDATE,
|
||||
'ARCHIVED',
|
||||
vPartitions(i).year,
|
||||
vPartitions(i).month,
|
||||
vFileName
|
||||
);
|
||||
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('Registered parallel export: WorkflowKey=' || vWorkflowHistoryKey || ', FileReceivedKey=' || vSourceFileReceivedKey || ', File=' || vFileName, 'DEBUG', vParameters);
|
||||
END LOOP;
|
||||
|
||||
COMMIT;
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('Successfully registered all ' || vPartitions.COUNT || ' exports', 'INFO', vParameters);
|
||||
END IF;
|
||||
|
||||
-- Clean up chunks for THIS specific task only (session-safe)
|
||||
-- CRITICAL: Use TASK_NAME filter to avoid deleting chunks from other active CSV sessions
|
||||
DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE TASK_NAME = vTaskName;
|
||||
@@ -1371,6 +1263,130 @@ AS
|
||||
END IF;
|
||||
END IF;
|
||||
|
||||
-- Register exported files to A_SOURCE_FILE_RECEIVED if requested (after successful export)
|
||||
IF pRegisterExport THEN
|
||||
-- Lookup A_SOURCE_FILE_CONFIG_KEY based on pFolderName parsing
|
||||
-- Format: {BUCKET_AREA}/{SOURCE_KEY}/{TABLE_ID}
|
||||
-- Example: 'ODS/CSDB/CSDB_DEBT_DAILY' -> SOURCE_KEY='CSDB', TABLE_ID='CSDB_DEBT_DAILY'
|
||||
|
||||
-- Parse pFolderName to extract SOURCE_KEY and TABLE_ID
|
||||
vSlashPos1 := INSTR(pFolderName, '/', 1, 1); -- First '/' position
|
||||
vSlashPos2 := INSTR(pFolderName, '/', 1, 2); -- Second '/' position
|
||||
|
||||
IF vSlashPos1 > 0 AND vSlashPos2 > 0 THEN
|
||||
-- Extract segment 2 (SOURCE_KEY) and segment 3 (TABLE_ID)
|
||||
vSourceKey := SUBSTR(pFolderName, vSlashPos1 + 1, vSlashPos2 - vSlashPos1 - 1);
|
||||
vTableId := SUBSTR(pFolderName, vSlashPos2 + 1);
|
||||
|
||||
-- Find configuration based on SOURCE_KEY and TABLE_ID
|
||||
BEGIN
|
||||
SELECT A_SOURCE_FILE_CONFIG_KEY
|
||||
INTO vConfigKey
|
||||
FROM CT_MRDS.A_SOURCE_FILE_CONFIG
|
||||
WHERE A_SOURCE_KEY = vSourceKey
|
||||
AND TABLE_ID = vTableId
|
||||
AND SOURCE_FILE_TYPE = 'INPUT'
|
||||
AND ROWNUM = 1;
|
||||
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('Found config key: ' || vConfigKey || ' for SOURCE=' || vSourceKey || ', TABLE=' || vTableId, 'DEBUG', vParameters);
|
||||
EXCEPTION
|
||||
WHEN NO_DATA_FOUND THEN
|
||||
vConfigKey := -1;
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('No config found for SOURCE=' || vSourceKey || ', TABLE=' || vTableId || ' - using default (-1)', 'INFO', vParameters);
|
||||
END;
|
||||
ELSE
|
||||
-- Cannot parse folder name - use default
|
||||
vConfigKey := -1;
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('Cannot parse pFolderName: ' || pFolderName || ' - using default (-1)', 'WARNING', vParameters);
|
||||
END IF;
|
||||
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('Registering ' || vPartitions.COUNT || ' exported files to A_SOURCE_FILE_RECEIVED with config key: ' || vConfigKey, 'INFO', vParameters);
|
||||
|
||||
FOR i IN 1 .. vPartitions.COUNT LOOP
|
||||
-- Construct filename and URI for this partition
|
||||
vFileName := NVL(vFileBaseName, UPPER(REPLACE(vTableName, vSchemaName || '.', ''))) || '_' || vPartitions(i).year || vPartitions(i).month || '.csv';
|
||||
vFileUri := vBucketUri || CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || sanitizeFilename(vFileName);
|
||||
|
||||
-- Get file metadata from OCI bucket (CHECKSUM, CREATED, BYTES)
|
||||
DECLARE
|
||||
vChecksum VARCHAR2(128);
|
||||
vCreated TIMESTAMP WITH TIME ZONE;
|
||||
vBytes NUMBER;
|
||||
vSanitizedFileName VARCHAR2(1000);
|
||||
BEGIN
|
||||
-- Sanitize filename first (PL/SQL function cannot be used directly in SQL)
|
||||
vSanitizedFileName := sanitizeFilename(vFileName);
|
||||
|
||||
SELECT checksum, created, bytes
|
||||
INTO vChecksum, vCreated, vBytes
|
||||
FROM TABLE(DBMS_CLOUD.LIST_OBJECTS(
|
||||
credential_name => pCredentialName,
|
||||
location_uri => vBucketUri
|
||||
))
|
||||
WHERE object_name = CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || vSanitizedFileName;
|
||||
|
||||
-- Create A_SOURCE_FILE_RECEIVED record for this export with metadata
|
||||
vSourceFileReceivedKey := CT_MRDS.A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL;
|
||||
INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
|
||||
A_SOURCE_FILE_RECEIVED_KEY,
|
||||
A_SOURCE_FILE_CONFIG_KEY,
|
||||
SOURCE_FILE_NAME,
|
||||
CHECKSUM,
|
||||
CREATED,
|
||||
BYTES,
|
||||
RECEPTION_DATE,
|
||||
PROCESSING_STATUS,
|
||||
PARTITION_YEAR,
|
||||
PARTITION_MONTH,
|
||||
ARCH_FILE_NAME
|
||||
) VALUES (
|
||||
vSourceFileReceivedKey,
|
||||
vConfigKey, -- Config key from A_SOURCE_FILE_CONFIG lookup
|
||||
vFileUri,
|
||||
vChecksum,
|
||||
vCreated,
|
||||
vBytes,
|
||||
SYSDATE,
|
||||
'ARCHIVED',
|
||||
vPartitions(i).year,
|
||||
vPartitions(i).month,
|
||||
vFileName
|
||||
);
|
||||
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('Registered file: FileReceivedKey=' || vSourceFileReceivedKey || ', File=' || vFileName || ', Size=' || vBytes || ' bytes', 'DEBUG', vParameters);
|
||||
EXCEPTION
|
||||
WHEN NO_DATA_FOUND THEN
|
||||
-- File not found in bucket - log warning but continue
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('WARNING: File not found in bucket for metadata lookup: ' || vFileName, 'WARNING', vParameters);
|
||||
|
||||
-- Insert without metadata
|
||||
vSourceFileReceivedKey := CT_MRDS.A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL;
|
||||
INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
|
||||
A_SOURCE_FILE_RECEIVED_KEY,
|
||||
A_SOURCE_FILE_CONFIG_KEY,
|
||||
SOURCE_FILE_NAME,
|
||||
RECEPTION_DATE,
|
||||
PROCESSING_STATUS,
|
||||
PARTITION_YEAR,
|
||||
PARTITION_MONTH,
|
||||
ARCH_FILE_NAME
|
||||
) VALUES (
|
||||
vSourceFileReceivedKey,
|
||||
vConfigKey, -- Config key from A_SOURCE_FILE_CONFIG lookup
|
||||
vFileUri,
|
||||
SYSDATE,
|
||||
'ARCHIVED',
|
||||
vPartitions(i).year,
|
||||
vPartitions(i).month,
|
||||
vFileName
|
||||
);
|
||||
END;
|
||||
END LOOP;
|
||||
|
||||
COMMIT;
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('Successfully registered all ' || vPartitions.COUNT || ' files', 'INFO', vParameters);
|
||||
END IF;
|
||||
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('Export completed successfully for ' || vPartitions.COUNT || ' files', 'INFO', vParameters);
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user