# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from troposphere import Template
from ecs_composex.mods_manager import XResourceModule
from .msk_cluster import MskCluster
from compose_x_common.aws.msk import list_all_kafka_versions
from compose_x_common.compose_x_common import keyisset, set_else_none
from ecs_composex.common.cfn_conditions import define_stack_name
from ecs_composex.common.cfn_params import ROOT_STACK_NAME
from ecs_composex.common.logging import LOG
from ecs_composex.common.stacks import ComposeXStack
from ecs_composex.common.troposphere_tools import (
Parameter,
add_outputs,
add_resource,
build_template,
)
from ecs_composex.resources_import import import_record_properties
from ecs_composex.vpc.vpc_params import (
APP_SUBNETS,
PUBLIC_SUBNETS,
STORAGE_SUBNETS,
VPC_ID,
)
from troposphere import GetAtt, Output, Ref, Sub
from troposphere.ec2 import SecurityGroup
from troposphere.msk import Cluster as AwsMskCluster
from .msk_cluster_conditions import (
CLIENTS_USE_SASL_IAM_CON,
CLIENTS_USE_SASL_IAM_CON_T,
CLIENTS_USE_SASL_SCRAM_CON,
CLIENTS_USE_SASL_SCRAM_CON_T,
CLIENTS_USE_TLS_AUTH_CON,
CLIENTS_USE_TLS_AUTH_CON_T,
CLUSTER_PRIVATE_ONLY_CON,
CLUSTER_PRIVATE_ONLY_CON_T,
CLUSTER_PUBLICLY_ADDRESSED_CON,
CLUSTER_PUBLICLY_ADDRESSED_CON_T,
)
from .msk_cluster_params import (
MSK_CLUSTER_ADDRESSING_TYPE,
MSK_CLUSTER_INSTANCE_TYPES,
MSK_CLUSTER_SG_PARAM,
MSK_CLUSTER_USE_SASL_IAM,
MSK_CLUSTER_USE_SASL_SCRAM,
MSK_PORTS_MAPPING,
)
from .msk_sg_ingress import handle_msk_auth_settings
from .msk_storage_scaling import add_storage_scaling
[docs]class MskClusterStack(ComposeXStack):
"""
Class to represent an MSK Cluster' Stack
"""
def __init__(self, name, stack_template, cluster, top_stack, **kwargs):
self.cluster = cluster
super().__init__(name, stack_template, **kwargs)
self.parent_stack = top_stack
self.cluster.stack = self
[docs]def validate_cluster_version(cluster: MskCluster, input_version) -> Parameter:
"""
Validates the kafka version
"""
valid_versions: list[str] = [
_version["Version"]
for _version in list_all_kafka_versions(session=cluster.lookup_session)
if _version["Status"] == "ACTIVE"
]
if input_version in valid_versions:
return Parameter(
"KafkaVersion",
Type="String",
AllowedValues=valid_versions,
Default=input_version,
)
else:
raise ValueError(
"Version {} is not valid. Active versions supported: {}".format(
input_version, valid_versions
)
)
[docs]def set_msk_cluster_template(module: XResourceModule, cluster: MskCluster) -> Template:
template = build_template(
f"{module.res_key}.{cluster.name} Stack Template",
[
MSK_CLUSTER_USE_SASL_IAM,
MSK_CLUSTER_USE_SASL_SCRAM,
MSK_CLUSTER_ADDRESSING_TYPE,
VPC_ID,
STORAGE_SUBNETS,
APP_SUBNETS,
PUBLIC_SUBNETS,
MSK_CLUSTER_INSTANCE_TYPES,
],
)
template.add_condition(
CLUSTER_PUBLICLY_ADDRESSED_CON_T, CLUSTER_PUBLICLY_ADDRESSED_CON
)
template.add_condition(CLUSTER_PRIVATE_ONLY_CON_T, CLUSTER_PRIVATE_ONLY_CON)
template.add_condition(CLIENTS_USE_SASL_SCRAM_CON_T, CLIENTS_USE_SASL_SCRAM_CON)
template.add_condition(CLIENTS_USE_SASL_IAM_CON_T, CLIENTS_USE_SASL_IAM_CON)
template.add_condition(CLIENTS_USE_TLS_AUTH_CON_T, CLIENTS_USE_TLS_AUTH_CON)
template.add_mapping("MSKPorts", MSK_PORTS_MAPPING)
return template
[docs]def set_instance_type(cluster: MskCluster, cluster_stack: ComposeXStack) -> None:
"""
Checks the instance type value. Replaces it with parameter.
Updates parameter value if valid, uses Default if not
"""
instance_type = getattr(cluster.cfn_resource.BrokerNodeGroupInfo, "InstanceType")
if instance_type not in MSK_CLUSTER_INSTANCE_TYPES.AllowedValues:
LOG.error(
"{}.{} - {} InstanceType is not valid. Using default {}".format(
cluster.module.res_key,
cluster.name,
instance_type,
MSK_CLUSTER_INSTANCE_TYPES.Default,
)
)
else:
cluster_stack.Parameters.update(
{MSK_CLUSTER_INSTANCE_TYPES.title: instance_type}
)
setattr(
cluster.cfn_resource.BrokerNodeGroupInfo,
"InstanceType",
Ref(MSK_CLUSTER_INSTANCE_TYPES),
)
[docs]def build_msk_clusters(module: XResourceModule, msk_top_stack: ComposeXStack):
"""
Creates a new MSK cluster from properties
Each MSK cluster gets its own template & own stack to allow for better re-usability.
"""
for cluster in module.new_resources:
cluster_template = set_msk_cluster_template(module, cluster)
if cluster.parameters and keyisset(
"CreateKafkaConfiguration", cluster.parameters
):
LOG.warning(
f"{cluster.module.res_key}.{cluster.name} - "
"At the moment, the AWS::MSK::Configuration does not return the Revision. "
"Therefore cannot be used here yet. "
"See https://github.com/aws-cloudformation/cloudformation-coverage-roadmap/issues/1138"
)
cluster.clients_security_group = add_resource(
cluster_template,
SecurityGroup(
f"{cluster.logical_name}ClientsSecurityGroup",
GroupName=Sub(
f"${{StackName}}-{cluster.logical_name}--clients",
StackName=define_stack_name(cluster_template),
),
GroupDescription=Sub(
f"${{StackName}} {cluster.name} Clients SG",
StackName=define_stack_name(),
),
VpcId=Ref(VPC_ID),
),
)
cluster.security_group = add_resource(
cluster_template,
SecurityGroup(
f"{cluster.logical_name}SecurityGroup",
GroupName=Sub(
f"${{StackName}}-{cluster.logical_name}",
StackName=define_stack_name(),
),
GroupDescription=Sub(
f"${{StackName}} {cluster.name}", StackName=define_stack_name()
),
VpcId=Ref(VPC_ID),
),
)
cluster_sg_id = GetAtt(
cluster.security_group, MSK_CLUSTER_SG_PARAM.return_value
)
cluster_props = import_record_properties(
cluster.properties,
AwsMskCluster,
)
broker_info = set_else_none("BrokerNodeGroupInfo", cluster_props, {})
security_groups = set_else_none("SecurityGroups", broker_info, [cluster_sg_id])
if security_groups and cluster_sg_id not in security_groups:
security_groups.append(
GetAtt(cluster.security_group, MSK_CLUSTER_SG_PARAM.return_value)
)
if broker_info:
setattr(broker_info, "SecurityGroups", security_groups)
client_subnets = (
getattr(broker_info, "ClientSubnets")
if hasattr(broker_info, "ClientSubnets")
else []
)
if not client_subnets:
setattr(broker_info, "ClientSubnets", Ref(cluster.subnets_override))
else:
cluster_props.update(
{
"BrokerNodeGroupInfo": {"SecurityGroups": security_groups},
"ClientSubnets": Ref(cluster.subnets_override),
}
)
cluster.cfn_resource = AwsMskCluster(cluster.logical_name, **cluster_props)
version_param = cluster_template.add_parameter(
validate_cluster_version(cluster, cluster.cfn_resource.KafkaVersion)
)
setattr(cluster.cfn_resource, "KafkaVersion", Ref(version_param))
cluster_stack = MskClusterStack(
cluster.logical_name,
cluster=cluster,
stack_template=cluster_template,
top_stack=msk_top_stack,
stack_parameters={
ROOT_STACK_NAME.title: Ref(ROOT_STACK_NAME),
VPC_ID.title: Ref(VPC_ID),
STORAGE_SUBNETS.title: Ref(STORAGE_SUBNETS),
PUBLIC_SUBNETS.title: Ref(PUBLIC_SUBNETS),
APP_SUBNETS.title: Ref(APP_SUBNETS),
version_param.title: version_param.Default,
},
)
set_instance_type(cluster, cluster_stack)
handle_msk_auth_settings(cluster)
add_resource(cluster_template, cluster.cfn_resource)
add_storage_scaling(cluster, cluster_template)
add_resource(msk_top_stack.stack_template, cluster_stack)
cluster.init_outputs()
cluster.generate_outputs()
add_outputs(cluster_template, cluster.outputs)
new_outputs = []
for output_name in cluster.stack.stack_template.outputs:
new_outputs.append(
Output(
output_name,
Value=GetAtt(cluster_stack.title, f"Outputs.{output_name}"),
)
)
add_outputs(msk_top_stack.stack_template, new_outputs)