Source code for ecs_composex_msk_cluster.msk_storage_scaling

# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille <john@compose-x.io>

"""
Configure MSK Cluster autoscaling
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Union

if TYPE_CHECKING:
    from troposphere import Template
    from .msk_cluster import MskCluster

from compose_x_common.compose_x_common import set_else_none
from ecs_composex.common.cfn_conditions import define_stack_name
from ecs_composex.common.troposphere_tools import add_resource
from troposphere import (
    AWS_ACCOUNT_ID,
    AWS_PARTITION,
    AWS_REGION,
    AWS_URL_SUFFIX,
    Ref,
    Sub,
)
from troposphere.applicationautoscaling import (
    PredefinedMetricSpecification,
    ScalableTarget,
    ScalableTargetAction,
    ScalingPolicy,
    TargetTrackingScalingPolicyConfiguration,
)


[docs]def set_scalable_target( cluster: MskCluster, storage_scaling: dict ) -> Union[None, ScalableTarget]: return ScalableTarget( f"{cluster.logical_name}ScalableTarget", DependsOn=[cluster.cfn_resource.title], ServiceNamespace="kafka", MaxCapacity=storage_scaling["MaxInGB"], MinCapacity=1, ResourceId=Ref(cluster.cfn_resource), RoleARN=Sub( f"arn:${{{AWS_PARTITION}}}:iam::${{{AWS_ACCOUNT_ID}}}:role/" f"aws-service-role/kafka.application-autoscaling.${{{AWS_URL_SUFFIX}}}/" "AWSServiceRoleForApplicationAutoScaling_KafkaCluster" ), ScalableDimension="kafka:broker-storage:VolumeSize", )
[docs]def set_scaling_policy( cluster: MskCluster, storage_scaling: dict, scaling_target: ScalableTarget, ) -> ScalingPolicy: return ScalingPolicy( f"{cluster.logical_name}StorageScalingPolicy", DependsOn=[scaling_target.title], PolicyName=Sub( "MSK Storage Scaling - ${StackName}", StackName=define_stack_name() ), PolicyType="TargetTrackingScaling", ScalingTargetId=Ref(scaling_target), TargetTrackingScalingPolicyConfiguration=TargetTrackingScalingPolicyConfiguration( DisableScaleIn=True, TargetValue=float(storage_scaling["Target"]), PredefinedMetricSpecification=PredefinedMetricSpecification( PredefinedMetricType="KafkaBrokerStorageUtilization" ), ), )
[docs]def add_storage_scaling(cluster: MskCluster, template: Template) -> None: """ Adds storage autoscaling to the MSK Cluster. Application Autoscaling will add capacity as the data on the cluster grows. However, it won't scale in (remove capacity). This feature is mostly aimed at ensuring there is always storage available on the Kafka cluster """ if not cluster.cfn_resource or not cluster.parameters: return storage_scaling = set_else_none("StorageScaling", cluster.parameters) if not storage_scaling: return scaling_target = set_scalable_target(cluster, storage_scaling) scaling_policy = set_scaling_policy(cluster, storage_scaling, scaling_target) add_resource(template, scaling_target) add_resource(template, scaling_policy)