create or replace PACKAGE BODY CT_MRDS.DATA_EXPORTER AS
----------------------------------------------------------------------------------------------------
-- PRIVATE HELPER FUNCTIONS (USED BY MULTIPLE PROCEDURES)
----------------------------------------------------------------------------------------------------
/**
 * Sanitizes filename by replacing disallowed characters with underscores.
 * Only letters, digits, dot, underscore and hyphen are kept; everything else becomes '_'.
 *
 * @param pFilename  raw filename candidate
 * @return           sanitized filename safe for use in an object-storage URI
 **/
FUNCTION sanitizeFilename(pFilename IN VARCHAR2) RETURN VARCHAR2
IS
    vFilename VARCHAR2(1000);
BEGIN
    vFilename := REGEXP_REPLACE(pFilename, '[^a-zA-Z0-9._-]', '_');
    RETURN vFilename;
END sanitizeFilename;
----------------------------------------------------------------------------------------------------
-- Internal shared function to process column list with T. prefix and key column mapping.
-- When pColumnList is NULL the full column list is built from ALL_TAB_COLUMNS; otherwise the
-- caller-supplied comma-separated list is parsed. In both cases each column gets a 'T.' prefix
-- and the key column (or A_ETL_LOAD_SET_KEY) is aliased to A_WORKFLOW_HISTORY_KEY.
FUNCTION processColumnList(pColumnList IN VARCHAR2, pTableName IN VARCHAR2, pSchemaName IN VARCHAR2, pKeyColumnName IN VARCHAR2) RETURN VARCHAR2
IS
    vResult     VARCHAR2(32767);  -- accumulated output column list
    vColumns    VARCHAR2(32767);  -- normalized (uppercased, space-stripped) input list
    vPos        PLS_INTEGER;      -- parse cursor into vColumns
    vNextPos    PLS_INTEGER;      -- position of next comma (or end of string + 1)
    vCurrentCol VARCHAR2(128);    -- column token currently being processed
    vAllCols    VARCHAR2(32767);  -- dynamic full column list when pColumnList is NULL
BEGIN
    IF pColumnList IS NULL THEN
        -- Build list of all columns from the data dictionary, in column_id order
        SELECT LISTAGG(column_name, ', ') WITHIN GROUP (ORDER BY column_id)
          INTO vAllCols
          FROM all_tab_columns
         WHERE table_name = pTableName AND owner = pSchemaName;
        -- Add T. prefix to all columns
        vResult := 'T.' || REPLACE(vAllCols, ', ', ', T.');
        -- Replace key column with aliased version (e.g., T.A_ETL_LOAD_SET_KEY_FK AS A_WORKFLOW_HISTORY_KEY)
        -- NOTE(review): REPLACE matches substrings, so if pKeyColumnName is a prefix of another
        -- column name (e.g. key A_ETL_LOAD_SET_KEY with sibling A_ETL_LOAD_SET_KEY_FK) the longer
        -- column would also be rewritten — confirm the key column name is never such a prefix.
        vResult := REPLACE(vResult, 'T.' || pKeyColumnName, 'T.' || pKeyColumnName || ' AS A_WORKFLOW_HISTORY_KEY');
        RETURN vResult;
    END IF;
    -- Remove extra spaces and convert to uppercase
    vColumns := UPPER(REPLACE(pColumnList, ' ', ''));
    vPos := 1;
    vResult := '';
    -- Parse comma-separated column list and add T. prefix
    WHILE vPos <= LENGTH(vColumns) LOOP
        vNextPos := INSTR(vColumns, ',', vPos);
        IF vNextPos = 0 THEN
            vNextPos := LENGTH(vColumns) + 1;
        END IF;
        vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos);
        -- Check if this is the key column (e.g., A_ETL_LOAD_SET_KEY_FK) and add alias
        IF UPPER(vCurrentCol) = UPPER(pKeyColumnName) THEN
            vCurrentCol := 'T.' || pKeyColumnName || ' AS A_WORKFLOW_HISTORY_KEY';
        ELSIF UPPER(vCurrentCol) = 'A_ETL_LOAD_SET_KEY' THEN
            vCurrentCol := 'T.A_ETL_LOAD_SET_KEY AS A_WORKFLOW_HISTORY_KEY';
        ELSE
            -- Add T. prefix if not already present (caller may have supplied an alias.column)
            IF INSTR(vCurrentCol, '.') = 0 THEN
                vCurrentCol := 'T.' || vCurrentCol;
            END IF;
        END IF;
        -- Add to result with comma separator
        IF vResult IS NOT NULL THEN
            vResult := vResult || ', ';
        END IF;
        vResult := vResult || vCurrentCol;
        vPos := vNextPos + 1;
    END LOOP;
    RETURN vResult;
