Source code for ecs_composex_msk_cluster.msk_cluster_ecs_iam

#  SPDX-License-Identifier: MPL-2.0
#  Copyright 2020-2022 John Mille <john@compose-x.io>

"""
Manages the IAM Permissions & SCRAM configuration for authentication to the Kafka cluster resources
"""

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from ecs_composex.common.settings import ComposeXSettings
    from ecs_composex.ecs.ecs_family import ComposeFamily
    from ecs_composex_msk_cluster.msk_cluster import MskCluster

from compose_x_common.compose_x_common import set_else_none
from ecs_composex.common.logging import LOG
from ecs_composex.common.troposphere_tools import add_resource
from troposphere import Ref, Select, Split, Sub
from troposphere.iam import PolicyType

from .msk_cluster_params import MSK_CLUSTER_ARN

MSK_TOPIC_PERMS_MAPPING: dict = {
    "Admin": [
        "kafka-cluster:*Topic*",
        "kafka-cluster:ReadData",
        "kafka-cluster:WriteData",
    ],
    "Producer": ["kafka-cluster:DescribeTopic", "kafka-cluster:WriteData"],
    "Consumer": ["kafka-cluster:DescribeTopic", "kafka-cluster:ReadData"],
    "ProducerConsumer": [
        "kafka-cluster:DescribeTopic",
        "kafka-cluster:WriteData",
        "kafka-cluster:ReadData",
    ],
}
MSK_GROUP_PERMS_MAPPING: dict = {
    "Admin": [
        "kafka-cluster:DescribeGroup",
        "kafka-cluster:AlterGroup",
        "kafka-cluster:DeleteGroup",
    ],
    "Consumer": [
        "kafka-cluster:DescribeGroup",
        "kafka-cluster:AlterGroup",
    ],
}
MSK_TRANSACTIONAL_IDS_PERMS_MAPPING: dict = {
    "Producer": [
        "kafka-cluster:AlterTransactionalId",
        "kafka-cluster:DescribeTransactionalId",
    ],
}


[docs]def handle_topic_iam_policy( resource: MskCluster, topics_def: dict, statement: list, cluster_arn_prefix: Select, ) -> None: """ Creates the IAM policies for MSK IAM connectivity to Kafka Topics """ for access_type, topics_regex in topics_def.items(): if access_type not in MSK_TOPIC_PERMS_MAPPING: LOG.warning( f"{resource.module.res_key}.{resource.name}" " Topic permissions {access_type} is invalid." f" Must be one of {list(MSK_TOPIC_PERMS_MAPPING.keys())}" ) continue access_type_statement: dict = { "Sid": f"Topic{access_type}", "Effect": "Allow", "Action": MSK_TOPIC_PERMS_MAPPING[access_type], "Resource": [ Sub( f"${{CLUSTER_ARN_PREFIX}}:topic/${{CLUSTER_ID}}/{regex}", CLUSTER_ARN_PREFIX=cluster_arn_prefix, CLUSTER_ID=resource.cluster_uuid, ) for regex in topics_regex ], } statement.append(access_type_statement)
[docs]def handle_group_iam_policy( resource: MskCluster, groups_def: dict, statement: list, cluster_arn_prefix: Select, ) -> None: """ Creates the IAM policies for MSK IAM connectivity to Kafka Topics """ for access_type, topics_regex in groups_def.items(): if access_type not in MSK_GROUP_PERMS_MAPPING: LOG.warning( f"{resource.module.res_key}.{resource.name}" " Topic permissions {access_type} is invalid." f" Must be one of {list(MSK_GROUP_PERMS_MAPPING.keys())}" ) continue access_type_statement: dict = { "Sid": f"Group{access_type}", "Effect": "Allow", "Action": MSK_GROUP_PERMS_MAPPING[access_type], "Resource": [ Sub( f"${{CLUSTER_ARN_PREFIX}}:group/${{CLUSTER_ID}}/{regex}", CLUSTER_ARN_PREFIX=cluster_arn_prefix, CLUSTER_ID=resource.cluster_uuid, ) for regex in topics_regex ], } statement.append(access_type_statement)
[docs]def handle_transactional_iam_policy( resource: MskCluster, transactional_def: dict, statement: list, cluster_arn_prefix: Select, ) -> None: """ Creates the IAM policies for MSK IAM connectivity to Kafka Topics """ for access_type, topics_regex in transactional_def.items(): if access_type not in MSK_TRANSACTIONAL_IDS_PERMS_MAPPING: LOG.warning( f"{resource.module.res_key}.{resource.name}" " Transactional ID permissions {access_type} is invalid." f" Must be one of {list(MSK_TRANSACTIONAL_IDS_PERMS_MAPPING.keys())}" ) continue access_type_statement: dict = { "Sid": f"Transactions{access_type}", "Effect": "Allow", "Action": MSK_TRANSACTIONAL_IDS_PERMS_MAPPING[access_type], "Resource": [ Sub( f"${{CLUSTER_ARN_PREFIX}}:transactional-id/${{CLUSTER_ID}}/{regex}", CLUSTER_ARN_PREFIX=cluster_arn_prefix, CLUSTER_ID=resource.cluster_uuid, ) for regex in topics_regex ], } statement.append(access_type_statement)
[docs]def handle_cluster_permissions( resource: MskCluster, settings: ComposeXSettings, cluster_def: dict, statement: list, cluster_arn_prefix, ) -> None: """ Handle Cluster permissions """ idempotent_write = set_else_none("IdempotentWrite", cluster_def, False) if idempotent_write: statement.append( { "Effect": "Allow", "Action": ["kafka-cluster:WriteDataIdempotently"], "Sid": "ClusterWriteIdempotently", "Resource": Ref(resource.cluster_arn_parameter), } )
[docs]def handle_iam_kafka_resources_access( resource: MskCluster, settings: ComposeXSettings, family: ComposeFamily, family_target_def: dict, ) -> None: """ Creates IAM Policies for access to MSK resources [topic, group, cluster, transactional-id] """ if resource.cfn_resource: cluster_arn_id = resource.add_attribute_to_another_stack( family.stack, resource.cluster_arn_parameter, settings ) cluster_arn_prefix = Select( 0, Split(":cluster/", Ref(cluster_arn_id["ImportParameter"])) ) else: cluster_arn_id = resource.attributes_outputs[MSK_CLUSTER_ARN]["ImportValue"] cluster_arn_prefix = Select(0, Split(":cluster/", cluster_arn_id)) statement: list = [ { "Sid": "ConnectToCluster", "Effect": "Allow", "Resource": [ Ref(cluster_arn_id["ImportParameter"]) if resource.cfn_resource else cluster_arn_id ], "Action": ["kafka-cluster:Connect"], } ] handle_cluster_permissions( resource, settings, set_else_none("cluster", family_target_def, {}), statement, cluster_arn_prefix, ) handle_topic_iam_policy( resource, set_else_none("topic", family_target_def, {}), statement, cluster_arn_prefix, ) handle_group_iam_policy( resource, set_else_none("group", family_target_def, {}), statement, cluster_arn_prefix, ) handle_transactional_iam_policy( resource, set_else_none("transactional-id", family_target_def, {}), statement, cluster_arn_prefix, ) iam_policy = PolicyType( f"KafkaAccess{family.logical_name}{resource.module.mapping_key}{resource.logical_name}", PolicyName=f"KafkaAccess{family.logical_name}{resource.module.mapping_key}{resource.logical_name}", PolicyDocument={"Version": "2012-10-17", "Statement": statement}, Roles=[family.iam_manager.task_role.name], ) add_resource(family.template, iam_policy)