Source code for ecs_composex_msk_cluster.msk_sg_ingress

#  SPDX-License-Identifier: MPL-2.0
#  Copyright 2020-2022 John Mille <john@compose-x.io>

from __future__ import annotations

from typing import TYPE_CHECKING, Union

if TYPE_CHECKING:
    from .msk_cluster import MskCluster

from ecs_composex.common import get_nested_property
from troposphere import AccountId, FindInMap, GetAtt, If, NoValue, Ref

from .msk_cluster_conditions import (
    CLIENTS_USE_SASL_IAM_CON_T,
    CLIENTS_USE_SASL_SCRAM_CON_T,
    CLIENTS_USE_TLS_AUTH_CON_T,
    CLUSTER_PUBLICLY_ADDRESSED_CON_T,
)
from .msk_cluster_params import (
    MSK_CLUSTER_ADDRESSING_TYPE,
    MSK_CLUSTER_USE_SASL_IAM,
    MSK_CLUSTER_USE_SASL_SCRAM,
    MSK_CLUSTER_USE_TLS_AUTH,
    MSK_PORTS_MAPPING,
)


def str_to_bool(input_var: Union[str, bool]) -> bool:
    """
    Coerce a boolean-like value into a real ``bool``.

    :param input_var: A ``bool``, or a string spelling a boolean.
    :return: ``input_var`` itself when it is a ``bool``; ``True`` when it is a
        string equal to ``"true"`` or ``"yes"`` (case-insensitive); ``False``
        for any other string and for any value of another type.
    """
    if isinstance(input_var, str):
        # Only these two spellings count as truthy; no whitespace stripping.
        return input_var.lower() in ("true", "yes")
    return input_var if isinstance(input_var, bool) else False
def _client_ingress_rule(cluster, description: str, port) -> dict:
    """Build one ingress rule opening ``port`` to the cluster's clients security group."""
    return {
        "Description": description,
        "FromPort": port,
        "ToPort": port,
        "SourceSecurityGroupId": GetAtt(cluster.clients_security_group, "GroupId"),
        "SourceSecurityGroupOwnerId": AccountId,
        "IpProtocol": 6,
    }


def _auth_ingress_rules(
    cluster, auth_condition: str, label: str, port_key: str
) -> list[If]:
    """Build the conditional public and private ingress rules for one client auth method."""
    public_rule = _client_ingress_rule(
        cluster,
        f"{label} Public Access",
        FindInMap("MSKPorts", "Public", port_key),
    )
    private_rule = _client_ingress_rule(
        cluster,
        f"{label} Private Access",
        FindInMap("MSKPorts", "Private", port_key),
    )
    return [
        # Public port: only when the auth method is enabled AND the cluster is
        # publicly addressed.
        If(
            auth_condition,
            If(CLUSTER_PUBLICLY_ADDRESSED_CON_T, public_rule, NoValue),
            NoValue,
        ),
        # Private port: whenever the auth method is enabled.
        If(auth_condition, private_rule, NoValue),
    ]


def allow_clients_sg_ingress(cluster) -> list[Union[dict, If]]:
    """
    Build the ingress rules granting the clients security group access to the cluster.

    The list starts with one unconditional rule per entry of
    ``MSK_PORTS_MAPPING["Zookeeper"]``. It is followed, for each of the SASL IAM,
    SASL SCRAM and TLS client authentication methods, by a conditional public
    rule and a conditional private rule whose ports are resolved with
    ``FindInMap`` on the ``MSKPorts`` mapping.

    Every conditional entry resolves to either a single rule or ``NoValue``.
    A single ``If`` must not resolve to a *list* of rules here: the result is
    used as one element of the ``SecurityGroupIngress`` list, so a list branch
    would yield a nested list instead of an ingress rule object.

    :param cluster: The MSK cluster; its ``clients_security_group`` attribute is
        used as the source security group of every rule.
    :return: Plain rule dicts (Zookeeper) followed by ``If`` wrapped rules.
    """
    rules: list = [
        _client_ingress_rule(cluster, f"Zookeeper {port_type} MSK", port)
        for port_type, port in MSK_PORTS_MAPPING["Zookeeper"].items()
    ]
    auth_methods = (
        (CLIENTS_USE_SASL_IAM_CON_T, "SASL IAM", "Iam"),
        (CLIENTS_USE_SASL_SCRAM_CON_T, "SASL SCRAM", "Scram"),
        (CLIENTS_USE_TLS_AUTH_CON_T, "TLS Authentication", "Tls"),
    )
    for auth_condition, label, port_key in auth_methods:
        rules.extend(_auth_ingress_rules(cluster, auth_condition, label, port_key))
    return rules
def handle_msk_auth_parameters(cluster: MskCluster) -> None:
    """
    Replace the client authentication ``Enabled`` settings with stack parameters.

    For each authentication toggle (SASL SCRAM, SASL IAM, TLS), the matching
    property is looked up on ``cluster.cfn_resource`` with
    ``get_nested_property``. For every match whose property name and value are
    both truthy, the value is converted to a boolean and stored as a string
    under the parameter's title in ``cluster.stack.Parameters``. The property
    itself is then replaced by a ``Ref`` to that parameter.

    :param cluster: The MSK cluster whose resource and stack are updated in place.
    """
    auth_toggles = (
        (
            "ClientAuthentication.Sasl.Scram.Enabled",
            str_to_bool,
            MSK_CLUSTER_USE_SASL_SCRAM,
        ),
        (
            "ClientAuthentication.Sasl.Iam.Enabled",
            str_to_bool,
            MSK_CLUSTER_USE_SASL_IAM,
        ),
        (
            "ClientAuthentication.Tls.Enabled",
            str_to_bool,
            MSK_CLUSTER_USE_TLS_AUTH,
        ),
    )
    for property_path, to_bool, parameter in auth_toggles:
        matches = get_nested_property(cluster.cfn_resource, property_path)
        for resource, prop, value in matches:
            # Falsy values (unset or disabled) leave the resource untouched.
            if not (prop and value):
                continue
            cluster.stack.Parameters.update({parameter.title: str(to_bool(value))})
            setattr(resource, prop, Ref(parameter))
def handle_msk_auth_settings(cluster: MskCluster):
    """
    Wire the clients ingress rules and the addressing/authentication parameters.

    Steps, in order:

    1. Set ``SecurityGroupIngress`` on ``cluster.security_group`` to the rules
       returned by ``allow_clients_sg_ingress``.
    2. When ``BrokerNodeGroupInfo.ConnectivityInfo.PublicAccess.Type`` on
       ``cluster.cfn_resource`` equals ``SERVICE_PROVIDED_EIPS``, set the
       addressing type stack parameter to ``PUBLIC``.
    3. Delegate to ``handle_msk_auth_parameters`` for the authentication toggles.

    :param cluster: The MSK cluster to update in place.
    """
    cluster.security_group.SecurityGroupIngress = allow_clients_sg_ingress(cluster)

    public_access_matches = get_nested_property(
        cluster.cfn_resource,
        "BrokerNodeGroupInfo.ConnectivityInfo.PublicAccess.Type",
    )
    for _resource, _prop, value in public_access_matches:
        if value and value == "SERVICE_PROVIDED_EIPS":
            cluster.stack.Parameters.update(
                {MSK_CLUSTER_ADDRESSING_TYPE.title: "PUBLIC"}
            )

    handle_msk_auth_parameters(cluster)