Files
Grzegorz Michalski 2c225d68ac init
2026-03-02 09:47:35 +01:00

251 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from typing import List, Optional
from apache_ranger.model.ranger_service import *
from apache_ranger.client.ranger_client import *
from apache_ranger.model.ranger_policy import *
import re
def add_table_permission_groups(
    corporate_store: str,
    target_table: str,
    access_type: str,
    source_table: str,
    igam_entitlement_list: List[str],
    columns_list: Optional[List[str]] = None,
    row_list: Optional[List[str]] = None,
):
    """Assemble the parameter dictionary describing one table's access grant.

    Args:
        corporate_store: Stored unchanged under ``'corporate_store'``.
        target_table: Stored unchanged under ``'target_table'``.
        access_type: Stored unchanged under ``'access_type'``.
        source_table: When it equals ``"rar_sources_igam_sentry"``
            (case-insensitively), the ``"public"`` group is appended to the
            entitlements.
        igam_entitlement_list: Entitlement names; empty strings are dropped
            and the rest are lower-cased.
        columns_list: Columns to expose; ``["*"]`` when omitted.
        row_list: Row values; ``["*"]`` when omitted.

    Returns:
        A dict with the keys ``corporate_store``, ``target_table``,
        ``access_type``, ``columns``, ``rows`` (each row value wrapped in
        single quotes, comma separated) and ``igam_roles``.
    """
    entitlements = igam_entitlement_list
    if source_table.lower() == "rar_sources_igam_sentry":
        # Builds a new list, so the caller's list is never mutated.
        entitlements = igam_entitlement_list + ["public"]

    if columns_list is None:
        columns = ["*"]
    else:
        columns = columns_list

    if row_list is None:
        rows = ["*"]
    else:
        rows = row_list
    quoted_rows = [f"'{row}'" for row in rows]

    roles = []
    for entitlement in entitlements:
        if entitlement == "":
            continue
        roles.append(entitlement.lower())

    return {
        'corporate_store': corporate_store,
        'target_table': target_table,
        'access_type': access_type,
        'columns': columns,
        'rows': ','.join(quoted_rows),
        'igam_roles': roles,
    }
from typing import List, Optional
# --- helpers ---------------------------------------------------------------
def _policy_name_from_params(config, policy_id: Optional[str] = None) -> Optional[str]:
"""
Build the exact policy name used by your create functions.
Returns None for types where we need to match multiple (e.g., 2a without id).
"""
cs = config['corporate_store'].lower()
tbl = config['target_table'].lower()
at = config['access_type'].lower()
base = f"cpo_{cs}_{tbl}_{at}"
if at == "1":
# yaml_format_1
return base
elif at == "2a":
# yaml_format_2a -> requires policy_id to be exact
if policy_id:
return f"{base}_policy_{policy_id}"
# without policy_id, well delete all that start with this prefix
return None
elif at == "2b":
# yaml_format_2b
return f"{base}_row_level_policy"
elif at == "3":
# yaml_format_3 uses same name pattern as 2b in your script
return f"{base}_row_level_policy"
else:
raise ValueError(f"Invalid access type '{config['access_type']}'. Expected one of: 1, 2a, 2b, 3.")
def _ranger_client(env_config) -> RangerClient:
    """Create a ``RangerClient`` from the environment configuration.

    Reads ``RANGER_HOSTNAME`` for the server URL and ``DEVO_USERNAME`` /
    ``DEVO_SECRET`` for the credential pair passed to the client.
    """
    client = RangerClient(
        env_config['RANGER_HOSTNAME'],
        (env_config['DEVO_USERNAME'], env_config['DEVO_SECRET']),
    )
    # NOTE(review): this switches off `verify` on the client's session
    # (presumably a requests.Session, i.e. TLS certificate checks are
    # skipped while credentials are sent) - confirm this is intended.
    client.session.verify = False
    return client
