1081 lines
44 KiB
Python
1081 lines
44 KiB
Python
from __future__ import annotations
|
|
import sys
|
|
|
|
sys.path.append('/opt/airflow/python/mrds_common')
|
|
sys.path.append('/opt/airflow/python/devo_replicator/table_generator')
|
|
import pandas as pd
|
|
import mrds.utils.manage_files as fileManager
|
|
import logging
|
|
import tableBuilderQueries as tbq
|
|
from devo_query import execute_query
|
|
import ranger_updater as ranger
|
|
import yaml
|
|
import numpy as np
|
|
from mrds.utils.secrets import get_secret
|
|
import os
|
|
import logging
|
|
import yaml
|
|
from datetime import timedelta
|
|
|
|
from airflow import DAG
|
|
from airflow.utils.dates import days_ago
|
|
from airflow.utils.trigger_rule import TriggerRule
|
|
from airflow.operators.python import PythonOperator
|
|
import traceback
|
|
try:
|
|
from airflow.exceptions import AirflowFailException
|
|
except Exception:
|
|
from airflow.exceptions import AirflowException as AirflowFailException
|
|
|
|
|
|
|
|
from mrds.utils import oraconn
|
|
|
|
ENV_CONFIG_PATH = "/opt/airflow/python/devo_replicator/config/env_config.yaml"
|
|
|
|
# Set up basic configuration for logging
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
|
# Create a logger object
|
|
logger = logging.getLogger(__name__)
|
|
import re
|
|
|
|
#0 utilities
|
|
def initialize_config(config_file_path):
    """Load a YAML configuration file and return its parsed contents.

    Raises FileNotFoundError when the path does not exist.
    """
    if not os.path.exists(config_file_path):
        raise FileNotFoundError(f"Configuration file {config_file_path} not found.")

    with open(config_file_path, "r") as handle:
        return yaml.safe_load(handle)
|
|
|
|
def fix_impala_sql(sql: str) -> str:
    """Post-process a generated CREATE TABLE statement for Impala/Hive.

    Backtick-quotes column names that collide with Impala reserved keywords
    (or contain non-identifier characters) inside the column-definition list,
    then strips escape sequences from the final statement.

    Raises ValueError if the statement has no '(' column list or no
    'ROW FORMAT SERDE' clause after it.
    """
    # List of reserved keywords in Impala that need backticks if used as column names
    impala_reserved_keywords = {
        'date', 'value', 'source', 'comment', 'partition', 'row', 'select', 'insert',
        'table', 'external', 'format', 'location', 'stored', 'inputformat', 'outputformat',
        'scenario', 'string', 'int', 'decimal', 'timestamp', 'float', 'double','procedure', 'floor'
    }

    # Regex pattern to find column definitions: name, data type (with an
    # optional "(...)" size part), and an optional trailing comment '...' clause.
    pattern = re.compile(
        r'(?P<col>`?\w+`?)\s+(?P<type>[A-Za-z]+\s*(?:\([^)]+\))?)\s*(?P<comment>comment\s*\'[^\']*\'|)?',
        re.IGNORECASE
    )

    def replace(match):
        # Rebuild a single column definition, quoting the name if required.
        col = match.group('col').strip('`')
        dtype = match.group('type')
        comment = match.group('comment') or ''
        # Add backticks only if column name is a reserved keyword or contains special chars
        if col.lower() in impala_reserved_keywords or not re.match(r'^[A-Za-z_][A-Za-z0-9_]*$', col):
            col = f'`{col}`'
        return f"{col} {dtype} {comment}".strip()

    # Only replace column list part between the first '(' and the
    # 'ROW FORMAT SERDE' clause; the rest of the DDL is left untouched.
    table_def_start = sql.find('(')
    table_def_end = sql.find('ROW FORMAT SERDE', table_def_start)
    if table_def_start == -1 or table_def_end == -1:
        raise ValueError("Invalid SQL format: Missing column definition parentheses.")

    before = sql[:table_def_start + 1]
    columns = sql[table_def_start + 1:table_def_end]
    after = sql[table_def_end:]

    # Replace all columns inside definition
    fixed_columns = pattern.sub(replace, columns)

    # Combine and return
    final= before + fixed_columns + after
    # NOTE(review): this removes every escaped single quote (\') and
    # unescapes backslashes — presumably undoing the escaping added by
    # format_column_definition; confirm column comments render as intended.
    final=final.replace("\\'", "").replace('\\\\', '\\')
    return final
|
|
|
|
def applyQueryParameters(query: str, parameters: str) -> str:
    """Fill $$$1, $$$2, ... placeholders in *query* with values from *parameters*.

    Parameters:
    - query: query string containing $$$<n> placeholders.
    - parameters: semicolon-separated parameter values ($$$1 gets the first).

    Returns the query with all placeholders substituted. An empty/None
    parameter string returns the query unchanged.
    """
    if not parameters:
        return query

    values = parameters.split(';')
    result = query
    # Substitute highest-numbered placeholders first so that e.g. $$$10 is
    # never partially consumed by the $$$1 replacement.
    for position in range(len(values), 0, -1):
        result = result.replace(f"$$${position}", values[position - 1])
    return result
|
|
|
|
def format_column_definition(row):
    """Render one metadata row as a Hive column definition.

    Produces "<column_name> <data_type_string>" and, when data_description
    is present, appends "comment '<description>'" with single quotes escaped.
    """
    base = f"{row['column_name']} {row['data_type_string']}"
    description = row['data_description']
    if pd.isnull(description):
        # No description available: emit just name and type.
        return base
    # Escape single quotes so the description survives inside the SQL literal.
    escaped = str(description).replace("'", "\\'")
    return f"{base} comment '{escaped}'"
|
|
#1 receive table name and check for target table and access type
|
|
|
|
def execute_oracle_query(sql):
    """Execute *sql* on the MRDS_LOADER Oracle connection and return the
    result set as a pandas DataFrame with lower-cased column names.

    FIX: the cursor and connection are now always closed via try/finally;
    the original leaked both whenever the query raised.
    """
    oracle_conn = oraconn.connect('MRDS_LOADER')
    try:
        cursor = oracle_conn.cursor()
        try:
            options = cursor.execute(sql).fetchall()
            oracle_conn.commit()
            # cursor.description yields (name, type, ...) tuples per column;
            # keep only the lower-cased names for the DataFrame header.
            df = pd.DataFrame(options, columns=[row[0].lower() for row in cursor.description])
        finally:
            cursor.close()
    finally:
        oracle_conn.close()
    return df
