Confluent Cloud Setup
On this page, you will:
- Create a Confluent Cloud account
- Set up environment and Basic cluster
- Configure Schema Registry
- Generate API keys with least privilege
- Store credentials in AWS Secrets Manager
- Test connectivity with Python
- Implement security best practices
Overview
Confluent Cloud provides a fully managed Kafka service. In this guide, you'll create your account, provision a Basic tier cluster, and configure Schema Registry for Avro schemas.
You'll generate API keys with minimal permissions, store them securely in AWS Secrets Manager, and verify connectivity with a Python producer.
┌────────────────────────────────────────────────────────────┐
│                 CONFLUENT CLOUD SETUP FLOW                 │
├────────────────────────────────────────────────────────────┤
│                                                            │
│  1. Create Account                                         │
│  ┌──────────────────────────────────────────────────┐      │
│  │  Sign up → Verify email → Add payment method     │      │
│  └──────────────────────────────────────────────────┘      │
│                           │                                │
│  2. Create Environment    ▼                                │
│  ┌──────────────────────────────────────────────────┐      │
│  │  Environment: production                         │      │
│  │  Region: eu-west-2 (London)                      │      │
│  └──────────────────────────────────────────────────┘      │
│                           │                                │
│  3. Create Cluster        ▼                                │
│  ┌──────────────────────────────────────────────────┐      │
│  │  Cluster: streaming-cluster                      │      │
│  │  Type: Basic (1 CKU)                             │      │
│  │  Cloud: AWS, Region: eu-west-2                   │      │
│  └──────────────────────────────────────────────────┘      │
│                           │                                │
│  4. Enable Schema Reg     ▼                                │
│  ┌──────────────────────────────────────────────────┐      │
│  │  Schema Registry: eu-west-2                      │      │
│  │  Essentials package (included in Basic)          │      │
│  └──────────────────────────────────────────────────┘      │
│                           │                                │
│  5. Create API Keys       ▼                                │
│  ┌──────────────────────────────────────────────────┐      │
│  │  Cluster API Key (producer/consumer)             │      │
│  │  Schema Registry API Key (schema access)         │      │
│  └──────────────────────────────────────────────────┘      │
│                           │                                │
│  6. Store in Secrets      ▼                                │
│  ┌──────────────────────────────────────────────────┐      │
│  │  AWS Secrets Manager:                            │      │
│  │    confluent/kafka-cluster                       │      │
│  │    confluent/schema-registry                     │      │
│  └──────────────────────────────────────────────────┘      │
│                                                            │
└────────────────────────────────────────────────────────────┘
Create Confluent Cloud Account
Sign Up
- Navigate to confluent.cloud
- Click Start free
- Enter your email and create a password
- Verify your email address
Add Payment Method
Confluent Cloud requires a payment method even for the free trial.
- Go to Billing & payment
- Add a credit/debit card
- Review pricing (Basic tier starts at ~$150/month after trial)
Free Trial Credits
New accounts receive $400 in free credits valid for 30 days. This is enough to run a Basic cluster for the trial period.
Set Up Billing Alerts
Navigate to Billing → Notifications and configure:
- Alert at 50% of monthly budget ($75 if budgeting $150/month)
- Alert at 80% of monthly budget ($120)
- Email notifications to your team
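The alert thresholds are plain percentages of the monthly budget. A minimal sketch of that arithmetic, assuming the $150/month budget used on this page (the function name `alert_thresholds` is hypothetical and not part of any Confluent tooling):

```python
def alert_thresholds(monthly_budget: float, percentages=(50, 80)) -> dict:
    """Return the dollar amount at which each percentage alert should fire."""
    return {pct: monthly_budget * pct / 100 for pct in percentages}


# Assumed budget: the ~$150/month Basic tier estimate from this page.
thresholds = alert_thresholds(150)
for pct, amount in thresholds.items():
    print(f"Alert at {pct}%: ${amount:.2f}")
```

If your budget differs, pass a different value and enter the resulting amounts in the notification settings.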
Create Environment
Environments organise clusters by purpose (dev, staging, production).
Via Web UI
- Navigate to Environments in the left sidebar
- Click + Add cloud environment
- Configure:
- Name: production
- Stream Governance Package: Essentials (included with Basic)
- Click Continue
Via CLI (Optional)
Install Confluent CLI:
# macOS
brew install confluentinc/tap/cli
# Verify installation
confluent version
Expected output:
confluent - Confluent CLI
Version: v3.45.0
Git Ref: e3c9d32
Build Date: 2026-02-15T10:23:45Z
Go Version: go1.21.5 (darwin/arm64)
Create environment:
# Login
confluent login
# Create environment
confluent environment create production \
--governance-package essentials
# List environments
confluent environment list
Expected output:
Id | Name | Stream Governance Package
--------------+-------------+--------------------------
env-abc123 | production | ESSENTIALS
Create Kafka Cluster
Via Web UI
- Navigate to your production environment
- Click + Add cluster
- Select Basic cluster type
- Configure cluster:
- Cloud provider: AWS
- Region: eu-west-2 (London) - choose the region closest to your Snowflake account
- Cluster name: streaming-cluster
- Review pricing (~$150/month)
- Click Launch cluster
Wait for cluster provisioning to complete.
Region Selection
Choose the same region as your Snowflake account to minimise latency and data transfer costs. If your Snowflake account is in eu-west-2, deploy Confluent in eu-west-2.
Via CLI
# Use the production environment
confluent environment use env-abc123
# Create Basic cluster
confluent kafka cluster create streaming-cluster \
--cloud aws \
--region eu-west-2 \
--type basic
# Wait for cluster to be ready
confluent kafka cluster list
Expected output:
Id | Name | Type | Cloud | Region | Status
--------------+-------------------+--------+-------+-----------+---------
lkc-xyz789 | streaming-cluster | BASIC | aws | eu-west-2 | UP
Cluster Configuration
Once provisioned, review your cluster settings:
- Navigate to Cluster overview
- Note the Bootstrap server endpoint (e.g., pkc-abc123.eu-west-2.aws.confluent.cloud:9092)
- Verify Availability: Single-zone (Basic tier)
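When you note the bootstrap endpoint, it can help to check that it points at the region you intended. The sketch below splits the endpoint into host and port and reads the region from the hostname. It assumes the hostname follows the layout of the example above (`<prefix>.<region>.<cloud>.confluent.cloud`), which is an observation from that example rather than a documented guarantee. The function names are hypothetical.

```python
def describe_bootstrap(endpoint: str) -> dict:
    """Split 'host:port' and read region and cloud from the hostname labels."""
    host, sep, port = endpoint.rpartition(":")
    if not sep or not port.isdigit():
        raise ValueError(f"expected host:port, got {endpoint!r}")
    labels = host.split(".")
    if len(labels) < 5:
        raise ValueError(f"unexpected hostname layout: {host!r}")
    # Assumed layout: <cluster-prefix>.<region>.<cloud>.confluent.cloud
    return {"host": host, "port": int(port), "region": labels[1], "cloud": labels[2]}


def same_region(endpoint: str, snowflake_region: str) -> bool:
    """True if the endpoint's region label equals the given Snowflake region."""
    return describe_bootstrap(endpoint)["region"] == snowflake_region


info = describe_bootstrap("pkc-abc123.eu-west-2.aws.confluent.cloud:9092")
print(info["region"], info["cloud"], info["port"])
```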
Configure Schema Registry
Schema Registry is automatically provisioned with the Essentials package.
Verify Schema Registry
- In your environment, click Schema Registry tab
- Verify Status: Enabled
- Note the API endpoint (e.g., https://psrc-abc123.eu-west-2.aws.confluent.cloud)
Schema Registry Configuration
Schema Registry settings:
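- Compatibility mode: BACKWARD (default) - consumers using a new schema can read data written with the previous schema
- Schema validation: Enabled
- Subject name strategy: TopicNameStrategy (one subject per topic for keys and one for values, e.g. order-events-value)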
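Under TopicNameStrategy, the subject a schema is registered under is derived from the topic name plus a `-key` or `-value` suffix. A minimal sketch of that naming rule (the helper `subject_name` is hypothetical, written only to illustrate the convention):

```python
def subject_name(topic: str, is_key: bool = False) -> str:
    """Subject name under TopicNameStrategy: '<topic>-key' or '<topic>-value'."""
    return f"{topic}-{'key' if is_key else 'value'}"


print(subject_name("order-events"))               # subject for message values
print(subject_name("order-events", is_key=True))  # subject for message keys
```

This is why the compatibility setting for the order-events topic is applied to the subject order-events-value.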
You can change compatibility mode later:
# Set compatibility for a specific subject
confluent schema-registry subject update order-events-value \
--compatibility backward
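To make BACKWARD compatibility concrete: a field added in a new schema version needs a default, so that a reader on the new schema can still decode records written without it. The sketch below checks only that one rule on Avro record schemas held as Python dicts. It is a deliberately simplified illustration and not the full set of checks Schema Registry performs. The `OrderEvent` schemas and the function name are hypothetical.

```python
def unsafe_additions(old_schema: dict, new_schema: dict) -> list:
    """Names of fields added in new_schema that have no default value."""
    old_names = {field["name"] for field in old_schema["fields"]}
    return [
        field["name"]
        for field in new_schema["fields"]
        if field["name"] not in old_names and "default" not in field
    ]


# Hypothetical schemas for illustration only.
order_v1 = {
    "type": "record",
    "name": "OrderEvent",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "amount", "type": "double"},
    ],
}
# Adds a field WITH a default: old records can still be read.
order_v2 = {**order_v1, "fields": order_v1["fields"] + [
    {"name": "currency", "type": "string", "default": "GBP"},
]}
# Adds a field WITHOUT a default: old records cannot supply a value.
order_v2_bad = {**order_v1, "fields": order_v1["fields"] + [
    {"name": "customer_id", "type": "string"},
]}

print(unsafe_additions(order_v1, order_v2))      # []
print(unsafe_additions(order_v1, order_v2_bad))  # ['customer_id']
```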
Generate API Keys
Create separate API keys for cluster access and Schema Registry access.
Cluster API Key (Kafka)
This key authenticates producers and consumers.
Via Web UI:
- Navigate to your cluster → API keys tab
- Click + Add key
- Select Global access (or scope to specific service account later)
- Click Generate API key
- Download the key immediately - you cannot retrieve it later
- Store securely (we'll add to AWS Secrets Manager next)
Via CLI:
# Use your cluster
confluent kafka cluster use lkc-xyz789
# Create API key
confluent api-key create --resource lkc-xyz789 \
--description "Producer/consumer key for streaming-cluster"
Expected output:
API Key | abc123xyz789
API Secret | VeryLongSecretStringHere1234567890ABCDEF
Save Immediately
The API secret is only shown once. Copy it to a secure location immediately.
Schema Registry API Key
This key authenticates schema operations (register, retrieve schemas).
Via Web UI:
- Navigate to Schema Registry → API credentials
- Click + Add key
- Download the key
Via CLI:
# Create Schema Registry API key
confluent api-key create --resource lsrc-abc123 \
--description "Schema Registry access"
Expected output:
API Key | SR123ABCXYZ
API Secret | AnotherVeryLongSecretString7890
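A quick way to confirm a new Schema Registry key works is to list subjects over the REST API, which accepts the API key and secret as HTTP Basic credentials. The following is a sketch using only the standard library. The function names are hypothetical, and the endpoint and key values you pass in are your own.

```python
import base64
import json
import urllib.request


def subjects_request(url: str, api_key: str, api_secret: str) -> urllib.request.Request:
    """Build an authenticated GET request for the registry's /subjects endpoint."""
    token = base64.b64encode(f"{api_key}:{api_secret}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        f"{url.rstrip('/')}/subjects",
        headers={"Authorization": f"Basic {token}"},
    )


def list_subjects(url: str, api_key: str, api_secret: str, timeout: float = 10.0) -> list:
    """Call the registry; raises urllib.error.HTTPError (e.g. 401) if the key is rejected."""
    request = subjects_request(url, api_key, api_secret)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)
```

Calling `list_subjects(...)` with your endpoint and key should return a list of subject names, which is empty on a new registry.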
Store Credentials in AWS Secrets Manager
Store Confluent credentials securely in AWS Secrets Manager for use in Prefect tasks and Snowflake connectors.
Kafka Cluster Credentials
Create secret for Kafka cluster API key:
# Set AWS profile
export AWS_PROFILE=data-engineer
# Create secret
aws secretsmanager create-secret \
--name confluent/kafka-cluster \
--description "Confluent Cloud Kafka cluster credentials" \
--secret-string '{
"bootstrap_servers": "pkc-abc123.eu-west-2.aws.confluent.cloud:9092",
"api_key": "abc123xyz789",
"api_secret": "VeryLongSecretStringHere1234567890ABCDEF",
"cluster_id": "lkc-xyz789"
}' \
--region eu-west-2
Expected output:
{
"ARN": "arn:aws:secretsmanager:eu-west-2:123456789012:secret:confluent/kafka-cluster-abc123",
"Name": "confluent/kafka-cluster",
"VersionId": "e1f2a3b4-5c6d-7e8f-9a0b-1c2d3e4f5a6b"
}
Schema Registry Credentials
Create secret for Schema Registry API key:
aws secretsmanager create-secret \
--name confluent/schema-registry \
--description "Confluent Schema Registry credentials" \
--secret-string '{
"url": "https://psrc-abc123.eu-west-2.aws.confluent.cloud",
"api_key": "SR123ABCXYZ",
"api_secret": "AnotherVeryLongSecretString7890"
}' \
--region eu-west-2
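The same secrets can be written from Python, which is convenient when the values come from another process rather than being typed into a shell. The sketch below creates the secret, or adds a new version if the name already exists (useful later for key rotation). It assumes a boto3 Secrets Manager client is passed in. The helper names `kafka_secret_payload` and `upsert_secret` are hypothetical.

```python
import json


def kafka_secret_payload(bootstrap_servers: str, api_key: str,
                         api_secret: str, cluster_id: str) -> str:
    """Serialise the Kafka credentials in the JSON layout used on this page."""
    return json.dumps({
        "bootstrap_servers": bootstrap_servers,
        "api_key": api_key,
        "api_secret": api_secret,
        "cluster_id": cluster_id,
    })


def upsert_secret(client, name: str, payload: str, description: str = "") -> str:
    """Create the secret, or store a new version if it already exists."""
    try:
        client.create_secret(Name=name, Description=description, SecretString=payload)
        return "created"
    except client.exceptions.ResourceExistsException:
        client.put_secret_value(SecretId=name, SecretString=payload)
        return "updated"


# Usage (requires boto3 and AWS credentials):
#   import boto3
#   client = boto3.client("secretsmanager", region_name="eu-west-2")
#   upsert_secret(client, "confluent/kafka-cluster", kafka_secret_payload(...))
```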
Retrieve Credentials (Test)
Verify you can retrieve secrets:
# Get Kafka credentials
aws secretsmanager get-secret-value \
--secret-id confluent/kafka-cluster \
--region eu-west-2 \
--query SecretString \
--output text | jq .
Expected output:
{
"bootstrap_servers": "pkc-abc123.eu-west-2.aws.confluent.cloud:9092",
"api_key": "abc123xyz789",
"api_secret": "VeryLongSecretStringHere1234567890ABCDEF",
"cluster_id": "lkc-xyz789"
}
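Beyond eyeballing the output, a small check can confirm the secret contains every key the producer code expects. A minimal sketch, assuming the key names used in the secret above (the function name `missing_keys` is hypothetical):

```python
import json

# Keys the Kafka secret on this page is expected to contain.
REQUIRED_KEYS = {"bootstrap_servers", "api_key", "api_secret", "cluster_id"}


def missing_keys(secret_string: str, required=REQUIRED_KEYS) -> set:
    """Return the required keys that are absent from the secret's JSON body."""
    return set(required) - set(json.loads(secret_string))


example = '{"bootstrap_servers": "pkc-abc123.eu-west-2.aws.confluent.cloud:9092", "api_key": "abc123xyz789"}'
print(sorted(missing_keys(example)))  # ['api_secret', 'cluster_id']
```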
Test Connectivity with Python
Verify your Kafka cluster is accessible using a Python producer.
Install Dependencies
# Initialise a uv project and add dependencies (uv creates the virtual environment)
uv init
uv add confluent-kafka boto3
Python Producer Test
Create test_producer.py:
#!/usr/bin/env python3
"""
Test Confluent Cloud connectivity by producing a message.
"""
import json

import boto3
from confluent_kafka import Producer


def get_kafka_config():
    """Retrieve Kafka credentials from AWS Secrets Manager."""
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name='eu-west-2'
    )
    response = client.get_secret_value(SecretId='confluent/kafka-cluster')
    secret = json.loads(response['SecretString'])
    return {
        'bootstrap.servers': secret['bootstrap_servers'],
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'PLAIN',
        'sasl.username': secret['api_key'],
        'sasl.password': secret['api_secret']
    }


def delivery_report(err, msg):
    """Callback for message delivery confirmation."""
    if err is not None:
        print(f'❌ Message delivery failed: {err}')
    else:
        print(f'✅ Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')


def main():
    print("🔗 Connecting to Confluent Cloud...")
    # Get config from Secrets Manager
    config = get_kafka_config()
    # Create producer
    producer = Producer(config)
    # Produce test message
    topic = 'test-connectivity'
    message = {
        'test_id': 1,
        'message': 'Hello from Python!',
        'timestamp': '2026-03-05T12:00:00Z'
    }
    print(f"📤 Producing message to topic: {topic}")
    producer.produce(
        topic,
        key=str(message['test_id']).encode('utf-8'),
        value=json.dumps(message).encode('utf-8'),
        callback=delivery_report
    )
    # Wait for delivery; flush() returns the number of messages still queued
    remaining = producer.flush(timeout=10)
    if remaining > 0:
        print(f"❌ {remaining} message(s) not delivered within 10 seconds")
    else:
        print("✅ Test complete!")


if __name__ == '__main__':
    main()
Run Test
uv run python test_producer.py
Expected output:
🔗 Connecting to Confluent Cloud...
📤 Producing message to topic: test-connectivity
✅ Message delivered to test-connectivity [0] at offset 0
✅ Test complete!
Connectivity Verified
If you see the message delivered successfully, your Confluent Cloud cluster is ready for production use!
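To confirm the message can also be read back, a consumer can reuse the same connection settings with two extra keys. The sketch below assumes the config dict has the shape returned by `get_kafka_config()` in the producer test. The group name `connectivity-test` and both function names are hypothetical. The `confluent_kafka` import is deferred so the config helper can be used on its own.

```python
import time


def consumer_config(kafka_config: dict, group_id: str = "connectivity-test") -> dict:
    """Extend the producer-style settings with the consumer-only keys."""
    return {**kafka_config, "group.id": group_id, "auto.offset.reset": "earliest"}


def read_one(kafka_config: dict, topic: str = "test-connectivity", timeout: float = 15.0):
    """Poll until one message arrives or the timeout passes; return its value or None."""
    from confluent_kafka import Consumer  # requires the confluent-kafka package

    consumer = Consumer(consumer_config(kafka_config))
    try:
        consumer.subscribe([topic])
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            msg = consumer.poll(1.0)
            if msg is None:
                continue  # nothing received yet
            if msg.error():
                raise RuntimeError(str(msg.error()))
            return msg.value()
        return None
    finally:
        consumer.close()
```

Calling `read_one(get_kafka_config())` after the producer test should return the JSON bytes that were produced, provided the API key is allowed to read the topic and use the consumer group.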
Troubleshooting
Error: Failed to connect to broker
Check:
- Bootstrap servers URL is correct
- API key and secret are valid
- Network allows outbound TLS traffic to Confluent Cloud on port 9092 (the Kafka protocol over SASL_SSL, not HTTPS)
Error: Broker: Topic authorization failed
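Your API key doesn't have permission to write to or create the topic. Either:
- Pre-create the topic via UI or CLI, and check the key's ACLs allow writing to it
- Enable auto-create topics in cluster settings, if your cluster type supports that setting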
Error: Authentication failed
Your API credentials are incorrect. Regenerate API key and update AWS Secrets Manager.
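When it is unclear whether a failure is a network problem or a credentials problem, a TLS handshake against the bootstrap endpoint separates the two: if the handshake succeeds, the network path is fine and the issue is more likely the API key or ACLs. A minimal standard-library sketch (the function names are hypothetical):

```python
import socket
import ssl


def split_endpoint(endpoint: str) -> tuple:
    """Split 'host:port' into (host, port)."""
    host, _, port = endpoint.rpartition(":")
    return host, int(port)


def can_reach(endpoint: str, timeout: float = 5.0) -> bool:
    """Return True if a TLS handshake with the endpoint succeeds."""
    host, port = split_endpoint(endpoint)
    context = ssl.create_default_context()
    try:
        with socket.create_connection((host, port), timeout=timeout) as raw:
            with context.wrap_socket(raw, server_hostname=host):
                return True
    except OSError:  # covers DNS failures, timeouts, refusals and TLS errors
        return False
```

For example, `can_reach("pkc-abc123.eu-west-2.aws.confluent.cloud:9092")` with your own endpoint returning False points to DNS, firewall, or proxy configuration rather than to the API key.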
Security Best Practices
Principle of Least Privilege
Don't use global API keys for production. Create service accounts with scoped permissions.
Example: Service account for producers only
# Create service account
confluent iam service-account create producer-service-account \
--description "Service account for event producers"
# Grant write access to specific topic (use the service account ID returned above)
confluent kafka acl create \
--allow \
--service-account sa-123456 \
--operations write \
--topic order-events
# Generate API key for this service account
confluent api-key create \
--service-account sa-123456 \
--resource lkc-xyz789
Now this API key can only write to the order-events topic.
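It can help to write down which ACLs each kind of client needs before creating them. The sketch below models that as data: a producer needs WRITE on its topic, while a consumer needs READ on the topic and READ on its consumer group. The `Acl` class, the function name, and the group name in the example are hypothetical. The sketch does not call the Confluent CLI or API.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Acl:
    operation: str       # e.g. WRITE, READ
    resource_type: str   # TOPIC or GROUP
    resource_name: str


def required_acls(role: str, topic: str, group: Optional[str] = None) -> list:
    """List the minimal ACLs for a producer or consumer of a single topic."""
    if role == "producer":
        return [Acl("WRITE", "TOPIC", topic)]
    if role == "consumer":
        if group is None:
            raise ValueError("a consumer needs a consumer group name")
        return [Acl("READ", "TOPIC", topic), Acl("READ", "GROUP", group)]
    raise ValueError(f"unknown role: {role}")


for acl in required_acls("consumer", "order-events", group="order-consumers"):
    print(acl.operation, acl.resource_type, acl.resource_name)
```

Each entry corresponds to one ACL to grant to the service account. Anything not listed stays denied.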
Rotate API Keys Regularly
Set a reminder to rotate API keys every 90 days:
- Generate new API key
- Update AWS Secrets Manager
- Deploy updated credentials to Prefect
- Delete old API key
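The 90-day reminder is simple date arithmetic, sketched below. It assumes you record each key's creation date yourself (for example alongside the secret). The function names are hypothetical.

```python
from datetime import date, timedelta

ROTATION_PERIOD = timedelta(days=90)


def next_rotation(created: date) -> date:
    """Date on which a key created on `created` should be rotated."""
    return created + ROTATION_PERIOD


def rotation_due(created: date, today: date) -> bool:
    """True once the key is at least 90 days old."""
    return today - created >= ROTATION_PERIOD


created = date(2026, 3, 5)
print(next_rotation(created))                   # 2026-06-03
print(rotation_due(created, date(2026, 6, 2)))  # False
print(rotation_due(created, date(2026, 6, 3)))  # True
```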
Audit API Key Usage
Monitor which API keys are being used:
# List all API keys
confluent api-key list
Delete unused keys:
confluent api-key delete <key-id>
Network Security
Basic tier clusters are publicly accessible. For production:
- Use VPC peering (Dedicated tier) to restrict access to your VPC
- Use PrivateLink (Dedicated tier) for private connectivity
- Allowlist IPs in Confluent Cloud network settings
Basic tier workaround:
- Use AWS Secrets Manager to avoid exposing credentials
- Restrict AWS IAM permissions so that only the roles that need the credentials can read the secrets
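Restricting IAM permissions can be as narrow as a policy that allows reading only the secrets under the confluent/ prefix. The sketch below builds such a policy document. The account ID is the placeholder from the example ARN earlier on this page, the function name is hypothetical, and attaching the policy to a role is left to your own IAM tooling.

```python
import json


def read_only_secret_policy(region: str, account_id: str, prefix: str = "confluent/") -> dict:
    """IAM policy allowing GetSecretValue only on secrets under the given prefix."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["secretsmanager:GetSecretValue"],
                "Resource": f"arn:aws:secretsmanager:{region}:{account_id}:secret:{prefix}*",
            }
        ],
    }


# 123456789012 is a placeholder account ID, as in the example ARN on this page.
policy = read_only_secret_policy("eu-west-2", "123456789012")
print(json.dumps(policy, indent=2))
```

The trailing wildcard also covers the random suffix that Secrets Manager appends to each secret's ARN.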
Enable Audit Logging (Dedicated Tier Only)
Audit logs track all cluster operations (topic creation, ACL changes, etc.). Available on Dedicated tier only.
Summary
You've set up Confluent Cloud:
- Account created: with billing alerts configured
- Environment created: production with Essentials package
- Cluster deployed: Basic tier in eu-west-2
- Schema Registry enabled: for Avro schema management
- API keys generated: cluster and Schema Registry credentials
- Credentials stored: in AWS Secrets Manager for secure access
- Connectivity tested: Python producer verified
- Security practices: least privilege, rotation, audit
Your Kafka cluster is ready to receive events. Next, prepare Snowflake to consume those events.
What's Next
Configure Snowflake infrastructure for streaming ingestion (STREAMING database, service account).
Continue to Snowflake Infrastructure →