END processColumnList;
----------------------------------------------------------------------------------------------------
/**
 * Validates table existence, key column existence, and column list.
 * Raises ENV_MANAGER.CODE_TABLE_NOT_EXISTS / CODE_COLUMN_NOT_EXISTS on failure.
 *
 * @param pSchemaName    owner to check in ALL_TABLES / ALL_TAB_COLUMNS (expected uppercase)
 * @param pTableName     table name (expected uppercase)
 * @param pKeyColumnName key column that must exist on the table
 * @param pColumnList    optional comma-separated column list; each entry is verified to exist
 * @param pParameters    formatted caller parameters (currently unused here; kept for logging parity)
 **/
PROCEDURE VALIDATE_TABLE_AND_COLUMNS (
    pSchemaName    IN VARCHAR2,
    pTableName     IN VARCHAR2,
    pKeyColumnName IN VARCHAR2,
    pColumnList    IN VARCHAR2,
    pParameters    IN VARCHAR2
)
IS
    vCount      INTEGER;
    vColumns    VARCHAR2(32767);
    vPos        PLS_INTEGER;
    vNextPos    PLS_INTEGER;
    vCurrentCol VARCHAR2(128);
BEGIN
    -- Check if table exists
    SELECT COUNT(*) INTO vCount FROM all_tables WHERE table_name = pTableName AND owner = pSchemaName;
    IF vCount = 0 THEN
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, ENV_MANAGER.MSG_TABLE_NOT_EXISTS);
    END IF;
    -- Check if key column exists
    SELECT COUNT(*) INTO vCount FROM all_tab_columns WHERE table_name = pTableName AND column_name = pKeyColumnName AND owner = pSchemaName;
    IF vCount = 0 THEN
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
    END IF;
    -- Validate pColumnList - check if all column names exist in the table
    IF pColumnList IS NOT NULL THEN
        vColumns := UPPER(REPLACE(pColumnList, ' ', ''));
        vPos := 1;
        WHILE vPos <= LENGTH(vColumns) LOOP
            vNextPos := INSTR(vColumns, ',', vPos);
            IF vNextPos = 0 THEN
                vNextPos := LENGTH(vColumns) + 1;
            END IF;
            vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos);
            -- Remove table alias prefix if present
            IF INSTR(vCurrentCol, '.') > 0 THEN
                vCurrentCol := SUBSTR(vCurrentCol, INSTR(vCurrentCol, '.') + 1);
            END IF;
            -- Check if column exists
            SELECT COUNT(*) INTO vCount FROM all_tab_columns WHERE table_name = pTableName AND column_name = vCurrentCol AND owner = pSchemaName;
            IF vCount = 0 THEN
                RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
            END IF;
            vPos := vNextPos + 1;
        END LOOP;
    END IF;
END VALIDATE_TABLE_AND_COLUMNS;
----------------------------------------------------------------------------------------------------
/**
 * Retrieves list of year/month partitions based on date range.
 * Joins the target table to CT_ODS.A_LOAD_HISTORY on the key column and returns the distinct
 * TO_CHAR(LOAD_START,'YYYY')/TO_CHAR(LOAD_START,'MM') pairs within [pMinDate, pMaxDate).
 *
 * NOTE(review): pTableName/pKeyColumnName are concatenated into dynamic SQL without DBMS_ASSERT
 * here; callers in this package sanitize via DBMS_ASSERT before invoking — confirm no other
 * callers pass unvalidated identifiers.
 **/
FUNCTION GET_PARTITIONS (
    pSchemaName    IN VARCHAR2,
    pTableName     IN VARCHAR2,
    pKeyColumnName IN VARCHAR2,
    pMinDate       IN DATE,
    pMaxDate       IN DATE,
    pParameters    IN VARCHAR2
) RETURN partition_tab
IS
    vSql            VARCHAR2(32000);
    vPartitions     partition_tab;
    vKeyValuesYear  DBMS_SQL.VARCHAR2_TABLE;
    vKeyValuesMonth DBMS_SQL.VARCHAR2_TABLE;
    vFullTableName  VARCHAR2(200);
