create or replace PACKAGE BODY CT_MRDS.DATA_EXPORTER AS
----------------------------------------------------------------------------------------------------
-- PRIVATE HELPER FUNCTIONS (USED BY MULTIPLE PROCEDURES)
----------------------------------------------------------------------------------------------------
/**
 * Sanitizes filename by replacing disallowed characters with underscores.
 * Whitelist: letters, digits, dot, underscore, hyphen; everything else becomes '_',
 * so the result is safe to embed in an OCI Object Storage URI.
 *
 * @param pFilename  raw filename candidate
 * @return sanitized filename (buffer limited to 1000 chars)
 **/
FUNCTION sanitizeFilename(pFilename IN VARCHAR2) RETURN VARCHAR2 IS
    vFilename VARCHAR2(1000);
BEGIN
    -- Replace every character outside the whitelist with an underscore
    vFilename := REGEXP_REPLACE(pFilename, '[^a-zA-Z0-9._-]', '_');
    RETURN vFilename;
END sanitizeFilename;
----------------------------------------------------------------------------------------------------
/**
 * Deletes export file from OCI bucket if it exists (used for cleanup before retry).
 * Silently ignores if file doesn't exist (ORA-20404).
 * Never propagates an error: any other delete failure is logged as WARNING and the
 * subsequent export is allowed to proceed anyway.
 *
 * @param pFileUri         full object URI of the file to delete
 * @param pCredentialName  DBMS_CLOUD credential used for the delete
 * @param pParameters      caller context string passed through to the process log
 **/
