Files
mars/MARS_Packages/REL01_ADDITIONS/MARS-835-PREHOOK/new_version/DATA_EXPORTER.pkb

1783 lines
88 KiB
Plaintext

create or replace PACKAGE BODY CT_MRDS.DATA_EXPORTER
AS
----------------------------------------------------------------------------------------------------
-- PRIVATE HELPER FUNCTIONS (USED BY MULTIPLE PROCEDURES)
----------------------------------------------------------------------------------------------------
/**
* Sanitizes filename by replacing disallowed characters with underscores
**/
FUNCTION sanitizeFilename(pFilename IN VARCHAR2) RETURN VARCHAR2 IS
BEGIN
    -- Keep only letters, digits, dot, underscore and hyphen;
    -- every other character is replaced with an underscore.
    RETURN REGEXP_REPLACE(pFilename, '[^a-zA-Z0-9._-]', '_');
END sanitizeFilename;
----------------------------------------------------------------------------------------------------
/**
* Deletes ALL files matching specific file pattern before retry export
* Critical for preventing data duplication when DBMS_CLOUD.EXPORT_DATA fails mid-process
*
* Problem: Export fails after creating partial file(s), retry creates new _2, _3 suffixed files
* Solution: Delete ALL files matching the base filename pattern before retry
*
* Pattern matching strategy:
* - Parquet: folder/PARTITION_YEAR=2024/PARTITION_MONTH=11/*.parquet (folder-level safe - each chunk has own partition folder)
* - CSV: folder/TABLENAME_202411*.csv (file-level pattern - multiple chunks share same folder!)
*
* CRITICAL for parallel processing:
* - Parquet chunks are isolated by partition folder structure (safe to delete folder/*)
* - CSV chunks share flat folder structure - MUST use file-specific pattern (TABLENAME_YYYYMM*)
* to avoid deleting files from other parallel chunks in same folder
**/
PROCEDURE DELETE_FAILED_EXPORT_FILE(
    pFileUri        IN VARCHAR2,
    pCredentialName IN VARCHAR2,
    pParameters     IN VARCHAR2
) IS
    vBucketUri       VARCHAR2(4000);
    vFolderPath      VARCHAR2(4000);
    vFileName        VARCHAR2(1000);
    vFileNamePattern VARCHAR2(1000);
    vLikePattern     VARCHAR2(4000);
    vSlashPos        NUMBER;
    vDotPos          NUMBER;
    vObjMarkerPos    NUMBER;
    vFilesDeleted    NUMBER := 0;

    -- Escapes LIKE metacharacters so '_' and '%' inside real object names match
    -- literally (used with ESCAPE '\' below).
    -- BUG FIX: without escaping, every '_' in a filename matched ANY character,
    -- so the cleanup could match - and delete - files belonging to other tables
    -- or parallel chunks (e.g. TABLE1_2024% also matched TABLE1X2024...),
    -- defeating the CSV chunk-isolation strategy described above.
    FUNCTION escapeLike(pText IN VARCHAR2) RETURN VARCHAR2 IS
    BEGIN
        RETURN REPLACE(REPLACE(REPLACE(pText, '\', '\\'), '_', '\_'), '%', '\%');
    END escapeLike;
BEGIN
    -- Extract components from the URI
    -- Example Parquet: https://.../bucket/folder/PARTITION_YEAR=2024/PARTITION_MONTH=11/202411.parquet
    -- Example CSV:     https://.../bucket/folder/TABLENAME_202411.csv
    vSlashPos     := INSTR(pFileUri, '/', -1);   -- last slash before the filename
    vObjMarkerPos := INSTR(pFileUri, '/o/');     -- OCI Object Storage bucket marker
    -- BUG FIX: also require the '/o/' marker; previously INSTR = 0 produced a
    -- 2-character "bucket URI" and a nonsense folder path instead of a clean skip
    IF vSlashPos > 0 AND vObjMarkerPos > 0 THEN
        -- Filename is everything after the last slash
        vFileName := SUBSTR(pFileUri, vSlashPos + 1);
        -- Folder path is everything before the last slash
        vFolderPath := SUBSTR(pFileUri, 1, vSlashPos - 1);
        -- Bucket URI ends right after '/o/' in OCI Object Storage URLs
        vBucketUri := SUBSTR(pFileUri, 1, vObjMarkerPos + 2);
        -- Relative folder path (after the bucket)
        vFolderPath := SUBSTR(vFolderPath, LENGTH(vBucketUri) + 1);
        -- Build the filename pattern by dropping the extension:
        -- Oracle adds suffixes BEFORE the extension (file.csv -> file_1_timestamp.csv),
        -- so 'file%' matches file_1_timestamp.csv, file_2_timestamp.csv, ...
        vDotPos := INSTR(vFileName, '.', -1);
        IF vDotPos > 0 THEN
            vFileNamePattern := escapeLike(SUBSTR(vFileName, 1, vDotPos - 1)) || '%';
        ELSE
            vFileNamePattern := escapeLike(vFileName) || '%';
        END IF;
        -- Pre-compute the full LIKE pattern in PL/SQL: local subprograms such as
        -- escapeLike cannot be referenced from inside a SQL statement
        vLikePattern := escapeLike(vFolderPath || '/') || vFileNamePattern;
        ENV_MANAGER.LOG_PROCESS_EVENT('Cleanup before retry - Pattern: ' || vFolderPath || '/' || vFileNamePattern, 'DEBUG', pParameters);
        -- List and delete ALL files matching the pattern.
        -- CRITICAL: the file-specific (now literally-matched) pattern preserves
        -- CSV chunk isolation inside a shared folder.
        FOR rec IN (
            SELECT object_name
            FROM TABLE(DBMS_CLOUD.LIST_OBJECTS(
                credential_name => pCredentialName,
                location_uri    => vBucketUri
            ))
            WHERE object_name LIKE vLikePattern ESCAPE '\'
        ) LOOP
            BEGIN
                DBMS_CLOUD.DELETE_OBJECT(
                    credential_name => pCredentialName,
                    object_uri      => vBucketUri || rec.object_name
                );
                vFilesDeleted := vFilesDeleted + 1;
                ENV_MANAGER.LOG_PROCESS_EVENT('Deleted partial file ' || vFilesDeleted || ': ' || rec.object_name, 'DEBUG', pParameters);
            EXCEPTION
                WHEN OTHERS THEN
                    -- Log but continue - don't fail the entire cleanup
                    ENV_MANAGER.LOG_PROCESS_EVENT('Warning: Could not delete ' || rec.object_name || ': ' || SQLERRM, 'WARNING', pParameters);
            END;
        END LOOP;
        IF vFilesDeleted > 0 THEN
            ENV_MANAGER.LOG_PROCESS_EVENT('Cleanup completed: Deleted ' || vFilesDeleted || ' partial file(s) from previous failed export', 'INFO', pParameters);
        ELSE
            ENV_MANAGER.LOG_PROCESS_EVENT('No existing files to clean up (pattern match: ' || vFileNamePattern || ')', 'DEBUG', pParameters);
        END IF;
    ELSE
        -- A URI without a slash or without the '/o/' marker cannot be parsed safely
        ENV_MANAGER.LOG_PROCESS_EVENT('Warning: Cannot parse file URI for cleanup: ' || pFileUri, 'WARNING', pParameters);
    END IF;
EXCEPTION
    WHEN OTHERS THEN
        -- Don't fail the export if cleanup fails - log and continue
        ENV_MANAGER.LOG_PROCESS_EVENT('Warning: Cleanup failed (will retry export anyway): ' || SQLERRM, 'WARNING', pParameters);
END DELETE_FAILED_EXPORT_FILE;
----------------------------------------------------------------------------------------------------
/**
* Builds query with TO_CHAR for date/timestamp columns using per-column formats
* Retrieves format for each date column from FILE_MANAGER.GET_DATE_FORMAT
**/
FUNCTION buildQueryWithDateFormats(
    pColumnList        IN VARCHAR2,
    pTableName         IN VARCHAR2,
    pSchemaName        IN VARCHAR2,
    pKeyColumnName     IN VARCHAR2,
    pTemplateTableName IN VARCHAR2
) RETURN VARCHAR2 IS
    vResult         VARCHAR2(32767);
    vColumns        VARCHAR2(32767);
    vPos            PLS_INTEGER;
    vNextPos        PLS_INTEGER;
    vCurrentCol     VARCHAR2(128);
    vAllCols        VARCHAR2(32767);
    -- Widened from VARCHAR2(30): dictionary types such as
    -- 'TIMESTAMP(6) WITH LOCAL TIME ZONE' exceed 30 characters
    vDataType       VARCHAR2(128);
    vDateFormat     VARCHAR2(200);
    vTemplateSchema VARCHAR2(128);
    vTemplateTable  VARCHAR2(128);
    vColExists      NUMBER;
BEGIN
    -- Parse the template table name ONCE, up front.
    -- BUG FIX: previously this parsing happened only when pColumnList was NULL,
    -- leaving vTemplateSchema/vTemplateTable NULL for the data-type lookup below
    -- whenever an explicit column list was combined with a template table
    -- (NO_DATA_FOUND at runtime).
    -- UPPER() matches the normalization done by EXPORT_TABLE_DATA's validation.
    IF pTemplateTableName IS NOT NULL THEN
        IF INSTR(pTemplateTableName, '.') > 0 THEN
            vTemplateSchema := UPPER(SUBSTR(pTemplateTableName, 1, INSTR(pTemplateTableName, '.') - 1));
            vTemplateTable  := UPPER(SUBSTR(pTemplateTableName, INSTR(pTemplateTableName, '.') + 1));
        ELSE
            vTemplateSchema := UPPER(pSchemaName);
            vTemplateTable  := UPPER(pTemplateTableName);
        END IF;
    END IF;
    -- Build the column list when the caller did not provide one
    IF pColumnList IS NULL THEN
        IF pTemplateTableName IS NOT NULL THEN
            -- Template defines the target CSV structure
            -- (column order and which columns to include)
            SELECT LISTAGG(column_name, ', ') WITHIN GROUP (ORDER BY column_id)
              INTO vAllCols
              FROM all_tab_columns
             WHERE table_name = vTemplateTable
               AND owner = vTemplateSchema;
        ELSE
            -- No template: take all columns from the source table
            SELECT LISTAGG(column_name, ', ') WITHIN GROUP (ORDER BY column_id)
              INTO vAllCols
              FROM all_tab_columns
             WHERE table_name = pTableName
               AND owner = pSchemaName;
        END IF;
    ELSE
        vAllCols := pColumnList;
    END IF;
    -- Process each comma-separated column
    vColumns := UPPER(REPLACE(vAllCols, ' ', ''));
    vPos := 1;
    vResult := '';
    WHILE vPos <= LENGTH(vColumns) LOOP
        vNextPos := INSTR(vColumns, ',', vPos);
        IF vNextPos = 0 THEN
            vNextPos := LENGTH(vColumns) + 1;
        END IF;
        vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos);
        -- When a template is used, skip template columns missing from the SOURCE
        -- table (the template defines structure, the source provides data).
        -- Exception: A_WORKFLOW_HISTORY_KEY is virtual (mapped from pKeyColumnName).
        IF pTemplateTableName IS NOT NULL THEN
            SELECT COUNT(*) INTO vColExists
              FROM all_tab_columns
             WHERE table_name = pTableName
               AND column_name = vCurrentCol
               AND owner = pSchemaName;
            IF vColExists = 0 AND UPPER(vCurrentCol) != 'A_WORKFLOW_HISTORY_KEY' THEN
                vPos := vNextPos + 1;
                CONTINUE;
            END IF;
        END IF;
        -- Resolve the column data type from the template table when present,
        -- otherwise from the source table
        IF pTemplateTableName IS NOT NULL THEN
            SELECT data_type INTO vDataType
              FROM all_tab_columns
             WHERE table_name = vTemplateTable
               AND column_name = vCurrentCol
               AND owner = vTemplateSchema;
        ELSE
            SELECT data_type INTO vDataType
              FROM all_tab_columns
             WHERE table_name = pTableName
               AND column_name = vCurrentCol
               AND owner = pSchemaName;
        END IF;
        IF UPPER(vCurrentCol) = 'A_WORKFLOW_HISTORY_KEY' THEN
            -- Key column alias: template exposes A_WORKFLOW_HISTORY_KEY,
            -- the source table holds it under pKeyColumnName
            vResult := vResult || CASE WHEN vResult IS NOT NULL THEN ', ' ELSE '' END ||
                       'T.' || pKeyColumnName || ' AS A_WORKFLOW_HISTORY_KEY';
        ELSIF vDataType = 'DATE' OR vDataType LIKE 'TIMESTAMP%' THEN
            -- BUG FIX: all_tab_columns reports timestamps with their fractional
            -- precision (e.g. 'TIMESTAMP(6)', 'TIMESTAMP(6) WITH TIME ZONE'), so
            -- the previous exact IN-list never matched real timestamp columns.
            -- DATE/TIMESTAMP columns get an explicit per-column format.
            IF pTemplateTableName IS NOT NULL THEN
                vDateFormat := CT_MRDS.FILE_MANAGER.GET_DATE_FORMAT(
                    pTemplateTableName => pTemplateTableName,
                    pColumnName        => vCurrentCol
                );
            ELSE
                vDateFormat := ENV_MANAGER.gvDefaultDateFormat;
            END IF;
            vResult := vResult || CASE WHEN vResult IS NOT NULL THEN ', ' ELSE '' END ||
                       'TO_CHAR(T.' || vCurrentCol || ', ''' || vDateFormat || ''') AS ' || vCurrentCol;
        ELSE
            -- All other columns pass through unchanged with the T. prefix
            vResult := vResult || CASE WHEN vResult IS NOT NULL THEN ', ' ELSE '' END ||
                       'T.' || vCurrentCol;
        END IF;
        vPos := vNextPos + 1;
    END LOOP;
    RETURN vResult;
END buildQueryWithDateFormats;
----------------------------------------------------------------------------------------------------
-- Internal shared function to process column list with T. prefix and key column mapping
FUNCTION processColumnList(pColumnList IN VARCHAR2, pTableName IN VARCHAR2, pSchemaName IN VARCHAR2, pKeyColumnName IN VARCHAR2) RETURN VARCHAR2 IS
    -- Builds a SELECT list with the T. alias prefix on every column, exporting
    -- the key column (pKeyColumnName) under the alias A_WORKFLOW_HISTORY_KEY.
    -- When pColumnList is NULL, all columns of pSchemaName.pTableName are taken
    -- from the data dictionary in COLUMN_ID order.
    vResult     VARCHAR2(32767);
    vColumns    VARCHAR2(32767);
    vPos        PLS_INTEGER;
    vNextPos    PLS_INTEGER;
    vCurrentCol VARCHAR2(128);
    vAllCols    VARCHAR2(32767);
BEGIN
    IF pColumnList IS NULL THEN
        -- Build the list of all columns from the data dictionary.
        -- BUG FIX: the previous implementation aliased the key column with a raw
        -- string REPLACE on the joined list, which corrupted any other column
        -- whose name merely STARTED with the key column name (e.g. key A_KEY
        -- also rewrote A_KEY_FK). Feeding the dictionary list through the
        -- exact-token loop below aliases only the key column itself.
        SELECT LISTAGG(column_name, ', ') WITHIN GROUP (ORDER BY column_id)
          INTO vAllCols
          FROM all_tab_columns
         WHERE table_name = pTableName
           AND owner = pSchemaName;
    ELSE
        vAllCols := pColumnList;
    END IF;
    -- Remove spaces and normalize to uppercase
    vColumns := UPPER(REPLACE(vAllCols, ' ', ''));
    vPos := 1;
    vResult := '';
    -- Parse the comma-separated column list and add the T. prefix
    WHILE vPos <= LENGTH(vColumns) LOOP
        vNextPos := INSTR(vColumns, ',', vPos);
        IF vNextPos = 0 THEN
            vNextPos := LENGTH(vColumns) + 1;
        END IF;
        vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos);
        IF UPPER(vCurrentCol) = UPPER(pKeyColumnName) THEN
            -- The key column (e.g. A_ETL_LOAD_SET_KEY_FK) is exported under the
            -- standard alias A_WORKFLOW_HISTORY_KEY
            vCurrentCol := 'T.' || pKeyColumnName || ' AS A_WORKFLOW_HISTORY_KEY';
        ELSIF INSTR(vCurrentCol, '.') = 0 THEN
            -- Add the T. prefix only when no alias is already present
            vCurrentCol := 'T.' || vCurrentCol;
        END IF;
        -- Append to the result with a comma separator
        IF vResult IS NOT NULL THEN
            vResult := vResult || ', ';
        END IF;
        vResult := vResult || vCurrentCol;
        vPos := vNextPos + 1;
    END LOOP;
    RETURN vResult;
END processColumnList;
----------------------------------------------------------------------------------------------------
/**
* Validates table existence, key column existence, and column list
**/
PROCEDURE VALIDATE_TABLE_AND_COLUMNS (
    pSchemaName    IN VARCHAR2,
    pTableName     IN VARCHAR2,
    pKeyColumnName IN VARCHAR2,
    pColumnList    IN VARCHAR2,
    pParameters    IN VARCHAR2
) IS
    -- Raises CODE_TABLE_NOT_EXISTS / CODE_COLUMN_NOT_EXISTS when the table, the
    -- key column, or any column named in pColumnList is absent from the data
    -- dictionary. pParameters is accepted for interface consistency; it is not
    -- used by this procedure.
    vMatches INTEGER;
    vList    VARCHAR2(32767);
    vStart   PLS_INTEGER;
    vComma   PLS_INTEGER;
    vToken   VARCHAR2(128);
BEGIN
    -- The target table must exist
    SELECT COUNT(*)
      INTO vMatches
      FROM all_tables
     WHERE table_name = pTableName
       AND owner = pSchemaName;
    IF vMatches = 0 THEN
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, ENV_MANAGER.MSG_TABLE_NOT_EXISTS);
    END IF;
    -- The key column must exist on the table
    SELECT COUNT(*)
      INTO vMatches
      FROM all_tab_columns
     WHERE table_name = pTableName
       AND column_name = pKeyColumnName
       AND owner = pSchemaName;
    IF vMatches = 0 THEN
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
    END IF;
    -- Every column named in pColumnList must exist on the table
    IF pColumnList IS NOT NULL THEN
        vList := UPPER(REPLACE(pColumnList, ' ', ''));
        vStart := 1;
        LOOP
            EXIT WHEN vStart > LENGTH(vList);
            vComma := INSTR(vList, ',', vStart);
            IF vComma = 0 THEN
                vComma := LENGTH(vList) + 1;
            END IF;
            vToken := SUBSTR(vList, vStart, vComma - vStart);
            -- Strip a table alias prefix such as "T." when present
            IF INSTR(vToken, '.') > 0 THEN
                vToken := SUBSTR(vToken, INSTR(vToken, '.') + 1);
            END IF;
            SELECT COUNT(*)
              INTO vMatches
              FROM all_tab_columns
             WHERE table_name = pTableName
               AND column_name = vToken
               AND owner = pSchemaName;
            IF vMatches = 0 THEN
                RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
            END IF;
            vStart := vComma + 1;
        END LOOP;
    END IF;
END VALIDATE_TABLE_AND_COLUMNS;
----------------------------------------------------------------------------------------------------
/**
* Retrieves list of year/month partitions based on date range
**/
FUNCTION GET_PARTITIONS (
pSchemaName IN VARCHAR2,
pTableName IN VARCHAR2,
pKeyColumnName IN VARCHAR2,
pMinDate IN DATE,
pMaxDate IN DATE,
pParameters IN VARCHAR2
) RETURN partition_tab IS
-- Returns the DISTINCT (year, month) combinations of A_LOAD_HISTORY.LOAD_START
-- for rows of pSchemaName.pTableName whose key column joins
-- CT_ODS.A_LOAD_HISTORY.A_ETL_LOAD_SET_KEY, restricted to the half-open range
-- [pMinDate, pMaxDate). pParameters is used only as logging context.
vSql VARCHAR2(32000);
vPartitions partition_tab;
vKeyValuesYear DBMS_SQL.VARCHAR2_TABLE;
vKeyValuesMonth DBMS_SQL.VARCHAR2_TABLE;
vFullTableName VARCHAR2(200);
BEGIN
-- Build fully qualified table name if not already qualified
IF INSTR(pTableName, '.') > 0 THEN
vFullTableName := pTableName; -- Already fully qualified
ELSE
vFullTableName := pSchemaName || '.' || pTableName;
END IF;
-- NOTE(review): vFullTableName and pKeyColumnName are concatenated into dynamic
-- SQL without DBMS_ASSERT, unlike EXPORT_TABLE_DATA which sanitizes its names.
-- Callers must pass trusted, pre-validated identifiers; consider adding
-- DBMS_ASSERT here as well. The date range itself is bound, so it is safe.
vSql := 'SELECT DISTINCT TO_CHAR(L.LOAD_START,''YYYY'') AS YR, TO_CHAR(L.LOAD_START,''MM'') AS MN
FROM ' || vFullTableName || ' T, CT_ODS.A_LOAD_HISTORY L
WHERE T.' || pKeyColumnName || ' = L.A_ETL_LOAD_SET_KEY
AND L.LOAD_START >= :pMinDate
AND L.LOAD_START < :pMaxDate
ORDER BY YR, MN';
ENV_MANAGER.LOG_PROCESS_EVENT('Executing date range query: ' || vSql, 'DEBUG', pParameters);
-- Bulk-fetch both projection columns in a single round trip
EXECUTE IMMEDIATE vSql BULK COLLECT INTO vKeyValuesYear, vKeyValuesMonth USING pMinDate, pMaxDate;
ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vKeyValuesYear.COUNT || ' year/month combinations to export', 'DEBUG', pParameters);
-- Convert the two parallel collections into the partition_tab result type
vPartitions := partition_tab();
vPartitions.EXTEND(vKeyValuesYear.COUNT);
FOR i IN 1 .. vKeyValuesYear.COUNT LOOP
vPartitions(i).year := vKeyValuesYear(i);
vPartitions(i).month := vKeyValuesMonth(i);
END LOOP;
RETURN vPartitions;
END GET_PARTITIONS;
----------------------------------------------------------------------------------------------------
/**
* Exports single partition (year/month) to specified format (PARQUET or CSV)
* This is the core worker procedure that will be used for parallel processing in v2.3.0
**/
PROCEDURE EXPORT_SINGLE_PARTITION (
pSchemaName IN VARCHAR2,
pTableName IN VARCHAR2,
pKeyColumnName IN VARCHAR2,
pYear IN VARCHAR2,
pMonth IN VARCHAR2,
pBucketUri IN VARCHAR2,
pFolderName IN VARCHAR2,
pProcessedColumns IN VARCHAR2,
pMinDate IN DATE,
pMaxDate IN DATE,
pCredentialName IN VARCHAR2,
pFormat IN VARCHAR2 DEFAULT 'PARQUET',
pFileBaseName IN VARCHAR2 DEFAULT NULL,
pMaxFileSize IN NUMBER DEFAULT 104857600,
pParameters IN VARCHAR2
) IS
-- Exports one (pYear, pMonth) slice of pSchemaName.pTableName to OCI Object
-- Storage via DBMS_CLOUD.EXPORT_DATA, as PARQUET (Hive-style partition folders)
-- or CSV (flat folder with year/month in the filename). pProcessedColumns is
-- the pre-built SELECT list; pMinDate/pMaxDate further bound LOAD_START as a
-- half-open range [pMinDate, pMaxDate). Raises ORA-20001 for any other pFormat.
-- NOTE(review): pYear, pMonth, pKeyColumnName and pProcessedColumns are
-- concatenated into dynamic SQL - callers must supply trusted, validated values.
vQuery VARCHAR2(32767);
vUri VARCHAR2(4000);
vFileName VARCHAR2(1000);
vFullTableName VARCHAR2(200);
BEGIN
-- Build fully qualified table name if not already qualified
IF INSTR(pTableName, '.') > 0 THEN
vFullTableName := pTableName; -- Already fully qualified
ELSE
vFullTableName := pSchemaName || '.' || pTableName;
END IF;
-- Construct the query to extract data for the current year/month.
-- CHR(39) is a single quote; dates are embedded as literals (not binds) because
-- DBMS_CLOUD.EXPORT_DATA takes a self-contained query string.
vQuery := 'SELECT ' || pProcessedColumns || '
FROM ' || vFullTableName || ' T, CT_ODS.A_LOAD_HISTORY L
WHERE T.' || pKeyColumnName || ' = L.A_ETL_LOAD_SET_KEY
AND TO_CHAR(L.LOAD_START,''YYYY'') = ' || CHR(39) || pYear || CHR(39) || '
AND TO_CHAR(L.LOAD_START,''MM'') = ' || CHR(39) || pMonth || CHR(39) || '
AND L.LOAD_START >= TO_DATE(' || CHR(39) || TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'')
AND L.LOAD_START < TO_DATE(' || CHR(39) || TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'')';
ENV_MANAGER.LOG_PROCESS_EVENT('Processing Year/Month: ' || pYear || '/' || pMonth || ' (Format: '||pFormat||')', 'DEBUG', pParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('Export query: ' || vQuery, 'DEBUG', pParameters);
-- Construct the target URI based on the requested format
IF pFormat = 'PARQUET' THEN
-- Parquet: Use Hive-style partitioning (PARTITION_YEAR=.../PARTITION_MONTH=...)
-- Note: maxfilesize is NOT supported for Parquet format (Oracle limitation)
vUri := pBucketUri ||
CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END ||
'PARTITION_YEAR=' || sanitizeFilename(pYear) || '/' ||
'PARTITION_MONTH=' || sanitizeFilename(pMonth) || '/' ||
sanitizeFilename(pYear) || sanitizeFilename(pMonth) || '.parquet';
ENV_MANAGER.LOG_PROCESS_EVENT('Parquet export URI: ' || vUri, 'DEBUG', pParameters);
-- Delete potentially corrupted file from previous failed attempt
-- This prevents Oracle from creating _1 suffixed files on retry
DELETE_FAILED_EXPORT_FILE(vUri, pCredentialName, pParameters);
DBMS_CLOUD.EXPORT_DATA(
credential_name => pCredentialName,
file_uri_list => vUri,
query => vQuery,
format => json_object('type' VALUE 'parquet')
);
ELSIF pFormat = 'CSV' THEN
-- CSV: Flat file structure with year/month in filename
vFileName := NVL(pFileBaseName, UPPER(pTableName)) || '_' || pYear || pMonth || '.csv';
vUri := pBucketUri ||
CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END ||
sanitizeFilename(vFileName);
ENV_MANAGER.LOG_PROCESS_EVENT('CSV export URI: ' || vUri, 'DEBUG', pParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('CSV maxfilesize: ' || pMaxFileSize || ' bytes (' || ROUND(pMaxFileSize/1048576, 2) || ' MB)', 'DEBUG', pParameters);
-- Delete potentially corrupted file from previous failed attempt
-- This prevents Oracle from creating _1 suffixed files on retry
DELETE_FAILED_EXPORT_FILE(vUri, pCredentialName, pParameters);
-- Use json_object() for CSV export with maxfilesize in bytes (Oracle requirement)
-- Oracle maxfilesize: min 10MB (10485760), max 1GB (1073741824), default 10MB
-- NOTE: maxfilesize must be NUMBER (bytes), not string like '1000M'
-- Using 100MB (104857600) to avoid PGA memory issues with large files
DBMS_CLOUD.EXPORT_DATA(
credential_name => pCredentialName,
file_uri_list => vUri,
query => vQuery,
format => json_object(
'type' VALUE 'CSV',
'header' VALUE true,
'quote' VALUE CHR(34),
'delimiter' VALUE ',',
'escape' VALUE true,
'recorddelimiter' VALUE CHR(13)||CHR(10), -- CRLF line endings for Windows consumers
'maxfilesize' VALUE pMaxFileSize -- Dynamic maxfilesize in bytes (e.g., 104857600 = 100MB)
)
);
ELSE
RAISE_APPLICATION_ERROR(-20001, 'Unsupported format: ' || pFormat || '. Use PARQUET or CSV.');
END IF;
ENV_MANAGER.LOG_PROCESS_EVENT('Export completed successfully for ' || pYear || '/' || pMonth, 'DEBUG', pParameters);
END EXPORT_SINGLE_PARTITION;
----------------------------------------------------------------------------------------------------
/**
* Callback procedure for DBMS_PARALLEL_EXECUTE
* Processes single partition (year/month) chunk in parallel task
* Called by DBMS_PARALLEL_EXECUTE framework for each chunk
**/
PROCEDURE EXPORT_PARTITION_PARALLEL (
    pStartId  IN NUMBER,
    pEndId    IN NUMBER,
    pTaskName IN VARCHAR2 DEFAULT NULL
) IS
    -- pEndId is required by the DBMS_PARALLEL_EXECUTE callback signature;
    -- chunks here are single rows, so only pStartId (= CHUNK_ID) is used.
    vYear             VARCHAR2(4);
    vMonth            VARCHAR2(2);
    vSchemaName       VARCHAR2(128);
    vTableName        VARCHAR2(128);
    vKeyColumnName    VARCHAR2(128);
    vBucketUri        VARCHAR2(4000);
    vFolderName       VARCHAR2(1000);
    vProcessedColumns VARCHAR2(32767);
    vMinDate          DATE;
    vMaxDate          DATE;
    vCredentialName   VARCHAR2(200);
    vFormat           VARCHAR2(20);
    vFileBaseName     VARCHAR2(1000);
    vMaxFileSize      NUMBER;
    vJobClass         VARCHAR2(128);
    vTaskName         VARCHAR2(128);
    vParameters       VARCHAR2(4000);
BEGIN
    -- Seed the logging context BEFORE the chunk lookup so the exception handler
    -- always has usable context.
    -- BUG FIX: previously vParameters stayed NULL if the lookup itself failed,
    -- so the error event was logged without any context.
    vParameters := 'Parallel task - ChunkID: ' || pStartId || ', TaskName: ' || pTaskName;
    -- Retrieve chunk context from the A_PARALLEL_EXPORT_CHUNKS table.
    -- CRITICAL: filter by CHUNK_ID and TASK_NAME for precise session isolation;
    -- pTaskName passed from RUN_TASK ensures deterministic single-row retrieval.
    SELECT
        YEAR_VALUE,
        MONTH_VALUE,
        SCHEMA_NAME,
        TABLE_NAME,
        KEY_COLUMN_NAME,
        BUCKET_URI,
        FOLDER_NAME,
        PROCESSED_COLUMNS,
        MIN_DATE,
        MAX_DATE,
        CREDENTIAL_NAME,
        FORMAT_TYPE,
        FILE_BASE_NAME,
        MAX_FILE_SIZE,
        JOB_CLASS,
        TASK_NAME
    INTO
        vYear,
        vMonth,
        vSchemaName,
        vTableName,
        vKeyColumnName,
        vBucketUri,
        vFolderName,
        vProcessedColumns,
        vMinDate,
        vMaxDate,
        vCredentialName,
        vFormat,
        vFileBaseName,
        vMaxFileSize,
        vJobClass,
        vTaskName
    FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS
    WHERE CHUNK_ID = pStartId
      AND TASK_NAME = pTaskName;
    vParameters := 'Parallel task - Year: ' || vYear || ', Month: ' || vMonth || ', ChunkID: ' || pStartId || ', TaskName: ' || vTaskName;
    ENV_MANAGER.LOG_PROCESS_EVENT('Starting parallel export for partition ' || vYear || '/' || vMonth, 'DEBUG', vParameters);
    -- Mark chunk as PROCESSING (CHUNK_ID + TASK_NAME for session isolation)
    UPDATE CT_MRDS.A_PARALLEL_EXPORT_CHUNKS
       SET STATUS = 'PROCESSING',
           ERROR_MESSAGE = NULL
     WHERE CHUNK_ID = pStartId
       AND TASK_NAME = vTaskName;
    COMMIT;
    -- Delegate the actual export to the single-partition worker
    EXPORT_SINGLE_PARTITION(
        pSchemaName       => vSchemaName,
        pTableName        => vTableName,
        pKeyColumnName    => vKeyColumnName,
        pYear             => vYear,
        pMonth            => vMonth,
        pBucketUri        => vBucketUri,
        pFolderName       => vFolderName,
        pProcessedColumns => vProcessedColumns,
        pMinDate          => vMinDate,
        pMaxDate          => vMaxDate,
        pCredentialName   => vCredentialName,
        pFormat           => vFormat,
        pFileBaseName     => vFileBaseName,
        pMaxFileSize      => vMaxFileSize,
        pParameters       => vParameters
    );
    -- Mark chunk as COMPLETED (CHUNK_ID + TASK_NAME for session isolation)
    UPDATE CT_MRDS.A_PARALLEL_EXPORT_CHUNKS
       SET STATUS = 'COMPLETED',
           EXPORT_TIMESTAMP = SYSTIMESTAMP,
           ERROR_MESSAGE = NULL
     WHERE CHUNK_ID = pStartId
       AND TASK_NAME = vTaskName;
    COMMIT;
    ENV_MANAGER.LOG_PROCESS_EVENT('Completed parallel export for partition ' || vYear || '/' || vMonth, 'DEBUG', vParameters);
EXCEPTION
    WHEN OTHERS THEN
        -- Capture error details in a variable (SQLERRM cannot be used directly in SQL)
        vgMsgTmp := 'Parallel task error for partition ' || vYear || '/' || vMonth || ' (ChunkID: ' || pStartId || ', TaskName: ' || NVL(vTaskName, pTaskName) || '): ' || SQLERRM || cgBL || DBMS_UTILITY.FORMAT_ERROR_BACKTRACE;
        ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
        -- Mark chunk as FAILED with the error message.
        -- BUG FIX: fall back to pTaskName - vTaskName is still NULL when the chunk
        -- lookup itself failed, and 'TASK_NAME = NULL' matches no rows, which left
        -- the chunk stuck without a FAILED status.
        UPDATE CT_MRDS.A_PARALLEL_EXPORT_CHUNKS
           SET STATUS = 'FAILED',
               ERROR_MESSAGE = SUBSTR(vgMsgTmp, 1, 4000)
         WHERE CHUNK_ID = pStartId
           AND TASK_NAME = NVL(vTaskName, pTaskName);
        COMMIT;
        RAISE;
END EXPORT_PARTITION_PARALLEL;
----------------------------------------------------------------------------------------------------
-- MAIN EXPORT PROCEDURES
----------------------------------------------------------------------------------------------------
PROCEDURE EXPORT_TABLE_DATA (
pSchemaName IN VARCHAR2,
pTableName IN VARCHAR2,
pKeyColumnName IN VARCHAR2,
pBucketArea IN VARCHAR2,
pFolderName IN VARCHAR2,
pFileName IN VARCHAR2 default NULL,
pTemplateTableName IN VARCHAR2 default NULL,
pMaxFileSize IN NUMBER default 104857600,
pRegisterExport IN BOOLEAN default FALSE,
pProcessName IN VARCHAR2 default 'DATA_EXPORTER',
pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
)
IS
vCount INTEGER;
vQuery VARCHAR2(32767);
vUri VARCHAR2(4000);
vTableName VARCHAR2(128);
vSchemaName VARCHAR2(128);
vKeyColumnName VARCHAR2(128);
vParameters VARCHAR2(4000);
vBucketUri VARCHAR2(4000);
vProcessedColumnList VARCHAR2(32767);
vCurrentCol VARCHAR2(128);
-- Variables for file registration (when pRegisterExport=TRUE)
vConfigKey NUMBER;
vSourceKey VARCHAR2(100);
vTableId VARCHAR2(100);
vSlashPos1 NUMBER;
vSlashPos2 NUMBER;
vSourceFileReceivedKey NUMBER;
BEGIN
vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||''''
,'pTableName => '''||nvl(pTableName, 'NULL')||''''
,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||''''
,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||''''
,'pFolderName => '''||nvl(pFolderName, 'NULL')||''''
,'pFileName => '''||nvl(pFileName, 'NULL')||''''
,'pTemplateTableName => '''||nvl(pTemplateTableName, 'NULL')||''''
,'pMaxFileSize => '''||nvl(TO_CHAR(pMaxFileSize), 'NULL')||''''
,'pRegisterExport => '''||CASE WHEN pRegisterExport THEN 'TRUE' ELSE 'FALSE' END||''''
,'pProcessName => '''||nvl(pProcessName, 'NULL')||''''
,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
));
ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
-- Get bucket URI based on bucket area using FILE_MANAGER function
vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea);
-- Convert table and column names to uppercase to match data dictionary
vTableName := UPPER(pTableName);
vSchemaName := UPPER(pSchemaName);
vKeyColumnName := UPPER(pKeyColumnName);
-- Check if table exists
SELECT COUNT(*) INTO vCount
FROM all_tables
WHERE table_name = vTableName
AND owner = vSchemaName;
IF vCount = 0 THEN
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, ENV_MANAGER.MSG_TABLE_NOT_EXISTS);
END IF;
-- Check if key column exists
SELECT COUNT(*) INTO vCount
FROM all_tab_columns
WHERE table_name = vTableName
AND column_name = vKeyColumnName
AND owner = vSchemaName;
IF vCount = 0 THEN
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
END IF;
-- Validate template table if provided
IF pTemplateTableName IS NOT NULL THEN
DECLARE
vTemplateSchema VARCHAR2(128);
vTemplateTable VARCHAR2(128);
vTemplateCount NUMBER;
BEGIN
-- Parse template table name (SCHEMA.TABLE or just TABLE)
IF INSTR(pTemplateTableName, '.') > 0 THEN
vTemplateSchema := UPPER(SUBSTR(pTemplateTableName, 1, INSTR(pTemplateTableName, '.') - 1));
vTemplateTable := UPPER(SUBSTR(pTemplateTableName, INSTR(pTemplateTableName, '.') + 1));
ELSE
vTemplateSchema := vSchemaName;
vTemplateTable := UPPER(pTemplateTableName);
END IF;
-- Check if template table exists
SELECT COUNT(*) INTO vTemplateCount
FROM all_tables
WHERE table_name = vTemplateTable
AND owner = vTemplateSchema;
IF vTemplateCount = 0 THEN
vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS || ': Template table ' || vTemplateSchema || '.' || vTemplateTable;
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp);
END IF;
ENV_MANAGER.LOG_PROCESS_EVENT('Template table validated: ' || vTemplateSchema || '.' || vTemplateTable, 'DEBUG', vParameters);
END;
END IF;
-- Build query with TO_CHAR for date columns (per-column format support)
vProcessedColumnList := buildQueryWithDateFormats(NULL, vTableName, vSchemaName, vKeyColumnName, pTemplateTableName);
ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list with TO_CHAR for date columns: ' || vProcessedColumnList, 'DEBUG', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('Template table: ' || NVL(pTemplateTableName, 'NULL - using global default for all dates'), 'INFO', vParameters);
vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName);
-- Lookup A_SOURCE_FILE_CONFIG_KEY based on pFolderName parsing if pRegisterExport is enabled
IF pRegisterExport THEN
-- Format: {BUCKET_AREA}/{SOURCE_KEY}/{TABLE_ID}
-- Example: 'ODS/CSDB/CSDB_DEBT_DAILY' -> SOURCE_KEY='CSDB', TABLE_ID='CSDB_DEBT_DAILY'
-- Parse pFolderName to extract SOURCE_KEY and TABLE_ID
vSlashPos1 := INSTR(pFolderName, '/', 1, 1); -- First '/' position
vSlashPos2 := INSTR(pFolderName, '/', 1, 2); -- Second '/' position
IF vSlashPos1 > 0 AND vSlashPos2 > 0 THEN
-- Extract segment 2 (SOURCE_KEY) and segment 3 (TABLE_ID)
vSourceKey := SUBSTR(pFolderName, vSlashPos1 + 1, vSlashPos2 - vSlashPos1 - 1);
vTableId := SUBSTR(pFolderName, vSlashPos2 + 1);
-- Find configuration based on SOURCE_KEY and TABLE_ID
BEGIN
SELECT A_SOURCE_FILE_CONFIG_KEY
INTO vConfigKey
FROM CT_MRDS.A_SOURCE_FILE_CONFIG
WHERE A_SOURCE_KEY = vSourceKey
AND TABLE_ID = vTableId
AND SOURCE_FILE_TYPE = 'INPUT'
AND ROWNUM = 1;
ENV_MANAGER.LOG_PROCESS_EVENT('Found config key: ' || vConfigKey || ' for SOURCE=' || vSourceKey || ', TABLE=' || vTableId, 'DEBUG', vParameters);
EXCEPTION
WHEN NO_DATA_FOUND THEN
vConfigKey := -1;
ENV_MANAGER.LOG_PROCESS_EVENT('No config found for SOURCE=' || vSourceKey || ', TABLE=' || vTableId || ' - using default (-1)', 'INFO', vParameters);
END;
ELSE
-- Cannot parse folder name - use default
vConfigKey := -1;
ENV_MANAGER.LOG_PROCESS_EVENT('Cannot parse pFolderName: ' || pFolderName || ' - using default (-1)', 'WARNING', vParameters);
END IF;
ENV_MANAGER.LOG_PROCESS_EVENT('File registration enabled with config key: ' || vConfigKey, 'INFO', vParameters);
END IF;
-- Construct single query for entire table (no join with A_LOAD_HISTORY - ensures single file output)
vQuery := 'SELECT ' || vProcessedColumnList ||
' FROM ' || vTableName || ' T';
-- Construct the URI for the file in OCI Object Storage
vUri := vBucketUri ||
CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END ||
NVL(pFileName, UPPER(vTableName) || '.csv');
ENV_MANAGER.LOG_PROCESS_EVENT('Exporting to single file: ' || vUri, 'INFO', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('Export query: ' || vQuery, 'DEBUG', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('Max file size: ' || pMaxFileSize || ' bytes (' || ROUND(pMaxFileSize/1048576, 2) || ' MB)', 'DEBUG', vParameters);
-- Use DBMS_CLOUD package to export data to the URI
-- Oracle maxfilesize: min 10MB (10485760), max 1GB (1073741824), default 100MB (104857600)
DBMS_CLOUD.EXPORT_DATA(
credential_name => pCredentialName,
file_uri_list => vUri,
query => vQuery,
format => json_object(
'type' VALUE 'CSV',
'header' VALUE true,
'quote' VALUE CHR(34),
'delimiter' VALUE ',',
'escape' VALUE true,
'recorddelimiter' VALUE CHR(13)||CHR(10), -- CRLF dla Windows
'maxfilesize' VALUE pMaxFileSize -- Dynamic maxfilesize in bytes
)
);
-- Register exported file to A_SOURCE_FILE_RECEIVED if requested
IF pRegisterExport THEN
DECLARE
vActualFileName VARCHAR2(1000); -- Actual filename with Oracle suffix
vSanitizedFileName VARCHAR2(1000);
vFileName VARCHAR2(1000);
vRetryCount NUMBER := 0;
vMaxRetries NUMBER := 1; -- One retry after initial attempt
vRetryDelay NUMBER := 2; -- 2 seconds delay
vFilesFound NUMBER := 0;
vTotalBytes NUMBER := 0;
BEGIN
-- Extract filename from URI (after last '/')
vFileName := SUBSTR(vUri, INSTR(vUri, '/', -1) + 1);
-- Sanitize filename first (PL/SQL function cannot be used directly in SQL)
vSanitizedFileName := sanitizeFilename(vFileName);
-- Remove .csv extension for LIKE pattern matching (Oracle adds suffixes BEFORE .csv)
-- Example: tablename.csv becomes tablename_1_20260211T102621591769Z.csv
vSanitizedFileName := REGEXP_REPLACE(vSanitizedFileName, '\.csv$', '', 1, 0, 'i');
-- Try to get ALL exported files with retry logic
-- Oracle DBMS_CLOUD.EXPORT_DATA can create MULTIPLE files due to:
-- 1. maxfilesize parameter (splits files larger than limit)
-- 2. Automatic parallel processing (especially on large production instances)
-- We must register ALL files, not just the first one
<<metadata_retry_loop>>
LOOP
BEGIN
-- Register ALL files matching the pattern (cursor loop)
FOR rec IN (
SELECT object_name, checksum, created, bytes
FROM TABLE(DBMS_CLOUD.LIST_OBJECTS(
credential_name => pCredentialName,
location_uri => vBucketUri
))
WHERE object_name LIKE CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || vSanitizedFileName || '%'
ORDER BY created DESC, bytes DESC
) LOOP
-- Extract filename only from full path (remove bucket folder prefix)
vActualFileName := SUBSTR(rec.object_name, INSTR(rec.object_name, '/', -1) + 1);
-- Create A_SOURCE_FILE_RECEIVED record for EACH exported file
vSourceFileReceivedKey := CT_MRDS.A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL;
INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
A_SOURCE_FILE_RECEIVED_KEY,
A_SOURCE_FILE_CONFIG_KEY,
SOURCE_FILE_NAME,
CHECKSUM,
CREATED,
BYTES,
RECEPTION_DATE,
PROCESSING_STATUS,
PARTITION_YEAR,
PARTITION_MONTH,
ARCH_PATH,
PROCESS_NAME
) VALUES (
vSourceFileReceivedKey,
NVL(vConfigKey, -1), -- Use config key if found, otherwise -1
vActualFileName, -- Use actual filename with Oracle suffix
rec.checksum,
rec.created,
rec.bytes,
SYSDATE,
'INGESTED',
NULL, -- PARTITION_YEAR not used for single-file exports
NULL, -- PARTITION_MONTH not used for single-file exports
NULL, -- ARCH_PATH not used for single-file exports
pProcessName -- Process name from parameter
);
vFilesFound := vFilesFound + 1;
vTotalBytes := vTotalBytes + rec.bytes;
ENV_MANAGER.LOG_PROCESS_EVENT('Registered file ' || vFilesFound || ': FileReceivedKey=' || vSourceFileReceivedKey || ', File=' || vActualFileName || ', Size=' || rec.bytes || ' bytes', 'INFO', vParameters);
END LOOP;
-- Check if any files were found
IF vFilesFound = 0 THEN
RAISE NO_DATA_FOUND;
END IF;
-- Success - exit retry loop
ENV_MANAGER.LOG_PROCESS_EVENT('Total registered: ' || vFilesFound || ' file(s), Total size: ' || vTotalBytes || ' bytes (' || ROUND(vTotalBytes/1048576, 2) || ' MB)', 'INFO', vParameters);
EXIT metadata_retry_loop;
EXCEPTION
WHEN NO_DATA_FOUND THEN
vRetryCount := vRetryCount + 1;
IF vRetryCount <= vMaxRetries THEN
-- Log retry attempt
ENV_MANAGER.LOG_PROCESS_EVENT('File(s) not found in bucket (attempt ' || vRetryCount || '/' || (vMaxRetries + 1) || '), retrying after ' || vRetryDelay || ' seconds: ' || vFileName, 'DEBUG', vParameters);
-- Wait before retry using DBMS_SESSION.SLEEP (alternative to DBMS_LOCK)
DBMS_SESSION.SLEEP(vRetryDelay);
ELSE
-- Max retries exceeded - re-raise exception
RAISE;
END IF;
END;
END LOOP metadata_retry_loop;
EXCEPTION
WHEN NO_DATA_FOUND THEN
-- File not found after retries - log warning and continue without metadata
ENV_MANAGER.LOG_PROCESS_EVENT('WARNING: File not found in bucket after ' || (vMaxRetries + 1) || ' attempts: ' || vFileName, 'WARNING', vParameters);
-- Sanitize filename for fallback INSERT (function cannot be used in SQL)
vSanitizedFileName := sanitizeFilename(vFileName);
-- Insert without metadata using theoretical filename
vSourceFileReceivedKey := CT_MRDS.A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL;
INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
A_SOURCE_FILE_RECEIVED_KEY,
A_SOURCE_FILE_CONFIG_KEY,
SOURCE_FILE_NAME,
RECEPTION_DATE,
PROCESSING_STATUS,
PARTITION_YEAR,
PARTITION_MONTH,
ARCH_PATH,
PROCESS_NAME
) VALUES (
vSourceFileReceivedKey,
NVL(vConfigKey, -1), -- Use config key if found, otherwise -1
vSanitizedFileName, -- Use pre-calculated sanitized filename
SYSDATE,
'INGESTED',
NULL, -- PARTITION_YEAR not used for single-file exports
NULL, -- PARTITION_MONTH not used for single-file exports
NULL, -- ARCH_PATH not used for single-file exports
pProcessName -- Process name from parameter
);
ENV_MANAGER.LOG_PROCESS_EVENT('Registered file without metadata: FileReceivedKey=' || vSourceFileReceivedKey || ', File=' || vSanitizedFileName, 'INFO', vParameters);
END;
END IF;
ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
EXCEPTION
WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN
vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName;
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp);
WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN
vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in column list' ELSE '' END;
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp);
WHEN OTHERS THEN
-- Log complete error details including full stack trace and backtrace
ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER');
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
END EXPORT_TABLE_DATA;
----------------------------------------------------------------------------------------------------
/**
 * @name EXPORT_TABLE_DATA_BY_DATE
 * @desc Exports table data to Parquet files in an OCI bucket, one export per
 *       year/month partition derived from the key date column. Supports
 *       sequential (pParallelDegree = 1) or parallel (2-16) execution via
 *       DBMS_PARALLEL_EXECUTE; each chunk is one year/month partition.
 *       NOTE: the parallel branch COMMITs chunk bookkeeping rows in
 *       CT_MRDS.A_PARALLEL_EXPORT_CHUNKS (required so worker jobs can see them).
 * @param pSchemaName        Owner of the table to export
 * @param pTableName         Table to export (validated against the data dictionary)
 * @param pKeyColumnName     Date column used for partition derivation and filtering
 * @param pBucketArea        Logical bucket area resolved via FILE_MANAGER.GET_BUCKET_URI
 * @param pFolderName        Optional target folder inside the bucket
 * @param pColumnList        Optional explicit column list; NULL = all columns from metadata
 * @param pMinDate/pMaxDate  Inclusive date range to export
 * @param pParallelDegree    1 = sequential, 2-16 = parallel worker count (NULL rejected)
 * @param pTemplateTableName Optional template table driving per-column date formats
 * @param pJobClass          Optional scheduler job class for parallel workers
 * @param pCredentialName    DBMS_CLOUD credential used for the export
 * @raises CODE_TABLE_NOT_EXISTS, CODE_COLUMN_NOT_EXISTS,
 *         CODE_INVALID_PARALLEL_DEGREE, CODE_PARALLEL_EXECUTION_FAILED,
 *         CODE_UNKNOWN (wrapped OTHERS)
 **/
PROCEDURE EXPORT_TABLE_DATA_BY_DATE (
  pSchemaName IN VARCHAR2,
  pTableName IN VARCHAR2,
  pKeyColumnName IN VARCHAR2,
  pBucketArea IN VARCHAR2,
  pFolderName IN VARCHAR2,
  pColumnList IN VARCHAR2 default NULL,
  pMinDate IN DATE default DATE '1900-01-01',
  pMaxDate IN DATE default SYSDATE,
  pParallelDegree IN NUMBER default 1,
  pTemplateTableName IN VARCHAR2 default NULL,
  pJobClass IN VARCHAR2 default NULL,
  pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
)
IS
  vTableName VARCHAR2(128);
  vSchemaName VARCHAR2(128);
  vKeyColumnName VARCHAR2(128);
  vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
  vProcessedColumnList VARCHAR2(32767);
  vBucketUri VARCHAR2(4000);
  vCurrentCol VARCHAR2(128);
  vPartitions partition_tab;
BEGIN
  vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||''''
                                                                    ,'pTableName => '''||nvl(pTableName, 'NULL')||''''
                                                                    ,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||''''
                                                                    ,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||''''
                                                                    ,'pFolderName => '''||nvl(pFolderName, 'NULL')||''''
                                                                    ,'pColumnList => '''||nvl(pColumnList, 'NULL')||''''
                                                                    ,'pMinDate => '''||nvl(TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
                                                                    ,'pMaxDate => '''||nvl(TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
                                                                    ,'pParallelDegree => '''||nvl(TO_CHAR(pParallelDegree), 'NULL')||''''
                                                                    ,'pTemplateTableName => '''||nvl(pTemplateTableName, 'NULL')||''''
                                                                    ,'pJobClass => '''||nvl(pJobClass, 'NULL')||''''
                                                                    ,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
                                                                    ));
  ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
  -- Get bucket URI based on bucket area using FILE_MANAGER function
  vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea);
  -- Convert table and column names to uppercase to match data dictionary
  vTableName := UPPER(pTableName);
  vSchemaName := UPPER(pSchemaName);
  vKeyColumnName := UPPER(pKeyColumnName);
  -- Validate table, key column, and column list using shared procedure
  VALIDATE_TABLE_AND_COLUMNS(vSchemaName, vTableName, vKeyColumnName, pColumnList, vParameters);
  -- Build query with TO_CHAR for date columns (per-column format support)
  vProcessedColumnList := buildQueryWithDateFormats(pColumnList, vTableName, vSchemaName, vKeyColumnName, pTemplateTableName);
  ENV_MANAGER.LOG_PROCESS_EVENT('Input column list: ' || NVL(pColumnList, 'NULL (building dynamic list from table metadata)'), 'DEBUG', vParameters);
  ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list with TO_CHAR for date columns: ' || vProcessedColumnList, 'DEBUG', vParameters);
  ENV_MANAGER.LOG_PROCESS_EVENT('Template table: ' || NVL(pTemplateTableName, 'NULL - using global default for all dates'), 'INFO', vParameters);
  -- From here on vTableName is the fully-qualified, injection-safe name (SCHEMA.TABLE)
  vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName);
  -- Validate parallel degree parameter
  -- BUGFIX: explicit IS NULL check - a NULL degree previously evaluated the
  -- range check to UNKNOWN (validation silently skipped), then fell into the
  -- parallel branch and passed NULL as parallel_level to RUN_TASK
  IF pParallelDegree IS NULL OR pParallelDegree < 1 OR pParallelDegree > 16 THEN
    vgMsgTmp := ENV_MANAGER.MSG_INVALID_PARALLEL_DEGREE || ': ' || NVL(TO_CHAR(pParallelDegree), 'NULL') || '. Valid range: 1-16';
    ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
    RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_INVALID_PARALLEL_DEGREE, vgMsgTmp);
  END IF;
  -- Get partitions using shared function
  vPartitions := GET_PARTITIONS(vSchemaName, vTableName, vKeyColumnName, pMinDate, pMaxDate, vParameters);
  ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vPartitions.COUNT || ' partitions to export with parallel degree ' || pParallelDegree, 'INFO', vParameters);
  -- Sequential processing (parallel degree = 1)
  IF pParallelDegree = 1 THEN
    ENV_MANAGER.LOG_PROCESS_EVENT('Using sequential processing (pParallelDegree = 1)', 'DEBUG', vParameters);
    FOR i IN 1 .. vPartitions.COUNT LOOP
      EXPORT_SINGLE_PARTITION(
        pSchemaName => vSchemaName,
        pTableName => vTableName,
        pKeyColumnName => vKeyColumnName,
        pYear => vPartitions(i).year,
        pMonth => vPartitions(i).month,
        pBucketUri => vBucketUri,
        pFolderName => pFolderName,
        pProcessedColumns => vProcessedColumnList,
        pMinDate => pMinDate,
        pMaxDate => pMaxDate,
        pCredentialName => pCredentialName,
        pFormat => 'PARQUET',
        pFileBaseName => NULL,
        pMaxFileSize => 104857600,  -- 100 MB per Parquet file
        pParameters => vParameters
      );
    END LOOP;
  -- Parallel processing (parallel degree > 1)
  ELSE
    -- Skip parallel processing if no partitions found
    IF vPartitions.COUNT = 0 THEN
      ENV_MANAGER.LOG_PROCESS_EVENT('No partitions to export - skipping parallel processing', 'INFO', vParameters);
    ELSE
      DECLARE
        -- Timestamp-suffixed task name isolates this session's chunks from concurrent exports
        vTaskName VARCHAR2(128) := 'DATA_EXPORT_TASK_' || TO_CHAR(SYSTIMESTAMP, 'YYYYMMDDHH24MISSFF');
      BEGIN
        ENV_MANAGER.LOG_PROCESS_EVENT('Using parallel processing with ' || pParallelDegree || ' threads', 'INFO', vParameters);
        -- Populate chunks table (insert new chunks, preserve FAILED chunks for retry)
        FOR i IN 1 .. vPartitions.COUNT LOOP
          MERGE INTO CT_MRDS.A_PARALLEL_EXPORT_CHUNKS t
          USING (SELECT i AS chunk_id, vTaskName AS task_name, vPartitions(i).year AS yr, vPartitions(i).month AS mn FROM DUAL) s
          ON (t.CHUNK_ID = s.chunk_id AND t.TASK_NAME = s.task_name)
          WHEN NOT MATCHED THEN
            INSERT (CHUNK_ID, TASK_NAME, YEAR_VALUE, MONTH_VALUE, SCHEMA_NAME, TABLE_NAME, KEY_COLUMN_NAME,
                    BUCKET_URI, FOLDER_NAME, PROCESSED_COLUMNS, MIN_DATE, MAX_DATE,
                    CREDENTIAL_NAME, FORMAT_TYPE, FILE_BASE_NAME, TEMPLATE_TABLE_NAME, MAX_FILE_SIZE, JOB_CLASS, STATUS)
            VALUES (i, vTaskName, vPartitions(i).year, vPartitions(i).month, vSchemaName, vTableName, vKeyColumnName,
                    vBucketUri, pFolderName, vProcessedColumnList, pMinDate, pMaxDate,
                    pCredentialName, 'PARQUET', NULL, pTemplateTableName, 104857600, pJobClass, 'PENDING')
          WHEN MATCHED THEN
            -- Match found: chunk exists for SAME task (composite PK: TASK_NAME, CHUNK_ID)
            -- This handles retry scenario: reset FAILED chunks to PENDING for re-processing
            UPDATE SET STATUS = CASE WHEN t.STATUS = 'FAILED' THEN 'PENDING' ELSE t.STATUS END,
                       ERROR_MESSAGE = CASE WHEN t.STATUS = 'FAILED' THEN NULL ELSE t.ERROR_MESSAGE END;
        END LOOP;
        -- Worker jobs run in separate sessions and must see the chunk rows
        COMMIT;
        -- Log chunk statistics (session-safe: only count chunks for THIS task)
        DECLARE
          vPendingCount NUMBER;
          vFailedCount NUMBER;
        BEGIN
          SELECT COUNT(*) INTO vPendingCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'PENDING' AND TASK_NAME = vTaskName;
          SELECT COUNT(*) INTO vFailedCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'FAILED' AND TASK_NAME = vTaskName;
          ENV_MANAGER.LOG_PROCESS_EVENT('Chunk statistics for task ' || vTaskName || ': PENDING=' || vPendingCount || ', FAILED (retry)=' || vFailedCount, 'INFO', vParameters);
        END;
        -- Create parallel task
        DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => vTaskName);
        -- Define chunks using SQL query to ensure TASK_NAME isolation
        -- CRITICAL: Filter by TASK_NAME to avoid selecting chunks from other concurrent sessions
        -- CRITICAL: Use START_ID and END_ID aliases to avoid ORA-00960 ambiguous column naming
        -- NOTE: vTaskName is generated internally from SYSTIMESTAMP (no user input), so
        -- the literal concatenation below is not an injection vector
        DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_SQL(
          task_name => vTaskName,
          sql_stmt => 'SELECT CHUNK_ID AS START_ID, CHUNK_ID AS END_ID FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE TASK_NAME = ''' || vTaskName || ''' ORDER BY CHUNK_ID',
          by_rowid => FALSE
        );
        -- Execute task in parallel
        ENV_MANAGER.LOG_PROCESS_EVENT('Executing parallel task: ' || vTaskName || CASE WHEN pJobClass IS NOT NULL THEN ' with job class: ' || pJobClass ELSE '' END, 'DEBUG', vParameters);
        IF pJobClass IS NOT NULL THEN
          DBMS_PARALLEL_EXECUTE.RUN_TASK(
            task_name => vTaskName,
            sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id, ''' || vTaskName || '''); END;',
            language_flag => DBMS_SQL.NATIVE,
            parallel_level => pParallelDegree,
            job_class => pJobClass
          );
        ELSE
          DBMS_PARALLEL_EXECUTE.RUN_TASK(
            task_name => vTaskName,
            sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id, ''' || vTaskName || '''); END;',
            language_flag => DBMS_SQL.NATIVE,
            parallel_level => pParallelDegree
          );
        END IF;
        -- Check for errors reported by DBMS_PARALLEL_EXECUTE chunk bookkeeping
        DECLARE
          vErrorCount NUMBER;
        BEGIN
          SELECT COUNT(*) INTO vErrorCount
          FROM USER_PARALLEL_EXECUTE_CHUNKS
          WHERE task_name = vTaskName AND status = 'PROCESSED_WITH_ERROR';
          IF vErrorCount > 0 THEN
            vgMsgTmp := 'Parallel execution completed with ' || vErrorCount || ' errors. Check USER_PARALLEL_EXECUTE_CHUNKS for details.';
            ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
            RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp);
          END IF;
        END;
        -- Clean up task
        DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName);
        -- Clean up chunks for THIS specific task only (session-safe)
        -- CRITICAL: Use TASK_NAME filter to avoid deleting chunks from other active sessions
        DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE TASK_NAME = vTaskName;
        COMMIT;
        ENV_MANAGER.LOG_PROCESS_EVENT('Parallel execution completed successfully', 'INFO', vParameters);
      EXCEPTION
        WHEN OTHERS THEN
          -- Attempt to drop task on error (chunk rows are kept so a retry can reuse them)
          BEGIN
            DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName);
          EXCEPTION
            WHEN OTHERS THEN NULL; -- Ignore drop errors
          END;
          vgMsgTmp := ENV_MANAGER.MSG_PARALLEL_EXECUTION_FAILED || ': ' || SQLERRM || cgBL || DBMS_UTILITY.FORMAT_ERROR_BACKTRACE;
          ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
          RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp);
      END;
    END IF;
  END IF;
  ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
EXCEPTION
  WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN
    vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName;
    ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
    RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp);
  WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN
    vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in pColumnList' ELSE '' END;
    ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
    RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp);
  WHEN ENV_MANAGER.ERR_INVALID_PARALLEL_DEGREE THEN
    -- vgMsgTmp was populated at the raise site above
    RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_INVALID_PARALLEL_DEGREE, vgMsgTmp);
  WHEN ENV_MANAGER.ERR_PARALLEL_EXECUTION_FAILED THEN
    RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp);
  WHEN OTHERS THEN
    -- Log complete error details including full stack trace and backtrace
    ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER');
    ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
    RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
END EXPORT_TABLE_DATA_BY_DATE;
----------------------------------------------------------------------------------------------------
/**
* @name EXPORT_TABLE_DATA_TO_CSV_BY_DATE
 * @desc Exports data to flat CSV files with date filtering (one CSV file per
 *       exported year/month combination). Unlike EXPORT_TABLE_DATA_BY_DATE,
 *       which writes Parquet files into PARTITION_YEAR/PARTITION_MONTH folder
 *       structures, this procedure writes plain CSV files into a single folder.
* Uses the same date filtering mechanism with CT_ODS.A_LOAD_HISTORY.
* Allows specifying custom column list or uses T.* if pColumnList is NULL.
* Validates that all columns in pColumnList exist in the target table.
* Automatically adds 'T.' prefix to column names in pColumnList.
* @example
* begin
* DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
* pSchemaName => 'CT_MRDS',
* pTableName => 'MY_TABLE',
* pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',
* pBucketArea => 'DATA',
* pFolderName => 'exports',
* pFileName => 'my_export.csv',
* pColumnList => 'COLUMN1, COLUMN2, COLUMN3', -- Optional
* pMinDate => DATE '2024-01-01',
* pMaxDate => SYSDATE
* );
* end;
**/
PROCEDURE EXPORT_TABLE_DATA_TO_CSV_BY_DATE (
pSchemaName IN VARCHAR2,
pTableName IN VARCHAR2,
pKeyColumnName IN VARCHAR2,
pBucketArea IN VARCHAR2,
pFolderName IN VARCHAR2,
pFileName IN VARCHAR2 DEFAULT NULL,
pColumnList IN VARCHAR2 default NULL,
pMinDate IN DATE default DATE '1900-01-01',
pMaxDate IN DATE default SYSDATE,
pParallelDegree IN NUMBER default 1,
pTemplateTableName IN VARCHAR2 default NULL,
pMaxFileSize IN NUMBER default 104857600,
pRegisterExport IN BOOLEAN default FALSE,
pProcessName IN VARCHAR2 default 'DATA_EXPORTER',
pJobClass IN VARCHAR2 default NULL,
pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
)
IS
vTableName VARCHAR2(128);
vSchemaName VARCHAR2(128);
vKeyColumnName VARCHAR2(128);
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
vFileBaseName VARCHAR2(4000);
vFileExtension VARCHAR2(10);
vProcessedColumnList VARCHAR2(32767);
vBucketUri VARCHAR2(4000);
vCurrentCol VARCHAR2(128);
vPartitions partition_tab;
vSourceFileReceivedKey NUMBER;
vFileName VARCHAR2(1000);
vFileUri VARCHAR2(4000);
-- Variables for A_SOURCE_FILE_CONFIG lookup
vSourceKey VARCHAR2(100);
vTableId VARCHAR2(200);
vConfigKey NUMBER := -1;
vSlashPos1 NUMBER;
vSlashPos2 NUMBER;
BEGIN
vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||''''
,'pTableName => '''||nvl(pTableName, 'NULL')||''''
,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||''''
,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||''''
,'pFolderName => '''||nvl(pFolderName, 'NULL')||''''
,'pFileName => '''||nvl(pFileName, 'NULL')||''''
,'pColumnList => '''||nvl(pColumnList, 'NULL')||''''
,'pMinDate => '''||nvl(TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
,'pMaxDate => '''||nvl(TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
,'pParallelDegree => '''||nvl(TO_CHAR(pParallelDegree), 'NULL')||''''
,'pTemplateTableName => '''||nvl(pTemplateTableName, 'NULL')||''''
,'pMaxFileSize => '''||nvl(TO_CHAR(pMaxFileSize), 'NULL')||''''
,'pRegisterExport => '''||CASE WHEN pRegisterExport THEN 'TRUE' ELSE 'FALSE' END||''''
,'pJobClass => '''||nvl(pJobClass, 'NULL')||''''
,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
));
ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
-- Get bucket URI based on bucket area using FILE_MANAGER function
vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea);
-- Convert table and column names to uppercase to match data dictionary
vTableName := UPPER(pTableName);
vSchemaName := UPPER(pSchemaName);
vKeyColumnName := UPPER(pKeyColumnName);
-- Extract base filename and extension or construct default filename
IF pFileName IS NOT NULL THEN
-- Use provided filename
IF INSTR(pFileName, '.') > 0 THEN
vFileBaseName := SUBSTR(pFileName, 1, INSTR(pFileName, '.', -1) - 1);
vFileExtension := SUBSTR(pFileName, INSTR(pFileName, '.', -1));
ELSE
vFileBaseName := pFileName;
vFileExtension := '.csv';
END IF;
ELSE
-- Construct default filename: TABLENAME (without extension, will be added by worker)
vFileBaseName := UPPER(pTableName);
vFileExtension := '.csv';
END IF;
-- Validate table, key column, and column list using shared procedure
VALIDATE_TABLE_AND_COLUMNS(vSchemaName, vTableName, vKeyColumnName, pColumnList, vParameters);
-- Build query with TO_CHAR for date columns (per-column format support)
vProcessedColumnList := buildQueryWithDateFormats(pColumnList, vTableName, vSchemaName, vKeyColumnName, pTemplateTableName);
ENV_MANAGER.LOG_PROCESS_EVENT('Input column list: ' || NVL(pColumnList, 'NULL (using dynamic column list)'), 'DEBUG', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list with TO_CHAR for date columns: ' || vProcessedColumnList, 'DEBUG', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('Template table: ' || NVL(pTemplateTableName, 'NULL - using global default for all dates'), 'INFO', vParameters);
vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName);
-- Validate parallel degree parameter
IF pParallelDegree < 1 OR pParallelDegree > 16 THEN
vgMsgTmp := ENV_MANAGER.MSG_INVALID_PARALLEL_DEGREE || ': ' || pParallelDegree || '. Valid range: 1-16';
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_INVALID_PARALLEL_DEGREE, vgMsgTmp);
END IF;
-- Get partitions using shared function
vPartitions := GET_PARTITIONS(vSchemaName, vTableName, vKeyColumnName, pMinDate, pMaxDate, vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vPartitions.COUNT || ' year/month combinations to export', 'INFO', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('Date range: ' || TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS') || ' to ' || TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'DEBUG', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('Parallel degree: ' || pParallelDegree, 'INFO', vParameters);
-- Sequential processing (parallel degree = 1)
IF pParallelDegree = 1 THEN
ENV_MANAGER.LOG_PROCESS_EVENT('Using sequential processing (pParallelDegree = 1)', 'DEBUG', vParameters);
FOR i IN 1 .. vPartitions.COUNT LOOP
EXPORT_SINGLE_PARTITION(
pSchemaName => vSchemaName,
pTableName => vTableName,
pKeyColumnName => vKeyColumnName,
pYear => vPartitions(i).year,
pMonth => vPartitions(i).month,
pBucketUri => vBucketUri,
pFolderName => pFolderName,
pProcessedColumns => vProcessedColumnList,
pMinDate => pMinDate,
pMaxDate => pMaxDate,
pCredentialName => pCredentialName,
pFormat => 'CSV',
pFileBaseName => vFileBaseName,
pMaxFileSize => pMaxFileSize,
pParameters => vParameters
);
END LOOP;
-- Parallel processing (parallel degree > 1)
ELSE
-- Skip parallel processing if no partitions found
IF vPartitions.COUNT = 0 THEN
ENV_MANAGER.LOG_PROCESS_EVENT('No partitions to export - skipping parallel CSV processing', 'INFO', vParameters);
ELSE
DECLARE
vTaskName VARCHAR2(128) := 'DATA_CSV_EXPORT_TASK_' || TO_CHAR(SYSTIMESTAMP, 'YYYYMMDDHH24MISSFF');
vChunkId NUMBER;
BEGIN
ENV_MANAGER.LOG_PROCESS_EVENT('Using parallel processing with ' || pParallelDegree || ' threads', 'INFO', vParameters);
-- Clean up old completed chunks (>24 hours) to prevent table bloat
-- CRITICAL: Do NOT delete chunks from other active sessions (same-day tasks)
-- This prevents race conditions when multiple CSV exports run simultaneously
DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS
WHERE STATUS = 'COMPLETED'
AND CREATED_DATE < SYSTIMESTAMP - INTERVAL '1' DAY;
COMMIT;
ENV_MANAGER.LOG_PROCESS_EVENT('Cleared old COMPLETED chunks (>24h). Active session chunks preserved.', 'DEBUG', vParameters);
-- Populate chunks table (insert new chunks, preserve FAILED chunks for retry)
FOR i IN 1 .. vPartitions.COUNT LOOP
MERGE INTO CT_MRDS.A_PARALLEL_EXPORT_CHUNKS t
USING (SELECT i AS chunk_id, vTaskName AS task_name, vPartitions(i).year AS yr, vPartitions(i).month AS mn FROM DUAL) s
ON (t.CHUNK_ID = s.chunk_id AND t.TASK_NAME = s.task_name)
WHEN NOT MATCHED THEN
INSERT (CHUNK_ID, TASK_NAME, YEAR_VALUE, MONTH_VALUE, SCHEMA_NAME, TABLE_NAME, KEY_COLUMN_NAME,
BUCKET_URI, FOLDER_NAME, PROCESSED_COLUMNS, MIN_DATE, MAX_DATE,
CREDENTIAL_NAME, FORMAT_TYPE, FILE_BASE_NAME, TEMPLATE_TABLE_NAME, MAX_FILE_SIZE, JOB_CLASS, STATUS)
VALUES (i, vTaskName, vPartitions(i).year, vPartitions(i).month, vSchemaName, vTableName, vKeyColumnName,
vBucketUri, pFolderName, vProcessedColumnList, pMinDate, pMaxDate,
pCredentialName, 'CSV', vFileBaseName, pTemplateTableName, pMaxFileSize, pJobClass, 'PENDING')
WHEN MATCHED THEN
-- Match found: chunk exists for SAME task (composite PK: TASK_NAME, CHUNK_ID)
-- This handles retry scenario: reset FAILED chunks to PENDING for re-processing
UPDATE SET STATUS = CASE WHEN t.STATUS = 'FAILED' THEN 'PENDING' ELSE t.STATUS END,
ERROR_MESSAGE = CASE WHEN t.STATUS = 'FAILED' THEN NULL ELSE t.ERROR_MESSAGE END;
END LOOP;
COMMIT;
-- Log chunk statistics (session-safe: only count chunks for THIS task)
DECLARE
vPendingCount NUMBER;
vFailedCount NUMBER;
BEGIN
SELECT COUNT(*) INTO vPendingCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'PENDING' AND TASK_NAME = vTaskName;
SELECT COUNT(*) INTO vFailedCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'FAILED' AND TASK_NAME = vTaskName;
ENV_MANAGER.LOG_PROCESS_EVENT('Chunk statistics for task ' || vTaskName || ': PENDING=' || vPendingCount || ', FAILED (retry)=' || vFailedCount, 'INFO', vParameters);
END;
-- Create parallel task
DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => vTaskName);
-- Define chunks using SQL query to ensure TASK_NAME isolation
-- CRITICAL: Filter by TASK_NAME to avoid selecting chunks from other concurrent sessions
-- CRITICAL: Use START_ID and END_ID aliases to avoid ORA-00960 ambiguous column naming
DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_SQL(
task_name => vTaskName,
sql_stmt => 'SELECT CHUNK_ID AS START_ID, CHUNK_ID AS END_ID FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE TASK_NAME = ''' || vTaskName || ''' ORDER BY CHUNK_ID',
by_rowid => FALSE
);
-- Execute task in parallel
ENV_MANAGER.LOG_PROCESS_EVENT('Executing parallel CSV export task: ' || vTaskName || CASE WHEN pJobClass IS NOT NULL THEN ' with job class: ' || pJobClass ELSE '' END, 'DEBUG', vParameters);
IF pJobClass IS NOT NULL THEN
DBMS_PARALLEL_EXECUTE.RUN_TASK(
task_name => vTaskName,
sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id, ''' || vTaskName || '''); END;',
language_flag => DBMS_SQL.NATIVE,
parallel_level => pParallelDegree,
job_class => pJobClass
);
ELSE
DBMS_PARALLEL_EXECUTE.RUN_TASK(
task_name => vTaskName,
sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id, ''' || vTaskName || '''); END;',
language_flag => DBMS_SQL.NATIVE,
parallel_level => pParallelDegree
);
END IF;
-- Check for errors
DECLARE
vErrorCount NUMBER;
BEGIN
SELECT COUNT(*) INTO vErrorCount
FROM USER_PARALLEL_EXECUTE_CHUNKS
WHERE task_name = vTaskName AND status = 'PROCESSED_WITH_ERROR';
IF vErrorCount > 0 THEN
vgMsgTmp := 'Parallel CSV export completed with ' || vErrorCount || ' errors. Check USER_PARALLEL_EXECUTE_CHUNKS for details.';
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp);
END IF;
END;
-- Clean up task
DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName);
-- Clean up chunks for THIS specific task only (session-safe)
-- CRITICAL: Use TASK_NAME filter to avoid deleting chunks from other active CSV sessions
DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE TASK_NAME = vTaskName;
COMMIT;
ENV_MANAGER.LOG_PROCESS_EVENT('Parallel CSV execution completed successfully', 'INFO', vParameters);
EXCEPTION
WHEN OTHERS THEN
-- Attempt to drop task on error
BEGIN
DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName);
EXCEPTION
WHEN OTHERS THEN NULL; -- Ignore drop errors
END;
vgMsgTmp := ENV_MANAGER.MSG_PARALLEL_EXECUTION_FAILED || ': ' || SQLERRM || cgBL || DBMS_UTILITY.FORMAT_ERROR_BACKTRACE;
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp);
END;
END IF;
END IF;
-- Note: File registration handled by EXPORT_SINGLE_PARTITION when pRegisterExport=TRUE
-- Each partition calls pRegisterExport logic independently during serial/parallel execution
-- Register exported files to A_SOURCE_FILE_RECEIVED if requested (after successful export)
IF pRegisterExport THEN
-- Lookup A_SOURCE_FILE_CONFIG_KEY based on pFolderName parsing
-- Format: {BUCKET_AREA}/{SOURCE_KEY}/{TABLE_ID}
-- Example: 'ODS/CSDB/CSDB_DEBT_DAILY' -> SOURCE_KEY='CSDB', TABLE_ID='CSDB_DEBT_DAILY'
-- Parse pFolderName to extract SOURCE_KEY and TABLE_ID
vSlashPos1 := INSTR(pFolderName, '/', 1, 1); -- First '/' position
vSlashPos2 := INSTR(pFolderName, '/', 1, 2); -- Second '/' position
IF vSlashPos1 > 0 AND vSlashPos2 > 0 THEN
-- Extract segment 2 (SOURCE_KEY) and segment 3 (TABLE_ID)
vSourceKey := SUBSTR(pFolderName, vSlashPos1 + 1, vSlashPos2 - vSlashPos1 - 1);
vTableId := SUBSTR(pFolderName, vSlashPos2 + 1);
-- Find configuration based on SOURCE_KEY and TABLE_ID
BEGIN
SELECT A_SOURCE_FILE_CONFIG_KEY
INTO vConfigKey
FROM CT_MRDS.A_SOURCE_FILE_CONFIG
WHERE A_SOURCE_KEY = vSourceKey
AND TABLE_ID = vTableId
AND SOURCE_FILE_TYPE = 'INPUT'
AND ROWNUM = 1;
ENV_MANAGER.LOG_PROCESS_EVENT('Found config key: ' || vConfigKey || ' for SOURCE=' || vSourceKey || ', TABLE=' || vTableId, 'DEBUG', vParameters);
EXCEPTION
WHEN NO_DATA_FOUND THEN
vConfigKey := -1;
ENV_MANAGER.LOG_PROCESS_EVENT('No config found for SOURCE=' || vSourceKey || ', TABLE=' || vTableId || ' - using default (-1)', 'INFO', vParameters);
END;
ELSE
-- Cannot parse folder name - use default
vConfigKey := -1;
ENV_MANAGER.LOG_PROCESS_EVENT('Cannot parse pFolderName: ' || pFolderName || ' - using default (-1)', 'WARNING', vParameters);
END IF;
ENV_MANAGER.LOG_PROCESS_EVENT('Registering ' || vPartitions.COUNT || ' exported files to A_SOURCE_FILE_RECEIVED with config key: ' || vConfigKey, 'INFO', vParameters);
FOR i IN 1 .. vPartitions.COUNT LOOP
-- Construct filename and URI for this partition
vFileName := NVL(vFileBaseName, UPPER(REPLACE(vTableName, vSchemaName || '.', ''))) || '_' || vPartitions(i).year || vPartitions(i).month || '.csv';
vFileUri := vBucketUri || CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || sanitizeFilename(vFileName);
-- Get file metadata from OCI bucket (CHECKSUM, CREATED, BYTES) with retry logic
DECLARE
vChecksum VARCHAR2(128);
vCreated TIMESTAMP WITH TIME ZONE;
vBytes NUMBER;
vActualFileName VARCHAR2(1000); -- Actual filename with Oracle suffix
vSanitizedFileName VARCHAR2(1000);
vRetryCount NUMBER := 0;
vMaxRetries NUMBER := 1; -- One retry after initial attempt
vRetryDelay NUMBER := 2; -- 2 seconds delay
BEGIN
-- Sanitize filename first (PL/SQL function cannot be used directly in SQL)
vSanitizedFileName := sanitizeFilename(vFileName);
-- Remove .csv extension for LIKE pattern matching (Oracle adds suffixes BEFORE .csv)
-- Example: LEGACY_DEBT_202508.csv becomes LEGACY_DEBT_202508_1_20260211T102621591769Z.csv
vSanitizedFileName := REGEXP_REPLACE(vSanitizedFileName, '\.csv$', '', 1, 0, 'i');
-- Try to get file metadata with retry logic
<<metadata_retry_loop>>
LOOP
BEGIN
SELECT object_name, checksum, created, bytes
INTO vActualFileName, vChecksum, vCreated, vBytes
FROM TABLE(DBMS_CLOUD.LIST_OBJECTS(
credential_name => pCredentialName,
location_uri => vBucketUri
))
WHERE object_name LIKE CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || vSanitizedFileName || '%'
ORDER BY created DESC, bytes DESC
FETCH FIRST 1 ROW ONLY;
-- Extract filename only from full path (remove bucket folder prefix)
-- vActualFileName contains: 'ODS/CSDB/CSDB_DEBT/LEGACY_DEBT_202508_1_20260211T111341375171Z.csv'
-- Extract only: 'LEGACY_DEBT_202508_1_20260211T111341375171Z.csv'
vActualFileName := SUBSTR(vActualFileName, INSTR(vActualFileName, '/', -1) + 1);
-- Success - exit retry loop
EXIT metadata_retry_loop;
EXCEPTION
WHEN NO_DATA_FOUND THEN
vRetryCount := vRetryCount + 1;
IF vRetryCount <= vMaxRetries THEN
-- Log retry attempt
ENV_MANAGER.LOG_PROCESS_EVENT('File not found in bucket (attempt ' || vRetryCount || '/' || (vMaxRetries + 1) || '), retrying after ' || vRetryDelay || ' seconds: ' || vFileName, 'DEBUG', vParameters);
-- Wait before retry using DBMS_SESSION.SLEEP (alternative to DBMS_LOCK)
DBMS_SESSION.SLEEP(vRetryDelay);
ELSE
-- Max retries exceeded - re-raise exception
RAISE;
END IF;
END;
END LOOP metadata_retry_loop;
-- Create A_SOURCE_FILE_RECEIVED record for this export with metadata
vSourceFileReceivedKey := CT_MRDS.A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL;
INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
A_SOURCE_FILE_RECEIVED_KEY,
A_SOURCE_FILE_CONFIG_KEY,
SOURCE_FILE_NAME,
CHECKSUM,
CREATED,
BYTES,
RECEPTION_DATE,
PROCESSING_STATUS,
PARTITION_YEAR,
PARTITION_MONTH,
ARCH_PATH,
PROCESS_NAME
) VALUES (
vSourceFileReceivedKey,
vConfigKey, -- Config key from A_SOURCE_FILE_CONFIG lookup
vActualFileName, -- Use actual filename with Oracle suffix
vChecksum,
vCreated,
vBytes,
SYSDATE,
'INGESTED',
NULL, -- PARTITION_YEAR not used for CSV exports
NULL, -- PARTITION_MONTH not used for CSV exports
NULL, -- ARCH_PATH not used for CSV exports
pProcessName -- Process name from parameter
);
ENV_MANAGER.LOG_PROCESS_EVENT('Registered file: FileReceivedKey=' || vSourceFileReceivedKey || ', File=' || vActualFileName || ', Size=' || vBytes || ' bytes', 'DEBUG', vParameters);
EXCEPTION
WHEN NO_DATA_FOUND THEN
-- File not found after retries - log warning and continue without metadata
ENV_MANAGER.LOG_PROCESS_EVENT('WARNING: File not found in bucket after ' || (vMaxRetries + 1) || ' attempts: ' || vFileName, 'WARNING', vParameters);
-- Sanitize filename for fallback INSERT (function cannot be used in SQL)
vSanitizedFileName := sanitizeFilename(vFileName);
-- Insert without metadata
vSourceFileReceivedKey := CT_MRDS.A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL;
INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
A_SOURCE_FILE_RECEIVED_KEY,
A_SOURCE_FILE_CONFIG_KEY,
SOURCE_FILE_NAME,
RECEPTION_DATE,
PROCESSING_STATUS,
PARTITION_YEAR,
PARTITION_MONTH,
ARCH_PATH,
PROCESS_NAME
) VALUES (
vSourceFileReceivedKey,
vConfigKey, -- Config key from A_SOURCE_FILE_CONFIG lookup
vSanitizedFileName, -- Fallback: use theoretical filename if actual not found
SYSDATE,
'INGESTED',
NULL, -- PARTITION_YEAR not used for CSV exports
NULL, -- PARTITION_MONTH not used for CSV exports
NULL, -- ARCH_PATH not used for CSV exports
pProcessName -- Process name from parameter
);
END;
END LOOP;
COMMIT;
ENV_MANAGER.LOG_PROCESS_EVENT('Successfully registered all ' || vPartitions.COUNT || ' files', 'INFO', vParameters);
END IF;
ENV_MANAGER.LOG_PROCESS_EVENT('Export completed successfully for ' || vPartitions.COUNT || ' files', 'INFO', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
EXCEPTION
WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN
vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName;
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp);
WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN
vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in pColumnList' ELSE '' END;
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp);
WHEN ENV_MANAGER.ERR_INVALID_PARALLEL_DEGREE THEN
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_INVALID_PARALLEL_DEGREE, vgMsgTmp);
WHEN ENV_MANAGER.ERR_PARALLEL_EXECUTION_FAILED THEN
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp);
WHEN OTHERS THEN
-- Log complete error details including full stack trace and backtrace
ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER');
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
END EXPORT_TABLE_DATA_TO_CSV_BY_DATE;
----------------------------------------------------------------------------------------------------
-- VERSION MANAGEMENT FUNCTIONS
----------------------------------------------------------------------------------------------------
FUNCTION GET_VERSION RETURN VARCHAR2 IS
  -- Holds the package version constant before returning it to the caller
  vVersion VARCHAR2(100);
BEGIN
  -- Expose the compile-time PACKAGE_VERSION constant to external callers
  vVersion := PACKAGE_VERSION;
  RETURN vVersion;
END GET_VERSION;
----------------------------------------------------------------------------------------------------
FUNCTION GET_BUILD_INFO RETURN VARCHAR2 IS
  -- Formatted build metadata string assembled by ENV_MANAGER
  vBuildInfo VARCHAR2(4000);
BEGIN
  -- Delegate formatting of package name/version/build-date/author
  -- to the shared ENV_MANAGER helper so all packages report uniformly
  vBuildInfo := ENV_MANAGER.GET_PACKAGE_VERSION_INFO(
                  pPackageName => 'DATA_EXPORTER',
                  pVersion     => PACKAGE_VERSION,
                  pBuildDate   => PACKAGE_BUILD_DATE,
                  pAuthor      => PACKAGE_AUTHOR
                );
  RETURN vBuildInfo;
END GET_BUILD_INFO;
----------------------------------------------------------------------------------------------------
FUNCTION GET_VERSION_HISTORY RETURN VARCHAR2 IS
  -- Human-readable changelog produced by the shared formatter
  vHistory VARCHAR2(32767);
BEGIN
  -- Hand the raw VERSION_HISTORY constant to ENV_MANAGER so the
  -- changelog rendering stays consistent across all packages
  vHistory := ENV_MANAGER.FORMAT_VERSION_HISTORY(
                pPackageName    => 'DATA_EXPORTER',
                pVersionHistory => VERSION_HISTORY
              );
  RETURN vHistory;
END GET_VERSION_HISTORY;
----------------------------------------------------------------------------------------------------
END;
/