Grzegorz Michalski
2026-02-02 10:59:29 +01:00
commit ecd833f682
679 changed files with 122717 additions and 0 deletions

View File

@@ -0,0 +1,25 @@
# MARS-826-PREHOOK Package - Git Ignore Rules
# Standard exclusions for MARS deployment packages
# Confluence documentation (generated, not source)
confluence/
# Log files from SPOOL operations
log/
*.log
# Test directories and files
test/
*_test.sql
# Mock data scripts (development only)
mock_data/
# Temporary files
*.tmp
*.bak
*~
# OS-specific files
.DS_Store
Thumbs.db

View File

@@ -0,0 +1,25 @@
-- ============================================================================
-- MARS-826-PREHOOK Installation Script 00: DATA_EXPORTER Package Specification Update
-- ============================================================================
-- Purpose: Deploy updated DATA_EXPORTER package specification with version 2.1.1
-- Schema: CT_MRDS
-- Object: PACKAGE SPECIFICATION DATA_EXPORTER
-- ============================================================================
SET SERVEROUTPUT ON SIZE UNLIMITED
PROMPT
PROMPT ============================================================================
PROMPT MARS-826-PREHOOK: Installing CT_MRDS.DATA_EXPORTER Package Specification
PROMPT ============================================================================
PROMPT Package: CT_MRDS.DATA_EXPORTER (SPECIFICATION)
PROMPT Version: 2.1.0 -> 2.1.1 (PATCH)
PROMPT Change: Updated package version and build date
PROMPT ============================================================================
-- Deploy package specification from new_version folder
@@new_version\DATA_EXPORTER.pkg
PROMPT
PROMPT Package specification deployment completed.
PROMPT

View File

@@ -0,0 +1,25 @@
-- ============================================================================
-- MARS-826-PREHOOK Installation Script 01: DATA_EXPORTER Package Body Update
-- ============================================================================
-- Purpose: Deploy updated DATA_EXPORTER package with A_ETL_LOAD_SET_KEY support
-- Schema: CT_MRDS
-- Object: PACKAGE BODY DATA_EXPORTER
-- ============================================================================
SET SERVEROUTPUT ON SIZE UNLIMITED
PROMPT
PROMPT ============================================================================
PROMPT MARS-826-PREHOOK: Installing CT_MRDS.DATA_EXPORTER Package Body
PROMPT ============================================================================
PROMPT Package: CT_MRDS.DATA_EXPORTER (BODY)
PROMPT Change: Updated JOIN column reference from A_WORKFLOW_HISTORY_KEY to A_ETL_LOAD_SET_KEY
PROMPT Impact: Fixes export procedures to work with renamed column in CT_ODS.A_LOAD_HISTORY
PROMPT ============================================================================
-- Deploy package body from new_version folder
@@new_version\DATA_EXPORTER.pkb
PROMPT
PROMPT Package deployment completed.
PROMPT

View File

@@ -0,0 +1,25 @@
-- ============================================================================
-- MARS-826-PREHOOK Rollback Script 92: DATA_EXPORTER Package Specification Rollback
-- ============================================================================
-- Purpose: Restore previous DATA_EXPORTER package specification (v2.1.0)
-- Schema: CT_MRDS
-- Object: PACKAGE SPECIFICATION DATA_EXPORTER
-- ============================================================================
SET SERVEROUTPUT ON SIZE UNLIMITED
PROMPT
PROMPT ============================================================================
PROMPT MARS-826-PREHOOK: Rolling Back CT_MRDS.DATA_EXPORTER Package Specification
PROMPT ============================================================================
PROMPT Package: CT_MRDS.DATA_EXPORTER (SPECIFICATION)
PROMPT Version: 2.1.1 -> 2.1.0 (Rollback)
PROMPT Source: current_version/DATA_EXPORTER.pkg (MARS-846)
PROMPT ============================================================================
-- Deploy previous package specification from current_version folder (v2.1.0)
@@current_version\DATA_EXPORTER.pkg
PROMPT
PROMPT Package specification rollback completed.
PROMPT

View File

@@ -0,0 +1,27 @@
-- ============================================================================
-- MARS-826-PREHOOK Rollback Script 91: Restore Previous DATA_EXPORTER Version
-- ============================================================================
-- Purpose: Rollback DATA_EXPORTER package to version with A_WORKFLOW_HISTORY_KEY
-- Schema: CT_MRDS
-- Object: PACKAGE BODY DATA_EXPORTER
-- WARNING: This will restore package to state BEFORE A_ETL_LOAD_SET_KEY support
-- ============================================================================
SET SERVEROUTPUT ON SIZE UNLIMITED
PROMPT
PROMPT ============================================================================
PROMPT MARS-826-PREHOOK ROLLBACK: Restoring Previous DATA_EXPORTER Package Body
PROMPT ============================================================================
PROMPT WARNING: This will restore package to use A_WORKFLOW_HISTORY_KEY
PROMPT This rollback is only valid if the CT_ODS.A_LOAD_HISTORY key column has
PROMPT been reverted to A_WORKFLOW_HISTORY_KEY (not A_ETL_LOAD_SET_KEY)
PROMPT ============================================================================
-- Deploy previous package body from current_version folder (v2.1.0)
@@current_version\DATA_EXPORTER.pkb
PROMPT
PROMPT Package rollback completed.
PROMPT Previous version restored (with A_WORKFLOW_HISTORY_KEY references).
PROMPT

View File

@@ -0,0 +1,223 @@
# MARS-826-PREHOOK: DATA_EXPORTER Package Update - Column Rename Support
## Overview
**Purpose**: Update DATA_EXPORTER package to support renamed column in CT_ODS.A_LOAD_HISTORY
**Type**: Pre-Hook Deployment (Required before MARS-826)
**Target Schema**: CT_MRDS
**Database Objects**: DATA_EXPORTER package body
## Background
### Issue
The CT_ODS.A_LOAD_HISTORY table defines the column `A_ETL_LOAD_SET_KEY` (its PRIMARY KEY), but the DATA_EXPORTER package hardcoded references to the old column name `A_WORKFLOW_HISTORY_KEY`, so every JOIN against the table failed.
### Root Cause
Column name mismatch discovered during MARS-826 testing:
- **Database DDL**: `A_ETL_LOAD_SET_KEY` (correct, as per source definition)
- **DATA_EXPORTER package**: `L.A_WORKFLOW_HISTORY_KEY` (incorrect, outdated reference)
- **Impact**: All export procedures failed with `ORA-00904: "L"."A_WORKFLOW_HISTORY_KEY": invalid identifier`
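For reference, the failing pattern looked roughly like this (illustrative fragment using one of the in-scope tables, not the exact dynamic SQL built by the package):

```sql
-- Illustrative only: the old JOIN raised ORA-00904 once the column
-- had been renamed in CT_ODS.A_LOAD_HISTORY.
SELECT T.*
  FROM OU_LM.ADHOC_ADJ_HEADER T,
       CT_ODS.A_LOAD_HISTORY  L
 WHERE T.A_ETL_LOAD_SET_KEY_FK = L.A_WORKFLOW_HISTORY_KEY;  -- invalid identifier
```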
### Solution
Updated DATA_EXPORTER.pkb to use correct column name `A_ETL_LOAD_SET_KEY` in all JOIN operations with CT_ODS.A_LOAD_HISTORY.
## Changes Made
### Package Version Update
**Previous Version**: 2.1.0
**New Version**: 2.1.1
**Change Type**: PATCH (Bug Fix)
**Build Date**: 2025-12-04 13:10:00
### Modified Files
1. **DATA_EXPORTER.pkg** (Package Specification)
- Updated PACKAGE_VERSION: '2.1.0' → '2.1.1'
- Updated PACKAGE_BUILD_DATE: '2025-12-04 13:10:00'
- Updated VERSION_HISTORY with v2.1.1 entry
2. **DATA_EXPORTER.pkb** (Package Body)
- Updated 4 JOIN clauses in dynamic SQL
- Updated example in documentation comment
### Specific Changes
**Location 1**: Line ~326 (EXPORT_TABLE_DATA_BY_DATE - partition query)
```sql
-- BEFORE:
WHERE T.' || ... || ' = L.A_WORKFLOW_HISTORY_KEY
-- AFTER:
WHERE T.' || ... || ' = L.A_ETL_LOAD_SET_KEY
```
**Location 2**: Line ~340 (EXPORT_TABLE_DATA_BY_DATE - export query)
```sql
-- BEFORE:
WHERE T.' || ... || ' = L.A_WORKFLOW_HISTORY_KEY
-- AFTER:
WHERE T.' || ... || ' = L.A_ETL_LOAD_SET_KEY
```
**Location 3**: Line ~613 (EXPORT_TABLE_DATA_TO_CSV_BY_DATE - partition query)
```sql
-- BEFORE:
WHERE T.' || ... || ' = L.A_WORKFLOW_HISTORY_KEY
-- AFTER:
WHERE T.' || ... || ' = L.A_ETL_LOAD_SET_KEY
```
**Location 4**: Line ~629 (EXPORT_TABLE_DATA_TO_CSV_BY_DATE - export query)
```sql
-- BEFORE:
WHERE T.' || ... || ' = L.A_WORKFLOW_HISTORY_KEY
-- AFTER:
WHERE T.' || ... || ' = L.A_ETL_LOAD_SET_KEY
```
**Location 5**: Line ~398 (Documentation example)
```sql
-- BEFORE:
pKeyColumnName => 'A_WORKFLOW_HISTORY_KEY',
-- AFTER:
pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',
```
## Deployment
### Prerequisites
- Database connection: CT_MRDS schema
- Required privileges: ALTER PACKAGE on CT_MRDS.DATA_EXPORTER
- Estimated time: 1-2 minutes
### Installation Steps
The master installation script performs these operations in sequence:
1. **Deploy Package Specification** - Install updated DATA_EXPORTER.pkg from new_version/ (v2.1.1)
2. **Deploy Package Body** - Install updated DATA_EXPORTER.pkb from new_version/ with column reference fixes
3. **Track Version** - Record package version using universal tracking script (Standard)
4. **Verify Status** - Check all tracked packages for untracked changes using universal verification script (Standard)
**Version Folders:**
- **current_version/** - Contains DATA_EXPORTER v2.1.0 (backup from MARS-846 for rollback)
- **new_version/** - Contains DATA_EXPORTER v2.1.1 (updated with A_ETL_LOAD_SET_KEY fixes)
### Execution
1. Connect to database as CT_MRDS user
2. Execute `install_mars826_prehook.sql`
3. Verify package compilation status
4. Test export functionality
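A typical SQL*Plus session for the steps above might look like this (the connect string is a placeholder for your environment):

```
sqlplus CT_MRDS@<tns_alias>
SQL> @install_mars826_prehook.sql
SQL> -- review the SPOOL log written by the master script before proceeding
```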
### Verification
```sql
-- Check package compilation status
SELECT object_name, object_type, status
FROM user_objects
WHERE object_name = 'DATA_EXPORTER'
AND object_type = 'PACKAGE BODY';
-- Verify package version
SELECT CT_MRDS.DATA_EXPORTER.GET_VERSION() FROM DUAL;
-- Test export with new column reference
BEGIN
CT_MRDS.DATA_EXPORTER.EXPORT_TABLE_DATA_BY_DATE(
pSchemaName => 'OU_LM',
pTableName => 'ADHOC_ADJ_HEADER',
pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',
pBucketArea => 'ARCHIVE',
pFolderName => 'TEST_EXPORT'
);
END;
/
```
### Rollback
Execute `rollback_mars826_prehook.sql` to restore previous package version.
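After the rollback completes, the package should report the previous version again; a quick sanity check:

```sql
-- Expected result after rollback: 2.1.0
SELECT CT_MRDS.DATA_EXPORTER.GET_VERSION() FROM DUAL;
```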
## Dependencies
### Required by MARS-826
**CRITICAL**: This package must be deployed **BEFORE** MARS-826 execution. MARS-826 export scripts depend on this updated DATA_EXPORTER package.
### Database Objects
- **Table**: CT_ODS.A_LOAD_HISTORY (must have column A_ETL_LOAD_SET_KEY)
- **Package**: CT_MRDS.ENV_MANAGER (for error handling and logging)
- **Package**: CT_MRDS.FILE_MANAGER (for bucket URI resolution)
## Configuration Changes
### Bucket Configuration
**Note**: During testing, discovered bucket configuration issue:
- Original: `ArchiveBucketName = 'archive'` (bucket did not exist)
- Fixed: `ArchiveBucketName = 'history'` (correct bucket name)
This configuration fix is **NOT** part of this MARS package but was applied directly to the database:
```sql
UPDATE CT_MRDS.A_FILE_MANAGER_CONFIG
SET CONFIG_VARIABLE_VALUE = 'history'
WHERE CONFIG_VARIABLE = 'ArchiveBucketName';
COMMIT;
```
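To confirm the corrected value is in place, query the same configuration row:

```sql
-- Should return CONFIG_VARIABLE_VALUE = 'history'
SELECT CONFIG_VARIABLE, CONFIG_VARIABLE_VALUE
  FROM CT_MRDS.A_FILE_MANAGER_CONFIG
 WHERE CONFIG_VARIABLE = 'ArchiveBucketName';
```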
## Testing Results
### Test Case 1: Single Table Export
**Table**: OU_LM.ADHOC_ADJ_HEADER
**Result**: ✅ SUCCESS
**Output**: 2 Parquet files created with Hive-style partitioning
```
LM_ADHOC_ADJUSTMENTS_HEADER/PARTITION_YEAR=2025/PARTITION_MONTH=08/202508_1_*.parquet (1,433 bytes)
LM_ADHOC_ADJUSTMENTS_HEADER/PARTITION_YEAR=2025/PARTITION_MONTH=09/202509_1_*.parquet (1,432 bytes)
```
### Test Case 2: Package Compilation
**Status**: ✅ VALID
**Compilation Time**: < 1 second
**Errors**: None
## Impact Analysis
### Affected Procedures
1. `EXPORT_TABLE_DATA_BY_DATE` - Main export with Parquet partitioning
2. `EXPORT_TABLE_DATA_TO_CSV_BY_DATE` - CSV export with date filtering
### Tables Using A_ETL_LOAD_SET_KEY_FK
All tables in MARS-826 export scope (19 tables):
- OU_LM: 17 tables (ADHOC_ADJ_*, BALANCESHEET_*, CSM_ADJ_*, FORECAST_*, QR_ADJ_*, STANDING_FACILITY*, TTS_*)
- OU_MRR: 2 tables (IND_CURRENT_ACCOUNT, IND_OVERNIGHT_DEPOSITS)
### Backward Compatibility
⚠️ **BREAKING CHANGE**: This update is NOT backward compatible with databases where A_LOAD_HISTORY still uses `A_WORKFLOW_HISTORY_KEY` column name.
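Before deploying to a given environment, it is worth checking which column name that database actually carries; this dictionary query (using the same `all_tab_columns` view the package itself relies on) distinguishes the two states:

```sql
-- Pre-deployment check: exactly one of these names should come back.
-- A_ETL_LOAD_SET_KEY      -> safe to deploy v2.1.1
-- A_WORKFLOW_HISTORY_KEY  -> deploy would break exports; stay on v2.1.0
SELECT column_name
  FROM all_tab_columns
 WHERE owner       = 'CT_ODS'
   AND table_name  = 'A_LOAD_HISTORY'
   AND column_name IN ('A_ETL_LOAD_SET_KEY', 'A_WORKFLOW_HISTORY_KEY');
```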
## Files Included
1. **README.md** - This documentation file
2. **.gitignore** - Git exclusions (confluence/, log/, test/, mock_data/)
3. **install_mars826_prehook.sql** - Master installation script with SPOOL logging (5 steps)
4. **rollback_mars826_prehook.sql** - Master rollback script (2 steps)
5. **00_MARS_826_PREHOOK_install_DATA_EXPORTER_SPEC.sql** - Deploy updated package specification
6. **01_MARS_826_PREHOOK_install_DATA_EXPORTER_BODY.sql** - Deploy updated package body
7. **02_MARS_826_PREHOOK_verify_package.sql** - Verify package compilation and test export
8. **track_package_versions.sql** - Universal version tracking script (Standard)
9. **verify_packages_version.sql** - Universal package verification script (Standard)
10. **91_MARS_826_PREHOOK_rollback_DATA_EXPORTER_BODY.sql** - Restore previous package body
11. **92_MARS_826_PREHOOK_rollback_DATA_EXPORTER_SPEC.sql** - Restore previous package specification
12. **current_version/** - Backup of DATA_EXPORTER v2.1.0 (from MARS-846)
13. **new_version/** - Updated DATA_EXPORTER v2.1.1 (with A_ETL_LOAD_SET_KEY fixes)
## Version History
- **v2.1.1** (2025-12-04): Fixed JOIN column reference A_WORKFLOW_HISTORY_KEY → A_ETL_LOAD_SET_KEY
- **v2.1.0** (2025-10-22): Added version tracking and PARTITION_YEAR/PARTITION_MONTH support
- **v2.0.0** (2025-10-01): Separated export functionality from FILE_MANAGER package
## Related JIRA Issues
- **MARS-826**: Export CSDB historical data to HIST bucket (requires this pre-hook)
## Author
Created by: Grzegorz Michalski
Date: 2025-12-04
Schema: CT_MRDS

View File

@@ -0,0 +1,708 @@
create or replace PACKAGE BODY CT_MRDS.DATA_EXPORTER
AS
----------------------------------------------------------------------------------------------------
PROCEDURE EXPORT_TABLE_DATA (
pSchemaName IN VARCHAR2,
pTableName IN VARCHAR2,
pKeyColumnName IN VARCHAR2,
pBucketArea IN VARCHAR2,
pFolderName IN VARCHAR2,
pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
)
IS
-- Type definition for key values
TYPE key_value_tab IS TABLE OF VARCHAR2(4000);
vKeyValues key_value_tab;
vCount INTEGER;
vSql VARCHAR2(4000);
vKeyValue VARCHAR2(4000);
vQuery VARCHAR2(32767);
vUri VARCHAR2(4000);
vDataType VARCHAR2(30);
vTableName VARCHAR2(128);
vSchemaName VARCHAR2(128);
vKeyColumnName VARCHAR2(128);
vParameters VARCHAR2(4000);
vBucketUri VARCHAR2(4000);
-- Function to sanitize file names
FUNCTION sanitizeFilename(pFilename IN VARCHAR2) RETURN VARCHAR2 IS
vFilename VARCHAR2(1000);
BEGIN
-- Replace any disallowed characters with underscores
vFilename := REGEXP_REPLACE(pFilename, '[^a-zA-Z0-9._-]', '_');
RETURN vFilename;
END sanitizeFilename;
BEGIN
vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||''''
,'pTableName => '''||nvl(pTableName, 'NULL')||''''
,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||''''
,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||''''
,'pFolderName => '''||nvl(pFolderName, 'NULL')||''''
,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
));
ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
-- Get bucket URI based on bucket area using FILE_MANAGER function
vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea);
-- Convert table and column names to uppercase to match data dictionary
vTableName := UPPER(pTableName);
vSchemaName := UPPER(pSchemaName);
vKeyColumnName := UPPER(pKeyColumnName);
-- Check if table exists
SELECT COUNT(*) INTO vCount
FROM all_tables
WHERE table_name = vTableName
AND owner = vSchemaName;
IF vCount = 0 THEN
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, ENV_MANAGER.MSG_TABLE_NOT_EXISTS);
END IF;
-- Check if key column exists
SELECT COUNT(*) INTO vCount
FROM all_tab_columns
WHERE table_name = vTableName
AND column_name = vKeyColumnName
AND owner = vSchemaName;
IF vCount = 0 THEN
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
END IF;
-- Get the data type of the key column
SELECT data_type INTO vDataType
FROM all_tab_columns
WHERE table_name = vTableName
AND column_name = vKeyColumnName
AND owner = vSchemaName;
vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName);
-- Fetch unique key values
vSql := 'SELECT DISTINCT ' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) ||
' FROM ' || vTableName;
EXECUTE IMMEDIATE vSql BULK COLLECT INTO vKeyValues;
-- Loop over each unique key value
FOR i IN 1 .. vKeyValues.COUNT LOOP
vKeyValue := vKeyValues(i);
-- Construct the query to extract data for the current key value
IF vDataType IN ('VARCHAR2', 'CHAR', 'NCHAR', 'NVARCHAR2') THEN
vQuery := 'SELECT * FROM ' || vTableName ||
' WHERE ' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = ' || CHR(39) || vKeyValue || CHR(39);
ELSIF vDataType IN ('NUMBER', 'FLOAT', 'BINARY_FLOAT', 'BINARY_DOUBLE') THEN
vQuery := 'SELECT * FROM ' || vTableName ||
' WHERE ' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = ' || vKeyValue;
ELSIF vDataType LIKE 'TIMESTAMP%' OR vDataType = 'DATE' THEN
vQuery := 'SELECT * FROM ' || vTableName ||
' WHERE ' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) ||
' = TO_TIMESTAMP(' || CHR(39) || vKeyValue || CHR(39) ||', ''YYYY-MM-DD HH24:MI:SS.FF'')';
ELSE
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNSUPPORTED_DATA_TYPE, ENV_MANAGER.MSG_UNSUPPORTED_DATA_TYPE);
END IF;
-- Construct the URI for the file in OCI Object Storage
vUri := vBucketUri ||
CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END ||
sanitizeFilename(vKeyValue) || '.csv';
-- Use DBMS_CLOUD package to export data to the URI
DBMS_CLOUD.EXPORT_DATA(
credential_name => pCredentialName,
file_uri_list => vUri,
query => vQuery,
format => json_object('type' VALUE 'CSV', 'header' VALUE true)
);
END LOOP;
ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
EXCEPTION
WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN
vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName;
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp);
WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN
vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName;
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp);
WHEN ENV_MANAGER.ERR_UNSUPPORTED_DATA_TYPE THEN
vgMsgTmp := ENV_MANAGER.MSG_UNSUPPORTED_DATA_TYPE || ' vDataType: '||vDataType;
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNSUPPORTED_DATA_TYPE, vgMsgTmp);
WHEN OTHERS THEN
-- Log complete error details including full stack trace and backtrace
ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER');
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
END EXPORT_TABLE_DATA;
----------------------------------------------------------------------------------------------------
PROCEDURE EXPORT_TABLE_DATA_BY_DATE (
pSchemaName IN VARCHAR2,
pTableName IN VARCHAR2,
pKeyColumnName IN VARCHAR2,
pBucketArea IN VARCHAR2,
pFolderName IN VARCHAR2,
pColumnList IN VARCHAR2 default NULL,
pMinDate IN DATE default DATE '1900-01-01',
pMaxDate IN DATE default SYSDATE,
pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
)
IS
-- Type definition for key values
TYPE key_value_tab IS TABLE OF VARCHAR2(4000);
vKeyValuesYear key_value_tab;
vKeyValuesMonth key_value_tab;
vCount INTEGER;
vSql VARCHAR2(32000);
vKeyValueYear VARCHAR2(4000);
vKeyValueMonth VARCHAR2(4000);
vQuery VARCHAR2(32767);
vUri VARCHAR2(4000);
vDataType VARCHAR2(30);
vTableName VARCHAR2(128);
vSchemaName VARCHAR2(128);
vKeyColumnName VARCHAR2(128);
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
vProcessedColumnList VARCHAR2(32767);
vBucketUri VARCHAR2(4000);
vCurrentCol VARCHAR2(128);
-- Function to sanitize file names
FUNCTION sanitizeFilename(pFilename IN VARCHAR2) RETURN VARCHAR2 IS
vFilename VARCHAR2(1000);
BEGIN
-- Replace any disallowed characters with underscores
vFilename := REGEXP_REPLACE(pFilename, '[^a-zA-Z0-9._-]', '_');
RETURN vFilename;
END sanitizeFilename;
-- Function to add T. prefix to column names
FUNCTION addTablePrefix(pColumnList IN VARCHAR2) RETURN VARCHAR2 IS
vResult VARCHAR2(32767);
vColumns VARCHAR2(32767);
vPos PLS_INTEGER;
vNextPos PLS_INTEGER;
vCurrentCol VARCHAR2(128);
BEGIN
IF pColumnList IS NULL THEN
RETURN 'T.*';
END IF;
-- Remove extra spaces and convert to uppercase
vColumns := UPPER(REPLACE(pColumnList, ' ', ''));
vPos := 1;
vResult := '';
-- Parse comma-separated column list and add T. prefix
WHILE vPos <= LENGTH(vColumns) LOOP
vNextPos := INSTR(vColumns, ',', vPos);
IF vNextPos = 0 THEN
vNextPos := LENGTH(vColumns) + 1;
END IF;
vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos);
-- Add T. prefix if not already present
IF INSTR(vCurrentCol, '.') = 0 THEN
vCurrentCol := 'T.' || vCurrentCol;
END IF;
-- Add to result with comma separator
IF vResult IS NOT NULL THEN
vResult := vResult || ', ';
END IF;
vResult := vResult || vCurrentCol;
vPos := vNextPos + 1;
END LOOP;
RETURN vResult;
END addTablePrefix;
BEGIN
vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||''''
,'pTableName => '''||nvl(pTableName, 'NULL')||''''
,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||''''
,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||''''
,'pFolderName => '''||nvl(pFolderName, 'NULL')||''''
,'pColumnList => '''||nvl(pColumnList, 'NULL')||''''
,'pMinDate => '''||nvl(TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
,'pMaxDate => '''||nvl(TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
));
ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
-- Get bucket URI based on bucket area using FILE_MANAGER function
vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea);
-- Convert table and column names to uppercase to match data dictionary
vTableName := UPPER(pTableName);
vSchemaName := UPPER(pSchemaName);
vKeyColumnName := UPPER(pKeyColumnName);
-- Check if table exists
SELECT COUNT(*) INTO vCount
FROM all_tables
WHERE table_name = vTableName
AND owner = vSchemaName;
IF vCount = 0 THEN
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, ENV_MANAGER.MSG_TABLE_NOT_EXISTS);
END IF;
-- Check if key column exists
SELECT COUNT(*) INTO vCount
FROM all_tab_columns
WHERE table_name = vTableName
AND column_name = vKeyColumnName
AND owner = vSchemaName;
IF vCount = 0 THEN
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
END IF;
-- Validate pColumnList - check if all column names exist in the table
IF pColumnList IS NOT NULL THEN
DECLARE
vColumnName VARCHAR2(128);
vColumns VARCHAR2(32767);
vPos PLS_INTEGER;
vNextPos PLS_INTEGER;
vCurrentCol VARCHAR2(128);
BEGIN
-- Remove spaces and convert to uppercase for processing
vColumns := UPPER(REPLACE(pColumnList, ' ', ''));
vPos := 1;
-- Parse comma-separated column list
WHILE vPos <= LENGTH(vColumns) LOOP
vNextPos := INSTR(vColumns, ',', vPos);
IF vNextPos = 0 THEN
vNextPos := LENGTH(vColumns) + 1;
END IF;
vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos);
-- Remove table alias prefix if present (e.g., 'T.COLUMN_NAME' -> 'COLUMN_NAME')
IF INSTR(vCurrentCol, '.') > 0 THEN
vCurrentCol := SUBSTR(vCurrentCol, INSTR(vCurrentCol, '.') + 1);
END IF;
-- Check if column exists in the table
SELECT COUNT(*) INTO vCount
FROM all_tab_columns
WHERE table_name = vTableName
AND column_name = vCurrentCol
AND owner = vSchemaName;
IF vCount = 0 THEN
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
END IF;
vPos := vNextPos + 1;
END LOOP;
END;
END IF;
-- Process column list to add T. prefix to each column
vProcessedColumnList := addTablePrefix(pColumnList);
vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName);
-- Fetch unique key values
vSql := 'SELECT DISTINCT TO_CHAR(L.LOAD_START,''YYYY'') AS YR, TO_CHAR(L.LOAD_START,''MM'') AS MN
FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L
WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_WORKFLOW_HISTORY_KEY
AND L.LOAD_START >= :pMinDate
AND L.LOAD_START < :pMaxDate
' ;
EXECUTE IMMEDIATE vSql BULK COLLECT INTO vKeyValuesYear, vKeyValuesMonth USING pMinDate, pMaxDate;
-- Loop over each unique key value
FOR i IN 1 .. vKeyValuesYear.COUNT LOOP
vKeyValueYear := vKeyValuesYear(i);
vKeyValueMonth := vKeyValuesMonth(i);
-- Construct the query to extract data for the current key value
vQuery := 'SELECT ' || vProcessedColumnList || '
FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L
WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_WORKFLOW_HISTORY_KEY
AND TO_CHAR(L.LOAD_START,''YYYY'') = ' || CHR(39) || vKeyValueYear || CHR(39) || '
AND TO_CHAR(L.LOAD_START,''MM'') = ' || CHR(39) || vKeyValueMonth || CHR(39) || '
AND L.LOAD_START >= TO_DATE(' || CHR(39) || TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'')
AND L.LOAD_START < TO_DATE(' || CHR(39) || TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'')';
-- Construct the URI for the file in OCI Object Storage
vUri := vBucketUri ||
CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END ||
'PARTITION_YEAR=' || sanitizeFilename(vKeyValueYear) || '/' ||
'PARTITION_MONTH=' || sanitizeFilename(vKeyValueMonth) || '/' ||
sanitizeFilename(vKeyValueYear) || sanitizeFilename(vKeyValueMonth) || '.parquet';
--DBMS_OUTPUT.PUT_LINE(vQuery);
-- Use DBMS_CLOUD package to export data to the URI
DBMS_CLOUD.EXPORT_DATA(
credential_name => pCredentialName,
file_uri_list => vUri,
query => vQuery,
format => json_object('type' VALUE 'parquet')
);
END LOOP;
ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
EXCEPTION
WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN
vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName;
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp);
WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN
vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in pColumnList' ELSE '' END;
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp);
WHEN OTHERS THEN
-- Log complete error details including full stack trace and backtrace
ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER');
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
END EXPORT_TABLE_DATA_BY_DATE;
----------------------------------------------------------------------------------------------------
/**
* @name EXPORT_TABLE_DATA_TO_CSV_BY_DATE
* @desc Exports data to a single CSV file with date filtering.
* Unlike EXPORT_TABLE_DATA_BY_DATE, this procedure creates one CSV file
* instead of multiple Parquet files partitioned by year/month.
* Uses the same date filtering mechanism with CT_ODS.A_LOAD_HISTORY.
* Allows specifying custom column list or uses T.* if pColumnList is NULL.
* Validates that all columns in pColumnList exist in the target table.
* Automatically adds 'T.' prefix to column names in pColumnList.
* @example
* begin
* DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
* pSchemaName => 'CT_MRDS',
* pTableName => 'MY_TABLE',
* pKeyColumnName => 'A_WORKFLOW_HISTORY_KEY',
* pBucketArea => 'DATA',
* pFolderName => 'exports',
* pFileName => 'my_export.csv',
* pColumnList => 'COLUMN1, COLUMN2, COLUMN3', -- Optional
* pMinDate => DATE '2024-01-01',
* pMaxDate => SYSDATE
* );
* end;
**/
PROCEDURE EXPORT_TABLE_DATA_TO_CSV_BY_DATE (
pSchemaName IN VARCHAR2,
pTableName IN VARCHAR2,
pKeyColumnName IN VARCHAR2,
pBucketArea IN VARCHAR2,
pFolderName IN VARCHAR2,
pFileName IN VARCHAR2 DEFAULT NULL,
pColumnList IN VARCHAR2 default NULL,
pMinDate IN DATE default DATE '1900-01-01',
pMaxDate IN DATE default SYSDATE,
pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
)
IS
-- Type definition for key values
TYPE key_value_tab IS TABLE OF VARCHAR2(4000);
vKeyValuesYear key_value_tab;
vKeyValuesMonth key_value_tab;
vCount INTEGER;
vSql VARCHAR2(4000);
vKeyValueYear VARCHAR2(4000);
vKeyValueMonth VARCHAR2(4000);
vQuery VARCHAR2(32767);
vUri VARCHAR2(4000);
vDataType VARCHAR2(30);
vTableName VARCHAR2(128);
vSchemaName VARCHAR2(128);
vKeyColumnName VARCHAR2(128);
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
vFileBaseName VARCHAR2(4000);
vFileExtension VARCHAR2(10);
vProcessedColumnList VARCHAR2(32767);
vBucketUri VARCHAR2(4000);
vCurrentCol VARCHAR2(128);
-- Function to sanitize file names
FUNCTION sanitizeFilename(pFilename IN VARCHAR2) RETURN VARCHAR2 IS
vFilename VARCHAR2(1000);
BEGIN
-- Replace any disallowed characters with underscores
vFilename := REGEXP_REPLACE(pFilename, '[^a-zA-Z0-9._-]', '_');
RETURN vFilename;
END sanitizeFilename;
-- Function to add T. prefix to column names
FUNCTION addTablePrefix(pColumnList IN VARCHAR2) RETURN VARCHAR2 IS
vResult VARCHAR2(32767);
vColumns VARCHAR2(32767);
vPos PLS_INTEGER;
vNextPos PLS_INTEGER;
vCurrentCol VARCHAR2(128);
BEGIN
IF pColumnList IS NULL THEN
RETURN 'T.*';
END IF;
-- Remove extra spaces and convert to uppercase
vColumns := UPPER(REPLACE(pColumnList, ' ', ''));
vPos := 1;
vResult := '';
-- Parse comma-separated column list and add T. prefix
WHILE vPos <= LENGTH(vColumns) LOOP
vNextPos := INSTR(vColumns, ',', vPos);
IF vNextPos = 0 THEN
vNextPos := LENGTH(vColumns) + 1;
END IF;
vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos);
-- Add T. prefix if not already present
IF INSTR(vCurrentCol, '.') = 0 THEN
vCurrentCol := 'T.' || vCurrentCol;
END IF;
-- Add to result with comma separator
IF vResult IS NOT NULL THEN
vResult := vResult || ', ';
END IF;
vResult := vResult || vCurrentCol;
vPos := vNextPos + 1;
END LOOP;
RETURN vResult;
END addTablePrefix;
BEGIN
vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||''''
,'pTableName => '''||nvl(pTableName, 'NULL')||''''
,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||''''
,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||''''
,'pFolderName => '''||nvl(pFolderName, 'NULL')||''''
,'pFileName => '''||nvl(pFileName, 'NULL')||''''
,'pColumnList => '''||nvl(pColumnList, 'NULL')||''''
,'pMinDate => '''||nvl(TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
,'pMaxDate => '''||nvl(TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
));
ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
-- Get bucket URI based on bucket area using FILE_MANAGER function
vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea);
-- Convert table and column names to uppercase to match data dictionary
vTableName := UPPER(pTableName);
vSchemaName := UPPER(pSchemaName);
vKeyColumnName := UPPER(pKeyColumnName);
-- Extract base filename and extension or construct default filename
IF pFileName IS NOT NULL THEN
-- Use provided filename
IF INSTR(pFileName, '.') > 0 THEN
vFileBaseName := SUBSTR(pFileName, 1, INSTR(pFileName, '.', -1) - 1);
vFileExtension := SUBSTR(pFileName, INSTR(pFileName, '.', -1));
ELSE
vFileBaseName := pFileName;
vFileExtension := '.csv';
END IF;
ELSE
-- Construct default filename: TABLENAME.csv (without date range)
vFileBaseName := UPPER(pTableName);
vFileExtension := '.csv';
END IF;
-- Check if table exists
SELECT COUNT(*) INTO vCount
FROM all_tables
WHERE table_name = vTableName
AND owner = vSchemaName;
IF vCount = 0 THEN
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, ENV_MANAGER.MSG_TABLE_NOT_EXISTS);
END IF;
-- Check if key column exists
SELECT COUNT(*) INTO vCount
FROM all_tab_columns
WHERE table_name = vTableName
AND column_name = vKeyColumnName
AND owner = vSchemaName;
IF vCount = 0 THEN
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
END IF;
-- Validate pColumnList - check if all column names exist in the table
IF pColumnList IS NOT NULL THEN
DECLARE
vColumnName VARCHAR2(128);
vColumns VARCHAR2(32767);
vPos PLS_INTEGER;
vNextPos PLS_INTEGER;
vCurrentCol VARCHAR2(128);
BEGIN
-- Remove spaces and convert to uppercase for processing
vColumns := UPPER(REPLACE(pColumnList, ' ', ''));
vPos := 1;
-- Parse comma-separated column list
WHILE vPos <= LENGTH(vColumns) LOOP
vNextPos := INSTR(vColumns, ',', vPos);
IF vNextPos = 0 THEN
vNextPos := LENGTH(vColumns) + 1;
END IF;
vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos);
-- Remove table alias prefix if present (e.g., 'T.COLUMN_NAME' -> 'COLUMN_NAME')
IF INSTR(vCurrentCol, '.') > 0 THEN
vCurrentCol := SUBSTR(vCurrentCol, INSTR(vCurrentCol, '.') + 1);
END IF;
-- Check if column exists in the table
SELECT COUNT(*) INTO vCount
FROM all_tab_columns
WHERE table_name = vTableName
AND column_name = vCurrentCol
AND owner = vSchemaName;
IF vCount = 0 THEN
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
END IF;
vPos := vNextPos + 1;
END LOOP;
END;
END IF;
-- Process column list to add T. prefix to each column
vProcessedColumnList := addTablePrefix(pColumnList);
-- Get the data type of the key column
SELECT data_type INTO vDataType
FROM all_tab_columns
WHERE table_name = vTableName
AND column_name = vKeyColumnName
AND owner = vSchemaName;
vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName);
-- Fetch unique year/month combinations
vSql := 'SELECT DISTINCT TO_CHAR(L.LOAD_START,''YYYY'') AS YR, TO_CHAR(L.LOAD_START,''MM'') AS MN
FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L
WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_WORKFLOW_HISTORY_KEY
AND L.LOAD_START >= :pMinDate
AND L.LOAD_START < :pMaxDate
' ;
EXECUTE IMMEDIATE vSql BULK COLLECT INTO vKeyValuesYear, vKeyValuesMonth USING pMinDate, pMaxDate;
ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vKeyValuesYear.COUNT || ' year/month combinations to export', 'INFO', vParameters);
-- Loop over each unique year/month combination
FOR i IN 1 .. vKeyValuesYear.COUNT LOOP
vKeyValueYear := vKeyValuesYear(i);
vKeyValueMonth := vKeyValuesMonth(i);
-- Construct the query to extract data for the current year/month
vQuery := 'SELECT ' || vProcessedColumnList || '
FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L
WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_WORKFLOW_HISTORY_KEY
AND TO_CHAR(L.LOAD_START,''YYYY'') = ' || CHR(39) || vKeyValueYear || CHR(39) || '
AND TO_CHAR(L.LOAD_START,''MM'') = ' || CHR(39) || vKeyValueMonth || CHR(39) || '
AND L.LOAD_START >= TO_DATE(' || CHR(39) || TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'')
AND L.LOAD_START < TO_DATE(' || CHR(39) || TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'')';
-- Construct the URI for the CSV file in OCI Object Storage
vUri := vBucketUri ||
CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END ||
sanitizeFilename(vFileBaseName) || '_' ||
sanitizeFilename(vKeyValueYear) || sanitizeFilename(vKeyValueMonth) ||
vFileExtension;
ENV_MANAGER.LOG_PROCESS_EVENT('Exporting to CSV file: ' || vUri, 'INFO', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('Year/Month: ' || vKeyValueYear || '/' || vKeyValueMonth, 'DEBUG', vParameters);
-- Use DBMS_CLOUD package to export data to CSV file
DBMS_CLOUD.EXPORT_DATA(
credential_name => pCredentialName,
file_uri_list => vUri,
query => vQuery,
format => json_object('type' VALUE 'CSV', 'header' VALUE true)
);
END LOOP;
ENV_MANAGER.LOG_PROCESS_EVENT('Export completed successfully for ' || vKeyValuesYear.COUNT || ' files', 'INFO', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
EXCEPTION
WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN
vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName;
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp);
WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN
vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in pColumnList' ELSE '' END;
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp);
WHEN OTHERS THEN
-- Log complete error details including full stack trace and backtrace
ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER');
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
END EXPORT_TABLE_DATA_TO_CSV_BY_DATE;
----------------------------------------------------------------------------------------------------
-- VERSION MANAGEMENT FUNCTIONS
----------------------------------------------------------------------------------------------------
FUNCTION GET_VERSION RETURN VARCHAR2 IS
BEGIN
RETURN PACKAGE_VERSION;
END GET_VERSION;
----------------------------------------------------------------------------------------------------
FUNCTION GET_BUILD_INFO RETURN VARCHAR2 IS
BEGIN
RETURN ENV_MANAGER.GET_PACKAGE_VERSION_INFO(
pPackageName => 'DATA_EXPORTER',
pVersion => PACKAGE_VERSION,
pBuildDate => PACKAGE_BUILD_DATE,
pAuthor => PACKAGE_AUTHOR
);
END GET_BUILD_INFO;
----------------------------------------------------------------------------------------------------
FUNCTION GET_VERSION_HISTORY RETURN VARCHAR2 IS
BEGIN
RETURN ENV_MANAGER.FORMAT_VERSION_HISTORY(
pPackageName => 'DATA_EXPORTER',
pVersionHistory => VERSION_HISTORY
);
END GET_VERSION_HISTORY;
----------------------------------------------------------------------------------------------------
END;
/


@@ -0,0 +1,163 @@
create or replace PACKAGE CT_MRDS.DATA_EXPORTER
AUTHID CURRENT_USER
AS
/**
* Data Export Package: Provides comprehensive data export capabilities to various formats (CSV, Parquet)
* with support for cloud storage integration via Oracle Cloud Infrastructure (OCI).
* The structure of this comment is used by the GET_PACKAGE_DOCUMENTATION function,
* which returns documentation text for the Confluence page (ready to copy-paste).
**/
-- Package Version Information
PACKAGE_VERSION CONSTANT VARCHAR2(10) := '2.1.0';
PACKAGE_BUILD_DATE CONSTANT VARCHAR2(19) := '2025-10-22 15:00:00';
PACKAGE_AUTHOR CONSTANT VARCHAR2(50) := 'MRDS Development Team';
-- Version History (last 3-5 changes)
VERSION_HISTORY CONSTANT VARCHAR2(4000) :=
'v2.1.0 (2025-10-22): Added version tracking and PARTITION_YEAR/PARTITION_MONTH support' || CHR(10) ||
'v2.0.0 (2025-10-01): Separated export functionality from FILE_MANAGER package' || CHR(10) ||
'v1.0.0 (2025-09-15): Initial implementation within FILE_MANAGER package' || CHR(10);
cgBL CONSTANT VARCHAR2(2) := CHR(13)||CHR(10);
vgMsgTmp VARCHAR2(32000);
---------------------------------------------------------------------------------------------------------------------------
---------------------------------------------------------------------------------------------------------------------------
/**
* @name EXPORT_TABLE_DATA
* @desc Wrapper procedure for DBMS_CLOUD.EXPORT_DATA.
* Exports data into CSV files on OCI infrastructure (one file per distinct key value).
* pBucketArea parameter accepts: 'INBOX', 'ODS', 'DATA', 'ARCHIVE'
* @example
* begin
* DATA_EXPORTER.EXPORT_TABLE_DATA(
* pSchemaName => 'CT_MRDS',
* pTableName => 'MY_TABLE',
* pKeyColumnName => 'A_WORKFLOW_HISTORY_KEY',
* pBucketArea => 'DATA',
* pFolderName => 'csv_exports'
* );
* end;
**/
PROCEDURE EXPORT_TABLE_DATA (
pSchemaName IN VARCHAR2,
pTableName IN VARCHAR2,
pKeyColumnName IN VARCHAR2,
pBucketArea IN VARCHAR2,
pFolderName IN VARCHAR2,
pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
);
/**
* @name EXPORT_TABLE_DATA_BY_DATE
* @desc Wrapper procedure for DBMS_CLOUD.EXPORT_DATA.
* Exports data into Parquet files on OCI infrastructure.
* Each YEAR/MONTH pair goes to a separate file (implicit partitioning).
* Allows specifying custom column list or uses T.* if pColumnList is NULL.
* Validates that all columns in pColumnList exist in the target table.
* Automatically adds 'T.' prefix to column names in pColumnList.
* pBucketArea parameter accepts: 'INBOX', 'ODS', 'DATA', 'ARCHIVE'
* @example
* begin
* DATA_EXPORTER.EXPORT_TABLE_DATA_BY_DATE(
* pSchemaName => 'CT_MRDS',
* pTableName => 'MY_TABLE',
* pKeyColumnName => 'A_WORKFLOW_HISTORY_KEY',
* pBucketArea => 'DATA',
* pFolderName => 'parquet_exports',
* pColumnList => 'COLUMN1, COLUMN2, COLUMN3', -- Optional
* pMinDate => DATE '2024-01-01',
* pMaxDate => SYSDATE
* );
* end;
**/
PROCEDURE EXPORT_TABLE_DATA_BY_DATE (
pSchemaName IN VARCHAR2,
pTableName IN VARCHAR2,
pKeyColumnName IN VARCHAR2,
pBucketArea IN VARCHAR2,
pFolderName IN VARCHAR2,
pColumnList IN VARCHAR2 default NULL,
pMinDate IN DATE default DATE '1900-01-01',
pMaxDate IN DATE default SYSDATE,
pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
);
/**
* @name EXPORT_TABLE_DATA_TO_CSV_BY_DATE
* @desc Exports data to separate CSV files partitioned by year and month.
* Creates one CSV file for each year/month combination found in the data.
* Uses the same date filtering mechanism with CT_ODS.A_LOAD_HISTORY as EXPORT_TABLE_DATA_BY_DATE,
* but exports to CSV format instead of Parquet.
* File naming pattern: {pFileName}_YYYYMM.csv or {TABLENAME}_YYYYMM.csv (if pFileName is NULL)
* @example
* begin
* -- With custom filename
* DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
* pSchemaName => 'CT_MRDS',
* pTableName => 'MY_TABLE',
* pKeyColumnName => 'A_WORKFLOW_HISTORY_KEY',
* pBucketArea => 'DATA',
* pFolderName => 'exports',
* pFileName => 'my_export.csv',
* pMinDate => DATE '2024-01-01',
* pMaxDate => SYSDATE
* );
*
* -- With auto-generated filename (based on table name only)
* DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
* pSchemaName => 'OU_TOP',
* pTableName => 'AGGREGATED_ALLOTMENT',
* pKeyColumnName => 'A_WORKFLOW_HISTORY_KEY',
* pBucketArea => 'ARCHIVE',
* pFolderName => 'exports',
* pMinDate => DATE '2025-09-01',
* pMaxDate => DATE '2025-09-17'
* );
* -- This will create files like: AGGREGATED_ALLOTMENT_202509.csv, etc.
* end;
*
* pBucketArea parameter accepts: 'INBOX', 'ODS', 'DATA', 'ARCHIVE'
**/
PROCEDURE EXPORT_TABLE_DATA_TO_CSV_BY_DATE (
pSchemaName IN VARCHAR2,
pTableName IN VARCHAR2,
pKeyColumnName IN VARCHAR2,
pBucketArea IN VARCHAR2,
pFolderName IN VARCHAR2,
pFileName IN VARCHAR2 DEFAULT NULL,
pColumnList IN VARCHAR2 default NULL,
pMinDate IN DATE default DATE '1900-01-01',
pMaxDate IN DATE default SYSDATE,
pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
);
---------------------------------------------------------------------------------------------------------------------------
-- VERSION MANAGEMENT FUNCTIONS
---------------------------------------------------------------------------------------------------------------------------
/**
* Returns the current package version number
* return: Version string in format X.Y.Z (e.g., '2.1.0')
**/
FUNCTION GET_VERSION RETURN VARCHAR2;
/**
* Returns comprehensive build information including version, date, and author
* return: Formatted string with complete build details
**/
FUNCTION GET_BUILD_INFO RETURN VARCHAR2;
/**
* Returns the version history with recent changes
* return: Multi-line string with version history
**/
FUNCTION GET_VERSION_HISTORY RETURN VARCHAR2;
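/**
 * @example
 * -- Illustrative usage of the version functions above (printed values depend on
 * -- the deployed build; '2.1.0' is an example only):
 * begin
 *   DBMS_OUTPUT.PUT_LINE(DATA_EXPORTER.GET_VERSION);          -- e.g. '2.1.0'
 *   DBMS_OUTPUT.PUT_LINE(DATA_EXPORTER.GET_BUILD_INFO);
 *   DBMS_OUTPUT.PUT_LINE(DATA_EXPORTER.GET_VERSION_HISTORY);
 * end;
 **/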
END;
/


@@ -0,0 +1,89 @@
-- ============================================================================
-- MARS-826-PREHOOK Master Installation Script
-- ============================================================================
-- Purpose: Deploy updated DATA_EXPORTER package with A_ETL_LOAD_SET_KEY support
-- Target Schema: CT_MRDS
-- Estimated Time: 1-2 minutes
-- Prerequisites: Database connection to CT_MRDS schema
-- ============================================================================
SET SERVEROUTPUT ON SIZE UNLIMITED
SET VERIFY OFF
SET FEEDBACK ON
SET ECHO OFF
-- Create log directory if it doesn't exist
host mkdir log 2>nul
-- Generate dynamic SPOOL filename with timestamp
var filename VARCHAR2(100)
BEGIN
:filename := 'log/INSTALL_MARS_826_PREHOOK_' || SYS_CONTEXT('USERENV', 'CON_NAME') || '_' || TO_CHAR(SYSDATE,'YYYYMMDD_HH24MISS') || '.log';
END;
/
column filename new_value _filename
select :filename filename from dual;
spool &_filename
PROMPT
PROMPT ============================================================================
PROMPT MARS-826-PREHOOK Installation Starting
PROMPT ============================================================================
PROMPT Package: CT_MRDS.DATA_EXPORTER
PROMPT Change: Column reference A_WORKFLOW_HISTORY_KEY -> A_ETL_LOAD_SET_KEY
PROMPT Purpose: Support for renamed column in CT_ODS.A_LOAD_HISTORY
PROMPT Timestamp:
SELECT TO_CHAR(SYSDATE, 'YYYY-MM-DD HH24:MI:SS') AS install_start FROM DUAL;
PROMPT ============================================================================
-- Confirm installation with user
ACCEPT continue CHAR PROMPT 'Type YES to continue with installation, or Ctrl+C to abort: '
WHENEVER SQLERROR EXIT SQL.SQLCODE
BEGIN
IF '&continue' IS NULL OR TRIM('&continue') IS NULL OR UPPER(TRIM('&continue')) != 'YES' THEN
RAISE_APPLICATION_ERROR(-20001, 'Installation aborted by user');
END IF;
END;
/
WHENEVER SQLERROR CONTINUE
-- Installation steps
PROMPT
PROMPT Step 1/4: Deploying DATA_EXPORTER Package Specification
PROMPT ========================================================
@@00_MARS_826_PREHOOK_install_DATA_EXPORTER_SPEC.sql
PROMPT
PROMPT Step 2/4: Deploying DATA_EXPORTER Package Body
PROMPT ===============================================
@@01_MARS_826_PREHOOK_install_DATA_EXPORTER_BODY.sql
PROMPT
PROMPT Step 3/4: Tracking Package Version
PROMPT =====================================
@@track_package_versions.sql
PROMPT
PROMPT Step 4/4: Verifying Package Status
PROMPT ======================================
@@verify_packages_version.sql
PROMPT
PROMPT ============================================================================
PROMPT MARS-826-PREHOOK Installation Completed
PROMPT ============================================================================
PROMPT Completion Time:
SELECT TO_CHAR(SYSDATE, 'YYYY-MM-DD HH24:MI:SS') AS install_end FROM DUAL;
PROMPT
PROMPT Installation Summary:
PROMPT - Package: CT_MRDS.DATA_EXPORTER
PROMPT - Version: 2.1.0 -> 2.1.1 (PATCH)
PROMPT - Change: A_WORKFLOW_HISTORY_KEY -> A_ETL_LOAD_SET_KEY
PROMPT
PROMPT
PROMPT Log file: &_filename
PROMPT ============================================================================
spool off
quit;


@@ -0,0 +1,733 @@
create or replace PACKAGE BODY CT_MRDS.DATA_EXPORTER
AS
-- Internal shared function to process column list with T. prefix and key column mapping
FUNCTION processColumnList(pColumnList IN VARCHAR2, pTableName IN VARCHAR2, pSchemaName IN VARCHAR2, pKeyColumnName IN VARCHAR2) RETURN VARCHAR2 IS
vResult VARCHAR2(32767);
vColumns VARCHAR2(32767);
vPos PLS_INTEGER;
vNextPos PLS_INTEGER;
vCurrentCol VARCHAR2(128);
vAllCols VARCHAR2(32767);
BEGIN
IF pColumnList IS NULL THEN
-- Build list of all columns
SELECT LISTAGG(column_name, ', ') WITHIN GROUP (ORDER BY column_id)
INTO vAllCols
FROM all_tab_columns
WHERE table_name = pTableName
AND owner = pSchemaName;
-- Add T. prefix to all columns
vResult := 'T.' || REPLACE(vAllCols, ', ', ', T.');
-- Replace key column with aliased version (e.g., T.A_ETL_LOAD_SET_KEY_FK AS A_WORKFLOW_HISTORY_KEY)
vResult := REPLACE(vResult, 'T.' || pKeyColumnName, 'T.' || pKeyColumnName || ' AS A_WORKFLOW_HISTORY_KEY');
RETURN vResult;
END IF;
-- Remove extra spaces and convert to uppercase
vColumns := UPPER(REPLACE(pColumnList, ' ', ''));
vPos := 1;
vResult := '';
-- Parse comma-separated column list and add T. prefix
WHILE vPos <= LENGTH(vColumns) LOOP
vNextPos := INSTR(vColumns, ',', vPos);
IF vNextPos = 0 THEN
vNextPos := LENGTH(vColumns) + 1;
END IF;
vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos);
-- Check if this is the key column (e.g., A_ETL_LOAD_SET_KEY_FK) and add alias
IF UPPER(vCurrentCol) = UPPER(pKeyColumnName) THEN
vCurrentCol := 'T.' || pKeyColumnName || ' AS A_WORKFLOW_HISTORY_KEY';
ELSIF UPPER(vCurrentCol) = 'A_ETL_LOAD_SET_KEY' THEN
vCurrentCol := 'T.A_ETL_LOAD_SET_KEY AS A_WORKFLOW_HISTORY_KEY';
ELSE
-- Add T. prefix if not already present
IF INSTR(vCurrentCol, '.') = 0 THEN
vCurrentCol := 'T.' || vCurrentCol;
END IF;
END IF;
-- Add to result with comma separator
IF vResult IS NOT NULL THEN
vResult := vResult || ', ';
END IF;
vResult := vResult || vCurrentCol;
vPos := vNextPos + 1;
END LOOP;
RETURN vResult;
END processColumnList;
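    -- Illustration of processColumnList behavior (hypothetical inputs; key column
    -- assumed to be A_ETL_LOAD_SET_KEY_FK):
    --   processColumnList('COL1, A_ETL_LOAD_SET_KEY_FK, COL2', 'MY_TABLE', 'CT_MRDS', 'A_ETL_LOAD_SET_KEY_FK')
    --     => 'T.COL1, T.A_ETL_LOAD_SET_KEY_FK AS A_WORKFLOW_HISTORY_KEY, T.COL2'
    --   processColumnList(NULL, ...) instead builds the full column list from
    --   ALL_TAB_COLUMNS and applies the same T. prefix and key-column aliasing.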
----------------------------------------------------------------------------------------------------
PROCEDURE EXPORT_TABLE_DATA (
pSchemaName IN VARCHAR2,
pTableName IN VARCHAR2,
pKeyColumnName IN VARCHAR2,
pBucketArea IN VARCHAR2,
pFolderName IN VARCHAR2,
pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
)
IS
-- Type definition for key values
TYPE key_value_tab IS TABLE OF VARCHAR2(4000);
vKeyValues key_value_tab;
vCount INTEGER;
vSql VARCHAR2(4000);
vKeyValue VARCHAR2(4000);
vQuery VARCHAR2(32767);
vUri VARCHAR2(4000);
vDataType VARCHAR2(30);
vTableName VARCHAR2(128);
vSchemaName VARCHAR2(128);
vKeyColumnName VARCHAR2(128);
vParameters VARCHAR2(4000);
vBucketUri VARCHAR2(4000);
vProcessedColumnList VARCHAR2(32767);
vCurrentCol VARCHAR2(128);
vAllColumnsList VARCHAR2(32767);
-- Function to sanitize file names
FUNCTION sanitizeFilename(pFilename IN VARCHAR2) RETURN VARCHAR2 IS
vFilename VARCHAR2(1000);
BEGIN
-- Replace any disallowed characters with underscores
vFilename := REGEXP_REPLACE(pFilename, '[^a-zA-Z0-9._-]', '_');
RETURN vFilename;
END sanitizeFilename;
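    -- Illustration (hypothetical input): any character outside [a-zA-Z0-9._-] is
    -- replaced with '_', e.g. sanitizeFilename('2024/09 key') => '2024_09_key'.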
BEGIN
vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||''''
,'pTableName => '''||nvl(pTableName, 'NULL')||''''
,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||''''
,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||''''
,'pFolderName => '''||nvl(pFolderName, 'NULL')||''''
,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
));
ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
-- Get bucket URI based on bucket area using FILE_MANAGER function
vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea);
-- Convert table and column names to uppercase to match data dictionary
vTableName := UPPER(pTableName);
vSchemaName := UPPER(pSchemaName);
vKeyColumnName := UPPER(pKeyColumnName);
-- Check if table exists
SELECT COUNT(*) INTO vCount
FROM all_tables
WHERE table_name = vTableName
AND owner = vSchemaName;
IF vCount = 0 THEN
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, ENV_MANAGER.MSG_TABLE_NOT_EXISTS);
END IF;
-- Check if key column exists
SELECT COUNT(*) INTO vCount
FROM all_tab_columns
WHERE table_name = vTableName
AND column_name = vKeyColumnName
AND owner = vSchemaName;
IF vCount = 0 THEN
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
END IF;
-- Get the data type of the key column
SELECT data_type INTO vDataType
FROM all_tab_columns
WHERE table_name = vTableName
AND column_name = vKeyColumnName
AND owner = vSchemaName;
-- Build list of all columns for the table (excluding key column to avoid duplication)
SELECT LISTAGG(column_name, ', ') WITHIN GROUP (ORDER BY column_id)
INTO vAllColumnsList
FROM all_tab_columns
WHERE table_name = vTableName
AND owner = vSchemaName
AND column_name != vKeyColumnName;
-- Process column list to add T. prefix to each column
vProcessedColumnList := processColumnList(vAllColumnsList, vTableName, vSchemaName, vKeyColumnName);
ENV_MANAGER.LOG_PROCESS_EVENT('Dynamic column list built (excluding key): ' || vAllColumnsList, 'DEBUG', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list with T. prefix: ' || vProcessedColumnList, 'DEBUG', vParameters);
vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName);
-- Fetch unique key values from A_LOAD_HISTORY
vSql := 'SELECT DISTINCT L.A_ETL_LOAD_SET_KEY' ||
' FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L' ||
' WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY';
ENV_MANAGER.LOG_PROCESS_EVENT('Executing key values query: ' || vSql, 'DEBUG', vParameters);
EXECUTE IMMEDIATE vSql BULK COLLECT INTO vKeyValues;
ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vKeyValues.COUNT || ' unique key values to process', 'DEBUG', vParameters);
-- Loop over each unique key value
FOR i IN 1 .. vKeyValues.COUNT LOOP
vKeyValue := vKeyValues(i);
-- Construct the query to extract data for the current key value with A_WORKFLOW_HISTORY_KEY mapping
IF vDataType IN ('VARCHAR2', 'CHAR', 'NCHAR', 'NVARCHAR2') THEN
vQuery := 'SELECT ' || vProcessedColumnList ||
' FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L' ||
' WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY' ||
' AND L.A_ETL_LOAD_SET_KEY = ' || CHR(39) || vKeyValue || CHR(39);
ELSIF vDataType IN ('NUMBER', 'FLOAT', 'BINARY_FLOAT', 'BINARY_DOUBLE') THEN
vQuery := 'SELECT ' || vProcessedColumnList ||
' FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L' ||
' WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY' ||
' AND L.A_ETL_LOAD_SET_KEY = ' || vKeyValue;
ELSIF vDataType LIKE 'TIMESTAMP%' OR vDataType = 'DATE' THEN
vQuery := 'SELECT ' || vProcessedColumnList ||
' FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L' ||
' WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY' ||
' AND L.A_ETL_LOAD_SET_KEY = TO_TIMESTAMP(' || CHR(39) || vKeyValue || CHR(39) ||', ''YYYY-MM-DD HH24:MI:SS.FF'')';
ELSE
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNSUPPORTED_DATA_TYPE, ENV_MANAGER.MSG_UNSUPPORTED_DATA_TYPE);
END IF;
-- Construct the URI for the file in OCI Object Storage
vUri := vBucketUri ||
CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END ||
sanitizeFilename(vKeyValue) || '.csv';
ENV_MANAGER.LOG_PROCESS_EVENT('Processing key value: ' || vKeyValue || ' (' || (i) || '/' || vKeyValues.COUNT || ')', 'DEBUG', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('Export query: ' || vQuery, 'DEBUG', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('Export URI: ' || vUri, 'DEBUG', vParameters);
-- Use DBMS_CLOUD package to export data to the URI
DBMS_CLOUD.EXPORT_DATA(
credential_name => pCredentialName,
file_uri_list => vUri,
query => vQuery,
format => json_object('type' VALUE 'CSV', 'header' VALUE true)
);
END LOOP;
ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
EXCEPTION
WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN
vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName;
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp);
WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN
vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in column list' ELSE '' END;
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp);
WHEN ENV_MANAGER.ERR_UNSUPPORTED_DATA_TYPE THEN
vgMsgTmp := ENV_MANAGER.MSG_UNSUPPORTED_DATA_TYPE || ' vDataType: '||vDataType;
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNSUPPORTED_DATA_TYPE, vgMsgTmp);
WHEN OTHERS THEN
-- Log complete error details including full stack trace and backtrace
ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER');
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
END EXPORT_TABLE_DATA;
----------------------------------------------------------------------------------------------------
PROCEDURE EXPORT_TABLE_DATA_BY_DATE (
pSchemaName IN VARCHAR2,
pTableName IN VARCHAR2,
pKeyColumnName IN VARCHAR2,
pBucketArea IN VARCHAR2,
pFolderName IN VARCHAR2,
pColumnList IN VARCHAR2 default NULL,
pMinDate IN DATE default DATE '1900-01-01',
pMaxDate IN DATE default SYSDATE,
pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
)
IS
-- Type definition for key values
TYPE key_value_tab IS TABLE OF VARCHAR2(4000);
vKeyValuesYear key_value_tab;
vKeyValuesMonth key_value_tab;
vCount INTEGER;
vSql VARCHAR2(32000);
vKeyValueYear VARCHAR2(4000);
vKeyValueMonth VARCHAR2(4000);
vQuery VARCHAR2(32767);
vUri VARCHAR2(4000);
vDataType VARCHAR2(30);
vTableName VARCHAR2(128);
vSchemaName VARCHAR2(128);
vKeyColumnName VARCHAR2(128);
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
vProcessedColumnList VARCHAR2(32767);
vBucketUri VARCHAR2(4000);
vCurrentCol VARCHAR2(128);
-- Function to sanitize file names
FUNCTION sanitizeFilename(pFilename IN VARCHAR2) RETURN VARCHAR2 IS
vFilename VARCHAR2(1000);
BEGIN
-- Replace any disallowed characters with underscores
vFilename := REGEXP_REPLACE(pFilename, '[^a-zA-Z0-9._-]', '_');
RETURN vFilename;
END sanitizeFilename;
BEGIN
vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||''''
,'pTableName => '''||nvl(pTableName, 'NULL')||''''
,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||''''
,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||''''
,'pFolderName => '''||nvl(pFolderName, 'NULL')||''''
,'pColumnList => '''||nvl(pColumnList, 'NULL')||''''
,'pMinDate => '''||nvl(TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
,'pMaxDate => '''||nvl(TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
));
ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
-- Get bucket URI based on bucket area using FILE_MANAGER function
vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea);
-- Convert table and column names to uppercase to match data dictionary
vTableName := UPPER(pTableName);
vSchemaName := UPPER(pSchemaName);
vKeyColumnName := UPPER(pKeyColumnName);
-- Check if table exists
SELECT COUNT(*) INTO vCount
FROM all_tables
WHERE table_name = vTableName
AND owner = vSchemaName;
IF vCount = 0 THEN
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, ENV_MANAGER.MSG_TABLE_NOT_EXISTS);
END IF;
-- Check if key column exists
SELECT COUNT(*) INTO vCount
FROM all_tab_columns
WHERE table_name = vTableName
AND column_name = vKeyColumnName
AND owner = vSchemaName;
IF vCount = 0 THEN
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
END IF;
-- Validate pColumnList - check if all column names exist in the table
IF pColumnList IS NOT NULL THEN
DECLARE
vColumnName VARCHAR2(128);
vColumns VARCHAR2(32767);
vPos PLS_INTEGER;
vNextPos PLS_INTEGER;
vCurrentCol VARCHAR2(128);
BEGIN
-- Remove spaces and convert to uppercase for processing
vColumns := UPPER(REPLACE(pColumnList, ' ', ''));
vPos := 1;
-- Parse comma-separated column list
WHILE vPos <= LENGTH(vColumns) LOOP
vNextPos := INSTR(vColumns, ',', vPos);
IF vNextPos = 0 THEN
vNextPos := LENGTH(vColumns) + 1;
END IF;
vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos);
-- Remove table alias prefix if present (e.g., 'T.COLUMN_NAME' -> 'COLUMN_NAME')
IF INSTR(vCurrentCol, '.') > 0 THEN
vCurrentCol := SUBSTR(vCurrentCol, INSTR(vCurrentCol, '.') + 1);
END IF;
-- Check if column exists in the table
SELECT COUNT(*) INTO vCount
FROM all_tab_columns
WHERE table_name = vTableName
AND column_name = vCurrentCol
AND owner = vSchemaName;
IF vCount = 0 THEN
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
END IF;
vPos := vNextPos + 1;
END LOOP;
END;
END IF;
-- Process column list to add T. prefix to each column
vProcessedColumnList := processColumnList(pColumnList, vTableName, vSchemaName, vKeyColumnName);
ENV_MANAGER.LOG_PROCESS_EVENT('Input column list: ' || NVL(pColumnList, 'NULL (building dynamic list from table metadata)'), 'DEBUG', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list: ' || vProcessedColumnList, 'DEBUG', vParameters);
vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName);
-- Fetch unique year/month combinations
vSql := 'SELECT DISTINCT TO_CHAR(L.LOAD_START,''YYYY'') AS YR, TO_CHAR(L.LOAD_START,''MM'') AS MN
FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L
WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY
AND L.LOAD_START >= :pMinDate
AND L.LOAD_START < :pMaxDate
' ;
ENV_MANAGER.LOG_PROCESS_EVENT('Executing date range query: ' || vSql, 'DEBUG', vParameters);
EXECUTE IMMEDIATE vSql BULK COLLECT INTO vKeyValuesYear, vKeyValuesMonth USING pMinDate, pMaxDate;
ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vKeyValuesYear.COUNT || ' year/month combinations to export', 'DEBUG', vParameters);
-- Loop over each unique key value
FOR i IN 1 .. vKeyValuesYear.COUNT LOOP
vKeyValueYear := vKeyValuesYear(i);
vKeyValueMonth := vKeyValuesMonth(i);
ENV_MANAGER.LOG_PROCESS_EVENT('Processing Year/Month: ' || vKeyValueYear || '/' || vKeyValueMonth || ' (' || i || '/' || vKeyValuesYear.COUNT || ')', 'DEBUG', vParameters);
-- Construct the query to extract data for the current key value
-- Note: processColumnList already handles A_WORKFLOW_HISTORY_KEY aliasing
vQuery := 'SELECT ' || vProcessedColumnList || '
FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L
WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY
AND TO_CHAR(L.LOAD_START,''YYYY'') = ' || CHR(39) || vKeyValueYear || CHR(39) || '
AND TO_CHAR(L.LOAD_START,''MM'') = ' || CHR(39) || vKeyValueMonth || CHR(39) || '
AND L.LOAD_START >= TO_DATE(' || CHR(39) || TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'')
AND L.LOAD_START < TO_DATE(' || CHR(39) || TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'')';
-- Construct the URI for the file in OCI Object Storage
vUri := vBucketUri ||
CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END ||
'PARTITION_YEAR=' || sanitizeFilename(vKeyValueYear) || '/' ||
'PARTITION_MONTH=' || sanitizeFilename(vKeyValueMonth) || '/' ||
sanitizeFilename(vKeyValueYear) || sanitizeFilename(vKeyValueMonth) || '.parquet';
ENV_MANAGER.LOG_PROCESS_EVENT('Export query: ' || vQuery, 'DEBUG', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('Parquet export URI: ' || vUri, 'DEBUG', vParameters);
-- Use DBMS_CLOUD package to export data to the URI
DBMS_CLOUD.EXPORT_DATA(
credential_name => pCredentialName,
file_uri_list => vUri,
query => vQuery,
format => json_object('type' VALUE 'parquet')
);
END LOOP;
ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
EXCEPTION
WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN
vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName;
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp);
WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN
vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in pColumnList' ELSE '' END;
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp);
WHEN OTHERS THEN
-- Log complete error details including full stack trace and backtrace
ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER');
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
END EXPORT_TABLE_DATA_BY_DATE;
----------------------------------------------------------------------------------------------------
/**
* @name EXPORT_TABLE_DATA_TO_CSV_BY_DATE
* @desc Exports data to CSV files with date filtering, creating one CSV file
*       per year/month combination found in the data (the same partitioning
*       as EXPORT_TABLE_DATA_BY_DATE, but in CSV format instead of Parquet).
*       Uses the same date filtering mechanism with CT_ODS.A_LOAD_HISTORY.
* Allows specifying custom column list or uses T.* if pColumnList is NULL.
* Validates that all columns in pColumnList exist in the target table.
* Automatically adds 'T.' prefix to column names in pColumnList.
* @example
* begin
* DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
* pSchemaName => 'CT_MRDS',
* pTableName => 'MY_TABLE',
* pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',
* pBucketArea => 'DATA',
* pFolderName => 'exports',
* pFileName => 'my_export.csv',
* pColumnList => 'COLUMN1, COLUMN2, COLUMN3', -- Optional
* pMinDate => DATE '2024-01-01',
* pMaxDate => SYSDATE
* );
* end;
**/
PROCEDURE EXPORT_TABLE_DATA_TO_CSV_BY_DATE (
pSchemaName IN VARCHAR2,
pTableName IN VARCHAR2,
pKeyColumnName IN VARCHAR2,
pBucketArea IN VARCHAR2,
pFolderName IN VARCHAR2,
pFileName IN VARCHAR2 DEFAULT NULL,
pColumnList IN VARCHAR2 default NULL,
pMinDate IN DATE default DATE '1900-01-01',
pMaxDate IN DATE default SYSDATE,
pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
)
IS
-- Type definition for key values
TYPE key_value_tab IS TABLE OF VARCHAR2(4000);
vKeyValuesYear key_value_tab;
vKeyValuesMonth key_value_tab;
vCount INTEGER;
vSql VARCHAR2(4000);
vKeyValueYear VARCHAR2(4000);
vKeyValueMonth VARCHAR2(4000);
vQuery VARCHAR2(32767);
vUri VARCHAR2(4000);
vDataType VARCHAR2(128);  -- matches ALL_TAB_COLUMNS.DATA_TYPE width
vTableName VARCHAR2(128);
vSchemaName VARCHAR2(128);
vKeyColumnName VARCHAR2(128);
vParameters CT_MRDS.A_PROCESS_LOG.PROCEDURE_PARAMETERS%TYPE;
vFileBaseName VARCHAR2(4000);
vFileExtension VARCHAR2(10);
vProcessedColumnList VARCHAR2(32767);
vBucketUri VARCHAR2(4000);
vCurrentCol VARCHAR2(128);
-- Function to sanitize file names
FUNCTION sanitizeFilename(pFilename IN VARCHAR2) RETURN VARCHAR2 IS
vFilename VARCHAR2(1000);
BEGIN
-- Replace any disallowed characters with underscores
vFilename := REGEXP_REPLACE(pFilename, '[^a-zA-Z0-9._-]', '_');
RETURN vFilename;
END sanitizeFilename;
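-- Illustrative behaviour of sanitizeFilename (hypothetical inputs): the regex
-- keeps letters, digits, '.', '_' and '-' and replaces everything else with '_',
-- e.g. sanitizeFilename('2024-05')        => '2024-05'
--      sanitizeFilename('my report!.csv') => 'my_report_.csv'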
BEGIN
vParameters := ENV_MANAGER.FORMAT_PARAMETERS(SYS.ODCIVARCHAR2LIST( 'pSchemaName => '''||nvl(pSchemaName, 'NULL')||''''
,'pTableName => '''||nvl(pTableName, 'NULL')||''''
,'pKeyColumnName => '''||nvl(pKeyColumnName, 'NULL')||''''
,'pBucketArea => '''||nvl(pBucketArea, 'NULL')||''''
,'pFolderName => '''||nvl(pFolderName, 'NULL')||''''
,'pFileName => '''||nvl(pFileName, 'NULL')||''''
,'pColumnList => '''||nvl(pColumnList, 'NULL')||''''
,'pMinDate => '''||nvl(TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
,'pMaxDate => '''||nvl(TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'NULL')||''''
,'pCredentialName => '''||nvl(pCredentialName, 'NULL')||''''
));
ENV_MANAGER.LOG_PROCESS_EVENT('Start','INFO', vParameters);
-- Get bucket URI based on bucket area using FILE_MANAGER function
vBucketUri := FILE_MANAGER.GET_BUCKET_URI(pBucketArea);
-- Convert table and column names to uppercase to match data dictionary
vTableName := UPPER(pTableName);
vSchemaName := UPPER(pSchemaName);
vKeyColumnName := UPPER(pKeyColumnName);
-- Extract base filename and extension or construct default filename
IF pFileName IS NOT NULL THEN
-- Use provided filename
IF INSTR(pFileName, '.') > 0 THEN
vFileBaseName := SUBSTR(pFileName, 1, INSTR(pFileName, '.', -1) - 1);
vFileExtension := SUBSTR(pFileName, INSTR(pFileName, '.', -1));
ELSE
vFileBaseName := pFileName;
vFileExtension := '.csv';
END IF;
ELSE
-- Construct default filename: TABLENAME.csv (without date range)
vFileBaseName := UPPER(pTableName);
vFileExtension := '.csv';
END IF;
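-- Example of the split above (hypothetical input): pFileName = 'export.v2.csv'
-- yields vFileBaseName = 'export.v2' and vFileExtension = '.csv', because the
-- extension is taken from the LAST dot (INSTR(pFileName, '.', -1)).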
-- Check if table exists
SELECT COUNT(*) INTO vCount
FROM all_tables
WHERE table_name = vTableName
AND owner = vSchemaName;
IF vCount = 0 THEN
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, ENV_MANAGER.MSG_TABLE_NOT_EXISTS);
END IF;
-- Check if key column exists
SELECT COUNT(*) INTO vCount
FROM all_tab_columns
WHERE table_name = vTableName
AND column_name = vKeyColumnName
AND owner = vSchemaName;
IF vCount = 0 THEN
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
END IF;
-- Validate pColumnList - check if all column names exist in the table
IF pColumnList IS NOT NULL THEN
DECLARE
  vColumns VARCHAR2(32767);
  vPos     PLS_INTEGER;
  vNextPos PLS_INTEGER;
  -- Note: vCurrentCol is deliberately NOT redeclared here; the outer
  -- variable (declared above) is populated so that the
  -- ERR_COLUMN_NOT_EXISTS handler can report the offending column
  -- from pColumnList.
BEGIN
-- Remove spaces and convert to uppercase for processing
vColumns := UPPER(REPLACE(pColumnList, ' ', ''));
vPos := 1;
-- Parse comma-separated column list
WHILE vPos <= LENGTH(vColumns) LOOP
vNextPos := INSTR(vColumns, ',', vPos);
IF vNextPos = 0 THEN
vNextPos := LENGTH(vColumns) + 1;
END IF;
vCurrentCol := SUBSTR(vColumns, vPos, vNextPos - vPos);
-- Remove table alias prefix if present (e.g., 'T.COLUMN_NAME' -> 'COLUMN_NAME')
IF INSTR(vCurrentCol, '.') > 0 THEN
vCurrentCol := SUBSTR(vCurrentCol, INSTR(vCurrentCol, '.') + 1);
END IF;
-- Check if column exists in the table
SELECT COUNT(*) INTO vCount
FROM all_tab_columns
WHERE table_name = vTableName
AND column_name = vCurrentCol
AND owner = vSchemaName;
IF vCount = 0 THEN
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, ENV_MANAGER.MSG_COLUMN_NOT_EXISTS);
END IF;
vPos := vNextPos + 1;
END LOOP;
END;
END IF;
-- Process column list to add T. prefix to each column
vProcessedColumnList := processColumnList(pColumnList, vTableName, vSchemaName, vKeyColumnName);
ENV_MANAGER.LOG_PROCESS_EVENT('Input column list: ' || NVL(pColumnList, 'NULL (using dynamic column list)'), 'DEBUG', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('Processed column list: ' || vProcessedColumnList, 'DEBUG', vParameters);
-- Get the data type of the key column
SELECT data_type INTO vDataType
FROM all_tab_columns
WHERE table_name = vTableName
AND column_name = vKeyColumnName
AND owner = vSchemaName;
vTableName := DBMS_ASSERT.SCHEMA_NAME(vSchemaName) || '.' || DBMS_ASSERT.simple_sql_name(vTableName);
-- Fetch unique year/month combinations
vSql := 'SELECT DISTINCT TO_CHAR(L.LOAD_START,''YYYY'') AS YR, TO_CHAR(L.LOAD_START,''MM'') AS MN
FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L
WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY
AND L.LOAD_START >= :pMinDate
AND L.LOAD_START < :pMaxDate
' ;
ENV_MANAGER.LOG_PROCESS_EVENT('Executing date range query: ' || vSql, 'DEBUG', vParameters);
EXECUTE IMMEDIATE vSql BULK COLLECT INTO vKeyValuesYear, vKeyValuesMonth USING pMinDate, pMaxDate;
ENV_MANAGER.LOG_PROCESS_EVENT('Found ' || vKeyValuesYear.COUNT || ' year/month combinations to export', 'INFO', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('Date range: ' || TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS') || ' to ' || TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS'), 'DEBUG', vParameters);
-- Loop over each unique year/month combination
FOR i IN 1 .. vKeyValuesYear.COUNT LOOP
vKeyValueYear := vKeyValuesYear(i);
vKeyValueMonth := vKeyValuesMonth(i);
-- Construct the query to extract data for the current year/month
vQuery := 'SELECT ' || vProcessedColumnList || '
FROM ' || vTableName || ' T, CT_ODS.A_LOAD_HISTORY L
WHERE T.' || DBMS_ASSERT.simple_sql_name(vKeyColumnName) || ' = L.A_ETL_LOAD_SET_KEY
AND TO_CHAR(L.LOAD_START,''YYYY'') = ' || CHR(39) || vKeyValueYear || CHR(39) || '
AND TO_CHAR(L.LOAD_START,''MM'') = ' || CHR(39) || vKeyValueMonth || CHR(39) || '
AND L.LOAD_START >= TO_DATE(' || CHR(39) || TO_CHAR(pMinDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'')
AND L.LOAD_START < TO_DATE(' || CHR(39) || TO_CHAR(pMaxDate, 'YYYY-MM-DD HH24:MI:SS') || CHR(39) || ', ''YYYY-MM-DD HH24:MI:SS'')';
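-- For vKeyValueYear = '2024' and vKeyValueMonth = '05' (hypothetical values),
-- vQuery resembles:
--   SELECT T.COL1, ... FROM <schema>.<table> T, CT_ODS.A_LOAD_HISTORY L
--   WHERE T.<key_column> = L.A_ETL_LOAD_SET_KEY
--     AND TO_CHAR(L.LOAD_START,'YYYY') = '2024'
--     AND TO_CHAR(L.LOAD_START,'MM')   = '05'
--     AND L.LOAD_START >= TO_DATE('2024-01-01 00:00:00', 'YYYY-MM-DD HH24:MI:SS')
--     AND L.LOAD_START <  TO_DATE(...)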
-- Construct the URI for the CSV file in OCI Object Storage
vUri := vBucketUri ||
CASE WHEN pFolderName IS NOT NULL THEN pFolderName || '/' ELSE '' END ||
sanitizeFilename(vFileBaseName) || '_' ||
sanitizeFilename(vKeyValueYear) || sanitizeFilename(vKeyValueMonth) ||
vFileExtension;
ENV_MANAGER.LOG_PROCESS_EVENT('Exporting to CSV file: ' || vUri, 'INFO', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('Processing Year/Month: ' || vKeyValueYear || '/' || vKeyValueMonth || ' (' || i || '/' || vKeyValuesYear.COUNT || ')', 'DEBUG', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('Export query: ' || vQuery, 'DEBUG', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('File name pattern: ' || vFileBaseName || '_' || vKeyValueYear || vKeyValueMonth || vFileExtension, 'DEBUG', vParameters);
-- Use DBMS_CLOUD package to export data to CSV file
DBMS_CLOUD.EXPORT_DATA(
credential_name => pCredentialName,
file_uri_list => vUri,
query => vQuery,
format => json_object('type' VALUE 'CSV', 'header' VALUE true)
);
END LOOP;
ENV_MANAGER.LOG_PROCESS_EVENT('Export completed successfully for ' || vKeyValuesYear.COUNT || ' files', 'INFO', vParameters);
ENV_MANAGER.LOG_PROCESS_EVENT('End','INFO',vParameters);
EXCEPTION
WHEN ENV_MANAGER.ERR_TABLE_NOT_EXISTS THEN
vgMsgTmp := ENV_MANAGER.MSG_TABLE_NOT_EXISTS ||': '||vTableName;
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_TABLE_NOT_EXISTS, vgMsgTmp);
WHEN ENV_MANAGER.ERR_COLUMN_NOT_EXISTS THEN
vgMsgTmp := ENV_MANAGER.MSG_COLUMN_NOT_EXISTS || ' (TableName.ColumnName): ' || vTableName||'.'||vKeyColumnName||CASE WHEN vCurrentCol IS NOT NULL THEN '.'||vCurrentCol||' in pColumnList' ELSE '' END;
ENV_MANAGER.LOG_PROCESS_EVENT(vgMsgTmp, 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_COLUMN_NOT_EXISTS, vgMsgTmp);
WHEN OTHERS THEN
-- Log complete error details including full stack trace and backtrace
ENV_MANAGER.LOG_PROCESS_ERROR('Export failed: ' || SQLERRM, vParameters, 'DATA_EXPORTER');
ENV_MANAGER.LOG_PROCESS_EVENT(ENV_MANAGER.GET_ERROR_STACK(pFormat => 'TABLE', pCode=> SQLCODE), 'ERROR', vParameters);
RAISE_APPLICATION_ERROR(ENV_MANAGER.CODE_UNKNOWN, ENV_MANAGER.GET_ERROR_STACK(pFormat => 'OUTPUT', pCode=> SQLCODE));
END EXPORT_TABLE_DATA_TO_CSV_BY_DATE;
----------------------------------------------------------------------------------------------------
-- VERSION MANAGEMENT FUNCTIONS
----------------------------------------------------------------------------------------------------
FUNCTION GET_VERSION RETURN VARCHAR2 IS
BEGIN
RETURN PACKAGE_VERSION;
END GET_VERSION;
----------------------------------------------------------------------------------------------------
FUNCTION GET_BUILD_INFO RETURN VARCHAR2 IS
BEGIN
RETURN ENV_MANAGER.GET_PACKAGE_VERSION_INFO(
pPackageName => 'DATA_EXPORTER',
pVersion => PACKAGE_VERSION,
pBuildDate => PACKAGE_BUILD_DATE,
pAuthor => PACKAGE_AUTHOR
);
END GET_BUILD_INFO;
----------------------------------------------------------------------------------------------------
FUNCTION GET_VERSION_HISTORY RETURN VARCHAR2 IS
BEGIN
RETURN ENV_MANAGER.FORMAT_VERSION_HISTORY(
pPackageName => 'DATA_EXPORTER',
pVersionHistory => VERSION_HISTORY
);
END GET_VERSION_HISTORY;
----------------------------------------------------------------------------------------------------
END;
/


@@ -0,0 +1,166 @@
create or replace PACKAGE CT_MRDS.DATA_EXPORTER
AUTHID CURRENT_USER
AS
/**
* Data Export Package: Provides comprehensive data export capabilities to various formats (CSV, Parquet)
* with support for cloud storage integration via Oracle Cloud Infrastructure (OCI).
* The structure of this comment is parsed by the GET_PACKAGE_DOCUMENTATION
* function, which returns documentation text ready to copy-paste into the
* Confluence page.
**/
-- Package Version Information
PACKAGE_VERSION CONSTANT VARCHAR2(10) := '2.1.1';
PACKAGE_BUILD_DATE CONSTANT VARCHAR2(19) := '2025-12-04 13:10:00';
PACKAGE_AUTHOR CONSTANT VARCHAR2(50) := 'MRDS Development Team';
-- Version History (last 3-5 changes)
VERSION_HISTORY CONSTANT VARCHAR2(4000) :=
'v2.1.1 (2025-12-04): Fixed JOIN column reference A_WORKFLOW_HISTORY_KEY -> A_ETL_LOAD_SET_KEY, added consistent column mapping and dynamic column list to EXPORT_TABLE_DATA procedure, enhanced DEBUG logging for all export operations' || CHR(10) ||
'v2.1.0 (2025-10-22): Added version tracking and PARTITION_YEAR/PARTITION_MONTH support' || CHR(10) ||
'v2.0.0 (2025-10-01): Separated export functionality from FILE_MANAGER package' || CHR(10) ||
'v1.0.0 (2025-09-15): Initial implementation within FILE_MANAGER package' || CHR(10);
cgBL CONSTANT VARCHAR2(2) := CHR(13)||CHR(10);
vgMsgTmp VARCHAR2(32000);
---------------------------------------------------------------------------------------------------------------------------
---------------------------------------------------------------------------------------------------------------------------
/**
* @name EXPORT_TABLE_DATA
* @desc Wrapper procedure for DBMS_CLOUD.EXPORT_DATA.
* Exports data into a CSV file on OCI infrastructure.
* pBucketArea parameter accepts: 'INBOX', 'ODS', 'DATA', 'ARCHIVE'
* @example
* begin
* DATA_EXPORTER.EXPORT_TABLE_DATA(
* pSchemaName => 'CT_MRDS',
* pTableName => 'MY_TABLE',
* pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',
* pBucketArea => 'DATA',
* pFolderName => 'csv_exports'
* );
* end;
**/
PROCEDURE EXPORT_TABLE_DATA (
pSchemaName IN VARCHAR2,
pTableName IN VARCHAR2,
pKeyColumnName IN VARCHAR2,
pBucketArea IN VARCHAR2,
pFolderName IN VARCHAR2,
pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
);
/**
* @name EXPORT_TABLE_DATA_BY_DATE
* @desc Wrapper procedure for DBMS_CLOUD.EXPORT_DATA.
* Exports data into PARQUET files on OCI infrastructure.
* Each YEAR_MONTH pair goes to a separate file (implicit partitioning).
* Allows specifying custom column list or uses T.* if pColumnList is NULL.
* Validates that all columns in pColumnList exist in the target table.
* Automatically adds 'T.' prefix to column names in pColumnList.
* pBucketArea parameter accepts: 'INBOX', 'ODS', 'DATA', 'ARCHIVE'
* @example
* begin
* DATA_EXPORTER.EXPORT_TABLE_DATA_BY_DATE(
* pSchemaName => 'CT_MRDS',
* pTableName => 'MY_TABLE',
* pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',
* pBucketArea => 'DATA',
* pFolderName => 'parquet_exports',
* pColumnList => 'COLUMN1, COLUMN2, COLUMN3', -- Optional
* pMinDate => DATE '2024-01-01',
* pMaxDate => SYSDATE
* );
* end;
**/
PROCEDURE EXPORT_TABLE_DATA_BY_DATE (
pSchemaName IN VARCHAR2,
pTableName IN VARCHAR2,
pKeyColumnName IN VARCHAR2,
pBucketArea IN VARCHAR2,
pFolderName IN VARCHAR2,
pColumnList IN VARCHAR2 default NULL,
pMinDate IN DATE default DATE '1900-01-01',
pMaxDate IN DATE default SYSDATE,
pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
);
/**
* @name EXPORT_TABLE_DATA_TO_CSV_BY_DATE
* @desc Exports data to separate CSV files partitioned by year and month.
* Creates one CSV file for each year/month combination found in the data.
* Uses the same date filtering mechanism with CT_ODS.A_LOAD_HISTORY as EXPORT_TABLE_DATA_BY_DATE,
* but exports to CSV format instead of Parquet.
* File naming pattern: {pFileName}_YYYYMM.csv or {TABLENAME}_YYYYMM.csv (if pFileName is NULL)
* pBucketArea parameter accepts: 'INBOX', 'ODS', 'DATA', 'ARCHIVE'
* @example
* begin
* -- With custom filename
* DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
* pSchemaName => 'CT_MRDS',
* pTableName => 'MY_TABLE',
* pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',
* pBucketArea => 'DATA',
* pFolderName => 'exports',
* pFileName => 'my_export.csv',
* pMinDate => DATE '2024-01-01',
* pMaxDate => SYSDATE
* );
*
* -- With auto-generated filename (based on table name only)
* DATA_EXPORTER.EXPORT_TABLE_DATA_TO_CSV_BY_DATE(
* pSchemaName => 'OU_TOP',
* pTableName => 'AGGREGATED_ALLOTMENT',
* pKeyColumnName => 'A_ETL_LOAD_SET_KEY_FK',
* pBucketArea => 'ARCHIVE',
* pFolderName => 'exports',
* pMinDate => DATE '2025-09-01',
* pMaxDate => DATE '2025-09-17'
* );
* -- This will create files like: AGGREGATED_ALLOTMENT_202509.csv, etc.
* end;
**/
PROCEDURE EXPORT_TABLE_DATA_TO_CSV_BY_DATE (
pSchemaName IN VARCHAR2,
pTableName IN VARCHAR2,
pKeyColumnName IN VARCHAR2,
pBucketArea IN VARCHAR2,
pFolderName IN VARCHAR2,
pFileName IN VARCHAR2 DEFAULT NULL,
pColumnList IN VARCHAR2 default NULL,
pMinDate IN DATE default DATE '1900-01-01',
pMaxDate IN DATE default SYSDATE,
pCredentialName IN VARCHAR2 default ENV_MANAGER.gvCredentialName
);
---------------------------------------------------------------------------------------------------------------------------
-- VERSION MANAGEMENT FUNCTIONS
---------------------------------------------------------------------------------------------------------------------------
/**
* Returns the current package version number
* return: Version string in format X.Y.Z (e.g., '2.1.0')
**/
FUNCTION GET_VERSION RETURN VARCHAR2;
/**
* Returns comprehensive build information including version, date, and author
* return: Formatted string with complete build details
**/
FUNCTION GET_BUILD_INFO RETURN VARCHAR2;
/**
* Returns the version history with recent changes
* return: Multi-line string with version history
**/
FUNCTION GET_VERSION_HISTORY RETURN VARCHAR2;
END;
/


@@ -0,0 +1,91 @@
-- ============================================================================
-- MARS-826-PREHOOK Master Rollback Script
-- ============================================================================
-- Purpose: Rollback DATA_EXPORTER package to previous version
-- Target Schema: CT_MRDS
-- WARNING: Only execute if you need to revert to A_WORKFLOW_HISTORY_KEY
-- ============================================================================
SET SERVEROUTPUT ON SIZE UNLIMITED
SET VERIFY OFF
SET FEEDBACK ON
SET ECHO OFF
-- Create log directory if it doesn't exist
host mkdir log 2>nul
-- Generate dynamic SPOOL filename with timestamp
var filename VARCHAR2(100)
BEGIN
:filename := 'log/ROLLBACK_MARS_826_PREHOOK_' || SYS_CONTEXT('USERENV', 'CON_NAME') || '_' || TO_CHAR(SYSDATE,'YYYYMMDD_HH24MISS') || '.log';
END;
/
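-- Example of the generated name (hypothetical CON_NAME and timestamp):
--   log/ROLLBACK_MARS_826_PREHOOK_MYPDB_20260202_105929.log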
column filename new_value _filename
select :filename filename from dual;
spool &_filename
PROMPT
PROMPT ============================================================================
PROMPT MARS-826-PREHOOK Rollback Starting
PROMPT ============================================================================
PROMPT WARNING: This will restore DATA_EXPORTER to use A_WORKFLOW_HISTORY_KEY
PROMPT
PROMPT CRITICAL PREREQUISITES:
PROMPT 1. CT_ODS.A_LOAD_HISTORY must have column A_WORKFLOW_HISTORY_KEY
PROMPT (if column is A_ETL_LOAD_SET_KEY, rollback will fail)
PROMPT 2. No active MARS-826 exports should be running
PROMPT
PROMPT Timestamp:
SELECT TO_CHAR(SYSDATE, 'YYYY-MM-DD HH24:MI:SS') AS rollback_start FROM DUAL;
PROMPT ============================================================================
-- Confirm rollback with user
ACCEPT continue CHAR PROMPT 'Type YES to continue with rollback, or Ctrl+C to abort: '
WHENEVER SQLERROR EXIT SQL.SQLCODE
BEGIN
IF '&continue' IS NULL OR TRIM('&continue') IS NULL OR UPPER(TRIM('&continue')) != 'YES' THEN
RAISE_APPLICATION_ERROR(-20001, 'Rollback aborted by user');
END IF;
END;
/
WHENEVER SQLERROR CONTINUE
-- Rollback steps (SPEC first, then BODY - standard Oracle deployment order)
PROMPT
PROMPT Step 1/2: Restoring Previous DATA_EXPORTER Package Specification
PROMPT =================================================================
@@91_MARS_826_PREHOOK_rollback_DATA_EXPORTER_SPEC.sql
PROMPT
PROMPT Step 2/2: Restoring Previous DATA_EXPORTER Package Body
PROMPT ========================================================
@@92_MARS_826_PREHOOK_rollback_DATA_EXPORTER_BODY.sql
-- Verify rollback
PROMPT
PROMPT Verification: Package Compilation Status
PROMPT =========================================
SELECT object_name, object_type, status, last_ddl_time
FROM all_objects
WHERE owner = 'CT_MRDS'
AND object_name = 'DATA_EXPORTER'
AND object_type IN ('PACKAGE', 'PACKAGE BODY')
ORDER BY object_type;
PROMPT
PROMPT ============================================================================
PROMPT MARS-826-PREHOOK Rollback Completed
PROMPT ============================================================================
PROMPT Completion Time:
SELECT TO_CHAR(SYSDATE, 'YYYY-MM-DD HH24:MI:SS') AS rollback_end FROM DUAL;
PROMPT
PROMPT Rollback Summary:
PROMPT - Package Body: CT_MRDS.DATA_EXPORTER
PROMPT - Restored Version: Previous (with A_WORKFLOW_HISTORY_KEY references)
PROMPT
PROMPT Log file: &_filename
PROMPT ============================================================================
spool off
quit;


@@ -0,0 +1,96 @@
-- ===================================================================
-- Simple Package Version Tracking Script
-- ===================================================================
-- Purpose: Track specified Oracle package versions
-- Author: Grzegorz Michalski
-- Date: 2025-12-04
-- Version: 3.1.0 - List-Based Edition
--
-- USAGE:
-- 1. Edit package list below (add/remove packages as needed)
-- 2. Include in your install/rollback script: @@track_package_versions.sql
-- ===================================================================
SET SERVEROUTPUT ON;
DECLARE
TYPE t_package_rec IS RECORD (
owner VARCHAR2(50),
package_name VARCHAR2(50),
version VARCHAR2(50)
);
TYPE t_packages IS TABLE OF t_package_rec;
TYPE t_string_array IS TABLE OF VARCHAR2(100);
-- ===================================================================
-- PACKAGE LIST - Edit this array to specify packages to track
-- ===================================================================
-- Add or remove entries as needed for your MARS issue
-- Format: 'SCHEMA.PACKAGE_NAME'
-- ===================================================================
vPackageList t_string_array := t_string_array(
'CT_MRDS.DATA_EXPORTER'
);
-- ===================================================================
vPackages t_packages := t_packages();
vVersion VARCHAR2(50);
vCount NUMBER := 0;
vOwner VARCHAR2(50);
vPackageName VARCHAR2(50);
vDotPos NUMBER;
BEGIN
DBMS_OUTPUT.PUT_LINE('========================================');
DBMS_OUTPUT.PUT_LINE('Package Version Tracking');
DBMS_OUTPUT.PUT_LINE('========================================');
-- Process each package in the list
FOR i IN 1..vPackageList.COUNT LOOP
vDotPos := INSTR(vPackageList(i), '.');
IF vDotPos > 0 THEN
vOwner := SUBSTR(vPackageList(i), 1, vDotPos - 1);
vPackageName := SUBSTR(vPackageList(i), vDotPos + 1);
-- Get package version
BEGIN
EXECUTE IMMEDIATE 'SELECT ' || vOwner || '.' || vPackageName || '.GET_VERSION() FROM DUAL' INTO vVersion;
vPackages.EXTEND;
vPackages(vPackages.COUNT).owner := vOwner;
vPackages(vPackages.COUNT).package_name := vPackageName;
vPackages(vPackages.COUNT).version := vVersion;
-- Track in ENV_MANAGER
BEGIN
CT_MRDS.ENV_MANAGER.TRACK_PACKAGE_VERSION(
pPackageOwner => vOwner,
pPackageName => vPackageName,
pPackageVersion => vVersion,
pPackageBuildDate => TO_CHAR(SYSDATE, 'YYYY-MM-DD HH24:MI:SS'),
pPackageAuthor => 'Grzegorz Michalski'
);
vCount := vCount + 1;
EXCEPTION
WHEN OTHERS THEN NULL; -- Continue even if tracking fails
END;
EXCEPTION
WHEN OTHERS THEN NULL; -- Skip packages that fail
END;
END IF;
END LOOP;
DBMS_OUTPUT.PUT_LINE('');
DBMS_OUTPUT.PUT_LINE('Summary:');
DBMS_OUTPUT.PUT_LINE('--------');
DBMS_OUTPUT.PUT_LINE('Packages tracked: ' || vCount || '/' || vPackageList.COUNT);
IF vPackages.COUNT > 0 THEN
DBMS_OUTPUT.PUT_LINE('');
DBMS_OUTPUT.PUT_LINE('Tracked Packages:');
FOR i IN 1..vPackages.COUNT LOOP
DBMS_OUTPUT.PUT_LINE(' ' || vPackages(i).owner || '.' || vPackages(i).package_name || ' v' || vPackages(i).version);
END LOOP;
END IF;
DBMS_OUTPUT.PUT_LINE('========================================');
END;
/


@@ -0,0 +1,62 @@
-- ===================================================================
-- Universal Package Version Verification Script
-- ===================================================================
-- Purpose: Verify all tracked Oracle packages for code changes
-- Author: Grzegorz Michalski
-- Date: 2025-12-04
-- Version: 1.0.0
--
-- USAGE:
-- Include at the end of install/rollback scripts: @@verify_packages_version.sql
--
-- OUTPUT:
-- - List of all tracked packages with their current status
-- - OK: Package has not changed since last tracking
-- - WARNING: Package code changed without version update
-- ===================================================================
SET LINESIZE 200
SET PAGESIZE 1000
SET FEEDBACK OFF
PROMPT
PROMPT ========================================
PROMPT Package Version Verification
PROMPT ========================================
PROMPT
COLUMN PACKAGE_OWNER FORMAT A15
COLUMN PACKAGE_NAME FORMAT A20
COLUMN VERSION FORMAT A10
COLUMN STATUS FORMAT A80
SELECT
PACKAGE_OWNER,
PACKAGE_NAME,
PACKAGE_VERSION AS VERSION,
CT_MRDS.ENV_MANAGER.CHECK_PACKAGE_CHANGES(PACKAGE_OWNER, PACKAGE_NAME) AS STATUS
FROM (
SELECT
PACKAGE_OWNER,
PACKAGE_NAME,
PACKAGE_VERSION,
ROW_NUMBER() OVER (PARTITION BY PACKAGE_OWNER, PACKAGE_NAME ORDER BY TRACKING_DATE DESC) AS RN
FROM CT_MRDS.A_PACKAGE_VERSION_TRACKING
)
WHERE RN = 1
ORDER BY PACKAGE_OWNER, PACKAGE_NAME;
PROMPT
PROMPT ========================================
PROMPT Verification Complete
PROMPT ========================================
PROMPT
PROMPT Legend:
PROMPT OK - Package has not changed since last tracking
PROMPT WARNING - Package code changed without version update
PROMPT
PROMPT For detailed hash information, use:
PROMPT SELECT ENV_MANAGER.GET_PACKAGE_HASH_INFO('OWNER', 'PACKAGE') FROM DUAL;
PROMPT ========================================
SET FEEDBACK ON