734 lines
36 KiB
Plaintext
734 lines
36 KiB
Plaintext
create or replace PACKAGE BODY CT_MRDS.DATA_EXPORTER
|
|
AS
|
|
|
|
-- Internal shared function to process column list with T. prefix and key column mapping
|
|
FUNCTION processColumnList(pColumnList IN VARCHAR2, pTableName IN VARCHAR2, pSchemaName IN VARCHAR2, pKeyColumnName IN VARCHAR2) RETURN VARCHAR2 IS
|
|
vResult VARCHAR2(32767);
|
|
vColumns VARCHAR2(32767);
|
|
vPos PLS_INTEGER;
|
|
vNextPos PLS_INTEGER;
|
|
vCurrentCol VARCHAR2(128);
|
|
vAllCols VARCHAR2(32767);
|
|
BEGIN
|
|
IF pColumnList IS NULL THEN
|
|
-- Build list of all columns
|
|
SELECT LISTAGG(column_name, ', ') WITHIN GROUP (ORDER BY column_id)
|
|
INTO vAllCols
|
|
FROM all_tab_columns
|
|
WHERE table_name = pTableName
|
|
AND owner = pSchemaName;
|
|
|
|
-- Add T. prefix to all columns
|
|
vResult := 'T.' || REPLACE(vAllCols, ', ', ', T.');
|
|
|
|
-- Replace key column with aliased version (e.g., T.A_ETL_LOAD_SET_KEY_FK AS A_WORKFLOW_HISTORY_KEY)
|
|
vResult := REPLACE(vResult, 'T.' || pKeyColumnName, 'T.' || pKeyColumnName || ' AS A_WORKFLOW_HISTORY_KEY');
|
|
|
|
RETURN vResult;
|
|
END IF;
|
|
|
|
-- Remove extra spaces and convert to uppercase
|
|
vColumns := UPPER(REPLACE(pColumnList, ' ', ''));
|
|
vPos := 1;
|
|
vResult := '';
|
|
|
|
-- Parse comma-separated column list and add T. prefix
|
|
WHILE vPos <= LENGTH(vColumns) LOOP
|
|
vNextPos := INSTR(vColumns, ',', vPos);
|
|
IF vNextPos = 0 THEN
|
|
vNextPos := LENGTH(vColumns) + 1;
|
|
END IF;
|
|
|
|
vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos);
|
|
|
|
-- Check if this is the key column (e.g., A_ETL_LOAD_SET_KEY_FK) and add alias
|
|
IF UPPER(vCurrentCol) = UPPER(pKeyColumnName) THEN
|
|
vCurrentCol := 'T.' || pKeyColumnName || ' AS A_WORKFLOW_HISTORY_KEY';
|
|
ELSIF UPPER(vCurrentCol) = 'A_ETL_LOAD_SET_KEY' THEN
|
|
vCurrentCol := 'T.A_ETL_LOAD_SET_KEY AS A_WORKFLOW_HISTORY_KEY';
|
|
ELSE
|
|
-- Add T. prefix if not already present
|
|
IF INSTR(vCurrentCol, '.') = 0 THEN
|
|
vCurrentCol := 'T.' || vCurrentCol;
|
|
END IF;
|
|
END IF;
|
|
|
|
-- Add to result with comma separator
|
|
IF vResult IS NOT NULL THEN
|
|
vResult := vResult || ', ';
|
|
END IF;
|
|
vResult := vResult || vCurrentCol;
|
|
|
|
vPos := vNextPos + 1;
|
|
END LOOP;
|
|
|
|
RETURN vResult;
|
|
END processColumnList;
|
|
|
|
----------------------------------------------------------------------------------------------------
|
|
|
|
PROCEDURE EXPORT_TABLE_DATA (
|
|
pSchemaName IN VARCHAR2,
|
|
pTableName IN VARCHAR2,
|
|
pKeyColumnName IN VARCHAR2,
|
|
pBucketArea IN VARCHAR2,
|
|
pFolderName IN VARCHAR2,
|
|
pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
|
|
)
|
|
IS
|
|
-- Type definition for key values
|
|
TYPE key_value_tab IS TABLE OF VARCHAR2(4000);
|
|
vKeyValues key_value_tab;
|
|
vCount INTEGER;
|
|
vSql VARCHAR2(4000);
|
|
vKeyValue VARCHAR2(4000);
|
|
vQuery VARCHAR2(32767);
|
|
vUri VARCHAR2(4000);
|
|
vDataType VARCHAR2(30);
|
|
vTableName VARCHAR2(128);
|
|
vSchemaName VARCHAR2(128);
|
|
vKeyColumnName VARCHAR2(128);
|
|
vParameters VARCHAR2(4000);
|
|
vBucketUri VARCHAR2(4000);
|
|
vProcessedColumnList VARCHAR2(32767);
|
|
vCurrentCol VARCHAR2(128);
|
|
vAllColumnsList VARCHAR2(32767);
|
|
|
|
|
|
-- Function to sanitize file names
|
|
FUNCTION sanitizeFilename(pFilename IN VARCHAR2) RETURN VARCHAR2 IS
|
|
vFilename VARCHAR2(1000);
|
|
BEGIN
|
|
-- Replace any disallowed characters with underscores
|
|
vFilename := REGEXP_REPLACE(pFilename, '[^a-zA-Z0-9._-]', '_');
|
|
RETURN vFilename;
|
|
END sanitizeFilename;
|
|
|
|
BEGIN
|
|
vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||''''
|
|
,'pTableName => '''||nvl(pTableName, 'NULL')||''''
|
|
,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||''''
|
|
,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||''''
|
|
,'pFolderName => '''||nvl(pFolderName, 'NULL')||''''
|
|
,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
|
|
));
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
|
|
|
|
-- Get bucket URI based on bucket area using FILE_MANAGER function
|
|
vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea);
|
|
|
|
-- Convert table and column names to uppercase to match data dictionary
|
|
vTableName := UPPER(pTableName);
|
|
vSchemaName := UPPER(pSchemaName);
|
|
vKeyColumnName := UPPER(pKeyColumnName);
|
|
|
|
-- Check if table exists
|
|
SELECT COUNT(*) INTO vCount
|
|
FROM all_tables
|
|
WHERE table_name = vTableName
|
|
AND owner = vSchemaName;
|
|
|
|
IF vCount = 0 THEN
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, ENV_MANAGER.MSG_TABLE_NOT_EXISTS);
|
|
END IF;
|
|
|
|
-- Check if key column exists
|
|
SELECT COUNT(*) INTO vCount
|
|
FROM all_tab_columns
|
|
WHERE table_name = vTableName
|
|
AND column_name = vKeyColumnName
|
|
AND owner = vSchemaName;
|
|
|
|
IF vCount = 0 THEN
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
|
|
|
|
END IF;
|
|
|
|
-- Get the data type of the key column
|
|
SELECT data_type INTO vDataType
|
|
FROM all_tab_columns
|
|
WHERE table_name = vTableName
|
|
AND column_name = vKeyColumnName
|
|
AND owner = vSchemaName;
|
|
|
|
-- Build list of all columns for the table (excluding key column to avoid duplication)
|
|
SELECT LISTAGG(column_name, ', ') WITHIN GROUP (ORDER BY column_id)
|
|
INTO vAllColumnsList
|
|
FROM all_tab_columns
|
|
WHERE table_name = vTableName
|
|
AND owner = vSchemaName
|
|
AND column_name != vKeyColumnName;
|
|
|
|
-- Process column list to add T. prefix to each column
|
|
vProcessedColumnList := processColumnList(vAllColumnsList, vTableName, vSchemaName, vKeyColumnName);
|
|
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Dynamic column list built (excluding key): ' || vAllColumnsList, 'DEBUG', vParameters);
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list with T. prefix: ' || vProcessedColumnList, 'DEBUG', vParameters);
|
|
|
|
vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName);
|
|
-- Fetch unique key values from A_LOAD_HISTORY
|
|
vSql := 'SELECT DISTINCT L.A_ETL_LOAD_SET_KEY' ||
|
|
' FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L' ||
|
|
' WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY';
|
|
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Executing key values query: ' || vSql, 'DEBUG', vParameters);
|
|
EXECUTE IMMEDIATE vSql BULK COLLECT INTO vKeyValues;
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vKeyValues.COUNT || ' unique key values to process', 'DEBUG', vParameters);
|
|
|
|
-- Loop over each unique key value
|
|
FOR i IN 1 .. vKeyValues.COUNT LOOP
|
|
vKeyValue := vKeyValues(i);
|
|
|
|
-- Construct the query to extract data for the current key value with A_WORKFLOW_HISTORY_KEY mapping
|
|
IF vDataType IN ('VARCHAR2', 'CHAR', 'NCHAR', 'NVARCHAR2') THEN
|
|
vQuery := 'SELECT ' || vProcessedColumnList ||
|
|
' FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L' ||
|
|
' WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY' ||
|
|
' AND L.A_ETL_LOAD_SET_KEY = ' || CHR(39) || vKeyValue || CHR(39);
|
|
ELSIF vDataType IN ('NUMBER', 'FLOAT', 'BINARY_FLOAT', 'BINARY_DOUBLE') THEN
|
|
vQuery := 'SELECT ' || vProcessedColumnList ||
|
|
' FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L' ||
|
|
' WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY' ||
|
|
' AND L.A_ETL_LOAD_SET_KEY = ' || vKeyValue;
|
|
ELSIF vDataType LIKE 'TIMESTAMP%' OR vDataType = 'DATE' THEN
|
|
vQuery := 'SELECT ' || vProcessedColumnList ||
|
|
' FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L' ||
|
|
' WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY' ||
|
|
' AND L.A_ETL_LOAD_SET_KEY = TO_TIMESTAMP(' || CHR(39) || vKeyValue || CHR(39) ||', ''YYYY-MM-DD HH24:MI:SS.FF'')';
|
|
ELSE
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNSUPPORTED_DATA_TYPE, ENV_MANAGER.MSG_UNSUPPORTED_DATA_TYPE);
|
|
END IF;
|
|
|
|
-- Construct the URI for the file in OCI Object Storage
|
|
vUri := vBucketUri ||
|
|
CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END ||
|
|
sanitizeFilename(vKeyValue) || '.csv';
|
|
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Processing key value: ' || vKeyValue || ' (' || (i) || '/' || vKeyValues.COUNT || ')', 'DEBUG', vParameters);
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Export query: ' || vQuery, 'DEBUG', vParameters);
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Export URI: ' || vUri, 'DEBUG', vParameters);
|
|
|
|
-- Use DBMS_CLOUD package to export data to the URI
|
|
DBMS_CLOUD.EXPORT_DATA(
|
|
credential_name => pCredentialName,
|
|
file_uri_list => vUri,
|
|
query => vQuery,
|
|
format => json_object('type' VALUE 'CSV', 'header' VALUE true)
|
|
);
|
|
END LOOP;
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
|
|
EXCEPTION
|
|
WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN
|
|
vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName;
|
|
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp);
|
|
WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN
|
|
vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in column list' ELSE '' END;
|
|
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp);
|
|
WHEN ENV_MANAGER.ERR_UNSUPPORTED_DATA_TYPE THEN
|
|
vgMsgTmp := ENV_MANAGER.MSG_UNSUPPORTED_DATA_TYPE || ' vDataType: '||vDataType;
|
|
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNSUPPORTED_DATA_TYPE, vgMsgTmp);
|
|
WHEN OTHERS THEN
|
|
-- Log complete error details including full stack trace and backtrace
|
|
ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER');
|
|
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
|
|
|
|
END EXPORT_TABLE_DATA;
|
|
|
|
----------------------------------------------------------------------------------------------------
|
|
|
|
PROCEDURE EXPORT_TABLE_DATA_BY_DATE (
|
|
pSchemaName IN VARCHAR2,
|
|
pTableName IN VARCHAR2,
|
|
pKeyColumnName IN VARCHAR2,
|
|
pBucketArea IN VARCHAR2,
|
|
pFolderName IN VARCHAR2,
|
|
pColumnList IN VARCHAR2 default NULL,
|
|
pMinDate IN DATE default DATE '1900-01-01',
|
|
pMaxDate IN DATE default SYSDATE,
|
|
pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
|
|
)
|
|
IS
|
|
-- Type definition for key values
|
|
TYPE key_value_tab IS TABLE OF VARCHAR2(4000);
|
|
|
|
vKeyValuesYear key_value_tab;
|
|
vKeyValuesMonth key_value_tab;
|
|
|
|
vCount INTEGER;
|
|
vSql VARCHAR2(32000);
|
|
vKeyValueYear VARCHAR2(4000);
|
|
vKeyValueMonth VARCHAR2(4000);
|
|
vQuery VARCHAR2(32767);
|
|
vUri VARCHAR2(4000);
|
|
vDataType VARCHAR2(30);
|
|
vTableName VARCHAR2(128);
|
|
vSchemaName VARCHAR2(128);
|
|
vKeyColumnName VARCHAR2(128);
|
|
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
|
|
vProcessedColumnList VARCHAR2(32767);
|
|
vBucketUri VARCHAR2(4000);
|
|
vCurrentCol VARCHAR2(128);
|
|
|
|
-- Function to sanitize file names
|
|
FUNCTION sanitizeFilename(pFilename IN VARCHAR2) RETURN VARCHAR2 IS
|
|
vFilename VARCHAR2(1000);
|
|
BEGIN
|
|
-- Replace any disallowed characters with underscores
|
|
vFilename := REGEXP_REPLACE(pFilename, '[^a-zA-Z0-9._-]', '_');
|
|
RETURN vFilename;
|
|
END sanitizeFilename;
|
|
|
|
BEGIN
|
|
vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||''''
|
|
,'pTableName => '''||nvl(pTableName, 'NULL')||''''
|
|
,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||''''
|
|
,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||''''
|
|
,'pFolderName => '''||nvl(pFolderName, 'NULL')||''''
|
|
,'pColumnList => '''||nvl(pColumnList, 'NULL')||''''
|
|
,'pMinDate => '''||nvl(TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
|
|
,'pMaxDate => '''||nvl(TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
|
|
,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
|
|
));
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
|
|
|
|
-- Get bucket URI based on bucket area using FILE_MANAGER function
|
|
vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea);
|
|
|
|
-- Convert table and column names to uppercase to match data dictionary
|
|
vTableName := UPPER(pTableName);
|
|
vSchemaName := UPPER(pSchemaName);
|
|
vKeyColumnName := UPPER(pKeyColumnName);
|
|
|
|
-- Check if table exists
|
|
SELECT COUNT(*) INTO vCount
|
|
FROM all_tables
|
|
WHERE table_name = vTableName
|
|
AND owner = vSchemaName;
|
|
|
|
IF vCount = 0 THEN
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, ENV_MANAGER.MSG_TABLE_NOT_EXISTS);
|
|
END IF;
|
|
|
|
-- Check if key column exists
|
|
SELECT COUNT(*) INTO vCount
|
|
FROM all_tab_columns
|
|
WHERE table_name = vTableName
|
|
AND column_name = vKeyColumnName
|
|
AND owner = vSchemaName;
|
|
|
|
IF vCount = 0 THEN
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
|
|
END IF;
|
|
|
|
-- Validate pColumnList - check if all column names exist in the table
|
|
IF pColumnList IS NOT NULL THEN
|
|
DECLARE
|
|
vColumnName VARCHAR2(128);
|
|
vColumns VARCHAR2(32767);
|
|
vPos PLS_INTEGER;
|
|
vNextPos PLS_INTEGER;
|
|
vCurrentCol VARCHAR2(128);
|
|
BEGIN
|
|
-- Remove spaces and convert to uppercase for processing
|
|
vColumns := UPPER(REPLACE(pColumnList, ' ', ''));
|
|
vPos := 1;
|
|
|
|
-- Parse comma-separated column list
|
|
WHILE vPos <= LENGTH(vColumns) LOOP
|
|
vNextPos := INSTR(vColumns, ',', vPos);
|
|
IF vNextPos = 0 THEN
|
|
vNextPos := LENGTH(vColumns) + 1;
|
|
END IF;
|
|
|
|
vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos);
|
|
|
|
-- Remove table alias prefix if present (e.g., 'T.COLUMN_NAME' -> 'COLUMN_NAME')
|
|
IF INSTR(vCurrentCol, '.') > 0 THEN
|
|
vCurrentCol := SUBSTR(vCurrentCol, INSTR(vCurrentCol, '.') + 1);
|
|
END IF;
|
|
|
|
-- Check if column exists in the table
|
|
SELECT COUNT(*) INTO vCount
|
|
FROM all_tab_columns
|
|
WHERE table_name = vTableName
|
|
AND column_name = vCurrentCol
|
|
AND owner = vSchemaName;
|
|
|
|
IF vCount = 0 THEN
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
|
|
END IF;
|
|
|
|
vPos := vNextPos + 1;
|
|
END LOOP;
|
|
END;
|
|
END IF;
|
|
|
|
-- Process column list to add T. prefix to each column
|
|
vProcessedColumnList := processColumnList(pColumnList, vTableName, vSchemaName, vKeyColumnName);
|
|
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Input column list: ' || NVL(pColumnList, 'NULL (building dynamic list from table metadata)'), 'DEBUG', vParameters);
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list: ' || vProcessedColumnList, 'DEBUG', vParameters);
|
|
|
|
vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName);
|
|
-- Fetch unique key values
|
|
vSql := 'SELECT DISTINCT TO_CHAR(L.LOAD_START,''YYYY'') AS YR, TO_CHAR(L.LOAD_START,''MM'') AS MN
|
|
FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L
|
|
WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY
|
|
AND L.LOAD_START >= :pMinDate
|
|
AND L.LOAD_START < :pMaxDate
|
|
' ;
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Executing date range query: ' || vSql, 'DEBUG', vParameters);
|
|
EXECUTE IMMEDIATE vSql BULK COLLECT INTO vKeyValuesYear, vKeyValuesMonth USING pMinDate, pMaxDate;
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vKeyValuesYear.COUNT || ' year/month combinations to export', 'DEBUG', vParameters);
|
|
|
|
-- Loop over each unique key value
|
|
FOR i IN 1 .. vKeyValuesYear.COUNT LOOP
|
|
vKeyValueYear := vKeyValuesYear(i);
|
|
vKeyValueMonth := vKeyValuesMonth(i);
|
|
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Processing Year/Month: ' || vKeyValueYear || '/' || vKeyValueMonth || ' (' || i || '/' || vKeyValuesYear.COUNT || ')', 'DEBUG', vParameters);
|
|
-- Construct the query to extract data for the current key value
|
|
-- Note: processColumnList already handles A_WORKFLOW_HISTORY_KEY aliasing
|
|
|
|
vQuery := 'SELECT ' || vProcessedColumnList || '
|
|
FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L
|
|
WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY
|
|
AND TO_CHAR(L.LOAD_START,''YYYY'') = ' || CHR(39) || vKeyValueYear || CHR(39) || '
|
|
AND TO_CHAR(L.LOAD_START,''MM'') = ' || CHR(39) || vKeyValueMonth || CHR(39) || '
|
|
AND L.LOAD_START >= TO_DATE(' || CHR(39) || TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'')
|
|
AND L.LOAD_START < TO_DATE(' || CHR(39) || TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'')';
|
|
|
|
-- Construct the URI for the file in OCI Object Storage
|
|
vUri := vBucketUri ||
|
|
CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END ||
|
|
'PARTITION_YEAR=' || sanitizeFilename(vKeyValueYear) || '/' ||
|
|
'PARTITION_MONTH=' || sanitizeFilename(vKeyValueMonth) || '/' ||
|
|
sanitizeFilename(vKeyValueYear) || sanitizeFilename(vKeyValueMonth) || '.parquet';
|
|
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Export query: ' || vQuery, 'DEBUG', vParameters);
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Parquet export URI: ' || vUri, 'DEBUG', vParameters);
|
|
|
|
-- Use DBMS_CLOUD package to export data to the URI
|
|
DBMS_CLOUD.EXPORT_DATA(
|
|
credential_name => pCredentialName,
|
|
file_uri_list => vUri,
|
|
query => vQuery,
|
|
format => json_object('type' VALUE 'parquet')
|
|
);
|
|
END LOOP;
|
|
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
|
|
EXCEPTION
|
|
WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN
|
|
vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName;
|
|
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp);
|
|
WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN
|
|
vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in pColumnList' ELSE '' END;
|
|
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp);
|
|
WHEN OTHERS THEN
|
|
-- Log complete error details including full stack trace and backtrace
|
|
ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER');
|
|
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
|
|
|
|
END EXPORT_TABLE_DATA_BY_DATE;
|
|
|
|
----------------------------------------------------------------------------------------------------
|
|
|
|
/**
|
|
* @name EXPORT_TABLE_DATA_TO_CSV_BY_DATE
|
|
* @desc Exports data to a single CSV file with date filtering.
|
|
* Unlike EXPORT_TABLE_DATA_BY_DATE, this procedure creates one CSV file
|
|
* instead of multiple Parquet files partitioned by year/month.
|
|
* Uses the same date filtering mechanism with CT_ODS.A_LOAD_HISTORY.
|
|
* Allows specifying custom column list or uses T.* if pColumnList is NULL.
|
|
* Validates that all columns in pColumnList exist in the target table.
|
|
* Automatically adds 'T.' prefix to column names in pColumnList.
|
|
* @example
|
|
* begin
|
|
* DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
|
|
* pSchemaName => 'CT_MRDS',
|
|
* pTableName => 'MY_TABLE',
|
|
* pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',
|
|
* pBucketArea => 'DATA',
|
|
* pFolderName => 'exports',
|
|
* pFileName => 'my_export.csv',
|
|
* pColumnList => 'COLUMN1, COLUMN2, COLUMN3', -- Optional
|
|
* pMinDate => DATE '2024-01-01',
|
|
* pMaxDate => SYSDATE
|
|
* );
|
|
* end;
|
|
**/
|
|
PROCEDURE EXPORT_TABLE_DATA_TO_CSV_BY_DATE (
|
|
pSchemaName IN VARCHAR2,
|
|
pTableName IN VARCHAR2,
|
|
pKeyColumnName IN VARCHAR2,
|
|
pBucketArea IN VARCHAR2,
|
|
pFolderName IN VARCHAR2,
|
|
pFileName IN VARCHAR2 DEFAULT NULL,
|
|
pColumnList IN VARCHAR2 default NULL,
|
|
pMinDate IN DATE default DATE '1900-01-01',
|
|
pMaxDate IN DATE default SYSDATE,
|
|
pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
|
|
)
|
|
IS
|
|
-- Type definition for key values
|
|
TYPE key_value_tab IS TABLE OF VARCHAR2(4000);
|
|
|
|
vKeyValuesYear key_value_tab;
|
|
vKeyValuesMonth key_value_tab;
|
|
|
|
vCount INTEGER;
|
|
vSql VARCHAR2(4000);
|
|
vKeyValueYear VARCHAR2(4000);
|
|
vKeyValueMonth VARCHAR2(4000);
|
|
vQuery VARCHAR2(32767);
|
|
vUri VARCHAR2(4000);
|
|
vDataType VARCHAR2(30);
|
|
vTableName VARCHAR2(128);
|
|
vSchemaName VARCHAR2(128);
|
|
vKeyColumnName VARCHAR2(128);
|
|
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
|
|
vFileBaseName VARCHAR2(4000);
|
|
vFileExtension VARCHAR2(10);
|
|
vProcessedColumnList VARCHAR2(32767);
|
|
vBucketUri VARCHAR2(4000);
|
|
vCurrentCol VARCHAR2(128);
|
|
|
|
-- Function to sanitize file names
|
|
FUNCTION sanitizeFilename(pFilename IN VARCHAR2) RETURN VARCHAR2 IS
|
|
vFilename VARCHAR2(1000);
|
|
BEGIN
|
|
-- Replace any disallowed characters with underscores
|
|
vFilename := REGEXP_REPLACE(pFilename, '[^a-zA-Z0-9._-]', '_');
|
|
RETURN vFilename;
|
|
END sanitizeFilename;
|
|
|
|
BEGIN
|
|
vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||''''
|
|
,'pTableName => '''||nvl(pTableName, 'NULL')||''''
|
|
,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||''''
|
|
,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||''''
|
|
,'pFolderName => '''||nvl(pFolderName, 'NULL')||''''
|
|
,'pFileName => '''||nvl(pFileName, 'NULL')||''''
|
|
,'pColumnList => '''||nvl(pColumnList, 'NULL')||''''
|
|
,'pMinDate => '''||nvl(TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
|
|
,'pMaxDate => '''||nvl(TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
|
|
,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
|
|
));
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
|
|
|
|
-- Get bucket URI based on bucket area using FILE_MANAGER function
|
|
vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea);
|
|
|
|
-- Convert table and column names to uppercase to match data dictionary
|
|
vTableName := UPPER(pTableName);
|
|
vSchemaName := UPPER(pSchemaName);
|
|
vKeyColumnName := UPPER(pKeyColumnName);
|
|
|
|
-- Extract base filename and extension or construct default filename
|
|
IF pFileName IS NOT NULL THEN
|
|
-- Use provided filename
|
|
IF INSTR(pFileName, '.') > 0 THEN
|
|
vFileBaseName := SUBSTR(pFileName, 1, INSTR(pFileName, '.', -1) - 1);
|
|
vFileExtension := SUBSTR(pFileName, INSTR(pFileName, '.', -1));
|
|
ELSE
|
|
vFileBaseName := pFileName;
|
|
vFileExtension := '.csv';
|
|
END IF;
|
|
ELSE
|
|
-- Construct default filename: TABLENAME.csv (without date range)
|
|
vFileBaseName := UPPER(pTableName);
|
|
vFileExtension := '.csv';
|
|
END IF;
|
|
|
|
-- Check if table exists
|
|
SELECT COUNT(*) INTO vCount
|
|
FROM all_tables
|
|
WHERE table_name = vTableName
|
|
AND owner = vSchemaName;
|
|
|
|
IF vCount = 0 THEN
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, ENV_MANAGER.MSG_TABLE_NOT_EXISTS);
|
|
END IF;
|
|
|
|
-- Check if key column exists
|
|
SELECT COUNT(*) INTO vCount
|
|
FROM all_tab_columns
|
|
WHERE table_name = vTableName
|
|
AND column_name = vKeyColumnName
|
|
AND owner = vSchemaName;
|
|
|
|
IF vCount = 0 THEN
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
|
|
END IF;
|
|
|
|
-- Validate pColumnList - check if all column names exist in the table
|
|
IF pColumnList IS NOT NULL THEN
|
|
DECLARE
|
|
vColumnName VARCHAR2(128);
|
|
vColumns VARCHAR2(32767);
|
|
vPos PLS_INTEGER;
|
|
vNextPos PLS_INTEGER;
|
|
vCurrentCol VARCHAR2(128);
|
|
BEGIN
|
|
-- Remove spaces and convert to uppercase for processing
|
|
vColumns := UPPER(REPLACE(pColumnList, ' ', ''));
|
|
vPos := 1;
|
|
|
|
-- Parse comma-separated column list
|
|
WHILE vPos <= LENGTH(vColumns) LOOP
|
|
vNextPos := INSTR(vColumns, ',', vPos);
|
|
IF vNextPos = 0 THEN
|
|
vNextPos := LENGTH(vColumns) + 1;
|
|
END IF;
|
|
|
|
vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos);
|
|
|
|
-- Remove table alias prefix if present (e.g., 'T.COLUMN_NAME' -> 'COLUMN_NAME')
|
|
IF INSTR(vCurrentCol, '.') > 0 THEN
|
|
vCurrentCol := SUBSTR(vCurrentCol, INSTR(vCurrentCol, '.') + 1);
|
|
END IF;
|
|
|
|
-- Check if column exists in the table
|
|
SELECT COUNT(*) INTO vCount
|
|
FROM all_tab_columns
|
|
WHERE table_name = vTableName
|
|
AND column_name = vCurrentCol
|
|
AND owner = vSchemaName;
|
|
|
|
IF vCount = 0 THEN
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
|
|
END IF;
|
|
|
|
vPos := vNextPos + 1;
|
|
END LOOP;
|
|
END;
|
|
END IF;
|
|
|
|
-- Process column list to add T. prefix to each column
|
|
vProcessedColumnList := processColumnList(pColumnList, vTableName, vSchemaName, vKeyColumnName);
|
|
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Input column list: ' || NVL(pColumnList, 'NULL (using dynamic column list)'), 'DEBUG', vParameters);
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list: ' || vProcessedColumnList, 'DEBUG', vParameters);
|
|
|
|
-- Get the data type of the key column
|
|
SELECT data_type INTO vDataType
|
|
FROM all_tab_columns
|
|
WHERE table_name = vTableName
|
|
AND column_name = vKeyColumnName
|
|
AND owner = vSchemaName;
|
|
|
|
vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName);
|
|
|
|
-- Fetch unique year/month combinations
|
|
vSql := 'SELECT DISTINCT TO_CHAR(L.LOAD_START,''YYYY'') AS YR, TO_CHAR(L.LOAD_START,''MM'') AS MN
|
|
FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L
|
|
WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY
|
|
AND L.LOAD_START >= :pMinDate
|
|
AND L.LOAD_START < :pMaxDate
|
|
' ;
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Executing date range query: ' || vSql, 'DEBUG', vParameters);
|
|
EXECUTE IMMEDIATE vSql BULK COLLECT INTO vKeyValuesYear, vKeyValuesMonth USING pMinDate, pMaxDate;
|
|
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vKeyValuesYear.COUNT || ' year/month combinations to export', 'INFO', vParameters);
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Date range: ' || TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS') || ' to ' || TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'DEBUG', vParameters);
|
|
|
|
-- Loop over each unique year/month combination
|
|
FOR i IN 1 .. vKeyValuesYear.COUNT LOOP
|
|
vKeyValueYear := vKeyValuesYear(i);
|
|
vKeyValueMonth := vKeyValuesMonth(i);
|
|
|
|
-- Construct the query to extract data for the current year/month
|
|
vQuery := 'SELECT ' || vProcessedColumnList || '
|
|
FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L
|
|
WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY
|
|
AND TO_CHAR(L.LOAD_START,''YYYY'') = ' || CHR(39) || vKeyValueYear || CHR(39) || '
|
|
AND TO_CHAR(L.LOAD_START,''MM'') = ' || CHR(39) || vKeyValueMonth || CHR(39) || '
|
|
AND L.LOAD_START >= TO_DATE(' || CHR(39) || TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'')
|
|
AND L.LOAD_START < TO_DATE(' || CHR(39) || TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'')';
|
|
|
|
-- Construct the URI for the CSV file in OCI Object Storage
|
|
vUri := vBucketUri ||
|
|
CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END ||
|
|
sanitizeFilename(vFileBaseName) || '_' ||
|
|
sanitizeFilename(vKeyValueYear) || sanitizeFilename(vKeyValueMonth) ||
|
|
vFileExtension;
|
|
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Exporting to CSV file: ' || vUri, 'INFO', vParameters);
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Processing Year/Month: ' || vKeyValueYear || '/' || vKeyValueMonth || ' (' || i || '/' || vKeyValuesYear.COUNT || ')', 'DEBUG', vParameters);
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Export query: ' || vQuery, 'DEBUG', vParameters);
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('File name pattern: ' || vFileBaseName || '_' || vKeyValueYear || vKeyValueMonth || vFileExtension, 'DEBUG', vParameters);
|
|
|
|
-- Use DBMS_CLOUD package to export data to CSV file
|
|
DBMS_CLOUD.EXPORT_DATA(
|
|
credential_name => pCredentialName,
|
|
file_uri_list => vUri,
|
|
query => vQuery,
|
|
format => json_object('type' VALUE 'CSV', 'header' VALUE true)
|
|
);
|
|
END LOOP;
|
|
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('Export completed successfully for ' || vKeyValuesYear.COUNT || ' files', 'INFO', vParameters);
|
|
ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
|
|
|
|
EXCEPTION
|
|
WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN
|
|
vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName;
|
|
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp);
|
|
WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN
|
|
vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in pColumnList' ELSE '' END;
|
|
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp);
|
|
WHEN OTHERS THEN
|
|
-- Log complete error details including full stack trace and backtrace
|
|
ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER');
|
|
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
|
|
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
|
|
|
|
END EXPORT_TABLE_DATA_TO_CSV_BY_DATE;
|
|
|
|
----------------------------------------------------------------------------------------------------
|
|
-- VERSION MANAGEMENT FUNCTIONS
|
|
----------------------------------------------------------------------------------------------------
|
|
|
|
FUNCTION GET_VERSION RETURN VARCHAR2 IS
|
|
BEGIN
|
|
RETURN PACKAGE_VERSION;
|
|
END GET_VERSION;
|
|
|
|
----------------------------------------------------------------------------------------------------
|
|
|
|
FUNCTION GET_BUILD_INFO RETURN VARCHAR2 IS
|
|
BEGIN
|
|
RETURN ENV_MANAGER.GET_PACKAGE_VERSION_INFO(
|
|
pPackageName => 'DATA_EXPORTER',
|
|
pVersion => PACKAGE_VERSION,
|
|
pBuildDate => PACKAGE_BUILD_DATE,
|
|
pAuthor => PACKAGE_AUTHOR
|
|
);
|
|
END GET_BUILD_INFO;
|
|
|
|
----------------------------------------------------------------------------------------------------
|
|
|
|
FUNCTION GET_VERSION_HISTORY RETURN VARCHAR2 IS
|
|
BEGIN
|
|
RETURN ENV_MANAGER.FORMAT_VERSION_HISTORY(
|
|
pPackageName => 'DATA_EXPORTER',
|
|
pVersionHistory => VERSION_HISTORY
|
|
);
|
|
END GET_VERSION_HISTORY;
|
|
|
|
----------------------------------------------------------------------------------------------------
|
|
|
|
END;
|
|
|
|
/
|