Update DATA_EXPORTER package to version 2.8.0, refactor EXPORT_TABLE_DATA to export to a single CSV file, and add pFileName and pMaxFileSize parameters.

This commit is contained in:
Grzegorz Michalski
2026-02-12 09:56:57 +01:00
parent a7d286b1e6
commit 6039d6bce4
2 changed files with 167 additions and 203 deletions

View File

@@ -602,20 +602,16 @@ AS
pKeyColumnName IN VARCHAR2,
pBucketArea IN VARCHAR2,
pFolderName IN VARCHAR2,
pFileName IN VARCHAR2 default NULL,
pTemplateTableName IN VARCHAR2 default NULL,
pMaxFileSize IN NUMBER default 104857600,
pRegisterExport IN BOOLEAN default FALSE,
pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
)
IS
-- Type definition for key values
TYPE key_value_tab IS TABLE OF VARCHAR2(4000);
vKeyValues key_value_tab;
vCount INTEGER;
vSql VARCHAR2(4000);
vKeyValue VARCHAR2(4000);
vQuery VARCHAR2(32767);
vUri VARCHAR2(4000);
vDataType VARCHAR2(30);
vTableName VARCHAR2(128);
vSchemaName VARCHAR2(128);
vKeyColumnName VARCHAR2(128);
@@ -638,7 +634,9 @@ AS
,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||''''
,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||''''
,'pFolderName => '''||nvl(pFolderName, 'NULL')||''''
,'pFileName => '''||nvl(pFileName, 'NULL')||''''
,'pTemplateTableName => '''||nvl(pTemplateTableName, 'NULL')||''''
,'pMaxFileSize => '''||nvl(TO_CHAR(pMaxFileSize), 'NULL')||''''
,'pRegisterExport => '''||CASE WHEN pRegisterExport THEN 'TRUE' ELSE 'FALSE' END||''''
,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
));
@@ -671,16 +669,8 @@ AS
IF vCount = 0 THEN
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
END IF;
-- Get the data type of the key column
SELECT data_type INTO vDataType
FROM all_tab_columns
WHERE table_name = vTableName
AND column_name = vKeyColumnName
AND owner = vSchemaName;
-- Validate template table if provided
IF pTemplateTableName IS NOT NULL THEN
DECLARE
@@ -760,183 +750,158 @@ AS
ENV_MANAGER.LOG_PROCESS_EVENT('File registration enabled with config key: ' || vConfigKey, 'INFO', vParameters);
END IF;
-- Fetch unique key values from A_LOAD_HISTORY
vSql := 'SELECT DISTINCT L.A_ETL_LOAD_SET_KEY' ||
' FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L' ||
' WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY';
-- Construct single query for entire table (no key value partitioning)
vQuery := 'SELECT ' || vProcessedColumnList ||
' FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L' ||
' WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY';
-- Construct the URI for the file in OCI Object Storage
vUri := vBucketUri ||
CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END ||
NVL(pFileName, UPPER(vTableName) || '.csv');
ENV_MANAGER.LOG_PROCESS_EVENT('Exporting to single file: ' || vUri, 'INFO', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('Export query: ' || vQuery, 'DEBUG', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('Max file size: ' || pMaxFileSize || ' bytes (' || ROUND(pMaxFileSize/1048576, 2) || ' MB)', 'DEBUG', vParameters);
-- Use DBMS_CLOUD package to export data to the URI
-- Oracle maxfilesize: min 10MB (10485760), max 1GB (1073741824), default 100MB (104857600)
DBMS_CLOUD.EXPORT_DATA(
credential_name => pCredentialName,
file_uri_list => vUri,
query => vQuery,
format => json_object(
'type' VALUE 'CSV',
'header' VALUE true,
'quote' VALUE CHR(34),
'delimiter' VALUE ',',
'escape' VALUE true,
'recorddelimiter' VALUE CHR(13)||CHR(10), -- CRLF dla Windows
'maxfilesize' VALUE pMaxFileSize -- Dynamic maxfilesize in bytes
)
);
ENV_MANAGER.LOG_PROCESS_EVENT('Executing key values query: ' || vSql, 'DEBUG', vParameters);
EXECUTE IMMEDIATE vSql BULK COLLECT INTO vKeyValues;
ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vKeyValues.COUNT || ' unique key values to process', 'DEBUG', vParameters);
-- Loop over each unique key value
FOR i IN 1 .. vKeyValues.COUNT LOOP
vKeyValue := vKeyValues(i);
-- Construct the query to extract data for the current key value with A_WORKFLOW_HISTORY_KEY mapping
IF vDataType IN ('VARCHAR2', 'CHAR', 'NCHAR', 'NVARCHAR2') THEN
vQuery := 'SELECT ' || vProcessedColumnList ||
' FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L' ||
' WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY' ||
' AND L.A_ETL_LOAD_SET_KEY = ' || CHR(39) || vKeyValue || CHR(39);
ELSIF vDataType IN ('NUMBER', 'FLOAT', 'BINARY_FLOAT', 'BINARY_DOUBLE') THEN
vQuery := 'SELECT ' || vProcessedColumnList ||
' FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L' ||
' WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY' ||
' AND L.A_ETL_LOAD_SET_KEY = ' || vKeyValue;
ELSIF vDataType LIKE 'TIMESTAMP%' OR vDataType = 'DATE' THEN
vQuery := 'SELECT ' || vProcessedColumnList ||
' FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L' ||
' WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY' ||
' AND L.A_ETL_LOAD_SET_KEY = TO_TIMESTAMP(' || CHR(39) || vKeyValue || CHR(39) ||', ''YYYY-MM-DD HH24:MI:SS.FF'')';
ELSE
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNSUPPORTED_DATA_TYPE, ENV_MANAGER.MSG_UNSUPPORTED_DATA_TYPE);
END IF;
-- Construct the URI for the file in OCI Object Storage
vUri := vBucketUri ||
CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END ||
sanitizeFilename(vKeyValue) || '.csv';
ENV_MANAGER.LOG_PROCESS_EVENT('Processing key value: ' || vKeyValue || ' (' || (i) || '/' || vKeyValues.COUNT || ')', 'DEBUG', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('Export query: ' || vQuery, 'DEBUG', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('Export URI: ' || vUri, 'DEBUG', vParameters);
-- Use DBMS_CLOUD package to export data to the URI
DBMS_CLOUD.EXPORT_DATA(
credential_name => pCredentialName,
file_uri_list => vUri,
query => vQuery,
format => json_object('type' VALUE 'CSV', 'header' VALUE true)
);
-- Register exported file to A_SOURCE_FILE_RECEIVED if requested
IF pRegisterExport THEN
DECLARE
vChecksum VARCHAR2(128);
vCreated TIMESTAMP WITH TIME ZONE;
vBytes NUMBER;
vActualFileName VARCHAR2(1000); -- Actual filename with Oracle suffix
vSanitizedFileName VARCHAR2(1000);
vFileName VARCHAR2(1000);
vRetryCount NUMBER := 0;
vMaxRetries NUMBER := 1; -- One retry after initial attempt
vRetryDelay NUMBER := 2; -- 2 seconds delay
BEGIN
-- Extract filename from URI (after last '/')
vFileName := SUBSTR(vUri, INSTR(vUri, '/', -1) + 1);
-- Sanitize filename first (PL/SQL function cannot be used directly in SQL)
vSanitizedFileName := sanitizeFilename(vFileName);
-- Remove .csv extension for LIKE pattern matching (Oracle adds suffixes BEFORE .csv)
-- Example: keyvalue.csv becomes keyvalue_1_20260211T102621591769Z.csv
vSanitizedFileName := REGEXP_REPLACE(vSanitizedFileName, '\.csv$', '', 1, 0, 'i');
-- Try to get file metadata with retry logic
<<metadata_retry_loop>>
LOOP
BEGIN
SELECT object_name, checksum, created, bytes
INTO vActualFileName, vChecksum, vCreated, vBytes
FROM TABLE(DBMS_CLOUD.LIST_OBJECTS(
credential_name => pCredentialName,
location_uri => vBucketUri
))
WHERE object_name LIKE CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || vSanitizedFileName || '%'
ORDER BY created DESC, bytes DESC
FETCH FIRST 1 ROW ONLY;
-- Extract filename only from full path (remove bucket folder prefix)
vActualFileName := SUBSTR(vActualFileName, INSTR(vActualFileName, '/', -1) + 1);
-- Success - exit retry loop
EXIT metadata_retry_loop;
EXCEPTION
WHEN NO_DATA_FOUND THEN
vRetryCount := vRetryCount + 1;
IF vRetryCount <= vMaxRetries THEN
-- Log retry attempt
ENV_MANAGER.LOG_PROCESS_EVENT('File not found in bucket (attempt ' || vRetryCount || '/' || (vMaxRetries + 1) || '), retrying after ' || vRetryDelay || ' seconds: ' || vFileName, 'DEBUG', vParameters);
-- Wait before retry using DBMS_SESSION.SLEEP (alternative to DBMS_LOCK)
DBMS_SESSION.SLEEP(vRetryDelay);
ELSE
-- Max retries exceeded - re-raise exception
RAISE;
END IF;
END;
END LOOP metadata_retry_loop;
-- Create A_SOURCE_FILE_RECEIVED record for this export with metadata
vSourceFileReceivedKey := CT_MRDS.A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL;
INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
A_SOURCE_FILE_RECEIVED_KEY,
A_SOURCE_FILE_CONFIG_KEY,
SOURCE_FILE_NAME,
CHECKSUM,
CREATED,
BYTES,
RECEPTION_DATE,
PROCESSING_STATUS,
PARTITION_YEAR,
PARTITION_MONTH,
ARCH_FILE_NAME
) VALUES (
vSourceFileReceivedKey,
NVL(vConfigKey, -1), -- Use config key if found, otherwise -1
vActualFileName, -- Use actual filename with Oracle suffix
vChecksum,
vCreated,
vBytes,
SYSDATE,
'INGESTED',
NULL, -- PARTITION_YEAR not used for single-file exports
NULL, -- PARTITION_MONTH not used for single-file exports
NULL -- ARCH_FILE_NAME not used for single-file exports
);
ENV_MANAGER.LOG_PROCESS_EVENT('Registered file: FileReceivedKey=' || vSourceFileReceivedKey || ', File=' || vActualFileName || ', Size=' || vBytes || ' bytes', 'DEBUG', vParameters);
EXCEPTION
WHEN NO_DATA_FOUND THEN
-- File not found after retries - log warning and continue without metadata
ENV_MANAGER.LOG_PROCESS_EVENT('WARNING: File not found in bucket after ' || (vMaxRetries + 1) || ' attempts: ' || vFileName, 'WARNING', vParameters);
-- Sanitize filename for fallback INSERT (function cannot be used in SQL)
vSanitizedFileName := sanitizeFilename(vFileName);
-- Insert without metadata using theoretical filename
vSourceFileReceivedKey := CT_MRDS.A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL;
INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
A_SOURCE_FILE_RECEIVED_KEY,
A_SOURCE_FILE_CONFIG_KEY,
SOURCE_FILE_NAME,
RECEPTION_DATE,
PROCESSING_STATUS,
PARTITION_YEAR,
PARTITION_MONTH,
ARCH_FILE_NAME
) VALUES (
vSourceFileReceivedKey,
NVL(vConfigKey, -1), -- Use config key if found, otherwise -1
vSanitizedFileName, -- Use pre-calculated sanitized filename
SYSDATE,
'INGESTED',
NULL, -- PARTITION_YEAR not used for single-file exports
NULL, -- PARTITION_MONTH not used for single-file exports
NULL -- ARCH_FILE_NAME not used for single-file exports
);
ENV_MANAGER.LOG_PROCESS_EVENT('Registered file without metadata: FileReceivedKey=' || vSourceFileReceivedKey || ', File=' || vSanitizedFileName, 'DEBUG', vParameters);
END;
END IF;
END LOOP;
-- Log summary of file registration if enabled
-- Register exported file to A_SOURCE_FILE_RECEIVED if requested
IF pRegisterExport THEN
ENV_MANAGER.LOG_PROCESS_EVENT('Registered ' || vKeyValues.COUNT || ' exported files to A_SOURCE_FILE_RECEIVED with config key: ' || vConfigKey, 'INFO', vParameters);
DECLARE
vChecksum VARCHAR2(128);
vCreated TIMESTAMP WITH TIME ZONE;
vBytes NUMBER;
vActualFileName VARCHAR2(1000); -- Actual filename with Oracle suffix
vSanitizedFileName VARCHAR2(1000);
vFileName VARCHAR2(1000);
vRetryCount NUMBER := 0;
vMaxRetries NUMBER := 1; -- One retry after initial attempt
vRetryDelay NUMBER := 2; -- 2 seconds delay
BEGIN
-- Extract filename from URI (after last '/')
vFileName := SUBSTR(vUri, INSTR(vUri, '/', -1) + 1);
-- Sanitize filename first (PL/SQL function cannot be used directly in SQL)
vSanitizedFileName := sanitizeFilename(vFileName);
-- Remove .csv extension for LIKE pattern matching (Oracle adds suffixes BEFORE .csv)
-- Example: tablename.csv becomes tablename_1_20260211T102621591769Z.csv
vSanitizedFileName := REGEXP_REPLACE(vSanitizedFileName, '\.csv$', '', 1, 0, 'i');
-- Try to get file metadata with retry logic
<<metadata_retry_loop>>
LOOP
BEGIN
SELECT object_name, checksum, created, bytes
INTO vActualFileName, vChecksum, vCreated, vBytes
FROM TABLE(DBMS_CLOUD.LIST_OBJECTS(
credential_name => pCredentialName,
location_uri => vBucketUri
))
WHERE object_name LIKE CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || vSanitizedFileName || '%'
ORDER BY created DESC, bytes DESC
FETCH FIRST 1 ROW ONLY;
-- Extract filename only from full path (remove bucket folder prefix)
vActualFileName := SUBSTR(vActualFileName, INSTR(vActualFileName, '/', -1) + 1);
-- Success - exit retry loop
EXIT metadata_retry_loop;
EXCEPTION
WHEN NO_DATA_FOUND THEN
vRetryCount := vRetryCount + 1;
IF vRetryCount <= vMaxRetries THEN
-- Log retry attempt
ENV_MANAGER.LOG_PROCESS_EVENT('File not found in bucket (attempt ' || vRetryCount || '/' || (vMaxRetries + 1) || '), retrying after ' || vRetryDelay || ' seconds: ' || vFileName, 'DEBUG', vParameters);
-- Wait before retry using DBMS_SESSION.SLEEP (alternative to DBMS_LOCK)
DBMS_SESSION.SLEEP(vRetryDelay);
ELSE
-- Max retries exceeded - re-raise exception
RAISE;
END IF;
END;
END LOOP metadata_retry_loop;
-- Create A_SOURCE_FILE_RECEIVED record for this export with metadata
vSourceFileReceivedKey := CT_MRDS.A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL;
INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
A_SOURCE_FILE_RECEIVED_KEY,
A_SOURCE_FILE_CONFIG_KEY,
SOURCE_FILE_NAME,
CHECKSUM,
CREATED,
BYTES,
RECEPTION_DATE,
PROCESSING_STATUS,
PARTITION_YEAR,
PARTITION_MONTH,
ARCH_FILE_NAME
) VALUES (
vSourceFileReceivedKey,
NVL(vConfigKey, -1), -- Use config key if found, otherwise -1
vActualFileName, -- Use actual filename with Oracle suffix
vChecksum,
vCreated,
vBytes,
SYSDATE,
'INGESTED',
NULL, -- PARTITION_YEAR not used for single-file exports
NULL, -- PARTITION_MONTH not used for single-file exports
NULL -- ARCH_FILE_NAME not used for single-file exports
);
ENV_MANAGER.LOG_PROCESS_EVENT('Registered file: FileReceivedKey=' || vSourceFileReceivedKey || ', File=' || vActualFileName || ', Size=' || vBytes || ' bytes', 'INFO', vParameters);
EXCEPTION
WHEN NO_DATA_FOUND THEN
-- File not found after retries - log warning and continue without metadata
ENV_MANAGER.LOG_PROCESS_EVENT('WARNING: File not found in bucket after ' || (vMaxRetries + 1) || ' attempts: ' || vFileName, 'WARNING', vParameters);
-- Sanitize filename for fallback INSERT (function cannot be used in SQL)
vSanitizedFileName := sanitizeFilename(vFileName);
-- Insert without metadata using theoretical filename
vSourceFileReceivedKey := CT_MRDS.A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL;
INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
A_SOURCE_FILE_RECEIVED_KEY,
A_SOURCE_FILE_CONFIG_KEY,
SOURCE_FILE_NAME,
RECEPTION_DATE,
PROCESSING_STATUS,
PARTITION_YEAR,
PARTITION_MONTH,
ARCH_FILE_NAME
) VALUES (
vSourceFileReceivedKey,
NVL(vConfigKey, -1), -- Use config key if found, otherwise -1
vSanitizedFileName, -- Use pre-calculated sanitized filename
SYSDATE,
'INGESTED',
NULL, -- PARTITION_YEAR not used for single-file exports
NULL, -- PARTITION_MONTH not used for single-file exports
NULL -- ARCH_FILE_NAME not used for single-file exports
);
ENV_MANAGER.LOG_PROCESS_EVENT('Registered file without metadata: FileReceivedKey=' || vSourceFileReceivedKey || ', File=' || vSanitizedFileName, 'INFO', vParameters);
END;
END IF;
ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
@@ -949,10 +914,6 @@ AS
vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in column list' ELSE '' END;
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp);
WHEN ENV_MANAGER.ERR_UNSUPPORTED_DATA_TYPE THEN
vgMsgTmp := ENV_MANAGER.MSG_UNSUPPORTED_DATA_TYPE || ' vDataType: '||vDataType;
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNSUPPORTED_DATA_TYPE, vgMsgTmp);
WHEN OTHERS THEN
-- Log complete error details including full stack trace and backtrace
ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER');