-- Source listing metadata (from file export): 1783 lines, 88 KiB, plain-text dump
create or replace PACKAGE BODY CT_MRDS.DATA_EXPORTER
|
|
AS
|
|
|
|
----------------------------------------------------------------------------------------------------
|
|
-- PRIVATE HELPER FUNCTIONS (USED BY MULTIPLE PROCEDURES)
|
|
----------------------------------------------------------------------------------------------------
|
|
|
|
/**
 * Sanitizes a filename for object storage: any character outside the
 * allow-list (ASCII letters, digits, dot, underscore, hyphen) becomes '_'.
 **/
FUNCTION sanitizeFilename(pFilename IN VARCHAR2) RETURN VARCHAR2 IS
BEGIN
    -- Single-pass regex replacement; no intermediate variable needed.
    RETURN REGEXP_REPLACE(pFilename, '[^a-zA-Z0-9._-]', '_');
END sanitizeFilename;
|
|
|
|
----------------------------------------------------------------------------------------------------
|
|
|
|
/**
 * Deletes ALL files matching a specific file pattern before a retry export.
 * Critical for preventing data duplication when DBMS_CLOUD.EXPORT_DATA fails
 * mid-process: a failed export leaves partial file(s) behind, and a retry
 * would create new _2, _3 suffixed files alongside them.
 *
 * Pattern matching strategy:
 * - Parquet: folder/PARTITION_YEAR=2024/PARTITION_MONTH=11/*.parquet
 *   (folder-level safe - each chunk has its own partition folder)
 * - CSV: folder/TABLENAME_202411*.csv
 *   (file-level pattern - multiple chunks share the same folder!)
 *
 * CRITICAL for parallel processing:
 * - CSV chunks share a flat folder, so the pattern MUST stay file-specific.
 * - LIKE metacharacters ('\', '_', '%') occurring literally in the folder path
 *   or file name are escaped and the LIKE uses ESCAPE '\'. Without this, every
 *   literal underscore (e.g. in TABLENAME_202411) would match ANY character
 *   and could delete files belonging to other parallel chunks.
 *
 * Never raises: all failures are logged as WARNING and the retry proceeds.
 *
 * @param pFileUri         full object URI of the target export file
 * @param pCredentialName  DBMS_CLOUD credential used for LIST/DELETE
 * @param pParameters      context string passed through to the process log
 **/