|
|
|
|
def get_target_table(oracle_mgmt_table, source_schema, source_table):
    """Look up the replication alias (TABLE_ALIAS) for a source table
    in the Oracle management table."""
    query = (
        f"SELECT DISTINCT TABLE_ALIAS FROM {oracle_mgmt_table} "
        f"WHERE OWNER = '{source_schema}' AND TABLE_NAME = '{source_table}'"
    )
    return execute_oracle_query(query)
|
|
|
|
def get_type_ofAccess(oracle_metadata_table, table_owner, source_table, service_name):
    """Return the distinct, currently valid RAR3_TYPE_OF_ACCESS values for a
    source table from the Oracle metadata inventory.

    `service_name` is accepted for interface compatibility but is not used
    in the query.

    FIX: inserted the missing space before AND ("...'{table_owner}'AND...");
    Oracle tolerated the fused token boundary but it was fragile.
    """
    sql = (
        f"SELECT DISTINCT RAR3_TYPE_OF_ACCESS FROM {oracle_metadata_table} "
        f"WHERE A_VALID_TO > SYSDATE AND OWNER = '{table_owner}' "
        f"AND TABLE_NAME = '{source_table}'"
    )
    df = execute_oracle_query(sql)
    return df
|
|
|
|
#2 load metadata
|
|
def readIGAMRoles(config):
    """Fetch the IGAM role mapping for the configured environment.

    Builds the IGAM roles query, substitutes the sentry role environment as
    parameter $$$1, runs it on Oracle and returns the resulting DataFrame.
    """
    environment_param = "'" + config['sentry_role_environment'] + "'"
    base_query = tbq.get_query_igam_roles(config['oracle_igam_table'], config['service_name'])
    logger.info(f"Querying the IGAM Table: {base_query}")

    resolved_query = applyQueryParameters(base_query, environment_param)
    logger.info(f"Replaced params to IGAM Table: {resolved_query}")

    return execute_oracle_query(resolved_query)
|
|
|
|
def loadMetadataTable(config):
    """Build the Hive column-definition list for the configured source table.

    Fetches column metadata from Oracle and returns a comma-separated string
    of "<name> <type> [comment '...']" fragments, one per column.
    """
    metadataQuery = tbq.get_query_metadata(config['oracle_metadata_table'], config['source_schema'], config['source_table'])

    # FIX: the original message lacked the f-prefix and logged the literal
    # text "{metadataQuery}" instead of the query.
    logger.info(f"Map Oracle metadata (data types) to Hive query: {metadataQuery}")

    jdbcMetaDataDF = execute_oracle_query(metadataQuery)

    logger.info("Fetch all fields for table and concatenate them separated by ','")
    # One formatted definition per metadata row, joined into a single list.
    tableDataList = jdbcMetaDataDF.apply(format_column_definition, axis=1).tolist()
    tableFields = ",".join(tableDataList)

    return tableFields
|
|
|
|
|
|
#3 drop table and policies
|
|
def deleteExternalTable(config,env_config):
|
|
try:
|
|
try:
|
|
deleted=ranger.delete_policy(config,env_config)
|
|
logger.info(f"deleted policies: {deleted}")
|
|
except Exception as e:
|
|
logger.error("Error in dropping table")
|
|
logger.error("Exception: %s", e)
|
|
logger.error("Traceback:\n%s", traceback.format_exc())
|
|
except RuntimeError as e:
|
|
logger.error("Error in dropping table")
|
|
logger.error("Exception: %s", e)
|
|
logger.error("Traceback:\n%s", traceback.format_exc())
|
|
|
|
sql_drop = f"DROP TABLE IF EXISTS {config['corporate_store']}.{config['target_table']}"
|
|
execute_query(
|
|
sql_drop,
|
|
env_config['DEVO_USERNAME'], env_config['IMPALA_HOSTNAME'], env_config['DEVO_SECRET'],
|
|
)
|
|
logger.info(f"table {config['corporate_store']}.{config['target_table']} dropped")
|
|
|
|
#4 create external table
|
|
def createExternalTables(config, tableFields,env_config ):
|
|
sql_create = (
|
|
f"CREATE EXTERNAL TABLE {config['corporate_store']}.{config['target_table']} "
|
|
f"({tableFields}, {config['tech_meta_data_fields']}) "
|
|
"ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' "
|
|
"STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' "
|
|
"OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' "
|
|
f"LOCATION '{config['target_s3_bucket']}/{config['target_table']}' "
|
|
"TBLPROPERTIES ("
|
|
"'external.table.purge'='true', "
|
|
"'parquet.compression'='snappy')"
|
|
)
|
|
sql_create=fix_impala_sql(sql_create)
|
|
execute_query(sql_create,env_config['DEVO_USERNAME'],env_config['HIVE_HOSTNAME'],env_config['DEVO_SECRET'])
|
|
|
|
def createTableFromExternal(config, tableFields, env_config):
    """Materialise the target table as a copy of its _EXT staging table.

    `tableFields` is unused but retained for signature parity with
    createExternalTables.
    """
    qualified = f"{config['corporate_store']}.{config['target_table']}"
    sql_create = f"CREATE EXTERNAL TABLE {qualified} AS SELECT * FROM {qualified}_EXT"
    execute_query(sql_create, env_config['DEVO_USERNAME'], env_config['HIVE_HOSTNAME'], env_config['DEVO_SECRET'])
