Source code for ecs_composex_msk_cluster.msk_cluster_stack

# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>

"""Main module."""

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from ecs_composex.mods_manager import XResourceModule
    from ecs_composex.common.settings import ComposeXSettings

from compose_x_common.compose_x_common import keyisset
from ecs_composex.common.stacks import ComposeXStack
from ecs_composex.common.troposphere_tools import build_template
from ecs_composex.vpc.vpc_params import (
    APP_SUBNETS,
    PUBLIC_SUBNETS,
    STORAGE_SUBNETS,
    VPC_ID,
)

from .msk_cluster import define_msk_clusters_mappings
from .msk_cluster_template import build_msk_clusters


[docs]class XStack(ComposeXStack): """ Class to handle MSK resources """ def __init__( self, title: str, settings: ComposeXSettings, module: XResourceModule, **kwargs ): if module.lookup_resources: if not keyisset(module.mapping_key, settings.mappings): settings.mappings[module.mapping_key] = {} define_msk_clusters_mappings(module, settings) if module.new_resources: stack_template = build_template( f"{module.res_key} - Root Stack", [ VPC_ID, STORAGE_SUBNETS, APP_SUBNETS, PUBLIC_SUBNETS, ], ) super().__init__(title, stack_template, **kwargs) build_msk_clusters(module, self) self.parent_stack = settings.root_stack else: self.is_void = True for resource in module.lookup_resources: resource.stack = self