PROCEDURE DELETE_FAILED_EXPORT_FILE(
    pFileUri IN VARCHAR2,
    pCredentialName IN VARCHAR2,
    pParameters IN VARCHAR2
) IS
    vBucketUri VARCHAR2(4000);
    vFolderPath VARCHAR2(4000);
    vFileName VARCHAR2(1000);
    vFileNamePattern VARCHAR2(1000);
    vLikePattern VARCHAR2(4000);
    vSlashPos NUMBER;
    vDotPos NUMBER;
    vFilesDeleted NUMBER := 0;

    -- Escapes LIKE wildcards so object-name text is matched literally.
    -- The escape character itself must be doubled first.
    FUNCTION escapeLike(pText IN VARCHAR2) RETURN VARCHAR2 IS
    BEGIN
        RETURN REPLACE(REPLACE(REPLACE(pText, '\', '\\'), '_', '\_'), '%', '\%');
    END escapeLike;
BEGIN
    -- Extract components from URI
    -- Example Parquet: https://.../bucket/folder/PARTITION_YEAR=2024/PARTITION_MONTH=11/202411.parquet
    -- Example CSV: https://.../bucket/folder/TABLENAME_202411.csv

    -- Find last slash before filename
    vSlashPos := INSTR(pFileUri, '/', -1);

    IF vSlashPos > 0 THEN
        -- Extract filename from URI (after last slash)
        vFileName := SUBSTR(pFileUri, vSlashPos + 1);

        -- Extract folder path (before last slash)
        vFolderPath := SUBSTR(pFileUri, 1, vSlashPos - 1);

        -- Find bucket URI (protocol + namespace + bucket name)
        -- Bucket URI ends after /o/ in OCI Object Storage URLs
        vBucketUri := SUBSTR(pFileUri, 1, INSTR(pFileUri, '/o/') + 2);

        -- Extract relative folder path (after bucket)
        vFolderPath := SUBSTR(vFolderPath, LENGTH(vBucketUri) + 1);

        -- Create file pattern by removing extension.
        -- Oracle adds suffixes BEFORE the extension: file.csv -> file_1_timestamp.csv,
        -- so base% matches file_1_timestamp.csv, file_2_timestamp.csv, ...
        -- Only the trailing '%' is a wildcard; the base name is LIKE-escaped.
        vDotPos := INSTR(vFileName, '.', -1);
        IF vDotPos > 0 THEN
            vFileNamePattern := escapeLike(SUBSTR(vFileName, 1, vDotPos - 1)) || '%';
        ELSE
            vFileNamePattern := escapeLike(vFileName) || '%';
        END IF;

        -- Precompute the pattern: a locally declared PL/SQL function cannot be
        -- referenced inside the SQL cursor below.
        vLikePattern := escapeLike(vFolderPath || '/') || vFileNamePattern;

        ENV_MANAGER.LOG_PROCESS_EVENT('Cleanup before retry - Pattern: ' || vFolderPath || '/' || vFileNamePattern, 'DEBUG', pParameters);

        -- List and delete ALL files matching pattern.
        -- CRITICAL: file-specific, wildcard-escaped pattern preserves CSV chunk
        -- isolation in a shared folder.
        FOR rec IN (
            SELECT object_name
            FROM TABLE(DBMS_CLOUD.LIST_OBJECTS(
                credential_name => pCredentialName,
                location_uri => vBucketUri
            ))
            WHERE object_name LIKE vLikePattern ESCAPE '\'
        ) LOOP
            BEGIN
                DBMS_CLOUD.DELETE_OBJECT(
                    credential_name => pCredentialName,
                    object_uri => vBucketUri || rec.object_name
                );

                vFilesDeleted := vFilesDeleted + 1;
                ENV_MANAGER.LOG_PROCESS_EVENT('Deleted partial file ' || vFilesDeleted || ': ' || rec.object_name, 'DEBUG', pParameters);
            EXCEPTION
                WHEN OTHERS THEN
                    -- Log but continue - don't fail entire cleanup
                    ENV_MANAGER.LOG_PROCESS_EVENT('Warning: Could not delete ' || rec.object_name || ': ' || SQLERRM, 'WARNING', pParameters);
            END;
        END LOOP;

        IF vFilesDeleted > 0 THEN
            ENV_MANAGER.LOG_PROCESS_EVENT('Cleanup completed: Deleted ' || vFilesDeleted || ' partial file(s) from previous failed export', 'INFO', pParameters);
        ELSE
            ENV_MANAGER.LOG_PROCESS_EVENT('No existing files to clean up (pattern match: ' || vFileNamePattern || ')', 'DEBUG', pParameters);
        END IF;
    ELSE
        ENV_MANAGER.LOG_PROCESS_EVENT('Warning: Cannot parse file URI for cleanup: ' || pFileUri, 'WARNING', pParameters);
    END IF;
EXCEPTION
    WHEN OTHERS THEN
        -- Don't fail export if cleanup fails - log and continue
        ENV_MANAGER.LOG_PROCESS_EVENT('Warning: Cleanup failed (will retry export anyway): ' || SQLERRM, 'WARNING', pParameters);
END DELETE_FAILED_EXPORT_FILE;
|
|
|
|
----------------------------------------------------------------------------------------------------
|
|
|
|
/**
 * Builds the export SELECT list, converting DATE/TIMESTAMP columns to text
 * with TO_CHAR using per-column formats.
 *
 * Column source resolution:
 *   - pColumnList, when supplied, is used verbatim (comma-separated);
 *   - otherwise, when pTemplateTableName is given, the TEMPLATE table defines
 *     which columns are exported and in which order (target CSV structure);
 *   - otherwise all columns of the source table in COLUMN_ID order.
 *
 * Template handling:
 *   - template columns missing from the SOURCE table are skipped, except the
 *     virtual A_WORKFLOW_HISTORY_KEY, which is mapped from pKeyColumnName;
 *   - date formats come from CT_MRDS.FILE_MANAGER.GET_DATE_FORMAT per column,
 *     or from ENV_MANAGER.gvDefaultDateFormat when no template is given.
 *
 * Returns a comma-separated list of 'T.'-prefixed select expressions.
 **/
FUNCTION buildQueryWithDateFormats(
    pColumnList IN VARCHAR2,
    pTableName IN VARCHAR2,
    pSchemaName IN VARCHAR2,
    pKeyColumnName IN VARCHAR2,
    pTemplateTableName IN VARCHAR2
) RETURN VARCHAR2 IS
    vSelectList VARCHAR2(32767);   -- accumulated result
    vCsv VARCHAR2(32767);          -- normalized comma-separated column names
    vScan PLS_INTEGER;             -- current parse position in vCsv
    vComma PLS_INTEGER;            -- position of next comma (or end+1)
    vColName VARCHAR2(128);        -- column currently being processed
    vCols VARCHAR2(32767);         -- raw resolved column list
    vColType VARCHAR2(30);
    vDateFmt VARCHAR2(200);
    vTplOwner VARCHAR2(128);
    vTplName VARCHAR2(128);
    vInSource NUMBER;
    vExpr VARCHAR2(4000);          -- select expression for the current column
    vDot PLS_INTEGER;
BEGIN
    -- Resolve the column list when the caller did not provide one
    IF pColumnList IS NULL THEN
        IF pTemplateTableName IS NOT NULL THEN
            -- Parse template table name (SCHEMA.TABLE or bare TABLE;
            -- a bare name defaults to the source schema)
            vDot := INSTR(pTemplateTableName, '.');
            IF vDot > 0 THEN
                vTplOwner := SUBSTR(pTemplateTableName, 1, vDot - 1);
                vTplName := SUBSTR(pTemplateTableName, vDot + 1);
            ELSE
                vTplOwner := pSchemaName;
                vTplName := pTemplateTableName;
            END IF;

            -- Template defines the target CSV structure: which columns and order
            SELECT LISTAGG(column_name, ', ') WITHIN GROUP (ORDER BY column_id)
            INTO vCols
            FROM all_tab_columns
            WHERE table_name = vTplName
            AND owner = vTplOwner;
        ELSE
            -- No template: export every source-table column in table order
            SELECT LISTAGG(column_name, ', ') WITHIN GROUP (ORDER BY column_id)
            INTO vCols
            FROM all_tab_columns
            WHERE table_name = pTableName
            AND owner = pSchemaName;
        END IF;
    ELSE
        vCols := pColumnList;
    END IF;

    -- Normalize: strip spaces, uppercase, then walk entry by entry
    vCsv := UPPER(REPLACE(vCols, ' ', ''));
    vScan := 1;
    vSelectList := '';

    WHILE vScan <= LENGTH(vCsv) LOOP
        vComma := INSTR(vCsv, ',', vScan);
        IF vComma = 0 THEN
            vComma := LENGTH(vCsv) + 1;
        END IF;

        vColName := SUBSTR(vCsv, vScan, vComma - vScan);

        -- With a template, only columns that exist in the SOURCE table are
        -- selectable; the virtual A_WORKFLOW_HISTORY_KEY is the sole exception
        -- (it is mapped from pKeyColumnName below)
        IF pTemplateTableName IS NOT NULL THEN
            SELECT COUNT(*)
            INTO vInSource
            FROM all_tab_columns
            WHERE table_name = pTableName
            AND column_name = vColName
            AND owner = pSchemaName;

            IF vInSource = 0 AND UPPER(vColName) != 'A_WORKFLOW_HISTORY_KEY' THEN
                vScan := vComma + 1;
                CONTINUE;
            END IF;
        END IF;

        -- Data type is read from the template table when given, else from the
        -- source table. NOTE(review): raises NO_DATA_FOUND if the column is
        -- absent there (e.g. caller-supplied list with an unknown column).
        IF pTemplateTableName IS NOT NULL THEN
            SELECT data_type
            INTO vColType
            FROM all_tab_columns
            WHERE table_name = vTplName
            AND column_name = vColName
            AND owner = vTplOwner;
        ELSE
            SELECT data_type
            INTO vColType
            FROM all_tab_columns
            WHERE table_name = pTableName
            AND column_name = vColName
            AND owner = pSchemaName;
        END IF;

        -- Build the select expression for this column
        IF UPPER(vColName) = 'A_WORKFLOW_HISTORY_KEY' THEN
            -- Virtual key column: template exposes A_WORKFLOW_HISTORY_KEY,
            -- the source table supplies it via pKeyColumnName
            vExpr := 'T.' || pKeyColumnName || ' AS A_WORKFLOW_HISTORY_KEY';
        ELSIF vColType IN ('DATE', 'TIMESTAMP', 'TIMESTAMP WITH TIME ZONE', 'TIMESTAMP WITH LOCAL TIME ZONE') THEN
            -- Date/timestamp columns are rendered as text with a per-column format
            IF pTemplateTableName IS NOT NULL THEN
                vDateFmt := CT_MRDS.FILE_MANAGER.GET_DATE_FORMAT(
                    pTemplateTableName => pTemplateTableName,
                    pColumnName => vColName
                );
            ELSE
                vDateFmt := ENV_MANAGER.gvDefaultDateFormat;
            END IF;
            vExpr := 'TO_CHAR(T.' || vColName || ', ''' || vDateFmt || ''') AS ' || vColName;
        ELSE
            -- All other columns pass through with the T. alias prefix
            vExpr := 'T.' || vColName;
        END IF;

        -- Append with a comma separator after the first entry
        IF vSelectList IS NOT NULL THEN
            vSelectList := vSelectList || ', ';
        END IF;
        vSelectList := vSelectList || vExpr;

        vScan := vComma + 1;
    END LOOP;

    RETURN vSelectList;
END buildQueryWithDateFormats;
|
|
|
|
----------------------------------------------------------------------------------------------------
|
|
|
|
/**
 * Internal shared function: builds a SELECT column list with 'T.' prefixes and
 * maps the key column (pKeyColumnName) to the A_WORKFLOW_HISTORY_KEY alias.
 *
 * When pColumnList is NULL, the full column list of pSchemaName.pTableName is
 * read from ALL_TAB_COLUMNS in COLUMN_ID order; otherwise the caller-supplied
 * comma-separated list is used.
 *
 * Both paths now run through the same parser, which compares each column
 * against pKeyColumnName with an exact, case-insensitive match. The previous
 * implementation used a substring REPLACE for the dictionary-built list, which
 * corrupted any column whose name merely STARTED with the key column name
 * (e.g. key A_KEY vs column A_KEY_FK) and silently skipped the alias when
 * pKeyColumnName was passed in lowercase.
 *
 * @return comma-separated, 'T.'-prefixed select list
 **/
FUNCTION processColumnList(pColumnList IN VARCHAR2, pTableName IN VARCHAR2, pSchemaName IN VARCHAR2, pKeyColumnName IN VARCHAR2) RETURN VARCHAR2 IS
    vResult VARCHAR2(32767);
    vColumns VARCHAR2(32767);
    vPos PLS_INTEGER;
    vNextPos PLS_INTEGER;
    vCurrentCol VARCHAR2(128);
    vAllCols VARCHAR2(32767);
BEGIN
    IF pColumnList IS NULL THEN
        -- Build list of all columns in table order
        SELECT LISTAGG(column_name, ', ') WITHIN GROUP (ORDER BY column_id)
        INTO vAllCols
        FROM all_tab_columns
        WHERE table_name = pTableName
        AND owner = pSchemaName;
    ELSE
        vAllCols := pColumnList;
    END IF;

    -- Remove extra spaces and convert to uppercase
    vColumns := UPPER(REPLACE(vAllCols, ' ', ''));
    vPos := 1;
    vResult := '';

    -- Parse comma-separated column list and add T. prefix
    WHILE vPos <= LENGTH(vColumns) LOOP
        vNextPos := INSTR(vColumns, ',', vPos);
        IF vNextPos = 0 THEN
            vNextPos := LENGTH(vColumns) + 1;
        END IF;

        vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos);

        -- Key column (e.g. A_ETL_LOAD_SET_KEY_FK) gets the workflow-history
        -- alias; exact whole-name match so similarly named columns are untouched
        IF UPPER(vCurrentCol) = UPPER(pKeyColumnName) THEN
            vCurrentCol := 'T.' || pKeyColumnName || ' AS A_WORKFLOW_HISTORY_KEY';
        ELSE
            -- Add T. prefix if not already qualified
            IF INSTR(vCurrentCol, '.') = 0 THEN
                vCurrentCol := 'T.' || vCurrentCol;
            END IF;
        END IF;

        -- Add to result with comma separator
        IF vResult IS NOT NULL THEN
            vResult := vResult || ', ';
        END IF;
        vResult := vResult || vCurrentCol;

        vPos := vNextPos + 1;
    END LOOP;

    RETURN vResult;
END processColumnList;
|
|
|
|
----------------------------------------------------------------------------------------------------
|
|
|
|
/**
 * Validates that the target table, its key column, and every column named in
 * pColumnList exist in the data dictionary (ALL_TABLES / ALL_TAB_COLUMNS).
 * Raises ENV_MANAGER.CODE_TABLE_NOT_EXISTS or CODE_COLUMN_NOT_EXISTS on the
 * first failed check.
 **/
PROCEDURE VALIDATE_TABLE_AND_COLUMNS (
    pSchemaName IN VARCHAR2,
    pTableName IN VARCHAR2,
    pKeyColumnName IN VARCHAR2,
    pColumnList IN VARCHAR2,
    pParameters IN VARCHAR2
) IS
    vMatches INTEGER;              -- dictionary hit count for current check
    vColumnCsv VARCHAR2(32767);    -- normalized column list
    vStart PLS_INTEGER;            -- parse position
    vDelim PLS_INTEGER;            -- next comma position (or end+1)
    vColName VARCHAR2(128);        -- column currently being validated
BEGIN
    -- The table itself must be visible in ALL_TABLES
    SELECT COUNT(*)
    INTO vMatches
    FROM all_tables
    WHERE table_name = pTableName
    AND owner = pSchemaName;

    IF vMatches = 0 THEN
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, ENV_MANAGER.MSG_TABLE_NOT_EXISTS);
    END IF;

    -- The key column must exist on that table
    SELECT COUNT(*)
    INTO vMatches
    FROM all_tab_columns
    WHERE table_name = pTableName
    AND column_name = pKeyColumnName
    AND owner = pSchemaName;

    IF vMatches = 0 THEN
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
    END IF;

    -- Without an explicit column list there is nothing more to validate
    IF pColumnList IS NULL THEN
        RETURN;
    END IF;

    -- Normalize the list, then verify every entry against the dictionary
    vColumnCsv := UPPER(REPLACE(pColumnList, ' ', ''));
    vStart := 1;

    WHILE vStart <= LENGTH(vColumnCsv) LOOP
        vDelim := INSTR(vColumnCsv, ',', vStart);
        IF vDelim = 0 THEN
            vDelim := LENGTH(vColumnCsv) + 1;
        END IF;

        vColName := SUBSTR(vColumnCsv, vStart, vDelim - vStart);

        -- Strip a table-alias prefix such as 'T.' if present
        IF INSTR(vColName, '.') > 0 THEN
            vColName := SUBSTR(vColName, INSTR(vColName, '.') + 1);
        END IF;

        SELECT COUNT(*)
        INTO vMatches
        FROM all_tab_columns
        WHERE table_name = pTableName
        AND column_name = vColName
        AND owner = pSchemaName;

        IF vMatches = 0 THEN
            RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
        END IF;

        vStart := vDelim + 1;
    END LOOP;
END VALIDATE_TABLE_AND_COLUMNS;
|
|
|
|
----------------------------------------------------------------------------------------------------
|
|
|
|
/**
 * Retrieves the distinct list of year/month partitions that contain data in
 * the half-open date range [pMinDate, pMaxDate), by joining the source table's
 * key column to CT_ODS.A_LOAD_HISTORY and bucketing on LOAD_START.
 *
 * @param pSchemaName     owner of the source table (ignored if pTableName is qualified)
 * @param pTableName      source table, bare or SCHEMA.TABLE
 * @param pKeyColumnName  column joined to A_LOAD_HISTORY.A_ETL_LOAD_SET_KEY
 * @param pMinDate        inclusive lower bound on LOAD_START
 * @param pMaxDate        exclusive upper bound on LOAD_START
 * @param pParameters     context string passed through to the process log
 * @return partition_tab of (year, month) character pairs, ordered ascending
 *
 * NOTE(review): table and key-column names are concatenated into dynamic SQL
 * without DBMS_ASSERT here (unlike EXPORT_TABLE_DATA) - confirm all callers
 * pass trusted identifiers.
 **/
FUNCTION GET_PARTITIONS (
    pSchemaName IN VARCHAR2,
    pTableName IN VARCHAR2,
    pKeyColumnName IN VARCHAR2,
    pMinDate IN DATE,
    pMaxDate IN DATE,
    pParameters IN VARCHAR2
) RETURN partition_tab IS
    vSql VARCHAR2(32000);
    vPartitions partition_tab;
    vKeyValuesYear DBMS_SQL.VARCHAR2_TABLE;
    vKeyValuesMonth DBMS_SQL.VARCHAR2_TABLE;
    vFullTableName VARCHAR2(200);
BEGIN
    -- Build fully qualified table name if not already qualified
    IF INSTR(pTableName, '.') > 0 THEN
        vFullTableName := pTableName; -- Already fully qualified
    ELSE
        vFullTableName := pSchemaName || '.' || pTableName;
    END IF;

    -- Dates are bound (not concatenated); the range is half-open so a
    -- partition boundary is never double-counted
    vSql := 'SELECT DISTINCT TO_CHAR(L.LOAD_START,''YYYY'') AS YR, TO_CHAR(L.LOAD_START,''MM'') AS MN
FROM ' || vFullTableName || ' T, CT_ODS.A_LOAD_HISTORY L
WHERE T.' || pKeyColumnName || ' = L.A_ETL_LOAD_SET_KEY
AND L.LOAD_START >= :pMinDate
AND L.LOAD_START < :pMaxDate
ORDER BY YR, MN';

    ENV_MANAGER.LOG_PROCESS_EVENT('Executing date range query: ' || vSql, 'DEBUG', pParameters);
    EXECUTE IMMEDIATE vSql BULK COLLECT INTO vKeyValuesYear, vKeyValuesMonth USING pMinDate, pMaxDate;

    ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vKeyValuesYear.COUNT || ' year/month combinations to export', 'DEBUG', pParameters);

    -- Convert the two parallel collections into the package's partition_tab type
    vPartitions := partition_tab();
    vPartitions.EXTEND(vKeyValuesYear.COUNT);
    FOR i IN 1 .. vKeyValuesYear.COUNT LOOP
        vPartitions(i).year := vKeyValuesYear(i);
        vPartitions(i).month := vKeyValuesMonth(i);
    END LOOP;

    RETURN vPartitions;
END GET_PARTITIONS;
|
|
|
|
----------------------------------------------------------------------------------------------------
|
|
|
|
/**
 * Exports a single partition (year/month) to OCI Object Storage in the
 * requested format (PARQUET or CSV) via DBMS_CLOUD.EXPORT_DATA.
 * This is the core worker procedure used by the parallel-processing callback.
 *
 * Before each export, DELETE_FAILED_EXPORT_FILE removes any partial files
 * left by a previously failed attempt, so Oracle does not create additional
 * _1/_2 suffixed files on retry.
 *
 * @param pSchemaName        owner of the source table (ignored if pTableName is qualified)
 * @param pTableName         source table, bare or SCHEMA.TABLE
 * @param pKeyColumnName     column joined to A_LOAD_HISTORY.A_ETL_LOAD_SET_KEY
 * @param pYear / pMonth     partition selector, compared as TO_CHAR(LOAD_START) text
 * @param pBucketUri         base object-storage URI (with trailing '/')
 * @param pFolderName        optional folder under the bucket
 * @param pProcessedColumns  pre-built SELECT list (see buildQueryWithDateFormats)
 * @param pMinDate/pMaxDate  half-open [pMinDate, pMaxDate) bound on LOAD_START
 * @param pCredentialName    DBMS_CLOUD credential
 * @param pFormat            'PARQUET' (default) or 'CSV'; anything else raises -20001
 * @param pFileBaseName      CSV base filename; defaults to UPPER(pTableName)
 * @param pMaxFileSize       CSV maxfilesize in bytes (ignored for Parquet)
 * @param pParameters        context string passed through to the process log
 *
 * NOTE(review): pYear/pMonth/pMinDate/pMaxDate are concatenated into the query
 * text rather than bound - safe only because they come from GET_PARTITIONS /
 * DATE parameters; confirm no external caller passes raw user input.
 **/
PROCEDURE EXPORT_SINGLE_PARTITION (
    pSchemaName IN VARCHAR2,
    pTableName IN VARCHAR2,
    pKeyColumnName IN VARCHAR2,
    pYear IN VARCHAR2,
    pMonth IN VARCHAR2,
    pBucketUri IN VARCHAR2,
    pFolderName IN VARCHAR2,
    pProcessedColumns IN VARCHAR2,
    pMinDate IN DATE,
    pMaxDate IN DATE,
    pCredentialName IN VARCHAR2,
    pFormat IN VARCHAR2 DEFAULT 'PARQUET',
    pFileBaseName IN VARCHAR2 DEFAULT NULL,
    pMaxFileSize IN NUMBER DEFAULT 104857600,
    pParameters IN VARCHAR2
) IS
    vQuery VARCHAR2(32767);
    vUri VARCHAR2(4000);
    vFileName VARCHAR2(1000);
    vFullTableName VARCHAR2(200);
BEGIN
    -- Build fully qualified table name if not already qualified
    IF INSTR(pTableName, '.') > 0 THEN
        vFullTableName := pTableName; -- Already fully qualified
    ELSE
        vFullTableName := pSchemaName || '.' || pTableName;
    END IF;

    -- Construct the query to extract data for the current year/month.
    -- CHR(39) is a single quote; the extra min/max bounds keep the partition
    -- filter inside the caller's overall date range.
    vQuery := 'SELECT ' || pProcessedColumns || '
FROM ' || vFullTableName || ' T, CT_ODS.A_LOAD_HISTORY L
WHERE T.' || pKeyColumnName || ' = L.A_ETL_LOAD_SET_KEY
AND TO_CHAR(L.LOAD_START,''YYYY'') = ' || CHR(39) || pYear || CHR(39) || '
AND TO_CHAR(L.LOAD_START,''MM'') = ' || CHR(39) || pMonth || CHR(39) || '
AND L.LOAD_START >= TO_DATE(' || CHR(39) || TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'')
AND L.LOAD_START < TO_DATE(' || CHR(39) || TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'')';

    ENV_MANAGER.LOG_PROCESS_EVENT('Processing Year/Month: ' || pYear || '/' || pMonth || ' (Format: '||pFormat||')', 'DEBUG', pParameters);
    ENV_MANAGER.LOG_PROCESS_EVENT('Export query: ' || vQuery, 'DEBUG', pParameters);
    -- Construct the URI based on format
    IF pFormat = 'PARQUET' THEN
        -- Parquet: Hive-style partition folders, one folder per year/month,
        -- so parallel chunks never share a folder.
        -- Note: maxfilesize is NOT supported for Parquet format (Oracle limitation)
        vUri := pBucketUri ||
            CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END ||
            'PARTITION_YEAR=' || sanitizeFilename(pYear) || '/' ||
            'PARTITION_MONTH=' || sanitizeFilename(pMonth) || '/' ||
            sanitizeFilename(pYear) || sanitizeFilename(pMonth) || '.parquet';

        ENV_MANAGER.LOG_PROCESS_EVENT('Parquet export URI: ' || vUri, 'DEBUG', pParameters);

        -- Delete potentially corrupted file from previous failed attempt;
        -- prevents Oracle from creating _1 suffixed files on retry
        DELETE_FAILED_EXPORT_FILE(vUri, pCredentialName, pParameters);

        DBMS_CLOUD.EXPORT_DATA(
            credential_name => pCredentialName,
            file_uri_list => vUri,
            query => vQuery,
            format => json_object('type' VALUE 'parquet')
        );
    ELSIF pFormat = 'CSV' THEN
        -- CSV: flat file structure with year/month embedded in the filename
        -- (chunks share a folder, so the filename carries the partition id)
        vFileName := NVL(pFileBaseName, UPPER(pTableName)) || '_' || pYear || pMonth || '.csv';
        vUri := pBucketUri ||
            CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END ||
            sanitizeFilename(vFileName);

        ENV_MANAGER.LOG_PROCESS_EVENT('CSV export URI: ' || vUri, 'DEBUG', pParameters);
        ENV_MANAGER.LOG_PROCESS_EVENT('CSV maxfilesize: ' || pMaxFileSize || ' bytes (' || ROUND(pMaxFileSize/1048576, 2) || ' MB)', 'DEBUG', pParameters);

        -- Delete potentially corrupted file from previous failed attempt;
        -- prevents Oracle from creating _1 suffixed files on retry
        DELETE_FAILED_EXPORT_FILE(vUri, pCredentialName, pParameters);

        -- CSV export format notes (Oracle requirements):
        --   maxfilesize is a NUMBER of bytes (not a string like '1000M');
        --   min 10MB (10485760), max 1GB (1073741824);
        --   100MB default chosen to avoid PGA memory issues with large files
        DBMS_CLOUD.EXPORT_DATA(
            credential_name => pCredentialName,
            file_uri_list => vUri,
            query => vQuery,
            format => json_object(
                'type' VALUE 'CSV',
                'header' VALUE true,
                'quote' VALUE CHR(34),
                'delimiter' VALUE ',',
                'escape' VALUE true,
                'recorddelimiter' VALUE CHR(13)||CHR(10), -- CRLF line endings for Windows consumers
                'maxfilesize' VALUE pMaxFileSize -- dynamic maxfilesize in bytes (e.g. 104857600 = 100MB)
            )
        );
    ELSE
        RAISE_APPLICATION_ERROR(-20001, 'Unsupported format: ' || pFormat || '. Use PARQUET or CSV.');
    END IF;

    ENV_MANAGER.LOG_PROCESS_EVENT('Export completed successfully for ' || pYear || '/' || pMonth, 'DEBUG', pParameters);
END EXPORT_SINGLE_PARTITION;
|
|
|
|
----------------------------------------------------------------------------------------------------
|
|
|
|
/**
 * Callback procedure for DBMS_PARALLEL_EXECUTE: processes one partition
 * (year/month) chunk. The framework invokes it per chunk; all chunk context
 * is read from CT_MRDS.A_PARALLEL_EXPORT_CHUNKS keyed by (CHUNK_ID, TASK_NAME).
 *
 * Status lifecycle on the chunk row: PROCESSING -> COMPLETED, or FAILED with
 * ERROR_MESSAGE populated. Each status change is COMMITted immediately so
 * progress is visible to the coordinating session while the task runs.
 *
 * @param pStartId   chunk id; used as the exact CHUNK_ID (one chunk per id)
 * @param pEndId     required by the DBMS_PARALLEL_EXECUTE callback signature;
 *                   not referenced here since chunks are single-id
 * @param pTaskName  task name for session isolation.
 *                   NOTE(review): with the NULL default, TASK_NAME = NULL never
 *                   matches, so the initial SELECT raises NO_DATA_FOUND and the
 *                   chunk is marked FAILED - confirm RUN_TASK always supplies it.
 *
 * Re-raises any error after recording it, so the framework marks the chunk
 * unsuccessful as well.
 **/
PROCEDURE EXPORT_PARTITION_PARALLEL (
    pStartId IN NUMBER,
    pEndId IN NUMBER,
    pTaskName IN VARCHAR2 DEFAULT NULL
) IS
    vYear VARCHAR2(4);
    vMonth VARCHAR2(2);
    vSchemaName VARCHAR2(128);
    vTableName VARCHAR2(128);
    vKeyColumnName VARCHAR2(128);
    vBucketUri VARCHAR2(4000);
    vFolderName VARCHAR2(1000);
    vProcessedColumns VARCHAR2(32767);
    vMinDate DATE;
    vMaxDate DATE;
    vCredentialName VARCHAR2(200);
    vFormat VARCHAR2(20);
    vFileBaseName VARCHAR2(1000);
    vMaxFileSize NUMBER;
    vJobClass VARCHAR2(128);
    vTaskName VARCHAR2(128);
    vParameters VARCHAR2(4000);
BEGIN
    -- Retrieve chunk context from A_PARALLEL_EXPORT_CHUNKS.
    -- CRITICAL: filter by CHUNK_ID AND TASK_NAME for precise session isolation;
    -- pTaskName passed from RUN_TASK ensures deterministic single-row retrieval.
    SELECT
        YEAR_VALUE,
        MONTH_VALUE,
        SCHEMA_NAME,
        TABLE_NAME,
        KEY_COLUMN_NAME,
        BUCKET_URI,
        FOLDER_NAME,
        PROCESSED_COLUMNS,
        MIN_DATE,
        MAX_DATE,
        CREDENTIAL_NAME,
        FORMAT_TYPE,
        FILE_BASE_NAME,
        MAX_FILE_SIZE,
        JOB_CLASS,
        TASK_NAME
    INTO
        vYear,
        vMonth,
        vSchemaName,
        vTableName,
        vKeyColumnName,
        vBucketUri,
        vFolderName,
        vProcessedColumns,
        vMinDate,
        vMaxDate,
        vCredentialName,
        vFormat,
        vFileBaseName,
        vMaxFileSize,
        vJobClass,
        vTaskName
    FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS
    WHERE CHUNK_ID = pStartId
    AND TASK_NAME = pTaskName;

    vParameters := 'Parallel task - Year: ' || vYear || ', Month: ' || vMonth || ', ChunkID: ' || pStartId || ', TaskName: ' || vTaskName;
    ENV_MANAGER.LOG_PROCESS_EVENT('Starting parallel export for partition ' || vYear || '/' || vMonth, 'DEBUG', vParameters);

    -- Mark chunk as PROCESSING (clearing any stale error from a prior attempt).
    -- CRITICAL: use both CHUNK_ID AND TASK_NAME for session isolation.
    UPDATE CT_MRDS.A_PARALLEL_EXPORT_CHUNKS
    SET STATUS = 'PROCESSING',
        ERROR_MESSAGE = NULL
    WHERE CHUNK_ID = pStartId
    AND TASK_NAME = vTaskName;
    COMMIT;

    -- Call the worker procedure that performs the actual export
    EXPORT_SINGLE_PARTITION(
        pSchemaName => vSchemaName,
        pTableName => vTableName,
        pKeyColumnName => vKeyColumnName,
        pYear => vYear,
        pMonth => vMonth,
        pBucketUri => vBucketUri,
        pFolderName => vFolderName,
        pProcessedColumns => vProcessedColumns,
        pMinDate => vMinDate,
        pMaxDate => vMaxDate,
        pCredentialName => vCredentialName,
        pFormat => vFormat,
        pFileBaseName => vFileBaseName,
        pMaxFileSize => vMaxFileSize,
        pParameters => vParameters
    );

    -- Mark chunk as COMPLETED with the export timestamp.
    -- CRITICAL: use both CHUNK_ID AND TASK_NAME for session isolation.
    UPDATE CT_MRDS.A_PARALLEL_EXPORT_CHUNKS
    SET STATUS = 'COMPLETED',
        EXPORT_TIMESTAMP = SYSTIMESTAMP,
        ERROR_MESSAGE = NULL
    WHERE CHUNK_ID = pStartId
    AND TASK_NAME = vTaskName;
    COMMIT;

    ENV_MANAGER.LOG_PROCESS_EVENT('Completed parallel export for partition ' || vYear || '/' || vMonth, 'DEBUG', vParameters);
EXCEPTION
    WHEN OTHERS THEN
        -- Capture error details in a package variable first: SQLERRM cannot be
        -- referenced directly inside a SQL statement
        vgMsgTmp := 'Parallel task error for partition ' || vYear || '/' || vMonth || ' (ChunkID: ' || pStartId || ', TaskName: ' || vTaskName || '): ' || SQLERRM || cgBL || DBMS_UTILITY.FORMAT_ERROR_BACKTRACE;
        ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);

        -- Mark chunk as FAILED with the (truncated) error message.
        -- CRITICAL: use both CHUNK_ID AND TASK_NAME for session isolation.
        UPDATE CT_MRDS.A_PARALLEL_EXPORT_CHUNKS
        SET STATUS = 'FAILED',
            ERROR_MESSAGE = SUBSTR(vgMsgTmp, 1, 4000)
        WHERE CHUNK_ID = pStartId
        AND TASK_NAME = vTaskName;
        COMMIT;

        -- Re-raise so DBMS_PARALLEL_EXECUTE also records the chunk as failed
        RAISE;
END EXPORT_PARTITION_PARALLEL;
|
|
|
|
----------------------------------------------------------------------------------------------------
|
|
-- MAIN EXPORT PROCEDURES
|
|
----------------------------------------------------------------------------------------------------
|
|
|
|
PROCEDURE EXPORT_TABLE_DATA (
|
|
pSchemaName IN VARCHAR2,
|
|
pTableName IN VARCHAR2,
|
|
pKeyColumnName IN VARCHAR2,
|
|
pBucketArea IN VARCHAR2,
|
|
pFolderName IN VARCHAR2,
|
|
pFileName IN VARCHAR2 default NULL,
|
|
pTemplateTableName IN VARCHAR2 default NULL,
|
|
pMaxFileSize IN NUMBER default 104857600,
|
|
pRegisterExport IN BOOLEAN default FALSE,
|
|
pProcessName IN VARCHAR2 default 'DATA_EXPORTER',
|
|
pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
|
|
)
|
|
IS
|
|
vCount INTEGER;
|
|
vQuery VARCHAR2(32767);
|
|
vUri VARCHAR2(4000);
|
|
vTableName VARCHAR2(128);
|
|
vSchemaName VARCHAR2(128);
|
|
vKeyColumnName VARCHAR2(128);
|
|
vParameters VARCHAR2(4000);
|
|
vBucketUri VARCHAR2(4000);
|
|
vProcessedColumnList VARCHAR2(32767);
|
|
vCurrentCol VARCHAR2(128);
|
|
|
|
-- Variables for file registration (when pRegisterExport=TRUE)
|
|
vConfigKey NUMBER;
|
|
vSourceKey VARCHAR2(100);
|
|
vTableId VARCHAR2(100);
|
|
vSlashPos1 NUMBER;
|
|
vSlashPos2 NUMBER;
|
|
vSourceFileReceivedKey NUMBER;
|
|
|
|
BEGIN
|
|
vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||''''
|
|
,'pTableName => '''||nvl(pTableName, 'NULL')||''''
|
|
,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||''''
|
|
,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||''''
|
|
,'pFolderName => '''||nvl(pFolderName, 'NULL')||''''
|
|
,'pFileName => '''||nvl(pFileName, 'NULL')||''''
|
|
,'pTemplateTableName => '''||nvl(pTemplateTableName, 'NULL')||''''
|
|
,'pMaxFileSize => '''||nvl(TO_CHAR(pMaxFileSize), 'NULL')||''''
|
|
,'pRegisterExport => '''||CASE WHEN pRegisterExport THEN 'TRUE' ELSE 'FALSE' END||''''
|
|
,'pProcessName => '''||nvl(pProcessName, 'NULL')||''''
|
|
,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
|
|
));
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
|
|
|
|
-- Get bucket URI based on bucket area using FILE_MANAGER function
|
|
vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea);
|
|
|
|
-- Convert table and column names to uppercase to match data dictionary
|
|
vTableName := UPPER(pTableName);
|
|
vSchemaName := UPPER(pSchemaName);
|
|
vKeyColumnName := UPPER(pKeyColumnName);
|
|
|
|
-- Check if table exists
|
|
SELECT COUNT(*) INTO vCount
|
|
FROM all_tables
|
|
WHERE table_name = vTableName
|
|
AND owner = vSchemaName;
|
|
|
|
IF vCount = 0 THEN
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, ENV_MANAGER.MSG_TABLE_NOT_EXISTS);
|
|
END IF;
|
|
|
|
-- Check if key column exists
|
|
SELECT COUNT(*) INTO vCount
|
|
FROM all_tab_columns
|
|
WHERE table_name = vTableName
|
|
AND column_name = vKeyColumnName
|
|
AND owner = vSchemaName;
|
|
|
|
IF vCount = 0 THEN
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
|
|
END IF;
|
|
|
|
-- Validate template table if provided
|
|
IF pTemplateTableName IS NOT NULL THEN
|
|
DECLARE
|
|
vTemplateSchema VARCHAR2(128);
|
|
vTemplateTable VARCHAR2(128);
|
|
vTemplateCount NUMBER;
|
|
BEGIN
|
|
-- Parse template table name (SCHEMA.TABLE or just TABLE)
|
|
IF INSTR(pTemplateTableName, '.') > 0 THEN
|
|
vTemplateSchema := UPPER(SUBSTR(pTemplateTableName, 1, INSTR(pTemplateTableName, '.') - 1));
|
|
vTemplateTable := UPPER(SUBSTR(pTemplateTableName, INSTR(pTemplateTableName, '.') + 1));
|
|
ELSE
|
|
vTemplateSchema := vSchemaName;
|
|
vTemplateTable := UPPER(pTemplateTableName);
|
|
END IF;
|
|
|
|
-- Check if template table exists
|
|
SELECT COUNT(*) INTO vTemplateCount
|
|
FROM all_tables
|
|
WHERE table_name = vTemplateTable
|
|
AND owner = vTemplateSchema;
|
|
|
|
IF vTemplateCount = 0 THEN
|
|
vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS || ': Template table ' || vTemplateSchema || '.' || vTemplateTable;
|
|
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp);
|
|
END IF;
|
|
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Template table validated: ' || vTemplateSchema || '.' || vTemplateTable, 'DEBUG', vParameters);
|
|
END;
|
|
END IF;
|
|
|
|
-- Build query with TO_CHAR for date columns (per-column format support)
|
|
vProcessedColumnList := buildQueryWithDateFormats(NULL, vTableName, vSchemaName, vKeyColumnName, pTemplateTableName);
|
|
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list with TO_CHAR for date columns: ' || vProcessedColumnList, 'DEBUG', vParameters);
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Template table: ' || NVL(pTemplateTableName, 'NULL - using global default for all dates'), 'INFO', vParameters);
|
|
|
|
vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName);
|
|
|
|
-- Lookup A_SOURCE_FILE_CONFIG_KEY based on pFolderName parsing if pRegisterExport is enabled
|
|
IF pRegisterExport THEN
|
|
-- Format: {BUCKET_AREA}/{SOURCE_KEY}/{TABLE_ID}
|
|
-- Example: 'ODS/CSDB/CSDB_DEBT_DAILY' -> SOURCE_KEY='CSDB', TABLE_ID='CSDB_DEBT_DAILY'
|
|
|
|
-- Parse pFolderName to extract SOURCE_KEY and TABLE_ID
|
|
vSlashPos1 := INSTR(pFolderName, '/', 1, 1); -- First '/' position
|
|
vSlashPos2 := INSTR(pFolderName, '/', 1, 2); -- Second '/' position
|
|
|
|
IF vSlashPos1 > 0 AND vSlashPos2 > 0 THEN
|
|
-- Extract segment 2 (SOURCE_KEY) and segment 3 (TABLE_ID)
|
|
vSourceKey := SUBSTR(pFolderName, vSlashPos1 + 1, vSlashPos2 - vSlashPos1 - 1);
|
|
vTableId := SUBSTR(pFolderName, vSlashPos2 + 1);
|
|
|
|
-- Find configuration based on SOURCE_KEY and TABLE_ID
|
|
BEGIN
|
|
SELECT A_SOURCE_FILE_CONFIG_KEY
|
|
INTO vConfigKey
|
|
FROM CT_MRDS.A_SOURCE_FILE_CONFIG
|
|
WHERE A_SOURCE_KEY = vSourceKey
|
|
AND TABLE_ID = vTableId
|
|
AND SOURCE_FILE_TYPE = 'INPUT'
|
|
AND ROWNUM = 1;
|
|
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Found config key: ' || vConfigKey || ' for SOURCE=' || vSourceKey || ', TABLE=' || vTableId, 'DEBUG', vParameters);
|
|
EXCEPTION
|
|
WHEN NO_DATA_FOUND THEN
|
|
vConfigKey := -1;
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('No config found for SOURCE=' || vSourceKey || ', TABLE=' || vTableId || ' - using default (-1)', 'INFO', vParameters);
|
|
END;
|
|
ELSE
|
|
-- Cannot parse folder name - use default
|
|
vConfigKey := -1;
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Cannot parse pFolderName: ' || pFolderName || ' - using default (-1)', 'WARNING', vParameters);
|
|
END IF;
|
|
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('File registration enabled with config key: ' || vConfigKey, 'INFO', vParameters);
|
|
END IF;
|
|
|
|
-- Construct single query for entire table (no join with A_LOAD_HISTORY - ensures single file output)
|
|
vQuery := 'SELECT ' || vProcessedColumnList ||
|
|
' FROM ' || vTableName || ' T';
|
|
|
|
-- Construct the URI for the file in OCI Object Storage
|
|
vUri := vBucketUri ||
|
|
CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END ||
|
|
NVL(pFileName, UPPER(vTableName) || '.csv');
|
|
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Exporting to single file: ' || vUri, 'INFO', vParameters);
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Export query: ' || vQuery, 'DEBUG', vParameters);
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Max file size: ' || pMaxFileSize || ' bytes (' || ROUND(pMaxFileSize/1048576, 2) || ' MB)', 'DEBUG', vParameters);
|
|
|
|
-- Use DBMS_CLOUD package to export data to the URI
|
|
-- Oracle maxfilesize: min 10MB (10485760), max 1GB (1073741824), default 100MB (104857600)
|
|
DBMS_CLOUD.EXPORT_DATA(
|
|
credential_name => pCredentialName,
|
|
file_uri_list => vUri,
|
|
query => vQuery,
|
|
format => json_object(
|
|
'type' VALUE 'CSV',
|
|
'header' VALUE true,
|
|
'quote' VALUE CHR(34),
|
|
'delimiter' VALUE ',',
|
|
'escape' VALUE true,
|
|
'recorddelimiter' VALUE CHR(13)||CHR(10), -- CRLF dla Windows
|
|
'maxfilesize' VALUE pMaxFileSize -- Dynamic maxfilesize in bytes
|
|
)
|
|
);
|
|
|
|
-- Register exported file to A_SOURCE_FILE_RECEIVED if requested
|
|
IF pRegisterExport THEN
|
|
DECLARE
|
|
vActualFileName VARCHAR2(1000); -- Actual filename with Oracle suffix
|
|
vSanitizedFileName VARCHAR2(1000);
|
|
vFileName VARCHAR2(1000);
|
|
vRetryCount NUMBER := 0;
|
|
vMaxRetries NUMBER := 1; -- One retry after initial attempt
|
|
vRetryDelay NUMBER := 2; -- 2 seconds delay
|
|
vFilesFound NUMBER := 0;
|
|
vTotalBytes NUMBER := 0;
|
|
BEGIN
|
|
-- Extract filename from URI (after last '/')
|
|
vFileName := SUBSTR(vUri, INSTR(vUri, '/', -1) + 1);
|
|
|
|
-- Sanitize filename first (PL/SQL function cannot be used directly in SQL)
|
|
vSanitizedFileName := sanitizeFilename(vFileName);
|
|
|
|
-- Remove .csv extension for LIKE pattern matching (Oracle adds suffixes BEFORE .csv)
|
|
-- Example: tablename.csv becomes tablename_1_20260211T102621591769Z.csv
|
|
vSanitizedFileName := REGEXP_REPLACE(vSanitizedFileName, '\.csv$', '', 1, 0, 'i');
|
|
|
|
-- Try to get ALL exported files with retry logic
|
|
-- Oracle DBMS_CLOUD.EXPORT_DATA can create MULTIPLE files due to:
|
|
-- 1. maxfilesize parameter (splits files larger than limit)
|
|
-- 2. Automatic parallel processing (especially on large production instances)
|
|
-- We must register ALL files, not just the first one
|
|
<<metadata_retry_loop>>
|
|
LOOP
|
|
BEGIN
|
|
-- Register ALL files matching the pattern (cursor loop)
|
|
FOR rec IN (
|
|
SELECT object_name, checksum, created, bytes
|
|
FROM TABLE(DBMS_CLOUD.LIST_OBJECTS(
|
|
credential_name => pCredentialName,
|
|
location_uri => vBucketUri
|
|
))
|
|
WHERE object_name LIKE CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || vSanitizedFileName || '%'
|
|
ORDER BY created DESC, bytes DESC
|
|
) LOOP
|
|
-- Extract filename only from full path (remove bucket folder prefix)
|
|
vActualFileName := SUBSTR(rec.object_name, INSTR(rec.object_name, '/', -1) + 1);
|
|
|
|
-- Create A_SOURCE_FILE_RECEIVED record for EACH exported file
|
|
vSourceFileReceivedKey := CT_MRDS.A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL;
|
|
INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
|
|
A_SOURCE_FILE_RECEIVED_KEY,
|
|
A_SOURCE_FILE_CONFIG_KEY,
|
|
SOURCE_FILE_NAME,
|
|
CHECKSUM,
|
|
CREATED,
|
|
BYTES,
|
|
RECEPTION_DATE,
|
|
PROCESSING_STATUS,
|
|
PARTITION_YEAR,
|
|
PARTITION_MONTH,
|
|
ARCH_PATH,
|
|
PROCESS_NAME
|
|
) VALUES (
|
|
vSourceFileReceivedKey,
|
|
NVL(vConfigKey, -1), -- Use config key if found, otherwise -1
|
|
vActualFileName, -- Use actual filename with Oracle suffix
|
|
rec.checksum,
|
|
rec.created,
|
|
rec.bytes,
|
|
SYSDATE,
|
|
'INGESTED',
|
|
NULL, -- PARTITION_YEAR not used for single-file exports
|
|
NULL, -- PARTITION_MONTH not used for single-file exports
|
|
NULL, -- ARCH_PATH not used for single-file exports
|
|
pProcessName -- Process name from parameter
|
|
);
|
|
|
|
vFilesFound := vFilesFound + 1;
|
|
vTotalBytes := vTotalBytes + rec.bytes;
|
|
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Registered file ' || vFilesFound || ': FileReceivedKey=' || vSourceFileReceivedKey || ', File=' || vActualFileName || ', Size=' || rec.bytes || ' bytes', 'INFO', vParameters);
|
|
END LOOP;
|
|
|
|
-- Check if any files were found
|
|
IF vFilesFound = 0 THEN
|
|
RAISE NO_DATA_FOUND;
|
|
END IF;
|
|
|
|
-- Success - exit retry loop
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Total registered: ' || vFilesFound || ' file(s), Total size: ' || vTotalBytes || ' bytes (' || ROUND(vTotalBytes/1048576, 2) || ' MB)', 'INFO', vParameters);
|
|
EXIT metadata_retry_loop;
|
|
|
|
EXCEPTION
|
|
WHEN NO_DATA_FOUND THEN
|
|
vRetryCount := vRetryCount + 1;
|
|
|
|
IF vRetryCount <= vMaxRetries THEN
|
|
-- Log retry attempt
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('File(s) not found in bucket (attempt ' || vRetryCount || '/' || (vMaxRetries + 1) || '), retrying after ' || vRetryDelay || ' seconds: ' || vFileName, 'DEBUG', vParameters);
|
|
|
|
-- Wait before retry using DBMS_SESSION.SLEEP (alternative to DBMS_LOCK)
|
|
DBMS_SESSION.SLEEP(vRetryDelay);
|
|
ELSE
|
|
-- Max retries exceeded - re-raise exception
|
|
RAISE;
|
|
END IF;
|
|
END;
|
|
END LOOP metadata_retry_loop;
|
|
EXCEPTION
|
|
WHEN NO_DATA_FOUND THEN
|
|
-- File not found after retries - log warning and continue without metadata
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('WARNING: File not found in bucket after ' || (vMaxRetries + 1) || ' attempts: ' || vFileName, 'WARNING', vParameters);
|
|
|
|
-- Sanitize filename for fallback INSERT (function cannot be used in SQL)
|
|
vSanitizedFileName := sanitizeFilename(vFileName);
|
|
|
|
-- Insert without metadata using theoretical filename
|
|
vSourceFileReceivedKey := CT_MRDS.A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL;
|
|
INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
|
|
A_SOURCE_FILE_RECEIVED_KEY,
|
|
A_SOURCE_FILE_CONFIG_KEY,
|
|
SOURCE_FILE_NAME,
|
|
RECEPTION_DATE,
|
|
PROCESSING_STATUS,
|
|
PARTITION_YEAR,
|
|
PARTITION_MONTH,
|
|
ARCH_PATH,
|
|
PROCESS_NAME
|
|
) VALUES (
|
|
vSourceFileReceivedKey,
|
|
NVL(vConfigKey, -1), -- Use config key if found, otherwise -1
|
|
vSanitizedFileName, -- Use pre-calculated sanitized filename
|
|
SYSDATE,
|
|
'INGESTED',
|
|
NULL, -- PARTITION_YEAR not used for single-file exports
|
|
NULL, -- PARTITION_MONTH not used for single-file exports
|
|
NULL, -- ARCH_PATH not used for single-file exports
|
|
pProcessName -- Process name from parameter
|
|
);
|
|
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Registered file without metadata: FileReceivedKey=' || vSourceFileReceivedKey || ', File=' || vSanitizedFileName, 'INFO', vParameters);
|
|
END;
|
|
END IF;
|
|
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
|
|
EXCEPTION
|
|
WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN
|
|
vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName;
|
|
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp);
|
|
WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN
|
|
vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in column list' ELSE '' END;
|
|
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp);
|
|
WHEN OTHERS THEN
|
|
-- Log complete error details including full stack trace and backtrace
|
|
ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER');
|
|
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
|
|
|
|
END EXPORT_TABLE_DATA;
|
|
|
|
----------------------------------------------------------------------------------------------------
|
|
|
|
/**
 * @name EXPORT_TABLE_DATA_BY_DATE
 * @desc Exports a table to Parquet files in OCI Object Storage, one export per
 *       year/month partition returned by GET_PARTITIONS for the given date range.
 *       With pParallelDegree = 1 partitions are exported sequentially via
 *       EXPORT_SINGLE_PARTITION; with a higher degree the partitions are staged
 *       as chunks in CT_MRDS.A_PARALLEL_EXPORT_CHUNKS and processed by
 *       DBMS_PARALLEL_EXECUTE running EXPORT_PARTITION_PARALLEL workers.
 *
 * @param pSchemaName        Owner of the exported table (validated via DBMS_ASSERT).
 * @param pTableName         Table to export.
 * @param pKeyColumnName     Key column used for date-based partition resolution.
 * @param pBucketArea        Logical bucket area; resolved to a URI by FILE_MANAGER.GET_BUCKET_URI.
 * @param pFolderName        Optional folder inside the bucket.
 * @param pColumnList        Optional explicit column list; NULL builds one from table metadata.
 * @param pMinDate/pMaxDate  Inclusive date window used to select partitions.
 * @param pParallelDegree    1 = sequential, 2..16 = DBMS_PARALLEL_EXECUTE workers.
 * @param pTemplateTableName Optional template table driving per-column date formats.
 * @param pJobClass          Optional scheduler job class for the parallel workers.
 * @param pCredentialName    DBMS_CLOUD credential used by the export workers.
 *
 * NOTE(review): vgMsgTmp and cgBL are referenced but declared elsewhere in the
 * package (presumably package-body globals) — not visible in this section.
 */
PROCEDURE EXPORT_TABLE_DATA_BY_DATE (
    pSchemaName IN VARCHAR2,
    pTableName IN VARCHAR2,
    pKeyColumnName IN VARCHAR2,
    pBucketArea IN VARCHAR2,
    pFolderName IN VARCHAR2,
    pColumnList IN VARCHAR2 default NULL,
    pMinDate IN DATE default DATE '1900-01-01',
    pMaxDate IN DATE default SYSDATE,
    pParallelDegree IN NUMBER default 1,
    pTemplateTableName IN VARCHAR2 default NULL,
    pJobClass IN VARCHAR2 default NULL,
    pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
)
IS
    vTableName VARCHAR2(128);                                          -- uppercased, later schema-qualified
    vSchemaName VARCHAR2(128);                                         -- uppercased schema name
    vKeyColumnName VARCHAR2(128);                                      -- uppercased key column name
    vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;       -- formatted parameter string for logging
    vProcessedColumnList VARCHAR2(32767);                              -- column list with TO_CHAR applied to date columns
    vBucketUri VARCHAR2(4000);                                         -- resolved object-storage bucket URI
    vCurrentCol VARCHAR2(128);                                         -- column being validated (used in error message)
    vPartitions partition_tab;                                         -- year/month partitions to export (package type)
BEGIN
    -- Capture all input parameters in a single string for every subsequent log entry
    vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||''''
                                                                      ,'pTableName => '''||nvl(pTableName, 'NULL')||''''
                                                                      ,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||''''
                                                                      ,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||''''
                                                                      ,'pFolderName => '''||nvl(pFolderName, 'NULL')||''''
                                                                      ,'pColumnList => '''||nvl(pColumnList, 'NULL')||''''
                                                                      ,'pMinDate => '''||nvl(TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
                                                                      ,'pMaxDate => '''||nvl(TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
                                                                      ,'pParallelDegree => '''||nvl(TO_CHAR(pParallelDegree), 'NULL')||''''
                                                                      ,'pTemplateTableName => '''||nvl(pTemplateTableName, 'NULL')||''''
                                                                      ,'pJobClass => '''||nvl(pJobClass, 'NULL')||''''
                                                                      ,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
                                                                      ));
    ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);

    -- Get bucket URI based on bucket area using FILE_MANAGER function
    vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea);

    -- Convert table and column names to uppercase to match data dictionary
    vTableName := UPPER(pTableName);
    vSchemaName := UPPER(pSchemaName);
    vKeyColumnName := UPPER(pKeyColumnName);

    -- Validate table, key column, and column list using shared procedure
    VALIDATE_TABLE_AND_COLUMNS(vSchemaName, vTableName, vKeyColumnName, pColumnList, vParameters);

    -- Build query with TO_CHAR for date columns (per-column format support)
    vProcessedColumnList := buildQueryWithDateFormats(pColumnList, vTableName, vSchemaName, vKeyColumnName, pTemplateTableName);

    ENV_MANAGER.LOG_PROCESS_EVENT('Input column list: ' || NVL(pColumnList, 'NULL (building dynamic list from table metadata)'), 'DEBUG', vParameters);
    ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list with TO_CHAR for date columns: ' || vProcessedColumnList, 'DEBUG', vParameters);
    ENV_MANAGER.LOG_PROCESS_EVENT('Template table: ' || NVL(pTemplateTableName, 'NULL - using global default for all dates'), 'INFO', vParameters);

    -- Schema-qualify the table name; DBMS_ASSERT guards against SQL injection
    -- since vTableName is later embedded in dynamic SQL by the export workers
    vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName);

    -- Validate parallel degree parameter
    IF pParallelDegree < 1 OR pParallelDegree > 16 THEN
        vgMsgTmp := ENV_MANAGER.MSG_INVALID_PARALLEL_DEGREE || ': ' || pParallelDegree || '. Valid range: 1-16';
        ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_INVALID_PARALLEL_DEGREE, vgMsgTmp);
    END IF;

    -- Get partitions using shared function
    vPartitions := GET_PARTITIONS(vSchemaName, vTableName, vKeyColumnName, pMinDate, pMaxDate, vParameters);

    ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vPartitions.COUNT || ' partitions to export with parallel degree ' || pParallelDegree, 'INFO', vParameters);

    -- Sequential processing (parallel degree = 1)
    IF pParallelDegree = 1 THEN
        ENV_MANAGER.LOG_PROCESS_EVENT('Using sequential processing (pParallelDegree = 1)', 'DEBUG', vParameters);

        -- Export each year/month partition in turn; PARQUET format, default
        -- 100MB (104857600 byte) max file size, no explicit file base name
        FOR i IN 1 .. vPartitions.COUNT LOOP
            EXPORT_SINGLE_PARTITION(
                pSchemaName => vSchemaName,
                pTableName => vTableName,
                pKeyColumnName => vKeyColumnName,
                pYear => vPartitions(i).year,
                pMonth => vPartitions(i).month,
                pBucketUri => vBucketUri,
                pFolderName => pFolderName,
                pProcessedColumns => vProcessedColumnList,
                pMinDate => pMinDate,
                pMaxDate => pMaxDate,
                pCredentialName => pCredentialName,
                pFormat => 'PARQUET',
                pFileBaseName => NULL,
                pMaxFileSize => 104857600,
                pParameters => vParameters
            );
        END LOOP;

    -- Parallel processing (parallel degree > 1)
    ELSE
        -- Skip parallel processing if no partitions found
        IF vPartitions.COUNT = 0 THEN
            ENV_MANAGER.LOG_PROCESS_EVENT('No partitions to export - skipping parallel processing', 'INFO', vParameters);
        ELSE
            DECLARE
                -- Timestamp-based task name keeps concurrent sessions isolated
                vTaskName VARCHAR2(128) := 'DATA_EXPORT_TASK_' || TO_CHAR(SYSTIMESTAMP, 'YYYYMMDDHH24MISSFF');
                vChunkId NUMBER;
            BEGIN
                ENV_MANAGER.LOG_PROCESS_EVENT('Using parallel processing with ' || pParallelDegree || ' threads', 'INFO', vParameters);

                -- Populate chunks table (insert new chunks, preserve FAILED chunks for retry)
                FOR i IN 1 .. vPartitions.COUNT LOOP
                    MERGE INTO CT_MRDS.A_PARALLEL_EXPORT_CHUNKS t
                    USING (SELECT i AS chunk_id, vTaskName AS task_name, vPartitions(i).year AS yr, vPartitions(i).month AS mn FROM DUAL) s
                    ON (t.CHUNK_ID = s.chunk_id AND t.TASK_NAME = s.task_name)
                    WHEN NOT MATCHED THEN
                        INSERT (CHUNK_ID, TASK_NAME, YEAR_VALUE, MONTH_VALUE, SCHEMA_NAME, TABLE_NAME, KEY_COLUMN_NAME,
                                BUCKET_URI, FOLDER_NAME, PROCESSED_COLUMNS, MIN_DATE, MAX_DATE,
                                CREDENTIAL_NAME, FORMAT_TYPE, FILE_BASE_NAME, TEMPLATE_TABLE_NAME, MAX_FILE_SIZE, JOB_CLASS, STATUS)
                        VALUES (i, vTaskName, vPartitions(i).year, vPartitions(i).month, vSchemaName, vTableName, vKeyColumnName,
                                vBucketUri, pFolderName, vProcessedColumnList, pMinDate, pMaxDate,
                                pCredentialName, 'PARQUET', NULL, pTemplateTableName, 104857600, pJobClass, 'PENDING')
                    WHEN MATCHED THEN
                        -- Match found: chunk exists for SAME task (composite PK: TASK_NAME, CHUNK_ID)
                        -- This handles retry scenario: reset FAILED chunks to PENDING for re-processing
                        UPDATE SET STATUS = CASE WHEN t.STATUS = 'FAILED' THEN 'PENDING' ELSE t.STATUS END,
                                   ERROR_MESSAGE = CASE WHEN t.STATUS = 'FAILED' THEN NULL ELSE t.ERROR_MESSAGE END;
                END LOOP;
                COMMIT;

                -- Log chunk statistics (session-safe: only count chunks for THIS task)
                DECLARE
                    vPendingCount NUMBER;
                    vFailedCount NUMBER;
                BEGIN
                    SELECT COUNT(*) INTO vPendingCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'PENDING' AND TASK_NAME = vTaskName;
                    SELECT COUNT(*) INTO vFailedCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'FAILED' AND TASK_NAME = vTaskName;

                    ENV_MANAGER.LOG_PROCESS_EVENT('Chunk statistics for task ' || vTaskName || ': PENDING=' || vPendingCount || ', FAILED (retry)=' || vFailedCount, 'INFO', vParameters);
                END;

                -- Create parallel task
                DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => vTaskName);

                -- Define chunks using SQL query to ensure TASK_NAME isolation
                -- CRITICAL: Filter by TASK_NAME to avoid selecting chunks from other concurrent sessions
                -- CRITICAL: Use START_ID and END_ID aliases to avoid ORA-00960 ambiguous column naming
                -- NOTE: vTaskName is generated locally (no user input), so the concatenation is safe here
                DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_SQL(
                    task_name => vTaskName,
                    sql_stmt => 'SELECT CHUNK_ID AS START_ID, CHUNK_ID AS END_ID FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE TASK_NAME = ''' || vTaskName || ''' ORDER BY CHUNK_ID',
                    by_rowid => FALSE
                );

                -- Execute task in parallel
                ENV_MANAGER.LOG_PROCESS_EVENT('Executing parallel task: ' || vTaskName || CASE WHEN pJobClass IS NOT NULL THEN ' with job class: ' || pJobClass ELSE '' END, 'DEBUG', vParameters);

                -- Two RUN_TASK branches: job_class is only passed when supplied,
                -- otherwise the DBMS_PARALLEL_EXECUTE default job class applies
                IF pJobClass IS NOT NULL THEN
                    DBMS_PARALLEL_EXECUTE.RUN_TASK(
                        task_name => vTaskName,
                        sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id, ''' || vTaskName || '''); END;',
                        language_flag => DBMS_SQL.NATIVE,
                        parallel_level => pParallelDegree,
                        job_class => pJobClass
                    );
                ELSE
                    DBMS_PARALLEL_EXECUTE.RUN_TASK(
                        task_name => vTaskName,
                        sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id, ''' || vTaskName || '''); END;',
                        language_flag => DBMS_SQL.NATIVE,
                        parallel_level => pParallelDegree
                    );
                END IF;

                -- Check for errors reported by the parallel framework for this task
                DECLARE
                    vErrorCount NUMBER;
                BEGIN
                    SELECT COUNT(*) INTO vErrorCount
                    FROM USER_PARALLEL_EXECUTE_CHUNKS
                    WHERE task_name = vTaskName AND status = 'PROCESSED_WITH_ERROR';

                    IF vErrorCount > 0 THEN
                        vgMsgTmp := 'Parallel execution completed with ' || vErrorCount || ' errors. Check USER_PARALLEL_EXECUTE_CHUNKS for details.';
                        ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
                        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp);
                    END IF;
                END;

                -- Clean up task
                DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName);

                -- Clean up chunks for THIS specific task only (session-safe)
                -- CRITICAL: Use TASK_NAME filter to avoid deleting chunks from other active sessions
                DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE TASK_NAME = vTaskName;
                COMMIT;

                ENV_MANAGER.LOG_PROCESS_EVENT('Parallel execution completed successfully', 'INFO', vParameters);
            EXCEPTION
                WHEN OTHERS THEN
                    -- Attempt to drop task on error (chunks rows are intentionally
                    -- kept so FAILED chunks can be retried on the next run)
                    BEGIN
                        DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName);
                    EXCEPTION
                        WHEN OTHERS THEN NULL; -- Ignore drop errors
                    END;

                    vgMsgTmp := ENV_MANAGER.MSG_PARALLEL_EXECUTION_FAILED || ': ' || SQLERRM || cgBL || DBMS_UTILITY.FORMAT_ERROR_BACKTRACE;
                    ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
                    RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp);
            END;
        END IF;
    END IF;

    ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
EXCEPTION
    WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN
        vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName;
        ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp);
    WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN
        vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in pColumnList' ELSE '' END;
        ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp);
    WHEN ENV_MANAGER.ERR_INVALID_PARALLEL_DEGREE THEN
        -- vgMsgTmp was populated where the exception was raised above
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_INVALID_PARALLEL_DEGREE, vgMsgTmp);
    WHEN ENV_MANAGER.ERR_PARALLEL_EXECUTION_FAILED THEN
        -- vgMsgTmp was populated where the exception was raised above
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp);
    WHEN OTHERS THEN
        -- Log complete error details including full stack trace and backtrace
        ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER');
        ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));

END EXPORT_TABLE_DATA_BY_DATE;
|
|
|
|
----------------------------------------------------------------------------------------------------
|
|
|
|
/**
|
|
* @name EXPORT_TABLE_DATA_TO_CSV_BY_DATE
* @desc Exports table data to CSV files in OCI Object Storage with date filtering.
*       Unlike EXPORT_TABLE_DATA_BY_DATE (which writes Parquet output), this
*       procedure writes CSV — one export per selected year/month partition,
*       which Oracle may further split into multiple files when pMaxFileSize
*       is exceeded.
|
* Uses the same date filtering mechanism with CT_ODS.A_LOAD_HISTORY.
|
|
* Allows specifying custom column list or uses T.* if pColumnList is NULL.
|
|
* Validates that all columns in pColumnList exist in the target table.
|
|
* Automatically adds 'T.' prefix to column names in pColumnList.
|
|
* @example
|
|
* begin
|
|
* DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
|
|
* pSchemaName => 'CT_MRDS',
|
|
* pTableName => 'MY_TABLE',
|
|
* pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',
|
|
* pBucketArea => 'DATA',
|
|
* pFolderName => 'exports',
|
|
* pFileName => 'my_export.csv',
|
|
* pColumnList => 'COLUMN1, COLUMN2, COLUMN3', -- Optional
|
|
* pMinDate => DATE '2024-01-01',
|
|
* pMaxDate => SYSDATE
|
|
* );
|
|
* end;
|
|
**/
|
|
PROCEDURE EXPORT_TABLE_DATA_TO_CSV_BY_DATE (
|
|
pSchemaName IN VARCHAR2,
|
|
pTableName IN VARCHAR2,
|
|
pKeyColumnName IN VARCHAR2,
|
|
pBucketArea IN VARCHAR2,
|
|
pFolderName IN VARCHAR2,
|
|
pFileName IN VARCHAR2 DEFAULT NULL,
|
|
pColumnList IN VARCHAR2 default NULL,
|
|
pMinDate IN DATE default DATE '1900-01-01',
|
|
pMaxDate IN DATE default SYSDATE,
|
|
pParallelDegree IN NUMBER default 1,
|
|
pTemplateTableName IN VARCHAR2 default NULL,
|
|
pMaxFileSize IN NUMBER default 104857600,
|
|
pRegisterExport IN BOOLEAN default FALSE,
|
|
pProcessName IN VARCHAR2 default 'DATA_EXPORTER',
|
|
pJobClass IN VARCHAR2 default NULL,
|
|
pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
|
|
)
|
|
IS
|
|
vTableName VARCHAR2(128);
|
|
vSchemaName VARCHAR2(128);
|
|
vKeyColumnName VARCHAR2(128);
|
|
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
|
|
vFileBaseName VARCHAR2(4000);
|
|
vFileExtension VARCHAR2(10);
|
|
vProcessedColumnList VARCHAR2(32767);
|
|
vBucketUri VARCHAR2(4000);
|
|
vCurrentCol VARCHAR2(128);
|
|
vPartitions partition_tab;
|
|
vSourceFileReceivedKey NUMBER;
|
|
vFileName VARCHAR2(1000);
|
|
vFileUri VARCHAR2(4000);
|
|
-- Variables for A_SOURCE_FILE_CONFIG lookup
|
|
vSourceKey VARCHAR2(100);
|
|
vTableId VARCHAR2(200);
|
|
vConfigKey NUMBER := -1;
|
|
vSlashPos1 NUMBER;
|
|
vSlashPos2 NUMBER;
|
|
|
|
BEGIN
|
|
vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||''''
|
|
,'pTableName => '''||nvl(pTableName, 'NULL')||''''
|
|
,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||''''
|
|
,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||''''
|
|
,'pFolderName => '''||nvl(pFolderName, 'NULL')||''''
|
|
,'pFileName => '''||nvl(pFileName, 'NULL')||''''
|
|
,'pColumnList => '''||nvl(pColumnList, 'NULL')||''''
|
|
,'pMinDate => '''||nvl(TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
|
|
,'pMaxDate => '''||nvl(TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
|
|
,'pParallelDegree => '''||nvl(TO_CHAR(pParallelDegree), 'NULL')||''''
|
|
,'pTemplateTableName => '''||nvl(pTemplateTableName, 'NULL')||''''
|
|
,'pMaxFileSize => '''||nvl(TO_CHAR(pMaxFileSize), 'NULL')||''''
|
|
,'pRegisterExport => '''||CASE WHEN pRegisterExport THEN 'TRUE' ELSE 'FALSE' END||''''
|
|
,'pJobClass => '''||nvl(pJobClass, 'NULL')||''''
|
|
,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
|
|
));
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
|
|
|
|
-- Get bucket URI based on bucket area using FILE_MANAGER function
|
|
vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea);
|
|
|
|
-- Convert table and column names to uppercase to match data dictionary
|
|
vTableName := UPPER(pTableName);
|
|
vSchemaName := UPPER(pSchemaName);
|
|
vKeyColumnName := UPPER(pKeyColumnName);
|
|
|
|
-- Extract base filename and extension or construct default filename
|
|
IF pFileName IS NOT NULL THEN
|
|
-- Use provided filename
|
|
IF INSTR(pFileName, '.') > 0 THEN
|
|
vFileBaseName := SUBSTR(pFileName, 1, INSTR(pFileName, '.', -1) - 1);
|
|
vFileExtension := SUBSTR(pFileName, INSTR(pFileName, '.', -1));
|
|
ELSE
|
|
vFileBaseName := pFileName;
|
|
vFileExtension := '.csv';
|
|
END IF;
|
|
ELSE
|
|
-- Construct default filename: TABLENAME (without extension, will be added by worker)
|
|
vFileBaseName := UPPER(pTableName);
|
|
vFileExtension := '.csv';
|
|
END IF;
|
|
|
|
-- Validate table, key column, and column list using shared procedure
|
|
VALIDATE_TABLE_AND_COLUMNS(vSchemaName, vTableName, vKeyColumnName, pColumnList, vParameters);
|
|
|
|
-- Build query with TO_CHAR for date columns (per-column format support)
|
|
vProcessedColumnList := buildQueryWithDateFormats(pColumnList, vTableName, vSchemaName, vKeyColumnName, pTemplateTableName);
|
|
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Input column list: ' || NVL(pColumnList, 'NULL (using dynamic column list)'), 'DEBUG', vParameters);
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list with TO_CHAR for date columns: ' || vProcessedColumnList, 'DEBUG', vParameters);
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Template table: ' || NVL(pTemplateTableName, 'NULL - using global default for all dates'), 'INFO', vParameters);
|
|
|
|
vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName);
|
|
|
|
-- Validate parallel degree parameter
|
|
IF pParallelDegree < 1 OR pParallelDegree > 16 THEN
|
|
vgMsgTmp := ENV_MANAGER.MSG_INVALID_PARALLEL_DEGREE || ': ' || pParallelDegree || '. Valid range: 1-16';
|
|
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_INVALID_PARALLEL_DEGREE, vgMsgTmp);
|
|
END IF;
|
|
|
|
-- Get partitions using shared function
|
|
vPartitions := GET_PARTITIONS(vSchemaName, vTableName, vKeyColumnName, pMinDate, pMaxDate, vParameters);
|
|
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vPartitions.COUNT || ' year/month combinations to export', 'INFO', vParameters);
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Date range: ' || TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS') || ' to ' || TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'DEBUG', vParameters);
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Parallel degree: ' || pParallelDegree, 'INFO', vParameters);
|
|
|
|
-- Sequential processing (parallel degree = 1)
|
|
IF pParallelDegree = 1 THEN
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Using sequential processing (pParallelDegree = 1)', 'DEBUG', vParameters);
|
|
|
|
FOR i IN 1 .. vPartitions.COUNT LOOP
|
|
EXPORT_SINGLE_PARTITION(
|
|
pSchemaName => vSchemaName,
|
|
pTableName => vTableName,
|
|
pKeyColumnName => vKeyColumnName,
|
|
pYear => vPartitions(i).year,
|
|
pMonth => vPartitions(i).month,
|
|
pBucketUri => vBucketUri,
|
|
pFolderName => pFolderName,
|
|
pProcessedColumns => vProcessedColumnList,
|
|
pMinDate => pMinDate,
|
|
pMaxDate => pMaxDate,
|
|
pCredentialName => pCredentialName,
|
|
pFormat => 'CSV',
|
|
pFileBaseName => vFileBaseName,
|
|
pMaxFileSize => pMaxFileSize,
|
|
pParameters => vParameters
|
|
);
|
|
END LOOP;
|
|
|
|
-- Parallel processing (parallel degree > 1)
|
|
ELSE
|
|
-- Skip parallel processing if no partitions found
|
|
IF vPartitions.COUNT = 0 THEN
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('No partitions to export - skipping parallel CSV processing', 'INFO', vParameters);
|
|
ELSE
|
|
DECLARE
|
|
vTaskName VARCHAR2(128) := 'DATA_CSV_EXPORT_TASK_' || TO_CHAR(SYSTIMESTAMP, 'YYYYMMDDHH24MISSFF');
|
|
vChunkId NUMBER;
|
|
BEGIN
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Using parallel processing with ' || pParallelDegree || ' threads', 'INFO', vParameters);
|
|
|
|
-- Clean up old completed chunks (>24 hours) to prevent table bloat
|
|
-- CRITICAL: Do NOT delete chunks from other active sessions (same-day tasks)
|
|
-- This prevents race conditions when multiple CSV exports run simultaneously
|
|
DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS
|
|
WHERE STATUS = 'COMPLETED'
|
|
AND CREATED_DATE < SYSTIMESTAMP - INTERVAL '1' DAY;
|
|
COMMIT;
|
|
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Cleared old COMPLETED chunks (>24h). Active session chunks preserved.', 'DEBUG', vParameters);
|
|
|
|
-- Populate chunks table (insert new chunks, preserve FAILED chunks for retry)
|
|
FOR i IN 1 .. vPartitions.COUNT LOOP
|
|
MERGE INTO CT_MRDS.A_PARALLEL_EXPORT_CHUNKS t
|
|
USING (SELECT i AS chunk_id, vTaskName AS task_name, vPartitions(i).year AS yr, vPartitions(i).month AS mn FROM DUAL) s
|
|
ON (t.CHUNK_ID = s.chunk_id AND t.TASK_NAME = s.task_name)
|
|
WHEN NOT MATCHED THEN
|
|
INSERT (CHUNK_ID, TASK_NAME, YEAR_VALUE, MONTH_VALUE, SCHEMA_NAME, TABLE_NAME, KEY_COLUMN_NAME,
|
|
BUCKET_URI, FOLDER_NAME, PROCESSED_COLUMNS, MIN_DATE, MAX_DATE,
|
|
CREDENTIAL_NAME, FORMAT_TYPE, FILE_BASE_NAME, TEMPLATE_TABLE_NAME, MAX_FILE_SIZE, JOB_CLASS, STATUS)
|
|
VALUES (i, vTaskName, vPartitions(i).year, vPartitions(i).month, vSchemaName, vTableName, vKeyColumnName,
|
|
vBucketUri, pFolderName, vProcessedColumnList, pMinDate, pMaxDate,
|
|
pCredentialName, 'CSV', vFileBaseName, pTemplateTableName, pMaxFileSize, pJobClass, 'PENDING')
|
|
WHEN MATCHED THEN
|
|
-- Match found: chunk exists for SAME task (composite PK: TASK_NAME, CHUNK_ID)
|
|
-- This handles retry scenario: reset FAILED chunks to PENDING for re-processing
|
|
UPDATE SET STATUS = CASE WHEN t.STATUS = 'FAILED' THEN 'PENDING' ELSE t.STATUS END,
|
|
ERROR_MESSAGE = CASE WHEN t.STATUS = 'FAILED' THEN NULL ELSE t.ERROR_MESSAGE END;
|
|
END LOOP;
|
|
COMMIT;
|
|
|
|
-- Log chunk statistics (session-safe: only count chunks for THIS task)
|
|
DECLARE
|
|
vPendingCount NUMBER;
|
|
vFailedCount NUMBER;
|
|
BEGIN
|
|
SELECT COUNT(*) INTO vPendingCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'PENDING' AND TASK_NAME = vTaskName;
|
|
SELECT COUNT(*) INTO vFailedCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'FAILED' AND TASK_NAME = vTaskName;
|
|
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Chunk statistics for task ' || vTaskName || ': PENDING=' || vPendingCount || ', FAILED (retry)=' || vFailedCount, 'INFO', vParameters);
|
|
END;
|
|
|
|
-- Create parallel task
|
|
DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => vTaskName);
|
|
|
|
-- Define chunks using SQL query to ensure TASK_NAME isolation
|
|
-- CRITICAL: Filter by TASK_NAME to avoid selecting chunks from other concurrent sessions
|
|
-- CRITICAL: Use START_ID and END_ID aliases to avoid ORA-00960 ambiguous column naming
|
|
DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_SQL(
|
|
task_name => vTaskName,
|
|
sql_stmt => 'SELECT CHUNK_ID AS START_ID, CHUNK_ID AS END_ID FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE TASK_NAME = ''' || vTaskName || ''' ORDER BY CHUNK_ID',
|
|
by_rowid => FALSE
|
|
);
|
|
|
|
-- Execute task in parallel
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Executing parallel CSV export task: ' || vTaskName || CASE WHEN pJobClass IS NOT NULL THEN ' with job class: ' || pJobClass ELSE '' END, 'DEBUG', vParameters);
|
|
|
|
IF pJobClass IS NOT NULL THEN
|
|
DBMS_PARALLEL_EXECUTE.RUN_TASK(
|
|
task_name => vTaskName,
|
|
sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id, ''' || vTaskName || '''); END;',
|
|
language_flag => DBMS_SQL.NATIVE,
|
|
parallel_level => pParallelDegree,
|
|
job_class => pJobClass
|
|
);
|
|
ELSE
|
|
DBMS_PARALLEL_EXECUTE.RUN_TASK(
|
|
task_name => vTaskName,
|
|
sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id, ''' || vTaskName || '''); END;',
|
|
language_flag => DBMS_SQL.NATIVE,
|
|
parallel_level => pParallelDegree
|
|
);
|
|
END IF;
|
|
|
|
-- Check for errors
|
|
DECLARE
|
|
vErrorCount NUMBER;
|
|
BEGIN
|
|
SELECT COUNT(*) INTO vErrorCount
|
|
FROM USER_PARALLEL_EXECUTE_CHUNKS
|
|
WHERE task_name = vTaskName AND status = 'PROCESSED_WITH_ERROR';
|
|
|
|
IF vErrorCount > 0 THEN
|
|
vgMsgTmp := 'Parallel CSV export completed with ' || vErrorCount || ' errors. Check USER_PARALLEL_EXECUTE_CHUNKS for details.';
|
|
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp);
|
|
END IF;
|
|
END;
|
|
|
|
-- Clean up task
|
|
DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName);
|
|
|
|
-- Clean up chunks for THIS specific task only (session-safe)
|
|
-- CRITICAL: Use TASK_NAME filter to avoid deleting chunks from other active CSV sessions
|
|
DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE TASK_NAME = vTaskName;
|
|
COMMIT;
|
|
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Parallel CSV execution completed successfully', 'INFO', vParameters);
|
|
EXCEPTION
|
|
WHEN OTHERS THEN
|
|
-- Attempt to drop task on error
|
|
BEGIN
|
|
DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName);
|
|
EXCEPTION
|
|
WHEN OTHERS THEN NULL; -- Ignore drop errors
|
|
END;
|
|
|
|
vgMsgTmp := ENV_MANAGER.MSG_PARALLEL_EXECUTION_FAILED || ': ' || SQLERRM || cgBL || DBMS_UTILITY.FORMAT_ERROR_BACKTRACE;
|
|
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp);
|
|
END;
|
|
END IF;
|
|
END IF;
|
|
|
|
-- Note: for this CSV-by-date flow, exported files are registered inline below
-- (see "IF pRegisterExport THEN" block), one A_SOURCE_FILE_RECEIVED row per partition file.
-- NOTE(review): an earlier comment attributed registration to EXPORT_SINGLE_PARTITION;
-- that appears to describe the partition-level (Parquet) path, not this one — confirm and reconcile.
|
|
|
|
-- Register exported files to A_SOURCE_FILE_RECEIVED if requested (after successful export)
|
|
IF pRegisterExport THEN
|
|
-- Lookup A_SOURCE_FILE_CONFIG_KEY based on pFolderName parsing
|
|
-- Format: {BUCKET_AREA}/{SOURCE_KEY}/{TABLE_ID}
|
|
-- Example: 'ODS/CSDB/CSDB_DEBT_DAILY' -> SOURCE_KEY='CSDB', TABLE_ID='CSDB_DEBT_DAILY'
|
|
|
|
-- Parse pFolderName to extract SOURCE_KEY and TABLE_ID
|
|
vSlashPos1 := INSTR(pFolderName, '/', 1, 1); -- First '/' position
|
|
vSlashPos2 := INSTR(pFolderName, '/', 1, 2); -- Second '/' position
|
|
|
|
IF vSlashPos1 > 0 AND vSlashPos2 > 0 THEN
|
|
-- Extract segment 2 (SOURCE_KEY) and segment 3 (TABLE_ID)
|
|
vSourceKey := SUBSTR(pFolderName, vSlashPos1 + 1, vSlashPos2 - vSlashPos1 - 1);
|
|
vTableId := SUBSTR(pFolderName, vSlashPos2 + 1);
|
|
|
|
-- Find configuration based on SOURCE_KEY and TABLE_ID
|
|
BEGIN
|
|
SELECT A_SOURCE_FILE_CONFIG_KEY
|
|
INTO vConfigKey
|
|
FROM CT_MRDS.A_SOURCE_FILE_CONFIG
|
|
WHERE A_SOURCE_KEY = vSourceKey
|
|
AND TABLE_ID = vTableId
|
|
AND SOURCE_FILE_TYPE = 'INPUT'
|
|
AND ROWNUM = 1;
|
|
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Found config key: ' || vConfigKey || ' for SOURCE=' || vSourceKey || ', TABLE=' || vTableId, 'DEBUG', vParameters);
|
|
EXCEPTION
|
|
WHEN NO_DATA_FOUND THEN
|
|
vConfigKey := -1;
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('No config found for SOURCE=' || vSourceKey || ', TABLE=' || vTableId || ' - using default (-1)', 'INFO', vParameters);
|
|
END;
|
|
ELSE
|
|
-- Cannot parse folder name - use default
|
|
vConfigKey := -1;
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Cannot parse pFolderName: ' || pFolderName || ' - using default (-1)', 'WARNING', vParameters);
|
|
END IF;
|
|
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Registering ' || vPartitions.COUNT || ' exported files to A_SOURCE_FILE_RECEIVED with config key: ' || vConfigKey, 'INFO', vParameters);
|
|
|
|
FOR i IN 1 .. vPartitions.COUNT LOOP
|
|
-- Construct filename and URI for this partition
|
|
vFileName := NVL(vFileBaseName, UPPER(REPLACE(vTableName, vSchemaName || '.', ''))) || '_' || vPartitions(i).year || vPartitions(i).month || '.csv';
|
|
vFileUri := vBucketUri || CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || sanitizeFilename(vFileName);
|
|
|
|
-- Get file metadata from OCI bucket (CHECKSUM, CREATED, BYTES) with retry logic
|
|
DECLARE
|
|
vChecksum VARCHAR2(128);
|
|
vCreated TIMESTAMP WITH TIME ZONE;
|
|
vBytes NUMBER;
|
|
vActualFileName VARCHAR2(1000); -- Actual filename with Oracle suffix
|
|
vSanitizedFileName VARCHAR2(1000);
|
|
vRetryCount NUMBER := 0;
|
|
vMaxRetries NUMBER := 1; -- One retry after initial attempt
|
|
vRetryDelay NUMBER := 2; -- 2 seconds delay
|
|
BEGIN
|
|
-- Sanitize filename first (PL/SQL function cannot be used directly in SQL)
|
|
vSanitizedFileName := sanitizeFilename(vFileName);
|
|
|
|
-- Remove .csv extension for LIKE pattern matching (Oracle adds suffixes BEFORE .csv)
|
|
-- Example: LEGACY_DEBT_202508.csv becomes LEGACY_DEBT_202508_1_20260211T102621591769Z.csv
|
|
vSanitizedFileName := REGEXP_REPLACE(vSanitizedFileName, '\.csv$', '', 1, 0, 'i');
|
|
|
|
-- Try to get file metadata with retry logic
|
|
<<metadata_retry_loop>>
|
|
LOOP
|
|
BEGIN
|
|
SELECT object_name, checksum, created, bytes
|
|
INTO vActualFileName, vChecksum, vCreated, vBytes
|
|
FROM TABLE(DBMS_CLOUD.LIST_OBJECTS(
|
|
credential_name => pCredentialName,
|
|
location_uri => vBucketUri
|
|
))
|
|
WHERE object_name LIKE CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || vSanitizedFileName || '%'
|
|
ORDER BY created DESC, bytes DESC
|
|
FETCH FIRST 1 ROW ONLY;
|
|
|
|
-- Extract filename only from full path (remove bucket folder prefix)
|
|
-- vActualFileName contains: 'ODS/CSDB/CSDB_DEBT/LEGACY_DEBT_202508_1_20260211T111341375171Z.csv'
|
|
-- Extract only: 'LEGACY_DEBT_202508_1_20260211T111341375171Z.csv'
|
|
vActualFileName := SUBSTR(vActualFileName, INSTR(vActualFileName, '/', -1) + 1);
|
|
|
|
-- Success - exit retry loop
|
|
EXIT metadata_retry_loop;
|
|
|
|
EXCEPTION
|
|
WHEN NO_DATA_FOUND THEN
|
|
vRetryCount := vRetryCount + 1;
|
|
|
|
IF vRetryCount <= vMaxRetries THEN
|
|
-- Log retry attempt
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('File not found in bucket (attempt ' || vRetryCount || '/' || (vMaxRetries + 1) || '), retrying after ' || vRetryDelay || ' seconds: ' || vFileName, 'DEBUG', vParameters);
|
|
|
|
-- Wait before retry using DBMS_SESSION.SLEEP (alternative to DBMS_LOCK)
|
|
DBMS_SESSION.SLEEP(vRetryDelay);
|
|
ELSE
|
|
-- Max retries exceeded - re-raise exception
|
|
RAISE;
|
|
END IF;
|
|
END;
|
|
END LOOP metadata_retry_loop;
|
|
|
|
-- Create A_SOURCE_FILE_RECEIVED record for this export with metadata
|
|
vSourceFileReceivedKey := CT_MRDS.A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL;
|
|
INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
|
|
A_SOURCE_FILE_RECEIVED_KEY,
|
|
A_SOURCE_FILE_CONFIG_KEY,
|
|
SOURCE_FILE_NAME,
|
|
CHECKSUM,
|
|
CREATED,
|
|
BYTES,
|
|
RECEPTION_DATE,
|
|
PROCESSING_STATUS,
|
|
PARTITION_YEAR,
|
|
PARTITION_MONTH,
|
|
ARCH_PATH,
|
|
PROCESS_NAME
|
|
) VALUES (
|
|
vSourceFileReceivedKey,
|
|
vConfigKey, -- Config key from A_SOURCE_FILE_CONFIG lookup
|
|
vActualFileName, -- Use actual filename with Oracle suffix
|
|
vChecksum,
|
|
vCreated,
|
|
vBytes,
|
|
SYSDATE,
|
|
'INGESTED',
|
|
NULL, -- PARTITION_YEAR not used for CSV exports
|
|
NULL, -- PARTITION_MONTH not used for CSV exports
|
|
NULL, -- ARCH_PATH not used for CSV exports
|
|
pProcessName -- Process name from parameter
|
|
);
|
|
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Registered file: FileReceivedKey=' || vSourceFileReceivedKey || ', File=' || vActualFileName || ', Size=' || vBytes || ' bytes', 'DEBUG', vParameters);
|
|
EXCEPTION
|
|
WHEN NO_DATA_FOUND THEN
|
|
-- File not found after retries - log warning and continue without metadata
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('WARNING: File not found in bucket after ' || (vMaxRetries + 1) || ' attempts: ' || vFileName, 'WARNING', vParameters);
|
|
|
|
-- Sanitize filename for fallback INSERT (function cannot be used in SQL)
|
|
vSanitizedFileName := sanitizeFilename(vFileName);
|
|
|
|
-- Insert without metadata
|
|
vSourceFileReceivedKey := CT_MRDS.A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL;
|
|
INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED (
|
|
A_SOURCE_FILE_RECEIVED_KEY,
|
|
A_SOURCE_FILE_CONFIG_KEY,
|
|
SOURCE_FILE_NAME,
|
|
RECEPTION_DATE,
|
|
PROCESSING_STATUS,
|
|
PARTITION_YEAR,
|
|
PARTITION_MONTH,
|
|
ARCH_PATH,
|
|
PROCESS_NAME
|
|
) VALUES (
|
|
vSourceFileReceivedKey,
|
|
vConfigKey, -- Config key from A_SOURCE_FILE_CONFIG lookup
|
|
vSanitizedFileName, -- Fallback: use theoretical filename if actual not found
|
|
SYSDATE,
|
|
'INGESTED',
|
|
NULL, -- PARTITION_YEAR not used for CSV exports
|
|
NULL, -- PARTITION_MONTH not used for CSV exports
|
|
NULL, -- ARCH_PATH not used for CSV exports
|
|
pProcessName -- Process name from parameter
|
|
);
|
|
END;
|
|
END LOOP;
|
|
|
|
COMMIT;
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Successfully registered all ' || vPartitions.COUNT || ' files', 'INFO', vParameters);
|
|
END IF;
|
|
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Export completed successfully for ' || vPartitions.COUNT || ' files', 'INFO', vParameters);
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
|
|
|
|
EXCEPTION
|
|
WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN
|
|
vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName;
|
|
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp);
|
|
WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN
|
|
vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in pColumnList' ELSE '' END;
|
|
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp);
|
|
WHEN ENV_MANAGER.ERR_INVALID_PARALLEL_DEGREE THEN
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_INVALID_PARALLEL_DEGREE, vgMsgTmp);
|
|
WHEN ENV_MANAGER.ERR_PARALLEL_EXECUTION_FAILED THEN
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp);
|
|
WHEN OTHERS THEN
|
|
-- Log complete error details including full stack trace and backtrace
|
|
ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER');
|
|
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
|
|
|
|
END EXPORT_TABLE_DATA_TO_CSV_BY_DATE;
|
|
|
|
----------------------------------------------------------------------------------------------------
|
|
-- VERSION MANAGEMENT FUNCTIONS
|
|
----------------------------------------------------------------------------------------------------
|
|
|
|
FUNCTION GET_VERSION RETURN VARCHAR2
IS
BEGIN
    -- Report this package's semantic version string.
    -- PACKAGE_VERSION is the constant maintained in the package specification;
    -- this accessor simply exposes it to callers (e.g. deployment checks).
    RETURN PACKAGE_VERSION;
END GET_VERSION;
|
|
|
|
----------------------------------------------------------------------------------------------------
|
|
|
|
FUNCTION GET_BUILD_INFO RETURN VARCHAR2
IS
BEGIN
    -- Produce a formatted build banner for this package.
    -- Formatting is delegated to the shared ENV_MANAGER helper; the inputs are
    -- the identity constants declared in the package specification.
    -- (Named parameter association is used, so argument order is irrelevant.)
    RETURN ENV_MANAGER.GET_PACKAGE_VERSION_INFO(
               pPackageName => 'DATA_EXPORTER',
               pAuthor      => PACKAGE_AUTHOR,
               pVersion     => PACKAGE_VERSION,
               pBuildDate   => PACKAGE_BUILD_DATE);
END GET_BUILD_INFO;
|
|
|
|
----------------------------------------------------------------------------------------------------
|
|
|
|
FUNCTION GET_VERSION_HISTORY RETURN VARCHAR2
IS
BEGIN
    -- Render this package's change log as a single formatted string.
    -- VERSION_HISTORY is the constant maintained in the package specification;
    -- the shared ENV_MANAGER formatter keeps the layout consistent across packages.
    -- (Named parameter association is used, so argument order is irrelevant.)
    RETURN ENV_MANAGER.FORMAT_VERSION_HISTORY(
               pVersionHistory => VERSION_HISTORY,
               pPackageName    => 'DATA_EXPORTER');
END GET_VERSION_HISTORY;
|
|
|
|
----------------------------------------------------------------------------------------------------
|
|
|
|
END;
|
|
|
|
/
|