create or replace PACKAGE BODY CT_MRDS.DATA_EXPORTER AS

    ----------------------------------------------------------------------------------------------------
    -- PRIVATE HELPERS
    ----------------------------------------------------------------------------------------------------

    -- Replaces every character outside [a-zA-Z0-9._-] with an underscore so the value can be used
    -- as an object-name segment in a storage URI.
    FUNCTION sanitizeFilename(pFilename IN VARCHAR2) RETURN VARCHAR2 IS
    BEGIN
        RETURN REGEXP_REPLACE(pFilename, '[^a-zA-Z0-9._-]', '_');
    END sanitizeFilename;

    ----------------------------------------------------------------------------------------------------

    -- Raises CODE_TABLE_NOT_EXISTS / CODE_COLUMN_NOT_EXISTS when the table or its key column is not
    -- visible in the data dictionary. All names must already be upper-cased by the caller.
    PROCEDURE assertTableAndKeyColumn(
        pSchemaName     IN VARCHAR2,
        pTableName      IN VARCHAR2,
        pKeyColumnName  IN VARCHAR2
    ) IS
        vCount INTEGER;
    BEGIN
        -- Check if table exists
        SELECT COUNT(*)
          INTO vCount
          FROM all_tables
         WHERE table_name = pTableName
           AND owner = pSchemaName;

        IF vCount = 0 THEN
            RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, ENV_MANAGER.MSG_TABLE_NOT_EXISTS);
        END IF;

        -- Check if key column exists
        SELECT COUNT(*)
          INTO vCount
          FROM all_tab_columns
         WHERE table_name = pTableName
           AND column_name = pKeyColumnName
           AND owner = pSchemaName;

        IF vCount = 0 THEN
            RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
        END IF;
    END assertTableAndKeyColumn;

    ----------------------------------------------------------------------------------------------------

    -- Checks that every entry of a comma-separated column list exists in the given table.
    -- Spaces are ignored, names are upper-cased and an alias prefix ('T.COL' -> 'COL') is stripped
    -- before the dictionary lookup.
    -- Returns TRUE when all entries exist. Otherwise returns FALSE and pInvalidColumn carries the
    -- first offending entry (NULL for an empty entry such as 'A,,B'), so the caller can report it.
    FUNCTION columnListIsValid(
        pColumnList     IN  VARCHAR2,
        pTableName      IN  VARCHAR2,
        pSchemaName     IN  VARCHAR2,
        pInvalidColumn  OUT VARCHAR2
    ) RETURN BOOLEAN IS
        vColumns  VARCHAR2(32767);
        vToken    VARCHAR2(32767);
        vPos      PLS_INTEGER := 1;
        vNextPos  PLS_INTEGER;
        vCount    INTEGER;
    BEGIN
        pInvalidColumn := NULL;

        -- Remove spaces and convert to uppercase for processing
        vColumns := UPPER(REPLACE(pColumnList, ' ', ''));

        -- Parse comma-separated column list
        WHILE vPos <= LENGTH(vColumns) LOOP
            vNextPos := INSTR(vColumns, ',', vPos);
            IF vNextPos = 0 THEN
                vNextPos := LENGTH(vColumns) + 1;
            END IF;

            vToken := SUBSTR(vColumns, vPos, vNextPos - vPos);

            -- Remove table alias prefix if present (e.g., 'T.COLUMN_NAME' -> 'COLUMN_NAME')
            IF INSTR(vToken, '.') > 0 THEN
                vToken := SUBSTR(vToken, INSTR(vToken, '.') + 1);
            END IF;

            -- An empty or over-long entry cannot be a column name; skip the dictionary lookup.
            IF vToken IS NULL OR LENGTH(vToken) > 128 THEN
                vCount := 0;
            ELSE
                SELECT COUNT(*)
                  INTO vCount
                  FROM all_tab_columns
                 WHERE table_name = pTableName
                   AND column_name = vToken
                   AND owner = pSchemaName;
            END IF;

            IF vCount = 0 THEN
                pInvalidColumn := SUBSTR(vToken, 1, 128);
                RETURN FALSE;
            END IF;

            vPos := vNextPos + 1;
        END LOOP;

        RETURN TRUE;
    END columnListIsValid;

    ----------------------------------------------------------------------------------------------------

    -- Builds the statement that lists the distinct year/month pairs of L.LOAD_START for loads
    -- referenced by the table. Expects two binds: :pMinDate (inclusive) and :pMaxDate (exclusive).
    FUNCTION buildYearMonthListSql(
        pQualifiedTable  IN VARCHAR2,
        pKeyColumnSql    IN VARCHAR2
    ) RETURN VARCHAR2 IS
    BEGIN
        RETURN 'SELECT DISTINCT TO_CHAR(L.LOAD_START,''YYYY'') AS YR, TO_CHAR(L.LOAD_START,''MM'') AS MN FROM '
            || pQualifiedTable
            || ' T, CT_ODS.A_LOAD_HISTORY L WHERE T.'
            || pKeyColumnSql
            || ' = L.A_ETL_LOAD_SET_KEY AND L.LOAD_START >= :pMinDate AND L.LOAD_START < :pMaxDate ';
    END buildYearMonthListSql;

    ----------------------------------------------------------------------------------------------------

    -- Builds the export query for one year/month slice. The date bounds are embedded as TO_DATE
    -- literals because the query text is handed to DBMS_CLOUD.EXPORT_DATA as a plain string.
    FUNCTION buildYearMonthExportQuery(
        pProcessedColumnList  IN VARCHAR2,
        pQualifiedTable       IN VARCHAR2,
        pKeyColumnSql         IN VARCHAR2,
        pYear                 IN VARCHAR2,
        pMonth                IN VARCHAR2,
        pMinDate              IN DATE,
        pMaxDate              IN DATE
    ) RETURN VARCHAR2 IS
    BEGIN
        RETURN 'SELECT ' || pProcessedColumnList
            || ' FROM ' || pQualifiedTable
            || ' T, CT_ODS.A_LOAD_HISTORY L WHERE T.' || pKeyColumnSql
            || ' = L.A_ETL_LOAD_SET_KEY AND TO_CHAR(L.LOAD_START,''YYYY'') = ' || CHR(39) || pYear || CHR(39)
            || ' AND TO_CHAR(L.LOAD_START,''MM'') = ' || CHR(39) || pMonth || CHR(39)
            || ' AND L.LOAD_START >= TO_DATE(' || CHR(39) || TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39)
            || ', ''YYYY-MM-DD HH24:MI:SS'')'
            || ' AND L.LOAD_START < TO_DATE(' || CHR(39) || TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39)
            || ', ''YYYY-MM-DD HH24:MI:SS'')';
    END buildYearMonthExportQuery;

    ----------------------------------------------------------------------------------------------------

    -- Internal shared function to process column list with T. prefix and key column mapping.
    --
    -- pColumnList NULL or blank: returns every column of the table (dictionary order), each prefixed
    --   with 'T.', the key column being emitted as 'T.<key> AS A_WORKFLOW_HISTORY_KEY'.
    -- pColumnList given: spaces are removed, names upper-cased, and each entry is prefixed with 'T.'
    --   unless it already contains a '.'. An entry equal to the key column, or to
    --   'A_ETL_LOAD_SET_KEY', is aliased AS A_WORKFLOW_HISTORY_KEY.
    --
    -- The key column is matched by exact name; a plain text REPLACE would also rewrite columns whose
    -- name merely starts with the key column name. The list is assembled in PL/SQL rather than with
    -- LISTAGG so that wide tables do not hit the SQL string-aggregation length limit.
    FUNCTION processColumnList(pColumnList IN VARCHAR2, pTableName IN VARCHAR2, pSchemaName IN VARCHAR2, pKeyColumnName IN VARCHAR2) RETURN VARCHAR2 IS
        cKeyAlias    CONSTANT VARCHAR2(30) := ' AS A_WORKFLOW_HISTORY_KEY';
        vResult      VARCHAR2(32767);
        vColumns     VARCHAR2(32767);
        vCurrentCol  VARCHAR2(32767);
        vPos         PLS_INTEGER := 1;
        vNextPos     PLS_INTEGER;

        -- Appends one select-list item to vResult, comma-separating it from the previous one.
        PROCEDURE appendItem(pItem IN VARCHAR2) IS
        BEGIN
            IF vResult IS NOT NULL THEN
                vResult := vResult || ', ';
            END IF;
            vResult := vResult || pItem;
        END appendItem;
    BEGIN
        -- Remove extra spaces and convert to uppercase
        vColumns := UPPER(REPLACE(pColumnList, ' ', ''));

        IF vColumns IS NULL THEN
            -- No usable list supplied: build list of all columns
            FOR rCol IN (
                SELECT column_name
                  FROM all_tab_columns
                 WHERE table_name = pTableName
                   AND owner = pSchemaName
                 ORDER BY column_id
            ) LOOP
                IF rCol.column_name = pKeyColumnName THEN
                    -- e.g., T.A_ETL_LOAD_SET_KEY_FK AS A_WORKFLOW_HISTORY_KEY
                    appendItem('T.' || rCol.column_name || cKeyAlias);
                ELSE
                    appendItem('T.' || rCol.column_name);
                END IF;
            END LOOP;

            RETURN vResult;
        END IF;

        -- Parse comma-separated column list and add T. prefix
        WHILE vPos <= LENGTH(vColumns) LOOP
            vNextPos := INSTR(vColumns, ',', vPos);
            IF vNextPos = 0 THEN
                vNextPos := LENGTH(vColumns) + 1;
            END IF;

            vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos);

            -- Check if this is the key column (e.g., A_ETL_LOAD_SET_KEY_FK) and add alias
            IF UPPER(vCurrentCol) = UPPER(pKeyColumnName) THEN
                vCurrentCol := 'T.' || pKeyColumnName || cKeyAlias;
            ELSIF UPPER(vCurrentCol) = 'A_ETL_LOAD_SET_KEY' THEN
                vCurrentCol := 'T.A_ETL_LOAD_SET_KEY' || cKeyAlias;
            ELSIF INSTR(vCurrentCol, '.') = 0 THEN
                -- Add T. prefix if not already present
                vCurrentCol := 'T.' || vCurrentCol;
            END IF;

            appendItem(vCurrentCol);
            vPos := vNextPos + 1;
        END LOOP;

        RETURN vResult;
    END processColumnList;

    ----------------------------------------------------------------------------------------------------

    -- Exports the table to CSV, one file per distinct load key.
    --
    -- For every distinct L.A_ETL_LOAD_SET_KEY in CT_ODS.A_LOAD_HISTORY that matches the table's key
    -- column, the matching rows are written with DBMS_CLOUD.EXPORT_DATA to
    --   <bucket uri>[<pFolderName>/]<sanitized key value>.csv
    --
    -- Raises CODE_TABLE_NOT_EXISTS, CODE_COLUMN_NOT_EXISTS, CODE_UNSUPPORTED_DATA_TYPE (key column
    -- type is not character, numeric, DATE or TIMESTAMP%), or CODE_UNKNOWN for any other failure.
    PROCEDURE EXPORT_TABLE_DATA (
        pSchemaName     IN VARCHAR2,
        pTableName      IN VARCHAR2,
        pKeyColumnName  IN VARCHAR2,
        pBucketArea     IN VARCHAR2,
        pFolderName     IN VARCHAR2,
        pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
    ) IS
        -- Type definition for key values
        TYPE key_value_tab IS TABLE OF VARCHAR2(4000);

        vKeyValues            key_value_tab;
        vSql                  VARCHAR2(4000);
        vKeyValue             VARCHAR2(4000);
        vKeyLiteral           VARCHAR2(32767);
        vBaseQuery            VARCHAR2(32767);
        vQuery                VARCHAR2(32767);
        vUri                  VARCHAR2(4000);
        -- Dictionary type names such as 'TIMESTAMP(6) WITH LOCAL TIME ZONE' exceed 30 characters.
        vDataType             all_tab_columns.data_type%TYPE;
        -- Holds the bare table name first and the schema-qualified name later on.
        vTableName            VARCHAR2(261);
        vSchemaName           VARCHAR2(128);
        vKeyColumnName        VARCHAR2(128);
        vKeyColumnSql         VARCHAR2(130);
        vParameters           VARCHAR2(4000);
        vBucketUri            VARCHAR2(4000);
        vProcessedColumnList  VARCHAR2(32767);
        vAllColumnsList       VARCHAR2(32767);
    BEGIN
        vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
             'pSchemaName => '''||nvl(pSchemaName, 'NULL')||''''
            ,'pTableName => '''||nvl(pTableName, 'NULL')||''''
            ,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||''''
            ,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||''''
            ,'pFolderName => '''||nvl(pFolderName, 'NULL')||''''
            ,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
        ));
        ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);

        -- Get bucket URI based on bucket area using FILE_MANAGER function
        vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea);

        -- Convert table and column names to uppercase to match data dictionary
        vTableName := UPPER(pTableName);
        vSchemaName := UPPER(pSchemaName);
        vKeyColumnName := UPPER(pKeyColumnName);

        assertTableAndKeyColumn(vSchemaName, vTableName, vKeyColumnName);

        -- Get the data type of the key column
        SELECT data_type
          INTO vDataType
          FROM all_tab_columns
         WHERE table_name = vTableName
           AND column_name = vKeyColumnName
           AND owner = vSchemaName;

        -- Build list of all columns for the table (excluding key column to avoid duplication).
        -- NOTE(review): as a consequence the key column itself is not part of the exported
        -- column list unless the table consists of the key column only - confirm this is intended.
        FOR rCol IN (
            SELECT column_name
              FROM all_tab_columns
             WHERE table_name = vTableName
               AND owner = vSchemaName
               AND column_name != vKeyColumnName
             ORDER BY column_id
        ) LOOP
            IF vAllColumnsList IS NOT NULL THEN
                vAllColumnsList := vAllColumnsList || ', ';
            END IF;
            vAllColumnsList := vAllColumnsList || rCol.column_name;
        END LOOP;

        -- Process column list to add T. prefix to each column
        vProcessedColumnList := processColumnList(vAllColumnsList, vTableName, vSchemaName, vKeyColumnName);

        ENV_MANAGER.LOG_PROCESS_EVENT('Dynamic column list built (excluding key): ' || vAllColumnsList, 'DEBUG', vParameters);
        ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list with T. prefix: ' || vProcessedColumnList, 'DEBUG', vParameters);

        vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName);
        vKeyColumnSql := DBMS_ASSERT.simple_sql_name(vKeyColumnName);

        -- Fetch unique key values from A_LOAD_HISTORY
        vSql := 'SELECT DISTINCT L.A_ETL_LOAD_SET_KEY'
             || ' FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L'
             || ' WHERE T.' || vKeyColumnSql || ' = L.A_ETL_LOAD_SET_KEY';

        ENV_MANAGER.LOG_PROCESS_EVENT('Executing key values query: ' || vSql, 'DEBUG', vParameters);
        EXECUTE IMMEDIATE vSql BULK COLLECT INTO vKeyValues;
        ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vKeyValues.COUNT || ' unique key values to process', 'DEBUG', vParameters);

        -- Part of the export query that is identical for every key value
        vBaseQuery := 'SELECT ' || vProcessedColumnList
                   || ' FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L'
                   || ' WHERE T.' || vKeyColumnSql || ' = L.A_ETL_LOAD_SET_KEY';

        -- Loop over each unique key value
        FOR i IN 1 .. vKeyValues.COUNT LOOP
            vKeyValue := vKeyValues(i);

            -- Render the key value as a SQL literal matching the key column's data type
            IF vDataType IN ('VARCHAR2', 'CHAR', 'NCHAR', 'NVARCHAR2') THEN
                -- Double embedded single quotes so the value cannot terminate the literal
                vKeyLiteral := CHR(39) || REPLACE(vKeyValue, CHR(39), CHR(39) || CHR(39)) || CHR(39);
            ELSIF vDataType IN ('NUMBER', 'FLOAT', 'BINARY_FLOAT', 'BINARY_DOUBLE') THEN
                -- NOTE(review): the value was converted to text with the session NLS settings;
                -- a decimal comma would not parse here - integer keys are presumably expected.
                vKeyLiteral := vKeyValue;
            ELSIF vDataType LIKE 'TIMESTAMP%' OR vDataType = 'DATE' THEN
                -- NOTE(review): vKeyValue was produced by implicit conversion using the session
                -- NLS date/timestamp format, which must agree with the mask below - confirm.
                vKeyLiteral := 'TO_TIMESTAMP(' || CHR(39) || vKeyValue || CHR(39) ||', ''YYYY-MM-DD HH24:MI:SS.FF'')';
            ELSE
                RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNSUPPORTED_DATA_TYPE, ENV_MANAGER.MSG_UNSUPPORTED_DATA_TYPE);
            END IF;

            -- Construct the query to extract data for the current key value
            vQuery := vBaseQuery || ' AND L.A_ETL_LOAD_SET_KEY = ' || vKeyLiteral;

            -- Construct the URI for the file in OCI Object Storage
            vUri := vBucketUri
                 || CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END
                 || sanitizeFilename(vKeyValue) || '.csv';

            ENV_MANAGER.LOG_PROCESS_EVENT('Processing key value: ' || vKeyValue || ' (' || (i) || '/' || vKeyValues.COUNT || ')', 'DEBUG', vParameters);
            ENV_MANAGER.LOG_PROCESS_EVENT('Export query: ' || vQuery, 'DEBUG', vParameters);
            ENV_MANAGER.LOG_PROCESS_EVENT('Export URI: ' || vUri, 'DEBUG', vParameters);

            -- Use DBMS_CLOUD package to export data to the URI
            DBMS_CLOUD.EXPORT_DATA(
                credential_name => pCredentialName,
                file_uri_list   => vUri,
                query           => vQuery,
                format          => json_object('type' VALUE 'CSV', 'header' VALUE true)
            );
        END LOOP;

        ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
    EXCEPTION
        WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN
            vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName;
            ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
            RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp);
        WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN
            vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName;
            ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
            RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp);
        WHEN ENV_MANAGER.ERR_UNSUPPORTED_DATA_TYPE THEN
            vgMsgTmp := ENV_MANAGER.MSG_UNSUPPORTED_DATA_TYPE || ' vDataType: '||vDataType;
            ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
            RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNSUPPORTED_DATA_TYPE, vgMsgTmp);
        WHEN OTHERS THEN
            -- Log complete error details including full stack trace and backtrace
            ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER');
            ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
            RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
    END EXPORT_TABLE_DATA;

    ----------------------------------------------------------------------------------------------------

    -- Exports the table to Parquet, one file per year/month of L.LOAD_START.
    --
    -- Only loads with pMinDate <= LOAD_START < pMaxDate are considered. Each slice is written to
    --   <bucket uri>[<pFolderName>/]PARTITION_YEAR=<YYYY>/PARTITION_MONTH=<MM>/<YYYY><MM>.parquet
    -- pColumnList (optional, comma-separated) restricts the exported columns; every entry must
    -- exist in the table. When it is NULL all columns are exported.
    --
    -- Raises CODE_TABLE_NOT_EXISTS, CODE_COLUMN_NOT_EXISTS (key column or a pColumnList entry),
    -- or CODE_UNKNOWN for any other failure.
    PROCEDURE EXPORT_TABLE_DATA_BY_DATE (
        pSchemaName     IN VARCHAR2,
        pTableName      IN VARCHAR2,
        pKeyColumnName  IN VARCHAR2,
        pBucketArea     IN VARCHAR2,
        pFolderName     IN VARCHAR2,
        pColumnList     IN VARCHAR2 default NULL,
        pMinDate        IN DATE default DATE '1900-01-01',
        pMaxDate        IN DATE default SYSDATE,
        pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
    ) IS
        -- Type definition for key values
        TYPE key_value_tab IS TABLE OF VARCHAR2(4000);

        vKeyValuesYear        key_value_tab;
        vKeyValuesMonth       key_value_tab;
        vSql                  VARCHAR2(32000);
        vKeyValueYear         VARCHAR2(4000);
        vKeyValueMonth        VARCHAR2(4000);
        vQuery                VARCHAR2(32767);
        vUri                  VARCHAR2(4000);
        -- Holds the bare table name first and the schema-qualified name later on.
        vTableName            VARCHAR2(261);
        vSchemaName           VARCHAR2(128);
        vKeyColumnName        VARCHAR2(128);
        vKeyColumnSql         VARCHAR2(130);
        vParameters           CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
        vProcessedColumnList  VARCHAR2(32767);
        vBucketUri            VARCHAR2(4000);
        -- First pColumnList entry that failed validation; read by the exception handler.
        vInvalidColumn        VARCHAR2(512);
    BEGIN
        vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
             'pSchemaName => '''||nvl(pSchemaName, 'NULL')||''''
            ,'pTableName => '''||nvl(pTableName, 'NULL')||''''
            ,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||''''
            ,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||''''
            ,'pFolderName => '''||nvl(pFolderName, 'NULL')||''''
            ,'pColumnList => '''||nvl(pColumnList, 'NULL')||''''
            ,'pMinDate => '''||nvl(TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
            ,'pMaxDate => '''||nvl(TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
            ,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
        ));
        ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);

        -- Get bucket URI based on bucket area using FILE_MANAGER function
        vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea);

        -- Convert table and column names to uppercase to match data dictionary
        vTableName := UPPER(pTableName);
        vSchemaName := UPPER(pSchemaName);
        vKeyColumnName := UPPER(pKeyColumnName);

        assertTableAndKeyColumn(vSchemaName, vTableName, vKeyColumnName);

        -- Validate pColumnList - check if all column names exist in the table
        IF pColumnList IS NOT NULL
           AND NOT columnListIsValid(pColumnList, vTableName, vSchemaName, vInvalidColumn)
        THEN
            RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
        END IF;

        -- Process column list to add T. prefix to each column
        vProcessedColumnList := processColumnList(pColumnList, vTableName, vSchemaName, vKeyColumnName);

        ENV_MANAGER.LOG_PROCESS_EVENT('Input column list: ' || NVL(pColumnList, 'NULL (building dynamic list from table metadata)'), 'DEBUG', vParameters);
        ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list: ' || vProcessedColumnList, 'DEBUG', vParameters);

        vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName);
        vKeyColumnSql := DBMS_ASSERT.simple_sql_name(vKeyColumnName);

        -- Fetch unique year/month combinations
        vSql := buildYearMonthListSql(vTableName, vKeyColumnSql);

        ENV_MANAGER.LOG_PROCESS_EVENT('Executing date range query: ' || vSql, 'DEBUG', vParameters);
        EXECUTE IMMEDIATE vSql BULK COLLECT INTO vKeyValuesYear, vKeyValuesMonth USING pMinDate, pMaxDate;
        ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vKeyValuesYear.COUNT || ' year/month combinations to export', 'DEBUG', vParameters);

        -- Loop over each unique year/month combination
        FOR i IN 1 .. vKeyValuesYear.COUNT LOOP
            vKeyValueYear := vKeyValuesYear(i);
            vKeyValueMonth := vKeyValuesMonth(i);

            ENV_MANAGER.LOG_PROCESS_EVENT('Processing Year/Month: ' || vKeyValueYear || '/' || vKeyValueMonth || ' (' || i || '/' || vKeyValuesYear.COUNT || ')', 'DEBUG', vParameters);

            -- Construct the query to extract data for the current year/month
            -- Note: processColumnList already handles A_WORKFLOW_HISTORY_KEY aliasing
            vQuery := buildYearMonthExportQuery(
                vProcessedColumnList, vTableName, vKeyColumnSql,
                vKeyValueYear, vKeyValueMonth, pMinDate, pMaxDate
            );

            -- Construct the URI for the file in OCI Object Storage
            vUri := vBucketUri
                 || CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END
                 || 'PARTITION_YEAR=' || sanitizeFilename(vKeyValueYear) || '/'
                 || 'PARTITION_MONTH=' || sanitizeFilename(vKeyValueMonth) || '/'
                 || sanitizeFilename(vKeyValueYear) || sanitizeFilename(vKeyValueMonth) || '.parquet';

            ENV_MANAGER.LOG_PROCESS_EVENT('Export query: ' || vQuery, 'DEBUG', vParameters);
            ENV_MANAGER.LOG_PROCESS_EVENT('Parquet export URI: ' || vUri, 'DEBUG', vParameters);

            -- Use DBMS_CLOUD package to export data to the URI
            DBMS_CLOUD.EXPORT_DATA(
                credential_name => pCredentialName,
                file_uri_list   => vUri,
                query           => vQuery,
                format          => json_object('type' VALUE 'parquet')
            );
        END LOOP;

        ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
    EXCEPTION
        WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN
            vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName;
            ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
            RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp);
        WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN
            vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vInvalidColumn IS NOT NULL THEN '.'||vInvalidColumn||' in pColumnList' ELSE '' END;
            ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
            RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp);
        WHEN OTHERS THEN
            -- Log complete error details including full stack trace and backtrace
            ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER');
            ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
            RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
    END EXPORT_TABLE_DATA_BY_DATE;

    ----------------------------------------------------------------------------------------------------

    /**
     * @name EXPORT_TABLE_DATA_TO_CSV_BY_DATE
     * @desc Exports data to CSV files with date filtering.
     *       Like EXPORT_TABLE_DATA_BY_DATE, one file is written per year/month of
     *       CT_ODS.A_LOAD_HISTORY.LOAD_START within [pMinDate, pMaxDate), but the files are CSV
     *       (with header) in a single folder instead of Parquet files in partition folders.
     *       Each file is named <base>_<YYYY><MM><extension>, where base and extension are taken
     *       from pFileName (extension defaults to '.csv'; base defaults to the upper-cased
     *       table name when pFileName is NULL).
     *       Allows specifying a custom column list; when pColumnList is NULL an explicit list
     *       of all table columns is built from the data dictionary.
     *       Validates that all columns in pColumnList exist in the target table.
     *       Automatically adds 'T.' prefix to column names in pColumnList.
     * @example
     * begin
     *    DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
     *        pSchemaName     => 'CT_MRDS',
     *        pTableName      => 'MY_TABLE',
     *        pKeyColumnName  => 'A_ETL_LOAD_SET_KEY_FK',
     *        pBucketArea     => 'DATA',
     *        pFolderName     => 'exports',
     *        pFileName       => 'my_export.csv',
     *        pColumnList     => 'COLUMN1, COLUMN2, COLUMN3',  -- Optional
     *        pMinDate        => DATE '2024-01-01',
     *        pMaxDate        => SYSDATE
     *    );
     * end;
     **/
    PROCEDURE EXPORT_TABLE_DATA_TO_CSV_BY_DATE (
        pSchemaName     IN VARCHAR2,
        pTableName      IN VARCHAR2,
        pKeyColumnName  IN VARCHAR2,
        pBucketArea     IN VARCHAR2,
        pFolderName     IN VARCHAR2,
        pFileName       IN VARCHAR2 DEFAULT NULL,
        pColumnList     IN VARCHAR2 default NULL,
        pMinDate        IN DATE default DATE '1900-01-01',
        pMaxDate        IN DATE default SYSDATE,
        pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
    ) IS
        -- Type definition for key values
        TYPE key_value_tab IS TABLE OF VARCHAR2(4000);

        vKeyValuesYear        key_value_tab;
        vKeyValuesMonth       key_value_tab;
        vSql                  VARCHAR2(4000);
        vKeyValueYear         VARCHAR2(4000);
        vKeyValueMonth        VARCHAR2(4000);
        vQuery                VARCHAR2(32767);
        vUri                  VARCHAR2(4000);
        -- Holds the bare table name first and the schema-qualified name later on.
        vTableName            VARCHAR2(261);
        vSchemaName           VARCHAR2(128);
        vKeyColumnName        VARCHAR2(128);
        vKeyColumnSql         VARCHAR2(130);
        vParameters           CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
        vFileBaseName         VARCHAR2(4000);
        -- Sized like the base name: the extension is whatever follows the last '.' of pFileName.
        vFileExtension        VARCHAR2(4000);
        vProcessedColumnList  VARCHAR2(32767);
        vBucketUri            VARCHAR2(4000);
        -- First pColumnList entry that failed validation; read by the exception handler.
        vInvalidColumn        VARCHAR2(512);
    BEGIN
        vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST(
             'pSchemaName => '''||nvl(pSchemaName, 'NULL')||''''
            ,'pTableName => '''||nvl(pTableName, 'NULL')||''''
            ,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||''''
            ,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||''''
            ,'pFolderName => '''||nvl(pFolderName, 'NULL')||''''
            ,'pFileName => '''||nvl(pFileName, 'NULL')||''''
            ,'pColumnList => '''||nvl(pColumnList, 'NULL')||''''
            ,'pMinDate => '''||nvl(TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
            ,'pMaxDate => '''||nvl(TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
            ,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
        ));
        ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);

        -- Get bucket URI based on bucket area using FILE_MANAGER function
        vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea);

        -- Convert table and column names to uppercase to match data dictionary
        vTableName := UPPER(pTableName);
        vSchemaName := UPPER(pSchemaName);
        vKeyColumnName := UPPER(pKeyColumnName);

        -- Extract base filename and extension or construct default filename
        IF pFileName IS NULL THEN
            -- Construct default filename: TABLENAME.csv (without date range)
            vFileBaseName := UPPER(pTableName);
            vFileExtension := '.csv';
        ELSIF INSTR(pFileName, '.') > 0 THEN
            -- Split the provided filename at its last dot
            vFileBaseName := SUBSTR(pFileName, 1, INSTR(pFileName, '.', -1) - 1);
            vFileExtension := SUBSTR(pFileName, INSTR(pFileName, '.', -1));
        ELSE
            vFileBaseName := pFileName;
            vFileExtension := '.csv';
        END IF;

        assertTableAndKeyColumn(vSchemaName, vTableName, vKeyColumnName);

        -- Validate pColumnList - check if all column names exist in the table
        IF pColumnList IS NOT NULL
           AND NOT columnListIsValid(pColumnList, vTableName, vSchemaName, vInvalidColumn)
        THEN
            RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
        END IF;

        -- Process column list to add T. prefix to each column
        vProcessedColumnList := processColumnList(pColumnList, vTableName, vSchemaName, vKeyColumnName);

        ENV_MANAGER.LOG_PROCESS_EVENT('Input column list: ' || NVL(pColumnList, 'NULL (using dynamic column list)'), 'DEBUG', vParameters);
        ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list: ' || vProcessedColumnList, 'DEBUG', vParameters);

        vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName);
        vKeyColumnSql := DBMS_ASSERT.simple_sql_name(vKeyColumnName);

        -- Fetch unique year/month combinations
        vSql := buildYearMonthListSql(vTableName, vKeyColumnSql);

        ENV_MANAGER.LOG_PROCESS_EVENT('Executing date range query: ' || vSql, 'DEBUG', vParameters);
        EXECUTE IMMEDIATE vSql BULK COLLECT INTO vKeyValuesYear, vKeyValuesMonth USING pMinDate, pMaxDate;
        ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vKeyValuesYear.COUNT || ' year/month combinations to export', 'INFO', vParameters);
        ENV_MANAGER.LOG_PROCESS_EVENT('Date range: ' || TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS') || ' to ' || TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'DEBUG', vParameters);

        -- Loop over each unique year/month combination
        FOR i IN 1 .. vKeyValuesYear.COUNT LOOP
            vKeyValueYear := vKeyValuesYear(i);
            vKeyValueMonth := vKeyValuesMonth(i);

            -- Construct the query to extract data for the current year/month
            vQuery := buildYearMonthExportQuery(
                vProcessedColumnList, vTableName, vKeyColumnSql,
                vKeyValueYear, vKeyValueMonth, pMinDate, pMaxDate
            );

            -- Construct the URI for the CSV file in OCI Object Storage.
            -- The extension is caller-supplied too, so it is sanitized like the base name.
            vUri := vBucketUri
                 || CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END
                 || sanitizeFilename(vFileBaseName) || '_'
                 || sanitizeFilename(vKeyValueYear) || sanitizeFilename(vKeyValueMonth)
                 || sanitizeFilename(vFileExtension);

            ENV_MANAGER.LOG_PROCESS_EVENT('Exporting to CSV file: ' || vUri, 'INFO', vParameters);
            ENV_MANAGER.LOG_PROCESS_EVENT('Processing Year/Month: ' || vKeyValueYear || '/' || vKeyValueMonth || ' (' || i || '/' || vKeyValuesYear.COUNT || ')', 'DEBUG', vParameters);
            ENV_MANAGER.LOG_PROCESS_EVENT('Export query: ' || vQuery, 'DEBUG', vParameters);
            ENV_MANAGER.LOG_PROCESS_EVENT('File name pattern: ' || vFileBaseName || '_' || vKeyValueYear || vKeyValueMonth || vFileExtension, 'DEBUG', vParameters);

            -- Use DBMS_CLOUD package to export data to CSV file
            DBMS_CLOUD.EXPORT_DATA(
                credential_name => pCredentialName,
                file_uri_list   => vUri,
                query           => vQuery,
                format          => json_object('type' VALUE 'CSV', 'header' VALUE true)
            );
        END LOOP;

        ENV_MANAGER.LOG_PROCESS_EVENT('Export completed successfully for ' || vKeyValuesYear.COUNT || ' files', 'INFO', vParameters);
        ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
    EXCEPTION
        WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN
            vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName;
            ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
            RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp);
        WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN
            vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vInvalidColumn IS NOT NULL THEN '.'||vInvalidColumn||' in pColumnList' ELSE '' END;
            ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
            RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp);
        WHEN OTHERS THEN
            -- Log complete error details including full stack trace and backtrace
            ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER');
            ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
            RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
    END EXPORT_TABLE_DATA_TO_CSV_BY_DATE;

    ----------------------------------------------------------------------------------------------------
    -- VERSION MANAGEMENT FUNCTIONS
    ----------------------------------------------------------------------------------------------------

    -- Returns the PACKAGE_VERSION constant.
    FUNCTION GET_VERSION RETURN VARCHAR2 IS
    BEGIN
        RETURN PACKAGE_VERSION;
    END GET_VERSION;

    ----------------------------------------------------------------------------------------------------

    -- Returns the version, build date and author formatted by ENV_MANAGER.GET_PACKAGE_VERSION_INFO.
    FUNCTION GET_BUILD_INFO RETURN VARCHAR2 IS
    BEGIN
        RETURN ENV_MANAGER.GET_PACKAGE_VERSION_INFO(
            pPackageName => 'DATA_EXPORTER',
            pVersion     => PACKAGE_VERSION,
            pBuildDate   => PACKAGE_BUILD_DATE,
            pAuthor      => PACKAGE_AUTHOR
        );
    END GET_BUILD_INFO;

    ----------------------------------------------------------------------------------------------------

    -- Returns VERSION_HISTORY formatted by ENV_MANAGER.FORMAT_VERSION_HISTORY.
    FUNCTION GET_VERSION_HISTORY RETURN VARCHAR2 IS
    BEGIN
        RETURN ENV_MANAGER.FORMAT_VERSION_HISTORY(
            pPackageName    => 'DATA_EXPORTER',
            pVersionHistory => VERSION_HISTORY
        );
    END GET_VERSION_HISTORY;

    ----------------------------------------------------------------------------------------------------

END;
/