Producing Events

On this page, you will:

  • Build production Python producers with the confluent-kafka library
  • Register and version Avro schemas in Schema Registry
  • Produce events with serialisation and partition keys
  • Handle errors, retries, and delivery guarantees
  • Integrate producers with Prefect tasks
  • Test event-driven workflows

Overview

Producers are applications that write events to Kafka topics. In this guide, you'll build a production-grade producer that:

  • Retrieves credentials from AWS Secrets Manager
  • Registers Avro schemas with Schema Registry
  • Serialises events with type safety
  • Handles errors and retries
  • Integrates with Prefect for orchestration
┌─────────────────────────────────────────────────────────────────────────┐
│                    PRODUCER ARCHITECTURE                                │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  Application              Producer                   Kafka              │
│  ───────────              ────────                   ─────              │
│                                                                         │
│  ┌────────────┐         ┌────────────┐           ┌────────────┐        │
│  │ Prefect    │────────▶│ Python     │──────────▶│ order-     │        │
│  │ Task       │         │ Producer   │           │ events     │        │
│  │            │         │            │           │ topic      │        │
│  │ Scheduled  │         │ • Get      │           │            │        │
│  │ or event-  │         │   creds    │           │ Partition  │        │
│  │ driven     │         │ • Register │           │ by order_id│        │
│  └────────────┘         │   schema   │           └────────────┘        │
│                         │ • Serialise│                  │              │
│                         │ • Retry    │                  ▼              │
│                         └────────────┘           ┌────────────┐        │
│                                                  │ Schema     │        │
│                                                  │ Registry   │        │
│                                                  │ (Avro)     │        │
│                                                  └────────────┘        │
│                                                                         │
│  Pattern: Task → Producer → Schema validation → Kafka → Connector      │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

Python Producer Class

Create a reusable producer class with schema management, error handling, and retries.

Producer Module

Create kafka_producer.py:

"""
Production-grade Kafka producer with Avro schema support.
"""
import json
import logging
from typing import Dict, Any, Optional, Callable
import boto3
from confluent_kafka import Producer, KafkaException
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField


logger = logging.getLogger(__name__)


class KafkaProducer:
    """
    Kafka producer with Schema Registry integration.

    Features:
    - Retrieves credentials from AWS Secrets Manager
    - Registers Avro schemas
    - Handles serialisation and delivery
    - Retries on transient failures
    """

    def __init__(
        self,
        topic: str,
        schema: Dict[str, Any],
        aws_region: str = 'eu-west-2',
        kafka_secret_name: str = 'confluent/kafka-cluster',
        schema_registry_secret_name: str = 'confluent/schema-registry'
    ):
        """
        Initialise Kafka producer.

        Args:
            topic: Kafka topic name
            schema: Avro schema as dict
            aws_region: AWS region for Secrets Manager
            kafka_secret_name: Secret name for Kafka credentials
            schema_registry_secret_name: Secret name for Schema Registry credentials
        """
        self.topic = topic
        self.schema = schema

        # Retrieve credentials
        self.kafka_config = self._get_kafka_config(aws_region, kafka_secret_name)
        self.schema_registry_config = self._get_schema_registry_config(
            aws_region, schema_registry_secret_name
        )

        # Initialise producer and schema registry client
        self.producer = Producer(self.kafka_config)
        self.schema_registry_client = SchemaRegistryClient(self.schema_registry_config)

        # Create Avro serialiser
        self.avro_serialiser = AvroSerializer(
            self.schema_registry_client,
            json.dumps(schema),
            lambda obj, ctx: obj  # Object is already a dict
        )

        logger.info(f"Kafka producer initialised for topic: {topic}")

    def _get_kafka_config(self, region: str, secret_name: str) -> Dict[str, Any]:
        """Retrieve Kafka credentials from AWS Secrets Manager."""
        client = boto3.client('secretsmanager', region_name=region)
        response = client.get_secret_value(SecretId=secret_name)
        secret = json.loads(response['SecretString'])

        return {
            'bootstrap.servers': secret['bootstrap_servers'],
            'security.protocol': 'SASL_SSL',
            'sasl.mechanisms': 'PLAIN',
            'sasl.username': secret['api_key'],
            'sasl.password': secret['api_secret'],
            # Production settings
            'acks': 'all',  # Wait for all in-sync replicas
            'retries': 10,  # Retry up to 10 times
            'enable.idempotence': True,  # Idempotent producer: retries do not create duplicates
            'compression.type': 'snappy',  # Compress messages
            'linger.ms': 100,  # Batch messages for 100ms
            'batch.size': 16384  # Batch up to 16KB
        }

    def _get_schema_registry_config(self, region: str, secret_name: str) -> Dict[str, str]:
        """Retrieve Schema Registry credentials from AWS Secrets Manager."""
        client = boto3.client('secretsmanager', region_name=region)
        response = client.get_secret_value(SecretId=secret_name)
        secret = json.loads(response['SecretString'])

        return {
            'url': secret['url'],
            'basic.auth.user.info': f"{secret['api_key']}:{secret['api_secret']}"
        }

    def produce(
        self,
        event: Dict[str, Any],
        key: Optional[str] = None,
        callback: Optional[Callable] = None
    ) -> None:
        """
        Produce an event to Kafka.

        Args:
            event: Event data as dict (must match Avro schema)
            key: Partition key (optional)
            callback: Delivery callback (optional)
        """
        try:
            # Serialise event
            serialised_value = self.avro_serialiser(
                event,
                SerializationContext(self.topic, MessageField.VALUE)
            )

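            # Produce to Kafka (asynchronous: the message is queued locally)
            self.producer.produce(
                topic=self.topic,
                key=key.encode('utf-8') if key else None,
                value=serialised_value,
                callback=callback or self._default_delivery_callback
            )

            # Serve delivery callbacks for messages sent by earlier calls
            self.producer.poll(0)

            logger.debug(f"Event queued for {self.topic}: key={key}")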

        except Exception as e:
            logger.error(f"Failed to produce event: {e}")
            raise

    def _default_delivery_callback(self, err, msg):
        """Default callback for delivery confirmation."""
        if err:
            logger.error(f"Message delivery failed: {err}")
        else:
            logger.info(
                f"Message delivered to {msg.topic()} "
                f"[partition {msg.partition()}] at offset {msg.offset()}"
            )

    def flush(self, timeout: float = 10.0) -> int:
        """
        Wait for all messages to be delivered.

        Args:
            timeout: Maximum time to wait (seconds)

        Returns:
            Number of messages still in queue
        """
        remaining = self.producer.flush(timeout)
        if remaining > 0:
            logger.warning(f"{remaining} messages still in queue after flush")
        return remaining

    def close(self) -> None:
        """Close the producer and clean up resources."""
        self.flush()
        logger.info("Kafka producer closed")
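
The two config methods above index straight into the parsed secret, so a missing key surfaces as a bare KeyError with no context. A minimal sketch of a validation step, assuming the secret layout used above (bootstrap_servers, api_key, api_secret). The name parse_kafka_secret is hypothetical and not part of kafka_producer.py:

```python
import json
from typing import Any, Dict

# Keys that _get_kafka_config reads from the Kafka secret.
REQUIRED_KAFKA_KEYS = ("bootstrap_servers", "api_key", "api_secret")


def parse_kafka_secret(secret_string: str) -> Dict[str, Any]:
    """Parse a Secrets Manager SecretString and check the expected keys exist.

    Hypothetical helper: raises ValueError naming every missing key instead
    of letting a KeyError surface later in the config builder.
    """
    secret = json.loads(secret_string)
    missing = [key for key in REQUIRED_KAFKA_KEYS if key not in secret]
    if missing:
        raise ValueError(f"Kafka secret is missing keys: {', '.join(missing)}")
    return secret
```

It could be called on response['SecretString'] in place of the plain json.loads call, which keeps the failure close to its cause.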

Configuration Explanation

Producer settings:

'acks': 'all',  # Wait for all in-sync replicas to acknowledge (durability)
'retries': 10,  # Retry failed sends up to 10 times
'enable.idempotence': True,  # Idempotent producer: retries do not create duplicates
'compression.type': 'snappy',  # Compress messages (reduce bandwidth)
'linger.ms': 100,  # Wait 100ms to batch messages (throughput)
'batch.size': 16384  # Batch up to 16KB before sending

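Trade-offs:

Setting            Lower latency          Higher throughput / durability
acks               1 (leader only)        all (all in-sync replicas)
linger.ms          0 (send immediately)   100-1000 (batch)
batch.size         Small (1KB)            Large (64KB)
compression.type   none                   snappy/lz4

Note that acks is a durability trade-off rather than a throughput one, and enable.idempotence requires acks=all.

Recommendation: Start from the settings above in production. They favour durability and throughput at the cost of up to 100ms of added latency per batch.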
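
The trade-offs can be captured as named profiles that are merged into the producer config. This is a sketch only: the profile names, the BASE_SETTINGS and PROFILES constants, and the tuning_settings function are hypothetical, and the values are the illustrative ones from the table. Because enable.idempotence requires acks=all, the sketch varies only batching and compression.

```python
from typing import Any, Dict

# Durability settings shared by both profiles (same values as the class above).
BASE_SETTINGS: Dict[str, Any] = {
    "acks": "all",
    "enable.idempotence": True,
    "retries": 10,
}

# Illustrative values only; tune against your own latency and volume targets.
PROFILES: Dict[str, Dict[str, Any]] = {
    "low_latency": {
        "linger.ms": 0,
        "batch.size": 1024,
        "compression.type": "none",
    },
    "high_throughput": {
        "linger.ms": 100,
        "batch.size": 65536,
        "compression.type": "snappy",
    },
}


def tuning_settings(profile: str) -> Dict[str, Any]:
    """Return base settings merged with the chosen batching profile."""
    if profile not in PROFILES:
        raise ValueError(f"Unknown profile: {profile!r}")
    return {**BASE_SETTINGS, **PROFILES[profile]}
```

The returned dict could be merged into the connection settings before constructing the Producer.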

Produce Order Events

Create a script to produce sample order events.

Order Events Producer

Create produce_orders.py:

#!/usr/bin/env python3
"""
Produce sample order events to Kafka.
"""
import time
import logging
from kafka_producer import KafkaProducer


# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


# Avro schema for order events
ORDER_EVENTS_SCHEMA = {
    "type": "record",
    "name": "OrderEvent",
    "namespace": "com.company.streaming",
    "fields": [
        {"name": "order_id", "type": "long"},
        {"name": "customer_id", "type": "string"},
        {
            "name": "order_status",
            "type": {
                "type": "enum",
                "name": "OrderStatus",
                "symbols": ["CREATED", "CONFIRMED", "SHIPPED", "DELIVERED", "CANCELLED"]
            }
        },
        {"name": "order_total", "type": "double"},
        {"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}}
    ]
}


def generate_order_events(count: int = 10):
    """Generate sample order events."""
    for i in range(count):
        yield {
            'order_id': 1000 + i,
            'customer_id': f'cust_{1000 + i}',
            'order_status': ['CREATED', 'CONFIRMED', 'SHIPPED'][i % 3],
            'order_total': round(50.0 + (i * 10.5), 2),
            'created_at': int(time.time() * 1000)
        }


def main():
    logger.info("🚀 Starting order events producer...")

    # Initialise producer
    producer = KafkaProducer(
        topic='order-events',
        schema=ORDER_EVENTS_SCHEMA
    )

    # Produce events
    event_count = 0
    for event in generate_order_events(count=10):
        producer.produce(
            event=event,
            key=str(event['order_id'])  # Partition by order_id
        )
        event_count += 1

    # Wait for delivery
    remaining = producer.flush(timeout=10)

    if remaining == 0:
        logger.info(f"✅ Successfully produced {event_count} events")
    else:
        logger.error(f"❌ {remaining} events still undelivered after flush timeout")

    producer.close()


if __name__ == '__main__':
    main()
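
Schema Registry creates a new schema version when a changed schema is registered under the same subject. A minimal sketch of one backward-compatible change, assuming the registry's default BACKWARD compatibility mode: appending a nullable field with a default. The helper add_optional_field, the field name currency, and the trimmed V1 schema are hypothetical and used only for illustration.

```python
import copy
from typing import Any, Dict


def add_optional_field(schema: Dict[str, Any], name: str, avro_type: str) -> Dict[str, Any]:
    """Return a copy of an Avro record schema with a nullable field appended.

    The union lists "null" first so that the default of None is valid Avro.
    """
    if any(field["name"] == name for field in schema["fields"]):
        raise ValueError(f"Field already exists: {name}")
    evolved = copy.deepcopy(schema)
    evolved["fields"].append(
        {"name": name, "type": ["null", avro_type], "default": None}
    )
    return evolved


# Trimmed copy of the order schema, for illustration only.
ORDER_EVENTS_SCHEMA_V1 = {
    "type": "record",
    "name": "OrderEvent",
    "namespace": "com.company.streaming",
    "fields": [
        {"name": "order_id", "type": "long"},
        {"name": "customer_id", "type": "string"},
    ],
}

ORDER_EVENTS_SCHEMA_V2 = add_optional_field(ORDER_EVENTS_SCHEMA_V1, "currency", "string")
```

Consumers reading with V2 can still decode events written with V1, because the missing field falls back to its default.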

Run Producer

python produce_orders.py

Expected output:

2026-03-05 12:00:00 - __main__ - INFO - 🚀 Starting order events producer...
2026-03-05 12:00:00 - kafka_producer - INFO - Kafka producer initialised for topic: order-events
2026-03-05 12:00:01 - kafka_producer - INFO - Message delivered to order-events [partition 0] at offset 10
2026-03-05 12:00:01 - kafka_producer - INFO - Message delivered to order-events [partition 1] at offset 8
...
2026-03-05 12:00:02 - __main__ - INFO - ✅ Successfully produced 10 events
2026-03-05 12:00:02 - kafka_producer - INFO - Kafka producer closed
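
The delivery log shows events landing on different partitions. The message key decides which one: the client hashes the key and takes the result modulo the partition count, so equal keys always map to the same partition. The sketch below illustrates the principle with CRC32, the hash that librdkafka's default consistent_random partitioner is documented to use. Treat it as an illustration, not a guaranteed reproduction of the client's partition numbers. The function name partition_for_key and the partition count of 6 are assumptions.

```python
import zlib


def partition_for_key(key: str, num_partitions: int) -> int:
    """Map a key to a partition index by hashing (illustrative only)."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Hypothetical topic with 6 partitions: the repeated key maps to the same index.
for order_id in (1000, 1001, 1000):
    print(order_id, "->", partition_for_key(str(order_id), 6))
```

One consequence: adding partitions to a topic changes the mapping, so per-order ordering holds only while the partition count stays fixed.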

Verify in Snowflake

SELECT
    RECORD_CONTENT:order_id::INT AS order_id,
    RECORD_CONTENT:customer_id::STRING AS customer_id,
    RECORD_CONTENT:order_status::STRING AS order_status,
    RECORD_CONTENT:order_total::FLOAT AS order_total,
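    -- The Snowflake Kafka connector stores the message timestamp as epoch milliseconds in CreateTime
    TO_TIMESTAMP_LTZ(RECORD_METADATA:CreateTime::NUMBER, 3) AS kafka_timestamp
FROM STREAMING.PUBLIC.ORDER_EVENTS
WHERE TO_TIMESTAMP_LTZ(RECORD_METADATA:CreateTime::NUMBER, 3) >= DATEADD(minute, -5, CURRENT_TIMESTAMP())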
ORDER BY kafka_timestamp DESC
LIMIT 10;

Integrate with Prefect

Wrap the producer in a Prefect task for orchestration.

Prefect Flow

Create flows/produce_order_events_flow.py:

"""
Prefect flow to produce order events on a schedule.
"""
from prefect import flow, task
from kafka_producer import KafkaProducer
import logging
import time


logger = logging.getLogger(__name__)


ORDER_EVENTS_SCHEMA = {
    "type": "record",
    "name": "OrderEvent",
    "namespace": "com.company.streaming",
    "fields": [
        {"name": "order_id", "type": "long"},
        {"name": "customer_id", "type": "string"},
        {
            "name": "order_status",
            "type": {
                "type": "enum",
                "name": "OrderStatus",
                "symbols": ["CREATED", "CONFIRMED", "SHIPPED", "DELIVERED", "CANCELLED"]
            }
        },
        {"name": "order_total", "type": "double"},
        {"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}}
    ]
}


@task(retries=3, retry_delay_seconds=5)
def produce_events(event_count: int = 10) -> int:
    """
    Produce order events to Kafka.

    Args:
        event_count: Number of events to produce

    Returns:
        Number of successfully produced events
    """
    logger.info(f"Producing {event_count} order events...")

    producer = KafkaProducer(
        topic='order-events',
        schema=ORDER_EVENTS_SCHEMA
    )

    produced = 0
    for i in range(event_count):
        event = {
            'order_id': int(time.time() * 1000) + i,  # Unique ID
            'customer_id': f'cust_{1000 + i}',
            'order_status': ['CREATED', 'CONFIRMED', 'SHIPPED'][i % 3],
            'order_total': round(50.0 + (i * 10.5), 2),
            'created_at': int(time.time() * 1000)
        }

        producer.produce(event=event, key=str(event['order_id']))
        produced += 1

    # Flush and close
    remaining = producer.flush(timeout=10)
    producer.close()

    if remaining > 0:
        raise RuntimeError(f"{remaining} events still undelivered after flush")

    logger.info(f"✅ Successfully produced {produced} events")
    return produced


@flow(name="produce-order-events", log_prints=True)
def produce_order_events_flow(event_count: int = 10):
    """
    Flow to produce order events on a schedule.

    Args:
        event_count: Number of events to produce per run
    """
    print("🚀 Starting order events production flow...")

    # Produce events
    produced = produce_events(event_count=event_count)

    print(f"✅ Flow complete! Produced {produced} events")
    return produced


if __name__ == '__main__':
    # Run locally for testing
    produce_order_events_flow(event_count=5)
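
The task above mixes event construction with Kafka I/O, which makes it hard to test without a cluster. One option, sketched here, is to move event construction into a pure function and test that in isolation. The names build_order_events, base_id, and now_ms are hypothetical and not part of the flow above.

```python
from typing import Any, Dict, List

STATUSES = ["CREATED", "CONFIRMED", "SHIPPED"]


def build_order_events(event_count: int, base_id: int, now_ms: int) -> List[Dict[str, Any]]:
    """Build order events without touching Kafka.

    base_id and now_ms are passed in (rather than read from time.time()
    inside the function) so the output is deterministic.
    """
    return [
        {
            "order_id": base_id + i,
            "customer_id": f"cust_{1000 + i}",
            "order_status": STATUSES[i % 3],
            "order_total": round(50.0 + (i * 10.5), 2),
            "created_at": now_ms,
        }
        for i in range(event_count)
    ]


def test_build_order_events() -> None:
    events = build_order_events(event_count=4, base_id=5000, now_ms=1_700_000_000_000)
    assert [e["order_id"] for e in events] == [5000, 5001, 5002, 5003]
    assert events[3]["order_status"] == "CREATED"  # statuses cycle every three events
    assert events[1]["order_total"] == 60.5
```

Deterministic IDs also matter for retries: a retried task produces the whole batch again, and stable order_id values give downstream consumers something to deduplicate on.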

Deploy to Prefect Cloud

# Build deployment
prefect deploy flows/produce_order_events_flow.py:produce_order_events_flow \
    --name "Order Events Producer" \
    --pool "default-agent-pool" \
    --cron "*/5 * * * *"  # Every 5 minutes

# Trigger manually
prefect deployment run produce-order-events/Order\ Events\ Producer

Schedule Options

Schedule           Cron Expression   Use Case
Every 5 minutes    */5 * * * *       High-frequency events (testing)
Every hour         0 * * * *         Moderate volume
Every day at 9am   0 9 * * *         Daily batch events
Weekdays only      0 9 * * 1-5       Business hours events

Error Handling and Retries

Handling Serialisation Errors

Invalid events that don't match the Avro schema will fail serialisation:

try:
    producer.produce(event=event, key=key)
except Exception as e:
    logger.error(f"Serialisation failed for event: {event}")
    logger.error(f"Error: {e}")
    # Send to dead letter queue or skip
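
Deciding whether to skip an event or route it to a dead letter queue is easier when the error says which field is wrong. A minimal sketch of a pre-check, assuming a flat record schema like ORDER_EVENTS_SCHEMA. The helper find_schema_problems is hypothetical and only checks field presence and enum symbols; it is not a substitute for Avro serialisation.

```python
from typing import Any, Dict, List


def find_schema_problems(event: Dict[str, Any], schema: Dict[str, Any]) -> List[str]:
    """Return human-readable problems found in an event (empty list if none).

    Simplified check: required fields must be present and enum values must
    be one of the declared symbols. Types are not verified.
    """
    problems: List[str] = []
    for field in schema["fields"]:
        name, field_type = field["name"], field["type"]
        if name not in event:
            if "default" not in field:
                problems.append(f"missing field: {name}")
            continue
        if isinstance(field_type, dict) and field_type.get("type") == "enum":
            if event[name] not in field_type["symbols"]:
                problems.append(f"invalid symbol for {name}: {event[name]!r}")
    return problems


# Trimmed schema for illustration only.
DEMO_SCHEMA = {
    "type": "record",
    "name": "OrderEvent",
    "fields": [
        {"name": "order_id", "type": "long"},
        {
            "name": "order_status",
            "type": {"type": "enum", "name": "OrderStatus", "symbols": ["CREATED", "SHIPPED"]},
        },
    ],
}

print(find_schema_problems({"order_status": "LOST"}, DEMO_SCHEMA))
```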

Handling Delivery Failures

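Transient delivery failures are retried automatically by the client, up to the retries setting. The delivery callback receives an error only once those retries are exhausted or the message times out. Callbacks fire when the producer's poll() or flush() is called: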

def delivery_callback(err, msg):
    if err:
        # Permanent failure after retries
        logger.error(f"Permanent delivery failure: {err}")
        # Alert or write to DLQ
    else:
        # Success
        logger.info(f"Delivered: {msg.topic()} [{msg.partition()}] @ {msg.offset()}")
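
The value returned by flush() is the number of messages still queued, which is not the same as the number that failed: a message whose delivery ultimately fails leaves the queue and is reported only through its callback. A sketch of a callback object that counts both outcomes, so the caller can check for failures after flushing. DeliveryTracker is a hypothetical name; it relies only on the (err, msg) callback signature shown above.

```python
from typing import Any, List, Optional


class DeliveryTracker:
    """Callable delivery callback that records successes and failures."""

    def __init__(self) -> None:
        self.delivered = 0
        self.errors: List[str] = []

    def __call__(self, err: Optional[Any], msg: Any) -> None:
        # The client passes err=None on success and an error object on failure.
        if err is not None:
            self.errors.append(str(err))
        else:
            self.delivered += 1

    @property
    def ok(self) -> bool:
        """True when no delivery failure has been reported."""
        return not self.errors


# Intended use with the KafkaProducer class from this page (not executed here):
#   tracker = DeliveryTracker()
#   producer.produce(event=event, key=key, callback=tracker)
#   producer.flush()
#   if not tracker.ok:
#       raise RuntimeError(f"{len(tracker.errors)} deliveries failed")
```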

Dead Letter Queue Pattern

For permanent failures, send events to a DLQ topic:

def produce_with_dlq(producer, event, key, dlq_producer):
    """Produce event with DLQ fallback."""
    def callback(err, msg):
        if err:
            logger.error(f"Failed to deliver: {err}")
            # Send to DLQ
            dlq_producer.produce(
                event={'original_event': event, 'error': str(err)},
                key=key
            )

    producer.produce(event=event, key=key, callback=callback)
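
Because dlq_producer serialises with its own Avro schema, the nested original_event dict must match a record type in that schema. One way to keep the DLQ schema independent of the event schema is to store the original payload as a JSON string. A sketch under that assumption; DLQ_SCHEMA, its field names, and build_dlq_record are hypothetical.

```python
import json
import time
from typing import Any, Dict, Optional

# Hypothetical flat schema for dead-letter records.
DLQ_SCHEMA = {
    "type": "record",
    "name": "DeadLetter",
    "namespace": "com.company.streaming",
    "fields": [
        {"name": "source_topic", "type": "string"},
        {"name": "original_event", "type": "string"},  # JSON-encoded payload
        {"name": "error", "type": "string"},
        {"name": "failed_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    ],
}


def build_dlq_record(
    source_topic: str,
    event: Dict[str, Any],
    error: str,
    failed_at_ms: Optional[int] = None,
) -> Dict[str, Any]:
    """Wrap a failed event in an envelope that matches DLQ_SCHEMA."""
    return {
        "source_topic": source_topic,
        # default=str keeps encoding from failing on non-JSON types such as datetimes
        "original_event": json.dumps(event, sort_keys=True, default=str),
        "error": error,
        "failed_at": failed_at_ms if failed_at_ms is not None else int(time.time() * 1000),
    }
```

Inside the callback, the envelope would replace the inline dict, for example dlq_producer.produce(event=build_dlq_record('order-events', event, str(err)), key=key).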

Summary

You've built production Python producers:

  • Reusable producer class: Schema Registry integration and error handling
  • Avro schema registration: type-safe event serialisation
  • Partition keys: events for the same order_id always land in the same partition
  • Production settings: idempotent delivery, compression, batching
  • Error handling: retries, delivery callbacks, DLQ pattern
  • Prefect integration: scheduled flows to produce events
  • Monitoring: delivery confirmations and logging

Your producers are production-ready. Next, orchestrate and monitor with Prefect.

What's Next

Monitor Kafka consumer lag, orchestrate event-driven workflows, and set up alerts with Prefect.

Continue to Prefect Orchestration