MSK Self-Hosted (Optional)
Optional Advanced Topic
This page is optional. Most teams should use Confluent Cloud (covered in earlier pages) for simplicity and lower total cost. Only consider MSK if you have AWS-centric architecture, dedicated DevOps resources, or specific data residency requirements.
On this page, you will:
- Evaluate AWS MSK vs Confluent Cloud for your use case
- Deploy 3-broker MSK provisioned cluster via Terraform
- Configure VPC networking and security groups
- Deploy MSK Connect for Kafka Connect workers
- Set up self-hosted Confluent Schema Registry on ECS
- Compare total cost of ownership (infrastructure + ops time)
- Understand when MSK makes sense
Overview
Amazon Managed Streaming for Apache Kafka (MSK) is AWS's managed Kafka service. Unlike Confluent Cloud (fully managed), MSK requires you to manage Kafka configuration, Schema Registry, and Kafka Connect separately.
Trade-off: Lower infrastructure cost at very high volume, but significantly higher operational burden at any volume.
┌─────────────────────────────────────────────────────────────────────────┐
│ MSK SELF-HOSTED ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ VPC (10.0.0.0/16) │
│ ────────────────── │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ Private Subnets (3 AZs) │ │
│ │ │ │
│ │ MSK Cluster │ │
│ │ ─────────── │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Broker 1 │────▶│ Broker 2 │────▶│ Broker 3 │ │ │
│ │ │ AZ-1 │ │ AZ-2 │ │ AZ-3 │ │ │
│ │ │ m5.large │ │ m5.large │ │ m5.large │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ │ │
│ │ │ │ │ │ │
│ │ └──────────────────┼──────────────────┘ │ │
│ │ │ │ │
│ │ MSK Connect (Kafka Connect Workers) │ │
│ │ ──────────────────────────────────── │ │
│ │ ┌──────────────────────────────────────┐ │ │
│ │ │ AWS-managed workers (MCUs) │ │ │
│ │ │ • Snowflake Sink Connector │ │ │
│ │ │ • S3 Sink Connector │ │ │
│ │ └──────────────────────────────────────┘ │ │
│ │ │ │ │
│ │ Schema Registry (Self-Hosted) │ │
│ │ ───────────────────────────── │ │
│ │ ┌──────────────────────────────────────┐ │ │
│ │ │ ECS Tasks (Fargate) │ │ │
│ │ │ • Confluent Schema Registry │ │ │
│ │ │ • Backed by RDS PostgreSQL │ │ │
│ │ └──────────────────────────────────────┘ │ │
│ │ │ │
│ └────────────────────────────────────────────────────────────────┘ │
│ │
│ Cost: $430/month infrastructure + $360-720/month ops time │
│ Total: $790-1,150/month (vs $259-319/month for Confluent Cloud) │
│ │
└─────────────────────────────────────────────────────────────────────────┘
When to Use MSK
Good Reasons
✅ AWS-centric architecture - Already using MSK for other teams, amortised ops cost
✅ Data residency - Strict requirements to keep data in specific AWS regions/accounts
✅ Enterprise AWS commitment - AWS discount programmes make MSK significantly cheaper
✅ Existing DevOps team - Engineers already managing MSK clusters
✅ Very high throughput - > 500 GB/day, where MSK's cost advantage starts to matter
Bad Reasons
❌ Lower cost - Total cost (infra + ops) is usually higher than Confluent Cloud
❌ More control - You're debugging Kafka internals instead of building data pipelines
❌ Learning - Learn Kafka concepts with Confluent Cloud first, add ops complexity later
❌ Avoiding vendor lock-in - Kafka is open source, migration is straightforward
Cost Reality Check
| Workload | Confluent Cloud Total | MSK Total | Winner |
|---|---|---|---|
| 5 GB/day | $259/month | $790/month | Confluent Cloud (67% cheaper) |
| 50 GB/day | $374/month | $950/month | Confluent Cloud (61% cheaper) |
| 200 GB/day | $900/month | $1,400/month | Confluent Cloud (36% cheaper) |
MSK becomes competitive only at very high scale (> 500 GB/day) or when ops time is free (existing team).
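The percentages in the Winner column follow directly from the two totals in each row. A minimal shell sketch of that arithmetic, using the 200 GB/day row (the function name `savings_pct` is illustrative and not part of any tool used on this page):

```shell
# Percentage saved by choosing the cheaper option, rounded to the nearest integer.
# Arguments: cheaper monthly total, dearer monthly total (whole dollars).
savings_pct() {
  local cheaper=$1 dearer=$2
  # Integer arithmetic: add half the divisor before dividing to round to nearest
  echo $(( ((dearer - cheaper) * 100 + dearer / 2) / dearer ))
}

# 200 GB/day row: Confluent Cloud $900 vs MSK $1,400
savings_pct 900 1400   # prints 36
```

Substituting your own monthly totals gives a quick check on whether a quoted saving is realistic for your workload.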
MSK Cluster Deployment
Prerequisites
You need:
- AWS VPC with private subnets in 3 AZs
- Terraform (>= 1.0)
- AWS CLI configured
- Existing STREAMING database in Snowflake (from page 4)
Terraform Configuration
Create ~/src/terraform/aws/msk/main.tf:
terraform {
required_version = ">= 1.0"
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 5.0"
}
}
backend "s3" {
bucket = "your-terraform-state-bucket"
key = "aws/msk/terraform.tfstate"
region = "eu-west-2"
encrypt = true
dynamodb_table = "terraform-state-lock"
}
}
provider "aws" {
region = "eu-west-2"
default_tags {
tags = {
Project = "ModernDataStack"
ManagedBy = "Terraform"
Environment = "production"
Component = "streaming"
}
}
}
# Data source: Get VPC and subnets
data "aws_vpc" "main" {
tags = {
Name = "data-platform-vpc"
}
}
data "aws_subnets" "private" {
filter {
name = "vpc-id"
values = [data.aws_vpc.main.id]
}
tags = {
Type = "private"
}
}
# Security group for MSK cluster
resource "aws_security_group" "msk_cluster" {
name = "msk-cluster"
description = "Security group for MSK cluster"
vpc_id = data.aws_vpc.main.id
# Allow Kafka protocol (9092) from VPC
ingress {
from_port = 9092
to_port = 9092
protocol = "tcp"
cidr_blocks = [data.aws_vpc.main.cidr_block]
description = "Kafka plaintext"
}
# Allow TLS Kafka (9094) from VPC
ingress {
from_port = 9094
to_port = 9094
protocol = "tcp"
cidr_blocks = [data.aws_vpc.main.cidr_block]
description = "Kafka TLS"
}
# Allow Zookeeper (2181) from VPC
ingress {
from_port = 2181
to_port = 2181
protocol = "tcp"
cidr_blocks = [data.aws_vpc.main.cidr_block]
description = "Zookeeper"
}
# Allow all outbound
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
description = "Allow all outbound"
}
tags = {
Name = "msk-cluster-sg"
}
}
# CloudWatch log group for MSK logs
resource "aws_cloudwatch_log_group" "msk" {
name = "/aws/msk/data-platform-cluster"
retention_in_days = 7
tags = {
Name = "msk-cluster-logs"
}
}
# MSK cluster configuration
resource "aws_msk_configuration" "this" {
name = "data-platform-msk-config"
kafka_versions = ["3.5.1"]
server_properties = <<EOF
auto.create.topics.enable=false
default.replication.factor=3
min.insync.replicas=2
num.partitions=3
log.retention.hours=168
log.retention.bytes=107374182400
log.segment.bytes=1073741824
compression.type=snappy
EOF
description = "MSK configuration for data platform"
}
# MSK cluster (3-broker provisioned)
resource "aws_msk_cluster" "this" {
cluster_name = "data-platform-cluster"
kafka_version = "3.5.1"
number_of_broker_nodes = 3
broker_node_group_info {
instance_type = "kafka.m5.large" # 2 vCPU, 8 GB RAM
client_subnets = slice(data.aws_subnets.private.ids, 0, 3)
security_groups = [aws_security_group.msk_cluster.id]
storage_info {
ebs_storage_info {
volume_size = 100 # 100 GB per broker
}
}
connectivity_info {
public_access {
type = "DISABLED"
}
}
}
encryption_info {
encryption_in_transit {
client_broker = "TLS" # Require TLS
in_cluster = true # Encrypt broker-to-broker
}
encryption_at_rest_kms_key_arn = aws_kms_key.msk.arn
}
configuration_info {
arn = aws_msk_configuration.this.arn
revision = aws_msk_configuration.this.latest_revision
}
logging_info {
broker_logs {
cloudwatch_logs {
enabled = true
log_group = aws_cloudwatch_log_group.msk.name
}
}
}
tags = {
Name = "data-platform-msk-cluster"
}
}
# KMS key for encryption at rest
resource "aws_kms_key" "msk" {
description = "KMS key for MSK cluster encryption"
deletion_window_in_days = 7
enable_key_rotation = true
tags = {
Name = "msk-cluster-key"
}
}
resource "aws_kms_alias" "msk" {
name = "alias/msk-cluster"
target_key_id = aws_kms_key.msk.key_id
}
# Outputs
output "msk_cluster_arn" {
value = aws_msk_cluster.this.arn
description = "ARN of MSK cluster"
}
output "msk_bootstrap_brokers_tls" {
value = aws_msk_cluster.this.bootstrap_brokers_tls
description = "TLS connection string for MSK cluster"
sensitive = true
}
output "msk_zookeeper_connect_string" {
value = aws_msk_cluster.this.zookeeper_connect_string
description = "Zookeeper connection string"
sensitive = true
}
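The byte values in `server_properties` above are easier to sanity-check once converted to GiB and days. A small shell sketch (the helper name `bytes_to_gib` is an assumption made for illustration):

```shell
# Convert a byte count to whole GiB (1 GiB = 1024^3 bytes)
bytes_to_gib() {
  echo $(( $1 / (1024 * 1024 * 1024) ))
}

bytes_to_gib 107374182400   # log.retention.bytes -> 100
bytes_to_gib 1073741824     # log.segment.bytes   -> 1
echo $(( 168 / 24 ))        # log.retention.hours -> 7 (days)
```

Kafka applies `log.retention.bytes` per partition, so a 100 GiB cap is as large as the whole 100 GB EBS volume on each broker. With several partitions per broker, the 7-day time limit or the disk itself is likely to be reached first, so a smaller value or larger volumes may be worth considering.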
Deploy MSK Cluster
cd ~/src/terraform/aws/msk
# Initialise
terraform init
# Plan
terraform plan -out=tfplan
# Apply (MSK cluster provisioning takes some time)
terraform apply tfplan
Expected output:
aws_msk_cluster.this: Creating... (this may take 15-30 minutes)
aws_msk_cluster.this: Still creating... [5m0s elapsed]
aws_msk_cluster.this: Still creating... [10m0s elapsed]
aws_msk_cluster.this: Still creating... [15m0s elapsed]
aws_msk_cluster.this: Creation complete after 22m15s
Outputs:
msk_bootstrap_brokers_tls = <sensitive>
msk_cluster_arn = "arn:aws:kafka:eu-west-2:123456789012:cluster/data-platform-cluster/abc-123"
msk_zookeeper_connect_string = <sensitive>
Verify MSK Cluster
# Get cluster ARN
MSK_ARN=$(terraform output -raw msk_cluster_arn)
# Check cluster status
aws kafka describe-cluster --cluster-arn "$MSK_ARN" --region eu-west-2 \
  --query 'ClusterInfo.State' --output text
# Expected state: ACTIVE
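Because provisioning takes a while, a single status check often returns CREATING. One way to wait for the cluster is a small polling loop. This is a sketch only: the function name `wait_for_msk_active`, the default attempt count, and the delay are assumptions, not values from this page.

```shell
# Poll the cluster state until it is ACTIVE, or give up after a number of attempts.
# Arguments: cluster ARN, max attempts (default 60), seconds between attempts (default 30).
wait_for_msk_active() {
  local arn=$1 attempts=${2:-60} delay=${3:-30} state i
  for (( i = 1; i <= attempts; i++ )); do
    state=$(aws kafka describe-cluster --cluster-arn "$arn" \
      --region eu-west-2 --query 'ClusterInfo.State' --output text)
    echo "attempt $i: $state"
    [ "$state" = "ACTIVE" ] && return 0   # cluster is ready
    [ "$state" = "FAILED" ] && return 1   # provisioning failed, stop polling
    sleep "$delay"
  done
  return 1                                # still not ACTIVE after all attempts
}

# Usage (not run here):
#   wait_for_msk_active "$(terraform output -raw msk_cluster_arn)"
```

The function returns a non-zero status on failure or timeout, so it can gate later steps in a deployment script.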
Schema Registry Deployment
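MSK doesn't include Schema Registry. Deploy Confluent Schema Registry on ECS Fargate. Schema Registry persists schemas in a Kafka topic (_schemas by default), so the task definition on this page connects it to the MSK brokers over TLS.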
Why Self-Host Schema Registry?
Options:
1. AWS Glue Schema Registry - Free, but it uses AWS-specific serializers rather than the Confluent Schema Registry API, so Confluent converters such as AvroConverter cannot use it directly
2. Confluent Schema Registry - Industry standard, full features, needs self-hosting
3. No Schema Registry - JSON without schemas (not recommended for production)
Recommendation: Self-host Confluent Schema Registry for full Avro support.
Architecture
┌─────────────────────────────────────────────────────────────────────────┐
│ SCHEMA REGISTRY ON ECS │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ Application Load Balancer (Internal) │
│ ───────────────────────────────────── │
│ ┌──────────────────────────────────────┐ │
│ │ schema-registry.internal │ │
│ │ Port 8081 │ │
│ └──────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ECS Service (Fargate) │
│ ───────────────────── │
│ ┌──────────────────────────────────────┐ │
│ │ Schema Registry Task │ │
│ │ • 0.5 vCPU, 1 GB RAM │ │
│ │ • Confluent Image │ │
│ │ • Stores schemas in RDS │ │
│ └──────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ RDS PostgreSQL (db.t3.micro) │
│ ──────────────────────────── │
│ ┌──────────────────────────────────────┐ │
│ │ Schema metadata storage │ │
│ │ • Multi-AZ: No (dev/test) │ │
│ │ • Backup: 7-day retention │ │
│ └──────────────────────────────────────┘ │
│ │
│ Cost: ~$50/month (ECS $15 + RDS $20 + ALB $15) │
│ │
└─────────────────────────────────────────────────────────────────────────┘
Terraform Configuration
Create ~/src/terraform/aws/msk/schema_registry.tf:
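# RDS PostgreSQL from the architecture diagram.
# Note: the task definition below persists schemas in Kafka through the KAFKASTORE settings and does not reference this database.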
resource "aws_db_instance" "schema_registry" {
identifier = "schema-registry-db"
engine = "postgres"
engine_version = "15.4"
instance_class = "db.t3.micro" # 2 vCPU, 1 GB RAM
allocated_storage = 20
storage_type = "gp3"
storage_encrypted = true
kms_key_id = aws_kms_key.msk.arn
db_name = "schemaregistry"
username = "schemaregistry"
password = random_password.schema_registry_db.result
vpc_security_group_ids = [aws_security_group.schema_registry_db.id]
db_subnet_group_name = aws_db_subnet_group.schema_registry.name
backup_retention_period = 7
backup_window = "03:00-04:00"
maintenance_window = "Mon:04:00-Mon:05:00"
skip_final_snapshot = true # Change to false for production
tags = {
Name = "schema-registry-db"
}
}
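resource "random_password" "schema_registry_db" {
length = 32
special = true
# RDS rejects '/', '@', '"' and spaces in the master password
override_special = "!#$%&*()-_=+[]{}<>:?"
}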
resource "aws_db_subnet_group" "schema_registry" {
name = "schema-registry-subnet-group"
subnet_ids = data.aws_subnets.private.ids
tags = {
Name = "schema-registry-db-subnet-group"
}
}
# Security group for RDS
resource "aws_security_group" "schema_registry_db" {
name = "schema-registry-db"
description = "Security group for Schema Registry RDS"
vpc_id = data.aws_vpc.main.id
ingress {
from_port = 5432
to_port = 5432
protocol = "tcp"
security_groups = [aws_security_group.schema_registry_ecs.id]
description = "PostgreSQL from Schema Registry ECS"
}
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
tags = {
Name = "schema-registry-db-sg"
}
}
# Security group for Schema Registry ECS tasks
resource "aws_security_group" "schema_registry_ecs" {
name = "schema-registry-ecs"
description = "Security group for Schema Registry ECS tasks"
vpc_id = data.aws_vpc.main.id
ingress {
from_port = 8081
to_port = 8081
protocol = "tcp"
security_groups = [aws_security_group.schema_registry_alb.id]
description = "Schema Registry API from ALB"
}
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
tags = {
Name = "schema-registry-ecs-sg"
}
}
# Application Load Balancer (internal)
resource "aws_lb" "schema_registry" {
name = "schema-registry-alb"
internal = true
load_balancer_type = "application"
security_groups = [aws_security_group.schema_registry_alb.id]
subnets = data.aws_subnets.private.ids
tags = {
Name = "schema-registry-alb"
}
}
resource "aws_security_group" "schema_registry_alb" {
name = "schema-registry-alb"
description = "Security group for Schema Registry ALB"
vpc_id = data.aws_vpc.main.id
ingress {
from_port = 8081
to_port = 8081
protocol = "tcp"
cidr_blocks = [data.aws_vpc.main.cidr_block]
description = "Schema Registry API from VPC"
}
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
tags = {
Name = "schema-registry-alb-sg"
}
}
resource "aws_lb_target_group" "schema_registry" {
name = "schema-registry-tg"
port = 8081
protocol = "HTTP"
vpc_id = data.aws_vpc.main.id
target_type = "ip"
health_check {
enabled = true
path = "/subjects"
port = 8081
protocol = "HTTP"
healthy_threshold = 2
unhealthy_threshold = 3
timeout = 5
interval = 30
}
tags = {
Name = "schema-registry-tg"
}
}
resource "aws_lb_listener" "schema_registry" {
load_balancer_arn = aws_lb.schema_registry.arn
port = 8081
protocol = "HTTP"
default_action {
type = "forward"
target_group_arn = aws_lb_target_group.schema_registry.arn
}
}
# ECS cluster
resource "aws_ecs_cluster" "streaming" {
name = "streaming-cluster"
tags = {
Name = "streaming-ecs-cluster"
}
}
# ECS task definition for Schema Registry
resource "aws_ecs_task_definition" "schema_registry" {
family = "schema-registry"
requires_compatibilities = ["FARGATE"]
network_mode = "awsvpc"
cpu = "512" # 0.5 vCPU
memory = "1024" # 1 GB
execution_role_arn = aws_iam_role.ecs_execution.arn
task_role_arn = aws_iam_role.ecs_task.arn
container_definitions = jsonencode([
{
name = "schema-registry"
image = "confluentinc/cp-schema-registry:7.5.0"
portMappings = [
{
containerPort = 8081
protocol = "tcp"
}
]
environment = [
{
name = "SCHEMA_REGISTRY_HOST_NAME"
value = "schema-registry"
},
{
name = "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS"
value = aws_msk_cluster.this.bootstrap_brokers_tls
},
{
name = "SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL"
value = "SSL"
},
{
name = "SCHEMA_REGISTRY_LISTENERS"
value = "http://0.0.0.0:8081"
}
]
logConfiguration = {
logDriver = "awslogs"
options = {
"awslogs-group" = aws_cloudwatch_log_group.schema_registry.name
"awslogs-region" = "eu-west-2"
"awslogs-stream-prefix" = "schema-registry"
}
}
}
])
tags = {
Name = "schema-registry-task"
}
}
resource "aws_cloudwatch_log_group" "schema_registry" {
name = "/ecs/schema-registry"
retention_in_days = 7
tags = {
Name = "schema-registry-logs"
}
}
# ECS service
resource "aws_ecs_service" "schema_registry" {
name = "schema-registry"
cluster = aws_ecs_cluster.streaming.id
task_definition = aws_ecs_task_definition.schema_registry.arn
desired_count = 1
launch_type = "FARGATE"
network_configuration {
subnets = data.aws_subnets.private.ids
security_groups = [aws_security_group.schema_registry_ecs.id]
assign_public_ip = false
}
load_balancer {
target_group_arn = aws_lb_target_group.schema_registry.arn
container_name = "schema-registry"
container_port = 8081
}
depends_on = [aws_lb_listener.schema_registry]
tags = {
Name = "schema-registry-service"
}
}
# IAM roles for ECS
resource "aws_iam_role" "ecs_execution" {
name = "schema-registry-ecs-execution-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = {
Service = "ecs-tasks.amazonaws.com"
}
}
]
})
tags = {
Name = "schema-registry-ecs-execution-role"
}
}
resource "aws_iam_role_policy_attachment" "ecs_execution" {
role = aws_iam_role.ecs_execution.name
policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"
}
resource "aws_iam_role" "ecs_task" {
name = "schema-registry-ecs-task-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = {
Service = "ecs-tasks.amazonaws.com"
}
}
]
})
tags = {
Name = "schema-registry-ecs-task-role"
}
}
# Output Schema Registry URL
output "schema_registry_url" {
value = "http://${aws_lb.schema_registry.dns_name}:8081"
description = "Schema Registry URL (internal)"
}
Deploy Schema Registry
terraform plan -out=tfplan
terraform apply tfplan
Expected output:
aws_ecs_service.schema_registry: Creating...
aws_ecs_service.schema_registry: Creation complete after 2m30s
Outputs:
schema_registry_url = "http://schema-registry-alb-123456789.eu-west-2.elb.amazonaws.com:8081"
Test Schema Registry
# Get Schema Registry URL
SCHEMA_REGISTRY_URL=$(terraform output -raw schema_registry_url)
# Test health (the ALB is internal, so run this from a host inside the VPC)
curl "$SCHEMA_REGISTRY_URL/subjects"
# Expected: [] (empty list, no schemas registered yet)
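Once the empty list comes back, registering a first schema confirms that writes work too. The Schema Registry REST API expects the schema as an escaped JSON string inside a `{"schema": ...}` envelope. The sketch below builds that envelope in plain shell. The helper `schema_payload` and the `OrderEvent` schema are hypothetical, and the subject `order-events-value` assumes the default topic-name subject strategy for the `order-events` topic.

```shell
# Wrap a compact, single-line Avro schema in the JSON envelope that
# POST /subjects/<subject>/versions expects: {"schema": "<escaped schema>"}
schema_payload() {
  local schema=$1
  schema=${schema//\\/\\\\}   # escape backslashes first
  schema=${schema//\"/\\\"}   # then escape double quotes
  printf '{"schema": "%s"}' "$schema"
}

# Hypothetical minimal schema, for illustration only
ORDER_SCHEMA='{"type":"record","name":"OrderEvent","fields":[{"name":"order_id","type":"string"}]}'
schema_payload "$ORDER_SCHEMA"

# Usage from inside the VPC (not run here):
#   curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
#     --data "$(schema_payload "$ORDER_SCHEMA")" \
#     "$SCHEMA_REGISTRY_URL/subjects/order-events-value/versions"
```

The helper assumes the schema contains no newlines or control characters. For anything more complex, a JSON-aware tool is a safer way to build the request body.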
MSK Connect Deployment
Deploy Kafka Connect workers using MSK Connect (AWS managed).
Why MSK Connect?
MSK Connect is AWS's managed Kafka Connect service - it runs Kafka Connect workers for you.
Pros:
- AWS manages worker scaling, failures, upgrades
- Integrated with CloudWatch for logging
- Pay per MCU-hour; autoscaling limits idle cost, though at least one worker always runs

Cons:
- Less flexible than self-hosted (limited connector plugins)
- Higher cost than ECS Fargate for always-on workloads
Create Snowflake Sink Connector Plugin
MSK Connect requires custom connector plugins uploaded to S3.
# Download Snowflake Kafka Connector
wget https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/1.9.3/snowflake-kafka-connector-1.9.3.jar
# Create ZIP for MSK Connect
mkdir snowflake-connector
cp snowflake-kafka-connector-1.9.3.jar snowflake-connector/
zip -r snowflake-connector.zip snowflake-connector/
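# Upload to S3 (the bucket name must match aws_s3_bucket.connector_plugins below, and the bucket must exist before you upload)
aws s3 cp snowflake-connector.zip s3://your-account-kafka-connect-plugins/snowflake-connector.zip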
Terraform Configuration for MSK Connect
Add to ~/src/terraform/aws/msk/msk_connect.tf:
# S3 bucket for connector plugins
resource "aws_s3_bucket" "connector_plugins" {
bucket = "your-account-kafka-connect-plugins"
tags = {
Name = "kafka-connect-plugins"
}
}
# Custom plugin for Snowflake Sink
resource "aws_mskconnect_custom_plugin" "snowflake_sink" {
name = "snowflake-sink-connector"
content_type = "ZIP"
location {
s3 {
bucket_arn = aws_s3_bucket.connector_plugins.arn
file_key = "snowflake-connector.zip"
}
}
description = "Snowflake Kafka Connector for MSK Connect"
}
# MSK Connect worker configuration
resource "aws_mskconnect_worker_configuration" "this" {
name = "data-platform-worker-config"
properties_file_content = <<EOF
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://${aws_lb.schema_registry.dns_name}:8081
# Offset storage
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3
# Config storage
config.storage.topic=connect-configs
config.storage.replication.factor=3
# Status storage
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=3
EOF
}
# IAM role for MSK Connect
resource "aws_iam_role" "msk_connect" {
name = "msk-connect-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = {
Service = "kafkaconnect.amazonaws.com"
}
}
]
})
tags = {
Name = "msk-connect-role"
}
}
resource "aws_iam_role_policy" "msk_connect_msk_access" {
name = "msk-access"
role = aws_iam_role.msk_connect.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = [
"kafka-cluster:Connect",
"kafka-cluster:AlterCluster",
"kafka-cluster:DescribeCluster"
]
Resource = aws_msk_cluster.this.arn
},
{
Effect = "Allow"
Action = [
"kafka-cluster:*Topic*",
"kafka-cluster:WriteData",
"kafka-cluster:ReadData"
]
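# Topic ARNs use the topic/ resource type rather than the cluster ARN
Resource = "${replace(aws_msk_cluster.this.arn, ":cluster/", ":topic/")}/*"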
},
{
Effect = "Allow"
Action = [
"kafka-cluster:AlterGroup",
"kafka-cluster:DescribeGroup"
]
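# Consumer group ARNs use the group/ resource type rather than the cluster ARN
Resource = "${replace(aws_msk_cluster.this.arn, ":cluster/", ":group/")}/*"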
}
]
})
}
resource "aws_iam_role_policy" "msk_connect_secrets_access" {
name = "secrets-access"
role = aws_iam_role.msk_connect.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = [
"secretsmanager:GetSecretValue"
]
Resource = "arn:aws:secretsmanager:eu-west-2:*:secret:snowflake/svc-kafka-connector-*"
}
]
})
}
# Security group for MSK Connect workers
resource "aws_security_group" "msk_connect" {
name = "msk-connect"
description = "Security group for MSK Connect workers"
vpc_id = data.aws_vpc.main.id
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
tags = {
Name = "msk-connect-sg"
}
}
# MSK Connect connector (Snowflake Sink)
resource "aws_mskconnect_connector" "snowflake_sink" {
name = "snowflake-sink-order-events"
kafkaconnect_version = "2.7.1"
capacity {
autoscaling {
mcu_count = 1
min_worker_count = 1
max_worker_count = 2
scale_in_policy {
cpu_utilization_percentage = 20
}
scale_out_policy {
cpu_utilization_percentage = 80
}
}
}
connector_configuration = {
"connector.class" = "com.snowflake.kafka.connector.SnowflakeSinkConnector"
"tasks.max" = "1"
# Kafka topic
"topics" = "order-events"
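# Snowflake connection (from Secrets Manager; aws_secretsmanager_secret.snowflake_credentials is not declared on this page and must exist elsewhere in the configuration)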
"snowflake.url.name" = "your-account.eu-west-2.aws.snowflakecomputing.com"
"snowflake.user.name" = "SVC_KAFKA_CONNECTOR"
"snowflake.private.key" = "${aws_secretsmanager_secret.snowflake_credentials.arn}:private_key::"
"snowflake.database.name" = "STREAMING"
"snowflake.schema.name" = "PUBLIC"
"snowflake.role.name" = "SVC_KAFKA_CONNECTOR"
# Buffer settings
"buffer.count.records" = "10000"
"buffer.flush.time" = "60"
"buffer.size.bytes" = "5000000"
# Table configuration
"snowflake.topic2table.map" = "order-events:ORDER_EVENTS"
# Schema Registry
"value.converter" = "io.confluent.connect.avro.AvroConverter"
"value.converter.schema.registry.url" = "http://${aws_lb.schema_registry.dns_name}:8081"
"key.converter" = "org.apache.kafka.connect.storage.StringConverter"
}
kafka_cluster {
apache_kafka_cluster {
bootstrap_servers = aws_msk_cluster.this.bootstrap_brokers_tls
vpc {
security_groups = [aws_security_group.msk_connect.id]
subnets = data.aws_subnets.private.ids
}
}
}
kafka_cluster_client_authentication {
authentication_type = "NONE" # TLS only (no SASL)
}
kafka_cluster_encryption_in_transit {
encryption_type = "TLS"
}
plugin {
custom_plugin {
arn = aws_mskconnect_custom_plugin.snowflake_sink.arn
revision = aws_mskconnect_custom_plugin.snowflake_sink.latest_revision
}
}
service_execution_role_arn = aws_iam_role.msk_connect.arn
worker_configuration {
arn = aws_mskconnect_worker_configuration.this.arn
revision = aws_mskconnect_worker_configuration.this.latest_revision
}
log_delivery {
worker_log_delivery {
cloudwatch_logs {
enabled = true
log_group = aws_cloudwatch_log_group.msk_connect.name
}
}
}
}
resource "aws_cloudwatch_log_group" "msk_connect" {
name = "/aws/mskconnect/snowflake-sink"
retention_in_days = 7
tags = {
Name = "msk-connect-logs"
}
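}
# Output connector ARN (used by the status check below)
output "snowflake_sink_connector_arn" {
value = aws_mskconnect_connector.snowflake_sink.arn
description = "ARN of the Snowflake Sink MSK Connect connector"
}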
Deploy MSK Connect
terraform plan -out=tfplan
terraform apply tfplan
# Check connector status (takes 5-10 minutes to start)
aws kafkaconnect describe-connector \
--connector-arn $(terraform output -raw snowflake_sink_connector_arn) \
--region eu-west-2
Cost Comparison
MSK Total Cost (5 GB/day workload)
| Component | Monthly Cost | Notes |
|---|---|---|
| MSK cluster (3 × kafka.m5.large) | $252 | 24/7 brokers |
| EBS storage (3 × 100 GB) | $24 | gp3 volumes |
| Schema Registry ECS (0.5 vCPU) | $15 | Fargate 24/7 |
| Schema Registry RDS (db.t3.micro) | $20 | PostgreSQL |
| Schema Registry ALB | $15 | Internal load balancer |
| MSK Connect (1 MCU × 720 hours) | $86 | $0.12/MCU-hour |
| CloudWatch Logs | $10 | Broker + connector logs |
| Data transfer | $5 | Minimal within VPC |
| KMS | $3 | Encryption keys |
| Setup time (one-time) | - | 40 hours @ $60/hour = $2,400 |
| Ops time (monthly) | - | 6-12 hours @ $60/hour = $360-720 |
| Total infrastructure | $430/month | |
| Total with ops | $790-1,150/month | |
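The totals in this table can be reproduced by summing the line items and adding the ops-time range. A shell sketch of that arithmetic, using the figures from the table (variable names are illustrative):

```shell
# Monthly infrastructure line items from the table above, in whole dollars
msk_items=(252 24 15 20 15 86 10 5 3)

infra=0
for item in "${msk_items[@]}"; do
  infra=$(( infra + item ))
done

rate=60                      # hourly ops rate used on this page
ops_low=$(( 6 * rate ))      # 6 hours/month
ops_high=$(( 12 * rate ))    # 12 hours/month

echo "infrastructure: \$${infra}/month"
echo "total with ops: \$$(( infra + ops_low ))-\$$(( infra + ops_high ))/month"
```

Replacing the array with your own line items, or changing the hourly rate, shows how sensitive the total is to ops time: at these figures ops time accounts for roughly half of the monthly total.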
Confluent Cloud Total Cost (5 GB/day workload)
| Component | Monthly Cost | Notes |
|---|---|---|
| Confluent Cloud Basic | $150 | 1 CKU cluster |
| Data ingress (1 GB/day) | $2.50 | $0.08/GB |
| Data egress (1 GB/day) | $2.50 | Connector reads |
| Storage (30 GB) | $2.50 | 7-day retention |
| Snowflake compute | $35 | INGEST_WH (XSMALL) |
| Snowflake storage | $5 | STREAMING database |
| AWS Secrets Manager | $1.50 | Credentials |
| Setup time (one-time) | - | 8 hours @ $60/hour = $480 |
| Ops time (monthly) | - | 1-2 hours @ $60/hour = $60-120 |
| Total infrastructure | $199/month | |
| Total with ops | $259-319/month | |
Winner: Confluent Cloud (67-72% cheaper)
| Metric | MSK | Confluent Cloud | Difference |
|---|---|---|---|
| Infrastructure cost | $430/month | $199/month | +116% |
| Ops time | 6-12 hours/month | 1-2 hours/month | +500% |
| Total cost | $790-1,150/month | $259-319/month | +205-260% |
| Setup time | 40 hours | 8 hours | +400% |
When MSK Makes Sense
Scale Tipping Point
MSK becomes cost-competitive at very high scale:
| Daily Volume | Confluent Cloud Total | MSK Total | Cheaper Option |
|---|---|---|---|
| 5 GB | $259/month | $790/month | Confluent (67% cheaper) |
| 50 GB | $374/month | $950/month | Confluent (61% cheaper) |
| 200 GB | $900/month | $1,400/month | Confluent (36% cheaper) |
| 500 GB | $2,200/month | $2,400/month | Similar |
| 1 TB | $4,000/month | $3,200/month | MSK (20% cheaper) |
Break-even: ~500 GB/day (~15 TB/month)
Non-Cost Reasons
MSK makes sense when:
✅ Existing MSK infrastructure - Team already running MSK for other use cases
✅ AWS commitment - Enterprise discount programme makes MSK significantly cheaper
✅ Data residency - Must keep data in specific AWS account/region
✅ Compliance - Custom security controls not available in Confluent Cloud
✅ Custom Kafka configuration - Need non-standard broker settings
Migration Path
Phase 1: Start with Confluent Cloud (months 0-12)
- Learn Kafka concepts without ops burden
- Build data pipelines and producers
- Validate business value

Phase 2: Evaluate MSK (month 12+)
- If throughput > 200 GB/day
- AND team has DevOps capacity
- AND cost savings justify migration effort
Migration effort: 40-80 hours (planning, testing, cutover)
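Whether the savings justify the migration effort comes down to a payback period. A rough shell sketch, assuming the $60/hour rate from the cost tables also applies to migration work and using the 1 TB/day row of the tipping-point table (the function name `payback_months` is hypothetical):

```shell
# Months needed for monthly savings to repay a one-off migration cost (rounded up).
# Arguments: migration hours, hourly rate in dollars, monthly saving in dollars.
payback_months() {
  local hours=$1 rate=$2 saving=$3
  local cost=$(( hours * rate ))
  echo $(( (cost + saving - 1) / saving ))   # ceiling division
}

# 1 TB/day row: Confluent Cloud $4,000 vs MSK $3,200, so $800/month saved
payback_months 40 60 800   # prints 3
payback_months 80 60 800   # prints 6
```

At smaller monthly savings the payback period lengthens quickly, which is why the migration is only worth evaluating at high throughput.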
Summary
You've learned when and how to self-host Kafka with MSK:
- MSK provisioned cluster - 3-broker setup with TLS encryption
- Schema Registry on ECS - Self-hosted Confluent Schema Registry
- MSK Connect - Managed Kafka Connect for Snowflake Sink
- VPC networking - Security groups and private subnets
- Cost comparison - $790-1,150/month vs $259-319/month for Confluent Cloud
- Break-even analysis - MSK competitive at > 500 GB/day
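Key insight: At the volumes most teams run, MSK costs more than Confluent Cloud in both infrastructure and ops time; it only becomes cheaper at around 500 GB/day and above. Start with Confluent Cloud unless one of the non-cost reasons above applies.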
What's Next
Explore stream processing to aggregate and enrich events in real-time.
Continue to Stream Processing →