
Stream Processing (Optional)

Optional Advanced Topic

This page is optional. Most teams can process streaming data using dbt after it lands in Snowflake. Only add stream processing if you need sub-second transformations, real-time aggregations, or complex event processing that can't wait for dbt runs.

On this page, you will:

  • Understand when to process in-stream vs in-warehouse
  • Deploy ksqlDB for SQL-based stream processing
  • Deploy Apache Flink on ECS for complex event processing
  • Create real-time aggregations and windowed calculations
  • Implement sessionisation and pattern matching
  • Evaluate cost implications of adding stream processing
  • Choose the right tool for your use case

Overview

Stream processing transforms data in-flight before it lands in Snowflake. This enables real-time use cases that can't wait for batch dbt transformations.

Trade-off: Added complexity and cost vs real-time insights.

┌─────────────────────────────────────────────────────────────────────────┐
│                    STREAM PROCESSING OPTIONS                            │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  Without Stream Processing (Recommended Default)                        │
│  ───────────────────────────────────────────────                        │
│  ┌──────────┐      ┌──────────┐      ┌──────────┐      ┌──────────┐   │
│  │ Producer │─────▶│  Kafka   │─────▶│ Snowflake│─────▶│   dbt    │   │
│  │          │ <1s  │  Topic   │ <5s  │  Table   │ 5min │ Transform│   │
│  └──────────┘      └──────────┘      └──────────┘      └──────────┘   │
│                                                                         │
│  Total latency: ~5 minutes (sufficient for most use cases)             │
│  Cost: $200/month (Confluent Cloud + Snowflake)                        │
│                                                                         │
│  ─────────────────────────────────────────────────────────────────────  │
│                                                                         │
│  With Stream Processing (Real-Time Aggregations)                        │
│  ────────────────────────────────────────────────                       │
│  ┌──────────┐      ┌──────────┐      ┌──────────┐      ┌──────────┐   │
│  │ Producer │─────▶│  Kafka   │─────▶│ ksqlDB / │─────▶│  Kafka   │   │
│  │          │ <1s  │  Topic   │ <1s  │  Flink   │ <1s  │  Topic   │   │
│  └──────────┘      └──────────┘      └──────────┘      └──────────┘   │
│                                            │                   │         │
│                                            ▼                   ▼         │
│                                       ┌──────────┐      ┌──────────┐   │
│                                       │ Snowflake│      │ Snowflake│   │
│                                       │ (raw)    │      │(processed)│  │
│                                       └──────────┘      └──────────┘   │
│                                                                         │
│  Total latency: ~3 seconds (real-time aggregations)                    │
│  Cost: $400-600/month (+$200-400 for processing layer)                 │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘
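
The latency figures in the diagram are per-hop upper bounds, so a rough worst case is simply their sum. The minimal sketch below assumes the illustrative bounds shown (they are not measurements). It makes it clear that the 5-minute dbt schedule, not Kafka, dominates the default pipeline. The streaming total only runs as far as the output topic; landing the processed rows in Snowflake adds one more hop.

```python
# Illustrative per-hop upper bounds (seconds), read off the diagram above.

def worst_case_latency_s(hops: dict[str, float]) -> float:
    """Worst-case end-to-end latency: the sum of every hop's upper bound."""
    return sum(hops.values())


batch_pipeline = {
    "producer -> Kafka": 1,
    "Kafka -> Snowflake": 5,
    "Snowflake -> dbt (5-minute schedule)": 5 * 60,
}
streaming_pipeline = {
    "producer -> Kafka": 1,
    "Kafka -> ksqlDB/Flink": 1,
    "ksqlDB/Flink -> output topic": 1,
}

print(worst_case_latency_s(batch_pipeline))      # 306
print(worst_case_latency_s(streaming_pipeline))  # 3
```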

When to Use Stream Processing

Decision Tree

┌─────────────────────────────────────────────────────────────────────────┐
│                    STREAM PROCESSING DECISION TREE                      │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  Do you need results within seconds (not minutes)?                      │
│  ├─ NO  → Use dbt in Snowflake (sufficient for most use cases)         │
│  └─ YES → Continue                                                      │
│                                                                         │
│  Can you wait for dbt to run every 5 minutes?                           │
│  ├─ YES → Use dbt (simpler, cheaper)                                   │
│  └─ NO  → Continue                                                      │
│                                                                         │
│  Is the logic stateless (per-event filtering/enrichment)?               │
│  ├─ YES → Use Kafka Streams in producer app (simplest)                 │
│  └─ NO  → Continue                                                      │
│                                                                         │
│  Is the logic SQL-based (aggregations, joins, windowing)?               │
│  ├─ YES → Use ksqlDB                                                    │
│  └─ NO  → Continue                                                      │
│                                                                         │
│  Is the logic complex (CEP, ML, custom state management)?               │
│  ├─ YES → Use Apache Flink                                             │
│  └─ NO  → Reconsider if you need stream processing                     │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘
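
The decision tree reads top to bottom, and the first branch that matches wins. Here is a small Python encoding of it, useful as a checklist in design reviews. The Requirements field names are hypothetical labels for the questions in the tree, not part of any tool.

```python
from dataclasses import dataclass


@dataclass
class Requirements:
    needs_seconds_latency: bool  # results within seconds, not minutes
    can_wait_for_dbt: bool       # a dbt run every 5 minutes is acceptable
    stateless: bool              # per-event filtering/enrichment only
    sql_expressible: bool        # aggregations, joins, windowing
    complex_logic: bool          # CEP, ML, custom state management


def choose_approach(r: Requirements) -> str:
    """Walk the decision tree top to bottom; the first matching branch wins."""
    if not r.needs_seconds_latency or r.can_wait_for_dbt:
        return "dbt in Snowflake"
    if r.stateless:
        return "Kafka Streams in producer app"
    if r.sql_expressible:
        return "ksqlDB"
    if r.complex_logic:
        return "Apache Flink"
    return "Reconsider if you need stream processing"


# Windowed totals needed within seconds, expressible in SQL -> ksqlDB
print(choose_approach(Requirements(True, False, False, True, False)))  # ksqlDB
```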

Use Cases by Latency Requirement

| Use Case | Acceptable Latency | Recommended Approach |
|---|---|---|
| Daily dashboards | Hours | dbt batch (scheduled) |
| Hourly reports | 5-15 minutes | dbt micro-batching (every 5 min) |
| Real-time dashboards | 30-60 seconds | dbt + Snowflake auto-refresh |
| Operational alerts | < 10 seconds | ksqlDB or Flink |
| Fraud detection | < 1 second | Flink + in-memory state |
| High-frequency trading | < 100 ms | Custom in-process (not Kafka) |

Good Reasons to Add Stream Processing

  • ✅ Real-time alerting - Detect anomalies and trigger actions within seconds
  • ✅ Live aggregations - Website visitor counts, active sessions, running totals
  • ✅ Complex event processing - Pattern detection across multiple event types
  • ✅ Sessionisation - Group events by user sessions with timeouts
  • ✅ Real-time enrichment - Join streaming events with reference data
  • ✅ Data quality filtering - Drop invalid events before storage

Bad Reasons

  • ❌ "We want real-time" - without specific business value measured in $
  • ❌ Over-engineering - Adding complexity before validating need
  • ❌ Resume building - Using Flink because it's trendy, not because it's needed
  • ❌ Avoiding dbt - Stream processing is harder to maintain than dbt

ksqlDB for SQL-Based Processing

What is ksqlDB?

ksqlDB is a SQL streaming database built on Kafka. It lets you:

  • Write SQL queries that run continuously on Kafka topics
  • Create materialized views (aggregations updated in real-time)
  • Join streams with tables
  • Window data by time (tumbling, hopping, session windows)
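
Tumbling windows are fixed-size and non-overlapping, so each event falls into exactly one window. Hopping windows overlap, so one event can fall into several. The sketch below shows how a timestamp maps to windows, assuming epoch-aligned window starts and half-open [start, end) bounds (the usual Kafka Streams convention). Session windows depend on gaps between events rather than fixed boundaries, so they are not covered here.

```python
def tumbling_window(ts_ms: int, size_ms: int) -> tuple[int, int]:
    """The single [start, end) tumbling window that contains ts_ms."""
    start = ts_ms - ts_ms % size_ms
    return (start, start + size_ms)


def hopping_windows(ts_ms: int, size_ms: int, advance_ms: int) -> list[tuple[int, int]]:
    """Every [start, end) hopping window that contains ts_ms.

    Window starts are multiples of advance_ms (counted from 0); a window
    contains ts_ms when start <= ts_ms < start + size_ms.
    """
    # Smallest aligned start strictly greater than ts_ms - size_ms, clamped at 0
    first = max(0, ts_ms - size_ms + advance_ms)
    first -= first % advance_ms
    return [(s, s + size_ms) for s in range(first, ts_ms + 1, advance_ms)]


HOUR = 60 * 60 * 1000
MINUTE = 60 * 1000

print(tumbling_window(90 * MINUTE, HOUR))                # (3600000, 7200000)
# 1-hour windows advancing every 20 minutes: an event at minute 70 is in 3 of them
print(hopping_windows(70 * MINUTE, HOUR, 20 * MINUTE))
```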

Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│                    ksqlDB ARCHITECTURE                                  │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  Kafka Topics                                                           │
│  ────────────                                                           │
│  ┌────────────────────┐          ┌────────────────────┐                │
│  │ order-events       │          │ customer-events    │                │
│  │ (raw orders)       │          │ (raw customers)    │                │
│  └────────────────────┘          └────────────────────┘                │
│           │                               │                             │
│           └───────────────┬───────────────┘                             │
│                           ▼                                             │
│  ksqlDB Server (ECS Fargate)                                            │
│  ────────────────────────────                                           │
│  ┌─────────────────────────────────────────────────────┐               │
│  │ CREATE STREAM orders ...                            │               │
│  │ CREATE TABLE order_totals_by_customer AS            │               │
│  │   SELECT customer_id,                               │               │
│  │          COUNT(*) as order_count,                   │               │
│  │          SUM(order_total) as total_spent            │               │
│  │   FROM orders                                       │               │
│  │   WINDOW TUMBLING (SIZE 1 HOUR)                     │               │
│  │   GROUP BY customer_id;                             │               │
│  └─────────────────────────────────────────────────────┘               │
│                           │                                             │
│                           ▼                                             │
│  Output Topic                                                           │
│  ────────────                                                           │
│  ┌────────────────────┐                                                 │
│  │ order_totals       │────▶ Snowflake Sink ────▶ Snowflake            │
│  │ (aggregated)       │                                                 │
│  └────────────────────┘                                                 │
│                                                                         │
│  Cost: $30-60/month (ECS Fargate, 1-2 vCPU)                            │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

Deployment on ECS Fargate

Terraform Configuration

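Create ~/src/terraform/aws/ksqldb/main.tf. The task definition references aws_iam_role.ecs_execution and aws_iam_role.ecs_task, which this file does not define; it assumes they already exist elsewhere in the same Terraform module: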

# ECS task definition for ksqlDB
resource "aws_ecs_task_definition" "ksqldb" {
  family                   = "ksqldb"
  requires_compatibilities = ["FARGATE"]
  network_mode             = "awsvpc"
  cpu                      = "1024"  # 1 vCPU
  memory                   = "2048"  # 2 GB
  execution_role_arn       = aws_iam_role.ecs_execution.arn
  task_role_arn            = aws_iam_role.ecs_task.arn

  container_definitions = jsonencode([
    {
      name  = "ksqldb-server"
      image = "confluentinc/ksqldb-server:0.29.0"

      portMappings = [
        {
          containerPort = 8088
          protocol      = "tcp"
        }
      ]

      environment = [
        {
          name  = "KSQL_BOOTSTRAP_SERVERS"
          value = var.kafka_bootstrap_servers
        },
        {
          name  = "KSQL_LISTENERS"
          value = "http://0.0.0.0:8088"
        },
        {
          name  = "KSQL_KSQL_SERVICE_ID"
          value = "ksqldb-cluster"
        },
        {
          name  = "KSQL_KSQL_SCHEMA_REGISTRY_URL"
          value = var.schema_registry_url
        },
        {
          name  = "KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE"
          value = "true"
        },
        {
          name  = "KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE"
          value = "true"
        }
      ]

      logConfiguration = {
        logDriver = "awslogs"
        options = {
          "awslogs-group"         = aws_cloudwatch_log_group.ksqldb.name
          "awslogs-region"        = "eu-west-2"
          "awslogs-stream-prefix" = "ksqldb"
        }
      }
    }
  ])

  tags = {
    Name = "ksqldb-task"
  }
}

resource "aws_cloudwatch_log_group" "ksqldb" {
  name              = "/ecs/ksqldb"
  retention_in_days = 7

  tags = {
    Name = "ksqldb-logs"
  }
}

# ECS service
resource "aws_ecs_service" "ksqldb" {
  name            = "ksqldb"
  cluster         = var.ecs_cluster_id
  task_definition = aws_ecs_task_definition.ksqldb.arn
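  desired_count   = 1
  launch_type     = "FARGATE"

  # Needed for `aws ecs execute-command`; the task role (aws_iam_role.ecs_task)
  # also needs the ssmmessages:* permissions that ECS Exec uses.
  enable_execute_command = true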

  network_configuration {
    subnets          = var.private_subnet_ids
    security_groups  = [aws_security_group.ksqldb.id]
    assign_public_ip = false
  }

  tags = {
    Name = "ksqldb-service"
  }
}

# Security group
resource "aws_security_group" "ksqldb" {
  name        = "ksqldb"
  description = "Security group for ksqlDB server"
  vpc_id      = var.vpc_id

  ingress {
    from_port   = 8088
    to_port     = 8088
    protocol    = "tcp"
    cidr_blocks = [var.vpc_cidr_block]
    description = "ksqlDB API from VPC"
  }

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }

  tags = {
    Name = "ksqldb-sg"
  }
}

# Variables
variable "kafka_bootstrap_servers" {
  description = "Kafka bootstrap servers"
  type        = string
}

variable "schema_registry_url" {
  description = "Schema Registry URL"
  type        = string
}

variable "ecs_cluster_id" {
  description = "ECS cluster ID"
  type        = string
}

variable "vpc_id" {
  description = "VPC ID"
  type        = string
}

variable "vpc_cidr_block" {
  description = "VPC CIDR block"
  type        = string
}

variable "private_subnet_ids" {
  description = "Private subnet IDs"
  type        = list(string)
}

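# Outputs
# An ECS service name is not a DNS name. Without Cloud Map service discovery
# (as used for Flink below), connect to ksqlDB on port 8088 at the running
# task's private IP.
output "ksqldb_service_name" {
  value       = aws_ecs_service.ksqldb.name
  description = "ksqlDB ECS service name (API listens on port 8088)"
}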

Deploy ksqlDB

cd ~/src/terraform/aws/ksqldb

terraform init
terraform plan -out=tfplan
terraform apply tfplan

Example: Real-Time Order Aggregations

Create a ksqlDB query to aggregate orders by customer in real-time.

Connect to ksqlDB

# Open a shell in the running ksqlDB container (requires ECS Exec to be
# enabled on the service; <task-id> is the ID of the running task)
aws ecs execute-command \
    --cluster streaming-cluster \
    --task <task-id> \
    --container ksqldb-server \
    --interactive \
    --command "/bin/bash"

# Inside the container, start the ksqlDB CLI
ksql http://localhost:8088
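
As an alternative to an interactive shell, statements can be sent to ksqlDB's REST API. DDL and admin statements such as CREATE STREAM or SHOW STREAMS go to the /ksql endpoint as JSON with the application/vnd.ksql.v1+json content type; SELECT queries use /query instead. The minimal standard-library sketch below assumes the server is reachable at KSQLDB_URL (localhost only works inside the container; from elsewhere in the VPC, use the task's private IP).

```python
import json
import urllib.request

# Assumption: reachable from where this runs (container, or a VPC host via the task IP).
KSQLDB_URL = "http://localhost:8088"


def build_ksql_request(statement: str, base_url: str = KSQLDB_URL) -> urllib.request.Request:
    """Build (without sending) a POST to ksqlDB's /ksql endpoint for DDL/admin statements."""
    body = {"ksql": statement, "streamsProperties": {}}
    return urllib.request.Request(
        url=f"{base_url}/ksql",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/vnd.ksql.v1+json",
            "Accept": "application/vnd.ksql.v1+json",
        },
        method="POST",
    )


def run_ksql(statement: str, base_url: str = KSQLDB_URL):
    """Send the statement and return the decoded JSON response."""
    with urllib.request.urlopen(build_ksql_request(statement, base_url), timeout=10) as resp:
        return json.loads(resp.read())


# Usage (needs a running server): run_ksql("SHOW STREAMS;")
```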

Create Stream from Topic

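-- Define stream from order-events topic. TIMESTAMP='created_at' makes windows
-- use the order's event time (epoch ms) rather than the Kafka record timestamp.
CREATE STREAM order_events (
    order_id BIGINT,
    customer_id STRING,
    order_status STRING,
    order_total DOUBLE,
    created_at BIGINT
) WITH (
    KAFKA_TOPIC='order-events',
    VALUE_FORMAT='AVRO',
    TIMESTAMP='created_at'
);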

Create Aggregation Table

-- Real-time aggregation: order count and spend per customer, per 1-hour window
CREATE TABLE customer_order_totals AS
    SELECT
        customer_id,
        COUNT(*) AS order_count,
        SUM(order_total) AS total_spent,
        MAX(created_at) AS last_order_time
    FROM order_events
    WINDOW TUMBLING (SIZE 1 HOUR)
    GROUP BY customer_id
    EMIT CHANGES;

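This query:

  • Continuously processes new order events
  • Aggregates by customer in 1-hour tumbling windows
  • Writes results to a new Kafka topic (named CUSTOMER_ORDER_TOTALS by default, because ksqlDB upper-cases unquoted identifiers)
  • Emits an updated row for the affected window as each new event arrives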
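
To make the windowed aggregation concrete, here is an in-memory Python model of what customer_order_totals computes. It is a sketch under simplifying assumptions: events arrive in order and there is no grace period or window retention, both of which ksqlDB handles for you. Each input event yields the updated row for its (customer, window) key, mirroring how EMIT CHANGES publishes an update per record.

```python
from collections import defaultdict

HOUR_MS = 60 * 60 * 1000


def aggregate_hourly(events):
    """Per (customer_id, 1-hour window start) keep COUNT(*), SUM(order_total)
    and MAX(created_at), yielding the updated row after every event."""
    state = defaultdict(lambda: {"order_count": 0, "total_spent": 0.0, "last_order_time": None})
    for e in events:
        window_start = e["created_at"] - e["created_at"] % HOUR_MS
        key = (e["customer_id"], window_start)
        row = state[key]
        row["order_count"] += 1
        row["total_spent"] += e["order_total"]
        prev = row["last_order_time"]
        row["last_order_time"] = e["created_at"] if prev is None else max(prev, e["created_at"])
        yield key, dict(row)  # copy so later updates don't mutate emitted rows


orders = [
    {"customer_id": "cust_123", "order_total": 10.0, "created_at": 1_000},
    {"customer_id": "cust_123", "order_total": 5.5, "created_at": 2_000},
    {"customer_id": "cust_123", "order_total": 1.0, "created_at": HOUR_MS + 1},
]
for key, row in aggregate_hourly(orders):
    print(key, row)
```

The third order falls into the next hour, so it starts a fresh window rather than adding to the first.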

Query the Table

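-- Pull query: returns the current value of each 1-hour window (a snapshot)
SELECT * FROM customer_order_totals WHERE customer_id = 'cust_123';

-- Illustrative result (one row per window; WINDOWSTART/WINDOWEND columns omitted):
-- customer_id | order_count | total_spent | last_order_time
-- cust_123    | 15          | 1234.56     | 1709643845000

-- For continuously updated results, run a push query instead:
SELECT * FROM customer_order_totals WHERE customer_id = 'cust_123' EMIT CHANGES;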

Sink to Snowflake

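Add a Snowflake Sink Connector for the aggregated topic (CUSTOMER_ORDER_TOTALS by default). With the connector's Snowpipe Streaming ingestion method, rows typically land in Snowflake within seconds; the default file-based Snowpipe method adds minutes of latency.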
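
For a self-managed Kafka Connect cluster, the connector is created by POSTing a JSON body to the Connect REST API's /connectors endpoint. The Python sketch below only builds that body. The connector name is a hypothetical naming scheme, values in angle brackets are placeholders, and converter settings are left out because they depend on how the topic is serialised. Confluent Cloud's fully managed Snowflake connector uses different property names, so check the Snowflake Kafka connector documentation for your setup.

```python
import json


def snowflake_sink_payload(topic: str, table: str) -> dict:
    """Body for POST /connectors on a self-managed Kafka Connect cluster."""
    return {
        "name": f"snowflake-sink-{topic.lower()}",  # hypothetical naming scheme
        "config": {
            "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
            "topics": topic,
            "snowflake.topic2table.map": f"{topic}:{table}",
            # Row-level streaming ingestion (seconds) instead of file-based Snowpipe
            "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
            "snowflake.url.name": "<account>.snowflakecomputing.com:443",
            "snowflake.user.name": "<user>",
            "snowflake.private.key": "<private-key>",
            "snowflake.role.name": "<role>",
            "snowflake.database.name": "<database>",
            "snowflake.schema.name": "<schema>",
        },
    }


print(json.dumps(snowflake_sink_payload("CUSTOMER_ORDER_TOTALS", "CUSTOMER_ORDER_TOTALS"), indent=2))
```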

Example: Sessionisation

Group events by user sessions with 30-minute inactivity timeout.

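-- Define clickstream events. TIMESTAMP='event_time' makes the session windows
-- use event time (epoch ms) rather than the Kafka record timestamp.
CREATE STREAM clickstream (
    user_id STRING,
    page_url STRING,
    event_time BIGINT
) WITH (
    KAFKA_TOPIC='clickstream-events',
    VALUE_FORMAT='AVRO',
    TIMESTAMP='event_time'
);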

-- Sessionise: group events into sessions (30-min timeout)
CREATE TABLE user_sessions AS
    SELECT
        user_id,
        COUNT(*) AS event_count,
        COLLECT_LIST(page_url) AS pages_visited,
        MIN(event_time) AS session_start,
        MAX(event_time) AS session_end
    FROM clickstream
    WINDOW SESSION (30 MINUTES)
    GROUP BY user_id
    EMIT CHANGES;

Use case: Real-time analytics on active user sessions for dashboards.
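
Session windows have no fixed size: a session stays open while events keep arriving within the inactivity gap and closes once the gap is exceeded. The batch Python sketch below shows the grouping rule. It sorts events first, whereas ksqlDB works incrementally and can merge two sessions when a late event bridges the gap. Treating a gap of exactly 30 minutes as "same session" is this sketch's own choice.

```python
THIRTY_MINUTES_MS = 30 * 60 * 1000


def sessionise(events, gap_ms=THIRTY_MINUTES_MS):
    """Group (user_id, event_time_ms, page_url) tuples into per-user sessions.

    Events are sorted by user and time; a gap of more than gap_ms since the
    user's previous event starts a new session.
    """
    sessions: dict[str, list[dict]] = {}
    for user_id, event_time, page_url in sorted(events, key=lambda e: (e[0], e[1])):
        user_sessions = sessions.setdefault(user_id, [])
        current = user_sessions[-1] if user_sessions else None
        if current is not None and event_time - current["session_end"] <= gap_ms:
            # Within the inactivity gap: extend the open session
            current["session_end"] = event_time
            current["event_count"] += 1
            current["pages_visited"].append(page_url)
        else:
            # First event for this user, or the gap was exceeded
            user_sessions.append({
                "session_start": event_time,
                "session_end": event_time,
                "event_count": 1,
                "pages_visited": [page_url],
            })
    return sessions


MIN = 60 * 1000
clicks = [("u1", 0, "/home"), ("u1", 10 * MIN, "/pricing"), ("u1", 50 * MIN, "/docs")]
for s in sessionise(clicks)["u1"]:
    print(s["session_start"], s["event_count"], s["pages_visited"])
```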

ksqlDB Cost

| Component | Monthly Cost | Notes |
|---|---|---|
| ECS Fargate (1 vCPU, 2 GB) | $30 | 24/7 |
| CloudWatch Logs | $5 | Processing logs |
| Additional Kafka storage | $10 | Output topics |
| Ops time | 2-4 hours/month | Query tuning, monitoring |
| Total | $45 + $120-240 ops = $165-285/month | |

Apache Flink for Complex Event Processing

What is Apache Flink?

Apache Flink is a distributed stream processing framework for complex, stateful computations. Use it for:

  • Complex event processing (CEP) - Pattern detection across multiple events
  • Stateful processing - Maintaining large state (GBs) across events
  • Custom business logic - Python/Java code for non-SQL transformations
  • Low-latency ML inference - Real-time model scoring
  • Exactly-once guarantees - Critical financial transactions

Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│                    FLINK ON ECS                                         │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  Flink Cluster (ECS)                                                    │
│  ───────────────────                                                    │
│  ┌────────────────────────────────────────────────────────────┐        │
│  │ JobManager (1 task, 1 vCPU, 2 GB)                         │        │
│  │ • Coordinates job execution                                │        │
│  │ • Manages checkpoints                                      │        │
│  └────────────────────────────────────────────────────────────┘        │
│                           │                                             │
│                           ▼                                             │
│  ┌────────────────────────────────────────────────────────────┐        │
│  │ TaskManagers (2 tasks, 2 vCPU each, 4 GB each)            │        │
│  │ • Execute streaming jobs                                   │        │
│  │ • Maintain state                                           │        │
│  │ • Parallel processing                                      │        │
│  └────────────────────────────────────────────────────────────┘        │
│                           │                                             │
│                           ▼                                             │
│  S3 (Checkpoints)                                                       │
│  ────────────────                                                       │
│  ┌────────────────────────────────────────────────────────────┐        │
│  │ State snapshots every 5 minutes                            │        │
│  │ • Enables recovery from failures                           │        │
│  │ • Exactly-once processing guarantees                       │        │
│  └────────────────────────────────────────────────────────────┘        │
│                                                                         │
│  Cost: $150-250/month (JobManager + 2 TaskManagers)                    │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

Deployment on ECS Fargate

Terraform Configuration

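Create ~/src/terraform/aws/flink/main.tf. Like the ksqlDB module, it assumes aws_iam_role.ecs_execution already exists in the same module, and it needs the same variable declarations (ecs_cluster_id, vpc_id, vpc_cidr_block, private_subnet_ids):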

# S3 bucket for Flink checkpoints
resource "aws_s3_bucket" "flink_checkpoints" {
  bucket = "your-account-flink-checkpoints"

  tags = {
    Name = "flink-checkpoints"
  }
}

resource "aws_s3_bucket_versioning" "flink_checkpoints" {
  bucket = aws_s3_bucket.flink_checkpoints.id

  versioning_configuration {
    status = "Enabled"
  }
}

# IAM role for Flink tasks
resource "aws_iam_role" "flink_task" {
  name = "flink-task-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "ecs-tasks.amazonaws.com"
        }
      }
    ]
  })

  tags = {
    Name = "flink-task-role"
  }
}

resource "aws_iam_role_policy" "flink_s3_access" {
  name = "s3-checkpoint-access"
  role = aws_iam_role.flink_task.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "s3:GetObject",
          "s3:PutObject",
          "s3:DeleteObject",
          "s3:ListBucket"
        ]
        Resource = [
          aws_s3_bucket.flink_checkpoints.arn,
          "${aws_s3_bucket.flink_checkpoints.arn}/*"
        ]
      }
    ]
  })
}

# ECS task definition for Flink JobManager
resource "aws_ecs_task_definition" "flink_jobmanager" {
  family                   = "flink-jobmanager"
  requires_compatibilities = ["FARGATE"]
  network_mode             = "awsvpc"
  cpu                      = "1024"  # 1 vCPU
  memory                   = "2048"  # 2 GB
  execution_role_arn       = aws_iam_role.ecs_execution.arn
  task_role_arn            = aws_iam_role.flink_task.arn

  container_definitions = jsonencode([
    {
      name  = "jobmanager"
      image = "flink:1.18-scala_2.12-java11"
      command = ["jobmanager"]

      portMappings = [
        {
          containerPort = 8081
          protocol      = "tcp"
        },
        {
          containerPort = 6123
          protocol      = "tcp"
        }
      ]

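      # S3 checkpoint/savepoint paths need one of Flink's S3 filesystem plugins
      # (flink-s3-fs-presto or flink-s3-fs-hadoop) enabled in the image, for
      # example via the official image's ENABLE_BUILT_IN_PLUGINS variable with
      # the jar name matching the image's exact Flink version (see /opt/flink/opt).
      environment = [
        {
          name  = "JOB_MANAGER_RPC_ADDRESS"
          value = "jobmanager.flink.internal"  # Cloud Map DNS name: <service>.<namespace>
        },
        {
          name  = "FLINK_PROPERTIES"
          value = <<EOF
jobmanager.rpc.address: jobmanager.flink.internal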
taskmanager.numberOfTaskSlots: 2
parallelism.default: 2
state.backend: rocksdb
state.checkpoints.dir: s3://${aws_s3_bucket.flink_checkpoints.bucket}/checkpoints
state.savepoints.dir: s3://${aws_s3_bucket.flink_checkpoints.bucket}/savepoints
execution.checkpointing.interval: 300000
execution.checkpointing.mode: EXACTLY_ONCE
EOF
        }
      ]

      logConfiguration = {
        logDriver = "awslogs"
        options = {
          "awslogs-group"         = aws_cloudwatch_log_group.flink.name
          "awslogs-region"        = "eu-west-2"
          "awslogs-stream-prefix" = "jobmanager"
        }
      }
    }
  ])

  tags = {
    Name = "flink-jobmanager-task"
  }
}

# ECS task definition for Flink TaskManager
resource "aws_ecs_task_definition" "flink_taskmanager" {
  family                   = "flink-taskmanager"
  requires_compatibilities = ["FARGATE"]
  network_mode             = "awsvpc"
  cpu                      = "2048"  # 2 vCPU
  memory                   = "4096"  # 4 GB
  execution_role_arn       = aws_iam_role.ecs_execution.arn
  task_role_arn            = aws_iam_role.flink_task.arn

  container_definitions = jsonencode([
    {
      name  = "taskmanager"
      image = "flink:1.18-scala_2.12-java11"
      command = ["taskmanager"]

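      # TaskManagers also write checkpoint state to S3, so they need the same
      # S3 filesystem plugin enabled as the JobManager.
      environment = [
        {
          name  = "JOB_MANAGER_RPC_ADDRESS"
          value = "jobmanager.flink.internal"  # Cloud Map DNS name: <service>.<namespace>
        },
        {
          name  = "FLINK_PROPERTIES"
          value = <<EOF
jobmanager.rpc.address: jobmanager.flink.internal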
taskmanager.numberOfTaskSlots: 2
taskmanager.memory.process.size: 3072m
EOF
        }
      ]

      logConfiguration = {
        logDriver = "awslogs"
        options = {
          "awslogs-group"         = aws_cloudwatch_log_group.flink.name
          "awslogs-region"        = "eu-west-2"
          "awslogs-stream-prefix" = "taskmanager"
        }
      }
    }
  ])

  tags = {
    Name = "flink-taskmanager-task"
  }
}

resource "aws_cloudwatch_log_group" "flink" {
  name              = "/ecs/flink"
  retention_in_days = 7

  tags = {
    Name = "flink-logs"
  }
}

# ECS services
resource "aws_ecs_service" "flink_jobmanager" {
  name            = "flink-jobmanager"
  cluster         = var.ecs_cluster_id
  task_definition = aws_ecs_task_definition.flink_jobmanager.arn
  desired_count   = 1
  launch_type     = "FARGATE"

  network_configuration {
    subnets          = var.private_subnet_ids
    security_groups  = [aws_security_group.flink.id]
    assign_public_ip = false
  }

  service_registries {
    registry_arn = aws_service_discovery_service.jobmanager.arn
  }

  tags = {
    Name = "flink-jobmanager-service"
  }
}

resource "aws_ecs_service" "flink_taskmanager" {
  name            = "flink-taskmanager"
  cluster         = var.ecs_cluster_id
  task_definition = aws_ecs_task_definition.flink_taskmanager.arn
  desired_count   = 2  # 2 TaskManagers for parallelism
  launch_type     = "FARGATE"

  network_configuration {
    subnets          = var.private_subnet_ids
    security_groups  = [aws_security_group.flink.id]
    assign_public_ip = false
  }

  tags = {
    Name = "flink-taskmanager-service"
  }
}

# Service discovery for JobManager
resource "aws_service_discovery_private_dns_namespace" "flink" {
  name = "flink.internal"
  vpc  = var.vpc_id

  tags = {
    Name = "flink-service-discovery"
  }
}

resource "aws_service_discovery_service" "jobmanager" {
  name = "jobmanager"

  dns_config {
    namespace_id = aws_service_discovery_private_dns_namespace.flink.id

    dns_records {
      ttl  = 10
      type = "A"
    }
  }

  tags = {
    Name = "jobmanager-discovery"
  }
}

# Security group
resource "aws_security_group" "flink" {
  name        = "flink"
  description = "Security group for Flink cluster"
  vpc_id      = var.vpc_id

  # JobManager RPC
  ingress {
    from_port = 6123
    to_port   = 6123
    protocol  = "tcp"
    self      = true
  }

  # JobManager Web UI
  ingress {
    from_port   = 8081
    to_port     = 8081
    protocol    = "tcp"
    cidr_blocks = [var.vpc_cidr_block]
  }

  # TaskManager data exchange
  ingress {
    from_port = 6121
    to_port   = 6125
    protocol  = "tcp"
    self      = true
  }

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }

  tags = {
    Name = "flink-sg"
  }
}

Deploy Flink

cd ~/src/terraform/aws/flink

terraform init
terraform plan -out=tfplan
terraform apply tfplan

Example: Fraud Detection with Pattern Matching

Detect suspicious patterns: 3+ failed login attempts within 5 minutes from the same IP.

Create fraud_detection_job.py:

import json

from pyflink.common import Types, WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.base import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import (
    KafkaOffsetsInitializer,
    KafkaRecordSerializationSchema,
    KafkaSink,
    KafkaSource,
)
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

BOOTSTRAP_SERVERS = 'broker1:9092,broker2:9092'
FIVE_MINUTES_MS = 5 * 60 * 1000
FAILURE_THRESHOLD = 3

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(2)
# The Kafka connector JAR must be on the classpath, e.g. via env.add_jars(...)
# or `flink run --jarfile ...`.

# Kafka source (login events as JSON strings)
kafka_source = (KafkaSource.builder()
    .set_bootstrap_servers(BOOTSTRAP_SERVERS)
    .set_topics('login-events')
    .set_group_id('fraud-detection')
    .set_starting_offsets(KafkaOffsetsInitializer.latest())
    .set_value_only_deserializer(SimpleStringSchema())
    .build())

# Kafka sink (alerts). EXACTLY_ONCE would additionally need a transactional
# ID prefix and read_committed consumers downstream.
kafka_sink = (KafkaSink.builder()
    .set_bootstrap_servers(BOOTSTRAP_SERVERS)
    .set_record_serializer(KafkaRecordSerializationSchema.builder()
        .set_topic('fraud-alerts')
        .set_value_serialization_schema(SimpleStringSchema())
        .build())
    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build())


class FraudDetector(KeyedProcessFunction):
    """Keyed by IP; state holds timestamps (ms) of recent failed logins."""

    def open(self, runtime_context: RuntimeContext):
        self.failed_logins_state = runtime_context.get_state(
            ValueStateDescriptor("failed-logins", Types.LIST(Types.LONG()))
        )

    def process_element(self, value, ctx):
        event = json.loads(value)

        # Only failed logins count towards the rule
        if event['status'] != 'FAILED':
            return

        # Kafka record timestamp (ms), attached by the Kafka source
        current_time = ctx.timestamp()

        # Keep only failures inside the 5-minute window, then add this one
        cutoff = current_time - FIVE_MINUTES_MS
        failed_logins = self.failed_logins_state.value() or []
        recent_failures = [ts for ts in failed_logins if ts > cutoff]
        recent_failures.append(current_time)

        # Alert if 3+ failures in 5 minutes
        if len(recent_failures) >= FAILURE_THRESHOLD:
            yield json.dumps({
                'alert_type': 'SUSPECTED_FRAUD',
                'ip_address': event['ip_address'],
                'failure_count': len(recent_failures),
                'time_window': '5_minutes',
                'timestamp': current_time,
            })
            # Reset so one burst raises one alert, not one per extra failure
            self.failed_logins_state.clear()
        else:
            self.failed_logins_state.update(recent_failures)


# Build streaming pipeline
login_events = env.from_source(
    kafka_source, WatermarkStrategy.no_watermarks(), 'login-events-source')

fraud_alerts = (login_events
    .key_by(lambda event: json.loads(event)['ip_address'], key_type=Types.STRING())
    # output_type lets the Java Kafka sink serialise results as plain strings
    .process(FraudDetector(), output_type=Types.STRING()))

fraud_alerts.sink_to(kafka_sink)

env.execute("Fraud Detection Job")
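
The detection rule itself does not depend on Flink. Pulling it into a pure function lets you unit-test it without a cluster. The sketch below reproduces the 3-failures-in-5-minutes rule for a single IP and returns the new state plus whether an alert fired. Resetting the state after an alert, so that one burst raises one alert, is a design choice of this sketch.

```python
FIVE_MINUTES_MS = 5 * 60 * 1000


def update_failures(previous: list[int], event_ts_ms: int,
                    window_ms: int = FIVE_MINUTES_MS,
                    threshold: int = 3) -> tuple[list[int], bool]:
    """Apply one failed login for an IP: returns (new_state, alert_fired)."""
    # Drop failures that fell out of the sliding window, then add this one
    recent = [ts for ts in previous if ts > event_ts_ms - window_ms]
    recent.append(event_ts_ms)
    if len(recent) >= threshold:
        return [], True  # alert and reset
    return recent, False


state: list[int] = []
for ts in [0, 60_000, 120_000, 130_000]:
    state, fired = update_failures(state, ts)
    print(ts, fired)
```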

Package and Deploy

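# PyFlink jobs are submitted as Python scripts; there is no JAR build step.
# Assumption: both the client and the Flink image have Python 3 and the
# apache-flink package installed (the official flink:1.18 image does not),
# and the Kafka connector JAR has been downloaded locally (filename illustrative).

# Submit the job from a host inside the VPC that can reach the JobManager
flink run -d \
    -m jobmanager.flink.internal:8081 \
    --python fraud_detection_job.py \
    --jarfile flink-sql-connector-kafka.jar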
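
To confirm the job is running after submission, you can query the JobManager's REST API, whose GET /jobs/overview endpoint lists jobs with their name and state. The minimal sketch below assumes it runs inside the VPC. The DNS name comes from the Cloud Map namespace (flink.internal) and service (jobmanager) defined in the Terraform.

```python
import json
import urllib.request

# Assumption: run inside the VPC so the Cloud Map name resolves.
FLINK_REST = "http://jobmanager.flink.internal:8081"


def fetch_jobs_overview(base_url: str = FLINK_REST) -> dict:
    """GET /jobs/overview from the JobManager's REST API."""
    with urllib.request.urlopen(f"{base_url}/jobs/overview", timeout=10) as resp:
        return json.loads(resp.read())


def running_job_names(overview: dict) -> list[str]:
    """Names of jobs whose state is RUNNING."""
    return [job["name"] for job in overview.get("jobs", []) if job.get("state") == "RUNNING"]


# Usage inside the VPC:
# print(running_job_names(fetch_jobs_overview()))
```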

Flink Cost

| Component | Monthly Cost | Notes |
|---|---|---|
| JobManager ECS (1 vCPU, 2 GB) | $30 | 24/7 |
| TaskManager ECS (2 × 2 vCPU, 4 GB each) | $120 | 24/7 |
| S3 checkpoints | $5 | State snapshots |
| CloudWatch Logs | $10 | Processing logs |
| Development time (one-time) | - | 40-80 hours @ $60 = $2,400-4,800 |
| Ops time (monthly) | 4-8 hours/month | Job monitoring, tuning |
| Total | $165 + $240-480 ops = $405-645/month | |
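
Both monthly totals (ksqlDB and Flink) follow the same formula: fixed infrastructure plus ops hours costed at $60/hour, the rate quoted for development time. The one-time development cost is excluded. The small Python check below reproduces both ranges, so you can substitute your own figures.

```python
def monthly_total(infra_usd: dict[str, int], ops_hours: tuple[int, int],
                  hourly_rate: int = 60) -> tuple[int, int]:
    """(low, high) monthly cost: fixed infrastructure plus ops time at hourly_rate."""
    base = sum(infra_usd.values())
    low_hours, high_hours = ops_hours
    return (base + low_hours * hourly_rate, base + high_hours * hourly_rate)


ksqldb = monthly_total({"fargate": 30, "cloudwatch_logs": 5, "kafka_storage": 10}, (2, 4))
flink = monthly_total(
    {"jobmanager": 30, "taskmanagers": 120, "s3_checkpoints": 5, "cloudwatch_logs": 10},
    (4, 8),
)

print(ksqldb)  # (165, 285)
print(flink)   # (405, 645)
```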

Stream Processing vs dbt

Latency Comparison

| Approach | Latency | Cost/Month | Complexity |
|---|---|---|---|
| dbt hourly | 30-60 min | $50 | Low |
| dbt every 5 min | 5-10 min | $100 | Low |
| ksqlDB | < 10 sec | $165-285 | Medium |
| Flink | < 1 sec | $405-645 | High |

When to Use Each

Requirement Use dbt Use ksqlDB Use Flink
Batch reports
Simple aggregations
Real-time dashboards (< 1 min) ⚠️
Sessionisation
Pattern detection ⚠️
ML inference
Custom business logic ⚠️

Recommendation: Start with dbt. Add ksqlDB if you need < 10s latency for SQL transformations. Add Flink only for complex CEP or < 1s latency.

Summary

You've learned when and how to add stream processing:

  • Decision framework - Most teams don't need stream processing (dbt is sufficient)
  • ksqlDB - SQL-based processing for real-time aggregations and joins ($165-285/month)
  • Apache Flink - Complex event processing and pattern detection ($405-645/month)
  • Use cases - Fraud detection, sessionisation, real-time dashboards
  • Cost trade-offs - 2-4× cost increase for sub-second latency
  • Complexity - Significant operational burden vs dbt

Key insight: Stream processing adds cost and complexity. Only add it when business value (measured in $) exceeds the $200-400/month premium.

What's Next

Learn about Change Data Capture (CDC) for real-time database replication.

Continue to Change Data Capture