|
|
|
|
#5 create table policies
|
|
def accessTypeMapper(config, env_config, igamRoleDF):
|
|
|
|
|
|
if (config['access_type'].lower() == '1'):
|
|
return accessType_1(config, env_config, igamRoleDF)
|
|
elif (config['access_type'].lower() == '2a'):
|
|
return accessType_2A(config, env_config, igamRoleDF)
|
|
elif (config['access_type'].lower() == '2b'):
|
|
return accessType_2B(config, env_config, igamRoleDF)
|
|
elif (config['access_type'].lower() == '3'):
|
|
return accessType_3(config, env_config, igamRoleDF)
|
|
else:
|
|
|
|
logger.info(f"Invalid access type ${config['access_type']}. Please check the input param")
|
|
raise RuntimeError(
|
|
f"Access type error, access type :{config['access_type'].lower()} unsupported"
|
|
)
|
|
|
|
def accessType_1(config, env_config, igamRoleDF):
    """Generate a table-level Ranger policy for access type 1.

    Joins type-1 metadata rows against the IGAM role mapping (matching on
    datasource, or cross-joining when the row's source equals the service
    name), collects the resulting IGAM entitlements plus any configured
    full-access entitlements, and emits one table-level policy.
    """
    logger.info("Grant privileges for access type 1")
    logger.info("Fetch metadata from Oracle for access type 1")

    # ---- Construct query and fetch from Oracle ----
    queryParams = f"'{config['source_schema']}.{config['source_table']}'"
    queryMetadataAccessType1 = tbq.get_query_metadata_access_type1(config['oracle_metadata_table'])
    queryWithParamsAccessType1 = applyQueryParameters(queryMetadataAccessType1, queryParams)

    logger.info("Metadata table query: " + queryWithParamsAccessType1)
    jdbcMetaDataAccessType1DF = execute_oracle_query(queryWithParamsAccessType1)

    # ---- Normalize columns ----
    # NOTE: igamRoleDF is modified in place here (datasource upper-cased),
    # visible to the caller.
    df = jdbcMetaDataAccessType1DF.copy()
    df["rar3_type_of_access"] = df["rar3_type_of_access"].astype(str).str.strip()
    df["source"] = df["source"].astype(str).str.strip().str.upper()
    igamRoleDF["datasource"] = igamRoleDF["datasource"].astype(str).str.strip().str.upper()

    # ---- Branch A: source != service name -> inner join on datasource ----
    left_a = (
        df.loc[
            (df["rar3_type_of_access"] == "1") & (df["source"] != config['service_name']),
            ["table_name", "source"]
        ]
        .drop_duplicates()
    )

    branch_a = (
        left_a.merge(
            igamRoleDF,
            left_on="source",
            right_on="datasource",
            how="inner"
        )
        [["table_name", "source", "subsource_id", "igam_entitlement", "environment"]]
        .drop_duplicates()
    )

    # ---- Branch B: source == service name -> CROSS JOIN with igamRoleDF ----
    left_b = (
        df.loc[
            (df["rar3_type_of_access"] == "1") & (df["source"] == config['service_name']),
            ["table_name", "source"]
        ]
        .drop_duplicates()
    )

    if not left_b.empty:
        branch_b = (
            left_b.merge(igamRoleDF, how="cross")
            [["table_name", "source", "subsource_id", "igam_entitlement", "environment"]]
            .drop_duplicates()
        )
    else:
        # Empty frame with matching columns so the concat below is well-formed.
        branch_b = pd.DataFrame(columns=["table_name", "source", "subsource_id", "igam_entitlement", "environment"])

    # ---- UNION (distinct) of both branches ----
    typeOneDF = (
        pd.concat([branch_a, branch_b], ignore_index=True)
        .drop_duplicates()
        .reset_index(drop=True)
    )

    logger.info("typeOneDF:\n%s", typeOneDF)

    # ---- Collect IGAM entitlements (may contain duplicates) ----
    igam_entitlements = (
        typeOneDF["igam_entitlement"]
        .dropna()
        .astype(str)
        .str.strip()
        .tolist()
    )

    # Merge with optional full access list (assumed to already be a list
    # when not None — see init_step, which splits it from the env config).
    if config['full_access_entitlement_list'] is None:
        combined_entitlements = igam_entitlements
    else:
        full_access_list_clean = config['full_access_entitlement_list']
        combined_entitlements = igam_entitlements + full_access_list_clean

    # Add table permission groups using the Ranger YAML formatter
    params = ranger.add_table_permission_groups(
        config['corporate_store'],
        config['target_table'],
        config['access_type'],
        config['source_table'],
        combined_entitlements
    )

    # Generate the final YAML policy (None = no per-policy id suffix)
    ranger.generate_policy(params, env_config, None)
|
|
|
|
|
|
|
|
def accessType_2A(config, env_config, igamRoleDF):
    """Generate column-level Ranger policies for access type 2a.

    Joins type-2a metadata rows against the IGAM role mapping (on
    subsource_id, or cross-joined when the row's source equals the service
    name), groups entitlements by identical column lists, and emits one
    numbered policy per distinct (table, column_list) pair, plus an
    optional 'full_access' policy.
    """
    logger.info("Grant privileges for access type 2a")
    logger.info("Fetch the metadata in Oracle for access type 2a")

    # ---- Construct query and fetch from Oracle ----
    queryParams = f"'{config['source_schema']}.{config['source_table']}'"
    queryMetadataAccessType2a = tbq.get_query_metadata_access_type2a(config['oracle_metadata_table'])
    queryWithParamsAccessType2a = applyQueryParameters(queryMetadataAccessType2a, queryParams)

    # NOTE(review): the query text is missing from this log line (no f-string
    # interpolation was ever added).
    logger.info("Meta data table query: ")
    jdbcMetaDataAccessType2aDF = execute_oracle_query(queryWithParamsAccessType2a)

    # ---- Normalize columns ----
    df = jdbcMetaDataAccessType2aDF.copy()
    df["rar3_type_of_access"] = df["rar3_type_of_access"].astype(str).str.strip().str.lower()
    df["source"] = df["source"].astype(str).str.strip().str.upper()

    roles = igamRoleDF.copy()
    # expected columns in igamRoleDF: subsource_id, igam_entitlement, environment
    roles["subsource_id"] = roles["subsource_id"].astype(str).str.strip().str.upper()
    roles["igam_entitlement"] = roles["igam_entitlement"].astype(str).str.strip()

    # ---- Branch A: source != service_name -> INNER JOIN on source == subsource_id ----
    left_a = (
        df.loc[
            (df["rar3_type_of_access"] == "2a")
            & (df["source"] != config['service_name'].upper()),
            ["table_name", "column_name", "source"]
        ]
    )

    branch_a = (
        left_a.merge(
            roles,
            left_on="source",
            right_on="subsource_id",
            how="inner"
        )
        .drop(columns=["subsource_id", "source"], errors="ignore")
        [["table_name", "column_name", "igam_entitlement", "environment"]]
    )

    # ---- Branch B: source == service_name -> CROSS JOIN with roles ----
    left_b = (
        df.loc[
            (df["rar3_type_of_access"] == "2a")
            & (df["source"] == config['service_name'].upper()),
            ["table_name", "column_name", "source"]
        ]
    )

    if not left_b.empty:
        try:
            branch_b = (
                left_b.merge(roles, how="cross")
                .drop(columns=["subsource_id", "source"], errors="ignore")
                [["table_name", "column_name", "igam_entitlement", "environment"]]
            )
        except TypeError:
            # pandas < 1.2 fallback: emulate a cross join via a constant key.
            left_b["_cj"] = 1
            roles["_cj"] = 1
            branch_b = (
                left_b.merge(roles, on="_cj")
                .drop(columns=["_cj", "subsource_id", "source"], errors="ignore")
                [["table_name", "column_name", "igam_entitlement", "environment"]]
            )
            # cleanup so `roles` is unpolluted if used again below
            roles.drop(columns=["_cj"], inplace=True, errors="ignore")
    else:
        branch_b = pd.DataFrame(columns=["table_name", "column_name", "igam_entitlement", "environment"])

    # ---- UNION (ALL — duplicates intentionally kept) ----
    one_df = (
        pd.concat([branch_a, branch_b], ignore_index=True)
        .reset_index(drop=True)
    )

    # ---- Group 1: (table_name, igam_entitlement) -> sorted, comma-joined column_list ----
    # mergesort keeps the sort stable so equal keys preserve their order.
    tmp = one_df.sort_values(["table_name", "igam_entitlement", "column_name"], kind="mergesort")
    new_df = (
        tmp.groupby(["table_name", "igam_entitlement"], as_index=False)["column_name"]
        .apply(lambda s: ",".join(s.dropna().astype(str).tolist()))
        .rename(columns={"column_name": "column_list"})
    )
    # Columns: table_name, igam_entitlement, column_list

    # ---- Group 2: (table_name, column_list) -> comma-joined igam_entitlement ----
    # Entitlements sharing the exact same column list share one policy.
    grouped = (
        new_df.groupby(["table_name", "column_list"], as_index=False)["igam_entitlement"]
        .apply(lambda s: ",".join(s.dropna().astype(str).tolist()))
    )
    # Columns: table_name, column_list, igam_entitlement

    # ---- ROW_NUMBER() OVER (ORDER BY column_list) -> policy_id ----
    grouped = grouped.sort_values(["column_list"], kind="mergesort")
    grouped["policy_id"] = np.arange(1, len(grouped) + 1).astype(int)

    # ---- Emit policies: one per (table_name, column_list) row ----
    for _, row in grouped.iterrows():
        entitlements_list = [e.strip() for e in str(row["igam_entitlement"]).split(",") if e.strip()]
        columns_list = [c.strip() for c in str(row["column_list"]).split(",") if c.strip()]
        policy_id = str(int(row["policy_id"]))

        params = ranger.add_table_permission_groups(
            config['corporate_store'],
            config['target_table'],
            config['access_type'],  # "2a"
            config['source_table'],
            entitlements_list,
            columns_list=columns_list
        )
        ranger.generate_policy(params, env_config, policy_id)

    # ---- Optional: append full-access YAML if list provided on config ----
    if config["full_access_entitlement_list"] != None:
        # If the config already provides a list, use it directly; otherwise split the string.
        if isinstance(config["full_access_entitlement_list"], list):
            full_access_list = config["full_access_entitlement_list"]
        else:
            full_access_list = [s.strip() for s in str(config["full_access_entitlement_list"]).split(",") if s.strip()]

        params_full = ranger.add_table_permission_groups(
            config['corporate_store'],
            config['target_table'],
            config['access_type'],
            config['source_table'],
            full_access_list
        )
        ranger.generate_policy(params_full, env_config, "full_access")
