#!/usr/bin/env python3
###
# CLOUDERA CDP Control (cdpctl)
#
# (C) Cloudera, Inc. 2021-2021
# All rights reserved.
#
# Applicable Open Source License: GNU AFFERO GENERAL PUBLIC LICENSE
#
# NOTE: Cloudera open source products are modular software products
# made up of hundreds of individual components, each of which was
# individually copyrighted. Each Cloudera open source product is a
# collective work under U.S. Copyright Law. Your license to use the
# collective work is as provided in your written agreement with
# Cloudera. Used apart from the collective work, this file is
# licensed for your use pursuant to the open source license
# identified above.
#
# This code is provided to you pursuant to a written agreement with
# (i) Cloudera, Inc. or (ii) a third-party authorized to distribute
# this code. If you do not have a written agreement with Cloudera nor
# with an authorized and properly licensed third party, you do not
# have any rights to access nor to use this code.
#
# Absent a written agreement with Cloudera, Inc. (“Cloudera”) to the
# contrary, (A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY
# KIND; (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED
# WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT LIMITED TO
# IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND
# FITNESS FOR A PARTICULAR PURPOSE; (C) CLOUDERA IS NOT LIABLE TO YOU,
# AND WILL NOT DEFEND, INDEMNIFY, NOR HOLD YOU HARMLESS FOR ANY CLAIMS
# ARISING FROM OR RELATED TO THE CODE; AND (D) WITH RESPECT TO YOUR EXERCISE
# OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR
# CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED TO, DAMAGES
# RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF
# BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
# DATA.
#
# Source File Name: validate_azure_dladmin_identity.py
###
- """Validation of Azure Datalake Admin Identity."""
- from typing import Any, Dict, List
- import pytest
- from azure.mgmt.authorization import AuthorizationManagementClient
- from azure.mgmt.resource import ResourceManagementClient
- from cdpctl.validation import fail, get_config_value
- from cdpctl.validation.azure_utils import (
- check_for_actions,
- get_client,
- get_role_assignments,
- get_storage_container_scope,
- parse_adls_path,
- )
- from cdpctl.validation.infra.issues import (
- AZURE_IDENTITY_MISSING_ACTIONS_FOR_LOCATION,
- AZURE_IDENTITY_MISSING_DATA_ACTIONS_FOR_LOCATION,
- )
- @pytest.fixture(autouse=True, name="resource_client")
- def resource_client_fixture(config: Dict[str, Any]) -> ResourceManagementClient:
- """Return an Azure Resource Client."""
- return get_client("resource", config)
- @pytest.fixture(autouse=True, name="auth_client")
- def auth_client_fixture(config: Dict[str, Any]) -> AuthorizationManagementClient:
- """Return an Azure Auth Client."""
- return get_client("auth", config)


@pytest.mark.azure
@pytest.mark.infra
def azure_dladmin_actions_for_logs_storage_validation(
    config: Dict[str, Any],
    auth_client: AuthorizationManagementClient,
    resource_client: ResourceManagementClient,
    azure_data_required_actions,
) -> None:  # pragma: no cover
    """Datalake Admin Identity has required Actions on logs storage location."""  # noqa: D401,E501
    _azure_dladmin_logs_storage_actions_check(
        config=config,
        auth_client=auth_client,
        resource_client=resource_client,
        azure_data_required_actions=azure_data_required_actions,
    )


def _azure_dladmin_logs_storage_actions_check(
    config: Dict[str, Any],
    auth_client: AuthorizationManagementClient,
    resource_client: ResourceManagementClient,
    azure_data_required_actions: List[str],
) -> None:  # pragma: no cover
    """Check the Datalake Admin Identity's Actions on the logs storage location."""  # noqa: D401,E501
    sub_id: str = get_config_value(config=config, key="infra:azure:subscription_id")
    rg_name: str = get_config_value(config=config, key="infra:azure:metagroup:name")
    storage_name: str = get_config_value(config=config, key="env:azure:storage:name")
    log_path: str = get_config_value(config=config, key="env:azure:storage:path:logs")
    datalake_admin: str = get_config_value(
        config=config, key="env:azure:role:name:datalake_admin"
    )
    # parse_adls_path splits an abfs:// URI; index 1 holds the container name.
    parsed_logger_path = parse_adls_path(log_path)
    container_name = parsed_logger_path[1]
    role_assignments = get_role_assignments(
        auth_client=auth_client,
        resource_client=resource_client,
        identity_name=datalake_admin,
        subscription_id=sub_id,
        resource_group=rg_name,
    )
    proper_scope = get_storage_container_scope(
        sub_id, rg_name, storage_name, container_name
    )
    missing_actions, _ = check_for_actions(
        auth_client=auth_client,
        role_assigments=role_assignments,  # keyword spelling matches the helper's signature
        proper_scope=proper_scope,
        required_actions=azure_data_required_actions,
        required_data_actions=[],
    )
    if missing_actions:
        fail(
            AZURE_IDENTITY_MISSING_ACTIONS_FOR_LOCATION,
            subjects=[
                datalake_admin,
                f"storageAccounts/{storage_name}/blobServices/default/containers/{container_name}",  # noqa: E501
            ],
            resources=missing_actions,
        )
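

# For reference: the container scope assembled by get_storage_container_scope is
# expected to follow the standard Azure resource ID layout for a blob container
# (a sketch; the helper's exact output is defined in cdpctl.validation.azure_utils):
#
#   /subscriptions/{subscription_id}/resourceGroups/{resource_group}/providers
#       /Microsoft.Storage/storageAccounts/{storage_account}/blobServices
#       /default/containers/{container}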


@pytest.mark.azure
@pytest.mark.infra
def azure_dladmin_data_actions_for_logs_storage_validation(
    config: Dict[str, Any],
    auth_client: AuthorizationManagementClient,
    resource_client: ResourceManagementClient,
    azure_data_required_data_actions,
) -> None:  # pragma: no cover
    """Datalake Admin Identity has required Data Actions on logs storage location."""  # noqa: D401,E501
    _azure_dladmin_logs_storage_data_actions_check(
        config=config,
        auth_client=auth_client,
        resource_client=resource_client,
        azure_data_required_data_actions=azure_data_required_data_actions,
    )


def _azure_dladmin_logs_storage_data_actions_check(
    config: Dict[str, Any],
    auth_client: AuthorizationManagementClient,
    resource_client: ResourceManagementClient,
    azure_data_required_data_actions: List[str],
) -> None:  # pragma: no cover
    """Check the Datalake Admin Identity's Data Actions on the logs storage location."""  # noqa: D401,E501
    sub_id: str = get_config_value(config=config, key="infra:azure:subscription_id")
    rg_name: str = get_config_value(config=config, key="infra:azure:metagroup:name")
    storage_name: str = get_config_value(config=config, key="env:azure:storage:name")
    log_path: str = get_config_value(config=config, key="env:azure:storage:path:logs")
    datalake_admin: str = get_config_value(
        config=config, key="env:azure:role:name:datalake_admin"
    )
    parsed_logger_path = parse_adls_path(log_path)
    container_name = parsed_logger_path[1]
    role_assignments = get_role_assignments(
        auth_client=auth_client,
        resource_client=resource_client,
        identity_name=datalake_admin,
        subscription_id=sub_id,
        resource_group=rg_name,
    )
    proper_scope = get_storage_container_scope(
        sub_id, rg_name, storage_name, container_name
    )
    _, missing_data_actions = check_for_actions(
        auth_client=auth_client,
        role_assigments=role_assignments,
        proper_scope=proper_scope,
        required_actions=[],
        required_data_actions=azure_data_required_data_actions,
    )
    if missing_data_actions:
        fail(
            AZURE_IDENTITY_MISSING_DATA_ACTIONS_FOR_LOCATION,
            subjects=[
                datalake_admin,
                f"storageAccounts/{storage_name}/blobServices/default/containers/{container_name}",  # noqa: E501
            ],
            resources=missing_data_actions,
        )
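

# Azure RBAC distinguishes control-plane "Actions" (management operations on the
# storage resources) from data-plane "DataActions" (e.g. blob read/write), which
# is why each storage location gets one check of each kind in this module.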


@pytest.mark.azure
@pytest.mark.infra
def azure_dladmin_actions_for_data_storage_validation(
    config: Dict[str, Any],
    auth_client: AuthorizationManagementClient,
    resource_client: ResourceManagementClient,
    azure_data_required_actions,
) -> None:  # pragma: no cover
    """Datalake Admin Identity has required Actions on data storage location."""  # noqa: D401,E501
    _azure_dladmin_data_storage_actions_check(
        config=config,
        auth_client=auth_client,
        resource_client=resource_client,
        azure_data_required_actions=azure_data_required_actions,
    )


def _azure_dladmin_data_storage_actions_check(
    config: Dict[str, Any],
    auth_client: AuthorizationManagementClient,
    resource_client: ResourceManagementClient,
    azure_data_required_actions: List[str],
) -> None:  # pragma: no cover
    """Check the Datalake Admin Identity's Actions on the data storage location."""  # noqa: D401,E501
    sub_id: str = get_config_value(config=config, key="infra:azure:subscription_id")
    rg_name: str = get_config_value(config=config, key="infra:azure:metagroup:name")
    storage_name: str = get_config_value(config=config, key="env:azure:storage:name")
    data_path: str = get_config_value(config=config, key="env:azure:storage:path:data")
    datalake_admin: str = get_config_value(
        config=config, key="env:azure:role:name:datalake_admin"
    )
    parsed_data_path = parse_adls_path(data_path)
    container_name = parsed_data_path[1]
    role_assignments = get_role_assignments(
        auth_client=auth_client,
        resource_client=resource_client,
        identity_name=datalake_admin,
        subscription_id=sub_id,
        resource_group=rg_name,
    )
    proper_scope = get_storage_container_scope(
        sub_id, rg_name, storage_name, container_name
    )
    missing_actions, _ = check_for_actions(
        auth_client=auth_client,
        role_assigments=role_assignments,
        proper_scope=proper_scope,
        required_actions=azure_data_required_actions,
        required_data_actions=[],
    )
    if missing_actions:
        fail(
            AZURE_IDENTITY_MISSING_ACTIONS_FOR_LOCATION,
            subjects=[
                datalake_admin,
                f"storageAccounts/{storage_name}/blobServices/default/containers/{container_name}",  # noqa: E501
            ],
            resources=missing_actions,
        )
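

# check_for_actions returns a (missing_actions, missing_data_actions) pair, as
# the unpacking in these checks implies. A hedged sketch of the contract (the
# action name is only an example, not the required list itself):
#
#   missing, missing_data = check_for_actions(
#       auth_client=auth_client,
#       role_assigments=role_assignments,
#       proper_scope=proper_scope,
#       required_actions=["Microsoft.Storage/storageAccounts/blobServices/containers/read"],
#       required_data_actions=[],
#   )
#   # an empty "missing" list means every required Action is granted in scope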


@pytest.mark.azure
@pytest.mark.infra
def azure_dladmin_data_actions_for_data_storage_validation(
    config: Dict[str, Any],
    auth_client: AuthorizationManagementClient,
    resource_client: ResourceManagementClient,
    azure_data_required_data_actions,
) -> None:  # pragma: no cover
    """Datalake Admin Identity has required Data Actions on data storage location."""  # noqa: D401,E501
    _azure_dladmin_data_storage_data_actions_check(
        config=config,
        auth_client=auth_client,
        resource_client=resource_client,
        azure_data_required_data_actions=azure_data_required_data_actions,
    )


def _azure_dladmin_data_storage_data_actions_check(
    config: Dict[str, Any],
    auth_client: AuthorizationManagementClient,
    resource_client: ResourceManagementClient,
    azure_data_required_data_actions: List[str],
) -> None:  # pragma: no cover
    """Check the Datalake Admin Identity's Data Actions on the data storage location."""  # noqa: D401,E501
    sub_id: str = get_config_value(config=config, key="infra:azure:subscription_id")
    rg_name: str = get_config_value(config=config, key="infra:azure:metagroup:name")
    storage_name: str = get_config_value(config=config, key="env:azure:storage:name")
    data_path: str = get_config_value(config=config, key="env:azure:storage:path:data")
    datalake_admin: str = get_config_value(
        config=config, key="env:azure:role:name:datalake_admin"
    )
    parsed_data_path = parse_adls_path(data_path)
    container_name = parsed_data_path[1]
    role_assignments = get_role_assignments(
        auth_client=auth_client,
        resource_client=resource_client,
        identity_name=datalake_admin,
        subscription_id=sub_id,
        resource_group=rg_name,
    )
    proper_scope = get_storage_container_scope(
        sub_id, rg_name, storage_name, container_name
    )
    _, missing_data_actions = check_for_actions(
        auth_client=auth_client,
        role_assigments=role_assignments,
        proper_scope=proper_scope,
        required_actions=[],
        required_data_actions=azure_data_required_data_actions,
    )
    if missing_data_actions:
        fail(
            AZURE_IDENTITY_MISSING_DATA_ACTIONS_FOR_LOCATION,
            subjects=[
                datalake_admin,
                f"storageAccounts/{storage_name}/blobServices/default/containers/{container_name}",  # noqa: E501
            ],
            resources=missing_data_actions,
        )


@pytest.mark.azure
@pytest.mark.infra
def azure_dladmin_actions_for_backup_storage_validation(
    config: Dict[str, Any],
    auth_client: AuthorizationManagementClient,
    resource_client: ResourceManagementClient,
    azure_data_required_actions,
) -> None:  # pragma: no cover
    """Datalake Admin Identity has required Actions on backup storage location."""  # noqa: D401,E501
    _azure_dladmin_backup_storage_actions_check(
        config=config,
        auth_client=auth_client,
        resource_client=resource_client,
        azure_data_required_actions=azure_data_required_actions,
    )


def _azure_dladmin_backup_storage_actions_check(
    config: Dict[str, Any],
    auth_client: AuthorizationManagementClient,
    resource_client: ResourceManagementClient,
    azure_data_required_actions: List[str],
) -> None:  # pragma: no cover
    """Check the Datalake Admin Identity's Actions on the backup storage location."""  # noqa: D401,E501
    sub_id: str = get_config_value(config=config, key="infra:azure:subscription_id")
    rg_name: str = get_config_value(config=config, key="infra:azure:metagroup:name")
    storage_name: str = get_config_value(config=config, key="env:azure:storage:name")
    backup_path: str = get_config_value(
        config=config, key="env:azure:storage:path:backup"
    )
    datalake_admin: str = get_config_value(
        config=config, key="env:azure:role:name:datalake_admin"
    )
    parsed_backup_path = parse_adls_path(backup_path)
    container_name = parsed_backup_path[1]
    role_assignments = get_role_assignments(
        auth_client=auth_client,
        resource_client=resource_client,
        identity_name=datalake_admin,
        subscription_id=sub_id,
        resource_group=rg_name,
    )
    proper_scope = get_storage_container_scope(
        sub_id, rg_name, storage_name, container_name
    )
    missing_actions, _ = check_for_actions(
        auth_client=auth_client,
        role_assigments=role_assignments,
        proper_scope=proper_scope,
        required_actions=azure_data_required_actions,
        required_data_actions=[],
    )
    if missing_actions:
        fail(
            AZURE_IDENTITY_MISSING_ACTIONS_FOR_LOCATION,
            subjects=[
                datalake_admin,
                f"storageAccounts/{storage_name}/blobServices/default/containers/{container_name}",  # noqa: E501
            ],
            resources=missing_actions,
        )


@pytest.mark.azure
@pytest.mark.infra
def azure_dladmin_data_actions_for_backup_storage_validation(
    config: Dict[str, Any],
    auth_client: AuthorizationManagementClient,
    resource_client: ResourceManagementClient,
    azure_data_required_data_actions,
) -> None:  # pragma: no cover
    """Datalake Admin Identity has required Data Actions on backup storage location."""  # noqa: D401,E501
    _azure_dladmin_backup_storage_data_actions_check(
        config=config,
        auth_client=auth_client,
        resource_client=resource_client,
        azure_data_required_data_actions=azure_data_required_data_actions,
    )


def _azure_dladmin_backup_storage_data_actions_check(
    config: Dict[str, Any],
    auth_client: AuthorizationManagementClient,
    resource_client: ResourceManagementClient,
    azure_data_required_data_actions: List[str],
) -> None:  # pragma: no cover
    """Check the Datalake Admin Identity's Data Actions on the backup storage location."""  # noqa: D401,E501
    sub_id: str = get_config_value(config=config, key="infra:azure:subscription_id")
    rg_name: str = get_config_value(config=config, key="infra:azure:metagroup:name")
    storage_name: str = get_config_value(config=config, key="env:azure:storage:name")
    backup_path: str = get_config_value(
        config=config, key="env:azure:storage:path:backup"
    )
    datalake_admin: str = get_config_value(
        config=config, key="env:azure:role:name:datalake_admin"
    )
    parsed_backup_path = parse_adls_path(backup_path)
    container_name = parsed_backup_path[1]
    role_assignments = get_role_assignments(
        auth_client=auth_client,
        resource_client=resource_client,
        identity_name=datalake_admin,
        subscription_id=sub_id,
        resource_group=rg_name,
    )
    proper_scope = get_storage_container_scope(
        sub_id, rg_name, storage_name, container_name
    )
    _, missing_data_actions = check_for_actions(
        auth_client=auth_client,
        role_assigments=role_assignments,
        proper_scope=proper_scope,
        required_actions=[],
        required_data_actions=azure_data_required_data_actions,
    )
    if missing_data_actions:
        fail(
            AZURE_IDENTITY_MISSING_DATA_ACTIONS_FOR_LOCATION,
            subjects=[
                datalake_admin,
                f"storageAccounts/{storage_name}/blobServices/default/containers/{container_name}",  # noqa: E501
            ],
            resources=missing_data_actions,
        )
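

# These validations are collected through the pytest markers above. A sketch of
# invoking just this module directly (the path assumes the usual cdpctl layout
# and registered markers; adjust to where this file actually lives):
#
#   pytest -m "azure and infra" cdpctl/validation/infra/validate_azure_dladmin_identity.py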