# --- main deletion API -----------------------------------------------------
def delete_policy(config, env_config, policy_id: Optional[str] = None) -> List[str]:
    """Delete the Ranger policies that belong to a table.

    Every policy of the ``cm_hive`` service is listed (in pages of 1000) and
    those whose name fully matches
    ``cpo_<corporate_store>_<target_table>_`` followed by the known policy
    suffixes (access type, ``_policy_<n>``, ``_row_level_policy``,
    ``full_access``) are deleted by id. The match does not depend on
    ``config['access_type']``, so policies of all access types for the table
    are removed.

    Args:
        config: Mapping providing ``corporate_store`` and ``target_table``.
        env_config: Connection settings handed to ``_ranger_client``.
        policy_id: Accepted for interface compatibility; it is not used to
            narrow the match.

    Returns:
        Names of the policies that were deleted.

    Raises:
        RuntimeError: If no policy was deleted, either because none matched
            or because every matching deletion failed.
    """
    ranger = _ranger_client(env_config)
    service_name = "cm_hive"
    page_size = 1000

    cs = config['corporate_store'].lower()
    tbl = config['target_table'].lower()
    prefix = f"cpo_{cs}_{tbl}_"
    print(prefix)

    # Escape the prefix so store/table names are matched literally, and
    # compile once instead of rebuilding the pattern for every policy.
    name_pattern = re.compile(
        re.escape(prefix)
        + r"([0-9]?[a-z]?)(_policy_)?([0-9]*)?(_row_level_policy)?(full_access)?"
    )

    # Collect every page before deleting anything: removing policies while
    # paging would shift the start index of the following pages.
    start = 0
    candidates = []
    while True:
        params = {"pageSize": page_size, "startIndex": start}
        page = ranger.get_policies_in_service(service_name, params=params) or []
        candidates.extend(page)
        if len(page) < page_size:
            break
        start += len(page)

    deleted: List[str] = []
    failed: List[str] = []
    for p in candidates:
        name = p["name"]
        print(f"analizing policy:{name}")
        if name_pattern.fullmatch(name) is None:
            continue
        try:
            ranger.delete_policy_by_id(p["id"])
        except Exception as exc:
            # Best effort: keep trying the remaining policies, but report
            # the failure instead of hiding it.
            failed.append(name)
            print(f"failed to delete policy:{name} ({exc})")
        else:
            deleted.append(name)

    if not deleted:
        if failed:
            raise RuntimeError(
                f"Policies matching prefix '{prefix}' were found but none could be deleted: {failed}"
            )
        raise RuntimeError(
            f"No matching policies found for deletion with prefix '{prefix}'. "
        )
    return deleted
def generate_policy(
    params,
    env_config,
    policy_id: Optional[str] = None,
    filter_string: Optional[str] = None,
    full_access_list: Optional[List[str]] = None,
):
    """Create the Ranger policy that corresponds to ``params['access_type']``.

    Dispatch (access type is compared case-insensitively):

    * ``1``  -> ``yaml_format_1(params, env_config)``
    * ``2a`` -> ``yaml_format_2a(params, env_config, policy_id)``
    * ``2b`` -> ``yaml_format_1(params, env_config)``
    * ``3``  -> ``yaml_format_3(params, env_config, filter_string,
      full_access_list)``

    Args:
        params: Policy parameters; must contain ``access_type``.
        env_config: Connection settings forwarded to the creator functions.
        policy_id: Only forwarded for access type ``2a``.
        filter_string: Row filter expression; required for access type ``3``.
        full_access_list: Groups forwarded to ``yaml_format_3``; an empty
            list is passed when omitted.

    Returns:
        Whatever the selected creator function returns.

    Raises:
        ValueError: If the access type is unknown, or if access type ``3`` is
            requested without ``filter_string``.
    """
    access_type = params['access_type'].lower()

    if access_type == "1":
        return yaml_format_1(params, env_config)

    if access_type == "2a":
        return yaml_format_2a(params, env_config, policy_id)

    if access_type == "2b":
        # NOTE(review): 2b is routed to yaml_format_1 rather than
        # yaml_format_2b; kept as-is because existing callers may rely on
        # it - confirm whether this is intended.
        return yaml_format_1(params, env_config)

    if access_type == "3":
        # yaml_format_3 needs the connection settings, a filter expression
        # and the full-access groups; calling it with ``params`` alone always
        # failed with a TypeError.
        if filter_string is None:
            raise ValueError(
                "Access type 3 requires a row filter expression (filter_string)."
            )
        groups = full_access_list if full_access_list is not None else []
        return yaml_format_3(params, env_config, filter_string, groups)

    # ValueError is still an Exception, so callers catching the previous
    # generic Exception keep working.
    raise ValueError(f"Invalid access type {params['access_type']}. Please check the input param")