|
|
|
|
|
|
|
|
|
|
def accessType_2B(config, env_config, igamRoleDF):
    """Generate row-level (2b) Ranger policies for the target table.

    Validates and normalizes the IGAM role mapping, aggregates subsource ids
    per (environment, entitlement), emits a table-level policy (merging in
    the optional full-access list) and then the 2b row-level YAML.

    Raises KeyError when igamRoleDF lacks required columns.
    """
    logger.info(f"Grant privileges for access type {config['access_type']}")
    logger.info("Fetch the metadata in Oracle for access type 2b")

    # --- Validate required columns ---
    required = {"environment", "igam_entitlement", "subsource_id"}
    missing = required - set(igamRoleDF.columns)
    if missing:
        raise KeyError(f"igamRoleDF missing required column(s): {sorted(missing)}")

    # --- Normalize to strings (robust against None/NaN); work on a copy ---
    igamRoleDF = igamRoleDF.copy()
    igamRoleDF["environment"] = igamRoleDF["environment"].astype(str).str.strip()
    igamRoleDF["igam_entitlement"] = igamRoleDF["igam_entitlement"].astype(str).str.strip()
    igamRoleDF["subsource_id"] = igamRoleDF["subsource_id"].astype(str).str.strip()

    # --- Per (environment, entitlement) collect unique subsource ids,
    #     sorted for a stable order, empties removed ---
    agg_df = (
        igamRoleDF.loc[igamRoleDF["subsource_id"].ne(""), ["environment", "igam_entitlement", "subsource_id"]]
        .drop_duplicates()
        .sort_values(["environment", "igam_entitlement", "subsource_id"], kind="mergesort")
        .groupby(["environment", "igam_entitlement"], as_index=False)["subsource_id"]
        .agg(lambda s: ",".join(s.unique()))
        .rename(columns={"subsource_id": "subsource_id_list"})
    )

    # List of (igam_entitlement, subsource_id_list) tuples for the log payload
    accessType2bValidList = list(zip(
        agg_df["igam_entitlement"].astype(str),
        agg_df["subsource_id_list"].astype(str)
    ))
    logger.info(f"accessType2bValidList : {accessType2bValidList}")

    # --- Entitlements for policy generation (unique, non-empty) ---
    igam_entitlements = (
        igamRoleDF["igam_entitlement"]
        .dropna()
        .map(str)
        .str.strip()
        .loc[lambda s: s.ne("")]
        .drop_duplicates()
        .tolist()
    )
    logger.info(f"Collected IGAM entitlements ({len(igam_entitlements)}): {igam_entitlements}")

    # --- Row-level permissions ---
    params_row_level = ranger.add_table_permission_groups(
        config['corporate_store'],
        config['target_table'],
        config['access_type'],
        config['source_table'],
        igam_entitlements
    )

    # --- Table-level permissions, merging in full-access entitlements ---
    # FIX: the original tested `["full_access_entitlement_list"] != None` —
    # a literal one-element list, always truthy — so this branch ran
    # unconditionally and crashed with `list + None` when no full-access
    # list was configured.
    if config["full_access_entitlement_list"] is not None:
        combined_entitlements = igam_entitlements + config['full_access_entitlement_list']
        logger.info(f"Full-access entitlements provided ({len(config['full_access_entitlement_list'])}): {config['full_access_entitlement_list']}")
    else:
        combined_entitlements = igam_entitlements

    # ---- Optional: append full-access YAML if a list is configured ----
    if config["full_access_entitlement_list"] is not None:
        params_full = ranger.add_table_permission_groups(
            config["corporate_store"],
            config["target_table"],
            config["access_type"],  # keep same access type per the pattern
            config["source_table"],
            combined_entitlements
        )
        # NOTE(review): other call sites pass a third argument to
        # generate_policy; confirm this two-argument call is intentional.
        ranger.generate_policy(params_full, env_config)

    ranger.yaml_format_2b(params_row_level, env_config, config['full_access_entitlement_list'])  # row-level policy
    logger.info("Final YAML format emitted for 2B.")
