Przeniesienie
This commit is contained in:
@@ -0,0 +1,708 @@
|
||||
create or replace PACKAGE BODY CT_MRDS.DATA_EXPORTER
|
||||
AS
|
||||
|
||||
----------------------------------------------------------------------------------------------------
|
||||
|
||||
PROCEDURE EXPORT_TABLE_DATA (
|
||||
pSchemaName IN VARCHAR2,
|
||||
pTableName IN VARCHAR2,
|
||||
pKeyColumnName IN VARCHAR2,
|
||||
pBucketArea IN VARCHAR2,
|
||||
pFolderName IN VARCHAR2,
|
||||
pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
|
||||
)
|
||||
IS
|
||||
-- Type definition for key values
|
||||
TYPE key_value_tab IS TABLE OF VARCHAR2(4000);
|
||||
vKeyValues key_value_tab;
|
||||
vCount INTEGER;
|
||||
vSql VARCHAR2(4000);
|
||||
vKeyValue VARCHAR2(4000);
|
||||
vQuery VARCHAR2(32767);
|
||||
vUri VARCHAR2(4000);
|
||||
vDataType VARCHAR2(30);
|
||||
vTableName VARCHAR2(128);
|
||||
vSchemaName VARCHAR2(128);
|
||||
vKeyColumnName VARCHAR2(128);
|
||||
vParameters VARCHAR2(4000);
|
||||
vBucketUri VARCHAR2(4000);
|
||||
|
||||
|
||||
-- Function to sanitize file names
|
||||
FUNCTION sanitizeFilename(pFilename IN VARCHAR2) RETURN VARCHAR2 IS
|
||||
vFilename VARCHAR2(1000);
|
||||
BEGIN
|
||||
-- Replace any disallowed characters with underscores
|
||||
vFilename := REGEXP_REPLACE(pFilename, '[^a-zA-Z0-9._-]', '_');
|
||||
RETURN vFilename;
|
||||
END sanitizeFilename;
|
||||
|
||||
BEGIN
|
||||
vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||''''
|
||||
,'pTableName => '''||nvl(pTableName, 'NULL')||''''
|
||||
,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||''''
|
||||
,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||''''
|
||||
,'pFolderName => '''||nvl(pFolderName, 'NULL')||''''
|
||||
,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
|
||||
));
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
|
||||
|
||||
-- Get bucket URI based on bucket area using FILE_MANAGER function
|
||||
vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea);
|
||||
|
||||
-- Convert table and column names to uppercase to match data dictionary
|
||||
vTableName := UPPER(pTableName);
|
||||
vSchemaName := UPPER(pSchemaName);
|
||||
vKeyColumnName := UPPER(pKeyColumnName);
|
||||
|
||||
-- Check if table exists
|
||||
SELECT COUNT(*) INTO vCount
|
||||
FROM all_tables
|
||||
WHERE table_name = vTableName
|
||||
AND owner = vSchemaName;
|
||||
|
||||
IF vCount = 0 THEN
|
||||
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, ENV_MANAGER.MSG_TABLE_NOT_EXISTS);
|
||||
END IF;
|
||||
|
||||
-- Check if key column exists
|
||||
SELECT COUNT(*) INTO vCount
|
||||
FROM all_tab_columns
|
||||
WHERE table_name = vTableName
|
||||
AND column_name = vKeyColumnName
|
||||
AND owner = vSchemaName;
|
||||
|
||||
IF vCount = 0 THEN
|
||||
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
|
||||
|
||||
END IF;
|
||||
|
||||
-- Get the data type of the key column
|
||||
SELECT data_type INTO vDataType
|
||||
FROM all_tab_columns
|
||||
WHERE table_name = vTableName
|
||||
AND column_name = vKeyColumnName
|
||||
AND owner = vSchemaName;
|
||||
|
||||
vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName);
|
||||
-- Fetch unique key values
|
||||
vSql := 'SELECT DISTINCT ' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) ||
|
||||
' FROM ' || vTableName;
|
||||
EXECUTE IMMEDIATE vSql BULK COLLECT INTO vKeyValues;
|
||||
|
||||
-- Loop over each unique key value
|
||||
FOR i IN 1 .. vKeyValues.COUNT LOOP
|
||||
vKeyValue := vKeyValues(i);
|
||||
|
||||
-- Construct the query to extract data for the current key value
|
||||
IF vDataType IN ('VARCHAR2', 'CHAR', 'NCHAR', 'NVARCHAR2') THEN
|
||||
vQuery := 'SELECT * FROM ' || vTableName ||
|
||||
' WHERE ' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = ' || CHR(39) || vKeyValue || CHR(39);
|
||||
ELSIF vDataType IN ('NUMBER', 'FLOAT', 'BINARY_FLOAT', 'BINARY_DOUBLE') THEN
|
||||
vQuery := 'SELECT * FROM ' || vTableName ||
|
||||
' WHERE ' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = ' || vKeyValue;
|
||||
ELSIF vDataType LIKE 'TIMESTAMP%' OR vDataType = 'DATE' THEN
|
||||
vQuery := 'SELECT * FROM ' || vTableName ||
|
||||
' WHERE ' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) ||
|
||||
' = TO_TIMESTAMP(' || CHR(39) || vKeyValue || CHR(39) ||', ''YYYY-MM-DD HH24:MI:SS.FF'')';
|
||||
ELSE
|
||||
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNSUPPORTED_DATA_TYPE, ENV_MANAGER.MSG_UNSUPPORTED_DATA_TYPE);
|
||||
END IF;
|
||||
|
||||
-- Construct the URI for the file in OCI Object Storage
|
||||
vUri := vBucketUri ||
|
||||
CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END ||
|
||||
sanitizeFilename(vKeyValue) || '.csv';
|
||||
|
||||
-- Use DBMS_CLOUD package to export data to the URI
|
||||
DBMS_CLOUD.EXPORT_DATA(
|
||||
credential_name => pCredentialName,
|
||||
file_uri_list => vUri,
|
||||
query => vQuery,
|
||||
format => json_object('type' VALUE 'CSV', 'header' VALUE true)
|
||||
);
|
||||
END LOOP;
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
|
||||
EXCEPTION
|
||||
WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN
|
||||
vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName;
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
|
||||
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp);
|
||||
WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN
|
||||
vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName;
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
|
||||
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp);
|
||||
WHEN ENV_MANAGER.ERR_UNSUPPORTED_DATA_TYPE THEN
|
||||
vgMsgTmp := ENV_MANAGER.MSG_UNSUPPORTED_DATA_TYPE || ' vDataType: '||vDataType;
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
|
||||
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNSUPPORTED_DATA_TYPE, vgMsgTmp);
|
||||
WHEN OTHERS THEN
|
||||
-- Log complete error details including full stack trace and backtrace
|
||||
ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER');
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
|
||||
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
|
||||
|
||||
END EXPORT_TABLE_DATA;
|
||||
|
||||
----------------------------------------------------------------------------------------------------
|
||||
|
||||
PROCEDURE EXPORT_TABLE_DATA_BY_DATE (
|
||||
pSchemaName IN VARCHAR2,
|
||||
pTableName IN VARCHAR2,
|
||||
pKeyColumnName IN VARCHAR2,
|
||||
pBucketArea IN VARCHAR2,
|
||||
pFolderName IN VARCHAR2,
|
||||
pColumnList IN VARCHAR2 default NULL,
|
||||
pMinDate IN DATE default DATE '1900-01-01',
|
||||
pMaxDate IN DATE default SYSDATE,
|
||||
pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
|
||||
)
|
||||
IS
|
||||
-- Type definition for key values
|
||||
TYPE key_value_tab IS TABLE OF VARCHAR2(4000);
|
||||
|
||||
vKeyValuesYear key_value_tab;
|
||||
vKeyValuesMonth key_value_tab;
|
||||
|
||||
vCount INTEGER;
|
||||
vSql VARCHAR2(32000);
|
||||
vKeyValueYear VARCHAR2(4000);
|
||||
vKeyValueMonth VARCHAR2(4000);
|
||||
vQuery VARCHAR2(32767);
|
||||
vUri VARCHAR2(4000);
|
||||
vDataType VARCHAR2(30);
|
||||
vTableName VARCHAR2(128);
|
||||
vSchemaName VARCHAR2(128);
|
||||
vKeyColumnName VARCHAR2(128);
|
||||
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
|
||||
vProcessedColumnList VARCHAR2(32767);
|
||||
vBucketUri VARCHAR2(4000);
|
||||
vCurrentCol VARCHAR2(128);
|
||||
|
||||
-- Function to sanitize file names
|
||||
FUNCTION sanitizeFilename(pFilename IN VARCHAR2) RETURN VARCHAR2 IS
|
||||
vFilename VARCHAR2(1000);
|
||||
BEGIN
|
||||
-- Replace any disallowed characters with underscores
|
||||
vFilename := REGEXP_REPLACE(pFilename, '[^a-zA-Z0-9._-]', '_');
|
||||
RETURN vFilename;
|
||||
END sanitizeFilename;
|
||||
|
||||
-- Function to add T. prefix to column names
|
||||
FUNCTION addTablePrefix(pColumnList IN VARCHAR2) RETURN VARCHAR2 IS
|
||||
vResult VARCHAR2(32767);
|
||||
vColumns VARCHAR2(32767);
|
||||
vPos PLS_INTEGER;
|
||||
vNextPos PLS_INTEGER;
|
||||
vCurrentCol VARCHAR2(128);
|
||||
BEGIN
|
||||
IF pColumnList IS NULL THEN
|
||||
RETURN 'T.*';
|
||||
END IF;
|
||||
|
||||
-- Remove extra spaces and convert to uppercase
|
||||
vColumns := UPPER(REPLACE(pColumnList, ' ', ''));
|
||||
vPos := 1;
|
||||
vResult := '';
|
||||
|
||||
-- Parse comma-separated column list and add T. prefix
|
||||
WHILE vPos <= LENGTH(vColumns) LOOP
|
||||
vNextPos := INSTR(vColumns, ',', vPos);
|
||||
IF vNextPos = 0 THEN
|
||||
vNextPos := LENGTH(vColumns) + 1;
|
||||
END IF;
|
||||
|
||||
vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos);
|
||||
|
||||
-- Add T. prefix if not already present
|
||||
IF INSTR(vCurrentCol, '.') = 0 THEN
|
||||
vCurrentCol := 'T.' || vCurrentCol;
|
||||
END IF;
|
||||
|
||||
-- Add to result with comma separator
|
||||
IF vResult IS NOT NULL THEN
|
||||
vResult := vResult || ', ';
|
||||
END IF;
|
||||
vResult := vResult || vCurrentCol;
|
||||
|
||||
vPos := vNextPos + 1;
|
||||
END LOOP;
|
||||
|
||||
RETURN vResult;
|
||||
END addTablePrefix;
|
||||
|
||||
BEGIN
|
||||
vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||''''
|
||||
,'pTableName => '''||nvl(pTableName, 'NULL')||''''
|
||||
,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||''''
|
||||
,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||''''
|
||||
,'pFolderName => '''||nvl(pFolderName, 'NULL')||''''
|
||||
,'pColumnList => '''||nvl(pColumnList, 'NULL')||''''
|
||||
,'pMinDate => '''||nvl(TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
|
||||
,'pMaxDate => '''||nvl(TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
|
||||
,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
|
||||
));
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
|
||||
|
||||
-- Get bucket URI based on bucket area using FILE_MANAGER function
|
||||
vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea);
|
||||
|
||||
-- Convert table and column names to uppercase to match data dictionary
|
||||
vTableName := UPPER(pTableName);
|
||||
vSchemaName := UPPER(pSchemaName);
|
||||
vKeyColumnName := UPPER(pKeyColumnName);
|
||||
|
||||
-- Check if table exists
|
||||
SELECT COUNT(*) INTO vCount
|
||||
FROM all_tables
|
||||
WHERE table_name = vTableName
|
||||
AND owner = vSchemaName;
|
||||
|
||||
IF vCount = 0 THEN
|
||||
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, ENV_MANAGER.MSG_TABLE_NOT_EXISTS);
|
||||
END IF;
|
||||
|
||||
-- Check if key column exists
|
||||
SELECT COUNT(*) INTO vCount
|
||||
FROM all_tab_columns
|
||||
WHERE table_name = vTableName
|
||||
AND column_name = vKeyColumnName
|
||||
AND owner = vSchemaName;
|
||||
|
||||
IF vCount = 0 THEN
|
||||
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
|
||||
END IF;
|
||||
|
||||
-- Validate pColumnList - check if all column names exist in the table
|
||||
IF pColumnList IS NOT NULL THEN
|
||||
DECLARE
|
||||
vColumnName VARCHAR2(128);
|
||||
vColumns VARCHAR2(32767);
|
||||
vPos PLS_INTEGER;
|
||||
vNextPos PLS_INTEGER;
|
||||
vCurrentCol VARCHAR2(128);
|
||||
BEGIN
|
||||
-- Remove spaces and convert to uppercase for processing
|
||||
vColumns := UPPER(REPLACE(pColumnList, ' ', ''));
|
||||
vPos := 1;
|
||||
|
||||
-- Parse comma-separated column list
|
||||
WHILE vPos <= LENGTH(vColumns) LOOP
|
||||
vNextPos := INSTR(vColumns, ',', vPos);
|
||||
IF vNextPos = 0 THEN
|
||||
vNextPos := LENGTH(vColumns) + 1;
|
||||
END IF;
|
||||
|
||||
vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos);
|
||||
|
||||
-- Remove table alias prefix if present (e.g., 'T.COLUMN_NAME' -> 'COLUMN_NAME')
|
||||
IF INSTR(vCurrentCol, '.') > 0 THEN
|
||||
vCurrentCol := SUBSTR(vCurrentCol, INSTR(vCurrentCol, '.') + 1);
|
||||
END IF;
|
||||
|
||||
-- Check if column exists in the table
|
||||
SELECT COUNT(*) INTO vCount
|
||||
FROM all_tab_columns
|
||||
WHERE table_name = vTableName
|
||||
AND column_name = vCurrentCol
|
||||
AND owner = vSchemaName;
|
||||
|
||||
IF vCount = 0 THEN
|
||||
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
|
||||
END IF;
|
||||
|
||||
vPos := vNextPos + 1;
|
||||
END LOOP;
|
||||
END;
|
||||
END IF;
|
||||
|
||||
-- Process column list to add T. prefix to each column
|
||||
vProcessedColumnList := addTablePrefix(pColumnList);
|
||||
|
||||
vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName);
|
||||
-- Fetch unique key values
|
||||
vSql := 'SELECT DISTINCT TO_CHAR(L.LOAD_START,''YYYY'') AS YR, TO_CHAR(L.LOAD_START,''MM'') AS MN
|
||||
FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L
|
||||
WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_WORKFLOW_HISTORY_KEY
|
||||
AND L.LOAD_START >= :pMinDate
|
||||
AND L.LOAD_START < :pMaxDate
|
||||
' ;
|
||||
EXECUTE IMMEDIATE vSql BULK COLLECT INTO vKeyValuesYear, vKeyValuesMonth USING pMinDate, pMaxDate;
|
||||
|
||||
-- Loop over each unique key value
|
||||
FOR i IN 1 .. vKeyValuesYear.COUNT LOOP
|
||||
vKeyValueYear := vKeyValuesYear(i);
|
||||
vKeyValueMonth := vKeyValuesMonth(i);
|
||||
-- Construct the query to extract data for the current key value
|
||||
|
||||
vQuery := 'SELECT ' || vProcessedColumnList || '
|
||||
FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L
|
||||
WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_WORKFLOW_HISTORY_KEY
|
||||
AND TO_CHAR(L.LOAD_START,''YYYY'') = ' || CHR(39) || vKeyValueYear || CHR(39) || '
|
||||
AND TO_CHAR(L.LOAD_START,''MM'') = ' || CHR(39) || vKeyValueMonth || CHR(39) || '
|
||||
AND L.LOAD_START >= TO_DATE(' || CHR(39) || TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'')
|
||||
AND L.LOAD_START < TO_DATE(' || CHR(39) || TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'')';
|
||||
|
||||
-- Construct the URI for the file in OCI Object Storage
|
||||
vUri := vBucketUri ||
|
||||
CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END ||
|
||||
'PARTITION_YEAR=' || sanitizeFilename(vKeyValueYear) || '/' ||
|
||||
'PARTITION_MONTH=' || sanitizeFilename(vKeyValueMonth) || '/' ||
|
||||
sanitizeFilename(vKeyValueYear) || sanitizeFilename(vKeyValueMonth) || '.parquet';
|
||||
|
||||
--DBMS_OUTPUT.PUT_LINE(vQuery);
|
||||
|
||||
-- Use DBMS_CLOUD package to export data to the URI
|
||||
DBMS_CLOUD.EXPORT_DATA(
|
||||
credential_name => pCredentialName,
|
||||
file_uri_list => vUri,
|
||||
query => vQuery,
|
||||
format => json_object('type' VALUE 'parquet')
|
||||
);
|
||||
END LOOP;
|
||||
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
|
||||
EXCEPTION
|
||||
WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN
|
||||
vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName;
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
|
||||
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp);
|
||||
WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN
|
||||
vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in pColumnList' ELSE '' END;
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
|
||||
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp);
|
||||
WHEN OTHERS THEN
|
||||
-- Log complete error details including full stack trace and backtrace
|
||||
ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER');
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
|
||||
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
|
||||
|
||||
END EXPORT_TABLE_DATA_BY_DATE;
|
||||
|
||||
----------------------------------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* @name EXPORT_TABLE_DATA_TO_CSV_BY_DATE
|
||||
* @desc Exports data to a single CSV file with date filtering.
|
||||
* Unlike EXPORT_TABLE_DATA_BY_DATE, this procedure creates one CSV file
|
||||
* instead of multiple Parquet files partitioned by year/month.
|
||||
* Uses the same date filtering mechanism with CT_ODS.A_LOAD_HISTORY.
|
||||
* Allows specifying custom column list or uses T.* if pColumnList is NULL.
|
||||
* Validates that all columns in pColumnList exist in the target table.
|
||||
* Automatically adds 'T.' prefix to column names in pColumnList.
|
||||
* @example
|
||||
* begin
|
||||
* DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
|
||||
* pSchemaName => 'CT_MRDS',
|
||||
* pTableName => 'MY_TABLE',
|
||||
* pKeyColumnName => 'A_WORKFLOW_HISTORY_KEY',
|
||||
* pBucketArea => 'DATA',
|
||||
* pFolderName => 'exports',
|
||||
* pFileName => 'my_export.csv',
|
||||
* pColumnList => 'COLUMN1, COLUMN2, COLUMN3', -- Optional
|
||||
* pMinDate => DATE '2024-01-01',
|
||||
* pMaxDate => SYSDATE
|
||||
* );
|
||||
* end;
|
||||
**/
|
||||
PROCEDURE EXPORT_TABLE_DATA_TO_CSV_BY_DATE (
|
||||
pSchemaName IN VARCHAR2,
|
||||
pTableName IN VARCHAR2,
|
||||
pKeyColumnName IN VARCHAR2,
|
||||
pBucketArea IN VARCHAR2,
|
||||
pFolderName IN VARCHAR2,
|
||||
pFileName IN VARCHAR2 DEFAULT NULL,
|
||||
pColumnList IN VARCHAR2 default NULL,
|
||||
pMinDate IN DATE default DATE '1900-01-01',
|
||||
pMaxDate IN DATE default SYSDATE,
|
||||
pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
|
||||
)
|
||||
IS
|
||||
-- Type definition for key values
|
||||
TYPE key_value_tab IS TABLE OF VARCHAR2(4000);
|
||||
|
||||
vKeyValuesYear key_value_tab;
|
||||
vKeyValuesMonth key_value_tab;
|
||||
|
||||
vCount INTEGER;
|
||||
vSql VARCHAR2(4000);
|
||||
vKeyValueYear VARCHAR2(4000);
|
||||
vKeyValueMonth VARCHAR2(4000);
|
||||
vQuery VARCHAR2(32767);
|
||||
vUri VARCHAR2(4000);
|
||||
vDataType VARCHAR2(30);
|
||||
vTableName VARCHAR2(128);
|
||||
vSchemaName VARCHAR2(128);
|
||||
vKeyColumnName VARCHAR2(128);
|
||||
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
|
||||
vFileBaseName VARCHAR2(4000);
|
||||
vFileExtension VARCHAR2(10);
|
||||
vProcessedColumnList VARCHAR2(32767);
|
||||
vBucketUri VARCHAR2(4000);
|
||||
vCurrentCol VARCHAR2(128);
|
||||
|
||||
-- Function to sanitize file names
|
||||
FUNCTION sanitizeFilename(pFilename IN VARCHAR2) RETURN VARCHAR2 IS
|
||||
vFilename VARCHAR2(1000);
|
||||
BEGIN
|
||||
-- Replace any disallowed characters with underscores
|
||||
vFilename := REGEXP_REPLACE(pFilename, '[^a-zA-Z0-9._-]', '_');
|
||||
RETURN vFilename;
|
||||
END sanitizeFilename;
|
||||
|
||||
-- Function to add T. prefix to column names
|
||||
FUNCTION addTablePrefix(pColumnList IN VARCHAR2) RETURN VARCHAR2 IS
|
||||
vResult VARCHAR2(32767);
|
||||
vColumns VARCHAR2(32767);
|
||||
vPos PLS_INTEGER;
|
||||
vNextPos PLS_INTEGER;
|
||||
vCurrentCol VARCHAR2(128);
|
||||
BEGIN
|
||||
IF pColumnList IS NULL THEN
|
||||
RETURN 'T.*';
|
||||
END IF;
|
||||
|
||||
-- Remove extra spaces and convert to uppercase
|
||||
vColumns := UPPER(REPLACE(pColumnList, ' ', ''));
|
||||
vPos := 1;
|
||||
vResult := '';
|
||||
|
||||
-- Parse comma-separated column list and add T. prefix
|
||||
WHILE vPos <= LENGTH(vColumns) LOOP
|
||||
vNextPos := INSTR(vColumns, ',', vPos);
|
||||
IF vNextPos = 0 THEN
|
||||
vNextPos := LENGTH(vColumns) + 1;
|
||||
END IF;
|
||||
|
||||
vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos);
|
||||
|
||||
-- Add T. prefix if not already present
|
||||
IF INSTR(vCurrentCol, '.') = 0 THEN
|
||||
vCurrentCol := 'T.' || vCurrentCol;
|
||||
END IF;
|
||||
|
||||
-- Add to result with comma separator
|
||||
IF vResult IS NOT NULL THEN
|
||||
vResult := vResult || ', ';
|
||||
END IF;
|
||||
vResult := vResult || vCurrentCol;
|
||||
|
||||
vPos := vNextPos + 1;
|
||||
END LOOP;
|
||||
|
||||
RETURN vResult;
|
||||
END addTablePrefix;
|
||||
|
||||
BEGIN
|
||||
vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||''''
|
||||
,'pTableName => '''||nvl(pTableName, 'NULL')||''''
|
||||
,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||''''
|
||||
,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||''''
|
||||
,'pFolderName => '''||nvl(pFolderName, 'NULL')||''''
|
||||
,'pFileName => '''||nvl(pFileName, 'NULL')||''''
|
||||
,'pColumnList => '''||nvl(pColumnList, 'NULL')||''''
|
||||
,'pMinDate => '''||nvl(TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
|
||||
,'pMaxDate => '''||nvl(TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
|
||||
,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
|
||||
));
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
|
||||
|
||||
-- Get bucket URI based on bucket area using FILE_MANAGER function
|
||||
vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea);
|
||||
|
||||
-- Convert table and column names to uppercase to match data dictionary
|
||||
vTableName := UPPER(pTableName);
|
||||
vSchemaName := UPPER(pSchemaName);
|
||||
vKeyColumnName := UPPER(pKeyColumnName);
|
||||
|
||||
-- Extract base filename and extension or construct default filename
|
||||
IF pFileName IS NOT NULL THEN
|
||||
-- Use provided filename
|
||||
IF INSTR(pFileName, '.') > 0 THEN
|
||||
vFileBaseName := SUBSTR(pFileName, 1, INSTR(pFileName, '.', -1) - 1);
|
||||
vFileExtension := SUBSTR(pFileName, INSTR(pFileName, '.', -1));
|
||||
ELSE
|
||||
vFileBaseName := pFileName;
|
||||
vFileExtension := '.csv';
|
||||
END IF;
|
||||
ELSE
|
||||
-- Construct default filename: TABLENAME.csv (without date range)
|
||||
vFileBaseName := UPPER(pTableName);
|
||||
vFileExtension := '.csv';
|
||||
END IF;
|
||||
|
||||
-- Check if table exists
|
||||
SELECT COUNT(*) INTO vCount
|
||||
FROM all_tables
|
||||
WHERE table_name = vTableName
|
||||
AND owner = vSchemaName;
|
||||
|
||||
IF vCount = 0 THEN
|
||||
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, ENV_MANAGER.MSG_TABLE_NOT_EXISTS);
|
||||
END IF;
|
||||
|
||||
-- Check if key column exists
|
||||
SELECT COUNT(*) INTO vCount
|
||||
FROM all_tab_columns
|
||||
WHERE table_name = vTableName
|
||||
AND column_name = vKeyColumnName
|
||||
AND owner = vSchemaName;
|
||||
|
||||
IF vCount = 0 THEN
|
||||
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
|
||||
END IF;
|
||||
|
||||
-- Validate pColumnList - check if all column names exist in the table
|
||||
IF pColumnList IS NOT NULL THEN
|
||||
DECLARE
|
||||
vColumnName VARCHAR2(128);
|
||||
vColumns VARCHAR2(32767);
|
||||
vPos PLS_INTEGER;
|
||||
vNextPos PLS_INTEGER;
|
||||
vCurrentCol VARCHAR2(128);
|
||||
BEGIN
|
||||
-- Remove spaces and convert to uppercase for processing
|
||||
vColumns := UPPER(REPLACE(pColumnList, ' ', ''));
|
||||
vPos := 1;
|
||||
|
||||
-- Parse comma-separated column list
|
||||
WHILE vPos <= LENGTH(vColumns) LOOP
|
||||
vNextPos := INSTR(vColumns, ',', vPos);
|
||||
IF vNextPos = 0 THEN
|
||||
vNextPos := LENGTH(vColumns) + 1;
|
||||
END IF;
|
||||
|
||||
vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos);
|
||||
|
||||
-- Remove table alias prefix if present (e.g., 'T.COLUMN_NAME' -> 'COLUMN_NAME')
|
||||
IF INSTR(vCurrentCol, '.') > 0 THEN
|
||||
vCurrentCol := SUBSTR(vCurrentCol, INSTR(vCurrentCol, '.') + 1);
|
||||
END IF;
|
||||
|
||||
-- Check if column exists in the table
|
||||
SELECT COUNT(*) INTO vCount
|
||||
FROM all_tab_columns
|
||||
WHERE table_name = vTableName
|
||||
AND column_name = vCurrentCol
|
||||
AND owner = vSchemaName;
|
||||
|
||||
IF vCount = 0 THEN
|
||||
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
|
||||
END IF;
|
||||
|
||||
vPos := vNextPos + 1;
|
||||
END LOOP;
|
||||
END;
|
||||
END IF;
|
||||
|
||||
-- Process column list to add T. prefix to each column
|
||||
vProcessedColumnList := addTablePrefix(pColumnList);
|
||||
|
||||
-- Get the data type of the key column
|
||||
SELECT data_type INTO vDataType
|
||||
FROM all_tab_columns
|
||||
WHERE table_name = vTableName
|
||||
AND column_name = vKeyColumnName
|
||||
AND owner = vSchemaName;
|
||||
|
||||
vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName);
|
||||
|
||||
-- Fetch unique year/month combinations
|
||||
vSql := 'SELECT DISTINCT TO_CHAR(L.LOAD_START,''YYYY'') AS YR, TO_CHAR(L.LOAD_START,''MM'') AS MN
|
||||
FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L
|
||||
WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_WORKFLOW_HISTORY_KEY
|
||||
AND L.LOAD_START >= :pMinDate
|
||||
AND L.LOAD_START < :pMaxDate
|
||||
' ;
|
||||
EXECUTE IMMEDIATE vSql BULK COLLECT INTO vKeyValuesYear, vKeyValuesMonth USING pMinDate, pMaxDate;
|
||||
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vKeyValuesYear.COUNT || ' year/month combinations to export', 'INFO', vParameters);
|
||||
|
||||
-- Loop over each unique year/month combination
|
||||
FOR i IN 1 .. vKeyValuesYear.COUNT LOOP
|
||||
vKeyValueYear := vKeyValuesYear(i);
|
||||
vKeyValueMonth := vKeyValuesMonth(i);
|
||||
|
||||
-- Construct the query to extract data for the current year/month
|
||||
vQuery := 'SELECT ' || vProcessedColumnList || '
|
||||
FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L
|
||||
WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_WORKFLOW_HISTORY_KEY
|
||||
AND TO_CHAR(L.LOAD_START,''YYYY'') = ' || CHR(39) || vKeyValueYear || CHR(39) || '
|
||||
AND TO_CHAR(L.LOAD_START,''MM'') = ' || CHR(39) || vKeyValueMonth || CHR(39) || '
|
||||
AND L.LOAD_START >= TO_DATE(' || CHR(39) || TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'')
|
||||
AND L.LOAD_START < TO_DATE(' || CHR(39) || TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'')';
|
||||
|
||||
-- Construct the URI for the CSV file in OCI Object Storage
|
||||
vUri := vBucketUri ||
|
||||
CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END ||
|
||||
sanitizeFilename(vFileBaseName) || '_' ||
|
||||
sanitizeFilename(vKeyValueYear) || sanitizeFilename(vKeyValueMonth) ||
|
||||
vFileExtension;
|
||||
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('Exporting to CSV file: ' || vUri, 'INFO', vParameters);
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('Year/Month: ' || vKeyValueYear || '/' || vKeyValueMonth, 'DEBUG', vParameters);
|
||||
|
||||
-- Use DBMS_CLOUD package to export data to CSV file
|
||||
DBMS_CLOUD.EXPORT_DATA(
|
||||
credential_name => pCredentialName,
|
||||
file_uri_list => vUri,
|
||||
query => vQuery,
|
||||
format => json_object('type' VALUE 'CSV', 'header' VALUE true)
|
||||
);
|
||||
END LOOP;
|
||||
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('Export completed successfully for ' || vKeyValuesYear.COUNT || ' files', 'INFO', vParameters);
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
|
||||
|
||||
EXCEPTION
|
||||
WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN
|
||||
vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName;
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
|
||||
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp);
|
||||
WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN
|
||||
vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in pColumnList' ELSE '' END;
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
|
||||
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp);
|
||||
WHEN OTHERS THEN
|
||||
-- Log complete error details including full stack trace and backtrace
|
||||
ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER');
|
||||
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
|
||||
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
|
||||
|
||||
END EXPORT_TABLE_DATA_TO_CSV_BY_DATE;
|
||||
|
||||
----------------------------------------------------------------------------------------------------
|
||||
-- VERSION MANAGEMENT FUNCTIONS
|
||||
----------------------------------------------------------------------------------------------------
|
||||
|
||||
FUNCTION GET_VERSION RETURN VARCHAR2 IS
|
||||
BEGIN
|
||||
RETURN PACKAGE_VERSION;
|
||||
END GET_VERSION;
|
||||
|
||||
----------------------------------------------------------------------------------------------------
|
||||
|
||||
FUNCTION GET_BUILD_INFO RETURN VARCHAR2 IS
|
||||
BEGIN
|
||||
RETURN ENV_MANAGER.GET_PACKAGE_VERSION_INFO(
|
||||
pPackageName => 'DATA_EXPORTER',
|
||||
pVersion => PACKAGE_VERSION,
|
||||
pBuildDate => PACKAGE_BUILD_DATE,
|
||||
pAuthor => PACKAGE_AUTHOR
|
||||
);
|
||||
END GET_BUILD_INFO;
|
||||
|
||||
----------------------------------------------------------------------------------------------------
|
||||
|
||||
FUNCTION GET_VERSION_HISTORY RETURN VARCHAR2 IS
|
||||
BEGIN
|
||||
RETURN ENV_MANAGER.FORMAT_VERSION_HISTORY(
|
||||
pPackageName => 'DATA_EXPORTER',
|
||||
pVersionHistory => VERSION_HISTORY
|
||||
);
|
||||
END GET_VERSION_HISTORY;
|
||||
|
||||
----------------------------------------------------------------------------------------------------
|
||||
|
||||
END;
|
||||
/
|
||||
Reference in New Issue
Block a user