BEGIN
    -- Build fully qualified table name if not already qualified
    IF INSTR(pTableName, '.') > 0 THEN
        vFullTableName := pTableName; -- Already fully qualified
    ELSE
        vFullTableName := pSchemaName || '.' || pTableName;
    END IF;
    -- Half-open date range [pMinDate, pMaxDate) bound as bind variables
    vSql := 'SELECT DISTINCT TO_CHAR(L.LOAD_START,''YYYY'') AS YR, TO_CHAR(L.LOAD_START,''MM'') AS MN FROM '
         || vFullTableName
         || ' T, CT_ODS.A_LOAD_HISTORY L WHERE T.'
         || pKeyColumnName
         || ' = L.A_ETL_LOAD_SET_KEY AND L.LOAD_START >= :pMinDate AND L.LOAD_START < :pMaxDate ORDER BY YR, MN';
    ENV_MANAGER.LOG_PROCESS_EVENT('Executing date range query: ' || vSql, 'DEBUG', pParameters);
    EXECUTE IMMEDIATE vSql BULK COLLECT INTO vKeyValuesYear, vKeyValuesMonth USING pMinDate, pMaxDate;
    ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vKeyValuesYear.COUNT || ' year/month combinations to export', 'DEBUG', pParameters);
    -- Convert the two parallel collections to partition_tab records
    vPartitions := partition_tab();
    vPartitions.EXTEND(vKeyValuesYear.COUNT);
    FOR i IN 1 .. vKeyValuesYear.COUNT LOOP
        vPartitions(i).year  := vKeyValuesYear(i);
        vPartitions(i).month := vKeyValuesMonth(i);
    END LOOP;
    RETURN vPartitions;
END GET_PARTITIONS;
----------------------------------------------------------------------------------------------------
/**
 * Exports single partition (year/month) to specified format (PARQUET or CSV).
 * This is the core worker procedure that will be used for parallel processing in v2.3.0.
 *
 * PARQUET: writes to Hive-style partition folders PARTITION_YEAR=/PARTITION_MONTH=.
 * CSV: writes a flat file named <base>_<YYYY><MM>.csv with a header row.
 * Raises -20001 for any other pFormat value.
 *
 * NOTE(review): the "Processing Year/Month" / "Export query" DEBUG events are logged AFTER the
 * DBMS_CLOUD.EXPORT_DATA call, so a failing export leaves no per-partition query trace — confirm
 * whether this ordering is intentional.
 **/
PROCEDURE EXPORT_SINGLE_PARTITION (
    pSchemaName       IN VARCHAR2,
    pTableName        IN VARCHAR2,
    pKeyColumnName    IN VARCHAR2,
    pYear             IN VARCHAR2,
    pMonth            IN VARCHAR2,
    pBucketUri        IN VARCHAR2,
    pFolderName       IN VARCHAR2,
    pProcessedColumns IN VARCHAR2,
    pMinDate          IN DATE,
    pMaxDate          IN DATE,
    pCredentialName   IN VARCHAR2,
    pFormat           IN VARCHAR2 DEFAULT 'PARQUET',
    pFileBaseName     IN VARCHAR2 DEFAULT NULL,
    pParameters       IN VARCHAR2
)
IS
    vQuery         VARCHAR2(32767);
    vUri           VARCHAR2(4000);
    vFileName      VARCHAR2(1000);
    vFullTableName VARCHAR2(200);
