validate_azure_dladmin_identity_4.py 1.7 KB

def _azure_dladmin_logs_storage_actions_check(
    config: Dict[str, Any],
    auth_client: AuthorizationManagementClient,
    resource_client: ResourceManagementClient,
    azure_data_required_actions: List[str],
) -> None:  # pragma: no cover
    """Check the datalake admin identity's actions on the logs container.

    Reads the subscription, resource group, storage account, logs path and
    datalake admin role name from ``config``, looks up that identity's role
    assignments, and compares them against ``azure_data_required_actions``
    at the scope of the storage container that holds the logs. When any
    action is reported missing, ``fail`` is called with
    ``AZURE_IDENTITY_MISSING_ACTIONS_FOR_LOCATION``.

    Args:
        config: Configuration mapping queried through ``get_config_value``.
        auth_client: Client handed to the role-assignment and action lookups.
        resource_client: Client handed to the role-assignment lookup.
        azure_data_required_actions: Actions the identity is expected to
            hold on the logs container.
    """
    subscription: str = get_config_value(
        config=config, key="infra:azure:subscription_id"
    )
    resource_group: str = get_config_value(
        config=config, key="infra:azure:metagroup:name"
    )
    account: str = get_config_value(config=config, key="env:azure:storage:name")
    logs_location: str = get_config_value(
        config=config, key="env:azure:storage:path:logs"
    )
    admin_identity: str = get_config_value(
        config=config, key="env:azure:role:name:datalake_admin"
    )

    # Element 1 of the parsed path is used as the container name;
    # presumably parse_adls_path returns (account, container, ...) - confirm.
    container = parse_adls_path(logs_location)[1]

    assignments = get_role_assignments(
        auth_client=auth_client,
        resource_client=resource_client,
        identity_name=admin_identity,
        subscription_id=subscription,
        resource_group=resource_group,
    )
    container_scope = get_storage_container_scope(
        subscription, resource_group, account, container
    )

    # NOTE(review): the caller's list is passed as `required_actions`, while
    # `required_data_actions` is empty and the second return value is
    # discarded, so this check does not validate any data actions on the
    # container. Confirm that is intended given the parameter's name.
    # `role_assigments` is the callee's keyword spelling and must stay as is.
    absent_actions, _unused = check_for_actions(
        auth_client=auth_client,
        role_assigments=assignments,
        proper_scope=container_scope,
        required_actions=azure_data_required_actions,
        required_data_actions=[],
    )
    if not absent_actions:
        return

    container_resource = f"storageAccounts/{account}/blobServices/default/containers/{container}"  # noqa: E501
    fail(
        AZURE_IDENTITY_MISSING_ACTIONS_FOR_LOCATION,
        subjects=[admin_identity, container_resource],
        resources=absent_actions,
    )