|
|
|
|
|
|
|
|
|
|
def accessType_3(config, env_config, igamRoleDF):
    """Generate row-filter (type 3) Ranger policies for the target table.

    igamRoleDF must contain at least 'igam_entitlement' and 'subsource_id'.
    Entitlements whose subsource_id is 'TMS' receive a row-level filter
    chosen by source table name; a table-level policy is emitted for those
    entitlements plus any configured full-access entitlements.

    Uses the `ranger` formatter: add_table_permission_groups(...),
    yaml_format_3(...), yaml_format_1(...).
    """
    # --- 1) Entitlements restricted to subsource 'TMS' get row filters ---
    if not {"igam_entitlement", "subsource_id"}.issubset(igamRoleDF.columns):
        missing = {"igam_entitlement", "subsource_id"} - set(igamRoleDF.columns)
        raise KeyError(f"igamRoleDF missing required column(s): {sorted(missing)}")

    new_df = (
        igamRoleDF.loc[
            igamRoleDF["subsource_id"].astype(str).str.upper() == "TMS",
            ["igam_entitlement"]
        ].drop_duplicates()
    )
    logger.info("new_df :\n%s", new_df.to_string(index=False))

    accessType3ValidList = new_df["igam_entitlement"].astype(str).str.strip().tolist()

    # --- 2) Build params for row-level groups (type 3) ---
    params_row_level = ranger.add_table_permission_groups(
        config['corporate_store'],
        config['target_table'],
        config['access_type'],
        config['source_table'],
        accessType3ValidList
    )

    corp = str(config['corporate_store']).lower()
    src_tbl = str(config['source_table']).lower()

    # --- 3) Compose the candidate filter expressions (verbatim from the Scala port) ---
    sqlCreateView3NonRestrString_Ptree = (
        "(parent_fk in ( "
        f"select portfolio_fk from {corp}.nh_portfolio_access "
        "where lower(user_id) LIKE concat('%', lower(regexp_extract(current_user(),'[^@]*',0)), '%') "
        "AND to_date(a_valid_to) > current_timestamp() "
        ")) AND (child_fk in ( "
        f"select portfolio_fk from {corp}.nh_portfolio_access "
        "where lower(user_id) LIKE concat('%', lower(regexp_extract(current_user(),'[^@]*',0)), '%') "
        "AND to_date(a_valid_to) > current_timestamp() "
        "))"
    )

    sqlCreateView3NonRestrString_Pos = (
        "position_key in ( "
        f"select position_key from {corp}.nh_portfolio_access a "
        f"inner join {corp}.nh_position b on ( "
        "(b.portfolio_fk = a.portfolio_fk and b.portfolio_fk is not NULL) or "
        "(b.portfolio_compare_fk = a.portfolio_fk and b.portfolio_compare_fk is not NULL) "
        ") "
        "where lower(user_id) LIKE concat('%', lower(regexp_extract(current_user(),'[^@]*',0)), '%') "
        "AND to_date(a_valid_to) > current_timestamp() "
        ")"
    )

    sqlCreateView3PortAccess = "lower(user_id) LIKE concat('%', lower(regexp_extract(current_user(),'[^@]*',0)), '%')"
    sqlCreateView3LimAccess = "lower(user_id) LIKE concat('%', lower(regexp_extract(current_user(),'[^@]*',0)), '%')"

    # Standard case uses the configured key columns/table names.
    # FIX: config is a dict throughout this module, so the original
    # getattr(config, ..., None) calls always returned None and the
    # hard-coded fallbacks were used unconditionally. Use dict lookups.
    key_col = config.get("type3SourceTableKeyColumn")
    acc_col = config.get("type3AccessTableKeyColumn")
    acc_table = config.get("type3AccessTable")
    if not all([key_col, acc_col, acc_table]):
        # Only needed for the default branch; placeholders keep the SQL valid.
        key_col = key_col or "source_key_col"
        acc_col = acc_col or "access_key_col"
        acc_table = acc_table or "type3_access_table"

    sqlCreateView3NonRestrString_Stdrd = (
        f"{key_col} in (select {acc_col} from {corp}.{acc_table} "
        "where lower(user_id) LIKE concat('%', lower(regexp_extract(current_user(),'[^@]*',0)), '%') "
        "AND to_date(a_valid_to) > current_timestamp())"
    )

    # --- 4) Choose the filter by source table ---
    if src_tbl == "nh_portfoliotree":
        sqlCreateViewType3Filter = sqlCreateView3NonRestrString_Ptree
    elif src_tbl == "nh_position":
        sqlCreateViewType3Filter = sqlCreateView3NonRestrString_Pos
    elif src_tbl == "nh_portfolio_access":
        sqlCreateViewType3Filter = sqlCreateView3PortAccess
    elif src_tbl == "nh_limit_access":
        sqlCreateViewType3Filter = sqlCreateView3LimAccess
    else:
        sqlCreateViewType3Filter = sqlCreateView3NonRestrString_Stdrd

    # (The unused rowFilter/fullAccessFilter YAML scaffolding and the
    # commented-out block from the Scala port were removed; yaml_format_3
    # receives the filter expression directly.)

    # --- 5) Table-level params, including optional full-access entitlements ---
    if config['full_access_entitlement_list']:
        # Retained from the original for parity (its result only fed the
        # removed commented-out YAML block); drop once confirmed side-effect
        # free. TODO(review).
        paramsFullAccess = ranger.add_table_permission_groups(
            config['corporate_store'],
            config['target_table'],
            config['access_type'],
            config['source_table'],
            config['full_access_entitlement_list']
        )

        params_table_level = ranger.add_table_permission_groups(
            config['corporate_store'],
            config['target_table'],
            config['access_type'],
            config['source_table'],
            accessType3ValidList + config['full_access_entitlement_list']
        )
    else:
        params_table_level = ranger.add_table_permission_groups(
            config['corporate_store'],
            config['target_table'],
            config['access_type'],
            config['source_table'],
            accessType3ValidList
        )

    # --- 6) Render YAML: row-filter policy, then table-level policy ---
    ranger.yaml_format_3(params_row_level, env_config, sqlCreateViewType3Filter, config['full_access_entitlement_list'])  # base type 3 yaml
    ranger.yaml_format_1(params_table_level, env_config)  # table-level yaml