BEGIN
    -- Build fully qualified table name if not already qualified
    IF INSTR(pTableName, '.') > 0 THEN
        vFullTableName := pTableName; -- Already fully qualified
    ELSE
        vFullTableName := pSchemaName || '.' || pTableName;
    END IF;
    -- Construct the query to extract data for the current year/month.
    -- Date bounds are inlined as literals (CHR(39) = single quote) because DBMS_CLOUD.EXPORT_DATA
    -- takes a self-contained query text with no bind support.
    vQuery := 'SELECT ' || pProcessedColumns || ' FROM ' || vFullTableName || ' T, CT_ODS.A_LOAD_HISTORY L WHERE T.'
           || pKeyColumnName
           || ' = L.A_ETL_LOAD_SET_KEY AND TO_CHAR(L.LOAD_START,''YYYY'') = '
           || CHR(39) || pYear || CHR(39)
           || ' AND TO_CHAR(L.LOAD_START,''MM'') = '
           || CHR(39) || pMonth || CHR(39)
           || ' AND L.LOAD_START >= TO_DATE('
           || CHR(39) || TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39)
           || ', ''YYYY-MM-DD HH24:MI:SS'') AND L.LOAD_START < TO_DATE('
           || CHR(39) || TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39)
           || ', ''YYYY-MM-DD HH24:MI:SS'')';
    -- Construct the URI based on format
    IF pFormat = 'PARQUET' THEN
        -- Parquet: Use Hive-style partitioning
        vUri := pBucketUri
             || CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END
             || 'PARTITION_YEAR=' || sanitizeFilename(pYear) || '/'
             || 'PARTITION_MONTH=' || sanitizeFilename(pMonth) || '/'
             || sanitizeFilename(pYear) || sanitizeFilename(pMonth) || '.parquet';
        ENV_MANAGER.LOG_PROCESS_EVENT('Parquet export URI: ' || vUri, 'DEBUG', pParameters);
        DBMS_CLOUD.EXPORT_DATA(
            credential_name => pCredentialName,
            file_uri_list   => vUri,
            query           => vQuery,
            format          => json_object('type' VALUE 'parquet')
        );
    ELSIF pFormat = 'CSV' THEN
        -- CSV: Flat file structure with year/month in filename
        vFileName := NVL(pFileBaseName, UPPER(pTableName)) || '_' || pYear || pMonth || '.csv';
        vUri := pBucketUri
             || CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END
             || sanitizeFilename(vFileName);
        ENV_MANAGER.LOG_PROCESS_EVENT('CSV export URI: ' || vUri, 'DEBUG', pParameters);
        DBMS_CLOUD.EXPORT_DATA(
            credential_name => pCredentialName,
            file_uri_list   => vUri,
            query           => vQuery,
            format          => json_object('type' VALUE 'CSV', 'header' VALUE true)
        );
    ELSE
        RAISE_APPLICATION_ERROR(-20001, 'Unsupported format: ' || pFormat || '. Use PARQUET or CSV.');
    END IF;
    ENV_MANAGER.LOG_PROCESS_EVENT('Processing Year/Month: ' || pYear || '/' || pMonth || ' (Format: ' || pFormat || ')', 'DEBUG', pParameters);
    ENV_MANAGER.LOG_PROCESS_EVENT('Export query: ' || vQuery, 'DEBUG', pParameters);
END EXPORT_SINGLE_PARTITION;
----------------------------------------------------------------------------------------------------
-- MAIN EXPORT PROCEDURES
----------------------------------------------------------------------------------------------------
/**
 * Exports one CSV file per distinct A_ETL_LOAD_SET_KEY found in CT_ODS.A_LOAD_HISTORY for the
 * given table. The key column's declared data type drives how its literal is rendered in the
 * per-key export query (quoted string, bare number, or TO_TIMESTAMP).
 *
 * @param pSchemaName      owner of the table to export
 * @param pTableName       table to export
 * @param pKeyColumnName   FK column joining the table to A_LOAD_HISTORY.A_ETL_LOAD_SET_KEY
 * @param pBucketArea      logical bucket area resolved via FILE_MANAGER.GET_BUCKET_URI
 * @param pFolderName      optional folder under the bucket; NULL writes to the bucket root
 * @param pCredentialName  DBMS_CLOUD credential; defaults to ENV_MANAGER.gvCredentialName
 **/
PROCEDURE EXPORT_TABLE_DATA (
    pSchemaName     IN VARCHAR2,
    pTableName      IN VARCHAR2,
    pKeyColumnName  IN VARCHAR2,
    pBucketArea     IN VARCHAR2,
    pFolderName     IN VARCHAR2,
    pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
)
IS
    -- Type definition for key values
    TYPE key_value_tab IS TABLE OF VARCHAR2(4000);
    vKeyValues           key_value_tab;
    vCount               INTEGER;
    vSql                 VARCHAR2(4000);
    vKeyValue            VARCHAR2(4000);
    vQuery               VARCHAR2(32767);
    vUri                 VARCHAR2(4000);
    vDataType            VARCHAR2(30);
    vTableName           VARCHAR2(128);
    vSchemaName          VARCHAR2(128);
    vKeyColumnName       VARCHAR2(128);
    vParameters          VARCHAR2(4000);
    vBucketUri           VARCHAR2(4000);
    vProcessedColumnList VARCHAR2(32767);
    -- NOTE(review): vCurrentCol is referenced in the ERR_COLUMN_NOT_EXISTS handler below but is
    -- never assigned in this procedure, so that handler fragment is always empty — confirm intent.
    vCurrentCol          VARCHAR2(128);
    vAllColumnsList      VARCHAR2(32767);
