Syntax Reference

x-msk_cluster:
  Properties: {}
  MacroParameters: {}
  Lookup: {}
  Services: {}

Properties

All properties are supported!

See Properties for MSK Cluster in the AWS CloudFormation documentation.
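As a rough illustration, a new cluster could be declared as below. The property names are those of AWS::MSK::Cluster in CloudFormation. The resource name events-cluster, the Kafka version, the instance type, and the subnet IDs are placeholder values. This page does not say whether Compose-X fills in networking properties such as ClientSubnets for you, so they are spelled out here as an assumption.

```yaml
x-msk_cluster:
  events-cluster:                     # hypothetical resource name
    Properties:
      ClusterName: events-cluster
      KafkaVersion: "3.5.1"           # placeholder; use a version MSK supports
      NumberOfBrokerNodes: 3
      BrokerNodeGroupInfo:
        InstanceType: kafka.m5.large  # placeholder instance type
        ClientSubnets:                # placeholder subnet IDs
          - subnet-aaaa1111
          - subnet-bbbb2222
          - subnet-cccc3333
      ClientAuthentication:
        Sasl:
          Iam:
            Enabled: true             # turns on IAM authentication for clients
```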

Hint

With new MSK clusters, your ECS services are automatically granted network access through a dedicated clients security group, which is in turn granted the appropriate access through security group ingress rules.

You can later re-use this security group with other services to give them ingress access automatically.

Services

The Services section of the MSK module is more advanced than in the other ECS Compose-X modules thus far. It contains the default Access section, which only allows describing the cluster, using the RO permissions.

However, to grant your services meaningful access to the MSK cluster, you can define KafkaAccess, which sets rules based on the authentication method chosen for MSK.
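A minimal sketch of the default Access section might look like the following. The schema accepts either a string or an object for Access; the string value RO is taken from the description above and is an assumption about the exact spelling. The resource and service names are hypothetical.

```yaml
x-msk_cluster:
  events-cluster:          # hypothetical resource name
    Properties: {}         # cluster properties omitted for brevity
    Services:
      reporting:           # hypothetical service defined in the compose file
        Access: RO         # describe-only access to the cluster
```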

KafkaAccess

KafkaAccess allows you to define whether your service accesses the cluster via IAM, SCRAM, or TLS. For SCRAM and TLS, it is up to you to administer and manage the Kafka ACLs that grant further access to your services.

Tip

Check out CFN Kafka Admin, an open source CloudFormation resource to manage Kafka ACLs, Topics & Schemas.

Attention

You cannot set both Iam and SaslScram at the same time.
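For the SCRAM case, the schema exposes a single boolean, CreateNewScramUser. The sketch below only shows where it goes; what Compose-X creates when it is enabled is not described on this page, and the Kafka ACLs for that user remain yours to manage. The resource and service names are hypothetical.

```yaml
x-msk_cluster:
  events-cluster:                    # hypothetical resource name
    Properties: {}                   # cluster properties omitted for brevity
    Services:
      billing:                       # hypothetical service name
        KafkaAccess:
          SaslScram:                 # mutually exclusive with Iam
            CreateNewScramUser: true
```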

Iam

This section allows you to define access to 4 different types of resources: cluster, topic, group, and transactional-id.

For each of these resource types, you can use one or more of the predefined profiles and, for each profile, grant access to resources using a syntax compatible with the PREFIX pattern type of Kafka ACLs.

For example, my-app-topics* grants access to all topics starting with my-app-topics.

Note

By default, all clients are granted kafka-cluster:Connect on the cluster, which is required for all operations.
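Putting the pieces together, an IAM-based definition might look like the sketch below. The resource name, the service name, and the group name are hypothetical. The schema on this page marks group as required under Iam; the Admin profile is used for it here because it is the group profile name that both the prose and the schema list.

```yaml
x-msk_cluster:
  events-cluster:                  # hypothetical resource name
    Properties: {}                 # cluster properties omitted for brevity
    Services:
      orders-reader:               # hypothetical service name
        KafkaAccess:
          Iam:
            topic:
              Consumer:
                - "my-app-topics*" # every topic starting with my-app-topics
            group:                 # required by the schema
              Admin:
                - "orders-reader*" # hypothetical consumer group prefix
```

Quoting the names keeps the trailing asterisk unambiguous for YAML parsers and human readers alike.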

cluster

As described above, kafka-cluster:Connect is granted to the service's task role by default. This section allows you to define further cluster-level access.

Currently the only option is IdempotentWrite, a boolean, which allows idempotent writes on the MSK cluster.
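As a fragment, the cluster option sits directly under Iam. This sketch shows only that part; it belongs under Services, then the service name, then KafkaAccess, and the schema also requires a group entry next to it, which is left out here.

```yaml
Iam:
  cluster:
    IdempotentWrite: true   # allow idempotent writes on the cluster
```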

topic

There are 4 profiles to manage topics:

  • Admin: allows all actions on topics, including create and delete

  • Producer: allows writing data to the topic(s)

  • Consumer: allows reading data from the topic(s)

  • ProducerConsumer: allows both reading from and writing to the topic(s)

IAM actions granted by each topic profile:

  • Admin: kafka-cluster:*Topic*, kafka-cluster:ReadData, kafka-cluster:WriteData

  • Producer: kafka-cluster:DescribeTopic, kafka-cluster:WriteData

  • Consumer: kafka-cluster:DescribeTopic, kafka-cluster:ReadData

  • ProducerConsumer: kafka-cluster:DescribeTopic, kafka-cluster:WriteData, kafka-cluster:ReadData
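Several topic profiles can be combined for one service. The fragment below is a sketch with hypothetical topic names; it belongs under Services, then the service name, then KafkaAccess, and the schema additionally requires a group entry under Iam.

```yaml
Iam:
  topic:
    Producer:
      - "orders-out*"       # write to topics starting with orders-out
    Consumer:
      - "payments-in*"      # read from topics starting with payments-in
    ProducerConsumer:
      - "orders-retry"      # read and write on a single named topic
```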

group

There are 2 profiles to manage consumer groups:

  • Admin: grants all Kafka ACLs on the consumer group(s), including delete

  • Consumer: grants normal use of the consumer group(s) for the application

IAM actions granted by each group profile:

  • Admin: kafka-cluster:DescribeGroup, kafka-cluster:AlterGroup, kafka-cluster:DeleteGroup

  • Consumer: kafka-cluster:DescribeGroup, kafka-cluster:AlterGroup
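A group entry could be sketched as below, with a hypothetical group name. Note that the schema printed on this page lists the group profiles as Admin and Producer rather than Admin and Consumer, so only Admin is shown; check which name your Compose-X version accepts before relying on Consumer.

```yaml
Iam:
  group:
    Admin:
      - "orders-workers*"   # consumer groups starting with orders-workers
```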

transactional-id

There is 1 profile for Kafka transactional IDs.

  • Producer: allows all Kafka ACLs related to Transactional IDs

IAM actions granted by the transactional-id profile:

  • Producer: kafka-cluster:AlterTransactionalId, kafka-cluster:DescribeTransactionalId
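For a transactional producer, the fragment might look like the following. The transactional ID prefix is hypothetical, and the sketch assumes the same trailing-asterisk prefix syntax described for topics applies here. As with the other Iam fragments, the schema also requires a group entry alongside it.

```yaml
Iam:
  transactional-id:
    Producer:
      - "orders-tx-*"       # transactional IDs starting with orders-tx-
```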

Schema Definition

The schema below is used to validate user input. It catches errors in your YAML compose files early.

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "id": "x-msk_cluster.spec.json",
  "$id": "x-msk_cluster.spec.json",
  "title": "x-msk_cluster",
  "description": "x-msk_cluster specification",
  "type": "object",
  "additionalProperties": false,
  "oneOf": [
    {
      "required": [
        "Properties"
      ]
    },
    {
      "required": [
        "Lookup"
      ]
    }
  ],
  "properties": {
    "Lookup": {
      "type": "object",
      "additionalProperties": false,
      "required": ["Cluster"],
      "properties": {
        "Cluster": {
          "$ref": "x-resources.common.spec.json#/definitions/Lookup"
        },
        "ClientsSecurityGroup": {
          "$ref": "x-resources.common.spec.json#/definitions/Lookup"
        }
      }
    },
    "Properties": {
      "type": "object",
      "description": "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-msk-cluster.html"
    },
    "Settings": {
      "$ref": "x-resources.common.spec.json#/definitions/Settings"
    },
    "MacroParameters": {
      "type": "object",
      "additionalProperties": false,
      "properties": {
        "CreateCMK": {
          "type": "boolean",
          "description": "Creates a new KMS Key for the cluster encryption"
        },
        "CreateLogGroup": {
          "type": "boolean",
          "description": "Creates a new CloudWatch Log Group for the Broker logs"
        },
        "CreateKafkaConfiguration": {
          "description": "AWS::MSK::Configuration Properties or just the Kafka ``server.properties``",
          "oneOf": [
            {
              "type": "object",
              "description": "The Properties of https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-msk-configuration.html#cfn-msk-configuration-serverproperties"
            },
            {
              "type": "string",
              "description": "raw content for ``server.properties``"
            },
            {
              "type": "array",
              "uniqueItems": true,
              "description": "Array representation of key=value of ``server.properties``",
              "items": {
                "type": "string",
                "format": "(\\S+)=(\\S+)$"
              }
            }
          ]
        },
        "StorageScaling": {
          "type": "object",
          "additionalProperties": false,
          "required": [
            "MaxInGB",
            "Target"
          ],
          "properties": {
            "MaxInGB": {
              "type": "integer",
              "description": "Maximum amount of storage for cluster EBS Storage"
            },
            "Target": {
              "type": "number",
              "maximum": 100.0,
              "minimum": 10.0,
              "description": "Target in percentage to maintain cluster capacity"
            }
          }
        }
      }
    },
    "Services": {
      "type": "object",
      "patternProperties": {
        "[\\x20-\\x7E]+$": {
          "description": "Object representation of the service to use.",
          "additionalProperties": false,
          "properties": {
            "Access": {
              "oneOf": [
                {
                  "type": "string"
                },
                {
                  "type": "object"
                }
              ]
            },
            "Scaling": {
              "$ref": "x-resources.common.spec.json#/definitions/ScalingDefinition"
            },
            "ReturnValues": {
              "type": "object",
              "description": "Set the CFN Return Value and the environment variable name you want to expose to the service",
              "additionalProperties": false,
              "patternProperties": {
                "[\\x20-\\x7E]+$": {
                  "oneOf": [
                    {
                      "$ref": "x-resources.common.spec.json#/definitions/varNameDef"
                    },
                    {
                      "type": "object",
                      "additionalProperties": false,
                      "properties": {
                        "EnvVarName": {
                          "$ref": "x-resources.common.spec.json#/definitions/varNameDef"
                        }
                      }
                    }
                  ]
                }
              }
            },
            "KafkaAccess": {
              "type": "object",
              "oneOf": [
                {
                  "required": [
                    "SaslScram"
                  ]
                },
                {
                  "required": [
                    "Iam"
                  ]
                }
              ],
              "properties": {
                "SaslScram": {
                  "description": "SASL Scram kafka native settings. Do not work in combination with KafkaIamAccess",
                  "type": "object",
                  "additionalProperties": false,
                  "properties": {
                    "CreateNewScramUser": {
                      "type": "boolean"
                    }
                  }
                },
                "Iam": {
                  "type": "object",
                  "required": [
                    "group"
                  ],
                  "properties": {
                    "topic": {
                      "type": "object",
                      "$ref": "#/definitions/TopicsAccessDefinition"
                    },
                    "group": {
                      "type": "object",
                      "$ref": "#/definitions/GroupsAccessDefinition"
                    },
                    "cluster": {
                      "type": "object",
                      "additionalProperties": false,
                      "properties": {
                        "IdempotentWrite": {
                          "type": "boolean",
                          "description": "Enables IdempotentWrite on the Kafka cluster"
                        }
                      }
                    },
                    "transactional-id": {
                      "type": "object",
                      "$ref": "#/definitions/TransactionsAccessDefinition"
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
  },
  "definitions": {
    "TopicsAccessDefinition": {
      "type": "object",
      "additionalProperties": false,
      "minProperties": 1,
      "properties": {
        "Producer": {
          "type": "array",
          "items": {
            "$ref": "#/definitions/TopicName"
          }
        },
        "Consumer": {
          "type": "array",
          "items": {
            "$ref": "#/definitions/TopicName"
          }
        },
        "ProducerConsumer": {
          "type": "array",
          "items": {
            "type": "string"
          }
        },
        "Admin": {
          "type": "array",
          "items": {
            "$ref": "#/definitions/TopicName"
          }
        }
      }
    },
    "GroupsAccessDefinition": {
      "type": "object",
      "additionalProperties": false,
      "minProperties": 1,
      "properties": {
        "Producer": {
          "type": "array",
          "items": {
            "$ref": "#/definitions/GroupName"
          }
        },
        "Admin": {
          "type": "array",
          "items": {
            "$ref": "#/definitions/GroupName"
          }
        }
      }
    },
    "TransactionsAccessDefinition": {
      "type": "object",
      "additionalProperties": false,
      "minProperties": 1,
      "properties": {
        "Producer": {
          "type": "array",
          "items": {
            "type": "string"
          }
        },
        "Admin": {
          "type": "array",
          "items": {
            "type": "string"
          }
        }
      }
    },
    "TopicName": {
      "description": "Kafka Topic Name. Restrictions from latest source code. Added * for Pattern",
      "type": "string",
      "format": "^[a-zA-Z0-9._\\-\\*]+$",
      "minLength": 1,
      "maxLength": 249
    },
    "GroupName": {
      "description": "Kafka Consumer Group Name. Restrictions from latest source code. Added * for Pattern",
      "type": "string",
      "format": "^[a-zA-Z0-9._\\-\\*]+$",
      "minLength": 1,
      "maxLength": 249
    }
  }
}
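The MacroParameters part of the schema above can be exercised with something like the sketch below. The property names and value ranges come from the schema; the resource name and the chosen values are placeholders. The two server.properties entries are ordinary Kafka broker settings, picked only for illustration.

```yaml
x-msk_cluster:
  events-cluster:                 # hypothetical resource name
    Properties: {}                # cluster properties omitted for brevity
    MacroParameters:
      CreateCMK: true             # new KMS key for cluster encryption
      CreateLogGroup: true        # new CloudWatch log group for broker logs
      CreateKafkaConfiguration:   # array form: unique key=value strings
        - auto.create.topics.enable=false
        - default.replication.factor=3
      StorageScaling:
        MaxInGB: 1024             # upper bound for broker EBS storage
        Target: 60                # target utilization in percent (10 to 100)
```

CreateKafkaConfiguration also accepts a raw string or an object of AWS::MSK::Configuration properties; the array form is used here because it is the easiest to read in a compose file.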

Lookup

The Lookup for MSK allows you to use an existing MSK cluster and sets up all the appropriate IAM permissions to access said cluster.

Lookup.Cluster

Tags associated with the MSK cluster. The tags must point to a single cluster. If multiple clusters match the given tags, the lookup results in an error.

Lookup.ClientsSecurityGroup

You can set ClientsSecurityGroup to look up an existing security group that already has access to the MSK cluster and add it to your services, giving them network access to said cluster.

For example, the Lookup below searches for a cluster with the tag Name and value msk-lookup. It also looks for a security group to use for the services in addition to their new default one.

x-msk_cluster:
  new-cluster:
    Lookup:
      Cluster:
        Tags:
          - Name: msk-lookup
      ClientsSecurityGroup:
        Tags:
          - aws:cloudformation:logical-id: newclusterClientsSecurityGroup
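A Lookup can be combined with a Services section in the same way as a new cluster. The sketch below reuses the cluster tag from the example above and adds a hypothetical producer service; the resource name, service name, topic prefix, and group name are all placeholders. The group entry is there because the schema requires it under Iam.

```yaml
x-msk_cluster:
  existing-cluster:                # hypothetical resource name
    Lookup:
      Cluster:
        Tags:
          - Name: msk-lookup
    Services:
      orders-writer:               # hypothetical service name
        KafkaAccess:
          Iam:
            topic:
              Producer:
                - "orders-out*"    # write to topics starting with orders-out
            group:                 # required by the schema
              Admin:
                - "orders-writer*" # hypothetical consumer group prefix
```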