def yaml_format_1(params, env_config) -> "RangerPolicy":
    """Create a column-level ``select`` policy for a table in ``cm_hive``.

    The policy is named ``cpo_<corporate_store>_<target_table>_<access_type>``
    (all lower-cased) and grants ``select`` on ``params['columns']`` of the
    table to the groups in ``params['igam_roles']``.

    Args:
        params: Mapping with ``corporate_store``, ``target_table``,
            ``access_type``, ``columns`` and ``igam_roles``.
        env_config: Connection settings handed to ``_ranger_client``.

    Returns:
        The locally built ``RangerPolicy`` that was submitted (not the object
        returned by the server).
    """
    ranger = _ranger_client(env_config)
    database = params['corporate_store'].lower()

    policy = RangerPolicy()
    policy.service = "cm_hive"  # hard-coded service name
    policy.name = f"cpo_{database}_{params['target_table'].lower()}_{params['access_type'].lower()}"
    policy.resources = {
        'database': RangerPolicyResource({'values': [database]}),
        # The table name is passed through as given (not lower-cased).
        'table': RangerPolicyResource({'values': [params['target_table']]}),
        'column': RangerPolicyResource({'values': params['columns']}),
    }

    allow_item = RangerPolicyItem()
    allow_item.groups = params['igam_roles']
    allow_item.accesses = [RangerPolicyItemAccess({'type': 'select'})]
    policy.policyItems = [allow_item]

    created_policy = ranger.create_policy(policy)
    print('Created policy: name=' + created_policy.name + ', id=' + str(created_policy.id))
    return policy
def yaml_format_2a(params, env_config, policy_id: Optional[str] = None) -> "RangerPolicy":
    """Create a numbered column-level ``select`` policy in ``cm_hive``.

    The policy is named
    ``cpo_<corporate_store>_<target_table>_<access_type>_policy_<policy_id>``
    (store, table and access type lower-cased) and grants ``select`` on
    ``params['columns']`` of the table to the groups in
    ``params['igam_roles']``.

    Args:
        params: Mapping with ``corporate_store``, ``target_table``,
            ``access_type``, ``columns`` and ``igam_roles``.
        env_config: Connection settings handed to ``_ranger_client``.
        policy_id: Suffix that distinguishes the policy; ``"0"`` is used when
            it is ``None``.

    Returns:
        The locally built ``RangerPolicy`` that was submitted (not the object
        returned by the server).
    """
    suffix = policy_id if policy_id is not None else "0"
    ranger = _ranger_client(env_config)
    database = params['corporate_store'].lower()

    policy = RangerPolicy()
    policy.service = "cm_hive"  # hard-coded service name
    policy.name = (
        f"cpo_{database}_{params['target_table'].lower()}_"
        f"{params['access_type'].lower()}_policy_{suffix}"
    )
    policy.resources = {
        'database': RangerPolicyResource({'values': [database]}),
        # The table name is passed through as given (not lower-cased).
        'table': RangerPolicyResource({'values': [params['target_table']]}),
        'column': RangerPolicyResource({'values': params['columns']}),
    }

    allow_item = RangerPolicyItem()
    allow_item.groups = params['igam_roles']
    allow_item.accesses = [RangerPolicyItemAccess({'type': 'select'})]
    policy.policyItems = [allow_item]

    created_policy = ranger.create_policy(policy)
    print(' created policy: name=' + created_policy.name + ', id=' + str(created_policy.id))
    return policy