BEGIN
    vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
         'pSchemaName => '''||nvl(pSchemaName, 'NULL')||''''
        ,'pTableName => '''||nvl(pTableName, 'NULL')||''''
        ,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||''''
        ,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||''''
        ,'pFolderName => '''||nvl(pFolderName, 'NULL')||''''
        ,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
    ));
    ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
    -- Get bucket URI based on bucket area using FILE_MANAGER function
    vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea);
    -- Convert table and column names to uppercase to match data dictionary
    vTableName := UPPER(pTableName);
    vSchemaName := UPPER(pSchemaName);
    vKeyColumnName := UPPER(pKeyColumnName);
    -- Check if table exists
    SELECT COUNT(*) INTO vCount FROM all_tables WHERE table_name = vTableName AND owner = vSchemaName;
    IF vCount = 0 THEN
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, ENV_MANAGER.MSG_TABLE_NOT_EXISTS);
    END IF;
    -- Check if key column exists
    SELECT COUNT(*) INTO vCount FROM all_tab_columns WHERE table_name = vTableName AND column_name = vKeyColumnName AND owner = vSchemaName;
    IF vCount = 0 THEN
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
    END IF;
    -- Get the data type of the key column (drives literal rendering in the per-key query below)
    SELECT data_type INTO vDataType FROM all_tab_columns WHERE table_name = vTableName AND column_name = vKeyColumnName AND owner = vSchemaName;
    -- Build list of all columns for the table (excluding key column to avoid duplication)
    SELECT LISTAGG(column_name, ', ') WITHIN GROUP (ORDER BY column_id)
      INTO vAllColumnsList
      FROM all_tab_columns
     WHERE table_name = vTableName AND owner = vSchemaName AND column_name != vKeyColumnName;
    -- Process column list to add T. prefix to each column
    vProcessedColumnList := processColumnList(vAllColumnsList, vTableName, vSchemaName, vKeyColumnName);
    ENV_MANAGER.LOG_PROCESS_EVENT('Dynamic column list built (excluding key): ' || vAllColumnsList, 'DEBUG', vParameters);
    ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list with T. prefix: ' || vProcessedColumnList, 'DEBUG', vParameters);
    -- Qualify and sanitize identifiers before using them in dynamic SQL
    vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName);
    -- Fetch unique key values from A_LOAD_HISTORY
    vSql := 'SELECT DISTINCT L.A_ETL_LOAD_SET_KEY'
         || ' FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L'
         || ' WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY';
    ENV_MANAGER.LOG_PROCESS_EVENT('Executing key values query: ' || vSql, 'DEBUG', vParameters);
    EXECUTE IMMEDIATE vSql BULK COLLECT INTO vKeyValues;
    ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vKeyValues.COUNT || ' unique key values to process', 'DEBUG', vParameters);
    -- Loop over each unique key value
    FOR i IN 1 .. vKeyValues.COUNT LOOP
        vKeyValue := vKeyValues(i);
        -- Construct the query to extract data for the current key value with A_WORKFLOW_HISTORY_KEY mapping
        IF vDataType IN ('VARCHAR2', 'CHAR', 'NCHAR', 'NVARCHAR2') THEN
            -- Character key: render the value as a quoted string literal
            vQuery := 'SELECT ' || vProcessedColumnList
                   || ' FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L'
                   || ' WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY'
                   || ' AND L.A_ETL_LOAD_SET_KEY = ' || CHR(39) || vKeyValue || CHR(39);
        ELSIF vDataType IN ('NUMBER', 'FLOAT', 'BINARY_FLOAT', 'BINARY_DOUBLE') THEN
            -- Numeric key: render the value as a bare literal
            vQuery := 'SELECT ' || vProcessedColumnList
                   || ' FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L'
                   || ' WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY'
                   || ' AND L.A_ETL_LOAD_SET_KEY = ' || vKeyValue;
        ELSIF vDataType LIKE 'TIMESTAMP%' OR vDataType = 'DATE' THEN
            -- Temporal key: render via TO_TIMESTAMP with an explicit fractional-seconds mask
            vQuery := 'SELECT ' || vProcessedColumnList
                   || ' FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L'
                   || ' WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY'
                   || ' AND L.A_ETL_LOAD_SET_KEY = TO_TIMESTAMP(' || CHR(39) || vKeyValue || CHR(39) ||', ''YYYY-MM-DD HH24:MI:SS.FF'')';
        ELSE
            RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNSUPPORTED_DATA_TYPE, ENV_MANAGER.MSG_UNSUPPORTED_DATA_TYPE);
        END IF;
        -- Construct the URI for the file in OCI Object Storage (one CSV per key value)
        vUri := vBucketUri
             || CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END
             || sanitizeFilename(vKeyValue) || '.csv';
        ENV_MANAGER.LOG_PROCESS_EVENT('Processing key value: ' || vKeyValue || ' (' || (i) || '/' || vKeyValues.COUNT || ')', 'DEBUG', vParameters);
        ENV_MANAGER.LOG_PROCESS_EVENT('Export query: ' || vQuery, 'DEBUG', vParameters);
        ENV_MANAGER.LOG_PROCESS_EVENT('Export URI: ' || vUri, 'DEBUG', vParameters);
        -- Use DBMS_CLOUD package to export data to the URI
        DBMS_CLOUD.EXPORT_DATA(
            credential_name => pCredentialName,
            file_uri_list   => vUri,
            query           => vQuery,
            format          => json_object('type' VALUE 'CSV', 'header' VALUE true)
        );
    END LOOP;
    ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
EXCEPTION
    WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN
        vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName;
        ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp);
    WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN
        vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in column list' ELSE '' END;
        ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp);
    WHEN ENV_MANAGER.ERR_UNSUPPORTED_DATA_TYPE THEN
        vgMsgTmp := ENV_MANAGER.MSG_UNSUPPORTED_DATA_TYPE || ' vDataType: '||vDataType;
        ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNSUPPORTED_DATA_TYPE, vgMsgTmp);
    WHEN OTHERS THEN
        -- Log complete error details including full stack trace and backtrace
        ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER');
        ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
END EXPORT_TABLE_DATA;
----------------------------------------------------------------------------------------------------
/**
 * Exports the table to Parquet files, one per year/month of A_LOAD_HISTORY.LOAD_START within
 * [pMinDate, pMaxDate), using Hive-style PARTITION_YEAR=/PARTITION_MONTH= folders.
 *
 * @param pColumnList  optional comma-separated columns; NULL exports all columns of the table
 **/
PROCEDURE EXPORT_TABLE_DATA_BY_DATE (
    pSchemaName     IN VARCHAR2,
    pTableName      IN VARCHAR2,
    pKeyColumnName  IN VARCHAR2,
    pBucketArea     IN VARCHAR2,
    pFolderName     IN VARCHAR2,
    pColumnList     IN VARCHAR2 default NULL,
    pMinDate        IN DATE default DATE '1900-01-01',
    pMaxDate        IN DATE default SYSDATE,
    pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
)
IS
    vTableName           VARCHAR2(128);
    vSchemaName          VARCHAR2(128);
    vKeyColumnName       VARCHAR2(128);
    vParameters          CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
    vProcessedColumnList VARCHAR2(32767);
    vBucketUri           VARCHAR2(4000);
    -- NOTE(review): vCurrentCol is never assigned in this procedure; see the handler below.
    vCurrentCol          VARCHAR2(128);
    vPartitions          partition_tab;
BEGIN
    vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
         'pSchemaName => '''||nvl(pSchemaName, 'NULL')||''''
        ,'pTableName => '''||nvl(pTableName, 'NULL')||''''
        ,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||''''
        ,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||''''
        ,'pFolderName => '''||nvl(pFolderName, 'NULL')||''''
        ,'pColumnList => '''||nvl(pColumnList, 'NULL')||''''
        ,'pMinDate => '''||nvl(TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
        ,'pMaxDate => '''||nvl(TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
        ,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
    ));
    ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
    -- Get bucket URI based on bucket area using FILE_MANAGER function
    vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea);
    -- Convert table and column names to uppercase to match data dictionary
    vTableName := UPPER(pTableName);
    vSchemaName := UPPER(pSchemaName);
    vKeyColumnName := UPPER(pKeyColumnName);
    -- Validate table, key column, and column list using shared procedure
    VALIDATE_TABLE_AND_COLUMNS(vSchemaName, vTableName, vKeyColumnName, pColumnList, vParameters);
    -- Process column list to add T. prefix to each column
    vProcessedColumnList := processColumnList(pColumnList, vTableName, vSchemaName, vKeyColumnName);
    ENV_MANAGER.LOG_PROCESS_EVENT('Input column list: ' || NVL(pColumnList, 'NULL (building dynamic list from table metadata)'), 'DEBUG', vParameters);
    ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list: ' || vProcessedColumnList, 'DEBUG', vParameters);
    -- Sanitize + qualify the table name; downstream helpers detect the '.' and use it as-is
    vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName);
    -- Get partitions using shared function
    vPartitions := GET_PARTITIONS(vSchemaName, vTableName, vKeyColumnName, pMinDate, pMaxDate, vParameters);
    -- Loop over each partition and export using shared worker procedure
    FOR i IN 1 .. vPartitions.COUNT LOOP
        EXPORT_SINGLE_PARTITION(
            pSchemaName       => vSchemaName,
            pTableName        => vTableName,
            pKeyColumnName    => vKeyColumnName,
            pYear             => vPartitions(i).year,
            pMonth            => vPartitions(i).month,
            pBucketUri        => vBucketUri,
            pFolderName       => pFolderName,
            pProcessedColumns => vProcessedColumnList,
            pMinDate          => pMinDate,
            pMaxDate          => pMaxDate,
            pCredentialName   => pCredentialName,
            pFormat           => 'PARQUET',
            pFileBaseName     => NULL,
            pParameters       => vParameters
        );
    END LOOP;
    ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
