create or replace PACKAGE BODY CT_MRDS.DATA_EXPORTER AS ---------------------------------------------------------------------------------------------------- -- PRIVATE HELPER FUNCTIONS (USED BY MULTIPLE PROCEDURES) ---------------------------------------------------------------------------------------------------- /** * Sanitizes filename by replacing disallowed characters with underscores **/ FUNCTION sanitizeFilename(pFilename IN VARCHAR2) RETURN VARCHAR2 IS vFilename VARCHAR2(1000); BEGIN vFilename := REGEXP_REPLACE(pFilename, '[^a-zA-Z0-9._-]', '_'); RETURN vFilename; END sanitizeFilename; ---------------------------------------------------------------------------------------------------- /** * Deletes ALL files matching specific file pattern before retry export * Critical for preventing data duplication when DBMS_CLOUD.EXPORT_DATA fails mid-process * * Problem: Export fails after creating partial file(s), retry creates new _2, _3 suffixed files * Solution: Delete ALL files matching the base filename pattern before retry * * Pattern matching strategy: * - Parquet: folder/PARTITION_YEAR=2024/PARTITION_MONTH=11/*.parquet (folder-level safe - each chunk has own partition folder) * - CSV: folder/TABLENAME_202411*.csv (file-level pattern - multiple chunks share same folder!) * * CRITICAL for parallel processing: * - Parquet chunks are isolated by partition folder structure (safe to delete folder/*) * - CSV chunks share flat folder structure - MUST use file-specific pattern (TABLENAME_YYYYMM*) * to avoid deleting files from other parallel chunks in same folder **/ PROCEDURE DELETE_FAILED_EXPORT_FILE( pFileUri IN VARCHAR2, pCredentialName IN VARCHAR2, pParameters IN VARCHAR2 ) IS vBucketUri VARCHAR2(4000); vFolderPath VARCHAR2(4000); vFileName VARCHAR2(1000); vFileNamePattern VARCHAR2(1000); vSlashPos NUMBER; vDotPos NUMBER; vFilesDeleted NUMBER := 0; BEGIN -- Extract components from URI -- Example Parquet: https://.../bucket/folder/PARTITION_YEAR=2024/PARTITION_MONTH=11/202411.parquet -- Example CSV: https://.../bucket/folder/TABLENAME_202411.csv -- Find last slash before filename vSlashPos := INSTR(pFileUri, '/', -1); IF vSlashPos > 0 THEN -- Extract filename from URI (after last slash) vFileName := SUBSTR(pFileUri, vSlashPos + 1); -- Extract folder path (before last slash) vFolderPath := SUBSTR(pFileUri, 1, vSlashPos - 1); -- Find bucket URI (protocol + namespace + bucket name) -- Bucket URI ends after /o/ in OCI Object Storage URLs vBucketUri := SUBSTR(pFileUri, 1, INSTR(pFileUri, '/o/') + 2); -- Extract relative folder path (after bucket) vFolderPath := SUBSTR(vFolderPath, LENGTH(vBucketUri) + 1); -- Create file pattern by removing extension -- Oracle adds suffixes BEFORE extension: file.csv -> file_1_timestamp.csv -- Pattern: file* matches file_1_timestamp.csv, file_2_timestamp.csv vDotPos := INSTR(vFileName, '.', -1); IF vDotPos > 0 THEN vFileNamePattern := SUBSTR(vFileName, 1, vDotPos - 1) || '%'; ELSE vFileNamePattern := vFileName || '%'; END IF; ENV_MANAGER.LOG_PROCESS_EVENT('Cleanup before retry - Pattern: ' || vFolderPath || '/' || vFileNamePattern, 'DEBUG', pParameters); -- List and delete ALL files matching pattern -- CRITICAL: Uses file-specific pattern for CSV chunk isolation in shared folder FOR rec IN ( SELECT object_name FROM TABLE(DBMS_CLOUD.LIST_OBJECTS( credential_name => pCredentialName, location_uri => vBucketUri )) WHERE object_name LIKE vFolderPath || '/' || vFileNamePattern ) LOOP BEGIN DBMS_CLOUD.DELETE_OBJECT( credential_name => pCredentialName, object_uri => vBucketUri || rec.object_name ); vFilesDeleted := vFilesDeleted + 1; ENV_MANAGER.LOG_PROCESS_EVENT('Deleted partial file ' || vFilesDeleted || ': ' || rec.object_name, 'DEBUG', pParameters); EXCEPTION WHEN OTHERS THEN -- Log but continue - don't fail entire cleanup ENV_MANAGER.LOG_PROCESS_EVENT('Warning: Could not delete ' || rec.object_name || ': ' || SQLERRM, 'WARNING', pParameters); END; END LOOP; IF vFilesDeleted > 0 THEN ENV_MANAGER.LOG_PROCESS_EVENT('Cleanup completed: Deleted ' || vFilesDeleted || ' partial file(s) from previous failed export', 'INFO', pParameters); ELSE ENV_MANAGER.LOG_PROCESS_EVENT('No existing files to clean up (pattern match: ' || vFileNamePattern || ')', 'DEBUG', pParameters); END IF; ELSE ENV_MANAGER.LOG_PROCESS_EVENT('Warning: Cannot parse file URI for cleanup: ' || pFileUri, 'WARNING', pParameters); END IF; EXCEPTION WHEN OTHERS THEN -- Don't fail export if cleanup fails - log and continue ENV_MANAGER.LOG_PROCESS_EVENT('Warning: Cleanup failed (will retry export anyway): ' || SQLERRM, 'WARNING', pParameters); END DELETE_FAILED_EXPORT_FILE; ---------------------------------------------------------------------------------------------------- /** * Builds query with TO_CHAR for date/timestamp columns using per-column formats * Retrieves format for each date column from FILE_MANAGER.GET_DATE_FORMAT **/ FUNCTION buildQueryWithDateFormats( pColumnList IN VARCHAR2, pTableName IN VARCHAR2, pSchemaName IN VARCHAR2, pKeyColumnName IN VARCHAR2, pTemplateTableName IN VARCHAR2 ) RETURN VARCHAR2 IS vResult VARCHAR2(32767); vColumns VARCHAR2(32767); vPos PLS_INTEGER; vNextPos PLS_INTEGER; vCurrentCol VARCHAR2(128); vAllCols VARCHAR2(32767); vDataType VARCHAR2(30); vDateFormat VARCHAR2(200); vTemplateSchema VARCHAR2(128); vTemplateTable VARCHAR2(128); vColExists NUMBER; BEGIN -- Build column list if not provided IF pColumnList IS NULL THEN -- Use template table for column order when provided -- Template defines which columns to export and in what order IF pTemplateTableName IS NOT NULL THEN -- Parse template table name (SCHEMA.TABLE or just TABLE) IF INSTR(pTemplateTableName, '.') > 0 THEN vTemplateSchema := SUBSTR(pTemplateTableName, 1, INSTR(pTemplateTableName, '.') - 1); vTemplateTable := SUBSTR(pTemplateTableName, INSTR(pTemplateTableName, '.') + 1); ELSE vTemplateSchema := pSchemaName; vTemplateTable := pTemplateTableName; END IF; -- Get columns from TEMPLATE table in template column order -- Template defines target CSV structure (column order and which columns to include) SELECT LISTAGG(column_name, ', ') WITHIN GROUP (ORDER BY column_id) INTO vAllCols FROM all_tab_columns WHERE table_name = vTemplateTable AND owner = vTemplateSchema; ELSE -- Get columns from source table when no template SELECT LISTAGG(column_name, ', ') WITHIN GROUP (ORDER BY column_id) INTO vAllCols FROM all_tab_columns WHERE table_name = pTableName AND owner = pSchemaName; END IF; ELSE vAllCols := pColumnList; END IF; -- Process each column vColumns := UPPER(REPLACE(vAllCols, ' ', '')); vPos := 1; vResult := ''; WHILE vPos <= LENGTH(vColumns) LOOP vNextPos := INSTR(vColumns, ',', vPos); IF vNextPos = 0 THEN vNextPos := LENGTH(vColumns) + 1; END IF; vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos); -- When using template table, check if column exists in SOURCE table -- Template defines target structure, source provides data -- Skip template columns that don't exist in source (except A_WORKFLOW_HISTORY_KEY) IF pTemplateTableName IS NOT NULL THEN -- Check if template column exists in SOURCE table SELECT COUNT(*) INTO vColExists FROM all_tab_columns WHERE table_name = pTableName AND column_name = vCurrentCol AND owner = pSchemaName; -- Skip columns that don't exist in source table -- Exception: A_WORKFLOW_HISTORY_KEY is virtual (mapped from pKeyColumnName) IF vColExists = 0 AND UPPER(vCurrentCol) != 'A_WORKFLOW_HISTORY_KEY' THEN vPos := vNextPos + 1; CONTINUE; END IF; END IF; -- Get column data type from appropriate table (template or source) IF pTemplateTableName IS NOT NULL THEN -- Get data type from template table SELECT data_type INTO vDataType FROM all_tab_columns WHERE table_name = vTemplateTable AND column_name = vCurrentCol AND owner = vTemplateSchema; ELSE -- Get data type from source table SELECT data_type INTO vDataType FROM all_tab_columns WHERE table_name = pTableName AND column_name = vCurrentCol AND owner = pSchemaName; END IF; -- Handle key column alias (template table has A_WORKFLOW_HISTORY_KEY, source table has pKeyColumnName) IF UPPER(vCurrentCol) = 'A_WORKFLOW_HISTORY_KEY' THEN vResult := vResult || CASE WHEN vResult IS NOT NULL THEN ', ' ELSE '' END || 'T.' || pKeyColumnName || ' AS A_WORKFLOW_HISTORY_KEY'; -- Convert DATE/TIMESTAMP columns to CHAR with specific format ELSIF vDataType IN ('DATE', 'TIMESTAMP', 'TIMESTAMP WITH TIME ZONE', 'TIMESTAMP WITH LOCAL TIME ZONE') THEN IF pTemplateTableName IS NOT NULL THEN vDateFormat := CT_MRDS.FILE_MANAGER.GET_DATE_FORMAT( pTemplateTableName => pTemplateTableName, pColumnName => vCurrentCol ); ELSE vDateFormat := ENV_MANAGER.gvDefaultDateFormat; END IF; vResult := vResult || CASE WHEN vResult IS NOT NULL THEN ', ' ELSE '' END || 'TO_CHAR(T.' || vCurrentCol || ', ''' || vDateFormat || ''') AS ' || vCurrentCol; -- Other columns as-is with T. prefix ELSE vResult := vResult || CASE WHEN vResult IS NOT NULL THEN ', ' ELSE '' END || 'T.' || vCurrentCol; END IF; vPos := vNextPos + 1; END LOOP; RETURN vResult; END buildQueryWithDateFormats; ---------------------------------------------------------------------------------------------------- -- Internal shared function to process column list with T. prefix and key column mapping FUNCTION processColumnList(pColumnList IN VARCHAR2, pTableName IN VARCHAR2, pSchemaName IN VARCHAR2, pKeyColumnName IN VARCHAR2) RETURN VARCHAR2 IS vResult VARCHAR2(32767); vColumns VARCHAR2(32767); vPos PLS_INTEGER; vNextPos PLS_INTEGER; vCurrentCol VARCHAR2(128); vAllCols VARCHAR2(32767); BEGIN IF pColumnList IS NULL THEN -- Build list of all columns SELECT LISTAGG(column_name, ', ') WITHIN GROUP (ORDER BY column_id) INTO vAllCols FROM all_tab_columns WHERE table_name = pTableName AND owner = pSchemaName; -- Add T. prefix to all columns vResult := 'T.' || REPLACE(vAllCols, ', ', ', T.'); -- Replace key column with aliased version (e.g., T.A_ETL_LOAD_SET_KEY_FK AS A_WORKFLOW_HISTORY_KEY) vResult := REPLACE(vResult, 'T.' || pKeyColumnName, 'T.' || pKeyColumnName || ' AS A_WORKFLOW_HISTORY_KEY'); RETURN vResult; END IF; -- Remove extra spaces and convert to uppercase vColumns := UPPER(REPLACE(pColumnList, ' ', '')); vPos := 1; vResult := ''; -- Parse comma-separated column list and add T. prefix WHILE vPos <= LENGTH(vColumns) LOOP vNextPos := INSTR(vColumns, ',', vPos); IF vNextPos = 0 THEN vNextPos := LENGTH(vColumns) + 1; END IF; vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos); -- Check if this is the key column (e.g., A_ETL_LOAD_SET_KEY_FK) and add alias IF UPPER(vCurrentCol) = UPPER(pKeyColumnName) THEN vCurrentCol := 'T.' || pKeyColumnName || ' AS A_WORKFLOW_HISTORY_KEY'; ELSE -- Add T. prefix if not already present IF INSTR(vCurrentCol, '.') = 0 THEN vCurrentCol := 'T.' || vCurrentCol; END IF; END IF; -- Add to result with comma separator IF vResult IS NOT NULL THEN vResult := vResult || ', '; END IF; vResult := vResult || vCurrentCol; vPos := vNextPos + 1; END LOOP; RETURN vResult; END processColumnList; ---------------------------------------------------------------------------------------------------- /** * Validates table existence, key column existence, and column list **/ PROCEDURE VALIDATE_TABLE_AND_COLUMNS ( pSchemaName IN VARCHAR2, pTableName IN VARCHAR2, pKeyColumnName IN VARCHAR2, pColumnList IN VARCHAR2, pParameters IN VARCHAR2 ) IS vCount INTEGER; vColumns VARCHAR2(32767); vPos PLS_INTEGER; vNextPos PLS_INTEGER; vCurrentCol VARCHAR2(128); BEGIN -- Check if table exists SELECT COUNT(*) INTO vCount FROM all_tables WHERE table_name = pTableName AND owner = pSchemaName; IF vCount = 0 THEN RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, ENV_MANAGER.MSG_TABLE_NOT_EXISTS); END IF; -- Check if key column exists SELECT COUNT(*) INTO vCount FROM all_tab_columns WHERE table_name = pTableName AND column_name = pKeyColumnName AND owner = pSchemaName; IF vCount = 0 THEN RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS); END IF; -- Validate pColumnList - check if all column names exist in the table IF pColumnList IS NOT NULL THEN vColumns := UPPER(REPLACE(pColumnList, ' ', '')); vPos := 1; WHILE vPos <= LENGTH(vColumns) LOOP vNextPos := INSTR(vColumns, ',', vPos); IF vNextPos = 0 THEN vNextPos := LENGTH(vColumns) + 1; END IF; vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos); -- Remove table alias prefix if present IF INSTR(vCurrentCol, '.') > 0 THEN vCurrentCol := SUBSTR(vCurrentCol, INSTR(vCurrentCol, '.') + 1); END IF; -- Check if column exists SELECT COUNT(*) INTO vCount FROM all_tab_columns WHERE table_name = pTableName AND column_name = vCurrentCol AND owner = pSchemaName; IF vCount = 0 THEN RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS); END IF; vPos := vNextPos + 1; END LOOP; END IF; END VALIDATE_TABLE_AND_COLUMNS; ---------------------------------------------------------------------------------------------------- /** * Retrieves list of year/month partitions based on date range **/ FUNCTION GET_PARTITIONS ( pSchemaName IN VARCHAR2, pTableName IN VARCHAR2, pKeyColumnName IN VARCHAR2, pMinDate IN DATE, pMaxDate IN DATE, pParameters IN VARCHAR2 ) RETURN partition_tab IS vSql VARCHAR2(32000); vPartitions partition_tab; vKeyValuesYear DBMS_SQL.VARCHAR2_TABLE; vKeyValuesMonth DBMS_SQL.VARCHAR2_TABLE; vFullTableName VARCHAR2(200); BEGIN -- Build fully qualified table name if not already qualified IF INSTR(pTableName, '.') > 0 THEN vFullTableName := pTableName; -- Already fully qualified ELSE vFullTableName := pSchemaName || '.' || pTableName; END IF; vSql := 'SELECT DISTINCT TO_CHAR(L.LOAD_START,''YYYY'') AS YR, TO_CHAR(L.LOAD_START,''MM'') AS MN FROM ' || vFullTableName || ' T, CT_ODS.A_LOAD_HISTORY L WHERE T.' || pKeyColumnName || ' = L.A_ETL_LOAD_SET_KEY AND L.LOAD_START >= :pMinDate AND L.LOAD_START < :pMaxDate ORDER BY YR, MN'; ENV_MANAGER.LOG_PROCESS_EVENT('Executing date range query: ' || vSql, 'DEBUG', pParameters); EXECUTE IMMEDIATE vSql BULK COLLECT INTO vKeyValuesYear, vKeyValuesMonth USING pMinDate, pMaxDate; ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vKeyValuesYear.COUNT || ' year/month combinations to export', 'DEBUG', pParameters); -- Convert to partition_tab vPartitions := partition_tab(); vPartitions.EXTEND(vKeyValuesYear.COUNT); FOR i IN 1 .. vKeyValuesYear.COUNT LOOP vPartitions(i).year := vKeyValuesYear(i); vPartitions(i).month := vKeyValuesMonth(i); END LOOP; RETURN vPartitions; END GET_PARTITIONS; ---------------------------------------------------------------------------------------------------- /** * Exports single partition (year/month) to specified format (PARQUET or CSV) * This is the core worker procedure that will be used for parallel processing in v2.3.0 **/ PROCEDURE EXPORT_SINGLE_PARTITION ( pSchemaName IN VARCHAR2, pTableName IN VARCHAR2, pKeyColumnName IN VARCHAR2, pYear IN VARCHAR2, pMonth IN VARCHAR2, pBucketUri IN VARCHAR2, pFolderName IN VARCHAR2, pProcessedColumns IN VARCHAR2, pMinDate IN DATE, pMaxDate IN DATE, pCredentialName IN VARCHAR2, pFormat IN VARCHAR2 DEFAULT 'PARQUET', pFileBaseName IN VARCHAR2 DEFAULT NULL, pMaxFileSize IN NUMBER DEFAULT 104857600, pParameters IN VARCHAR2 ) IS vQuery VARCHAR2(32767); vUri VARCHAR2(4000); vFileName VARCHAR2(1000); vFullTableName VARCHAR2(200); BEGIN -- Build fully qualified table name if not already qualified IF INSTR(pTableName, '.') > 0 THEN vFullTableName := pTableName; -- Already fully qualified ELSE vFullTableName := pSchemaName || '.' || pTableName; END IF; -- Construct the query to extract data for the current year/month vQuery := 'SELECT ' || pProcessedColumns || ' FROM ' || vFullTableName || ' T, CT_ODS.A_LOAD_HISTORY L WHERE T.' || pKeyColumnName || ' = L.A_ETL_LOAD_SET_KEY AND TO_CHAR(L.LOAD_START,''YYYY'') = ' || CHR(39) || pYear || CHR(39) || ' AND TO_CHAR(L.LOAD_START,''MM'') = ' || CHR(39) || pMonth || CHR(39) || ' AND L.LOAD_START >= TO_DATE(' || CHR(39) || TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'') AND L.LOAD_START < TO_DATE(' || CHR(39) || TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'')'; ENV_MANAGER.LOG_PROCESS_EVENT('Processing Year/Month: ' || pYear || '/' || pMonth || ' (Format: '||pFormat||')', 'DEBUG', pParameters); ENV_MANAGER.LOG_PROCESS_EVENT('Export query: ' || vQuery, 'DEBUG', pParameters); -- Construct the URI based on format IF pFormat = 'PARQUET' THEN -- Parquet: Use Hive-style partitioning -- Note: maxfilesize is NOT supported for Parquet format (Oracle limitation) vUri := pBucketUri || CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || 'PARTITION_YEAR=' || sanitizeFilename(pYear) || '/' || 'PARTITION_MONTH=' || sanitizeFilename(pMonth) || '/' || sanitizeFilename(pYear) || sanitizeFilename(pMonth) || '.parquet'; ENV_MANAGER.LOG_PROCESS_EVENT('Parquet export URI: ' || vUri, 'DEBUG', pParameters); -- Delete potentially corrupted file from previous failed attempt -- This prevents Oracle from creating _1 suffixed files on retry DELETE_FAILED_EXPORT_FILE(vUri, pCredentialName, pParameters); DBMS_CLOUD.EXPORT_DATA( credential_name => pCredentialName, file_uri_list => vUri, query => vQuery, format => json_object('type' VALUE 'parquet') ); ELSIF pFormat = 'CSV' THEN -- CSV: Flat file structure with year/month in filename vFileName := NVL(pFileBaseName, UPPER(pTableName)) || '_' || pYear || pMonth || '.csv'; vUri := pBucketUri || CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || sanitizeFilename(vFileName); ENV_MANAGER.LOG_PROCESS_EVENT('CSV export URI: ' || vUri, 'DEBUG', pParameters); ENV_MANAGER.LOG_PROCESS_EVENT('CSV maxfilesize: ' || pMaxFileSize || ' bytes (' || ROUND(pMaxFileSize/1048576, 2) || ' MB)', 'DEBUG', pParameters); -- Delete potentially corrupted file from previous failed attempt -- This prevents Oracle from creating _1 suffixed files on retry DELETE_FAILED_EXPORT_FILE(vUri, pCredentialName, pParameters); -- Use json_object() for CSV export with maxfilesize in bytes (Oracle requirement) -- Oracle maxfilesize: min 10MB (10485760), max 1GB (1073741824), default 10MB -- NOTE: maxfilesize must be NUMBER (bytes), not string like '1000M' -- Using 100MB (104857600) to avoid PGA memory issues with large files DBMS_CLOUD.EXPORT_DATA( credential_name => pCredentialName, file_uri_list => vUri, query => vQuery, format => json_object( 'type' VALUE 'CSV', 'header' VALUE true, 'quote' VALUE CHR(34), 'delimiter' VALUE ',', 'escape' VALUE true, 'recorddelimiter' VALUE CHR(13)||CHR(10), -- CRLF dla Windows 'maxfilesize' VALUE pMaxFileSize -- Dynamic maxfilesize in bytes (e.g., 104857600 = 100MB) ) ); ELSE RAISE_APPLICATION_ERROR(-20001, 'Unsupported format: ' || pFormat || '. Use PARQUET or CSV.'); END IF; ENV_MANAGER.LOG_PROCESS_EVENT('Export completed successfully for ' || pYear || '/' || pMonth, 'DEBUG', pParameters); END EXPORT_SINGLE_PARTITION; ---------------------------------------------------------------------------------------------------- /** * Callback procedure for DBMS_PARALLEL_EXECUTE * Processes single partition (year/month) chunk in parallel task * Called by DBMS_PARALLEL_EXECUTE framework for each chunk **/ PROCEDURE EXPORT_PARTITION_PARALLEL ( pStartId IN NUMBER, pEndId IN NUMBER, pTaskName IN VARCHAR2 DEFAULT NULL ) IS vYear VARCHAR2(4); vMonth VARCHAR2(2); vSchemaName VARCHAR2(128); vTableName VARCHAR2(128); vKeyColumnName VARCHAR2(128); vBucketUri VARCHAR2(4000); vFolderName VARCHAR2(1000); vProcessedColumns VARCHAR2(32767); vMinDate DATE; vMaxDate DATE; vCredentialName VARCHAR2(200); vFormat VARCHAR2(20); vFileBaseName VARCHAR2(1000); vMaxFileSize NUMBER; vJobClass VARCHAR2(128); vTaskName VARCHAR2(128); vParameters VARCHAR2(4000); BEGIN -- Retrieve chunk context from A_PARALLEL_EXPORT_CHUNKS table -- CRITICAL: Filter by CHUNK_ID and TASK_NAME for precise session isolation -- pTaskName parameter passed from RUN_TASK ensures deterministic single-row retrieval SELECT YEAR_VALUE, MONTH_VALUE, SCHEMA_NAME, TABLE_NAME, KEY_COLUMN_NAME, BUCKET_URI, FOLDER_NAME, PROCESSED_COLUMNS, MIN_DATE, MAX_DATE, CREDENTIAL_NAME, FORMAT_TYPE, FILE_BASE_NAME, MAX_FILE_SIZE, JOB_CLASS, TASK_NAME INTO vYear, vMonth, vSchemaName, vTableName, vKeyColumnName, vBucketUri, vFolderName, vProcessedColumns, vMinDate, vMaxDate, vCredentialName, vFormat, vFileBaseName, vMaxFileSize, vJobClass, vTaskName FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE CHUNK_ID = pStartId AND TASK_NAME = pTaskName; vParameters := 'Parallel task - Year: ' || vYear || ', Month: ' || vMonth || ', ChunkID: ' || pStartId || ', TaskName: ' || vTaskName; ENV_MANAGER.LOG_PROCESS_EVENT('Starting parallel export for partition ' || vYear || '/' || vMonth, 'DEBUG', vParameters); -- Mark chunk as PROCESSING -- CRITICAL: Use both CHUNK_ID AND TASK_NAME for session isolation UPDATE CT_MRDS.A_PARALLEL_EXPORT_CHUNKS SET STATUS = 'PROCESSING', ERROR_MESSAGE = NULL WHERE CHUNK_ID = pStartId AND TASK_NAME = vTaskName; COMMIT; -- Call the worker procedure EXPORT_SINGLE_PARTITION( pSchemaName => vSchemaName, pTableName => vTableName, pKeyColumnName => vKeyColumnName, pYear => vYear, pMonth => vMonth, pBucketUri => vBucketUri, pFolderName => vFolderName, pProcessedColumns => vProcessedColumns, pMinDate => vMinDate, pMaxDate => vMaxDate, pCredentialName => vCredentialName, pFormat => vFormat, pFileBaseName => vFileBaseName, pMaxFileSize => vMaxFileSize, pParameters => vParameters ); -- Mark chunk as COMPLETED -- CRITICAL: Use both CHUNK_ID AND TASK_NAME for session isolation UPDATE CT_MRDS.A_PARALLEL_EXPORT_CHUNKS SET STATUS = 'COMPLETED', EXPORT_TIMESTAMP = SYSTIMESTAMP, ERROR_MESSAGE = NULL WHERE CHUNK_ID = pStartId AND TASK_NAME = vTaskName; COMMIT; ENV_MANAGER.LOG_PROCESS_EVENT('Completed parallel export for partition ' || vYear || '/' || vMonth, 'DEBUG', vParameters); EXCEPTION WHEN OTHERS THEN -- Capture error details in variable (SQLERRM cannot be used directly in SQL) vgMsgTmp := 'Parallel task error for partition ' || vYear || '/' || vMonth || ' (ChunkID: ' || pStartId || ', TaskName: ' || vTaskName || '): ' || SQLERRM || cgBL || DBMS_UTILITY.FORMAT_ERROR_BACKTRACE; ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); -- Mark chunk as FAILED with error message -- CRITICAL: Use both CHUNK_ID AND TASK_NAME for session isolation -- Use vgMsgTmp variable instead of SQLERRM directly (Oracle limitation in SQL context) UPDATE CT_MRDS.A_PARALLEL_EXPORT_CHUNKS SET STATUS = 'FAILED', ERROR_MESSAGE = SUBSTR(vgMsgTmp, 1, 4000) WHERE CHUNK_ID = pStartId AND TASK_NAME = vTaskName; COMMIT; RAISE; END EXPORT_PARTITION_PARALLEL; ---------------------------------------------------------------------------------------------------- -- MAIN EXPORT PROCEDURES ---------------------------------------------------------------------------------------------------- PROCEDURE EXPORT_TABLE_DATA ( pSchemaName IN VARCHAR2, pTableName IN VARCHAR2, pKeyColumnName IN VARCHAR2, pBucketArea IN VARCHAR2, pFolderName IN VARCHAR2, pFileName IN VARCHAR2 default NULL, pTemplateTableName IN VARCHAR2 default NULL, pMaxFileSize IN NUMBER default 104857600, pRegisterExport IN BOOLEAN default FALSE, pProcessName IN VARCHAR2 default 'DATA_EXPORTER', pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName ) IS vCount INTEGER; vQuery VARCHAR2(32767); vUri VARCHAR2(4000); vTableName VARCHAR2(128); vSchemaName VARCHAR2(128); vKeyColumnName VARCHAR2(128); vParameters VARCHAR2(4000); vBucketUri VARCHAR2(4000); vProcessedColumnList VARCHAR2(32767); vCurrentCol VARCHAR2(128); -- Variables for file registration (when pRegisterExport=TRUE) vConfigKey NUMBER; vSourceKey VARCHAR2(100); vTableId VARCHAR2(100); vSlashPos1 NUMBER; vSlashPos2 NUMBER; vSourceFileReceivedKey NUMBER; BEGIN vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||'''' ,'pTableName => '''||nvl(pTableName, 'NULL')||'''' ,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||'''' ,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||'''' ,'pFolderName => '''||nvl(pFolderName, 'NULL')||'''' ,'pFileName => '''||nvl(pFileName, 'NULL')||'''' ,'pTemplateTableName => '''||nvl(pTemplateTableName, 'NULL')||'''' ,'pMaxFileSize => '''||nvl(TO_CHAR(pMaxFileSize), 'NULL')||'''' ,'pRegisterExport => '''||CASE WHEN pRegisterExport THEN 'TRUE' ELSE 'FALSE' END||'''' ,'pProcessName => '''||nvl(pProcessName, 'NULL')||'''' ,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||'''' )); ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters); -- Get bucket URI based on bucket area using FILE_MANAGER function vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea); -- Convert table and column names to uppercase to match data dictionary vTableName := UPPER(pTableName); vSchemaName := UPPER(pSchemaName); vKeyColumnName := UPPER(pKeyColumnName); -- Check if table exists SELECT COUNT(*) INTO vCount FROM all_tables WHERE table_name = vTableName AND owner = vSchemaName; IF vCount = 0 THEN RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, ENV_MANAGER.MSG_TABLE_NOT_EXISTS); END IF; -- Check if key column exists SELECT COUNT(*) INTO vCount FROM all_tab_columns WHERE table_name = vTableName AND column_name = vKeyColumnName AND owner = vSchemaName; IF vCount = 0 THEN RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS); END IF; -- Validate template table if provided IF pTemplateTableName IS NOT NULL THEN DECLARE vTemplateSchema VARCHAR2(128); vTemplateTable VARCHAR2(128); vTemplateCount NUMBER; BEGIN -- Parse template table name (SCHEMA.TABLE or just TABLE) IF INSTR(pTemplateTableName, '.') > 0 THEN vTemplateSchema := UPPER(SUBSTR(pTemplateTableName, 1, INSTR(pTemplateTableName, '.') - 1)); vTemplateTable := UPPER(SUBSTR(pTemplateTableName, INSTR(pTemplateTableName, '.') + 1)); ELSE vTemplateSchema := vSchemaName; vTemplateTable := UPPER(pTemplateTableName); END IF; -- Check if template table exists SELECT COUNT(*) INTO vTemplateCount FROM all_tables WHERE table_name = vTemplateTable AND owner = vTemplateSchema; IF vTemplateCount = 0 THEN vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS || ': Template table ' || vTemplateSchema || '.' || vTemplateTable; ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp); END IF; ENV_MANAGER.LOG_PROCESS_EVENT('Template table validated: ' || vTemplateSchema || '.' || vTemplateTable, 'DEBUG', vParameters); END; END IF; -- Build query with TO_CHAR for date columns (per-column format support) vProcessedColumnList := buildQueryWithDateFormats(NULL, vTableName, vSchemaName, vKeyColumnName, pTemplateTableName); ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list with TO_CHAR for date columns: ' || vProcessedColumnList, 'DEBUG', vParameters); ENV_MANAGER.LOG_PROCESS_EVENT('Template table: ' || NVL(pTemplateTableName, 'NULL - using global default for all dates'), 'INFO', vParameters); vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName); -- Lookup A_SOURCE_FILE_CONFIG_KEY based on pFolderName parsing if pRegisterExport is enabled IF pRegisterExport THEN -- Format: {BUCKET_AREA}/{SOURCE_KEY}/{TABLE_ID} -- Example: 'ODS/CSDB/CSDB_DEBT_DAILY' -> SOURCE_KEY='CSDB', TABLE_ID='CSDB_DEBT_DAILY' -- Parse pFolderName to extract SOURCE_KEY and TABLE_ID vSlashPos1 := INSTR(pFolderName, '/', 1, 1); -- First '/' position vSlashPos2 := INSTR(pFolderName, '/', 1, 2); -- Second '/' position IF vSlashPos1 > 0 AND vSlashPos2 > 0 THEN -- Extract segment 2 (SOURCE_KEY) and segment 3 (TABLE_ID) vSourceKey := SUBSTR(pFolderName, vSlashPos1 + 1, vSlashPos2 - vSlashPos1 - 1); vTableId := SUBSTR(pFolderName, vSlashPos2 + 1); -- Find configuration based on SOURCE_KEY and TABLE_ID BEGIN SELECT A_SOURCE_FILE_CONFIG_KEY INTO vConfigKey FROM CT_MRDS.A_SOURCE_FILE_CONFIG WHERE A_SOURCE_KEY = vSourceKey AND TABLE_ID = vTableId AND SOURCE_FILE_TYPE = 'INPUT' AND ROWNUM = 1; ENV_MANAGER.LOG_PROCESS_EVENT('Found config key: ' || vConfigKey || ' for SOURCE=' || vSourceKey || ', TABLE=' || vTableId, 'DEBUG', vParameters); EXCEPTION WHEN NO_DATA_FOUND THEN vConfigKey := -1; ENV_MANAGER.LOG_PROCESS_EVENT('No config found for SOURCE=' || vSourceKey || ', TABLE=' || vTableId || ' - using default (-1)', 'INFO', vParameters); END; ELSE -- Cannot parse folder name - use default vConfigKey := -1; ENV_MANAGER.LOG_PROCESS_EVENT('Cannot parse pFolderName: ' || pFolderName || ' - using default (-1)', 'WARNING', vParameters); END IF; ENV_MANAGER.LOG_PROCESS_EVENT('File registration enabled with config key: ' || vConfigKey, 'INFO', vParameters); END IF; -- Construct single query for entire table (no join with A_LOAD_HISTORY - ensures single file output) vQuery := 'SELECT ' || vProcessedColumnList || ' FROM ' || vTableName || ' T'; -- Construct the URI for the file in OCI Object Storage vUri := vBucketUri || CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || NVL(pFileName, UPPER(vTableName) || '.csv'); ENV_MANAGER.LOG_PROCESS_EVENT('Exporting to single file: ' || vUri, 'INFO', vParameters); ENV_MANAGER.LOG_PROCESS_EVENT('Export query: ' || vQuery, 'DEBUG', vParameters); ENV_MANAGER.LOG_PROCESS_EVENT('Max file size: ' || pMaxFileSize || ' bytes (' || ROUND(pMaxFileSize/1048576, 2) || ' MB)', 'DEBUG', vParameters); -- Use DBMS_CLOUD package to export data to the URI -- Oracle maxfilesize: min 10MB (10485760), max 1GB (1073741824), default 100MB (104857600) DBMS_CLOUD.EXPORT_DATA( credential_name => pCredentialName, file_uri_list => vUri, query => vQuery, format => json_object( 'type' VALUE 'CSV', 'header' VALUE true, 'quote' VALUE CHR(34), 'delimiter' VALUE ',', 'escape' VALUE true, 'recorddelimiter' VALUE CHR(13)||CHR(10), -- CRLF dla Windows 'maxfilesize' VALUE pMaxFileSize -- Dynamic maxfilesize in bytes ) ); -- Register exported file to A_SOURCE_FILE_RECEIVED if requested IF pRegisterExport THEN DECLARE vActualFileName VARCHAR2(1000); -- Actual filename with Oracle suffix vSanitizedFileName VARCHAR2(1000); vFileName VARCHAR2(1000); vRetryCount NUMBER := 0; vMaxRetries NUMBER := 1; -- One retry after initial attempt vRetryDelay NUMBER := 2; -- 2 seconds delay vFilesFound NUMBER := 0; vTotalBytes NUMBER := 0; BEGIN -- Extract filename from URI (after last '/') vFileName := SUBSTR(vUri, INSTR(vUri, '/', -1) + 1); -- Sanitize filename first (PL/SQL function cannot be used directly in SQL) vSanitizedFileName := sanitizeFilename(vFileName); -- Remove .csv extension for LIKE pattern matching (Oracle adds suffixes BEFORE .csv) -- Example: tablename.csv becomes tablename_1_20260211T102621591769Z.csv vSanitizedFileName := REGEXP_REPLACE(vSanitizedFileName, '\.csv$', '', 1, 0, 'i'); -- Try to get ALL exported files with retry logic -- Oracle DBMS_CLOUD.EXPORT_DATA can create MULTIPLE files due to: -- 1. maxfilesize parameter (splits files larger than limit) -- 2. Automatic parallel processing (especially on large production instances) -- We must register ALL files, not just the first one <> LOOP BEGIN -- Register ALL files matching the pattern (cursor loop) FOR rec IN ( SELECT object_name, checksum, created, bytes FROM TABLE(DBMS_CLOUD.LIST_OBJECTS( credential_name => pCredentialName, location_uri => vBucketUri )) WHERE object_name LIKE CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || vSanitizedFileName || '%' ORDER BY created DESC, bytes DESC ) LOOP -- Extract filename only from full path (remove bucket folder prefix) vActualFileName := SUBSTR(rec.object_name, INSTR(rec.object_name, '/', -1) + 1); -- Create A_SOURCE_FILE_RECEIVED record for EACH exported file vSourceFileReceivedKey := CT_MRDS.A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL; INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED ( A_SOURCE_FILE_RECEIVED_KEY, A_SOURCE_FILE_CONFIG_KEY, SOURCE_FILE_NAME, CHECKSUM, CREATED, BYTES, RECEPTION_DATE, PROCESSING_STATUS, PARTITION_YEAR, PARTITION_MONTH, ARCH_PATH, PROCESS_NAME ) VALUES ( vSourceFileReceivedKey, NVL(vConfigKey, -1), -- Use config key if found, otherwise -1 vActualFileName, -- Use actual filename with Oracle suffix rec.checksum, rec.created, rec.bytes, SYSDATE, 'INGESTED', NULL, -- PARTITION_YEAR not used for single-file exports NULL, -- PARTITION_MONTH not used for single-file exports NULL, -- ARCH_PATH not used for single-file exports pProcessName -- Process name from parameter ); vFilesFound := vFilesFound + 1; vTotalBytes := vTotalBytes + rec.bytes; ENV_MANAGER.LOG_PROCESS_EVENT('Registered file ' || vFilesFound || ': FileReceivedKey=' || vSourceFileReceivedKey || ', File=' || vActualFileName || ', Size=' || rec.bytes || ' bytes', 'INFO', vParameters); END LOOP; -- Check if any files were found IF vFilesFound = 0 THEN RAISE NO_DATA_FOUND; END IF; -- Success - exit retry loop ENV_MANAGER.LOG_PROCESS_EVENT('Total registered: ' || vFilesFound || ' file(s), Total size: ' || vTotalBytes || ' bytes (' || ROUND(vTotalBytes/1048576, 2) || ' MB)', 'INFO', vParameters); EXIT metadata_retry_loop; EXCEPTION WHEN NO_DATA_FOUND THEN vRetryCount := vRetryCount + 1; IF vRetryCount <= vMaxRetries THEN -- Log retry attempt ENV_MANAGER.LOG_PROCESS_EVENT('File(s) not found in bucket (attempt ' || vRetryCount || '/' || (vMaxRetries + 1) || '), retrying after ' || vRetryDelay || ' seconds: ' || vFileName, 'DEBUG', vParameters); -- Wait before retry using DBMS_SESSION.SLEEP (alternative to DBMS_LOCK) DBMS_SESSION.SLEEP(vRetryDelay); ELSE -- Max retries exceeded - re-raise exception RAISE; END IF; END; END LOOP metadata_retry_loop; EXCEPTION WHEN NO_DATA_FOUND THEN -- File not found after retries - log warning and continue without metadata ENV_MANAGER.LOG_PROCESS_EVENT('WARNING: File not found in bucket after ' || (vMaxRetries + 1) || ' attempts: ' || vFileName, 'WARNING', vParameters); -- Sanitize filename for fallback INSERT (function cannot be used in SQL) vSanitizedFileName := sanitizeFilename(vFileName); -- Insert without metadata using theoretical filename vSourceFileReceivedKey := CT_MRDS.A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL; INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED ( A_SOURCE_FILE_RECEIVED_KEY, A_SOURCE_FILE_CONFIG_KEY, SOURCE_FILE_NAME, RECEPTION_DATE, PROCESSING_STATUS, PARTITION_YEAR, PARTITION_MONTH, ARCH_PATH, PROCESS_NAME ) VALUES ( vSourceFileReceivedKey, NVL(vConfigKey, -1), -- Use config key if found, otherwise -1 vSanitizedFileName, -- Use pre-calculated sanitized filename SYSDATE, 'INGESTED', NULL, -- PARTITION_YEAR not used for single-file exports NULL, -- PARTITION_MONTH not used for single-file exports NULL, -- ARCH_PATH not used for single-file exports pProcessName -- Process name from parameter ); ENV_MANAGER.LOG_PROCESS_EVENT('Registered file without metadata: FileReceivedKey=' || vSourceFileReceivedKey || ', File=' || vSanitizedFileName, 'INFO', vParameters); END; END IF; ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters); EXCEPTION WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName; ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp); WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in column list' ELSE '' END; ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp); WHEN OTHERS THEN -- Log complete error details including full stack trace and backtrace ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER'); ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE)); END EXPORT_TABLE_DATA; ---------------------------------------------------------------------------------------------------- PROCEDURE EXPORT_TABLE_DATA_BY_DATE ( pSchemaName IN VARCHAR2, pTableName IN VARCHAR2, pKeyColumnName IN VARCHAR2, pBucketArea IN VARCHAR2, pFolderName IN VARCHAR2, pColumnList IN VARCHAR2 default NULL, pMinDate IN DATE default DATE '1900-01-01', pMaxDate IN DATE default SYSDATE, pParallelDegree IN NUMBER default 1, pTemplateTableName IN VARCHAR2 default NULL, pJobClass IN VARCHAR2 default NULL, pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName ) IS vTableName VARCHAR2(128); vSchemaName VARCHAR2(128); vKeyColumnName VARCHAR2(128); vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE; vProcessedColumnList VARCHAR2(32767); vBucketUri VARCHAR2(4000); vCurrentCol VARCHAR2(128); vPartitions partition_tab; BEGIN vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||'''' ,'pTableName => '''||nvl(pTableName, 'NULL')||'''' ,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||'''' ,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||'''' ,'pFolderName => '''||nvl(pFolderName, 'NULL')||'''' ,'pColumnList => '''||nvl(pColumnList, 'NULL')||'''' ,'pMinDate => '''||nvl(TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||'''' ,'pMaxDate => '''||nvl(TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||'''' ,'pParallelDegree => '''||nvl(TO_CHAR(pParallelDegree), 'NULL')||'''' ,'pTemplateTableName => '''||nvl(pTemplateTableName, 'NULL')||'''' ,'pJobClass => '''||nvl(pJobClass, 'NULL')||'''' ,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||'''' )); ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters); -- Get bucket URI based on bucket area using FILE_MANAGER function vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea); -- Convert table and column names to uppercase to match data dictionary vTableName := UPPER(pTableName); vSchemaName := UPPER(pSchemaName); vKeyColumnName := UPPER(pKeyColumnName); -- Validate table, key column, and column list using shared procedure VALIDATE_TABLE_AND_COLUMNS(vSchemaName, vTableName, vKeyColumnName, pColumnList, vParameters); -- Build query with TO_CHAR for date columns (per-column format support) vProcessedColumnList := buildQueryWithDateFormats(pColumnList, vTableName, vSchemaName, vKeyColumnName, pTemplateTableName); ENV_MANAGER.LOG_PROCESS_EVENT('Input column list: ' || NVL(pColumnList, 'NULL (building dynamic list from table metadata)'), 'DEBUG', vParameters); ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list with TO_CHAR for date columns: ' || vProcessedColumnList, 'DEBUG', vParameters); ENV_MANAGER.LOG_PROCESS_EVENT('Template table: ' || NVL(pTemplateTableName, 'NULL - using global default for all dates'), 'INFO', vParameters); vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName); -- Validate parallel degree parameter IF pParallelDegree < 1 OR pParallelDegree > 16 THEN vgMsgTmp := ENV_MANAGER.MSG_INVALID_PARALLEL_DEGREE || ': ' || pParallelDegree || '. Valid range: 1-16'; ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_INVALID_PARALLEL_DEGREE, vgMsgTmp); END IF; -- Get partitions using shared function vPartitions := GET_PARTITIONS(vSchemaName, vTableName, vKeyColumnName, pMinDate, pMaxDate, vParameters); ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vPartitions.COUNT || ' partitions to export with parallel degree ' || pParallelDegree, 'INFO', vParameters); -- Sequential processing (parallel degree = 1) IF pParallelDegree = 1 THEN ENV_MANAGER.LOG_PROCESS_EVENT('Using sequential processing (pParallelDegree = 1)', 'DEBUG', vParameters); FOR i IN 1 .. vPartitions.COUNT LOOP EXPORT_SINGLE_PARTITION( pSchemaName => vSchemaName, pTableName => vTableName, pKeyColumnName => vKeyColumnName, pYear => vPartitions(i).year, pMonth => vPartitions(i).month, pBucketUri => vBucketUri, pFolderName => pFolderName, pProcessedColumns => vProcessedColumnList, pMinDate => pMinDate, pMaxDate => pMaxDate, pCredentialName => pCredentialName, pFormat => 'PARQUET', pFileBaseName => NULL, pMaxFileSize => 104857600, pParameters => vParameters ); END LOOP; -- Parallel processing (parallel degree > 1) ELSE -- Skip parallel processing if no partitions found IF vPartitions.COUNT = 0 THEN ENV_MANAGER.LOG_PROCESS_EVENT('No partitions to export - skipping parallel processing', 'INFO', vParameters); ELSE DECLARE vTaskName VARCHAR2(128) := 'DATA_EXPORT_TASK_' || TO_CHAR(SYSTIMESTAMP, 'YYYYMMDDHH24MISSFF'); vChunkId NUMBER; BEGIN ENV_MANAGER.LOG_PROCESS_EVENT('Using parallel processing with ' || pParallelDegree || ' threads', 'INFO', vParameters); -- Populate chunks table (insert new chunks, preserve FAILED chunks for retry) FOR i IN 1 .. vPartitions.COUNT LOOP MERGE INTO CT_MRDS.A_PARALLEL_EXPORT_CHUNKS t USING (SELECT i AS chunk_id, vTaskName AS task_name, vPartitions(i).year AS yr, vPartitions(i).month AS mn FROM DUAL) s ON (t.CHUNK_ID = s.chunk_id AND t.TASK_NAME = s.task_name) WHEN NOT MATCHED THEN INSERT (CHUNK_ID, TASK_NAME, YEAR_VALUE, MONTH_VALUE, SCHEMA_NAME, TABLE_NAME, KEY_COLUMN_NAME, BUCKET_URI, FOLDER_NAME, PROCESSED_COLUMNS, MIN_DATE, MAX_DATE, CREDENTIAL_NAME, FORMAT_TYPE, FILE_BASE_NAME, TEMPLATE_TABLE_NAME, MAX_FILE_SIZE, JOB_CLASS, STATUS) VALUES (i, vTaskName, vPartitions(i).year, vPartitions(i).month, vSchemaName, vTableName, vKeyColumnName, vBucketUri, pFolderName, vProcessedColumnList, pMinDate, pMaxDate, pCredentialName, 'PARQUET', NULL, pTemplateTableName, 104857600, pJobClass, 'PENDING') WHEN MATCHED THEN -- Match found: chunk exists for SAME task (composite PK: TASK_NAME, CHUNK_ID) -- This handles retry scenario: reset FAILED chunks to PENDING for re-processing UPDATE SET STATUS = CASE WHEN t.STATUS = 'FAILED' THEN 'PENDING' ELSE t.STATUS END, ERROR_MESSAGE = CASE WHEN t.STATUS = 'FAILED' THEN NULL ELSE t.ERROR_MESSAGE END; END LOOP; COMMIT; -- Log chunk statistics (session-safe: only count chunks for THIS task) DECLARE vPendingCount NUMBER; vFailedCount NUMBER; BEGIN SELECT COUNT(*) INTO vPendingCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'PENDING' AND TASK_NAME = vTaskName; SELECT COUNT(*) INTO vFailedCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'FAILED' AND TASK_NAME = vTaskName; ENV_MANAGER.LOG_PROCESS_EVENT('Chunk statistics for task ' || vTaskName || ': PENDING=' || vPendingCount || ', FAILED (retry)=' || vFailedCount, 'INFO', vParameters); END; -- Create parallel task DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => vTaskName); -- Define chunks using SQL query to ensure TASK_NAME isolation -- CRITICAL: Filter by TASK_NAME to avoid selecting chunks from other concurrent sessions -- CRITICAL: Use START_ID and END_ID aliases to avoid ORA-00960 ambiguous column naming DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_SQL( task_name => vTaskName, sql_stmt => 'SELECT CHUNK_ID AS START_ID, CHUNK_ID AS END_ID FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE TASK_NAME = ''' || vTaskName || ''' ORDER BY CHUNK_ID', by_rowid => FALSE ); -- Execute task in parallel ENV_MANAGER.LOG_PROCESS_EVENT('Executing parallel task: ' || vTaskName || CASE WHEN pJobClass IS NOT NULL THEN ' with job class: ' || pJobClass ELSE '' END, 'DEBUG', vParameters); IF pJobClass IS NOT NULL THEN DBMS_PARALLEL_EXECUTE.RUN_TASK( task_name => vTaskName, sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id, ''' || vTaskName || '''); END;', language_flag => DBMS_SQL.NATIVE, parallel_level => pParallelDegree, job_class => pJobClass ); ELSE DBMS_PARALLEL_EXECUTE.RUN_TASK( task_name => vTaskName, sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id, ''' || vTaskName || '''); END;', language_flag => DBMS_SQL.NATIVE, parallel_level => pParallelDegree ); END IF; -- Check for errors DECLARE vErrorCount NUMBER; BEGIN SELECT COUNT(*) INTO vErrorCount FROM USER_PARALLEL_EXECUTE_CHUNKS WHERE task_name = vTaskName AND status = 'PROCESSED_WITH_ERROR'; IF vErrorCount > 0 THEN vgMsgTmp := 'Parallel execution completed with ' || vErrorCount || ' errors. Check USER_PARALLEL_EXECUTE_CHUNKS for details.'; ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp); END IF; END; -- Clean up task DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName); -- Clean up chunks for THIS specific task only (session-safe) -- CRITICAL: Use TASK_NAME filter to avoid deleting chunks from other active sessions DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE TASK_NAME = vTaskName; COMMIT; ENV_MANAGER.LOG_PROCESS_EVENT('Parallel execution completed successfully', 'INFO', vParameters); EXCEPTION WHEN OTHERS THEN -- Attempt to drop task on error BEGIN DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName); EXCEPTION WHEN OTHERS THEN NULL; -- Ignore drop errors END; vgMsgTmp := ENV_MANAGER.MSG_PARALLEL_EXECUTION_FAILED || ': ' || SQLERRM || cgBL || DBMS_UTILITY.FORMAT_ERROR_BACKTRACE; ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp); END; END IF; END IF; ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters); EXCEPTION WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName; ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp); WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in pColumnList' ELSE '' END; ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp); WHEN ENV_MANAGER.ERR_INVALID_PARALLEL_DEGREE THEN RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_INVALID_PARALLEL_DEGREE, vgMsgTmp); WHEN ENV_MANAGER.ERR_PARALLEL_EXECUTION_FAILED THEN RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp); WHEN OTHERS THEN -- Log complete error details including full stack trace and backtrace ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER'); ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE)); END EXPORT_TABLE_DATA_BY_DATE; ---------------------------------------------------------------------------------------------------- /** * @name EXPORT_TABLE_DATA_TO_CSV_BY_DATE * @desc Exports data to a single CSV file with date filtering. * Unlike EXPORT_TABLE_DATA_BY_DATE, this procedure creates one CSV file * instead of multiple Parquet files partitioned by year/month. * Uses the same date filtering mechanism with CT_ODS.A_LOAD_HISTORY. * Allows specifying custom column list or uses T.* if pColumnList is NULL. * Validates that all columns in pColumnList exist in the target table. * Automatically adds 'T.' prefix to column names in pColumnList. * @example * begin * DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE( * pSchemaName => 'CT_MRDS', * pTableName => 'MY_TABLE', * pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK', * pBucketArea => 'DATA', * pFolderName => 'exports', * pFileName => 'my_export.csv', * pColumnList => 'COLUMN1, COLUMN2, COLUMN3', -- Optional * pMinDate => DATE '2024-01-01', * pMaxDate => SYSDATE * ); * end; **/ PROCEDURE EXPORT_TABLE_DATA_TO_CSV_BY_DATE ( pSchemaName IN VARCHAR2, pTableName IN VARCHAR2, pKeyColumnName IN VARCHAR2, pBucketArea IN VARCHAR2, pFolderName IN VARCHAR2, pFileName IN VARCHAR2 DEFAULT NULL, pColumnList IN VARCHAR2 default NULL, pMinDate IN DATE default DATE '1900-01-01', pMaxDate IN DATE default SYSDATE, pParallelDegree IN NUMBER default 1, pTemplateTableName IN VARCHAR2 default NULL, pMaxFileSize IN NUMBER default 104857600, pRegisterExport IN BOOLEAN default FALSE, pProcessName IN VARCHAR2 default 'DATA_EXPORTER', pJobClass IN VARCHAR2 default NULL, pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName ) IS vTableName VARCHAR2(128); vSchemaName VARCHAR2(128); vKeyColumnName VARCHAR2(128); vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE; vFileBaseName VARCHAR2(4000); vFileExtension VARCHAR2(10); vProcessedColumnList VARCHAR2(32767); vBucketUri VARCHAR2(4000); vCurrentCol VARCHAR2(128); vPartitions partition_tab; vSourceFileReceivedKey NUMBER; vFileName VARCHAR2(1000); vFileUri VARCHAR2(4000); -- Variables for A_SOURCE_FILE_CONFIG lookup vSourceKey VARCHAR2(100); vTableId VARCHAR2(200); vConfigKey NUMBER := -1; vSlashPos1 NUMBER; vSlashPos2 NUMBER; BEGIN vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||'''' ,'pTableName => '''||nvl(pTableName, 'NULL')||'''' ,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||'''' ,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||'''' ,'pFolderName => '''||nvl(pFolderName, 'NULL')||'''' ,'pFileName => '''||nvl(pFileName, 'NULL')||'''' ,'pColumnList => '''||nvl(pColumnList, 'NULL')||'''' ,'pMinDate => '''||nvl(TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||'''' ,'pMaxDate => '''||nvl(TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||'''' ,'pParallelDegree => '''||nvl(TO_CHAR(pParallelDegree), 'NULL')||'''' ,'pTemplateTableName => '''||nvl(pTemplateTableName, 'NULL')||'''' ,'pMaxFileSize => '''||nvl(TO_CHAR(pMaxFileSize), 'NULL')||'''' ,'pRegisterExport => '''||CASE WHEN pRegisterExport THEN 'TRUE' ELSE 'FALSE' END||'''' ,'pJobClass => '''||nvl(pJobClass, 'NULL')||'''' ,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||'''' )); ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters); -- Get bucket URI based on bucket area using FILE_MANAGER function vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea); -- Convert table and column names to uppercase to match data dictionary vTableName := UPPER(pTableName); vSchemaName := UPPER(pSchemaName); vKeyColumnName := UPPER(pKeyColumnName); -- Extract base filename and extension or construct default filename IF pFileName IS NOT NULL THEN -- Use provided filename IF INSTR(pFileName, '.') > 0 THEN vFileBaseName := SUBSTR(pFileName, 1, INSTR(pFileName, '.', -1) - 1); vFileExtension := SUBSTR(pFileName, INSTR(pFileName, '.', -1)); ELSE vFileBaseName := pFileName; vFileExtension := '.csv'; END IF; ELSE -- Construct default filename: TABLENAME (without extension, will be added by worker) vFileBaseName := UPPER(pTableName); vFileExtension := '.csv'; END IF; -- Validate table, key column, and column list using shared procedure VALIDATE_TABLE_AND_COLUMNS(vSchemaName, vTableName, vKeyColumnName, pColumnList, vParameters); -- Build query with TO_CHAR for date columns (per-column format support) vProcessedColumnList := buildQueryWithDateFormats(pColumnList, vTableName, vSchemaName, vKeyColumnName, pTemplateTableName); ENV_MANAGER.LOG_PROCESS_EVENT('Input column list: ' || NVL(pColumnList, 'NULL (using dynamic column list)'), 'DEBUG', vParameters); ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list with TO_CHAR for date columns: ' || vProcessedColumnList, 'DEBUG', vParameters); ENV_MANAGER.LOG_PROCESS_EVENT('Template table: ' || NVL(pTemplateTableName, 'NULL - using global default for all dates'), 'INFO', vParameters); vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName); -- Validate parallel degree parameter IF pParallelDegree < 1 OR pParallelDegree > 16 THEN vgMsgTmp := ENV_MANAGER.MSG_INVALID_PARALLEL_DEGREE || ': ' || pParallelDegree || '. Valid range: 1-16'; ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_INVALID_PARALLEL_DEGREE, vgMsgTmp); END IF; -- Get partitions using shared function vPartitions := GET_PARTITIONS(vSchemaName, vTableName, vKeyColumnName, pMinDate, pMaxDate, vParameters); ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vPartitions.COUNT || ' year/month combinations to export', 'INFO', vParameters); ENV_MANAGER.LOG_PROCESS_EVENT('Date range: ' || TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS') || ' to ' || TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'DEBUG', vParameters); ENV_MANAGER.LOG_PROCESS_EVENT('Parallel degree: ' || pParallelDegree, 'INFO', vParameters); -- Sequential processing (parallel degree = 1) IF pParallelDegree = 1 THEN ENV_MANAGER.LOG_PROCESS_EVENT('Using sequential processing (pParallelDegree = 1)', 'DEBUG', vParameters); FOR i IN 1 .. vPartitions.COUNT LOOP EXPORT_SINGLE_PARTITION( pSchemaName => vSchemaName, pTableName => vTableName, pKeyColumnName => vKeyColumnName, pYear => vPartitions(i).year, pMonth => vPartitions(i).month, pBucketUri => vBucketUri, pFolderName => pFolderName, pProcessedColumns => vProcessedColumnList, pMinDate => pMinDate, pMaxDate => pMaxDate, pCredentialName => pCredentialName, pFormat => 'CSV', pFileBaseName => vFileBaseName, pMaxFileSize => pMaxFileSize, pParameters => vParameters ); END LOOP; -- Parallel processing (parallel degree > 1) ELSE -- Skip parallel processing if no partitions found IF vPartitions.COUNT = 0 THEN ENV_MANAGER.LOG_PROCESS_EVENT('No partitions to export - skipping parallel CSV processing', 'INFO', vParameters); ELSE DECLARE vTaskName VARCHAR2(128) := 'DATA_CSV_EXPORT_TASK_' || TO_CHAR(SYSTIMESTAMP, 'YYYYMMDDHH24MISSFF'); vChunkId NUMBER; BEGIN ENV_MANAGER.LOG_PROCESS_EVENT('Using parallel processing with ' || pParallelDegree || ' threads', 'INFO', vParameters); -- Clean up old completed chunks (>24 hours) to prevent table bloat -- CRITICAL: Do NOT delete chunks from other active sessions (same-day tasks) -- This prevents race conditions when multiple CSV exports run simultaneously DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'COMPLETED' AND CREATED_DATE < SYSTIMESTAMP - INTERVAL '1' DAY; COMMIT; ENV_MANAGER.LOG_PROCESS_EVENT('Cleared old COMPLETED chunks (>24h). Active session chunks preserved.', 'DEBUG', vParameters); -- Populate chunks table (insert new chunks, preserve FAILED chunks for retry) FOR i IN 1 .. vPartitions.COUNT LOOP MERGE INTO CT_MRDS.A_PARALLEL_EXPORT_CHUNKS t USING (SELECT i AS chunk_id, vTaskName AS task_name, vPartitions(i).year AS yr, vPartitions(i).month AS mn FROM DUAL) s ON (t.CHUNK_ID = s.chunk_id AND t.TASK_NAME = s.task_name) WHEN NOT MATCHED THEN INSERT (CHUNK_ID, TASK_NAME, YEAR_VALUE, MONTH_VALUE, SCHEMA_NAME, TABLE_NAME, KEY_COLUMN_NAME, BUCKET_URI, FOLDER_NAME, PROCESSED_COLUMNS, MIN_DATE, MAX_DATE, CREDENTIAL_NAME, FORMAT_TYPE, FILE_BASE_NAME, TEMPLATE_TABLE_NAME, MAX_FILE_SIZE, JOB_CLASS, STATUS) VALUES (i, vTaskName, vPartitions(i).year, vPartitions(i).month, vSchemaName, vTableName, vKeyColumnName, vBucketUri, pFolderName, vProcessedColumnList, pMinDate, pMaxDate, pCredentialName, 'CSV', vFileBaseName, pTemplateTableName, pMaxFileSize, pJobClass, 'PENDING') WHEN MATCHED THEN -- Match found: chunk exists for SAME task (composite PK: TASK_NAME, CHUNK_ID) -- This handles retry scenario: reset FAILED chunks to PENDING for re-processing UPDATE SET STATUS = CASE WHEN t.STATUS = 'FAILED' THEN 'PENDING' ELSE t.STATUS END, ERROR_MESSAGE = CASE WHEN t.STATUS = 'FAILED' THEN NULL ELSE t.ERROR_MESSAGE END; END LOOP; COMMIT; -- Log chunk statistics (session-safe: only count chunks for THIS task) DECLARE vPendingCount NUMBER; vFailedCount NUMBER; BEGIN SELECT COUNT(*) INTO vPendingCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'PENDING' AND TASK_NAME = vTaskName; SELECT COUNT(*) INTO vFailedCount FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE STATUS = 'FAILED' AND TASK_NAME = vTaskName; ENV_MANAGER.LOG_PROCESS_EVENT('Chunk statistics for task ' || vTaskName || ': PENDING=' || vPendingCount || ', FAILED (retry)=' || vFailedCount, 'INFO', vParameters); END; -- Create parallel task DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => vTaskName); -- Define chunks using SQL query to ensure TASK_NAME isolation -- CRITICAL: Filter by TASK_NAME to avoid selecting chunks from other concurrent sessions -- CRITICAL: Use START_ID and END_ID aliases to avoid ORA-00960 ambiguous column naming DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_SQL( task_name => vTaskName, sql_stmt => 'SELECT CHUNK_ID AS START_ID, CHUNK_ID AS END_ID FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE TASK_NAME = ''' || vTaskName || ''' ORDER BY CHUNK_ID', by_rowid => FALSE ); -- Execute task in parallel ENV_MANAGER.LOG_PROCESS_EVENT('Executing parallel CSV export task: ' || vTaskName || CASE WHEN pJobClass IS NOT NULL THEN ' with job class: ' || pJobClass ELSE '' END, 'DEBUG', vParameters); IF pJobClass IS NOT NULL THEN DBMS_PARALLEL_EXECUTE.RUN_TASK( task_name => vTaskName, sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id, ''' || vTaskName || '''); END;', language_flag => DBMS_SQL.NATIVE, parallel_level => pParallelDegree, job_class => pJobClass ); ELSE DBMS_PARALLEL_EXECUTE.RUN_TASK( task_name => vTaskName, sql_stmt => 'BEGIN CT_MRDS.DATA_EXPORTER.EXPORT_PARTITION_PARALLEL(:start_id, :end_id, ''' || vTaskName || '''); END;', language_flag => DBMS_SQL.NATIVE, parallel_level => pParallelDegree ); END IF; -- Check for errors DECLARE vErrorCount NUMBER; BEGIN SELECT COUNT(*) INTO vErrorCount FROM USER_PARALLEL_EXECUTE_CHUNKS WHERE task_name = vTaskName AND status = 'PROCESSED_WITH_ERROR'; IF vErrorCount > 0 THEN vgMsgTmp := 'Parallel CSV export completed with ' || vErrorCount || ' errors. Check USER_PARALLEL_EXECUTE_CHUNKS for details.'; ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp); END IF; END; -- Clean up task DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName); -- Clean up chunks for THIS specific task only (session-safe) -- CRITICAL: Use TASK_NAME filter to avoid deleting chunks from other active CSV sessions DELETE FROM CT_MRDS.A_PARALLEL_EXPORT_CHUNKS WHERE TASK_NAME = vTaskName; COMMIT; ENV_MANAGER.LOG_PROCESS_EVENT('Parallel CSV execution completed successfully', 'INFO', vParameters); EXCEPTION WHEN OTHERS THEN -- Attempt to drop task on error BEGIN DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => vTaskName); EXCEPTION WHEN OTHERS THEN NULL; -- Ignore drop errors END; vgMsgTmp := ENV_MANAGER.MSG_PARALLEL_EXECUTION_FAILED || ': ' || SQLERRM || cgBL || DBMS_UTILITY.FORMAT_ERROR_BACKTRACE; ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp); END; END IF; END IF; -- Note: File registration handled by EXPORT_SINGLE_PARTITION when pRegisterExport=TRUE -- Each partition calls pRegisterExport logic independently during serial/parallel execution -- Register exported files to A_SOURCE_FILE_RECEIVED if requested (after successful export) IF pRegisterExport THEN -- Lookup A_SOURCE_FILE_CONFIG_KEY based on pFolderName parsing -- Format: {BUCKET_AREA}/{SOURCE_KEY}/{TABLE_ID} -- Example: 'ODS/CSDB/CSDB_DEBT_DAILY' -> SOURCE_KEY='CSDB', TABLE_ID='CSDB_DEBT_DAILY' -- Parse pFolderName to extract SOURCE_KEY and TABLE_ID vSlashPos1 := INSTR(pFolderName, '/', 1, 1); -- First '/' position vSlashPos2 := INSTR(pFolderName, '/', 1, 2); -- Second '/' position IF vSlashPos1 > 0 AND vSlashPos2 > 0 THEN -- Extract segment 2 (SOURCE_KEY) and segment 3 (TABLE_ID) vSourceKey := SUBSTR(pFolderName, vSlashPos1 + 1, vSlashPos2 - vSlashPos1 - 1); vTableId := SUBSTR(pFolderName, vSlashPos2 + 1); -- Find configuration based on SOURCE_KEY and TABLE_ID BEGIN SELECT A_SOURCE_FILE_CONFIG_KEY INTO vConfigKey FROM CT_MRDS.A_SOURCE_FILE_CONFIG WHERE A_SOURCE_KEY = vSourceKey AND TABLE_ID = vTableId AND SOURCE_FILE_TYPE = 'INPUT' AND ROWNUM = 1; ENV_MANAGER.LOG_PROCESS_EVENT('Found config key: ' || vConfigKey || ' for SOURCE=' || vSourceKey || ', TABLE=' || vTableId, 'DEBUG', vParameters); EXCEPTION WHEN NO_DATA_FOUND THEN vConfigKey := -1; ENV_MANAGER.LOG_PROCESS_EVENT('No config found for SOURCE=' || vSourceKey || ', TABLE=' || vTableId || ' - using default (-1)', 'INFO', vParameters); END; ELSE -- Cannot parse folder name - use default vConfigKey := -1; ENV_MANAGER.LOG_PROCESS_EVENT('Cannot parse pFolderName: ' || pFolderName || ' - using default (-1)', 'WARNING', vParameters); END IF; ENV_MANAGER.LOG_PROCESS_EVENT('Registering ' || vPartitions.COUNT || ' exported files to A_SOURCE_FILE_RECEIVED with config key: ' || vConfigKey, 'INFO', vParameters); FOR i IN 1 .. vPartitions.COUNT LOOP -- Construct filename and URI for this partition vFileName := NVL(vFileBaseName, UPPER(REPLACE(vTableName, vSchemaName || '.', ''))) || '_' || vPartitions(i).year || vPartitions(i).month || '.csv'; vFileUri := vBucketUri || CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || sanitizeFilename(vFileName); -- Get file metadata from OCI bucket (CHECKSUM, CREATED, BYTES) with retry logic DECLARE vChecksum VARCHAR2(128); vCreated TIMESTAMP WITH TIME ZONE; vBytes NUMBER; vActualFileName VARCHAR2(1000); -- Actual filename with Oracle suffix vSanitizedFileName VARCHAR2(1000); vRetryCount NUMBER := 0; vMaxRetries NUMBER := 1; -- One retry after initial attempt vRetryDelay NUMBER := 2; -- 2 seconds delay BEGIN -- Sanitize filename first (PL/SQL function cannot be used directly in SQL) vSanitizedFileName := sanitizeFilename(vFileName); -- Remove .csv extension for LIKE pattern matching (Oracle adds suffixes BEFORE .csv) -- Example: LEGACY_DEBT_202508.csv becomes LEGACY_DEBT_202508_1_20260211T102621591769Z.csv vSanitizedFileName := REGEXP_REPLACE(vSanitizedFileName, '\.csv$', '', 1, 0, 'i'); -- Try to get file metadata with retry logic <> LOOP BEGIN SELECT object_name, checksum, created, bytes INTO vActualFileName, vChecksum, vCreated, vBytes FROM TABLE(DBMS_CLOUD.LIST_OBJECTS( credential_name => pCredentialName, location_uri => vBucketUri )) WHERE object_name LIKE CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END || vSanitizedFileName || '%' ORDER BY created DESC, bytes DESC FETCH FIRST 1 ROW ONLY; -- Extract filename only from full path (remove bucket folder prefix) -- vActualFileName contains: 'ODS/CSDB/CSDB_DEBT/LEGACY_DEBT_202508_1_20260211T111341375171Z.csv' -- Extract only: 'LEGACY_DEBT_202508_1_20260211T111341375171Z.csv' vActualFileName := SUBSTR(vActualFileName, INSTR(vActualFileName, '/', -1) + 1); -- Success - exit retry loop EXIT metadata_retry_loop; EXCEPTION WHEN NO_DATA_FOUND THEN vRetryCount := vRetryCount + 1; IF vRetryCount <= vMaxRetries THEN -- Log retry attempt ENV_MANAGER.LOG_PROCESS_EVENT('File not found in bucket (attempt ' || vRetryCount || '/' || (vMaxRetries + 1) || '), retrying after ' || vRetryDelay || ' seconds: ' || vFileName, 'DEBUG', vParameters); -- Wait before retry using DBMS_SESSION.SLEEP (alternative to DBMS_LOCK) DBMS_SESSION.SLEEP(vRetryDelay); ELSE -- Max retries exceeded - re-raise exception RAISE; END IF; END; END LOOP metadata_retry_loop; -- Create A_SOURCE_FILE_RECEIVED record for this export with metadata vSourceFileReceivedKey := CT_MRDS.A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL; INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED ( A_SOURCE_FILE_RECEIVED_KEY, A_SOURCE_FILE_CONFIG_KEY, SOURCE_FILE_NAME, CHECKSUM, CREATED, BYTES, RECEPTION_DATE, PROCESSING_STATUS, PARTITION_YEAR, PARTITION_MONTH, ARCH_PATH, PROCESS_NAME ) VALUES ( vSourceFileReceivedKey, vConfigKey, -- Config key from A_SOURCE_FILE_CONFIG lookup vActualFileName, -- Use actual filename with Oracle suffix vChecksum, vCreated, vBytes, SYSDATE, 'INGESTED', NULL, -- PARTITION_YEAR not used for CSV exports NULL, -- PARTITION_MONTH not used for CSV exports NULL, -- ARCH_PATH not used for CSV exports pProcessName -- Process name from parameter ); ENV_MANAGER.LOG_PROCESS_EVENT('Registered file: FileReceivedKey=' || vSourceFileReceivedKey || ', File=' || vActualFileName || ', Size=' || vBytes || ' bytes', 'DEBUG', vParameters); EXCEPTION WHEN NO_DATA_FOUND THEN -- File not found after retries - log warning and continue without metadata ENV_MANAGER.LOG_PROCESS_EVENT('WARNING: File not found in bucket after ' || (vMaxRetries + 1) || ' attempts: ' || vFileName, 'WARNING', vParameters); -- Sanitize filename for fallback INSERT (function cannot be used in SQL) vSanitizedFileName := sanitizeFilename(vFileName); -- Insert without metadata vSourceFileReceivedKey := CT_MRDS.A_SOURCE_FILE_RECEIVED_KEY_SEQ.NEXTVAL; INSERT INTO CT_MRDS.A_SOURCE_FILE_RECEIVED ( A_SOURCE_FILE_RECEIVED_KEY, A_SOURCE_FILE_CONFIG_KEY, SOURCE_FILE_NAME, RECEPTION_DATE, PROCESSING_STATUS, PARTITION_YEAR, PARTITION_MONTH, ARCH_PATH, PROCESS_NAME ) VALUES ( vSourceFileReceivedKey, vConfigKey, -- Config key from A_SOURCE_FILE_CONFIG lookup vSanitizedFileName, -- Fallback: use theoretical filename if actual not found SYSDATE, 'INGESTED', NULL, -- PARTITION_YEAR not used for CSV exports NULL, -- PARTITION_MONTH not used for CSV exports NULL, -- ARCH_PATH not used for CSV exports pProcessName -- Process name from parameter ); END; END LOOP; COMMIT; ENV_MANAGER.LOG_PROCESS_EVENT('Successfully registered all ' || vPartitions.COUNT || ' files', 'INFO', vParameters); END IF; ENV_MANAGER.LOG_PROCESS_EVENT('Export completed successfully for ' || vPartitions.COUNT || ' files', 'INFO', vParameters); ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters); EXCEPTION WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName; ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp); WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in pColumnList' ELSE '' END; ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp); WHEN ENV_MANAGER.ERR_INVALID_PARALLEL_DEGREE THEN RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_INVALID_PARALLEL_DEGREE, vgMsgTmp); WHEN ENV_MANAGER.ERR_PARALLEL_EXECUTION_FAILED THEN RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_PARALLEL_EXECUTION_FAILED, vgMsgTmp); WHEN OTHERS THEN -- Log complete error details including full stack trace and backtrace ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER'); ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters); RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE)); END EXPORT_TABLE_DATA_TO_CSV_BY_DATE; ---------------------------------------------------------------------------------------------------- -- VERSION MANAGEMENT FUNCTIONS ---------------------------------------------------------------------------------------------------- FUNCTION GET_VERSION RETURN VARCHAR2 IS BEGIN RETURN PACKAGE_VERSION; END GET_VERSION; ---------------------------------------------------------------------------------------------------- FUNCTION GET_BUILD_INFO RETURN VARCHAR2 IS BEGIN RETURN ENV_MANAGER.GET_PACKAGE_VERSION_INFO( pPackageName => 'DATA_EXPORTER', pVersion => PACKAGE_VERSION, pBuildDate => PACKAGE_BUILD_DATE, pAuthor => PACKAGE_AUTHOR ); END GET_BUILD_INFO; ---------------------------------------------------------------------------------------------------- FUNCTION GET_VERSION_HISTORY RETURN VARCHAR2 IS BEGIN RETURN ENV_MANAGER.FORMAT_VERSION_HISTORY( pPackageName => 'DATA_EXPORTER', pVersionHistory => VERSION_HISTORY ); END GET_VERSION_HISTORY; ---------------------------------------------------------------------------------------------------- END; /