|
|
|
|
#########################################################################################################################################################
|
|
####################################STARTING DAG#########################################################################################################
|
|
|
|
|
|
# Default task arguments shared by every task in this DAG.
default_args = {
    'owner': 'devo',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,  # one automatic retry per task
    'retry_delay': timedelta(minutes=1),
}
|
|
|
|
with DAG(
|
|
dag_id='devo_table_generator_core',
|
|
default_args=default_args,
|
|
description='Core Devo table generator workflow for single table',
|
|
schedule=None,
|
|
catchup=False,
|
|
tags=['DevoTableGenerator'],
|
|
max_active_runs=10,
|
|
max_active_tasks=16,
|
|
) as dag:
|
|
|
|
# Init - read config from context
|
|
def init_step(**context):
|
|
dag_run = context.get("dag_run")
|
|
ti = context["ti"]
|
|
conf = (dag_run.conf or {}) if dag_run else {}
|
|
|
|
env = os.getenv("MRDS_ENV")
|
|
if not env:
|
|
raise ValueError("MRDS_ENV environment variable is required")
|
|
env = env.lower()
|
|
store = conf.get("store")
|
|
if not store:
|
|
raise ValueError("store parameter is required")
|
|
store = store.lower()
|
|
|
|
owner_table = conf.get("owner_table")
|
|
if not owner_table or '.' not in owner_table:
|
|
raise ValueError("owner_table must be in format 'OWNER.TABLE_NAME'")
|
|
|
|
table_owner, table_name = owner_table.split('.', 1)
|
|
|
|
if env not in {"dev", "tst","acc","prd"}:
|
|
raise ValueError(f"Unsupported env '{env}'. Expected 'dev', 'tst', 'acc' or 'prd'.")
|
|
if store not in {"mopdb", "rar", 'rqsd'}:
|
|
raise ValueError(f"Unsupported store '{store}'. Expected 'mopdb', 'rar', 'rqsd'.")
|
|
|
|
if store.lower() == "mopdb":
|
|
p_service_name = "MOPDB"
|
|
elif store.lower() == "rar":
|
|
p_service_name = "RAR"
|
|
elif store.lower() == 'rqsd':
|
|
p_service_name = "RQSD"
|
|
|
|
|
|
if env == "dev" or env == "tst":
|
|
sentry_role_environment = "TEST/INTEGRATION"
|
|
elif env == "acc":
|
|
sentry_role_environment = "ACCEPTANCE"
|
|
elif env == 'prd':
|
|
sentry_role_environment = "PRODUCTION"
|
|
|
|
|
|
with open(ENV_CONFIG_PATH, "r") as f:
|
|
cfg = yaml.safe_load(f)
|
|
|
|
|
|
env_cfg = cfg[env]
|
|
store_cfg = cfg[store]
|
|
p_objectstore_uri = env_cfg["S3_LOCATION_URI"].replace("{0}", store.lower())
|
|
p_run_id = str(ti.run_id)
|
|
logging.info("=== init_step begins === env=%s store=%s table=%s.%s run_id=%s sentry_role_environment=%s",
|
|
env, store, table_owner, table_name, p_run_id, sentry_role_environment)
|
|
|
|
if store.lower()=='rqsd':
|
|
env_cfg["DEVO_SECRET"]=env_cfg["DEVO_SECRET_RQSD"]
|
|
env_cfg["DEVO_USERNAME"]=env_cfg["DEVO_USERNAME_RQSD"]
|
|
try:
|
|
devo_secret_name = env_cfg["DEVO_SECRET"]
|
|
env_cfg["DEVO_SECRET"]= get_secret(devo_secret_name)
|
|
except:
|
|
logger.error("Failed to retrieve credentials from secrets")
|
|
raise(Exception)
|
|
|
|
logging.info("=== init_step getting table info=== env=%s store=%s table=%s.%s run_id=%s devo_user=%s sentry_role_environment=%s",
|
|
env, store, table_owner, table_name, p_run_id,env_cfg["DEVO_USERNAME"], sentry_role_environment)
|
|
try:
|
|
target_table=get_target_table(store_cfg['oracle_mgmt_table'],table_owner,table_name)['table_alias'][0]
|
|
except Exception as e:
|
|
logger.error("Table not found in oracle management table")
|
|
logger.error("Exception: %s", e)
|
|
logger.error("Traceback:\n%s", traceback.format_exc())
|
|
raise
|
|
try:
|
|
access_type=get_type_ofAccess(store_cfg['oracle_metadata_table'],table_owner,table_name,p_service_name)['rar3_type_of_access'][0]
|
|
except Exception as e:
|
|
logger.error("Table not found in oracle metadata inventory")
|
|
logger.error("Exception: %s", e)
|
|
logger.error("Traceback:\n%s", traceback.format_exc())
|
|
raise
|
|
|
|
|
|
xcom = {
|
|
'db':store,
|
|
'env': env,
|
|
'run_id':p_run_id,
|
|
'corporate_store':store_cfg['corporate_store'],
|
|
'service_name': p_service_name,
|
|
'source_schema':table_owner,
|
|
'source_table':table_name,
|
|
'oracle_metadata_table':store_cfg['oracle_metadata_table'],
|
|
'oracle_igam_table':store_cfg['oracle_igam_table'],
|
|
'oracle_mgmt_table': store_cfg['oracle_mgmt_table'],
|
|
'target_table':target_table,
|
|
'sentry_role_environment':sentry_role_environment,
|
|
'target_s3_bucket': env_cfg["BUCKET_PREFIX"]+store_cfg['target_s3_bucket'] ,
|
|
'tech_meta_data_fields': store_cfg['tech_meta_data_fields'],
|
|
'full_access_entitlement_list':env_cfg[f'FULL_ACCESS_LIST_{p_service_name.upper()}'].split(','),
|
|
'access_type': access_type,
|
|
'DEVO_USERNAME': env_cfg["DEVO_USERNAME"],
|
|
'DEVO_SECRET': env_cfg["DEVO_SECRET"],
|
|
'IMPALA_HOSTNAME': env_cfg["IMPALA_HOSTNAME"],
|
|
'HIVE_HOSTNAME': env_cfg["HIVE_HOSTNAME"],
|
|
'RANGER_HOSTNAME': env_cfg["RANGER_HOSTNAME"],
|
|
'BUCKET_PREFIX': env_cfg['BUCKET_PREFIX'],
|
|
'S3_LOCATION_URI': env_cfg["S3_LOCATION_URI"]
|
|
}
|
|
|
|
for k, v in xcom.items():
|
|
ti.xcom_push(key=k, value=v)
|
|
|
|
    # Task 0: resolve run parameters and publish configuration to XCom.
    init = PythonOperator(
        task_id='init_step',
        python_callable=init_step,
    )