EXCEPTION
    WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN
        vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName;
        ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp);
    WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN
        vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in pColumnList' ELSE '' END;
        ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp);
    WHEN OTHERS THEN
        -- Log complete error details including full stack trace and backtrace
        ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER');
        ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
END EXPORT_TABLE_DATA_BY_DATE;
----------------------------------------------------------------------------------------------------
/**
 * @name EXPORT_TABLE_DATA_TO_CSV_BY_DATE
 * @desc Exports data to CSV files with date filtering (one file per year/month found).
 *       Unlike EXPORT_TABLE_DATA_BY_DATE, this procedure creates CSV files
 *       instead of Parquet files in Hive-style partition folders.
 *       Uses the same date filtering mechanism with CT_ODS.A_LOAD_HISTORY.
 *       Allows specifying a custom column list; if pColumnList is NULL the full column
 *       list is built from table metadata.
 *       Validates that all columns in pColumnList exist in the target table.
 *       Automatically adds 'T.' prefix to column names in pColumnList.
 * @example
 * begin
 *   DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
 *     pSchemaName    => 'CT_MRDS',
 *     pTableName     => 'MY_TABLE',
 *     pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',
 *     pBucketArea    => 'DATA',
 *     pFolderName    => 'exports',
 *     pFileName      => 'my_export.csv',
 *     pColumnList    => 'COLUMN1, COLUMN2, COLUMN3', -- Optional
 *     pMinDate       => DATE '2024-01-01',
 *     pMaxDate       => SYSDATE
 *   );
 * end;
 **/
