Alerting and Notifications
On this page, you will:
- Set up Slack webhook notifications
- Configure PagerDuty integration (optional)
- Create Prefect automations for failure alerts
- Understand alerting best practices
Overview
Alerting ensures you know when pipelines fail so you can respond quickly. Prefect provides two mechanisms for notifications:
- Flow-level hooks: Code-based notifications attached to specific flows
- Automations: Platform-level rules that trigger actions based on events
For production systems, automations are recommended because they're centralised and don't require code changes.
┌──────────────────────────────────────────────────────────────────────┐
│                        ALERTING ARCHITECTURE                         │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  ┌────────────────────────────────────────────────────────────────┐  │
│  │                         Prefect Cloud                          │  │
│  │                                                                │  │
│  │  Flow Run Failed ───▶ Automation Trigger ───▶ Actions          │  │
│  │                                                                │  │
│  │                                         ┌─────────────────┐    │  │
│  │                                         │  Slack Webhook  │    │  │
│  │                                         └────────┬────────┘    │  │
│  │                                                  │             │  │
│  │                                         ┌────────▼────────┐    │  │
│  │                                         │    PagerDuty    │    │  │
│  │                                         │   (optional)    │    │  │
│  │                                         └─────────────────┘    │  │
│  └────────────────────────────────────────────────────────────────┘  │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘
Prerequisites
- Prefect Setup - Prefect Cloud or self-hosted configured
- Slack workspace with permission to add apps
- (Optional) PagerDuty account
Slack Webhook Setup
Create a Slack App
- Go to api.slack.com/apps
- Click Create New App → From scratch
- Name it (e.g., "Prefect Alerts") and select your workspace
- Click Create App
Enable Incoming Webhooks
- In the left sidebar, click Incoming Webhooks
- Toggle Activate Incoming Webhooks to On
- Click Add New Webhook to Workspace
- Select the channel for alerts (e.g., #data-alerts)
- Click Allow
Copy the webhook URL - you'll need it for Prefect. Make sure to store it in your password manager.
Create Slack Block in Prefect
Create a Prefect Block to store the webhook URL:
Add to terraform/prefect/blocks.tf:
# =============================================================================
# Notification Blocks
# =============================================================================
resource "prefect_block" "slack_alerts" {
name = "alerts"
type_slug = "slack-webhook"
data = jsonencode({
url = var.slack_webhook_url
})
}
Add to terraform/prefect/variables.tf:
variable "slack_webhook_url" {
description = "Slack webhook URL for pipeline failure alerts"
type = string
sensitive = true
}
Store the webhook URL in AWS Secrets Manager:
aws secretsmanager create-secret \
--name "prefect/slack-webhook-url" \
--description "Slack webhook URL for Prefect pipeline alerts" \
--secret-string "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX" \
--profile admin
Avoid Shell History
To keep the URL out of your shell history, read it from 1Password with command substitution:
aws secretsmanager create-secret \
--name "prefect/slack-webhook-url" \
--description "Slack webhook URL for Prefect pipeline alerts" \
--secret-string "$(op item get 'Slack Webhook - Prefect Alerts' --fields credential)" \
--profile admin
Update your GitHub Actions workflows to retrieve the secret. In .github/workflows/terraform_ci.yml and .github/workflows/terraform_apply.yml, add to the secret-ids in the Prefect plan/apply jobs:
- name: Get secrets from AWS Secrets Manager
  uses: aws-actions/aws-secretsmanager-get-secrets@v2
  with:
    secret-ids: |
      TF_VAR_SLACK_WEBHOOK_URL, prefect/slack-webhook-url
    parse-json-secrets: false
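This exports the secret as the TF_VAR_SLACK_WEBHOOK_URL environment variable. Terraform maps TF_VAR_<name> environment variables to input variables, but on case-sensitive systems such as Linux runners the suffix must match the variable name exactly. If Terraform reports slack_webhook_url as unset, re-export the value as TF_VAR_slack_webhook_url in a later step.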
Alternatively, create the block with the Prefect CLI:
prefect block register -m prefect.blocks.notifications
prefect block create slack-webhook/alerts \
  --url "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX"
Or create it in Python:
from prefect.blocks.notifications import SlackWebhook

slack = SlackWebhook(url="https://hooks.slack.com/services/...")
slack.save("alerts")
Test the Webhook
from prefect.blocks.notifications import SlackWebhook
slack = SlackWebhook.load("alerts")
slack.notify("Test notification from Prefect!")
You should see the message in your Slack channel.
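Before saving a URL into the block, a quick local shape check can catch copy-paste mistakes. The sketch below is illustrative only: `looks_like_slack_webhook` and `build_test_payload` are hypothetical helpers, and the URL shape is inferred from the placeholder shown earlier on this page rather than from a Slack specification.

```python
import json
from urllib.parse import urlparse


def looks_like_slack_webhook(url: str) -> bool:
    """Return True if the URL has the general shape of the placeholder above.

    Assumption: https scheme, hooks.slack.com host, and a path of the form
    /services/<part>/<part>/<part>. This does not prove the webhook works.
    """
    parsed = urlparse(url)
    parts = [p for p in parsed.path.split("/") if p]
    return (
        parsed.scheme == "https"
        and parsed.netloc == "hooks.slack.com"
        and len(parts) == 4
        and parts[0] == "services"
    )


def build_test_payload(text: str) -> str:
    """Serialise the minimal JSON body ({"text": ...}) for an incoming webhook."""
    return json.dumps({"text": text})
```

A check like this only guards against obviously malformed values; sending the test notification remains the real verification.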
Prefect Automations
Automations are event-driven rules configured in Prefect Cloud. They're more flexible than flow hooks because:
- They work across all flows without code changes
- They can be managed centrally by platform admins
- They support complex conditions and multiple actions
Create a Failure Automation
- In Prefect Cloud, navigate to Automations
- Click Create Automation
- Configure the trigger:
| Field | Value |
|---|---|
| Trigger type | Flow run state change |
| Flow run state | Failed, Crashed |
| Tags | dlt (to match all dlt flows) |
- Configure the action:
| Field | Value |
|---|---|
| Action type | Send notification |
| Block | slack-webhook/alerts |
| Message | See template below |
Message Template
Use this template for informative Slack messages:
:x: *Pipeline Failed*
*Flow*: {{ flow.name }}
*Deployment*: {{ deployment.name }}
*Run ID*: {{ flow_run.id }}
*State*: {{ flow_run.state.name }}
*Error*: {{ flow_run.state.message }}
<{{ flow_run|ui_url }}|View in Prefect Cloud>
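To preview how a template like this reads with real values, you can substitute the placeholders locally. The sketch below is a deliberately tiny stand-in for Prefect's template rendering, not the real thing: `render_preview` is a hypothetical helper that only handles dotted `{{ a.b.c }}` lookups, and the sample values are invented for illustration.

```python
import re

TEMPLATE = """:x: *Pipeline Failed*
*Flow*: {{ flow.name }}
*State*: {{ flow_run.state.name }}
*Error*: {{ flow_run.state.message }}"""

# Matches {{ dotted.path }} with optional surrounding whitespace.
_PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def render_preview(template: str, context: dict) -> str:
    """Substitute dotted placeholders from nested dicts.

    Unknown paths are left untouched so missing fields are easy to spot.
    """

    def lookup(match: re.Match) -> str:
        value = context
        for key in match.group(1).split("."):
            if not isinstance(value, dict) or key not in value:
                return match.group(0)
            value = value[key]
        return str(value)

    return _PLACEHOLDER.sub(lookup, template)


# Invented sample data, only for previewing the layout.
sample = {
    "flow": {"name": "exchange-rates-daily"},
    "flow_run": {"state": {"name": "Failed", "message": "Connection timed out"}},
}
preview = render_preview(TEMPLATE, sample)
```

Printing `preview` shows the message roughly as it would appear in Slack, which helps when tuning wording and field order.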
Terraform Automation
Add to terraform/prefect/automations.tf:
resource "prefect_automation" "dlt_failure_alert" {
name = "DLT Pipeline Failure Alert"
description = "Alert on any dlt pipeline failure via Slack"
enabled = true
trigger = {
type = "event"
posture = "Reactive"
expect = ["prefect.flow-run.Failed", "prefect.flow-run.Crashed"]
match_related = {
"prefect.resource.role" = "flow-run"
"prefect.tag" = "dlt"
}
}
actions = [
{
type = "send-notification"
block_id = prefect_block.slack_alerts.id
subject = "Pipeline Failed: {{ flow.name }}"
body = <<-EOT
:x: *Pipeline Failed*
*Flow*: {{ flow.name }}
*State*: {{ flow_run.state.name }}
*Error*: {{ flow_run.state.message }}
EOT
}
]
}
PagerDuty Integration (Optional)
PagerDuty provides advanced incident management for production systems. Use it when you need:
- On-call rotations with schedule management
- Escalation policies (alert team lead if engineer doesn't respond)
- Phone/SMS alerts for critical failures
- Incident tracking and post-mortems
PagerDuty Setup
- Create a PagerDuty account at pagerduty.com
- Create a Service for data pipelines:
  - Navigate to Services → Service Directory → New Service
  - Name: "Data Pipelines"
  - Integration: Select Events API V2
- Copy the Integration Key
Create PagerDuty Block
Add to terraform/prefect/blocks.tf:
resource "prefect_block" "pagerduty_data_pipelines" {
name = "data-pipelines"
type_slug = "pager-duty-webhook"
data = jsonencode({
integration_key = var.pagerduty_integration_key
api_key = var.pagerduty_api_key
})
}
Add to terraform/prefect/variables.tf:
variable "pagerduty_integration_key" {
description = "PagerDuty Events API v2 integration key for data pipeline alerts"
type = string
sensitive = true
default = ""
}
variable "pagerduty_api_key" {
description = "PagerDuty API key for incident management"
type = string
sensitive = true
default = ""
}
Store the credentials in AWS Secrets Manager:
aws secretsmanager create-secret \
--name "prefect/pagerduty-integration-key" \
--description "PagerDuty integration key for Prefect pipeline alerts" \
--secret-string "YOUR_INTEGRATION_KEY" \
--profile admin
Update your GitHub Actions workflows to retrieve the secret. In .github/workflows/terraform_ci.yml and .github/workflows/terraform_apply.yml, add to the secret-ids in the Prefect plan/apply jobs:
- name: Get secrets from AWS Secrets Manager
  uses: aws-actions/aws-secretsmanager-get-secrets@v2
  with:
    secret-ids: |
      TF_VAR_PAGERDUTY_INTEGRATION_KEY, prefect/pagerduty-integration-key
    parse-json-secrets: false
Alternatively, create the block with the Prefect CLI:
prefect block register -m prefect.blocks.notifications
prefect block create pager-duty-webhook/data-pipelines \
  --integration-key "your-integration-key"
Or create it in Python:
from prefect.blocks.notifications import PagerDutyWebHook

pagerduty = PagerDutyWebHook(
    integration_key="your-integration-key",
    api_key="your-api-key",  # Optional, for API access
)
pagerduty.save("data-pipelines")
Add PagerDuty Automation (Terraform)
For critical failures, add an automation that pages via PagerDuty. Add to terraform/prefect/automations.tf:
resource "prefect_automation" "dlt_critical_alert" {
name = "DLT Pipeline Critical Alert (PagerDuty)"
description = "Page on-call engineer after 3 dlt pipeline failures within 24 hours"
enabled = true
trigger = {
type = "event"
posture = "Reactive"
expect = ["prefect.flow-run.Failed", "prefect.flow-run.Crashed"]
match_related = {
"prefect.resource.role" = "flow-run"
"prefect.tag" = "dlt"
}
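# Fire once 3 matching failure events arrive within the window.
# This counts failures within 24 hours; they need not be strictly consecutive.
threshold = 3
within = 86400 # 24 hours, in seconds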
}
actions = [
{
type = "send-notification"
block_id = prefect_block.pagerduty_data_pipelines.id
subject = "CRITICAL: {{ flow.name }} failed 3 times"
body = "Flow {{ flow.name }} has failed 3 times in 24 hours. State: {{ flow_run.state.message }}"
}
]
}
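The threshold and within settings above describe a count inside a time window. The sketch below is a simplified model of that idea, not Prefect's implementation: `threshold_reached` is a hypothetical helper that checks whether any window of the given length contains at least the threshold number of events. Note that successful runs in between do not reset the count, so this is "3 failures within 24 hours" rather than "3 consecutive failures".

```python
from datetime import datetime, timedelta


def threshold_reached(event_times, threshold, within_seconds):
    """Return True if any window of `within_seconds` holds >= `threshold` events."""
    times = sorted(event_times)
    window = timedelta(seconds=within_seconds)
    start = 0
    for end, current in enumerate(times):
        # Drop events that fall outside the window ending at `current`.
        while current - times[start] > window:
            start += 1
        if end - start + 1 >= threshold:
            return True
    return False


# Invented timestamps: three failures spread over 20 hours.
base = datetime(2026, 1, 1)
failures = [base, base + timedelta(hours=5), base + timedelta(hours=20)]
fires = threshold_reached(failures, threshold=3, within_seconds=86400)
```

If strict consecutiveness matters for your paging policy, you would need extra logic that resets the count when a run succeeds.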
Tiered Alerting
Configure different actions based on severity:
| Severity | Condition | Action |
|---|---|---|
| Warning | Single failure | Slack notification |
| Critical | 3+ consecutive failures | Slack + PagerDuty |
| P1 | All pipelines down | PagerDuty with immediate escalation |
Create multiple automations with different triggers:
Warning Automation:
- Trigger: Any flow failure
- Action: Slack notification
Critical Automation:
- Trigger: 3 consecutive failures of same flow
- Action: Slack + PagerDuty
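The tiers in the table can be read as a small decision function. The following is a sketch under assumptions: `classify_severity` and the `ACTIONS` mapping are hypothetical names, and the inputs (a per-flow failure count and a count of failing pipelines) would have to come from your own monitoring data.

```python
def classify_severity(recent_failures: int, failing_pipelines: int, total_pipelines: int) -> str:
    """Map failure counts to the severity tiers from the table above."""
    # P1: every known pipeline is currently failing.
    if total_pipelines > 0 and failing_pipelines == total_pipelines:
        return "P1"
    # Critical: the same flow has failed three or more times.
    if recent_failures >= 3:
        return "Critical"
    # Warning: a single failure.
    if recent_failures >= 1:
        return "Warning"
    return "OK"


# Hypothetical action names; in practice each tier is its own automation.
ACTIONS = {
    "OK": [],
    "Warning": ["slack"],
    "Critical": ["slack", "pagerduty"],
    "P1": ["pagerduty-immediate-escalation"],
}
```

In Prefect itself each tier is a separate automation with its own trigger, so a function like this is mainly useful for reasoning about which rule should win when several conditions hold at once.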
Cost Considerations
| PagerDuty Tier | Monthly Cost | Features |
|---|---|---|
| Free | $0 (≤5 users) | Basic alerting, email/push |
| Professional | $21/user | Phone/SMS, schedules, escalations |
| Business | $41/user | Multiple teams, analytics |
For most small data teams, the free tier or Slack-only is sufficient.
Flow-Level Notifications
For specific flows that need custom notification logic, use flow hooks:
from prefect import flow, get_run_logger
from prefect.blocks.notifications import SlackWebhook


def notify_on_failure(flow, flow_run, state):
    """Send detailed failure notification."""
    logger = get_run_logger()
    logger.error(f"Flow {flow.name} failed: {state.message}")
    slack = SlackWebhook.load("alerts")
    slack.notify(
        f":x: *{flow.name}* failed!\n"
        f"Error: {state.message}\n"
        f"Run ID: `{flow_run.id}`"
    )


def notify_on_completion(flow, flow_run, state):
    """Send completion notification (optional)."""
    # Only for critical flows
    if flow.name in ["products-daily", "exchange-rates-daily"]:
        slack = SlackWebhook.load("alerts")
        slack.notify(f":white_check_mark: *{flow.name}* completed successfully")


@flow(
    name="exchange-rates-daily",
    on_failure=[notify_on_failure],
    on_crashed=[notify_on_failure],
    # on_completion=[notify_on_completion],  # Optional
)
def exchange_rates_daily_flow():
    # ... flow logic
    pass
Best Practices
1. Don't Alert on Everything
Alert fatigue reduces effectiveness. Only alert on:
- Failures: Pipeline failures that need attention
- SLA breaches: Data not refreshed by expected time
- Anomalies: Unusual patterns (e.g., 0 rows loaded)
Don't alert on:
- Successful runs (use dashboards instead)
- Retries that succeed
- Expected maintenance windows
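These rules can be expressed as a single predicate. The sketch below is one possible encoding, with assumptions stated up front: `RunOutcome` and `should_alert` are hypothetical names, the fields are invented for illustration, and SLA breaches are left out because they depend on a schedule rather than on a single run.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RunOutcome:
    """Hypothetical summary of one finished run."""
    final_state: str                    # e.g. "Completed", "Failed", "Crashed"
    retries: int = 0                    # retries used before the final state
    rows_loaded: Optional[int] = None   # None when the flow does not report rows
    in_maintenance_window: bool = False


def should_alert(outcome: RunOutcome) -> bool:
    """Apply the alert / don't-alert rules listed above."""
    # Expected maintenance windows never alert.
    if outcome.in_maintenance_window:
        return False
    # Failures always need attention.
    if outcome.final_state in {"Failed", "Crashed"}:
        return True
    # Successful runs (including retries that succeeded) stay quiet,
    # unless the load looks anomalous, such as 0 rows.
    return outcome.rows_loaded == 0
```

Keeping the rules in one place like this makes it easier to review them when alert volume starts to creep up.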
2. Include Actionable Information
Every alert should help the responder understand:
- What failed
- When it failed
- Where to investigate (link to logs)
- How to start troubleshooting
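One way to make sure every alert carries those four pieces is to build the message from explicit arguments. This is a minimal sketch: `format_alert` is a hypothetical helper, and the URL and hint in the example are placeholders, not real endpoints or runbook entries.

```python
from datetime import datetime, timezone


def format_alert(flow_name: str, failed_at: datetime, run_url: str, first_step: str) -> str:
    """Build a Slack message covering what, when, where, and how."""
    return "\n".join(
        [
            f":x: *{flow_name}* failed",             # what
            f"*When*: {failed_at.isoformat()}",      # when
            f"*Logs*: <{run_url}|View run>",         # where to investigate
            f"*First step*: {first_step}",           # how to start
        ]
    )


# Placeholder values for illustration only.
message = format_alert(
    "exchange-rates-daily",
    datetime(2026, 1, 1, 6, 0, tzinfo=timezone.utc),
    "https://example.invalid/runs/123",
    "Check the upstream API status",
)
```

Because every argument is required, a caller cannot accidentally send an alert without a link or a first troubleshooting step.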
3. Set Up Escalation
For critical pipelines, configure escalation:
- Immediate: Slack notification
- After 15 mins: Page on-call engineer
- After 30 mins: Escalate to team lead
- After 1 hour: Escalate to manager
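The schedule above amounts to a lookup from elapsed time to the steps that are due. The sketch below models it in plain Python; `ESCALATION_STEPS` and `due_escalations` are hypothetical names, and in practice a tool such as PagerDuty would usually own this logic through its escalation policies.

```python
from typing import List

# (minutes since the failure went unacknowledged, action), from the list above.
ESCALATION_STEPS = [
    (0, "Slack notification"),
    (15, "Page on-call engineer"),
    (30, "Escalate to team lead"),
    (60, "Escalate to manager"),
]


def due_escalations(minutes_unacknowledged: float) -> List[str]:
    """Return every escalation step whose delay has already elapsed."""
    return [
        action
        for delay, action in ESCALATION_STEPS
        if minutes_unacknowledged >= delay
    ]
```

For example, `due_escalations(20)` returns the Slack notification and the on-call page, since both delays have passed by then.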
4. Separate Channels by Severity
| Channel | Purpose |
|---|---|
| #data-alerts | All pipeline notifications |
| #data-alerts-critical | Failures requiring immediate action |
| #data-info | Success notifications, metrics (optional) |
5. Test Your Alerts
Regularly test that alerts work:
from prefect import flow


@flow(name="test-alert")
def test_alert_flow():
    """Flow that intentionally fails to test alerting."""
    raise Exception("This is a test failure - ignore")
Run this periodically to verify Slack/PagerDuty integration.
Monitoring Dashboard
Beyond alerts, consider a monitoring dashboard showing:
- Recent flow runs and their states
- Success rate over time
- Average run duration
- Data freshness (when was data last loaded)
Prefect Cloud provides these in the UI. For custom dashboards, use the Prefect API:
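from datetime import datetime, timezone

from prefect import get_client
from prefect.client.schemas import filters
from prefect.client.schemas.objects import StateType


async def get_recent_failures():
    # read_flow_runs expects filter objects rather than plain dicts
    failed_since = filters.FlowRunFilter(
        state=filters.FlowRunFilterState(
            type=filters.FlowRunFilterStateType(any_=[StateType.FAILED, StateType.CRASHED])
        ),
        start_time=filters.FlowRunFilterStartTime(
            after_=datetime(2026, 1, 1, tzinfo=timezone.utc)
        ),
    )
    async with get_client() as client:
        return await client.read_flow_runs(flow_run_filter=failed_since, limit=10)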
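Once you have run records, the dashboard metrics listed above are simple aggregations. The sketch below works on plain tuples so it runs without a Prefect server; the `(state, start, end)` shape and the `summarise_runs` name are assumptions, and you would map the fields of the flow run objects returned by the API onto it yourself.

```python
from datetime import datetime, timedelta, timezone


def summarise_runs(runs, now):
    """Compute success rate, average duration, and data freshness.

    `runs` is a list of (state, start_time, end_time) tuples; this shape is
    an assumption made for the example.
    """
    total = len(runs)
    completed = [run for run in runs if run[0] == "COMPLETED"]
    durations = [(end - start).total_seconds() for _, start, end in runs]
    last_success = max((end for _, _, end in completed), default=None)
    return {
        "success_rate": len(completed) / total if total else None,
        "avg_duration_seconds": sum(durations) / total if total else None,
        "hours_since_last_success": (
            (now - last_success).total_seconds() / 3600 if last_success else None
        ),
    }


# Invented sample data: one 10-minute success, one 20-minute failure.
t0 = datetime(2026, 1, 1, tzinfo=timezone.utc)
sample_runs = [
    ("COMPLETED", t0, t0 + timedelta(minutes=10)),
    ("FAILED", t0 + timedelta(hours=1), t0 + timedelta(hours=1, minutes=20)),
]
summary = summarise_runs(sample_runs, now=t0 + timedelta(hours=2, minutes=10))
```

The freshness figure is measured from the end of the last successful run, which is only a proxy for when data actually landed in the destination.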
Summary
You've configured alerting for your data pipelines:
- Created Slack webhook block for notifications
- Set up Prefect automations for failure alerts
- Understood PagerDuty integration options
- Learned alerting best practices
What's Next
With alerting configured, your orchestration layer is complete. Continue to build data pipelines in the Batch Data Ingestion section.