PROCEDURE DELETE_FAILED_EXPORT_FILE(
    pFileUri        IN VARCHAR2,
    pCredentialName IN VARCHAR2,
    pParameters     IN VARCHAR2
) IS
BEGIN
    -- Inner block so ALL exceptions are absorbed here; this helper is best-effort by design
    BEGIN
        ENV_MANAGER.LOG_PROCESS_EVENT('Attempting to delete potentially corrupted file: ' || pFileUri, 'DEBUG', pParameters);
        DBMS_CLOUD.DELETE_OBJECT(
            credential_name => pCredentialName,
            object_uri      => pFileUri
        );
        ENV_MANAGER.LOG_PROCESS_EVENT('Deleted existing file (cleanup before retry): ' || pFileUri, 'INFO', pParameters);
    EXCEPTION
        WHEN OTHERS THEN
            -- Object not found is OK (file doesn't exist); DBMS_CLOUD raises ORA-20404 for that case
            IF SQLCODE = -20404 THEN
                ENV_MANAGER.LOG_PROCESS_EVENT('File does not exist (OK): ' || pFileUri, 'DEBUG', pParameters);
            ELSE
                -- Log but don't fail - export will attempt anyway
                ENV_MANAGER.LOG_PROCESS_EVENT('Warning: Could not delete file (will retry export anyway): ' || SQLERRM, 'WARNING', pParameters);
            END IF;
    END;
END DELETE_FAILED_EXPORT_FILE;
----------------------------------------------------------------------------------------------------
/**
 * Builds query with TO_CHAR for date/timestamp columns using per-column formats.
 * Retrieves format for each date column from FILE_MANAGER.GET_DATE_FORMAT.
 **/
FUNCTION
buildQueryWithDateFormats(
    pColumnList        IN VARCHAR2,             -- explicit column list, or NULL to derive from metadata
    pTableName         IN VARCHAR2,             -- source table (data provider)
    pSchemaName        IN VARCHAR2,             -- owner of the source table
    pKeyColumnName     IN VARCHAR2,             -- source column exposed as A_WORKFLOW_HISTORY_KEY
    pTemplateTableName IN VARCHAR2,             -- optional template table defining target column set/order
    pFormat            IN VARCHAR2 DEFAULT 'CSV' -- 'CSV' enables quote pre-doubling for character types
) RETURN VARCHAR2 IS
    vResult         VARCHAR2(32767);  -- generated SELECT list
    vColumns        VARCHAR2(32767);  -- normalized (uppercased, space-stripped) column list
    vPos            PLS_INTEGER;      -- parse cursor into vColumns
    vNextPos        PLS_INTEGER;      -- position of next comma (or end of string)
    vCurrentCol     VARCHAR2(128);    -- column currently being processed
    vAllCols        VARCHAR2(32767);  -- raw column list (from caller or data dictionary)
    vDataType       VARCHAR2(30);
    vDateFormat     VARCHAR2(200);
    vTemplateSchema VARCHAR2(128);
    vTemplateTable  VARCHAR2(128);
    vColExists      NUMBER;
BEGIN
    -- Build column list if not provided
    IF pColumnList IS NULL THEN
        -- Use template table for column order when provided.
        -- Template defines which columns to export and in what order.
        IF pTemplateTableName IS NOT NULL THEN
            -- Parse template table name (SCHEMA.TABLE or just TABLE)
            IF INSTR(pTemplateTableName, '.') > 0 THEN
                vTemplateSchema := SUBSTR(pTemplateTableName, 1, INSTR(pTemplateTableName, '.') - 1);
                vTemplateTable  := SUBSTR(pTemplateTableName, INSTR(pTemplateTableName, '.') + 1);
            ELSE
                -- Unqualified template name: assume same schema as the source table
                vTemplateSchema := pSchemaName;
                vTemplateTable  := pTemplateTableName;
            END IF;
            -- Get columns from TEMPLATE table in template column order.
            -- Template defines target CSV structure (column order and which columns to include).
            -- NOTE(review): LISTAGG into VARCHAR2(32767) overflows (ORA-01489) for very wide tables -- confirm acceptable
            SELECT LISTAGG(column_name, ', ') WITHIN GROUP (ORDER BY column_id)
            INTO vAllCols
            FROM all_tab_columns
            WHERE table_name = vTemplateTable AND owner = vTemplateSchema;
        ELSE
            -- Get columns from source table when no template
            SELECT LISTAGG(column_name, ', ') WITHIN GROUP (ORDER BY column_id)
            INTO vAllCols
            FROM all_tab_columns
            WHERE table_name = pTableName AND owner = pSchemaName;
        END IF;
    ELSE
        vAllCols := pColumnList;
    END IF;
    -- Process each column: walk the comma-separated list manually
    vColumns := UPPER(REPLACE(vAllCols, ' ', ''));
    vPos := 1;
    vResult := '';
    WHILE vPos <= LENGTH(vColumns) LOOP
        vNextPos := INSTR(vColumns, ',', vPos);
        IF vNextPos = 0 THEN
            -- Last column in the list: pretend a comma sits just past the end
            vNextPos := LENGTH(vColumns) + 1;
        END IF;
        vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos);
        -- When using template table, check if column exists in SOURCE table.
        -- Template defines target structure, source provides data.
        -- Skip template columns that don't exist in source (except A_WORKFLOW_HISTORY_KEY).
        IF pTemplateTableName IS NOT NULL THEN
            -- Check if template column exists in SOURCE table
            SELECT COUNT(*)
            INTO vColExists
            FROM all_tab_columns
            WHERE table_name = pTableName AND column_name = vCurrentCol AND owner = pSchemaName;
            -- Skip columns that don't exist in source table.
            -- Exception: A_WORKFLOW_HISTORY_KEY is virtual (mapped from pKeyColumnName).
            IF vColExists = 0 AND UPPER(vCurrentCol) != 'A_WORKFLOW_HISTORY_KEY' THEN
                vPos := vNextPos + 1;
                CONTINUE;
            END IF;
        END IF;
        -- Get column data type from appropriate table (template or source).
        -- NOTE(review): raises NO_DATA_FOUND if an explicit pColumnList names a missing column;
        -- callers are expected to have validated via VALIDATE_TABLE_AND_COLUMNS first -- confirm.
        IF pTemplateTableName IS NOT NULL THEN
            -- Get data type from template table
            SELECT data_type
            INTO vDataType
            FROM all_tab_columns
            WHERE table_name = vTemplateTable AND column_name = vCurrentCol AND owner = vTemplateSchema;
        ELSE
            -- Get data type from source table
            SELECT data_type
            INTO vDataType
            FROM all_tab_columns
            WHERE table_name = pTableName AND column_name = vCurrentCol AND owner = pSchemaName;
        END IF;
        -- Handle key column alias (template table has A_WORKFLOW_HISTORY_KEY, source table has pKeyColumnName)
        IF UPPER(vCurrentCol) = 'A_WORKFLOW_HISTORY_KEY' THEN
            vResult := vResult || CASE WHEN vResult IS NOT NULL THEN ', ' ELSE '' END || 'T.' || pKeyColumnName || ' AS A_WORKFLOW_HISTORY_KEY';
        -- Convert DATE/TIMESTAMP columns to CHAR with specific format
        ELSIF vDataType IN ('DATE', 'TIMESTAMP', 'TIMESTAMP WITH TIME ZONE', 'TIMESTAMP WITH LOCAL TIME ZONE') THEN
            IF pTemplateTableName IS NOT NULL THEN
                -- Per-column format is configured against the template table
                vDateFormat := CT_MRDS.FILE_MANAGER.GET_DATE_FORMAT(
                    pTemplateTableName => pTemplateTableName,
                    pColumnName        => vCurrentCol
                );
            ELSE
                -- No template: fall back to the environment-wide default date format
                vDateFormat := ENV_MANAGER.gvDefaultDateFormat;
            END IF;
            vResult := vResult || CASE WHEN vResult IS NOT NULL THEN ', ' ELSE '' END || 'TO_CHAR(T.' || vCurrentCol || ', ''' || vDateFormat || ''') AS ' || vCurrentCol;
        -- Other columns: RFC 4180 quote-doubling for CSV character types, as-is for Parquet/others.
        -- Oracle DBMS_CLOUD.EXPORT_DATA has no native RFC 4180 "" doubling.
        -- escape=true -> backslash-escaped (\"\) - incompatible with ORACLE_LOADER.
        -- escape=false -> unescaped embedded quotes - also incompatible.
        -- Solution (CSV only): pre-double any " in VARCHAR2/CHAR/CLOB before Oracle wraps in quote.
        -- Oracle then writes: "BIDDER-'""=:" which ORACLE_LOADER reads as BIDDER-'"=:.
        -- Parquet is binary - no quoting needed; REPLACE would corrupt string data.
        ELSE
            IF pFormat = 'CSV' AND vDataType IN ('VARCHAR2', 'NVARCHAR2', 'CHAR', 'NCHAR', 'CLOB', 'NCLOB') THEN
                vResult := vResult || CASE WHEN vResult IS NOT NULL THEN ', ' ELSE '' END || 'REPLACE(T.' || vCurrentCol || ', CHR(34), CHR(34)||CHR(34)) AS ' || vCurrentCol;
            ELSE
                vResult := vResult || CASE WHEN vResult IS NOT NULL THEN ', ' ELSE '' END || 'T.' || vCurrentCol;
            END IF;
        END IF;
        vPos := vNextPos + 1;
    END LOOP;
    RETURN vResult;
END buildQueryWithDateFormats;
----------------------------------------------------------------------------------------------------
-- Internal shared function to process column list with T. prefix and key column mapping.
-- Returns a SELECT list where every column is prefixed with alias T. and the key column
-- is aliased as A_WORKFLOW_HISTORY_KEY; with a NULL pColumnList the full column set of
-- pSchemaName.pTableName is used (in COLUMN_ID order).
FUNCTION processColumnList(pColumnList IN VARCHAR2, pTableName IN VARCHAR2, pSchemaName IN VARCHAR2, pKeyColumnName IN VARCHAR2) RETURN VARCHAR2 IS
    vResult     VARCHAR2(32767);
    vColumns    VARCHAR2(32767);
    vPos        PLS_INTEGER;
    vNextPos    PLS_INTEGER;
    vCurrentCol VARCHAR2(128);
    vAllCols    VARCHAR2(32767);
BEGIN
    IF pColumnList IS NULL THEN
        -- Build list of all columns
        SELECT LISTAGG(column_name, ', ') WITHIN GROUP (ORDER BY column_id)
        INTO vAllCols
        FROM all_tab_columns
        WHERE table_name = pTableName AND owner = pSchemaName;
        -- Add T. prefix to all columns
        vResult := 'T.'
|| REPLACE(vAllCols, ', ', ', T.');
        -- Replace key column with aliased version (e.g., T.A_ETL_LOAD_SET_KEY_FK AS A_WORKFLOW_HISTORY_KEY)
        -- NOTE(review): plain REPLACE also matches longer column names that start with
        -- pKeyColumnName as a prefix -- confirm key column names are never prefixes of others
        vResult := REPLACE(vResult, 'T.' || pKeyColumnName, 'T.' || pKeyColumnName || ' AS A_WORKFLOW_HISTORY_KEY');
        RETURN vResult;
    END IF;
    -- Remove extra spaces and convert to uppercase
    vColumns := UPPER(REPLACE(pColumnList, ' ', ''));
    vPos := 1;
    vResult := '';
    -- Parse comma-separated column list and add T. prefix
    WHILE vPos <= LENGTH(vColumns) LOOP
        vNextPos := INSTR(vColumns, ',', vPos);
        IF vNextPos = 0 THEN
            -- Last entry: treat end-of-string as the terminating comma
            vNextPos := LENGTH(vColumns) + 1;
        END IF;
        vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos);
        -- Check if this is the key column (e.g., A_ETL_LOAD_SET_KEY_FK) and add alias
        IF UPPER(vCurrentCol) = UPPER(pKeyColumnName) THEN
            vCurrentCol := 'T.' || pKeyColumnName || ' AS A_WORKFLOW_HISTORY_KEY';
        ELSE
            -- Add T. prefix if not already present (caller may have supplied its own alias)
            IF INSTR(vCurrentCol, '.') = 0 THEN
                vCurrentCol := 'T.' || vCurrentCol;
            END IF;
        END IF;
        -- Add to result with comma separator
        IF vResult IS NOT NULL THEN
            vResult := vResult || ', ';
        END IF;
        vResult := vResult || vCurrentCol;
        vPos := vNextPos + 1;
    END LOOP;
    RETURN vResult;
END processColumnList;
----------------------------------------------------------------------------------------------------
/**
 * Validates table existence, key column existence, and column list.
 * Raises ENV_MANAGER.CODE_TABLE_NOT_EXISTS / CODE_COLUMN_NOT_EXISTS on the first failure.
 * Names are compared as-is against the data dictionary, so callers must pass
 * uppercase identifiers.
 *
 * @param pColumnList  optional comma-separated list; each entry (alias stripped) must exist
 * @param pParameters  caller context string (currently unused here; kept for logging symmetry)
 **/
PROCEDURE VALIDATE_TABLE_AND_COLUMNS (
    pSchemaName    IN VARCHAR2,
    pTableName     IN VARCHAR2,
    pKeyColumnName IN VARCHAR2,
    pColumnList    IN VARCHAR2,
    pParameters    IN VARCHAR2
) IS
    vCount      INTEGER;
    vColumns    VARCHAR2(32767);
    vPos        PLS_INTEGER;
    vNextPos    PLS_INTEGER;
    vCurrentCol VARCHAR2(128);
BEGIN
    -- Check if table exists
    SELECT COUNT(*) INTO vCount
    FROM all_tables
    WHERE table_name = pTableName AND owner = pSchemaName;
    IF vCount = 0 THEN
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, ENV_MANAGER.MSG_TABLE_NOT_EXISTS);
    END IF;
    -- Check if key column exists
    SELECT COUNT(*) INTO vCount
    FROM all_tab_columns
    WHERE table_name =
pTableName AND column_name = pKeyColumnName AND owner = pSchemaName;
    IF vCount = 0 THEN
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
    END IF;
    -- Validate pColumnList - check if all column names exist in the table
    IF pColumnList IS NOT NULL THEN
        vColumns := UPPER(REPLACE(pColumnList, ' ', ''));
        vPos := 1;
        WHILE vPos <= LENGTH(vColumns) LOOP
            vNextPos := INSTR(vColumns, ',', vPos);
            IF vNextPos = 0 THEN
                -- Last entry: treat end-of-string as the terminating comma
                vNextPos := LENGTH(vColumns) + 1;
            END IF;
            vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos);
            -- Remove table alias prefix if present (e.g., T.COL -> COL)
            IF INSTR(vCurrentCol, '.') > 0 THEN
                vCurrentCol := SUBSTR(vCurrentCol, INSTR(vCurrentCol, '.') + 1);
            END IF;
            -- Check if column exists
            SELECT COUNT(*) INTO vCount
            FROM all_tab_columns
            WHERE table_name = pTableName AND column_name = vCurrentCol AND owner = pSchemaName;
            IF vCount = 0 THEN
                RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
            END IF;
            vPos := vNextPos + 1;
        END LOOP;
    END IF;
END VALIDATE_TABLE_AND_COLUMNS;
----------------------------------------------------------------------------------------------------
/**
 * Retrieves list of year/month partitions based on date range.
 * Joins the source table to CT_ODS.A_LOAD_HISTORY and returns the distinct
 * (year, month) pairs of L.LOAD_START in the half-open range [pMinDate, pMaxDate),
 * ordered chronologically.
 *
 * @param pKeyColumnName  source column joined to L.A_ETL_LOAD_SET_KEY
 * @return partition_tab of (year 'YYYY', month 'MM') strings
 **/
FUNCTION GET_PARTITIONS (
    pSchemaName    IN VARCHAR2,
    pTableName     IN VARCHAR2,
    pKeyColumnName IN VARCHAR2,
    pMinDate       IN DATE,
    pMaxDate       IN DATE,
    pParameters    IN VARCHAR2
) RETURN partition_tab IS
    vSql            VARCHAR2(32000);
    vPartitions     partition_tab;
    vKeyValuesYear  DBMS_SQL.VARCHAR2_TABLE;  -- 'YYYY' strings from the query
    vKeyValuesMonth DBMS_SQL.VARCHAR2_TABLE;  -- 'MM' strings, positionally paired with years
    vFullTableName  VARCHAR2(200);
BEGIN
    -- Build fully qualified table name if not already qualified
    IF INSTR(pTableName, '.') > 0 THEN
        vFullTableName := pTableName; -- Already fully qualified
    ELSE
        vFullTableName := pSchemaName || '.' || pTableName;
    END IF;
    -- Distinct year/month of LOAD_START for rows of the source table present in load history
    vSql := 'SELECT DISTINCT TO_CHAR(L.LOAD_START,''YYYY'') AS YR, TO_CHAR(L.LOAD_START,''MM'') AS MN FROM ' || vFullTableName || ' T, CT_ODS.A_LOAD_HISTORY L WHERE T.'
|| pKeyColumnName || ' = L.A_ETL_LOAD_SET_KEY AND L.LOAD_START >= :pMinDate AND L.LOAD_START < :pMaxDate ORDER BY YR, MN';
    ENV_MANAGER.LOG_PROCESS_EVENT('Executing date range query: ' || vSql, 'DEBUG', pParameters);
    -- Date boundaries are passed as bind variables (half-open range, upper bound exclusive)
    EXECUTE IMMEDIATE vSql BULK COLLECT INTO vKeyValuesYear, vKeyValuesMonth USING pMinDate, pMaxDate;
    ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vKeyValuesYear.COUNT || ' year/month combinations to export', 'DEBUG', pParameters);
    -- Convert to partition_tab (positional pairing of the two collections)
    vPartitions := partition_tab();
    vPartitions.EXTEND(vKeyValuesYear.COUNT);
    FOR i IN 1 .. vKeyValuesYear.COUNT LOOP
        vPartitions(i).year := vKeyValuesYear(i);
        vPartitions(i).month := vKeyValuesMonth(i);
    END LOOP;
    RETURN vPartitions;
END GET_PARTITIONS;
----------------------------------------------------------------------------------------------------
/**
 * Exports single partition (year/month) to specified format (PARQUET or CSV).
 * This is the core worker procedure that will be used for parallel processing in v2.3.0.
 * Before exporting it deletes any leftover object at the target URI so Oracle does
 * not create "_1"-suffixed files on retry.
 *
 * @param pProcessedColumns  SELECT list already processed (T. prefixes, TO_CHAR wrappers, aliases)
 * @param pMinDate/pMaxDate  half-open LOAD_START range [pMinDate, pMaxDate)
 * @param pFormat            'PARQUET' (default) or 'CSV'; anything else raises ORA-20001
 * @param pFileBaseName      CSV only: base of the output filename (defaults to table name)
 * @param pMaxFileSize       CSV only: max output file size in bytes (default 100MB)
 **/
PROCEDURE EXPORT_SINGLE_PARTITION (
    pSchemaName       IN VARCHAR2,
    pTableName        IN VARCHAR2,
    pKeyColumnName    IN VARCHAR2,
    pYear             IN VARCHAR2,
    pMonth            IN VARCHAR2,
    pBucketUri        IN VARCHAR2,
    pFolderName       IN VARCHAR2,
    pProcessedColumns IN VARCHAR2,
    pMinDate          IN DATE,
    pMaxDate          IN DATE,
    pCredentialName   IN VARCHAR2,
    pFormat           IN VARCHAR2 DEFAULT 'PARQUET',
    pFileBaseName     IN VARCHAR2 DEFAULT NULL,
    pMaxFileSize      IN NUMBER DEFAULT 104857600,
    pParameters       IN VARCHAR2
) IS
    vQuery         VARCHAR2(32767);
    vUri           VARCHAR2(4000);
    vFileName      VARCHAR2(1000);
    vFullTableName VARCHAR2(200);
BEGIN
    -- Build fully qualified table name if not already qualified
    IF INSTR(pTableName, '.') > 0 THEN
        vFullTableName := pTableName; -- Already fully qualified
    ELSE
        vFullTableName := pSchemaName || '.' || pTableName;
    END IF;
    -- Construct the query to extract data for the current year/month
    vQuery := 'SELECT ' || pProcessedColumns || ' FROM ' || vFullTableName || ' T, CT_ODS.A_LOAD_HISTORY L WHERE T.'
|| pKeyColumnName || ' = L.A_ETL_LOAD_SET_KEY AND TO_CHAR(L.LOAD_START,''YYYY'') = ' || CHR(39) || pYear || CHR(39)
        || ' AND TO_CHAR(L.LOAD_START,''MM'') = ' || CHR(39) || pMonth || CHR(39)
        -- Dates are rendered as literals (not binds) because DBMS_CLOUD.EXPORT_DATA takes a self-contained query string
        || ' AND L.LOAD_START >= TO_DATE(' || CHR(39) || TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39)
        || ', ''YYYY-MM-DD HH24:MI:SS'') AND L.LOAD_START < TO_DATE(' || CHR(39) || TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39)
        || ', ''YYYY-MM-DD HH24:MI:SS'')';
    -- Construct the URI based on format
    IF pFormat = 'PARQUET' THEN
        -- Parquet: Use Hive-style partitioning (PARTITION_YEAR=/PARTITION_MONTH= directories).
        -- Note: maxfilesize is NOT supported for Parquet format (Oracle limitation).
        vUri := pBucketUri || CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || 'PARTITION_YEAR=' || sanitizeFilename(pYear) || '/' || 'PARTITION_MONTH=' || sanitizeFilename(pMonth) || '/' || sanitizeFilename(pYear) || sanitizeFilename(pMonth) || '.parquet';
        ENV_MANAGER.LOG_PROCESS_EVENT('Parquet export URI: ' || vUri, 'DEBUG', pParameters);
        -- Delete potentially corrupted file from previous failed attempt.
        -- This prevents Oracle from creating _1 suffixed files on retry.
        DELETE_FAILED_EXPORT_FILE(vUri, pCredentialName, pParameters);
        DBMS_CLOUD.EXPORT_DATA(
            credential_name => pCredentialName,
            file_uri_list   => vUri,
            query           => vQuery,
            format          => json_object('type' VALUE 'parquet')
        );
    ELSIF pFormat = 'CSV' THEN
        -- CSV: Flat file structure with year/month in filename
        vFileName := NVL(pFileBaseName, UPPER(pTableName)) || '_' || pYear || pMonth || '.csv';
        vUri := pBucketUri || CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || sanitizeFilename(vFileName);
        ENV_MANAGER.LOG_PROCESS_EVENT('CSV export URI: ' || vUri, 'DEBUG', pParameters);
        -- Delete potentially corrupted file from previous failed attempt.
        -- This prevents Oracle from creating _1 suffixed files on retry.
        DELETE_FAILED_EXPORT_FILE(vUri, pCredentialName, pParameters);
        -- Use json_object() for CSV export with maxfilesize in bytes (Oracle requirement).
        -- Oracle maxfilesize: min 10MB (10485760), max 1GB (1073741824), default 10MB.
        -- NOTE: maxfilesize must be NUMBER (bytes), not string like '1000M'.
        -- Using 100MB (104857600) to avoid PGA memory issues with large files.
        DBMS_CLOUD.EXPORT_DATA(
            credential_name => pCredentialName,
            file_uri_list   => vUri,
            query           => vQuery,
            format          => json_object(
                'type' VALUE 'CSV',
                'header' VALUE true,
                'quote' VALUE CHR(34),
                'delimiter' VALUE ',',
                -- NOTE(review): buildQueryWithDateFormats pre-doubles embedded quotes for RFC 4180
                -- and its comments describe escape=true as incompatible with ORACLE_LOADER;
                -- confirm this escape setting is intentional and does not double-escape.
                'escape' VALUE true,
                'recorddelimiter' VALUE CHR(13)||CHR(10),  -- CRLF line endings for Windows consumers
                'maxfilesize' VALUE pMaxFileSize  -- Dynamic maxfilesize in bytes (e.g., 104857600 = 100MB)
            )
        );
    ELSE
        RAISE_APPLICATION_ERROR(-20001, 'Unsupported format: ' || pFormat || '. Use PARQUET or CSV.');
    END IF;
    ENV_MANAGER.LOG_PROCESS_EVENT('Processing Year/Month: ' || pYear || '/' || pMonth || ' (Format: ' || pFormat || ')', 'DEBUG', pParameters);
    ENV_MANAGER.LOG_PROCESS_EVENT('Export query: ' || vQuery, 'DEBUG', pParameters);
END EXPORT_SINGLE_PARTITION;
----------------------------------------------------------------------------------------------------
/**
 * Callback procedure for DBMS_PARALLEL_EXECUTE.
 * Processes single partition (year/month) chunk in parallel task.
 * Called by DBMS_PARALLEL_EXECUTE framework for each chunk.
 * Chunk status lifecycle: PENDING -> PROCESSING -> COMPLETED/FAILED, with COMMITs
 * after each transition so other sessions can observe progress.
 *
 * @param pStartId  chunk id; chunks are created with chunk_size 1, so this identifies one partition
 * @param pEndId    chunk range end (required by the framework contract; not read here)
 **/
PROCEDURE EXPORT_PARTITION_PARALLEL (
    pStartId IN NUMBER,
    pEndId   IN NUMBER
) IS
    vYear             VARCHAR2(4);
    vMonth            VARCHAR2(2);
    vSchemaName       VARCHAR2(128);
    vTableName        VARCHAR2(128);
    vKeyColumnName    VARCHAR2(128);
    vBucketUri        VARCHAR2(4000);
    vFolderName       VARCHAR2(1000);
    vProcessedColumns VARCHAR2(32767);
    vMinDate          DATE;
    vMaxDate          DATE;
    vCredentialName   VARCHAR2(200);
    vFormat           VARCHAR2(20);
    vFileBaseName     VARCHAR2(1000);
    vMaxFileSize      NUMBER;
    vParameters       VARCHAR2(4000);
BEGIN
    -- Retrieve chunk context from global temporary table
    -- NOTE(review): A_PARALLEL_EXPORT_CHUNKS is COMMITted and read across job sessions,
    -- which suggests a regular heap table rather than a GTT -- confirm and fix wording.
    SELECT YEAR_VALUE, MONTH_VALUE, SCHEMA_NAME, TABLE_NAME, KEY_COLUMN_NAME, BUCKET_URI, FOLDER_NAME, PROCESSED_COLUMNS, MIN_DATE, MAX_DATE, CREDENTIAL_NAME, FORMAT_TYPE, FILE_BASE_NAME, MAX_FILE_SIZE
    INTO vYear, vMonth, vSchemaName, vTableName,
vKeyColumnName, vBucketUri, vFolderName, vProcessedColumns, vMinDate, vMaxDate, vCredentialName, vFormat, vFileBaseName, vMaxFileSize
    FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS
    WHERE CHUNK_ID = pStartId;
    vParameters := 'Parallel task - Year: ' || vYear || ', Month: ' || vMonth || ', ChunkID: ' || pStartId;
    ENV_MANAGER.LOG_PROCESS_EVENT('Starting parallel export for partition ' || vYear || '/' || vMonth, 'DEBUG', vParameters);
    -- Mark chunk as PROCESSING (committed so progress is visible to other sessions)
    UPDATE CT_MRDS.A_PARALLEL_EXPORT_CHUNKS
    SET STATUS = 'PROCESSING', ERROR_MESSAGE = NULL
    WHERE CHUNK_ID = pStartId;
    COMMIT;
    -- Call the worker procedure
    EXPORT_SINGLE_PARTITION(
        pSchemaName       => vSchemaName,
        pTableName        => vTableName,
        pKeyColumnName    => vKeyColumnName,
        pYear             => vYear,
        pMonth            => vMonth,
        pBucketUri        => vBucketUri,
        pFolderName       => vFolderName,
        pProcessedColumns => vProcessedColumns,
        pMinDate          => vMinDate,
        pMaxDate          => vMaxDate,
        pCredentialName   => vCredentialName,
        pFormat           => vFormat,
        pFileBaseName     => vFileBaseName,
        pMaxFileSize      => vMaxFileSize,
        pParameters       => vParameters
    );
    -- Mark chunk as COMPLETED
    UPDATE CT_MRDS.A_PARALLEL_EXPORT_CHUNKS
    SET STATUS = 'COMPLETED', EXPORT_TIMESTAMP = SYSTIMESTAMP, ERROR_MESSAGE = NULL
    WHERE CHUNK_ID = pStartId;
    COMMIT;
    ENV_MANAGER.LOG_PROCESS_EVENT('Completed parallel export for partition ' || vYear || '/' || vMonth, 'DEBUG', vParameters);
EXCEPTION
    WHEN OTHERS THEN
        -- Capture error details in variable (SQLERRM cannot be used directly in SQL)
        vgMsgTmp := 'Parallel task error for partition ' || vYear || '/' || vMonth || ' (ChunkID: ' || pStartId || '): ' || SQLERRM || cgBL || DBMS_UTILITY.FORMAT_ERROR_BACKTRACE;
        ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
        -- Mark chunk as FAILED with error message.
        -- Use vgMsgTmp variable instead of SQLERRM directly (Oracle limitation in SQL context).
        UPDATE CT_MRDS.A_PARALLEL_EXPORT_CHUNKS
        SET STATUS = 'FAILED', ERROR_MESSAGE = SUBSTR(vgMsgTmp, 1, 4000)
        WHERE CHUNK_ID = pStartId;
        COMMIT;
        -- Re-raise so DBMS_PARALLEL_EXECUTE marks the chunk PROCESSED_WITH_ERROR
        RAISE;
END EXPORT_PARTITION_PARALLEL;
----------------------------------------------------------------------------------------------------
-- MAIN EXPORT PROCEDURES
----------------------------------------------------------------------------------------------------
/**
 * Exports one CSV file per distinct A_ETL_LOAD_SET_KEY found in CT_ODS.A_LOAD_HISTORY
 * for the given table. Each file is named <sanitized key value>.csv and written under
 * pFolderName (if given) in the bucket resolved from pBucketArea.
 *
 * @param pKeyColumnName   source column joined to L.A_ETL_LOAD_SET_KEY; its data type
 *                         drives how key-value literals are rendered in the per-key query
 * @param pCredentialName  DBMS_CLOUD credential (defaults to ENV_MANAGER.gvCredentialName)
 **/
PROCEDURE EXPORT_TABLE_DATA (
    pSchemaName IN VARCHAR2,
    pTableName IN VARCHAR2,
    pKeyColumnName IN VARCHAR2,
    pBucketArea IN VARCHAR2,
    pFolderName IN VARCHAR2,
    pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
) IS
    -- Type definition for key values
    TYPE key_value_tab IS TABLE OF VARCHAR2(4000);
    vKeyValues           key_value_tab;
    vCount               INTEGER;
    vSql                 VARCHAR2(4000);
    vKeyValue            VARCHAR2(4000);
    vQuery               VARCHAR2(32767);
    vUri                 VARCHAR2(4000);
    vDataType            VARCHAR2(30);
    vTableName           VARCHAR2(128);
    vSchemaName          VARCHAR2(128);
    vKeyColumnName       VARCHAR2(128);
    vParameters          VARCHAR2(4000);
    vBucketUri           VARCHAR2(4000);
    vProcessedColumnList VARCHAR2(32767);
    vCurrentCol          VARCHAR2(128);
    vAllColumnsList      VARCHAR2(32767);
BEGIN
    -- Record call parameters once for every subsequent log line
    vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
        'pSchemaName => '''||nvl(pSchemaName, 'NULL')||''''
        ,'pTableName => '''||nvl(pTableName, 'NULL')||''''
        ,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||''''
        ,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||''''
        ,'pFolderName => '''||nvl(pFolderName, 'NULL')||''''
        ,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
    ));
    ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
    -- Get bucket URI based on bucket area using FILE_MANAGER function
    vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea);
    -- Convert table and column names to uppercase to match data dictionary
    vTableName := UPPER(pTableName);
    vSchemaName := UPPER(pSchemaName);
    vKeyColumnName := UPPER(pKeyColumnName);
    -- Check if table exists
    SELECT COUNT(*) INTO vCount
    FROM all_tables
    WHERE table_name = vTableName AND owner = vSchemaName;
    IF vCount = 0 THEN
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, ENV_MANAGER.MSG_TABLE_NOT_EXISTS);
    END IF;
    -- Check if key column exists
    SELECT COUNT(*) INTO vCount
    FROM all_tab_columns
    WHERE table_name = vTableName AND column_name = vKeyColumnName AND owner = vSchemaName;
    IF vCount = 0 THEN
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
    END IF;
    -- Get the data type of the key column
    SELECT data_type INTO vDataType
    FROM all_tab_columns
    WHERE table_name = vTableName AND column_name = vKeyColumnName AND owner = vSchemaName;
    -- Build list of all columns for the table (including key column for aliasing)
    SELECT LISTAGG(column_name, ', ') WITHIN GROUP (ORDER BY column_id)
    INTO vAllColumnsList
    FROM all_tab_columns
    WHERE table_name = vTableName AND owner = vSchemaName;
    -- Process column list to add T. prefix and alias key column as A_WORKFLOW_HISTORY_KEY
    vProcessedColumnList := processColumnList(vAllColumnsList, vTableName, vSchemaName, vKeyColumnName);
    ENV_MANAGER.LOG_PROCESS_EVENT('Dynamic column list built: ' || vAllColumnsList, 'DEBUG', vParameters);
    ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list with T. prefix: ' || vProcessedColumnList, 'DEBUG', vParameters);
    -- DBMS_ASSERT guards the identifiers that are concatenated into dynamic SQL below
    vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName);
    -- Fetch unique key values from A_LOAD_HISTORY
    vSql := 'SELECT DISTINCT L.A_ETL_LOAD_SET_KEY' || ' FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L' || ' WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY';
    ENV_MANAGER.LOG_PROCESS_EVENT('Executing key values query: ' || vSql, 'DEBUG', vParameters);
    -- Key values are fetched as VARCHAR2 regardless of the column's real data type
    EXECUTE IMMEDIATE vSql BULK COLLECT INTO vKeyValues;
    ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vKeyValues.COUNT || ' unique key values to process', 'DEBUG', vParameters);
    -- Loop over each unique key value
    FOR i IN 1 .. vKeyValues.COUNT LOOP
        vKeyValue := vKeyValues(i);
        -- Construct the query to extract data for the current key value with A_WORKFLOW_HISTORY_KEY mapping.
        -- The key literal is rendered according to the key column's data type.
        IF vDataType IN ('VARCHAR2', 'CHAR', 'NCHAR', 'NVARCHAR2') THEN
            -- NOTE(review): vKeyValue is concatenated into the query; values come from the
            -- database itself, but an embedded quote would still break the statement -- confirm
            vQuery := 'SELECT ' || vProcessedColumnList || ' FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L' || ' WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY' || ' AND L.A_ETL_LOAD_SET_KEY = ' || CHR(39) || vKeyValue || CHR(39);
        ELSIF vDataType IN ('NUMBER', 'FLOAT', 'BINARY_FLOAT', 'BINARY_DOUBLE') THEN
            vQuery := 'SELECT ' || vProcessedColumnList || ' FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L' || ' WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY' || ' AND L.A_ETL_LOAD_SET_KEY = ' || vKeyValue;
        ELSIF vDataType LIKE 'TIMESTAMP%' OR vDataType = 'DATE' THEN
            -- NOTE(review): assumes the implicit VARCHAR2 rendering of the key matches
            -- 'YYYY-MM-DD HH24:MI:SS.FF', i.e. depends on session NLS settings -- confirm
            vQuery := 'SELECT ' || vProcessedColumnList || ' FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L' || ' WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY' || ' AND L.A_ETL_LOAD_SET_KEY = TO_TIMESTAMP(' || CHR(39) || vKeyValue || CHR(39) ||', ''YYYY-MM-DD HH24:MI:SS.FF'')';
        ELSE
            RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNSUPPORTED_DATA_TYPE, ENV_MANAGER.MSG_UNSUPPORTED_DATA_TYPE);
        END IF;
        -- Construct the URI for the file in OCI Object Storage
        vUri := vBucketUri || CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || sanitizeFilename(vKeyValue) || '.csv';
        ENV_MANAGER.LOG_PROCESS_EVENT('Processing key value: ' || vKeyValue || ' (' || (i) || '/' || vKeyValues.COUNT || ')', 'DEBUG', vParameters);
        ENV_MANAGER.LOG_PROCESS_EVENT('Export query: ' || vQuery, 'DEBUG', vParameters);
        ENV_MANAGER.LOG_PROCESS_EVENT('Export URI: ' || vUri, 'DEBUG', vParameters);
        -- Use DBMS_CLOUD package to export data to the URI
        DBMS_CLOUD.EXPORT_DATA(
            credential_name => pCredentialName,
            file_uri_list   => vUri,
            query           => vQuery,
            format          => json_object('type' VALUE 'CSV', 'header' VALUE true)
        );
    END LOOP;
    ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
EXCEPTION
    WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN
        vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName;
        ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp);
    WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN
        vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in column list' ELSE '' END;
        ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp);
    WHEN ENV_MANAGER.ERR_UNSUPPORTED_DATA_TYPE THEN
        vgMsgTmp := ENV_MANAGER.MSG_UNSUPPORTED_DATA_TYPE || ' vDataType: '||vDataType;
        ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNSUPPORTED_DATA_TYPE, vgMsgTmp);
    WHEN OTHERS THEN
        -- Log complete error details including full stack trace and backtrace
        ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER');
        ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
END EXPORT_TABLE_DATA;
----------------------------------------------------------------------------------------------------
PROCEDURE EXPORT_TABLE_DATA_BY_DATE (
    pSchemaName IN VARCHAR2,
    pTableName IN VARCHAR2,
    pKeyColumnName IN VARCHAR2,
    pBucketArea IN VARCHAR2,
    pFolderName IN VARCHAR2,
    pColumnList IN VARCHAR2 default NULL,
    pMinDate IN DATE default DATE '1900-01-01',
    pMaxDate IN DATE default SYSDATE,
    pParallelDegree IN NUMBER default 1,
    pTemplateTableName IN VARCHAR2 default NULL,
    pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
) IS
    vTableName VARCHAR2(128);
    vSchemaName VARCHAR2(128);
    vKeyColumnName
VARCHAR2(128); vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE; vProcessedColumnList VARCHAR2(32767); vBucketUri VARCHAR2(4000); vCurrentCol VARCHAR2(128); vPartitions partition_tab; BEGIN vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||'''' ,'pTableName => '''||nvl(pTableName, 'NULL')||'''' ,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||'''' ,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||'''' ,'pFolderName => '''||nvl(pFolderName, 'NULL')||'''' ,'pColumnList => '''||nvl(pColumnList, 'NULL')||'''' ,'pMinDate => '''||nvl(TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||'''' ,'pMaxDate => '''||nvl(TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||'''' ,'pParallelDegree => '''||nvl(TO_CHAR(pParallelDegree), 'NULL')||'''' ,'pTemplateTableName => '''||nvl(pTemplateTableName, 'NULL')||'''' ,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||'''' )); ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters); -- Get bucket URI based on bucket area using FILE_MANAGER function vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea); -- Convert table and column names to uppercase to match data dictionary vTableName := UPPER(pTableName); vSchemaName := UPPER(pSchemaName); vKeyColumnName := UPPER(pKeyColumnName); -- Validate table, key column, and column list using shared procedure VALIDATE_TABLE_AND_COLUMNS(vSchemaName, vTableName, vKeyColumnName, pColumnList, vParameters); -- Build query with TO_CHAR for date columns (per-column format support) vProcessedColumnList := buildQueryWithDateFormats(pColumnList, vTableName, vSchemaName, vKeyColumnName, pTemplateTableName, 'PARQUET'); ENV_MANAGER.LOG_PROCESS_EVENT('Input column list: ' || NVL(pColumnList, 'NULL (building dynamic list from table metadata)'), 'DEBUG', vParameters); ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list with TO_CHAR for date columns: ' || vProcessedColumnList, 'DEBUG', vParameters); 
ENV_MANAGER.LOG_PROCESS_EVENT('Template table: ' || NVL(pTemplateTableName, 'NULL - using global default for all dates'), 'INFO', vParameters); vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName); -- Validate parallel degree parameter IF pParallelDegree < 1 OR pParallelDegree > 16 THEN vgMsgTmp := ENV_MANAGER.MSG_INVALID_PARALLEL_DEGREE || ': ' || pParallelDegree || '. Valid range: 1-16'; ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_INVALID_PARALLEL_DEGREE, vgMsgTmp); END IF; -- Get partitions using shared function vPartitions := GET_PARTITIONS(vSchemaName, vTableName, vKeyColumnName, pMinDate, pMaxDate, vParameters); ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vPartitions.COUNT || ' partitions to export with parallel degree ' || pParallelDegree, 'INFO', vParameters); -- Sequential processing (parallel degree = 1) IF pParallelDegree = 1 THEN ENV_MANAGER.LOG_PROCESS_EVENT('Using sequential processing (pParallelDegree = 1)', 'DEBUG', vParameters); FOR i IN 1 .. 
vPartitions.COUNT LOOP EXPORT_SINGLE_PARTITION( pSchemaName => vSchemaName, pTableName => vTableName, pKeyColumnName => vKeyColumnName, pYear => vPartitions(i).year, pMonth => vPartitions(i).month, pBucketUri => vBucketUri, pFolderName => pFolderName, pProcessedColumns => vProcessedColumnList, pMinDate => pMinDate, pMaxDate => pMaxDate, pCredentialName => pCredentialName, pFormat => 'PARQUET', pFileBaseName => NULL, pMaxFileSize => 104857600, pParameters => vParameters ); END LOOP; -- Parallel processing (parallel degree > 1) ELSE -- Skip parallel processing if no partitions found IF vPartitions.COUNT = 0 THEN ENV_MANAGER.LOG_PROCESS_EVENT('No partitions to export - skipping parallel processing', 'INFO', vParameters); ELSE DECLARE vTaskName VARCHAR2(128) := 'DATA_EXPORT_TASK_' || TO_CHAR(SYSTIMESTAMP, 'YYYYMMDDHH24MISSFF'); vChunkId NUMBER; BEGIN ENV_MANAGER.LOG_PROCESS_EVENT('Using parallel processing with ' || pParallelDegree || ' threads', 'INFO', vParameters); -- Clean up old completed chunks (>24 hours) to prevent table bloat -- CRITICAL: Do NOT delete chunks from other active sessions (same-day tasks) -- This prevents race conditions when multiple exports run simultaneously DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'COMPLETED' AND CREATED_DATE < SYSTIMESTAMP - INTERVAL '1' DAY; COMMIT; ENV_MANAGER.LOG_PROCESS_EVENT('Cleared old COMPLETED chunks (>24h). Active session chunks preserved.', 'DEBUG', vParameters); -- This prevents re-exporting successfully completed partitions DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'COMPLETED'; COMMIT; ENV_MANAGER.LOG_PROCESS_EVENT('Cleared COMPLETED chunks. FAILED chunks retained for retry.', 'DEBUG', vParameters); -- Populate chunks table (insert new chunks, preserve FAILED chunks for retry) FOR i IN 1 .. 
vPartitions.COUNT LOOP MERGE INTO CT_MRDS.A_PARALLEL_EXPORT_CHUNKS t USING (SELECT i AS chunk_id, vPartitions(i).year AS yr, vPartitions(i).month AS mn FROM DUAL) s ON (t.CHUNK_ID = s.chunk_id) WHEN NOT MATCHED THEN INSERT (CHUNK_ID, TASK_NAME, YEAR_VALUE, MONTH_VALUE, SCHEMA_NAME, TABLE_NAME, KEY_COLUMN_NAME, BUCKET_URI, FOLDER_NAME, PROCESSED_COLUMNS, MIN_DATE, MAX_DATE, CREDENTIAL_NAME, FORMAT_TYPE, FILE_BASE_NAME, TEMPLATE_TABLE_NAME, MAX_FILE_SIZE, STATUS) VALUES (i, vTaskName, vPartitions(i).year, vPartitions(i).month, vSchemaName, vTableName, vKeyColumnName, vBucketUri, pFolderName, vProcessedColumnList, pMinDate, pMaxDate, pCredentialName, 'PARQUET', NULL, pTemplateTableName, 104857600, 'PENDING') WHEN MATCHED THEN UPDATE SET TASK_NAME = vTaskName, STATUS = CASE WHEN t.STATUS = 'FAILED' THEN 'PENDING' ELSE t.STATUS END, ERROR_MESSAGE = CASE WHEN t.STATUS = 'FAILED' THEN NULL ELSE t.ERROR_MESSAGE END; END LOOP; COMMIT; -- Log chunk statistics DECLARE vPendingCount NUMBER; vFailedCount NUMBER; BEGIN SELECT COUNT(*) INTO vPendingCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'PENDING'; SELECT COUNT(*) INTO vFailedCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'FAILED'; ENV_MANAGER.LOG_PROCESS_EVENT('Chunk statistics: PENDING=' || vPendingCount || ', FAILED (retry)=' || vFailedCount, 'INFO', vParameters); END; -- Create parallel task DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => vTaskName); -- Define chunks by number range (1 to partition count) DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_NUMBER_COL( task_name => vTaskName, table_owner => 'CT_MRDS', table_name => 'A_PARALLEL_EXPORT_CHUNKS', table_column => 'CHUNK_ID', chunk_size => 1 -- Each partition is one chunk ); -- Execute task in parallel ENV_MANAGER.LOG_PROCESS_EVENT('Executing parallel task: ' || vTaskName, 'DEBUG', vParameters); DBMS_PARALLEL_EXECUTE.RUN_TASK( task_name => vTaskName, sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id); END;', 
language_flag => DBMS_SQL.NATIVE, parallel_level => pParallelDegree ); -- Check for errors DECLARE vErrorCount NUMBER; BEGIN SELECT COUNT(*) INTO vErrorCount FROM USER_PARALLEL_EXECUTE_CHUNKS WHERE task_name = vTaskName AND status = 'PROCESSED_WITH_ERROR'; IF vErrorCount > 0 THEN vgMsgTmp := 'Parallel execution completed with ' || vErrorCount || ' errors. Check USER_PARALLEL_EXECUTE_CHUNKS for details.'; ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp); END IF; END; -- Clean up task DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName); -- Clean up chunks for THIS specific task only (session-safe) -- CRITICAL: Use TASK_NAME filter to avoid deleting chunks from other active sessions DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE TASK_NAME = vTaskName; COMMIT; ENV_MANAGER.LOG_PROCESS_EVENT('Parallel execution completed successfully', 'INFO', vParameters); EXCEPTION WHEN OTHERS THEN -- Attempt to drop task on error BEGIN DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName); EXCEPTION WHEN OTHERS THEN NULL; -- Ignore drop errors END; vgMsgTmp := ENV_MANAGER.MSG_PARALLEL_EXECUTION_FAILED || ': ' || SQLERRM || cgBL || DBMS_UTILITY.FORMAT_ERROR_BACKTRACE; ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp); END; END IF; END IF; ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters); EXCEPTION WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName; ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp); WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in pColumnList' ELSE '' END; 
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp); WHEN ENV_MANAGER.ERR_INVALID_PARALLEL_DEGREE THEN RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_INVALID_PARALLEL_DEGREE, vgMsgTmp); WHEN ENV_MANAGER.ERR_PARALLEL_EXECUTION_FAILED THEN RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp); WHEN OTHERS THEN -- Log complete error details including full stack trace and backtrace ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER'); ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE)); END EXPORT_TABLE_DATA_BY_DATE; ---------------------------------------------------------------------------------------------------- /** * @name EXPORT_TABLE_DATA_TO_CSV_BY_DATE * @desc Exports data to a single CSV file with date filtering. * Unlike EXPORT_TABLE_DATA_BY_DATE, this procedure creates one CSV file * instead of multiple Parquet files partitioned by year/month. * Uses the same date filtering mechanism with CT_ODS.A_LOAD_HISTORY. * Allows specifying custom column list or uses T.* if pColumnList is NULL. * Validates that all columns in pColumnList exist in the target table. * Automatically adds 'T.' prefix to column names in pColumnList. 
 * @param pSchemaName        Owner of the table to export (validated / uppercased).
 * @param pTableName         Table to export (validated / uppercased).
 * @param pKeyColumnName     FK column joined against CT_ODS.A_LOAD_HISTORY for date filtering.
 * @param pBucketArea        Logical bucket area; resolved to a URI via FILE_MANAGER.GET_BUCKET_URI.
 * @param pFolderName        Target folder inside the bucket.
 * @param pFileName          Optional output file name; extension defaults to '.csv' when absent.
 * @param pColumnList        Optional explicit column list; NULL means dynamic column list.
 * @param pMinDate           Inclusive lower bound of the export date range.
 * @param pMaxDate           Upper bound of the export date range.
 * @param pParallelDegree    1 = sequential, 2-16 = DBMS_PARALLEL_EXECUTE threads.
 * @param pTemplateTableName Optional template table defining CSV column order.
 * @param pMaxFileSize       Max size per exported file in bytes (default 100 MB).
 * @param pCredentialName    OCI credential used by the export worker.
 * @example
 * begin
 *   DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
 *     pSchemaName    => 'CT_MRDS',
 *     pTableName     => 'MY_TABLE',
 *     pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',
 *     pBucketArea    => 'DATA',
 *     pFolderName    => 'exports',
 *     pFileName      => 'my_export.csv',
 *     pColumnList    => 'COLUMN1, COLUMN2, COLUMN3', -- Optional
 *     pMinDate       => DATE '2024-01-01',
 *     pMaxDate       => SYSDATE
 *   );
 * end;
 **/
PROCEDURE EXPORT_TABLE_DATA_TO_CSV_BY_DATE (
    pSchemaName        IN VARCHAR2,
    pTableName         IN VARCHAR2,
    pKeyColumnName     IN VARCHAR2,
    pBucketArea        IN VARCHAR2,
    pFolderName        IN VARCHAR2,
    pFileName          IN VARCHAR2 DEFAULT NULL,
    pColumnList        IN VARCHAR2 default NULL,
    pMinDate           IN DATE default DATE '1900-01-01',
    pMaxDate           IN DATE default SYSDATE,
    pParallelDegree    IN NUMBER default 1,
    pTemplateTableName IN VARCHAR2 default NULL,
    pMaxFileSize       IN NUMBER default 104857600,
    pCredentialName    IN VARCHAR2 default ENV_MANAGER.gvCredentialName
) IS
    vTableName           VARCHAR2(128);
    vSchemaName          VARCHAR2(128);
    vKeyColumnName       VARCHAR2(128);
    vParameters          CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
    vFileBaseName        VARCHAR2(4000);
    vFileExtension       VARCHAR2(10);
    vProcessedColumnList VARCHAR2(32767);
    vBucketUri           VARCHAR2(4000);
    -- NOTE(review): vCurrentCol is declared but never assigned in this procedure,
    -- so the ERR_COLUMN_NOT_EXISTS handler below never appends a column name here.
    vCurrentCol          VARCHAR2(128);
    vPartitions          partition_tab;
BEGIN
    -- Capture all input parameters for structured process logging
    vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
        'pSchemaName => '''||nvl(pSchemaName, 'NULL')||''''
        ,'pTableName => '''||nvl(pTableName, 'NULL')||''''
        ,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||''''
        ,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||''''
        ,'pFolderName => '''||nvl(pFolderName, 'NULL')||''''
        ,'pFileName => '''||nvl(pFileName, 'NULL')||''''
        ,'pColumnList => '''||nvl(pColumnList, 'NULL')||''''
        ,'pMinDate => '''||nvl(TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
        ,'pMaxDate => '''||nvl(TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
        ,'pParallelDegree => '''||nvl(TO_CHAR(pParallelDegree), 'NULL')||''''
        ,'pTemplateTableName => '''||nvl(pTemplateTableName, 'NULL')||''''
        ,'pMaxFileSize => '''||nvl(TO_CHAR(pMaxFileSize), 'NULL')||''''
        ,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
    ));
    ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);

    -- Get bucket URI based on bucket area using FILE_MANAGER function
    vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea);

    -- Convert table and column names to uppercase to match data dictionary
    vTableName     := UPPER(pTableName);
    vSchemaName    := UPPER(pSchemaName);
    vKeyColumnName := UPPER(pKeyColumnName);

    -- Extract base filename and extension or construct default filename
    IF pFileName IS NOT NULL THEN
        -- Use provided filename; split on the LAST '.' so multi-dot names keep their prefix
        IF INSTR(pFileName, '.') > 0 THEN
            vFileBaseName  := SUBSTR(pFileName, 1, INSTR(pFileName, '.', -1) - 1);
            vFileExtension := SUBSTR(pFileName, INSTR(pFileName, '.', -1));
        ELSE
            vFileBaseName  := pFileName;
            vFileExtension := '.csv';
        END IF;
    ELSE
        -- Construct default filename: TABLENAME (without extension, will be added by worker)
        vFileBaseName  := UPPER(pTableName);
        vFileExtension := '.csv';
    END IF;

    -- Validate table, key column, and column list using shared procedure
    VALIDATE_TABLE_AND_COLUMNS(vSchemaName, vTableName, vKeyColumnName, pColumnList, vParameters);

    -- Build query with TO_CHAR for date columns (per-column format support)
    vProcessedColumnList := buildQueryWithDateFormats(pColumnList, vTableName, vSchemaName, vKeyColumnName, pTemplateTableName);
    ENV_MANAGER.LOG_PROCESS_EVENT('Input column list: ' || NVL(pColumnList, 'NULL (using dynamic column list)'), 'DEBUG', vParameters);
    ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list with TO_CHAR for date columns: ' || vProcessedColumnList, 'DEBUG', vParameters);
    ENV_MANAGER.LOG_PROCESS_EVENT('Template table: ' || NVL(pTemplateTableName, 'NULL - using global default for all dates'), 'INFO', vParameters);

    -- SQL-injection guard: re-assemble fully-qualified name through DBMS_ASSERT
    vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName);

    -- Validate parallel degree parameter
    -- NOTE(review): this check runs after parameter formatting/validation work above;
    -- presumably intentional so the rejection is logged with full parameters.
    IF pParallelDegree < 1 OR pParallelDegree > 16 THEN
        vgMsgTmp := ENV_MANAGER.MSG_INVALID_PARALLEL_DEGREE || ': ' || pParallelDegree || '. Valid range: 1-16';
        ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_INVALID_PARALLEL_DEGREE, vgMsgTmp);
    END IF;

    -- Get partitions (year/month combinations) using shared function
    vPartitions := GET_PARTITIONS(vSchemaName, vTableName, vKeyColumnName, pMinDate, pMaxDate, vParameters);
    ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vPartitions.COUNT || ' year/month combinations to export', 'INFO', vParameters);
    ENV_MANAGER.LOG_PROCESS_EVENT('Date range: ' || TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS') || ' to ' || TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'DEBUG', vParameters);
    ENV_MANAGER.LOG_PROCESS_EVENT('Parallel degree: ' || pParallelDegree, 'INFO', vParameters);

    -- Sequential processing (parallel degree = 1): export each partition inline
    IF pParallelDegree = 1 THEN
        ENV_MANAGER.LOG_PROCESS_EVENT('Using sequential processing (pParallelDegree = 1)', 'DEBUG', vParameters);
        FOR i IN 1 .. vPartitions.COUNT LOOP
            EXPORT_SINGLE_PARTITION(
                pSchemaName       => vSchemaName,
                pTableName        => vTableName,
                pKeyColumnName    => vKeyColumnName,
                pYear             => vPartitions(i).year,
                pMonth            => vPartitions(i).month,
                pBucketUri        => vBucketUri,
                pFolderName       => pFolderName,
                pProcessedColumns => vProcessedColumnList,
                pMinDate          => pMinDate,
                pMaxDate          => pMaxDate,
                pCredentialName   => pCredentialName,
                pFormat           => 'CSV',
                pFileBaseName     => vFileBaseName,
                pMaxFileSize      => pMaxFileSize,
                pParameters       => vParameters
            );
        END LOOP;
    -- Parallel processing (parallel degree > 1) via DBMS_PARALLEL_EXECUTE
    ELSE
        -- Skip parallel processing if no partitions found
        IF vPartitions.COUNT = 0 THEN
            ENV_MANAGER.LOG_PROCESS_EVENT('No partitions to export - skipping parallel CSV processing', 'INFO', vParameters);
        ELSE
            DECLARE
                vTaskName VARCHAR2(128) := 'DATA_CSV_EXPORT_TASK_' || TO_CHAR(SYSTIMESTAMP, 'YYYYMMDDHH24MISSFF');
                -- NOTE(review): vChunkId is never used in this block
                vChunkId  NUMBER;
            BEGIN
                ENV_MANAGER.LOG_PROCESS_EVENT('Using parallel processing with ' || pParallelDegree || ' threads', 'INFO', vParameters);

                -- Clean up old completed chunks (>24 hours) to prevent table bloat
                -- CRITICAL: Do NOT delete chunks from other active sessions (same-day tasks)
                -- This prevents race conditions when multiple CSV exports run simultaneously
                DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS
                 WHERE STATUS = 'COMPLETED'
                   AND CREATED_DATE < SYSTIMESTAMP - INTERVAL '1' DAY;
                COMMIT;
                ENV_MANAGER.LOG_PROCESS_EVENT('Cleared old COMPLETED chunks (>24h). Active session chunks preserved.', 'DEBUG', vParameters);

                -- Populate chunks table (insert new chunks, preserve FAILED chunks for retry)
                -- NOTE(review): the MERGE matches on CHUNK_ID only (not TASK_NAME), so a
                -- leftover chunk with the same id from another table's export would be
                -- re-labelled for this task. This assumes effectively one export at a time
                -- against A_PARALLEL_EXPORT_CHUNKS - confirm against the concurrency model.
                FOR i IN 1 .. vPartitions.COUNT LOOP
                    MERGE INTO CT_MRDS.A_PARALLEL_EXPORT_CHUNKS t
                    USING (SELECT i AS chunk_id, vPartitions(i).year AS yr, vPartitions(i).month AS mn FROM DUAL) s
                    ON (t.CHUNK_ID = s.chunk_id)
                    WHEN NOT MATCHED THEN
                        INSERT (CHUNK_ID, TASK_NAME, YEAR_VALUE, MONTH_VALUE, SCHEMA_NAME, TABLE_NAME,
                                KEY_COLUMN_NAME, BUCKET_URI, FOLDER_NAME, PROCESSED_COLUMNS, MIN_DATE,
                                MAX_DATE, CREDENTIAL_NAME, FORMAT_TYPE, FILE_BASE_NAME,
                                TEMPLATE_TABLE_NAME, MAX_FILE_SIZE, STATUS)
                        VALUES (i, vTaskName, vPartitions(i).year, vPartitions(i).month, vSchemaName, vTableName,
                                vKeyColumnName, vBucketUri, pFolderName, vProcessedColumnList, pMinDate,
                                pMaxDate, pCredentialName, 'CSV', vFileBaseName,
                                pTemplateTableName, pMaxFileSize, 'PENDING')
                    WHEN MATCHED THEN
                        -- FAILED chunks are reset to PENDING so they are retried by this task
                        UPDATE SET TASK_NAME = vTaskName,
                                   STATUS = CASE WHEN t.STATUS = 'FAILED' THEN 'PENDING' ELSE t.STATUS END,
                                   ERROR_MESSAGE = CASE WHEN t.STATUS = 'FAILED' THEN NULL ELSE t.ERROR_MESSAGE END;
                END LOOP;
                COMMIT;

                -- Log chunk statistics
                -- NOTE(review): these counts are NOT filtered by TASK_NAME, so they may
                -- include chunks belonging to other sessions; also, FAILED chunks matched
                -- by the MERGE above were already flipped to PENDING, so the
                -- 'FAILED (retry)' figure counts only unmatched leftovers - verify intent.
                DECLARE
                    vPendingCount NUMBER;
                    vFailedCount  NUMBER;
                BEGIN
                    SELECT COUNT(*) INTO vPendingCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'PENDING';
                    SELECT COUNT(*) INTO vFailedCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'FAILED';
                    ENV_MANAGER.LOG_PROCESS_EVENT('Chunk statistics: PENDING=' || vPendingCount || ', FAILED (retry)=' || vFailedCount, 'INFO', vParameters);
                END;

                -- Create parallel task
                DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => vTaskName);

                -- Define chunks by number range (1 to partition count)
                -- NOTE(review): CREATE_CHUNKS_BY_NUMBER_COL scans ALL rows of the chunks
                -- table, not just this task's rows - a concurrent export could contribute
                -- chunk ids here; same single-session assumption as the MERGE above.
                DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_NUMBER_COL(
                    task_name    => vTaskName,
                    table_owner  => 'CT_MRDS',
                    table_name   => 'A_PARALLEL_EXPORT_CHUNKS',
                    table_column => 'CHUNK_ID',
                    chunk_size   => 1 -- Each partition is one chunk
                );

                -- Execute task in parallel; each worker picks up its chunk's parameters
                -- from A_PARALLEL_EXPORT_CHUNKS via EXPORT_PARTITION_PARALLEL
                ENV_MANAGER.LOG_PROCESS_EVENT('Executing parallel CSV export task: ' || vTaskName, 'DEBUG', vParameters);
                DBMS_PARALLEL_EXECUTE.RUN_TASK(
                    task_name      => vTaskName,
                    sql_stmt       => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id); END;',
                    language_flag  => DBMS_SQL.NATIVE,
                    parallel_level => pParallelDegree
                );

                -- Check for per-chunk errors reported by DBMS_PARALLEL_EXECUTE
                DECLARE
                    vErrorCount NUMBER;
                BEGIN
                    SELECT COUNT(*) INTO vErrorCount
                      FROM USER_PARALLEL_EXECUTE_CHUNKS
                     WHERE task_name = vTaskName
                       AND status = 'PROCESSED_WITH_ERROR';
                    IF vErrorCount > 0 THEN
                        vgMsgTmp := 'Parallel CSV export completed with ' || vErrorCount || ' errors. Check USER_PARALLEL_EXECUTE_CHUNKS for details.';
                        ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
                        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp);
                    END IF;
                END;

                -- Clean up task
                DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName);

                -- Clean up chunks for THIS specific task only (session-safe)
                -- CRITICAL: Use TASK_NAME filter to avoid deleting chunks from other active CSV sessions
                DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE TASK_NAME = vTaskName;
                COMMIT;
                ENV_MANAGER.LOG_PROCESS_EVENT('Parallel CSV execution completed successfully', 'INFO', vParameters);
            EXCEPTION
                WHEN OTHERS THEN
                    -- Attempt to drop task on error (best-effort; task may not exist yet)
                    BEGIN
                        DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName);
                    EXCEPTION
                        WHEN OTHERS THEN NULL; -- Ignore drop errors
                    END;
                    vgMsgTmp := ENV_MANAGER.MSG_PARALLEL_EXECUTION_FAILED || ': ' || SQLERRM || cgBL || DBMS_UTILITY.FORMAT_ERROR_BACKTRACE;
                    ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
                    RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp);
            END;
        END IF;
    END IF;

    ENV_MANAGER.LOG_PROCESS_EVENT('Export completed successfully for ' || vPartitions.COUNT || ' files', 'INFO', vParameters);
    ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
EXCEPTION
    WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN
        vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName;
        ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp);
    WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN
        -- vCurrentCol is always NULL in this procedure (see declaration note),
        -- so only TableName.KeyColumnName is reported here
        vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in pColumnList' ELSE '' END;
        ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp);
    WHEN ENV_MANAGER.ERR_INVALID_PARALLEL_DEGREE THEN
        -- Already logged at raise site; re-raise with the message captured in vgMsgTmp
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_INVALID_PARALLEL_DEGREE, vgMsgTmp);
    WHEN ENV_MANAGER.ERR_PARALLEL_EXECUTION_FAILED THEN
        -- Already logged at raise site; re-raise with the message captured in vgMsgTmp
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp);
    WHEN OTHERS THEN
        -- Log complete error details including full stack trace and backtrace
        ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER');
        ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
END EXPORT_TABLE_DATA_TO_CSV_BY_DATE;
----------------------------------------------------------------------------------------------------
-- VERSION MANAGEMENT FUNCTIONS
----------------------------------------------------------------------------------------------------
/** Returns the package version constant. **/
FUNCTION GET_VERSION RETURN VARCHAR2 IS
BEGIN
    RETURN PACKAGE_VERSION;
END GET_VERSION;
----------------------------------------------------------------------------------------------------
/** Returns formatted build information (version, build date, author) via ENV_MANAGER. **/
FUNCTION GET_BUILD_INFO RETURN VARCHAR2 IS
BEGIN
    RETURN ENV_MANAGER.GET_PACKAGE_VERSION_INFO(
        pPackageName => 'DATA_EXPORTER',
        pVersion     => PACKAGE_VERSION,
        pBuildDate   => PACKAGE_BUILD_DATE,
        pAuthor      => PACKAGE_AUTHOR
    );
END GET_BUILD_INFO;
----------------------------------------------------------------------------------------------------
/** Returns the formatted version history of this package via ENV_MANAGER. **/
FUNCTION GET_VERSION_HISTORY RETURN VARCHAR2 IS
BEGIN
    RETURN ENV_MANAGER.FORMAT_VERSION_HISTORY(
        pPackageName    => 'DATA_EXPORTER',
        pVersionHistory => VERSION_HISTORY
    );
END GET_VERSION_HISTORY;
---------------------------------------------------------------------------------------------------- END; /