PROCEDURE EXPORT_TABLE_DATA_TO_CSV_BY_DATE (
    pSchemaName     IN VARCHAR2,
    pTableName      IN VARCHAR2,
    pKeyColumnName  IN VARCHAR2,
    pBucketArea     IN VARCHAR2,
    pFolderName     IN VARCHAR2,
    pFileName       IN VARCHAR2 DEFAULT NULL,
    pColumnList     IN VARCHAR2 default NULL,
    pMinDate        IN DATE default DATE '1900-01-01',
    pMaxDate        IN DATE default SYSDATE,
    pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
)
IS
    vTableName           VARCHAR2(128);
    vSchemaName          VARCHAR2(128);
    vKeyColumnName       VARCHAR2(128);
    vParameters          CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
    vFileBaseName        VARCHAR2(4000);
    -- NOTE(review): vFileExtension is parsed below but never used afterwards; the worker always
    -- appends '.csv' — confirm whether custom extensions were meant to be honored.
    vFileExtension       VARCHAR2(10);
    vProcessedColumnList VARCHAR2(32767);
    vBucketUri           VARCHAR2(4000);
    -- NOTE(review): vCurrentCol is never assigned in this procedure; see the handler below.
    vCurrentCol          VARCHAR2(128);
    vPartitions          partition_tab;
