# MRDS APP

The main purpose of this application is to download XML or CSV files from a source, perform basic ETL on them, and upload the results to a target.
Below is a simplified workflow of the application.

## Application workflow

```mermaid
flowchart LR
    subgraph CoreApplication
        direction TB
        B[Read and validate config file] --> |If valid| C[Download source file]
        C[Download source file] --> D[Unzip if file is ZIP]
        D[Unzip if file is ZIP] --> E[Validate source file]
        E --> |If valid| G[Start task defined in config file]
        G --> H[Build output file with selected data from source]
        H --> I[Enrich output file with metadata]
        I --> J[Upload the output file]
        J --> K[Trigger remote function]
        K --> L[Check if more tasks are available in config file]
        L --> |Yes| G
        L --> |No| M[Archive & Delete source file]
        M --> N[Finish workflow]
    end
    A[Trigger app via CLI or Airflow DAG] --> CoreApplication
```
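
As an illustration of the "Unzip if file is ZIP" step in the diagram, here is a minimal sketch using only the standard library. The function name `unzip_if_zip` and its return shape are assumptions made for this example; the application's own implementation may differ.

```python
import zipfile
from pathlib import Path


def unzip_if_zip(source_path: Path, target_dir: Path) -> list[Path]:
    """Return the files to process: extracted members for a ZIP, else the file itself.

    Hypothetical helper, not part of the mrds package.
    """
    # Inspect the file content rather than trusting the extension (.ZIP vs .zip).
    if not zipfile.is_zipfile(source_path):
        return [source_path]
    with zipfile.ZipFile(source_path) as archive:
        archive.extractall(target_dir)
        # Skip directory entries, which end with a slash in the archive listing.
        return [target_dir / name for name in archive.namelist() if not name.endswith("/")]
```

Checking the content with `zipfile.is_zipfile` means a source file such as `CSDB_Debt_Daily.ZIP` is handled the same way regardless of how its extension is capitalised.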

## Installation

Check out the repository and `cd` to the project root directory:

```shell
cd python/mrds_common
```

Create a new virtual environment using Python >=3.11:

```shell
python3.11 -m venv .venv
```

Activate the virtual environment:

```shell
source .venv/bin/activate
```

Upgrade pip:

```shell
pip install --upgrade pip
```

Install the app:

```shell
pip install .
```

## Environment variables

The application reads two operating system environment variables. Each one falls back to a default value when it is not set:

`BUCKET_NAMESPACE` - OCI namespace where the main operating bucket is located (default: `frcnomajoc7v`)

`BUCKET` - main operating OCI bucket for downloading and uploading files (default: `mrds_inbox_poc`)
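
The fallback behaviour described above can be sketched as follows. The function name `resolve_bucket_settings` is an assumption made for this example; only the variable names and default values come from this README.

```python
import os

# Default values as documented in this README.
DEFAULT_BUCKET_NAMESPACE = "frcnomajoc7v"
DEFAULT_BUCKET = "mrds_inbox_poc"


def resolve_bucket_settings(environ=None) -> tuple[str, str]:
    """Return (namespace, bucket), using the documented defaults when unset.

    Hypothetical helper, not part of the mrds package.
    """
    if environ is None:
        environ = os.environ
    namespace = environ.get("BUCKET_NAMESPACE", DEFAULT_BUCKET_NAMESPACE)
    bucket = environ.get("BUCKET", DEFAULT_BUCKET)
    return namespace, bucket
```

Passing the environment in as a mapping keeps the lookup easy to test without modifying the real process environment.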

## Usage

The application accepts two required and four optional parameters.

### Parameters

| Parameter | Short Flag | Required | Default | Description |
|-------------------------------|------------|----------|---------|-------------|
| `--workflow-context` | `-w` | No* | None | JSON string representing the workflow context. Must contain `run_id` and `a_workflow_history_key`. |
| `--generate-workflow-context` | | No* | | Flag type. If provided, the app automatically generates and finalizes the workflow context. Use this if `--workflow-context` is not provided. |
| `--source-filename` | `-s` | Yes | None | Name of the source file to be looked up in the source inbox set in the configuration file (`inbox_prefix`). |
| `--config-file` | `-c` | Yes | None | Path to the YAML configuration file. Can be absolute, or relative to the current working directory. |
| `--keep-source-file` | | No | | Flag type. If provided, the app keeps the source file instead of archiving and deleting it. |
| `--keep-tmp-dir` | | No | | Flag type. If provided, the app keeps the tmp directory instead of deleting it. |

\* `--workflow-context` and `--generate-workflow-context` are each optional on their own, but one of them MUST be provided for the application to run.
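
To make the parameter rules concrete, here is a minimal `argparse` sketch of the table above. It is not the application's actual parser. In particular, treating the two workflow-context options as a required, mutually exclusive group is an assumption: the README only states that one of them must be present.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build a parser mirroring the documented parameters (illustrative only)."""
    parser = argparse.ArgumentParser(prog="mrds-cli")

    # Assumption: exactly one of the two workflow-context options is given.
    context = parser.add_mutually_exclusive_group(required=True)
    context.add_argument("-w", "--workflow-context", default=None)
    context.add_argument("--generate-workflow-context", action="store_true")

    # Required parameters.
    parser.add_argument("-s", "--source-filename", required=True)
    parser.add_argument("-c", "--config-file", required=True)

    # Optional flags.
    parser.add_argument("--keep-source-file", action="store_true")
    parser.add_argument("--keep-tmp-dir", action="store_true")
    return parser
```

With this sketch, omitting both workflow-context options makes `argparse` exit with a usage error, which matches the requirement stated in the footnote.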

### CLI

```shell
mrds-cli --workflow-context '{"run_id": "0ce35637-302c-4293-8069-3186d5d9a57d", "a_workflow_history_key": 352344}' \
    --source-filename 'CSDB_Debt_Daily.ZIP' \
    --config-file /home/dbt/GEORGI/projects/mrds_elt/airflow/ods/csdb/debt_daily/config/yaml/csdb_debt_daily.yaml
```

### Python module

Import the `main` function from the core module and provide the needed parameters:

```python
import datetime
import logging
import sys

from mrds.core import main
from mrds.utils.manage_runs import init_workflow, finalise_workflow
from mrds.utils.static_vars import status_success, status_failed

# Configure logging for your needs. This is just a sample.
current_time = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
log_filename = f"mrds_{current_time}.log"
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s - %(message)s",
    handlers=[
        logging.FileHandler(log_filename),
        logging.StreamHandler(sys.stdout),
    ],
)

STATUS_SUCCESS = status_success
STATUS_FAILURE = status_failed

# Run time parameters
run_id = "0ce35637-302c-4293-8069-3186d5d9a57d"
a_workflow_history_key = init_workflow(database_name='ODS', workflow_name='w_OU_C2D_UC_DISSEM', workflow_run_id=run_id)

workflow_context = {
    "run_id": run_id,
    "a_workflow_history_key": a_workflow_history_key,
}

source_filename = "CSDB_Debt_Daily.ZIP"
config_file = "/home/dbt/GEORGI/projects/mrds_elt/airflow/ods/csdb/debt_daily/config/yaml/csdb_debt_daily.yaml"

main(workflow_context, source_filename, config_file)

# Implement your desired error handling logic and pass the correct status to finalise_workflow.
finalise_workflow(workflow_context["a_workflow_history_key"], STATUS_SUCCESS)
```
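
One possible shape for the error handling mentioned in the last comment is sketched below. The wrapper `run_and_finalise` is hypothetical. It takes the `main` and `finalise_workflow` callables and the two status values as arguments so that it makes no assumption about their internals, and it relies only on the call signatures shown in the example above.

```python
import logging

logger = logging.getLogger(__name__)


def run_and_finalise(main_fn, finalise_fn, workflow_context, source_filename,
                     config_file, *, status_success, status_failed):
    """Run main_fn and always finalise the workflow with the matching status.

    Hypothetical wrapper, not part of the mrds package.
    """
    status = status_failed
    try:
        main_fn(workflow_context, source_filename, config_file)
        status = status_success
    except Exception:
        # Log the traceback, then let the caller (e.g. the orchestrator) see the failure.
        logger.exception("MRDS run failed for %s", source_filename)
        raise
    finally:
        # Runs on both paths, so the workflow is never left unfinalised.
        finalise_fn(workflow_context["a_workflow_history_key"], status)
```

In the script above it would be called as `run_and_finalise(main, finalise_workflow, workflow_context, source_filename, config_file, status_success=STATUS_SUCCESS, status_failed=STATUS_FAILURE)` in place of the last two statements.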

## Configuration

### Generate workflow context

Use this option when running the application in standalone mode. The workflow context is generated automatically and finalized at the end of the run.

### Source filename

This is the name of the source file to be looked up in the source inbox set in the configuration file (`inbox_prefix`).

### Workflow context

This is a JSON string (a dictionary, from the application's point of view) containing the `run_id` and `a_workflow_history_key` values.

```JSON
{
    "run_id": "0ce35637-302c-4293-8069-3186d5d9a57d",
    "a_workflow_history_key": 352344
}
```

`run_id` - represents the orchestration ID. It can be any string ID of your choice, for example an Airflow DAG ID.

`a_workflow_history_key` - can be generated via the `mrds.utils.manage_runs.init_workflow()` function.

If you provide the workflow context yourself, you are also responsible for finalizing it.
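
A caller that builds the context string by hand may want to check it before launching a run. The following is a minimal sketch of such a check. The function name `parse_workflow_context` is an assumption for this example; only the two required keys come from this README.

```python
import json

# Keys that the workflow context must contain, per this README.
REQUIRED_KEYS = ("run_id", "a_workflow_history_key")


def parse_workflow_context(raw: str) -> dict:
    """Parse the JSON string and verify that both required keys are present.

    Hypothetical helper, not part of the mrds package.
    """
    context = json.loads(raw)
    if not isinstance(context, dict):
        raise ValueError("workflow context must be a JSON object")
    missing = [key for key in REQUIRED_KEYS if key not in context]
    if missing:
        raise ValueError(f"workflow context is missing: {', '.join(missing)}")
    return context
```

Note that strict JSON allows neither a trailing comma nor a variable assignment, so the string passed to `--workflow-context` must be a bare object like the one shown above.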

### Config file

This is the main place where the application is controlled.

At the top are the application configurations. They apply to all tasks, are all optional, and override specific runtime application settings.

```yaml
# System configurations

# Overrides the default encoding type (utf-8) of the app. This encoding is used when
# reading source CSV/XML files and when writing the output CSV files of the app.
# For codec naming, follow the guidelines at
# https://docs.python.org/3/library/codecs.html#standard-encodings
encoding_type: cp1252
```
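
Because `encoding_type` must be a codec name that Python recognises, a quick way to check a value is `codecs.lookup`. The sketch below assumes the parsed configuration is available as a dictionary; the function name `resolve_encoding` is hypothetical.

```python
import codecs

# Default encoding of the app, per the comment in the configuration above.
DEFAULT_ENCODING = "utf-8"


def resolve_encoding(config: dict) -> str:
    """Return the normalised codec name, falling back to the default when unset.

    Hypothetical helper, not part of the mrds package.
    """
    name = config.get("encoding_type") or DEFAULT_ENCODING
    try:
        # codecs.lookup raises LookupError for names Python does not know.
        return codecs.lookup(name).name
    except LookupError as exc:
        raise ValueError(f"unknown encoding_type: {name!r}") from exc
```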

After that come the global configurations. These also apply to all tasks:

```yaml
# Global configurations
tmpdir: /tmp  # root temporary directory in which the runtime temporary directory is created; the source file is downloaded and processed there before it is uploaded to the target
inbox_prefix: INBOX/C2D/UC_DISSEM  # prefix for the inbox containing the source file
archive_prefix: ARCHIVE/C2D/UC_DISSEM  # prefix for the archive bucket
workflow_name: w_OU_C2D_UC_DISSEM  # name of the particular workflow
validation_schema_path: 'xsd/UseOfCollateralMessage.xsd'  # path, relative to the runtime location, to the schema used to validate the XML or CSV file
file_type: xml  # file type of the expected source file - either CSV or XML
```
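
The relationship between `tmpdir` and the `--keep-tmp-dir` flag can be pictured with the sketch below. It is an illustration under assumptions, not the application's code: the context manager `runtime_tmp_dir` and the `mrds_` prefix are both made up for this example.

```python
import shutil
import tempfile
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def runtime_tmp_dir(root: str, keep: bool = False):
    """Create a per-run directory under root and remove it afterwards unless keep is set.

    Hypothetical helper, not part of the mrds package.
    """
    # mkdtemp creates a uniquely named directory inside the configured root.
    path = Path(tempfile.mkdtemp(prefix="mrds_", dir=root))
    try:
        yield path
    finally:
        if not keep:
            shutil.rmtree(path, ignore_errors=True)
```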

Next comes the list of tasks to be performed on the source file.
A file can have multiple tasks, so more than one output file can be generated from one source file.
One of the key configuration parameters of each task is `output_columns`, which defines the columns of the final output file.
There are several types of columns:

`xpath` - used when the source file is XML. It is a standard XPath expression pointing to a path in the XML.

`xpath_element_id` - used when a particular XML element needs an ID, in order to create foreign keys between two separate tasks. It is a standard XPath expression pointing to a path in the XML.

`csv_header` - used when the source file is CSV. It points to the corresponding CSV header in the source file.

`a_key` - generates a key that is unique per row.

`workflow_key` - generates a key that is unique per run of the application.

`static` - lets the user define a column with a static value.

When generating the output file, the application respects the order of the output columns in the configuration file.
Data and columns from the source file that are not included in the configuration file will not be present in the final output file.

Example of XML task configuration:

```yaml
# List of tasks
tasks:
  - task_name: ou_lm_standing_facilities_header_create_file  # name of the particular task
    ods_prefix: INBOX/LM/STANDING_FACILITIES/STANDING_FACILITIES_HEADER  # prefix for the upload location
    output_table: standing_facilities_headers  # table in Oracle
    namespaces:
      ns2: 'http://escb.ecb.int/sf'  # XML namespace
    output_columns:  # Columns in the output file, order will be respected.
      - type: 'a_key'  # A_KEY type of column
        column_header: 'A_KEY'  # naming of the column in the output file
      - type: 'workflow_key'  # WORKFLOW_KEY type of column
        column_header: 'A_WORKFLOW_HISTORY_KEY'
      - type: 'xpath'  # xpath type of column
        value: '//ns2:header/ns2:version'
        column_header: 'REV_NUMBER'
        is_key: 'N'  # value is transposed across the rows - YES/NO. Used when there is only single value in source XML
      - type: 'xpath'
        value: '//ns2:header/ns2:referenceDate'
        column_header: 'REF_DATE'
        is_key: 'N'
      - type: 'static'
        value: ''
        column_header: 'FREE_TEXT'

  - task_name: ou_lm_standing_facilities_create_file
    ods_prefix: INBOX/LM/STANDING_FACILITIES/STANDING_FACILITIES
    output_table: standing_facilities
    namespaces:
      ns2: 'http://escb.ecb.int/sf'
    output_columns:
      - type: 'a_key'
        column_header: 'A_KEY'
      - type: 'workflow_key'
        column_header: 'A_SFH_FK'
      - type: 'workflow_key'
        column_header: 'A_WORKFLOW_HISTORY_KEY'
      - type: 'xpath'
        value: '//ns2:disaggregatedStandingFacilities/ns2:standingFacilities/ns2:disaggregatedStandingFacility/ns2:country'
        column_header: 'COUNTRY'
      - type: 'static'
        value: ''
        column_header: 'COMMENT_'
```
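
To show how an `xpath` column and the `namespaces` mapping work together, here is a small sketch using the standard library. It is an illustration only, and nothing here says which XML library the application uses. The sample document, including its `ns2:message` root element and the values in it, is made up for this example. `xml.etree.ElementTree` does not accept expressions that start with `//` on an element, so the sketch rewrites them to the relative form `.//`.

```python
import xml.etree.ElementTree as ET

# Made-up sample document; only the namespace URI and the header paths come from the config above.
SAMPLE_XML = """<ns2:message xmlns:ns2="http://escb.ecb.int/sf">
  <ns2:header>
    <ns2:version>3</ns2:version>
    <ns2:referenceDate>2024-01-31</ns2:referenceDate>
  </ns2:header>
</ns2:message>"""

# Same prefix-to-URI mapping as the `namespaces` key of the task.
NAMESPACES = {"ns2": "http://escb.ecb.int/sf"}


def xpath_values(root, expression, namespaces):
    """Return the text of every element matched by the expression."""
    # ElementTree only supports relative paths on an element, so turn "//x" into ".//x".
    if expression.startswith("//"):
        expression = "." + expression
    return [node.text for node in root.findall(expression, namespaces)]


root = ET.fromstring(SAMPLE_XML)
version = xpath_values(root, "//ns2:header/ns2:version", NAMESPACES)
ref_date = xpath_values(root, "//ns2:header/ns2:referenceDate", NAMESPACES)
```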

Example of CSV task configuration:

```yaml
tasks:
  - task_name: ODS_CSDB_DEBT_DAILY_process_csv
    ods_prefix: ODS/CSDB/DEBT_DAILY
    output_table: DEBT_DAILY
    output_columns:
      - type: 'a_key'
        column_header: 'A_KEY'
      - type: 'workflow_key'
        column_header: 'A_WORKFLOW_HISTORY_KEY'
      - type: 'csv_header'  # csv_header type of column
        value: 'Date last modified'  # naming of the column in the SOURCE file
        column_header: 'Date last modified'  # naming of the column in the OUTPUT file
      - type: 'csv_header'
        value: 'Extraction date'
        column_header: 'Extraction date'
      - type: 'csv_header'
        value: 'ISIN code'
        column_header: 'ISIN code'
```
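
The way `output_columns` drives the output file (configured order is kept, unlisted source columns are dropped) can be sketched as follows. This is an illustration under assumptions, not the application's implementation: the function `build_rows` is hypothetical, and the row number used for `a_key` is only a placeholder, since this README does not describe how real keys are generated.

```python
import csv
import io

# Column definitions shaped like the `output_columns` entries in the config above.
OUTPUT_COLUMNS = [
    {"type": "a_key", "column_header": "A_KEY"},
    {"type": "workflow_key", "column_header": "A_WORKFLOW_HISTORY_KEY"},
    {"type": "csv_header", "value": "ISIN code", "column_header": "ISIN code"},
    {"type": "static", "value": "", "column_header": "FREE_TEXT"},
]


def build_rows(source, output_columns, workflow_key):
    """Return (headers, rows) containing only the configured columns, in configured order."""
    reader = csv.DictReader(source)
    headers = [col["column_header"] for col in output_columns]
    rows = []
    for row_number, record in enumerate(reader, start=1):
        row = []
        for col in output_columns:
            if col["type"] == "a_key":
                row.append(row_number)  # placeholder: real key generation is not documented here
            elif col["type"] == "workflow_key":
                row.append(workflow_key)
            elif col["type"] == "csv_header":
                row.append(record[col["value"]])  # look up by SOURCE header name
            elif col["type"] == "static":
                row.append(col["value"])
            else:
                raise ValueError(f"unsupported column type: {col['type']}")
        rows.append(row)
    return headers, rows


# Made-up source data with one column that is not listed in the configuration.
source = io.StringIO("ISIN code,Ignored column\nXX0000000001,x\n")
headers, rows = build_rows(source, OUTPUT_COLUMNS, workflow_key=352344)
```

The `Ignored column` values never reach `rows`, which mirrors the rule that source data not named in the configuration is left out of the output file.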

## Development

### Installing requirements

Install the app together with the dev requirements. For an easier workflow, you can install in editable mode:

```
pip install -e ".[dev]"
```

In editable mode, instead of copying the package files to the site-packages directory, pip creates a special link that points to the source code directory. This means any changes you make to your source code are immediately available without reinstalling the package.

### Code formatting

Run black to reformat the code before pushing changes.

The following command reformats all files recursively from the current directory:

```
black .
```

The following command only checks and reports what needs to be formatted, recursively from the current directory:

```
black --check --diff .
```

### Tests

Run the tests with:

```
pytest .
```

### Tox automation

Tox automates running the black checks and the tests:

```
tox .
```