Anomaly Detection
On this page, you will:
- Understand different types of data anomalies
- Configure Elementary's automated anomaly detection
- Implement custom anomaly detection with SQL
- Set up Great Expectations for statistical validation
- Reduce false positives in anomaly alerts
Overview
Anomaly detection catches unexpected changes in data before they cause downstream problems. Unlike explicit tests (which check known rules), anomaly detection identifies deviations from historical patterns.
Types of anomalies: - Volume anomalies — row counts significantly different from expected - Freshness anomalies — data is stale (no new rows) - Schema anomalies — columns added, removed, or type changes - Distribution anomalies — values outside expected ranges - Null rate anomalies — sudden increase in null values
┌─────────────────────────────────────────────────────────────────────────┐
│ ANOMALY DETECTION STACK │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ Data Observation Anomaly Detection Alerts │
│ ──────────────── ──────────────────── ────── │
│ │
│ ┌──────────────┐ ┌──────────────────┐ ┌──────────────┐ │
│ │ Row counts │────▶│ Elementary │───▶│ Slack │ │
│ │ Null rates │ │ • Volume │ │ #data-alerts │ │
│ │ Column types │ │ • Freshness │ └──────────────┘ │
│ │ Distributions│ │ • Schema │ │
│ └──────────────┘ └──────────────────┘ │
│ │ │ │
│ │ ┌───────▼───────────┐ │
│ └─────────────▶│ Custom SQL │ │
│ │ anomaly checks │ │
│ └───────────────────┘ │
│ │ │
│ ┌───────▼───────────┐ │
│ │ Great Expectations│ │
│ │ • Statistical │ │
│ │ validation │ │
│ └───────────────────┘ │
│ │
│ All anomalies stored in ELEMENTARY schema for trend analysis │
│ │
└─────────────────────────────────────────────────────────────────────────┘
Elementary Anomaly Detection
Elementary provides automated anomaly detection with minimal configuration (you've already set this up in Elementary Setup).
Volume Anomaly Detection
What it detects: Row counts that deviate significantly from historical averages.
Algorithm:
1. Calculate 7-day rolling average row count
2. Calculate standard deviation
3. Alert if current row count outside average ± (sensitivity × std_dev)
Configuration:
# models/marts/core/fct_orders.yml
models:
- name: fct_orders
tests:
- elementary.volume_anomalies:
timestamp_column: "order_date"
where_expression: "order_status != 'cancelled'"
config:
severity: warn
tags: ['elementary', 'anomaly_detection']
Example scenario:
| Date | Row Count | 7-Day Avg | Std Dev | Threshold (3σ) | Anomaly? |
|---|---|---|---|---|---|
| Feb 14 | 1,000 | 980 | 50 | 830-1,130 | ✓ Normal |
| Feb 15 | 1,020 | 985 | 52 | 829-1,141 | ✓ Normal |
| Feb 16 | 450 | 990 | 55 | 825-1,155 | ✗ Anomaly! |
Alert message:
🚨 Volume anomaly detected in fct_orders
Expected: ~990 rows (based on 7-day average)
Actual: 450 rows
Deviation: -54.5% from expected
Freshness Anomaly Detection
What it detects: Data hasn't been updated recently (stale data).
Algorithm:
1. Check MAX(timestamp_column) in table
2. Compare to current time
3. Alert if current_time - MAX(timestamp_column) > threshold
Configuration:
models:
- name: fct_orders
config:
elementary:
timestamp_column: "loaded_at" # Column tracking when data was loaded
tests:
- elementary.freshness_anomalies:
timestamp_column: "loaded_at"
config:
severity: error
freshness_threshold: 24 # Alert if data older than 24 hours
Example scenario:
- Expected: Data loaded every 6 hours
- Current:
MAX(loaded_at) = 2026-02-19 08:00 - Now:
2026-02-20 10:00(26 hours ago) - Threshold: 24 hours
- Result: ✗ Anomaly! Data is stale
Alert message:
🚨 Freshness anomaly detected in fct_orders
Last updated: 2026-02-19 08:00 (26 hours ago)
Threshold: 24 hours
Action required: Check upstream pipelines (dlt, Airbyte)
Schema Change Detection
What it detects: Columns added, removed, or type changes.
Algorithm: 1. Store schema snapshot after each dbt run 2. Compare current schema to previous snapshot 3. Alert on differences
Configuration:
models:
- name: fct_orders
tests:
- elementary.schema_changes:
config:
severity: warn
Example scenario:
Previous schema:
order_id (NUMBER)
customer_id (NUMBER)
order_total (DECIMAL(10,2))
order_date (DATE)
Current schema:
order_id (NUMBER)
customer_id (NUMBER)
order_total (DECIMAL(10,2))
order_date (DATE)
order_status (VARCHAR) ← New column added
Alert message:
⚠️ Schema change detected in fct_orders
Change: Column added
Column name: order_status
Column type: VARCHAR
Impact: Downstream dashboards/models may need updates
Null Rate Anomaly Detection
What it detects: Sudden increase in null values.
Configuration:
models:
- name: fct_orders
columns:
- name: customer_email
tests:
- elementary.anomaly_detection:
type: null_rate
config:
severity: warn
anomaly_threshold: 0.1 # Alert if null rate > 10%
Example scenario:
| Date | Total Rows | Null customer_email | Null Rate |
|---|---|---|---|
| Feb 14 | 1,000 | 5 | 0.5% |
| Feb 15 | 1,020 | 8 | 0.8% |
| Feb 16 | 950 | 150 | 15.8% ← Anomaly! |
Alert message:
🚨 Null rate anomaly in fct_orders.customer_email
Expected null rate: ~0.5%
Actual null rate: 15.8%
Rows affected: 150 out of 950
Run Elementary Anomaly Detection
Elementary anomaly detection runs automatically if configured in on-run-end:
# dbt_project.yml
on-run-end:
- "{{ elementary.upload_test_results() }}"
Or run manually:
cd ~/projects/dbt/dbt-transform
# Run all Elementary anomaly tests
dbt test --select tag:elementary
# Run via Elementary CLI (includes ML-based detection)
edr monitor --profiles-dir ~/.dbt
Custom Anomaly Detection with SQL
For anomalies not covered by Elementary, write custom SQL tests.
Custom Test: Spike in Refund Rate
Business rule: Refund rate should be <5%. Alert if >10%.
-- tests/custom/refund_rate_anomaly.sql
WITH daily_refunds AS (
SELECT
DATE(order_date) AS day,
COUNT(CASE WHEN order_status = 'refunded' THEN 1 END) AS refunds,
COUNT(*) AS total_orders,
(refunds * 100.0 / total_orders) AS refund_rate_pct
FROM {{ ref('fct_orders') }}
WHERE order_date >= CURRENT_DATE() - INTERVAL '7 days'
GROUP BY day
)
SELECT
day,
refund_rate_pct
FROM daily_refunds
WHERE refund_rate_pct > 10 -- Anomaly threshold
Add to schema.yml:
models:
- name: fct_orders
tests:
- custom.refund_rate_anomaly:
config:
severity: error
If this test fails, dbt will alert you to the anomaly.
Custom Test: Sudden Drop in Revenue
Business rule: Daily revenue should not drop >30% from 7-day average.
-- tests/custom/revenue_drop_anomaly.sql
WITH daily_revenue AS (
SELECT
DATE(order_date) AS day,
SUM(order_total) AS revenue
FROM {{ ref('fct_orders') }}
WHERE order_date >= CURRENT_DATE() - INTERVAL '14 days'
GROUP BY day
),
revenue_with_avg AS (
SELECT
day,
revenue,
AVG(revenue) OVER (
ORDER BY day
ROWS BETWEEN 7 PRECEDING AND 1 PRECEDING
) AS avg_revenue_7d
FROM daily_revenue
)
SELECT
day,
revenue,
avg_revenue_7d,
((revenue - avg_revenue_7d) / avg_revenue_7d * 100) AS pct_change
FROM revenue_with_avg
WHERE day = CURRENT_DATE()
AND pct_change < -30 -- Alert if >30% drop
Custom Test: Unexpected Value in Column
Business rule: order_status should only contain known values.
-- tests/custom/unexpected_order_status.sql
SELECT
order_status,
COUNT(*) AS row_count
FROM {{ ref('fct_orders') }}
WHERE order_status NOT IN ('pending', 'confirmed', 'shipped', 'delivered', 'cancelled', 'refunded')
GROUP BY order_status
If this query returns any rows, dbt test fails, alerting you to unexpected values.
Great Expectations
Great Expectations is a Python library for data validation with extensive statistical checks.
Install Great Expectations
cd ~/projects/dbt/dbt-transform
uv add great_expectations
Initialise Great Expectations
great_expectations init
This creates a great_expectations/ directory:
great_expectations/
├── great_expectations.yml
├── expectations/
├── checkpoints/
└── plugins/
Create Expectation Suite
great_expectations suite new
Example expectations:
# great_expectations/expectations/fct_orders_suite.json
{
"expectation_suite_name": "fct_orders_suite",
"expectations": [
{
"expectation_type": "expect_table_row_count_to_be_between",
"kwargs": {
"min_value": 100,
"max_value": 10000
}
},
{
"expectation_type": "expect_column_values_to_be_in_set",
"kwargs": {
"column": "order_status",
"value_set": ["pending", "confirmed", "shipped", "delivered", "cancelled", "refunded"]
}
},
{
"expectation_type": "expect_column_mean_to_be_between",
"kwargs": {
"column": "order_total",
"min_value": 20,
"max_value": 500
}
},
{
"expectation_type": "expect_column_quantile_values_to_be_between",
"kwargs": {
"column": "order_total",
"quantile_ranges": {
"quantiles": [0.25, 0.5, 0.75],
"value_ranges": [[10, 30], [40, 80], [100, 200]]
}
}
}
]
}
Run Expectations
# scripts/run_great_expectations.py
import great_expectations as gx
# Connect to data
context = gx.get_context()
# Add Snowflake datasource
datasource = context.sources.add_snowflake(
name="snowflake_analytics",
connection_string="snowflake://user:pass@account/database/schema"
)
# Get data asset
data_asset = datasource.add_table_asset(
name="fct_orders",
table_name="fct_orders"
)
# Create checkpoint
checkpoint = context.add_or_update_checkpoint(
name="fct_orders_checkpoint",
validations=[
{
"batch_request": data_asset.build_batch_request(),
"expectation_suite_name": "fct_orders_suite"
}
]
)
# Run validation
result = checkpoint.run()
# Check results
if not result.success:
print("❌ Great Expectations validation failed!")
for validation in result.run_results.values():
for expectation_result in validation["validation_result"]["results"]:
if not expectation_result["success"]:
print(f"Failed: {expectation_result['expectation_config']['expectation_type']}")
print(f"Details: {expectation_result['result']}")
else:
print("✅ All expectations passed")
Integrate with Prefect
from prefect import flow, task
@task
def run_great_expectations_validation():
result = checkpoint.run()
if not result.success:
# Send alert
slack_webhook.notify(text="🚨 Great Expectations validation failed for fct_orders")
raise ValueError("Great Expectations validation failed")
@flow
def dbt_with_validation():
run_dbt_build()
run_great_expectations_validation() # Run after dbt
Reducing False Positives
Anomaly detection can generate false positives (alerts for normal behaviour). Tune thresholds to reduce noise.
Strategy 1: Adjust Sensitivity
Elementary's anomaly_sensitivity parameter controls how strict detection is:
vars:
elementary:
anomaly_sensitivity: 3 # Standard deviations (1 = very sensitive, 5 = very strict)
Low sensitivity (1-2): Catch more anomalies, more false positives Medium sensitivity (3): Balanced (default) High sensitivity (4-5): Fewer alerts, may miss subtle anomalies
Recommendation: Start with 3, increase to 4-5 if too many false positives.
Strategy 2: Exclude Known Patterns
If certain days are always low (e.g., weekends), exclude them:
tests:
- elementary.volume_anomalies:
where_expression: "DAYOFWEEK(order_date) NOT IN (0, 6)" # Exclude Sat/Sun
Strategy 3: Set Time-Based Thresholds
Different thresholds for different times:
-- Custom anomaly test with time-based thresholds
WITH daily_revenue AS (
SELECT
DATE(order_date) AS day,
DAYOFWEEK(order_date) AS day_of_week,
SUM(order_total) AS revenue
FROM {{ ref('fct_orders') }}
GROUP BY day, day_of_week
),
expected_revenue AS (
SELECT
day,
revenue,
CASE
WHEN day_of_week IN (0, 6) THEN 5000 -- Weekend threshold
ELSE 15000 -- Weekday threshold
END AS expected_min_revenue
FROM daily_revenue
)
SELECT *
FROM expected_revenue
WHERE revenue < expected_min_revenue
Strategy 4: Use Anomaly Score
Instead of binary pass/fail, calculate anomaly score:
-- Anomaly score: how many standard deviations from mean
WITH daily_orders AS (
SELECT
DATE(order_date) AS day,
COUNT(*) AS order_count
FROM {{ ref('fct_orders') }}
WHERE order_date >= CURRENT_DATE() - INTERVAL '30 days'
GROUP BY day
),
stats AS (
SELECT
AVG(order_count) AS avg_count,
STDDEV(order_count) AS stddev_count
FROM daily_orders
),
anomaly_scores AS (
SELECT
d.day,
d.order_count,
s.avg_count,
ABS(d.order_count - s.avg_count) / s.stddev_count AS anomaly_score
FROM daily_orders d
CROSS JOIN stats s
)
SELECT
day,
order_count,
avg_count,
anomaly_score,
CASE
WHEN anomaly_score > 3 THEN 'Critical'
WHEN anomaly_score > 2 THEN 'Warning'
ELSE 'Normal'
END AS severity
FROM anomaly_scores
WHERE day = CURRENT_DATE()
AND anomaly_score > 2 -- Only alert if score > 2
Strategy 5: Alert Aggregation
Instead of alerting on every anomaly, aggregate:
@task
def aggregate_anomalies():
"""Send one Slack message summarising all anomalies detected today"""
# Query Elementary results
anomalies = query_elementary_anomalies()
if len(anomalies) == 0:
return # No anomalies, no alert
# Group by severity
critical = [a for a in anomalies if a['severity'] == 'critical']
warnings = [a for a in anomalies if a['severity'] == 'warning']
# Send aggregated alert
slack_webhook.notify(
text=f"📊 Daily Anomaly Summary ({len(anomalies)} total)\n\n" +
f"🚨 Critical: {len(critical)}\n" +
f"⚠️ Warnings: {len(warnings)}\n\n" +
"View details: https://elementary-ui.com/anomalies"
)
Anomaly Detection Best Practices
1. Start with High-Impact Tables
Focus anomaly detection on critical tables first:
- fct_orders (revenue data)
- fct_customers (customer data)
- fct_usage (product usage)
Don't add anomaly detection to every table — it creates noise.
2. Combine Elementary + Custom Tests
- Elementary: General anomalies (volume, freshness, schema)
- Custom tests: Business-specific rules (refund rate, revenue drops)
3. Review Anomalies Weekly
Not all anomalies require immediate action. Review weekly: - Which anomalies were true issues? - Which were false positives? - Adjust thresholds accordingly
4. Document Expected Anomalies
Some anomalies are expected (e.g., Black Friday spike):
# models/marts/core/fct_orders.yml
models:
- name: fct_orders
description: |
Expected anomalies:
- Black Friday (last Friday of November): 10x normal volume
- New Year's Day: 50% drop in volume
- End of month: 2x normal volume (invoicing)
5. Alert on Trends, Not Single Points
Instead of alerting on one day's anomaly, alert on trends:
-- Alert if 3 consecutive days show anomalies
WITH daily_anomalies AS (
-- Your anomaly detection query
...
),
consecutive_anomalies AS (
SELECT
day,
SUM(CASE WHEN is_anomaly THEN 1 ELSE 0 END) OVER (
ORDER BY day
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
) AS anomaly_count_3d
FROM daily_anomalies
)
SELECT *
FROM consecutive_anomalies
WHERE anomaly_count_3d >= 3 -- Alert if 3 consecutive days
Summary
You've implemented comprehensive anomaly detection:
- Elementary automated detection — volume, freshness, schema, null rate anomalies
- Custom SQL anomaly tests — business-specific rules (refund rate, revenue drops)
- Great Expectations — statistical validation for distributions and value ranges
- False positive reduction — sensitivity tuning, time-based thresholds, anomaly scores
- Best practices — focus on high-impact tables, weekly reviews, trend-based alerting
Anomaly detection catches issues before they reach users, improving data quality proactively.
What's Next
Wrap up the Observability section and plan your next steps.
Continue to Finishing Up →