Prefect Orchestration
On this page, you will:
- Monitor Kafka consumer lag with Prefect tasks
- Create flows to produce events on schedules
- Build event-driven workflows triggered by Kafka
- Set up alerting on connector failures
- Track throughput and latency metrics
- Integrate streaming with batch pipelines
Overview
Prefect orchestrates your streaming infrastructure by:
- Monitoring — Track consumer lag, connector health, throughput
- Producing — Schedule event generation flows
- Event-driven workflows — Trigger dbt runs when events arrive
- Alerting — Notify on failures or anomalies
- Integration — Combine streaming and batch pipelines
┌─────────────────────────────────────────────────────────────────────────┐
│ PREFECT STREAMING ORCHESTRATION │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Prefect Flow: Monitor Streaming Infrastructure │ │
│ ├──────────────────────────────────────────────────────────┤ │
│ │ │ │
│ │ Task 1: Check Kafka consumer lag │ │
│ │ ├─▶ Query Confluent Cloud API │ │
│ │ └─▶ Alert if lag > 10,000 events │ │
│ │ │ │
│ │ Task 2: Check connector status │ │
│ │ ├─▶ Query Kafka Connect API │ │
│ │ └─▶ Alert if status != RUNNING │ │
│ │ │ │
│ │ Task 3: Track throughput │ │
│ │ ├─▶ Count events in last 5 minutes (Snowflake) │ │
│ │ └─▶ Alert if throughput < 1 event/min │ │
│ │ │ │
│ │ Schedule: Every 5 minutes │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Prefect Flow: Event-Driven dbt Transformation │ │
│ ├──────────────────────────────────────────────────────────┤ │
│ │ │ │
│ │ Task 1: Check for new events (Snowflake) │ │
│ │ ├─▶ Query STREAMING.ORDER_EVENTS │ │
│ │ └─▶ Count events in last 5 minutes │ │
│ │ │ │
│ │ Task 2: Run dbt incremental models (if events > 0) │ │
│ │ ├─▶ dbt run --select tag:streaming │ │
│ │ └─▶ Transform events into ANALYTICS.fct_orders │ │
│ │ │ │
│ │ Schedule: Every 5 minutes │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ Pattern: Monitor → Alert → Transform → Monitor │
│ │
└─────────────────────────────────────────────────────────────────────────┘
Monitor Consumer Lag
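Consumer lag measures how far behind a consumer is from the latest message in a topic. High lag indicates the connector can't keep up with event production. The example below only approximates lag from the timestamps of events that have already landed in Snowflake; in production, read true per-partition offsets from the Confluent Cloud Metrics API.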
Consumer Lag Monitoring Task
Create tasks/monitor_kafka_lag.py:
"""
Monitor Kafka consumer lag via Confluent Cloud API.
"""
import requests
import json
import logging
import boto3
from prefect import task
logger = logging.getLogger(__name__)
@task(retries=2, retry_delay_seconds=10)
def get_consumer_lag(
    consumer_group: str,
    alert_threshold: int = 10000
) -> dict:
    """
    Check Kafka consumer lag and flag it when it exceeds a threshold.

    Simplified example: lag is estimated from event timestamps in Snowflake
    (via ``_query_snowflake_lag``) rather than read from Kafka. In this
    version ``consumer_group`` is only used for logging. For production, use
    the Confluent Cloud Metrics API, which requires a separate API key.

    Args:
        consumer_group: Consumer group name (e.g., 'snowflake-sink').
        alert_threshold: Alert if the estimated total lag exceeds this value.

    Returns:
        Dict with:
        - 'status': 'OK' or 'WARNING'.
        - 'total_lag': sum of the per-partition estimates.
        - 'lag_per_partition': partition -> estimated lag.
        - 'alert': bool.
        When no partition had events in the lookback window, 'alert' is True
        and an extra 'reason' key explains why.
    """
    logger.info(f"Checking consumer lag for group: {consumer_group}")
    lag_metrics = _query_snowflake_lag()

    # An empty result means nothing arrived in the lookback window, so lag
    # cannot be estimated. A stalled sink looks exactly like this, so it must
    # not be reported as "OK" with a total lag of 0.
    if not lag_metrics:
        logger.warning("⚠️ No recent events found; consumer lag cannot be estimated")
        return {
            'status': 'WARNING',
            'total_lag': 0,
            'lag_per_partition': {},
            'alert': True,
            'reason': 'no events in lookback window',
        }

    total_lag = sum(lag_metrics.values())
    if total_lag > alert_threshold:
        logger.warning(f"⚠️ High consumer lag detected: {total_lag} messages")
        return {
            'status': 'WARNING',
            'total_lag': total_lag,
            'lag_per_partition': lag_metrics,
            'alert': True
        }
    logger.info(f"✅ Consumer lag OK: {total_lag} messages")
    return {
        'status': 'OK',
        'total_lag': total_lag,
        'lag_per_partition': lag_metrics,
        'alert': False
    }
def _query_snowflake_lag(assumed_msgs_per_sec: int = 10) -> dict:
    """
    Estimate per-partition consumer lag from event timestamps in Snowflake.

    For each Kafka partition with at least one ORDER_EVENTS row in the last
    15 minutes, the query measures the seconds since that partition's newest
    event. The result is converted to an approximate message count by
    assuming a fixed production rate.

    Args:
        assumed_msgs_per_sec: Assumed production rate used to convert seconds
            of lag into messages (default 10, a rough approximation).

    Returns:
        Dict of partition -> estimated lag in messages. Partitions with no
        events in the window are absent, so an empty dict means nothing
        arrived in the last 15 minutes.
    """
    import contextlib
    import snowflake.connector
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.backends import default_backend

    # Get Snowflake credentials
    secrets_client = boto3.client('secretsmanager', region_name='eu-west-2')
    response = secrets_client.get_secret_value(SecretId='snowflake/svc-kafka-connector')
    creds = json.loads(response['SecretString'])

    # Re-encode the (unencrypted) PEM key as PKCS#8 DER bytes for the
    # connector's private_key argument.
    private_key = serialization.load_pem_private_key(
        creds['private_key'].encode('utf-8'),
        password=None,
        backend=default_backend()
    )
    private_key_bytes = private_key.private_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption()
    )

    # Query lag (time since last event per partition).
    # NOTE(review): assumes RECORD_METADATA carries a 'timestamp' field. The
    # Snowflake Kafka connector's standard metadata uses CreateTime /
    # LogAppendTime, so confirm for your setup. Also confirm the session
    # TIMEZONE is UTC: a ::TIMESTAMP cast (NTZ by default) compared with
    # CURRENT_TIMESTAMP() (LTZ) can skew lag by the UTC offset.
    query = """
    SELECT
    RECORD_METADATA:partition::INT AS partition,
    DATEDIFF(second, MAX(RECORD_METADATA:timestamp::TIMESTAMP), CURRENT_TIMESTAMP()) AS lag_seconds,
    COUNT(*) AS event_count
    FROM ORDER_EVENTS
    WHERE RECORD_METADATA:timestamp >= DATEADD(minute, -15, CURRENT_TIMESTAMP())
    GROUP BY partition
    ORDER BY partition
    """

    # closing() releases the cursor and connection even if the query raises.
    with contextlib.closing(snowflake.connector.connect(
        account=creds['account'],
        user=creds['user'],
        private_key=private_key_bytes,
        warehouse=creds['warehouse'],
        database=creds['database'],
        schema=creds['schema'],
        role=creds['role']
    )) as conn:
        with contextlib.closing(conn.cursor()) as cursor:
            cursor.execute(query)
            rows = cursor.fetchall()

    return _estimate_lag_from_rows(rows, assumed_msgs_per_sec)


def _estimate_lag_from_rows(rows, msgs_per_sec: int = 10) -> dict:
    """
    Convert (partition, lag_seconds, ...) rows into partition -> estimated messages.

    Negative lag_seconds (event timestamps ahead of Snowflake's clock, e.g.
    from clock skew) are clamped to 0. Otherwise they would cancel out real
    lag on other partitions when the estimates are summed.
    """
    return {
        partition: max(lag_seconds, 0) * msgs_per_sec
        for partition, lag_seconds, *_ in rows
    }
Consumer Lag Flow
Create flows/monitor_streaming_flow.py:
"""
Prefect flow to monitor streaming infrastructure.
"""
from prefect import flow, task
from tasks.monitor_kafka_lag import get_consumer_lag
import logging
logger = logging.getLogger(__name__)
@task
def check_connector_status() -> dict:
    """
    Report the Snowflake Kafka Connector's status.

    Placeholder implementation: the status is hard-coded to "RUNNING". A
    production version would look it up via the Confluent Cloud API
    (https://docs.confluent.io/cloud/current/api.html).

    Returns:
        Dict with 'status' (the connector state string) and 'alert' (True
        whenever the state is anything other than "RUNNING").
    """
    logger.info("Checking connector status...")
    status = "RUNNING"  # mocked; replace with a real API lookup
    healthy = status == "RUNNING"
    if healthy:
        logger.info(f"✅ Connector is {status}")
    else:
        logger.error(f"❌ Connector is {status}")
    return {'status': status, 'alert': not healthy}
@task
def check_throughput(window_minutes: int = 5, min_events_per_min: float = 1) -> dict:
    """
    Measure event throughput over a recent time window.

    Counts ORDER_EVENTS rows whose RECORD_METADATA:timestamp falls within the
    last ``window_minutes`` minutes and converts the count to events/minute.
    The check is an absolute floor (``min_events_per_min``), not a relative
    drop comparison.

    Args:
        window_minutes: Look-back window in minutes (default 5, matching the
            flow's 5-minute schedule). Must be positive.
        min_events_per_min: Alert when throughput falls below this rate.

    Returns:
        Dict with 'event_count', 'events_per_min' and 'alert' (True when
        events_per_min < min_events_per_min).

    Raises:
        ValueError: If window_minutes is not positive.
    """
    import contextlib
    import snowflake.connector
    import boto3
    import json
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.backends import default_backend

    if window_minutes <= 0:
        raise ValueError(f"window_minutes must be positive, got {window_minutes}")

    logger.info("Checking event throughput...")

    # Get credentials
    secrets_client = boto3.client('secretsmanager', region_name='eu-west-2')
    response = secrets_client.get_secret_value(SecretId='snowflake/svc-kafka-connector')
    creds = json.loads(response['SecretString'])

    # Re-encode the (unencrypted) PEM key as PKCS#8 DER bytes for the
    # connector's private_key argument.
    private_key = serialization.load_pem_private_key(
        creds['private_key'].encode('utf-8'),
        password=None,
        backend=default_backend()
    )
    private_key_bytes = private_key.private_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption()
    )

    # The window is bound as a parameter so the SQL filter and the
    # per-minute division below cannot drift apart.
    query = """
    SELECT COUNT(*) AS event_count
    FROM ORDER_EVENTS
    WHERE RECORD_METADATA:timestamp >= DATEADD(minute, %s, CURRENT_TIMESTAMP())
    """

    # closing() releases the cursor and connection even if the query raises.
    with contextlib.closing(snowflake.connector.connect(
        account=creds['account'],
        user=creds['user'],
        private_key=private_key_bytes,
        warehouse=creds['warehouse'],
        database=creds['database'],
        schema=creds['schema'],
        role=creds['role']
    )) as conn:
        with contextlib.closing(conn.cursor()) as cursor:
            cursor.execute(query, (-window_minutes,))
            event_count = cursor.fetchone()[0]

    events_per_min = event_count / window_minutes
    alert = events_per_min < min_events_per_min
    if alert:
        logger.warning(f"⚠️ Low throughput: {events_per_min:.1f} events/min")
    else:
        logger.info(f"✅ Throughput: {events_per_min:.1f} events/min")
    return {
        'event_count': event_count,
        'events_per_min': events_per_min,
        'alert': alert
    }
@flow(name="monitor-streaming", log_prints=True)
def monitor_streaming_flow():
    """
    Run one health check of the streaming infrastructure.

    Runs three checks in sequence: consumer lag (group 'snowflake-sink',
    threshold 10,000), connector status and event throughput. Then prints a
    summary of any alerts they raised. Sending alerts to Slack/PagerDuty is
    covered in the alerting section.

    Returns:
        Dict with each check's result under 'lag', 'connector' and
        'throughput', plus 'alerts': the list of alert messages (empty when
        every check passed).
    """
    print("🔍 Monitoring streaming infrastructure...")

    lag = get_consumer_lag(consumer_group='snowflake-sink', alert_threshold=10000)
    connector = check_connector_status()
    throughput = check_throughput()

    # Pair each check's alert flag with its human-readable message, in a
    # fixed order: lag, connector, throughput.
    candidates = (
        (lag['alert'], f"High consumer lag: {lag['total_lag']} messages"),
        (connector['alert'], f"Connector not running: {connector['status']}"),
        (throughput['alert'], f"Low throughput: {throughput['events_per_min']:.1f} events/min"),
    )
    alerts = [message for triggered, message in candidates if triggered]

    if not alerts:
        print("✅ All checks passed!")
    else:
        print("⚠️ ALERTS:")
        for message in alerts:
            print(f" - {message}")

    return {
        'lag': lag,
        'connector': connector,
        'throughput': throughput,
        'alerts': alerts
    }


if __name__ == '__main__':
    monitor_streaming_flow()
Deploy Monitoring Flow
# Deploy with 5-minute schedule
# Cron "*/5 * * * *" fires every 5 minutes. Assumes a work pool named
# "default-agent-pool" exists; adjust --pool to match your setup.
prefect deploy flows/monitor_streaming_flow.py:monitor_streaming_flow \
--name "Streaming Health Monitor" \
--pool "default-agent-pool" \
--cron "*/5 * * * *"
Event-Driven Workflows
Poll Snowflake on a schedule and run dbt transformations only when new events have arrived.
Event-Driven dbt Flow
Create flows/streaming_dbt_flow.py:
"""
Event-driven dbt transformation for streaming data.
"""
from prefect import flow, task
import logging
import subprocess
logger = logging.getLogger(__name__)
@task
def count_new_events(since_minutes: int = 5) -> int:
    """
    Count ORDER_EVENTS rows that arrived in the last N minutes.

    An event counts as "new" when its RECORD_METADATA:timestamp is at or after
    CURRENT_TIMESTAMP() minus ``since_minutes``.

    Args:
        since_minutes: Look-back period in minutes. It is coerced to int and
            bound as a query parameter, never interpolated into the SQL text.

    Returns:
        Number of new events.

    Raises:
        ValueError / TypeError: If since_minutes cannot be converted to int.
    """
    import contextlib
    import snowflake.connector
    import boto3
    import json
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.backends import default_backend

    # Reject non-numeric input before it can reach the database.
    since_minutes = int(since_minutes)
    logger.info(f"Checking for new events in last {since_minutes} minutes...")

    # Get credentials
    secrets_client = boto3.client('secretsmanager', region_name='eu-west-2')
    response = secrets_client.get_secret_value(SecretId='snowflake/svc-kafka-connector')
    creds = json.loads(response['SecretString'])

    # Re-encode the (unencrypted) PEM key as PKCS#8 DER bytes for the
    # connector's private_key argument.
    private_key = serialization.load_pem_private_key(
        creds['private_key'].encode('utf-8'),
        password=None,
        backend=default_backend()
    )
    private_key_bytes = private_key.private_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption()
    )

    # Count new events; the (negative) offset is a bound parameter.
    query = """
    SELECT COUNT(*) AS event_count
    FROM ORDER_EVENTS
    WHERE RECORD_METADATA:timestamp >= DATEADD(minute, %s, CURRENT_TIMESTAMP())
    """

    # closing() releases the cursor and connection even if the query raises.
    with contextlib.closing(snowflake.connector.connect(
        account=creds['account'],
        user=creds['user'],
        private_key=private_key_bytes,
        warehouse=creds['warehouse'],
        database=creds['database'],
        schema=creds['schema'],
        role=creds['role']
    )) as conn:
        with contextlib.closing(conn.cursor()) as cursor:
            cursor.execute(query, (-since_minutes,))
            event_count = cursor.fetchone()[0]

    logger.info(f"Found {event_count} new events")
    return event_count
@task
def run_dbt_streaming_models(
    project_dir: str = '/path/to/dbt/project',
    select: str = 'tag:streaming',
) -> dict:
    """
    Run the dbt models matched by a selector (by default, those tagged 'streaming').

    Args:
        project_dir: dbt project directory to run in. The default is a
            placeholder; point it at your project.
        select: dbt node selector passed to ``--select``.

    Returns:
        Dict with 'status' ('success') and 'stdout' (dbt's console output).

    Raises:
        RuntimeError: If dbt exits with a non-zero code. The message includes
            dbt's output from both stdout and stderr.
        FileNotFoundError: If the dbt executable or project_dir does not
            exist (raised by subprocess.run).
    """
    logger.info("Running dbt streaming models...")
    # List form (no shell) so the selector is passed as a single argument.
    result = subprocess.run(
        ['dbt', 'run', '--select', select],
        cwd=project_dir,
        capture_output=True,
        text=True
    )
    if result.returncode != 0:
        # dbt typically prints compile/run errors to stdout rather than
        # stderr, so report both streams or the message is often empty.
        output = "\n".join(
            stream for stream in (result.stdout, result.stderr) if stream
        ).strip()
        logger.error(f"dbt run failed: {output}")
        raise RuntimeError(f"dbt run failed (exit code {result.returncode}): {output}")
    logger.info("✅ dbt streaming models completed")
    return {
        'status': 'success',
        'stdout': result.stdout
    }
@flow(name="streaming-dbt-transform", log_prints=True)
def streaming_dbt_flow(
    since_minutes: int = 5,
    min_events_threshold: int = 1
):
    """
    Transform streaming events with dbt, but only when enough have arrived.

    Counts events from the last ``since_minutes`` minutes. If the count meets
    ``min_events_threshold``, it runs the dbt streaming models; otherwise the
    run is skipped.

    Args:
        since_minutes: Look-back period in minutes.
        min_events_threshold: Minimum event count required to run dbt.

    Returns:
        Dict with 'events' (the count found) and 'dbt_status' ('ran' or
        'skipped').
    """
    print(f"🔍 Checking for new events in last {since_minutes} minutes...")
    event_count = count_new_events(since_minutes=since_minutes)

    # Guard clause: too few events means there is nothing worth transforming.
    if event_count < min_events_threshold:
        print(f"⏭️ Only {event_count} events, skipping dbt run")
        return {'events': event_count, 'dbt_status': 'skipped'}

    print(f"✅ Found {event_count} events, running dbt transformations...")
    run_dbt_streaming_models()
    print("✅ Transformation complete!")
    return {'events': event_count, 'dbt_status': 'ran'}


if __name__ == '__main__':
    streaming_dbt_flow()
Deploy Event-Driven Flow
# Deploy with 5-minute schedule
# Cron "*/5 * * * *" fires every 5 minutes, matching the flow's default
# since_minutes=5 look-back. Assumes a work pool named "default-agent-pool"
# exists; adjust --pool to match your setup.
prefect deploy flows/streaming_dbt_flow.py:streaming_dbt_flow \
--name "Streaming dbt Transformation" \
--pool "default-agent-pool" \
--cron "*/5 * * * *"
Alerting on Failures
Send alerts to Slack when streaming infrastructure fails.
Slack Alerting Task
Create tasks/send_slack_alert.py:
"""
Send alerts to Slack.
"""
import requests
import logging
from prefect import task
logger = logging.getLogger(__name__)
@task
def send_slack_alert(message: str, channel: str = "#data-alerts", timeout: float = 10.0) -> None:
    """
    Post an alert message to a Slack incoming webhook.

    If SLACK_WEBHOOK_URL is not set, the alert is skipped with a warning. A
    non-200 response from Slack is logged, not raised.

    Args:
        message: Alert text, shown under a "Streaming Alert" header.
        channel: Slack channel to request. NOTE(review): webhooks created for
            Slack apps are bound to one channel and may ignore this override;
            confirm for your webhook.
        timeout: Seconds to wait for Slack. Without a timeout, a hung request
            could block the flow indefinitely.

    Raises:
        requests.RequestException: On connection errors or timeouts.
    """
    import os

    # Get Slack webhook URL from the environment
    webhook_url = os.getenv('SLACK_WEBHOOK_URL')
    if not webhook_url:
        logger.warning("SLACK_WEBHOOK_URL not configured, skipping alert")
        return
    payload = {
        'channel': channel,
        'username': 'Prefect Alert Bot',
        'icon_emoji': ':rotating_light:',
        'text': f"⚠️ *Streaming Alert*\n{message}"
    }
    response = requests.post(webhook_url, json=payload, timeout=timeout)
    if response.status_code == 200:
        logger.info(f"✅ Alert sent to {channel}")
    else:
        logger.error(f"❌ Failed to send alert: {response.text}")
Update Monitoring Flow with Alerts
Update `monitor_streaming_flow` in flows/monitor_streaming_flow.py so it forwards the aggregated alerts to Slack:
from tasks.send_slack_alert import send_slack_alert
@flow(name="monitor-streaming", log_prints=True)
def monitor_streaming_flow():
    """
    Monitor streaming infrastructure and alert on failures.

    Runs the consumer lag, connector status and throughput checks, and prints
    a summary. When any check raises an alert, it posts the combined alert
    list to Slack (#data-alerts).

    Returns:
        Dict with each check's result under 'lag', 'connector' and
        'throughput', plus 'alerts': the list of alert messages.
    """
    print("🔍 Monitoring streaming infrastructure...")

    # Run the monitoring checks
    lag_metrics = get_consumer_lag(
        consumer_group='snowflake-sink',
        alert_threshold=10000
    )
    connector_status = check_connector_status()
    throughput_metrics = check_throughput()

    # Aggregate alerts
    alerts = []
    if lag_metrics['alert']:
        alerts.append(f"High consumer lag: {lag_metrics['total_lag']} messages")
    if connector_status['alert']:
        alerts.append(f"Connector not running: {connector_status['status']}")
    if throughput_metrics['alert']:
        alerts.append(f"Low throughput: {throughput_metrics['events_per_min']:.1f} events/min")

    # Report, and send alerts to Slack if needed
    if alerts:
        print("⚠️ ALERTS:")
        for alert in alerts:
            print(f" - {alert}")
        alert_message = "\n".join(f"• {alert}" for alert in alerts)
        send_slack_alert(alert_message, channel="#data-alerts")
    else:
        print("✅ All checks passed!")

    return {
        'lag': lag_metrics,
        'connector': connector_status,
        'throughput': throughput_metrics,
        'alerts': alerts
    }
Track Throughput Metrics
Store streaming metrics in Snowflake so you can build a dashboard that visualises them over time.
Metrics Collection Task
Create tasks/collect_streaming_metrics.py:
"""
Collect streaming metrics and store in Snowflake for dashboards.
"""
from prefect import task
import logging
import time
logger = logging.getLogger(__name__)
@task
def collect_metrics(
    lag_metrics: dict,
    throughput_metrics: dict
) -> None:
    """
    Store one sample of streaming metrics in Snowflake for visualisation.

    Creates STREAMING_METRICS if it does not exist, then appends two rows
    stamped with CURRENT_TIMESTAMP():
    - a 'consumer_lag' row valued at lag_metrics['total_lag'];
    - a 'throughput' row valued at throughput_metrics['events_per_min'].
    Each input dict is also stored whole, as JSON, in the row's VARIANT
    metadata column.

    Args:
        lag_metrics: Consumer lag metrics. Must contain 'total_lag' and be
            JSON-serialisable; presumably the dict returned by
            get_consumer_lag.
        throughput_metrics: Event throughput metrics. Must contain
            'events_per_min' and be JSON-serialisable; presumably the dict
            returned by check_throughput.
    """
    import contextlib
    import snowflake.connector
    import boto3
    import json
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.backends import default_backend

    logger.info("Storing streaming metrics...")

    # Get credentials
    secrets_client = boto3.client('secretsmanager', region_name='eu-west-2')
    response = secrets_client.get_secret_value(SecretId='snowflake/svc-kafka-connector')
    creds = json.loads(response['SecretString'])

    # Re-encode the (unencrypted) PEM key as PKCS#8 DER bytes for the
    # connector's private_key argument.
    private_key = serialization.load_pem_private_key(
        creds['private_key'].encode('utf-8'),
        password=None,
        backend=default_backend()
    )
    private_key_bytes = private_key.private_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption()
    )

    # closing() releases the cursor and connection even if a statement raises.
    with contextlib.closing(snowflake.connector.connect(
        account=creds['account'],
        user=creds['user'],
        private_key=private_key_bytes,
        warehouse=creds['warehouse'],
        database=creds['database'],
        schema=creds['schema'],
        role=creds['role']
    )) as conn:
        with contextlib.closing(conn.cursor()) as cursor:
            # Create metrics table if not exists.
            # NOTE(review): CURRENT_TIMESTAMP() is stored into a TIMESTAMP_NTZ
            # column, so the wall-clock value depends on the session TIMEZONE;
            # confirm it is UTC.
            cursor.execute("""
            CREATE TABLE IF NOT EXISTS STREAMING_METRICS (
            timestamp TIMESTAMP_NTZ,
            metric_type STRING,
            metric_value FLOAT,
            metadata VARIANT
            )
            """)
            # INSERT ... SELECT (rather than VALUES) allows PARSE_JSON on the
            # bound JSON string.
            cursor.execute("""
            INSERT INTO STREAMING_METRICS
            SELECT
            CURRENT_TIMESTAMP() AS timestamp,
            'consumer_lag' AS metric_type,
            %s AS metric_value,
            PARSE_JSON(%s) AS metadata
            """, (lag_metrics['total_lag'], json.dumps(lag_metrics)))
            cursor.execute("""
            INSERT INTO STREAMING_METRICS
            SELECT
            CURRENT_TIMESTAMP() AS timestamp,
            'throughput' AS metric_type,
            %s AS metric_value,
            PARSE_JSON(%s) AS metadata
            """, (throughput_metrics['events_per_min'], json.dumps(throughput_metrics)))

    logger.info("✅ Metrics stored successfully")
Summary
You've orchestrated streaming infrastructure with Prefect:
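- Consumer lag monitoring — estimate lag from event timestamps in Snowflake (use the Confluent Cloud Metrics API for exact offsets in production)
- Connector health checks — check connector status (mocked here; call the Confluent Cloud API in production)
- Throughput tracking — count events per minute, alert when throughput falls below a floor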
- Event-driven workflows — trigger dbt runs only when new events arrive
- Slack alerting — send notifications on failures or anomalies
- Metrics collection — store metrics in Snowflake for dashboards
- Scheduled flows — run monitoring every 5 minutes automatically
Your streaming pipeline is monitored and orchestrated. Next, verify end-to-end latency and review costs.
What's Next
Verify end-to-end latency, review costs, and learn when to use streaming vs batch.
Continue to Finishing Up →