|
|
|
|
# Start log table
|
|
def start_log_table_task(**context):
    """Record the start of this run in the Oracle replication log table.

    Pulls the run/table identifiers published by init_step and invokes the
    DATA_REPLICATOR generator logging procedure with a 'success' status.
    Any Oracle error is logged and re-raised so the task fails.
    """
    ti = context["ti"]
    pull = lambda key: ti.xcom_pull(task_ids='init_step', key=key)

    p_run_id = pull('run_id')
    p_service_name = pull('service_name')
    # NOTE(review): 'table_owner'/'table_name' are assumed to be pushed by
    # init_step (its visible xcom dict pushes 'source_schema'/'source_table'
    # instead) -- confirm against the full init_step body.
    p_table_owner = pull('table_owner')
    p_table_name = pull('table_name')

    conn = None
    try:
        conn = oraconn.connect('MRDS_LOADER')
        oraconn.run_proc(
            conn,
            'MRDS_LOADER.DATA_REPLICATOR.end_log_table_generator',
            [p_run_id, p_service_name, p_table_owner, p_table_name, 'success'],
        )
        conn.commit()
        logging.info("start_log_table procedure executed successfully")
    except Exception as e:
        logging.error(f"Error in start_log_table: {e}")
        raise
    finally:
        if conn:
            conn.close()
|
|
|
|
# Writes the 'run started' row to the Oracle log table.
t1 = PythonOperator(task_id='start_log_table', python_callable=start_log_table_task)
|
|
|
|
|
|
# Drop table
|
|
def drop_table_task(**context):
    """Drop the existing target table so it can be re-created cleanly.

    Builds the table config and environment config from init_step XComs and
    delegates the actual DROP to deleteExternalTable. Errors are logged and
    re-raised so the task fails.
    """
    ti = context["ti"]
    pull = lambda key: ti.xcom_pull(task_ids='init_step', key=key)

    config = {
        "corporate_store": pull('corporate_store'),
        "target_table": pull('target_table'),
        'access_type': pull('access_type'),
        'env': pull('env'),
    }
    env_config = {
        'DEVO_USERNAME': pull("DEVO_USERNAME"),
        'DEVO_SECRET': pull("DEVO_SECRET"),
        'IMPALA_HOSTNAME': pull("IMPALA_HOSTNAME"),
        'HIVE_HOSTNAME': pull("HIVE_HOSTNAME"),
        'RANGER_HOSTNAME': pull("RANGER_HOSTNAME"),
        'BUCKET_PREFIX': pull('BUCKET_PREFIX'),
        'S3_LOCATION_URI': pull("S3_LOCATION_URI"),
    }

    try:
        deleteExternalTable(config, env_config)
        logging.info("drop_table procedure executed successfully")
    except Exception as e:
        logging.error(f"Error in drop_table: {e}")
        raise
|
|
|
|
# Drops the old target table; ALL_DONE so cleanup runs even after upstream failure.
t2 = PythonOperator(
    task_id='drop_table',
    python_callable=drop_table_task,
    trigger_rule=TriggerRule.ALL_DONE,
)
|
|
|
|
# Devo external table creation
|
|
def devo_table_creation_task(**context):
    """Create the Devo target table in Hive/Impala.

    Loads the column metadata for the source table from the Oracle metadata
    table, then either creates the raw external table (target name ending
    in '_EXT') or builds the managed table from the external one.

    Raises:
        Exception: re-raised after logging, so the task fails on any error.
    """
    ti = context["ti"]
    pull = lambda key: ti.xcom_pull(task_ids='init_step', key=key)

    config = {
        "corporate_store": pull('corporate_store'),
        "target_table": pull('target_table'),
        'access_type': pull('access_type'),
        'source_schema': pull('source_schema'),
        'source_table': pull('source_table'),
        'oracle_metadata_table': pull('oracle_metadata_table'),
        'target_s3_bucket': pull('target_s3_bucket'),
        'tech_meta_data_fields': pull('tech_meta_data_fields'),
        'env': pull('env'),
    }
    env_config = {
        'DEVO_USERNAME': pull("DEVO_USERNAME"),
        'DEVO_SECRET': pull("DEVO_SECRET"),
        'IMPALA_HOSTNAME': pull("IMPALA_HOSTNAME"),
        'HIVE_HOSTNAME': pull("HIVE_HOSTNAME"),
        'RANGER_HOSTNAME': pull("RANGER_HOSTNAME"),
        'BUCKET_PREFIX': pull('BUCKET_PREFIX'),
        'S3_LOCATION_URI': pull("S3_LOCATION_URI"),
    }

    try:
        # BUG FIX: the original try/except wrapped only a success log line;
        # the actual metadata load and table creation ran outside it, so
        # failures were never caught by this handler. The work now runs
        # inside the try so errors are logged before the task fails.
        tableFields = loadMetadataTable(config)
        logging.info("Starting table creation on hive with env=%s store=%s corporate_store=%s table=%s devo_user=%s",
                     config['env'], pull('db'), config['corporate_store'], config['target_table'], env_config["DEVO_USERNAME"])

        # '_EXT' suffix selects the raw external table path; any other name
        # builds the managed table on top of the external one.
        if config['target_table'][-4:].upper() == '_EXT':
            createExternalTables(config, tableFields, env_config)
        else:
            createTableFromExternal(config, tableFields, env_config)

        logging.info("Impyla (Devo) task finished successfully.")
    except Exception as e:
        # Message updated from the stale copy-paste name 'devo_impyla_task'.
        logging.error(f"Error in devo_table_creation_task: {e}")
        raise
