Source code for ecs_composex_msk_cluster.msk_cluster_logging

#  SPDX-License-Identifier: MPL-2.0
#  Copyright 2020-2022 John Mille <john@compose-x.io>

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from .msk_cluster import MskCluster
    from ecs_composex.common.settings import ComposeXSettings
    from troposphere import Template

from compose_x_common.compose_x_common import keyisset
from ecs_composex.common import get_nested_property
from ecs_composex.common.logging import LOG
from ecs_composex.common.troposphere_tools import add_resource
from ecs_composex.kinesis_firehose.kinesis_firehose_stack import DeliveryStream
from ecs_composex.s3.s3_bucket import Bucket
from troposphere import Ref
from troposphere.logs import LogGroup
from troposphere.msk import BrokerLogs, CloudWatchLogs, LoggingInfo


[docs]def add_log_group(cluster: MskCluster, template: Template) -> None: """ Creates a new Log Group for the cluster and configures the property for it. """ resource_props_to_update: list = get_nested_property( cluster.cfn_resource, "LoggingInfo.BrokerLogs.CloudWatchLogs" ) for _resource_prop_to_update in resource_props_to_update: param, prop, value = _resource_prop_to_update if isinstance(value, CloudWatchLogs): LOG.warning( f"{cluster.module.res_key}.{cluster.name} - " "Properties for LoggingInfo.BrokerLogs.CloudWatchLogs already set. Skipping" ) return log_group = LogGroup(f"{cluster.logical_name}BrokersLogGroup") add_resource(template, log_group) if value and value is not None: setattr(param, prop, CloudWatchLogs(Enabled=True, LogGroup=Ref(log_group))) else: if not hasattr(cluster.cfn_resource, "LoggingInfo"): setattr( cluster.cfn_resource, "LoggingInfo", LoggingInfo( BrokerLogs=BrokerLogs( CloudWatchLogs=CloudWatchLogs( Enabled=True, LogGroup=Ref(log_group) ) ) ), ) else: if not hasattr( cluster.cfn_resource.LoggingInfo.BrokerLogs, "CloudWatchLogs" ): setattr( cluster.cfn_resource.LoggingInfo, "CloudWatchLogs", CloudWatchLogs(Enabled=True, LogGroup=Ref(log_group)), )
[docs]def handle_logging( cluster: MskCluster, cluster_template: Template, settings: ComposeXSettings ) -> None: if cluster.parameters and keyisset("CreateLogGroup", cluster.parameters): add_log_group(cluster, cluster_template)