Kafka Connect Snowflake
On this page, you will:
- Understand Snowpipe Streaming vs Snowpipe (batch)
- Deploy Snowflake Sink Connector via Confluent Cloud UI
- Configure buffer settings (count and time thresholds)
- Test with sample events
- Monitor connector status and throughput
- Troubleshoot common issues
Overview
The Snowflake Sink Connector streams events from Kafka topics into Snowflake tables in near real-time. It uses Snowpipe Streaming for low-latency ingestion (seconds, not minutes).
You'll deploy the connector via Confluent Cloud's managed Kafka Connect service, configure buffer settings, and verify end-to-end data flow.
┌─────────────────────────────────────────────────────────────────────────┐
│                       KAFKA CONNECT ARCHITECTURE                        │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  Kafka Topic              Connector                Snowflake            │
│  ───────────              ─────────                ─────────            │
│                                                                         │
│  ┌────────────┐           ┌────────────┐           ┌────────────┐       │
│  │ order-     │──────────▶│ Snowflake  │──────────▶│ STREAMING  │       │
│  │ events     │           │ Sink       │           │ .ORDER_    │       │
│  │            │           │ Connector  │           │ EVENTS     │       │
│  │ Partition 0│           │            │           │            │       │
│  │ Partition 1│           │ Buffer:    │           │ Snowpipe   │       │
│  │ Partition 2│           │ 10k records│           │ Streaming  │       │
│  └────────────┘           │ or 60 sec  │           │            │       │
│                           └────────────┘           └────────────┘       │
│                                 │                        │              │
│                                 ▼                        ▼              │
│                           ┌────────────┐           ┌────────────┐       │
│                           │ Credentials│           │ Sub-second │       │
│                           │ from AWS   │           │ latency    │       │
│                           │ Secrets    │           │            │       │
│                           └────────────┘           └────────────┘       │
│                                                                         │
│  Flow: Events → Connector buffer → Flush → Snowpipe Streaming → Table   │
│  Latency: Seconds (not minutes like batch)                              │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘
Snowpipe Streaming vs Snowpipe (Batch)
Snowpipe (Batch)
Traditional Snowpipe loads files from S3:
- Trigger: S3 event notification (file created)
- Latency: 1-2 minutes (file upload + processing)
- Use case: Batch ingestion from dlt, Airbyte
How it works:
dlt → S3 file → SQS notification → Snowpipe → Snowflake table
Snowpipe Streaming (Real-Time)
Snowpipe Streaming ingests data directly via API:
- Trigger: Continuous stream from Kafka Connect
- Latency: Sub-second to a few seconds
- Use case: Real-time event ingestion
How it works:
Kafka → Kafka Connect → Snowpipe Streaming API → Snowflake table
Comparison
| Feature | Snowpipe (Batch) | Snowpipe Streaming |
|---|---|---|
| Latency | 1-2 minutes | < 10 seconds |
| Trigger | File in S3 | Kafka Connect push |
| Buffer | File size | Record count or time |
| Cost | $0.06 per 1,000 files | Included in compute |
| Use case | Batch (dlt, Airbyte) | Streaming (Kafka) |
Key difference: Snowpipe Streaming bypasses file staging, reducing latency from minutes to seconds.
Deploy Snowflake Sink Connector
Via Confluent Cloud UI
- Navigate to your cluster (streaming-cluster)
- Click Connectors in the left sidebar
- Click + Add connector
- Search for Snowflake Sink
- Click the Snowflake Sink tile
Configure Connector: Kafka Credentials
Step 1: Authentication
- Kafka cluster: streaming-cluster (auto-selected)
- API key: Use existing cluster API key
Configure Connector: Snowflake Credentials
Step 2: Snowflake connection
Retrieve credentials from AWS Secrets Manager:
aws secretsmanager get-secret-value \
  --secret-id snowflake/svc-kafka-connector \
  --region eu-west-2 \
  --query SecretString \
  --output text | jq -r .
Fill in the form:
- Snowflake URL: your-account.eu-west-2.aws.snowflakecomputing.com
- User name: SVC_KAFKA_CONNECTOR
- Private key: Paste the full private key, including headers:
  -----BEGIN RSA PRIVATE KEY-----
  MIIJKAIBAAKCAgEA...
  -----END RSA PRIVATE KEY-----
- Database name: STREAMING
- Schema name: PUBLIC
- Role name: SVC_KAFKA_CONNECTOR
Private Key Format
Paste the entire private key including -----BEGIN RSA PRIVATE KEY----- and -----END RSA PRIVATE KEY----- headers. The connector needs the full PEM format.
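The header requirement above can be checked before pasting the key into the form. The following is a minimal sketch, assuming the key is available as a string; `looks_like_full_pem` is a hypothetical helper (not part of any connector tooling) and it only checks that matching BEGIN/END lines are present, not that the key itself is valid.

```python
import re

# Hypothetical helper for illustration. It checks only that matching
# BEGIN/END private key lines surround a base64-looking body; it does
# not parse or validate the key material.
_PEM_RE = re.compile(
    r"\A-----BEGIN (?P<label>[A-Z ]*PRIVATE KEY)-----\s+"
    r"(?P<body>[A-Za-z0-9+/=\s]+?)\s*"
    r"-----END (?P=label)-----\s*\Z"
)


def looks_like_full_pem(key_text: str) -> bool:
    """Return True if key_text has matching PEM BEGIN/END private key lines."""
    return _PEM_RE.match(key_text.strip()) is not None
```

A key with the header and footer stripped returns False here, which is the case the note above warns about.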
Configure Connector: Topics
Step 3: Select topics
- Topics to consume: order-events (you'll create this topic next)
- Input Kafka record value format: AVRO (Schema Registry)
- Input Kafka record key format: STRING
Configure Connector: Buffer Settings
Step 4: Sizing
- Buffer count records: 10000 (flush after 10,000 records)
- Buffer flush time: 60 (or flush after 60 seconds, whichever comes first)
- Tasks: 1 (number of parallel workers; increase for higher throughput)
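The "whichever comes first" rule can be pictured with a toy model. This is a sketch of the idea only, not the connector's actual implementation; the class name `FlushBuffer` and its methods are hypothetical, and time is passed in explicitly so the behaviour is easy to follow.

```python
class FlushBuffer:
    """Toy count-or-time buffer (illustrative, not the connector's code)."""

    def __init__(self, max_records=10_000, max_age_seconds=60.0):
        self.max_records = max_records
        self.max_age_seconds = max_age_seconds
        self._records = []
        self._first_record_at = None  # time the oldest buffered record arrived

    def add(self, record, now):
        """Buffer a record at time `now`; return the flushed batch or None."""
        if not self._records:
            self._first_record_at = now
        self._records.append(record)
        return self.maybe_flush(now)

    def maybe_flush(self, now):
        """Flush when either threshold is reached, whichever comes first."""
        if not self._records:
            return None
        too_many = len(self._records) >= self.max_records
        too_old = now - self._first_record_at >= self.max_age_seconds
        if too_many or too_old:
            batch, self._records = self._records, []
            self._first_record_at = None
            return batch
        return None
```

With `max_records=3`, the third `add` call returns all three records; a single buffered record is returned by `maybe_flush` once 60 seconds have passed since it arrived.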
Buffer tuning:
| Workload | Buffer Count | Buffer Time | Rationale |
|---|---|---|---|
| Low volume (< 100/min) | 1000 | 60 sec | Flush frequently for low latency |
| Medium volume (1000/min) | 10000 | 60 sec | Balance latency and efficiency |
| High volume (10000+/min) | 50000 | 30 sec | Larger batches for throughput |
Trade-off:
- Smaller buffers → Lower latency, more frequent flushes (higher cost)
- Larger buffers → Higher latency, fewer flushes (better efficiency)
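To see which threshold is likely to trigger a flush for a given workload, the arithmetic can be sketched as below. It assumes a steady event rate, which real traffic rarely has, and `seconds_until_flush` is a hypothetical helper name.

```python
def seconds_until_flush(events_per_minute, buffer_count, buffer_time_seconds):
    """Approximate wait before a flush at a steady event rate.

    Returns the time needed to fill the count threshold or the time
    threshold, whichever is smaller.
    """
    if events_per_minute <= 0:
        return buffer_time_seconds
    seconds_to_fill = buffer_count / (events_per_minute / 60.0)
    return min(seconds_to_fill, buffer_time_seconds)
```

At the rates stated in the tuning table (100, 1000 and 10000 events per minute), this estimate returns the time threshold in each row, so the count threshold mainly matters during bursts or at much higher rates.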
Configure Connector: Table Creation
Step 5: Configuration
- Snowflake metadata: Default (RECORD_METADATA, RECORD_CONTENT columns)
- Auto-create tables: true (connector creates tables automatically)
- Auto-evolve schema: false (requires manual schema changes; safer)
Auto-Created Table Structure
The connector creates tables with:
- RECORD_METADATA (variant) — Kafka metadata (topic, partition, offset, timestamp)
- RECORD_CONTENT (variant) — Event payload (JSON/Avro data)
You'll transform these variant columns into typed columns with dbt.
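The typed-column step that dbt will handle can be pictured in plain Python. This is an illustrative sketch only: `flatten_row` is a hypothetical function, the two arguments stand for the variant columns already parsed into dicts, and the payload field names are the order event fields used later on this page.

```python
from datetime import datetime, timezone


def flatten_row(record_metadata: dict, record_content: dict) -> dict:
    """Turn the two variant columns into one flat dict with typed values."""
    return {
        "topic": str(record_metadata["topic"]),
        "partition": int(record_metadata["partition"]),
        "offset": int(record_metadata["offset"]),
        "order_id": int(record_content["order_id"]),
        "customer_id": str(record_content["customer_id"]),
        "order_status": str(record_content["order_status"]),
        "order_total": float(record_content["order_total"]),
        # created_at is carried as Unix milliseconds in the event payload
        "created_at": datetime.fromtimestamp(
            record_content["created_at"] / 1000, tz=timezone.utc
        ),
    }
```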
Configure Connector: Review
Step 6: Connector name and review
- Connector name: snowflake-sink-order-events
- Connector class: SnowflakeSink (auto-selected)
Review configuration and click Launch.
Create Kafka Topic
Before testing, create the order-events topic.
Via Confluent Cloud UI
- Navigate to Topics in your cluster
- Click + Add topic
- Configure:
  - Topic name: order-events
  - Partitions: 3 (allows 3 parallel consumers)
  - Retention: 7 days (604800000 ms)
  - Cleanup policy: delete (remove old messages)
- Click Create
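The retention value in milliseconds follows directly from the number of days. A small sketch of that arithmetic, with `retention_ms` as a hypothetical helper name:

```python
from datetime import timedelta


def retention_ms(days: int) -> int:
    """Convert a retention period in days to milliseconds for retention.ms."""
    return timedelta(days=days) // timedelta(milliseconds=1)
```

For 7 days this gives 604800000, the value used for this topic.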
Via CLI
# Use your cluster
confluent kafka cluster use lkc-xyz789
# Create topic
confluent kafka topic create order-events \
  --partitions 3 \
  --config retention.ms=604800000
Expected output:
Created topic "order-events".
Verify Topic
confluent kafka topic describe order-events
Expected output:
Topic Name: order-events
Partitions: 3
Replication Factor: 3
Retention: 7 days
Test with Sample Events
Produce test events to verify the connector works.
Register Avro Schema
Create order_events_schema.avsc:
{
  "type": "record",
  "name": "OrderEvent",
  "namespace": "com.company.streaming",
  "fields": [
    {
      "name": "order_id",
      "type": "long",
      "doc": "Unique order identifier"
    },
    {
      "name": "customer_id",
      "type": "string",
      "doc": "Customer identifier"
    },
    {
      "name": "order_status",
      "type": {
        "type": "enum",
        "name": "OrderStatus",
        "symbols": ["CREATED", "CONFIRMED", "SHIPPED", "DELIVERED", "CANCELLED"]
      },
      "doc": "Current order status"
    },
    {
      "name": "order_total",
      "type": "double",
      "doc": "Order total in GBP"
    },
    {
      "name": "created_at",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      },
      "doc": "Event timestamp (Unix milliseconds)"
    }
  ]
}
Register the schema:
# Get Schema Registry credentials
aws secretsmanager get-secret-value \
  --secret-id confluent/schema-registry \
  --region eu-west-2 \
  --query SecretString \
  --output text | jq -r .
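# Register schema (the API expects the schema as a JSON string under a "schema" key,
# so wrap the .avsc file with jq before posting)
jq -Rs '{schema: .}' order_events_schema.avsc | curl -X POST \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -u "SR_API_KEY:SR_API_SECRET" \
  --data @- \
  https://psrc-abc123.eu-west-2.aws.confluent.cloud/subjects/order-events-value/versions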
Expected output:
{"id": 1}
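The Schema Registry REST API takes the schema as an escaped JSON string inside a `schema` field rather than as the raw schema document. The sketch below shows how that request body can be built from the contents of an .avsc file; `build_registration_payload` is a hypothetical helper and it does not send the request.

```python
import json


def build_registration_payload(avsc_text: str) -> str:
    """Wrap an Avro schema in the JSON envelope used when registering a schema."""
    schema = json.loads(avsc_text)  # fails early if the file is not valid JSON
    # The schema itself is serialised to a string and nested under "schema".
    return json.dumps({"schema": json.dumps(schema)})
```

The returned string is what would be sent as the POST body, with the same Content-Type header as in the curl command.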
Python Producer with Avro
Create produce_order_events.py:
#!/usr/bin/env python3
"""
Produce sample order events to Kafka with Avro serialisation.
"""
import json
import time

import boto3
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField


def get_kafka_config():
    """Retrieve Kafka credentials from AWS Secrets Manager."""
    session = boto3.session.Session()
    client = session.client(service_name='secretsmanager', region_name='eu-west-2')
    response = client.get_secret_value(SecretId='confluent/kafka-cluster')
    secret = json.loads(response['SecretString'])
    return {
        'bootstrap.servers': secret['bootstrap_servers'],
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'PLAIN',
        'sasl.username': secret['api_key'],
        'sasl.password': secret['api_secret']
    }


def get_schema_registry_config():
    """Retrieve Schema Registry credentials."""
    session = boto3.session.Session()
    client = session.client(service_name='secretsmanager', region_name='eu-west-2')
    response = client.get_secret_value(SecretId='confluent/schema-registry')
    secret = json.loads(response['SecretString'])
    return {
        'url': secret['url'],
        'basic.auth.user.info': f"{secret['api_key']}:{secret['api_secret']}"
    }


def delivery_report(err, msg):
    """Callback for delivery confirmation."""
    if err:
        print(f'❌ Failed: {err}')
    else:
        print(f'✅ Delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}')


def main():
    print("🔗 Connecting to Confluent Cloud...")

    # Configure producer
    producer_config = get_kafka_config()
    producer = Producer(producer_config)

    # Configure Schema Registry
    sr_config = get_schema_registry_config()
    schema_registry_client = SchemaRegistryClient(sr_config)

    # Load Avro schema
    with open('order_events_schema.avsc', 'r') as f:
        schema_str = f.read()

    avro_serialiser = AvroSerializer(
        schema_registry_client,
        schema_str,
        lambda obj, ctx: obj  # Object is already a dict
    )

    # Produce sample events
    sample_events = [
        {
            'order_id': 1001,
            'customer_id': 'cust_abc123',
            'order_status': 'CREATED',
            'order_total': 99.99,
            'created_at': int(time.time() * 1000)
        },
        {
            'order_id': 1002,
            'customer_id': 'cust_def456',
            'order_status': 'CONFIRMED',
            'order_total': 149.50,
            'created_at': int(time.time() * 1000)
        },
        {
            'order_id': 1003,
            'customer_id': 'cust_ghi789',
            'order_status': 'SHIPPED',
            'order_total': 75.25,
            'created_at': int(time.time() * 1000)
        }
    ]

    print(f"📤 Producing {len(sample_events)} events...")
    for event in sample_events:
        producer.produce(
            topic='order-events',
            key=str(event['order_id']),
            value=avro_serialiser(event, SerializationContext('order-events', MessageField.VALUE)),
            callback=delivery_report
        )

    # Wait for delivery
    producer.flush(timeout=10)
    print("✅ All events produced!")


if __name__ == '__main__':
    main()
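Before serialising, it can help to check an event dict against the fields the schema expects, so that mistakes surface with a readable message. The sketch below uses only the standard library and mirrors the field names and enum symbols from order_events_schema.avsc; `validate_order_event` is a hypothetical helper, not part of confluent-kafka, and it is not a full Avro validator.

```python
ORDER_STATUSES = {"CREATED", "CONFIRMED", "SHIPPED", "DELIVERED", "CANCELLED"}

# Expected Python types per field, mirroring the Avro schema on this page.
EXPECTED_TYPES = {
    "order_id": int,
    "customer_id": str,
    "order_status": str,
    "order_total": (int, float),
    "created_at": int,
}


def validate_order_event(event: dict) -> list:
    """Return a list of problems found in the event (empty if none)."""
    errors = []
    for name, types in EXPECTED_TYPES.items():
        if name not in event:
            errors.append(f"missing field: {name}")
        # bool is a subclass of int in Python, so reject it explicitly
        elif isinstance(event[name], bool) or not isinstance(event[name], types):
            errors.append(f"wrong type for {name}: {type(event[name]).__name__}")
    status = event.get("order_status")
    if isinstance(status, str) and status not in ORDER_STATUSES:
        errors.append(f"unknown order_status: {status}")
    return errors
```

One way to use it is to call it on each entry of `sample_events` and skip or log any event for which the returned list is not empty.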
Install Dependencies
uv add "confluent-kafka[avro]" boto3
Run Producer
python produce_order_events.py
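Expected output (partition numbers depend on how each key hashes, and offsets on what the topic already contains, so yours may differ):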
🔗 Connecting to Confluent Cloud...
📤 Producing 3 events...
✅ Delivered to order-events [0] @ offset 0
✅ Delivered to order-events [1] @ offset 0
✅ Delivered to order-events [2] @ offset 0
✅ All events produced!
Verify Data in Snowflake
Check that events arrived in Snowflake.
Query Snowflake
USE ROLE ANALYTICS_DEVELOPER;
USE DATABASE STREAMING;
USE SCHEMA PUBLIC;
-- Show tables (connector auto-creates table)
SHOW TABLES;
-- Result:
-- ORDER_EVENTS
-- Query events
SELECT * FROM ORDER_EVENTS LIMIT 10;
-- View metadata
-- The connector stores the Kafka record timestamp in RECORD_METADATA as
-- CreateTime (or LogAppendTime, depending on the topic's timestamp type),
-- in epoch milliseconds
SELECT
    RECORD_METADATA:topic::STRING AS topic,
    RECORD_METADATA:partition::INT AS partition,
    RECORD_METADATA:offset::INT AS offset,
    TO_TIMESTAMP(RECORD_METADATA:CreateTime::NUMBER, 3) AS kafka_timestamp,
    RECORD_CONTENT:order_id::INT AS order_id,
    RECORD_CONTENT:customer_id::STRING AS customer_id,
    RECORD_CONTENT:order_status::STRING AS order_status,
    RECORD_CONTENT:order_total::FLOAT AS order_total
FROM ORDER_EVENTS
ORDER BY RECORD_METADATA:CreateTime DESC;
Expected result:
topic | partition | offset | kafka_timestamp | order_id | customer_id | order_status | order_total
-------------+-----------+--------+-------------------------+----------+--------------+--------------+-------------
order-events | 0 | 0 | 2026-03-05 12:30:15.123 | 1001 | cust_abc123 | CREATED | 99.99
order-events | 1 | 0 | 2026-03-05 12:30:15.124 | 1002 | cust_def456 | CONFIRMED | 149.50
order-events | 2 | 0 | 2026-03-05 12:30:15.125 | 1003 | cust_ghi789 | SHIPPED | 75.25
Streaming Verified!
Events are flowing from Kafka → Connector → Snowflake in seconds!
Monitor Connector Status
Via Confluent Cloud UI
- Navigate to Connectors in your cluster
- Click snowflake-sink-order-events
- View:
  - Status: Running / Failed / Paused
  - Throughput: Records/sec, Bytes/sec
  - Lag: Consumer lag per partition
  - Errors: Recent failures
Via CLI
# List connectors
confluent connect cluster list
# Describe connector
confluent connect cluster describe <connector-id>
# View connector logs
confluent connect cluster logs <connector-id>
Key Metrics
| Metric | What it Means | Good Value |
|---|---|---|
| Status | Connector health | Running |
| Tasks running | Active workers | 1+ (matches config) |
| Records processed | Total events ingested | Increasing |
| Consumer lag | Events waiting | < 1000 (low lag) |
| Error rate | Failed deliveries | 0% |
High Consumer Lag
If lag grows continuously:
- Increase tasks (parallel workers)
- Increase buffer.count.records (larger batches)
- Check Snowflake warehouse size (may be slow)
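Consumer lag is the gap between the latest offset in each partition and the offset the connector has committed. The sketch below shows that calculation and a simple "is it growing" check over successive readings; both function names are hypothetical, and the offsets are assumed to come from whatever monitoring source you use.

```python
def total_lag(end_offsets: dict, committed_offsets: dict) -> int:
    """Sum of (log end offset - committed offset) across partitions."""
    return sum(
        max(end - committed_offsets.get(partition, 0), 0)
        for partition, end in end_offsets.items()
    )


def lag_is_growing(samples: list) -> bool:
    """True if each lag reading is larger than the one before it."""
    return len(samples) >= 2 and all(b > a for a, b in zip(samples, samples[1:]))
```

A single high reading is not necessarily a problem, since lag rises between flushes; a run of strictly increasing readings is the pattern described above.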
Troubleshooting Common Issues
Error: Authentication failed
Cause: Invalid Snowflake credentials (user, private key, role)
Fix:
1. Verify credentials in AWS Secrets Manager
2. Test authentication with Python script (page 4)
3. Re-enter private key in connector config (full PEM format)
Error: Insufficient privileges
Cause: Service account lacks write permission to STREAMING database
Fix:
-- Check grants
SHOW GRANTS TO ROLE SVC_KAFKA_CONNECTOR;
-- Should include STREAMING_DB_WRITER
-- If missing, re-run Terraform apply
Error: Schema Registry authentication failed
Cause: Schema Registry API key invalid or missing
Fix:
1. Verify Schema Registry credentials in AWS Secrets Manager
2. Re-generate Schema Registry API key in Confluent Cloud
3. Update connector configuration
Warning: Consumer lag increasing
Cause: Connector can't keep up with event production rate
Fix:
1. Increase tasks: Scale connector workers (2-3 tasks; tasks beyond the topic's 3 partitions sit idle)
2. Increase buffer size: buffer.count.records = 50000
3. Increase Snowflake warehouse: Scale up INGEST_WH (XSMALL → SMALL)
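A rough way to pick a task count is to divide the incoming rate by what one task can sustain, then cap the result at the partition count, since a sink task with no partition assigned has nothing to consume. This is a sketch under stated assumptions: `suggested_tasks` is a hypothetical helper, and the per-task throughput is a number you would have to measure for your own workload.

```python
import math


def suggested_tasks(events_per_second: float,
                    per_task_events_per_second: float,
                    partitions: int) -> int:
    """Estimate a task count, never below 1 and never above the partition count."""
    needed = math.ceil(events_per_second / per_task_events_per_second)
    return max(1, min(needed, partitions))
```

For the 3-partition order-events topic, the result is never more than 3 regardless of the incoming rate.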
Error: Table not found
Cause: Connector failed to auto-create table
Fix:
1. Ensure auto.create.tables = true in connector config
2. Manually create table:
CREATE TABLE STREAMING.PUBLIC.ORDER_EVENTS (
RECORD_METADATA VARIANT,
RECORD_CONTENT VARIANT
);
Data arriving but malformed
Cause: Schema mismatch between producer and Schema Registry
Fix:
1. Verify schema in Schema Registry matches producer schema
2. Check compatibility mode (BACKWARD, FORWARD, FULL)
3. Re-register schema with correct version
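A first pass at the schema comparison in the first step can be done by diffing field names and types between the producer's schema and the one fetched from Schema Registry. The sketch below assumes both are already loaded as dicts (for example with `json.load`); `field_differences` is a hypothetical helper and it only compares top-level record fields, not full Avro compatibility rules.

```python
def field_differences(producer_schema: dict, registered_schema: dict) -> dict:
    """Compare top-level record fields of two Avro schemas given as dicts."""
    def field_types(schema):
        return {f["name"]: f["type"] for f in schema.get("fields", [])}

    mine = field_types(producer_schema)
    theirs = field_types(registered_schema)
    shared = set(mine) & set(theirs)
    return {
        "only_in_producer": sorted(set(mine) - set(theirs)),
        "only_in_registry": sorted(set(theirs) - set(mine)),
        "type_changed": sorted(n for n in shared if mine[n] != theirs[n]),
    }
```

If all three lists come back empty, the top-level fields agree and the mismatch is more likely in nested types or in the schema version the producer is using.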
Summary
You've deployed the Snowflake Kafka Connector:
- Snowpipe Streaming: latency of seconds from Kafka to Snowflake
- Connector deployed: via Confluent Cloud managed Kafka Connect
- Credentials configured: retrieved from AWS Secrets Manager
- Buffer settings tuned: 10,000 records or 60 seconds
- Topic created: order-events with 3 partitions
- Schema registered: Avro schema in Schema Registry
- Test events produced: Python producer with Avro serialisation
- Data verified: events visible in Snowflake within seconds
- Monitoring configured: connector status and throughput metrics
Your streaming pipeline is operational. Next, build a production-grade Python producer integrated with Prefect.
What's Next
Create Python producers with Avro schemas and integrate with Prefect tasks.
Continue to Producing Events →