def yaml_format_2b(params, env_config, full_access_list: Optional[List] = None) -> "RangerPolicy":
    """Create the row-level filter policy for access type 2b in ``cm_hive``.

    The policy is named
    ``cpo_<corporate_store>_<target_table>_<access_type>_row_level_policy``
    (lower-cased). It carries up to two row-filter items, both for ``select``:

    * the groups in ``params['igam_roles']`` are filtered by a sub-select on
      ``<store>.t_ref_rar_sources_igam_sentry`` and
      ``<store>.active_directory_user_groups`` keyed on the current user;
    * the (lower-cased) groups in ``full_access_list`` get the filter
      ``1=1``.

    Args:
        params: Mapping with ``corporate_store``, ``target_table``,
            ``access_type`` and ``igam_roles``.
        env_config: Connection settings handed to ``_ranger_client``.
        full_access_list: Groups whose rows are not restricted. When ``None``
            or empty, the second row-filter item is omitted.

    Returns:
        The locally built ``RangerPolicy`` that was submitted (not the object
        returned by the server).
    """
    ranger = _ranger_client(env_config)
    store = params['corporate_store'].lower()

    policy = RangerPolicy()
    policy.service = "cm_hive"  # hard-coded service name
    policy.name = (
        f"cpo_{store}_{params['target_table'].lower()}_"
        f"{params['access_type'].lower()}_row_level_policy"
    )
    policy.isEnabled = True
    policy.resources = {
        'database': RangerPolicyResource({'values': [store]}),
        # The table name is passed through as given (not lower-cased).
        'table': RangerPolicyResource({'values': [params['target_table']]}),
    }

    filter_expr = (
        f"lower(source) IN (select lower(rar_subsource_id) from {store}.t_ref_rar_sources_igam_sentry "
        f"where lower(rar_igam_entitlement) IN (select ad_group from {store}.active_directory_user_groups "
        "where username = lower(regexp_extract(current_user(),'[^@]*',0))))"
    )
    restricted_item = RangerRowFilterPolicyItem()
    restricted_item.groups = params['igam_roles']
    restricted_item.accesses = [RangerPolicyItemAccess({'type': 'select'})]
    restricted_item.rowFilterInfo = RangerPolicyItemRowFilterInfo({'filterExpr': filter_expr})
    row_filter_items = [restricted_item]

    # The parameter is Optional, so tolerate None instead of failing while
    # iterating it; with no groups there is nothing to grant full access to.
    full_access_groups = [x.lower() for x in (full_access_list or [])]
    if full_access_groups:
        unrestricted_item = RangerRowFilterPolicyItem()
        unrestricted_item.groups = full_access_groups
        unrestricted_item.accesses = [RangerPolicyItemAccess({'type': 'select'})]
        unrestricted_item.rowFilterInfo = RangerPolicyItemRowFilterInfo({'filterExpr': "1=1"})
        row_filter_items.append(unrestricted_item)

    policy.rowFilterPolicyItems = row_filter_items

    created_policy = ranger.create_policy(policy)
    print(' created policy: name=' + created_policy.name + ', id=' + str(created_policy.id))
    return policy
def yaml_format_3(params, env_config, filterString, full_access_list: Optional[List] = None) -> "RangerPolicy":
    """Create the row-level filter policy for access type 3 in ``cm_hive``.

    The policy is named
    ``cpo_<corporate_store>_<target_table>_<access_type>_row_level_policy``
    (lower-cased). It carries up to two row-filter items, both for ``select``:

    * the groups in ``params['igam_roles']`` are filtered by ``filterString``;
    * the (lower-cased) groups in ``full_access_list`` get the filter
      ``1=1``.

    Args:
        params: Mapping with ``corporate_store``, ``target_table``,
            ``access_type`` and ``igam_roles``.
        env_config: Connection settings handed to ``_ranger_client``.
        filterString: Row filter expression applied to ``igam_roles``.
        full_access_list: Groups whose rows are not restricted. When ``None``
            or empty, the second row-filter item is omitted.

    Returns:
        The locally built ``RangerPolicy`` that was submitted (not the object
        returned by the server).
    """
    ranger = _ranger_client(env_config)
    store = params["corporate_store"].lower()

    policy = RangerPolicy()
    policy.service = "cm_hive"  # hard-coded service name
    policy.name = (
        f"cpo_{store}_"
        f"{params['target_table'].lower()}_"
        f"{params['access_type'].lower()}_row_level_policy"
    )
    policy.isEnabled = True
    policy.resources = {
        "database": RangerPolicyResource({"values": [store]}),
        # The table name is passed through as given (not lower-cased).
        "table": RangerPolicyResource({"values": [params["target_table"]]}),
    }

    # Row filter applied to the regular entitlement groups.
    restricted_item = RangerRowFilterPolicyItem()
    restricted_item.groups = params["igam_roles"]
    restricted_item.accesses = [RangerPolicyItemAccess({"type": "select"})]
    restricted_item.rowFilterInfo = RangerPolicyItemRowFilterInfo({"filterExpr": filterString})
    row_filter_items = [restricted_item]

    # The parameter is Optional, so tolerate None instead of failing while
    # iterating it; with no groups there is nothing to grant full access to.
    full_access_groups = [x.lower() for x in (full_access_list or [])]
    if full_access_groups:
        unrestricted_item = RangerRowFilterPolicyItem()
        unrestricted_item.groups = full_access_groups
        unrestricted_item.accesses = [RangerPolicyItemAccess({"type": "select"})]
        unrestricted_item.rowFilterInfo = RangerPolicyItemRowFilterInfo({"filterExpr": "1=1"})
        row_filter_items.append(unrestricted_item)

    policy.rowFilterPolicyItems = row_filter_items

    # Create policy in Ranger
    created_policy = ranger.create_policy(policy)
    print(f" created policy: name={created_policy.name}, id={created_policy.id}")
    return policy