BEGIN
    vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
         'pSchemaName => '''||nvl(pSchemaName, 'NULL')||''''
        ,'pTableName => '''||nvl(pTableName, 'NULL')||''''
        ,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||''''
        ,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||''''
        ,'pFolderName => '''||nvl(pFolderName, 'NULL')||''''
        ,'pFileName => '''||nvl(pFileName, 'NULL')||''''
        ,'pColumnList => '''||nvl(pColumnList, 'NULL')||''''
        ,'pMinDate => '''||nvl(TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
        ,'pMaxDate => '''||nvl(TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
        ,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
    ));
    ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
    -- Get bucket URI based on bucket area using FILE_MANAGER function
    vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea);
    -- Convert table and column names to uppercase to match data dictionary
    vTableName := UPPER(pTableName);
    vSchemaName := UPPER(pSchemaName);
    vKeyColumnName := UPPER(pKeyColumnName);
    -- Extract base filename and extension or construct default filename
    IF pFileName IS NOT NULL THEN
        -- Use provided filename; split at the LAST dot so 'a.b.csv' keeps base 'a.b'
        IF INSTR(pFileName, '.') > 0 THEN
            vFileBaseName := SUBSTR(pFileName, 1, INSTR(pFileName, '.', -1) - 1);
            vFileExtension := SUBSTR(pFileName, INSTR(pFileName, '.', -1));
        ELSE
            vFileBaseName := pFileName;
            vFileExtension := '.csv';
        END IF;
    ELSE
        -- Construct default filename: TABLENAME (without extension, will be added by worker)
        vFileBaseName := UPPER(pTableName);
        vFileExtension := '.csv';
    END IF;
    -- Validate table, key column, and column list using shared procedure
    VALIDATE_TABLE_AND_COLUMNS(vSchemaName, vTableName, vKeyColumnName, pColumnList, vParameters);
    -- Process column list to add T. prefix to each column
    vProcessedColumnList := processColumnList(pColumnList, vTableName, vSchemaName, vKeyColumnName);
    ENV_MANAGER.LOG_PROCESS_EVENT('Input column list: ' || NVL(pColumnList, 'NULL (using dynamic column list)'), 'DEBUG', vParameters);
    ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list: ' || vProcessedColumnList, 'DEBUG', vParameters);
    -- Sanitize + qualify the table name; downstream helpers detect the '.' and use it as-is
    vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName);
    -- Get partitions using shared function
    vPartitions := GET_PARTITIONS(vSchemaName, vTableName, vKeyColumnName, pMinDate, pMaxDate, vParameters);
    ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vPartitions.COUNT || ' year/month combinations to export', 'INFO', vParameters);
    ENV_MANAGER.LOG_PROCESS_EVENT('Date range: ' || TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS') || ' to ' || TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'DEBUG', vParameters);
    -- Loop over each partition and export using shared worker procedure
    FOR i IN 1 .. vPartitions.COUNT LOOP
        EXPORT_SINGLE_PARTITION(
            pSchemaName       => vSchemaName,
            pTableName        => vTableName,
            pKeyColumnName    => vKeyColumnName,
            pYear             => vPartitions(i).year,
            pMonth            => vPartitions(i).month,
            pBucketUri        => vBucketUri,
            pFolderName       => pFolderName,
            pProcessedColumns => vProcessedColumnList,
            pMinDate          => pMinDate,
            pMaxDate          => pMaxDate,
            pCredentialName   => pCredentialName,
            pFormat           => 'CSV',
            pFileBaseName     => vFileBaseName,
            pParameters       => vParameters
        );
    END LOOP;
    ENV_MANAGER.LOG_PROCESS_EVENT('Export completed successfully for ' || vPartitions.COUNT || ' files', 'INFO', vParameters);
    ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
EXCEPTION
    WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN
        vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName;
        ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp);
    WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN
        vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in pColumnList' ELSE '' END;
        ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp);
    WHEN OTHERS THEN
        -- Log complete error details including full stack trace and backtrace
        ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER');
        ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
        RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
END EXPORT_TABLE_DATA_TO_CSV_BY_DATE;
----------------------------------------------------------------------------------------------------
-- VERSION MANAGEMENT FUNCTIONS
----------------------------------------------------------------------------------------------------
-- Returns the package version constant declared in the package spec.
FUNCTION GET_VERSION RETURN VARCHAR2
IS
BEGIN
    RETURN PACKAGE_VERSION;
END GET_VERSION;
----------------------------------------------------------------------------------------------------
-- Returns formatted build metadata (version, build date, author) via ENV_MANAGER.
FUNCTION GET_BUILD_INFO RETURN VARCHAR2
IS
BEGIN
    RETURN ENV_MANAGER.GET_PACKAGE_VERSION_INFO(
        pPackageName => 'DATA_EXPORTER',
        pVersion     => PACKAGE_VERSION,
        pBuildDate   => PACKAGE_BUILD_DATE,
        pAuthor      => PACKAGE_AUTHOR
    );
END GET_BUILD_INFO;
----------------------------------------------------------------------------------------------------
-- Returns the formatted version history for this package via ENV_MANAGER.
FUNCTION GET_VERSION_HISTORY RETURN VARCHAR2
IS
BEGIN
    RETURN ENV_MANAGER.FORMAT_VERSION_HISTORY(
        pPackageName    => 'DATA_EXPORTER',
        pVersionHistory => VERSION_HISTORY
    );
END GET_VERSION_HISTORY;
----------------------------------------------------------------------------------------------------
END;
/