|
|
|
|
# Builds the Devo external/managed table; ALL_DONE keeps the pipeline moving.
t3 = PythonOperator(
    task_id='devo_table_creation_task',
    python_callable=devo_table_creation_task,
    trigger_rule=TriggerRule.ALL_DONE,
)
|
|
|
|
# Ranger policy creation
|
|
def ranger_policy_creation(**context):
    """Create/refresh Ranger access policies for the managed target table.

    Raw external ('_EXT') tables are skipped. For managed tables the IGAM
    roles are read from the Oracle IGAM table and mapped onto Ranger
    policies according to the table's access type.

    Raises:
        Exception: re-raised after logging, so the task fails on any error.
    """
    ti = context["ti"]
    pull = lambda key: ti.xcom_pull(task_ids='init_step', key=key)

    config = {
        "corporate_store": pull('corporate_store'),
        'source_table': pull('source_table'),
        "target_table": pull('target_table'),
        'access_type': pull('access_type'),
        'source_schema': pull('source_schema'),
        'service_name': pull('service_name'),
        'oracle_metadata_table': pull('oracle_metadata_table'),
        'target_s3_bucket': pull('target_s3_bucket'),
        'tech_meta_data_fields': pull('tech_meta_data_fields'),
        'sentry_role_environment': pull('sentry_role_environment'),
        'full_access_entitlement_list': pull('full_access_entitlement_list'),
        'oracle_igam_table': pull('oracle_igam_table'),
        'env': pull('env'),
    }
    env_config = {
        'DEVO_USERNAME': pull("DEVO_USERNAME"),
        'DEVO_SECRET': pull("DEVO_SECRET"),
        'IMPALA_HOSTNAME': pull("IMPALA_HOSTNAME"),
        'HIVE_HOSTNAME': pull("HIVE_HOSTNAME"),
        'RANGER_HOSTNAME': pull("RANGER_HOSTNAME"),
        'BUCKET_PREFIX': pull('BUCKET_PREFIX'),
        'S3_LOCATION_URI': pull("S3_LOCATION_URI"),
    }

    logging.info("Starting Policy creation with env=%s store=%s corporate_store=%s table=%s sentry_role_environment=%s",
                 config['env'], pull('db'), config['corporate_store'], config['target_table'], config['sentry_role_environment'])

    try:
        # BUG FIX: the original try/except wrapped only a success log line;
        # the role read and policy mapping now run inside it so failures are
        # logged before the task fails.
        # Policies only apply to the managed table, never the raw '_EXT' one.
        if config['target_table'][-4:].upper() != '_EXT':
            igamRoles = readIGAMRoles(config)
            logger.info(accessTypeMapper(config, env_config, igamRoles))
        logging.info("Ranger policy creation task finished successfully.")
    except Exception as e:
        # Messages updated from the stale copy-paste name 'devo_impyla_task'.
        logging.error(f"Error in ranger_policy_creation: {e}")
        raise
|
|
|
|
# Applies Ranger policies to the managed table; runs regardless of upstream state.
t4 = PythonOperator(
    task_id='ranger_policy_creation',
    python_callable=ranger_policy_creation,
    trigger_rule=TriggerRule.ALL_DONE,
)
|
|
|
|
|
|
# End log table
|
|
def end_log_table_task(**context):
    """Close out the Oracle log entry for this table's replication run.

    Best-effort cleanup: failures are logged but deliberately NOT re-raised,
    so the final status check (fail_if_any_failed) still runs.
    """
    ti = context["ti"]
    # BUG FIX: these were pulled with a 'p_' prefix ('p_service_name',
    # 'p_table_owner', 'p_table_name'), keys that no task pushes, so all
    # three values were always None. Sibling tasks (start_log_table,
    # fail_if_any_failed) pull the unprefixed keys used here -- confirm
    # init_step pushes 'table_owner'/'table_name' as they assume.
    p_service_name = ti.xcom_pull(task_ids='init_step', key='service_name')
    p_table_owner = ti.xcom_pull(task_ids='init_step', key='table_owner')
    p_table_name = ti.xcom_pull(task_ids='init_step', key='table_name')

    oracle_conn = None
    try:
        oracle_conn = oraconn.connect('MRDS_LOADER')
        oraconn.run_proc(
            oracle_conn,
            'MRDS_LOADER.DATA_REPLICATOR.end_log_table',
            [p_service_name, p_table_owner, p_table_name]
        )
        oracle_conn.commit()
        logging.info("end_log_table procedure executed successfully")
    except Exception as e:
        logging.error(f"Error in end_log_table: {e}")
        logging.info("Continuing despite end_log_table error (cleanup task)")
    finally:
        if oracle_conn:
            oracle_conn.close()
|
|
|
|
# Closes the Oracle log row; ALL_DONE so it runs no matter what happened upstream.
t5 = PythonOperator(
    task_id='end_log_table',
    python_callable=end_log_table_task,
    trigger_rule=TriggerRule.ALL_DONE,
)
|
|
|
|
|
|
# Check status and fail if needed
|
|
def fail_if_any_failed(**context):
    """Fail the DAG run if any critical task failed.

    Because the upstream tasks run with TriggerRule.ALL_DONE, the DAG would
    otherwise finish green even after failures. This task inspects the state
    of each critical task, records the first failure in the Oracle log table
    via end_log_table_generator, and raises AirflowFailException to mark the
    whole run failed.

    Raises:
        AirflowFailException: when at least one critical task failed.
        Exception: if recording the failure in Oracle itself errors.
    """
    dag_run = context['dag_run']
    check_tasks = ['init_step', 'start_log_table', 'drop_table',
                   'devo_table_creation_task', 'ranger_policy_creation', 'end_log_table']

    failed = []
    for tid in check_tasks:
        ti_up = dag_run.get_task_instance(tid)
        # NOTE(review): only the 'failed' state is checked; 'upstream_failed'
        # should not occur for these ALL_DONE tasks, but confirm.
        if ti_up and ti_up.state == 'failed':
            failed.append(tid)

    if failed:
        ti = context["ti"]
        p_run_id = ti.xcom_pull(task_ids='init_step', key='run_id')
        p_service_name = ti.xcom_pull(task_ids='init_step', key='service_name')
        p_table_owner = ti.xcom_pull(task_ids='init_step', key='table_owner')
        p_table_name = ti.xcom_pull(task_ids='init_step', key='table_name')
        oracle_conn = None
        try:
            oracle_conn = oraconn.connect('MRDS_LOADER')
            # Only the first failed task id is recorded in Oracle; the Airflow
            # error message below lists them all.
            oraconn.run_proc(
                oracle_conn,
                'MRDS_LOADER.DATA_REPLICATOR.end_log_table_generator',
                [p_run_id, p_service_name, p_table_owner, p_table_name,
                 f"The Following task failed: {failed[0]}"]
            )
            oracle_conn.commit()
            # BUG FIX: log messages were copy-pasted from start_log_table_task
            # and misreported which step ran/failed.
            logging.info("end_log_table_generator failure status recorded successfully")
        except Exception as e:
            logging.error(f"Error recording failure status in fail_if_any_failed: {e}")
            raise
        finally:
            if oracle_conn:
                oracle_conn.close()

        error_msg = f"Critical task(s) failed: {', '.join(failed)}. DAG execution failed."
        logging.error(error_msg)
        raise AirflowFailException(error_msg)

    logging.info("All critical tasks completed successfully: %s", check_tasks)
|
|
|
|
# Final gate: converts any upstream failure into a failed DAG run.
t6 = PythonOperator(
    task_id='fail_if_any_failed',
    python_callable=fail_if_any_failed,
    trigger_rule=TriggerRule.ALL_DONE,
)
|
|
|
|
# Dependencies
|
|
# Linear pipeline init -> t1 -> t2 -> t3 -> t4; t5 (end_log_table) waits on
# all of t1-t4, and t6 then inspects states to fail the DAG if anything broke.
init >> t1
t1 >> t2 >> t3 >> t4
[t1, t2, t3, t4] >> t5
t5 >> t6
|