Runbook: Adding Data Sources
Summary
Add a new data source to the platform, including Snowflake infrastructure (database, service account, schemas), the ingestion pipeline, dbt staging models, and orchestration. This runbook covers the end-to-end process across all three repositories: terraform, data-pipelines, and dbt-transform.
When to Use
- A new SaaS tool, API, or database needs to be ingested
- A new category of data requires its own loader database
- A new team wants to bring data into the warehouse
Prerequisites
- Access: Write access to the terraform, data-pipelines, and dbt-transform repositories
- Tools: Terraform CLI, AWS CLI (infrastructure-admin profile), dlt or Airbyte configured
- Context: Source system name, expected schemas, ingestion method (dlt, Airbyte, Snowpipe, or streaming)
Steps
1. Choose the Ingestion Method
| Method | Best For | Repository | Destination Database |
|---|---|---|---|
| dlt | APIs, databases, 1-2 simple sources | data-pipelines | DLT |
| Airbyte | Complex SaaS (5+ sources), non-engineer config | Airbyte Cloud/self-hosted | AIRBYTE |
| Snowpipe | File-based (S3 auto-ingest) | terraform (pipe config) | SNOWPIPE |
| Kafka Connect | Real-time event streams | Confluent Cloud | STREAMING |
If the source fits into an existing loader database (e.g. a new API via dlt goes into the DLT database), skip to Step 3 - you only need a new schema, not a new database.
If the source requires a new loader tool (e.g. adding Fivetran), follow all steps.
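The choice in this step can also be written down as data. This is only an illustration. The dictionary copies the method-to-database mapping from the table above, and `first_step` returns the step to start from. Both names are hypothetical, as is the `existing_databases` argument, which is where you pass the loader databases that actually exist in your account.

```python
"""Encode the ingestion-method table as data (illustrative sketch only)."""
from __future__ import annotations

# method -> (where it is configured, destination database), copied from the table
INGESTION_METHODS = {
    "dlt": ("data-pipelines", "DLT"),
    "airbyte": ("Airbyte Cloud/self-hosted", "AIRBYTE"),
    "snowpipe": ("terraform (pipe config)", "SNOWPIPE"),
    "kafka_connect": ("Confluent Cloud", "STREAMING"),
}


def first_step(method: str, existing_databases: set[str]) -> int:
    """Return 3 if the loader database already exists (schema only), else 2."""
    if method not in INGESTION_METHODS:
        # A loader tool not in the table (e.g. Fivetran) needs its own database
        return 2
    _, database = INGESTION_METHODS[method]
    return 3 if database in existing_databases else 2
```

For example, a new API loaded through dlt into an account that already has a DLT database starts at Step 3.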
2. Create Snowflake Infrastructure (New Loader Only)
Claude Code Automation
The add-data-source Claude skill in the terraform repository automates steps 2a-2d. See Claude Code Setup.
2a. Create the Database
Add to snowflake/config/databases.tf:
```hcl
module "database_<loader>" {
  source = "./modules/snowflake_database"
  providers = {
    snowflake.sys_admin      = snowflake.sys_admin
    snowflake.security_admin = snowflake.security_admin
  }

  database_name    = "<LOADER>"
  database_comment = "Raw data loaded by <loader>."

  grant_reader_to_account_roles = [
    module.role_analytics_sources_reader.role_name,
  ]
  grant_writer_to_account_roles = [
    module.user_svc_<loader>.user_default_role,
  ]
}
```
The database module creates <LOADER>_DB_READER and <LOADER>_DB_WRITER database roles. Granting the reader to ANALYTICS_SOURCES_READER maintains the reader access chain so developers and transformers can query the data.
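The sketch below shows what the access chain looks like in Snowflake terms. It renders GRANT statements roughly equivalent to the grants the database module manages, plus SHOW GRANTS queries you could use for an audit. It assumes the module grants each database role to the listed account roles with GRANT DATABASE ROLE. The module's real implementation may differ, and Terraform remains the source of truth. The function names are hypothetical.

```python
"""Render SQL roughly equivalent to the database module's role grants (sketch)."""
from __future__ import annotations


def database_role_grants(database: str, readers: list[str], writers: list[str]) -> list[str]:
    """One GRANT per (database role, account role) pair."""
    db = database.upper()
    statements = []
    for suffix, account_roles in (("DB_READER", readers), ("DB_WRITER", writers)):
        for role in account_roles:
            # Database roles are addressed as <database>.<database_role>
            statements.append(f"GRANT DATABASE ROLE {db}.{db}_{suffix} TO ROLE {role.upper()};")
    return statements


def audit_queries(database: str) -> list[str]:
    """Queries listing which roles currently hold each database role."""
    db = database.upper()
    return [f"SHOW GRANTS OF DATABASE ROLE {db}.{db}_{s};" for s in ("DB_READER", "DB_WRITER")]
```

If the output of `audit_queries` does not show ANALYTICS_SOURCES_READER holding the reader role, the chain is broken at the database level rather than further up.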
2b. Create the Service Account
Add to snowflake/config/users.tf:
```hcl
module "user_svc_<loader>" {
  source = "./modules/snowflake_user"
  providers = {
    snowflake.security_admin = snowflake.security_admin
    snowflake.user_admin     = snowflake.user_admin
  }

  user_name                  = "SVC_<LOADER>"
  user_display_name          = "<Loader> Service Account"
  user_comment               = "Service account for <loader> data loading."
  user_is_service_account    = true
  user_create_dedicated_role = true
  user_default_warehouse     = module.warehouse_loading.warehouse_name
  user_additional_roles      = []
}
```
2c. Add AWS Secrets Manager Container
Add to aws/config/secrets.tf:
```hcl
resource "aws_secretsmanager_secret" "<loader>_snowflake_credentials" {
  name        = "<loader>/snowflake-credentials"
  description = "Snowflake credentials for SVC_<LOADER>."
}
```
2d. Validate and Deploy
```bash
cd snowflake/config && terraform plan
cd ../../aws/config && terraform plan
```
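You can also run both plans in one go and print a one-line summary per directory. A small wrapper can use Terraform's -detailed-exitcode flag, where 0 means no changes, 1 means an error, and 2 means changes are present. This is a convenience sketch that assumes it runs from the terraform repository root after terraform init. PLAN_DIRS mirrors the two cd commands above, and the helper names are hypothetical.

```python
"""Run `terraform plan` in both config directories and summarise (sketch)."""
import subprocess
import sys

PLAN_DIRS = ["snowflake/config", "aws/config"]
EXIT_MEANING = {0: "no changes", 1: "error", 2: "changes present"}
OK_STATES = {"no changes", "changes present"}


def describe(exit_code: int) -> str:
    """Translate a -detailed-exitcode return code into words."""
    return EXIT_MEANING.get(exit_code, f"unexpected exit code {exit_code}")


def any_failed(summary: dict) -> bool:
    return any(status not in OK_STATES for status in summary.values())


def plan_all(dirs=PLAN_DIRS) -> dict:
    summary = {}
    for directory in dirs:
        proc = subprocess.run(
            ["terraform", "plan", "-detailed-exitcode", "-input=false"], cwd=directory
        )
        summary[directory] = describe(proc.returncode)
    return summary


if __name__ == "__main__":
    results = plan_all()
    for directory, status in results.items():
        print(f"{directory}: {status}")
    sys.exit(1 if any_failed(results) else 0)
```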
Create a PR, get approval, and merge. CI/CD applies the changes.
After merge, generate a key pair and store credentials in Secrets Manager (see Adding Users step 6 for the key pair process).
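The sketch below is one way to script that step. Adding Users step 6 remains the reference process. The sketch assumes the `cryptography` and `boto3` packages are installed and uses the infrastructure-admin AWS profile from Prerequisites. The JSON keys in the secret (user, account, private_key) and all function names are hypothetical, so match them to what your pipelines read. The secret name follows the resource created in step 2c.

```python
"""Key pair and Secrets Manager payload for SVC_<LOADER> (sketch)."""
from __future__ import annotations

import json


def generate_key_pair() -> tuple[str, str]:
    """Return (private_key_pem, public_key_pem) for a new 2048-bit RSA key."""
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import rsa

    key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    private_pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),  # unencrypted PKCS#8
    ).decode()
    public_pem = key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    ).decode()
    return private_pem, public_pem


def alter_user_sql(loader: str, public_pem: str) -> str:
    """RSA_PUBLIC_KEY takes the base64 body, without the PEM header/footer lines."""
    body = "".join(
        line for line in public_pem.strip().splitlines() if not line.startswith("-----")
    )
    return f"ALTER USER SVC_{loader.upper()} SET RSA_PUBLIC_KEY='{body}';"


def secret_payload(loader: str, account: str, private_pem: str) -> str:
    # Hypothetical key layout; align with whatever your pipelines expect
    return json.dumps(
        {"user": f"SVC_{loader.upper()}", "account": account, "private_key": private_pem}
    )


def store_secret(loader: str, payload: str, profile: str = "infrastructure-admin") -> None:
    import boto3

    client = boto3.Session(profile_name=profile).client("secretsmanager")
    # Secret name follows the aws_secretsmanager_secret resource from step 2c
    client.put_secret_value(
        SecretId=f"{loader.lower()}/snowflake-credentials", SecretString=payload
    )
```

Run the generated ALTER USER statement as a role that is allowed to modify the service user.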
3. Create Schemas
Add to snowflake/config/schemas.tf for each data source within the database:
```hcl
module "schema_<loader>_<source>" {
  source = "./modules/snowflake_schema"
  providers = {
    snowflake.sys_admin      = snowflake.sys_admin
    snowflake.security_admin = snowflake.security_admin
  }

  database_name  = module.database_<loader>.database_name
  schema_name    = "<SOURCE>"
  schema_comment = "Data from <source> loaded by <loader>."
}
```
Repeat for each source (e.g. HUBSPOT, STRIPE, SALESFORCE). Create a PR and merge.
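When you add several sources at once, a template can emit one block per source. The helper below is hypothetical and only reproduces the HCL pattern above. Paste its output into snowflake/config/schemas.tf and review it like any hand-written change.

```python
"""Generate schema module blocks for several sources (hypothetical helper)."""
from __future__ import annotations

# Doubled braces are literal braces in str.format
TEMPLATE = '''module "schema_{loader}_{source}" {{
  source = "./modules/snowflake_schema"
  providers = {{
    snowflake.sys_admin      = snowflake.sys_admin
    snowflake.security_admin = snowflake.security_admin
  }}

  database_name  = module.database_{loader}.database_name
  schema_name    = "{SOURCE}"
  schema_comment = "Data from {source} loaded by {loader}."
}}
'''


def schema_modules(loader: str, sources: list[str]) -> str:
    """One module block per source, separated by a blank line."""
    blocks = [
        TEMPLATE.format(loader=loader.lower(), source=s.lower(), SOURCE=s.upper())
        for s in sources
    ]
    return "\n".join(blocks)


if __name__ == "__main__":
    print(schema_modules("dlt", ["HUBSPOT", "STRIPE", "SALESFORCE"]))
```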
4. Build the Ingestion Pipeline
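If using dlt, in the data-pipelines repository:
- Create the source in sources/<source_name>/ (for example in its __init__.py, so that the pipeline's `from sources.<source_name> import <source_name>` resolves):
```python
import dlt


@dlt.source(section="<source_name>")
def <source_name>():
    @dlt.resource(write_disposition="replace")
    def <table_name>():
        # Fetch data from the API (api_client stands in for the source's client)
        yield from api_client.get_data()

    return <table_name>
```
- Create the pipeline in pipelines/<source_name>.py:
```python
import dlt

from sources.<source_name> import <source_name>


def run():
    pipeline = dlt.pipeline(
        pipeline_name="<source_name>",
        destination="snowflake",
        dataset_name="<SOURCE>",  # becomes the schema inside the loader database
    )
    load_info = pipeline.run(<source_name>())
    return load_info


if __name__ == "__main__":
    # Lets `python -m pipelines.<source_name>` actually run the load
    print(run())
```
- Add credentials to .dlt/secrets.toml (local) and AWS Secrets Manager (production)
- Test locally with DuckDB first:
```bash
# Temporarily change destination="snowflake" to destination="duckdb", then run:
python -m pipelines.<source_name>
```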
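In production, dlt can read the same credentials from environment variables. dlt builds these variable names from the config path, in upper case and joined with double underscores, for example DESTINATION__SNOWFLAKE__CREDENTIALS__DATABASE. The sketch below copies a Secrets Manager secret into those variables before the pipeline runs. The secret's JSON keys (user, account, and so on), the FIELD_MAP dictionary, and the function names are assumptions, so adjust the mapping to your secret's actual layout.

```python
"""Expose Secrets Manager credentials to dlt via environment variables (sketch)."""
from __future__ import annotations

import json
import os

PREFIX = "DESTINATION__SNOWFLAKE__CREDENTIALS__"

# secret JSON key (hypothetical) -> dlt Snowflake credential field
FIELD_MAP = {
    "database": "DATABASE",
    "user": "USERNAME",
    "account": "HOST",  # dlt calls the Snowflake account identifier "host"
    "warehouse": "WAREHOUSE",
    "role": "ROLE",
    "private_key": "PRIVATE_KEY",
}


def dlt_env_vars(secret: dict) -> dict[str, str]:
    """Map known secret keys to dlt variable names; unknown keys are ignored."""
    return {PREFIX + field: str(secret[key]) for key, field in FIELD_MAP.items() if key in secret}


def load_into_environ(secret_id: str) -> None:
    import boto3

    raw = boto3.client("secretsmanager").get_secret_value(SecretId=secret_id)["SecretString"]
    os.environ.update(dlt_env_vars(json.loads(raw)))
```

For a loader created with this runbook, the secret ID follows step 2c: "<loader>/snowflake-credentials".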
If using Airbyte, in the Airbyte Cloud or self-hosted UI:
- Add the source connector and configure credentials
- Add the Snowflake destination (using SVC_AIRBYTE credentials)
- Set the destination namespace to <SOURCE> within the AIRBYTE database
- Configure sync frequency and select streams
- Run an initial sync and verify data appears
See HubSpot Connection for a worked example.
If using Snowpipe:
- Add a storage integration in snowflake/config/storage_integrations.tf (if not already present)
- Create the pipe configuration using the snowflake_snowpipe module
- Configure S3 event notifications to trigger the pipe
- Write a pipeline that uploads files to the S3 staging location
See Exchange Rates to S3 for a worked example.
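The last Snowpipe bullet, the upload pipeline, can be small. The sketch below writes one batch of records as newline-delimited JSON under a date-partitioned key, and each new object then triggers the pipe through the S3 event notification. The bucket, the <source>/YYYY/MM/DD/ key layout, the JSON file format, and the function names are all assumptions. Match them to the stage URL and file format in your pipe configuration.

```python
"""Upload a batch of records to the Snowpipe staging location (sketch)."""
from __future__ import annotations

import json
from datetime import date


def build_object_key(source: str, load_date: date, batch_id: str) -> str:
    """Hypothetical layout: <source>/YYYY/MM/DD/<batch_id>.json"""
    return f"{source.lower()}/{load_date:%Y/%m/%d}/{batch_id}.json"


def to_ndjson(records: list[dict]) -> bytes:
    """Newline-delimited JSON: one object per line."""
    return "".join(json.dumps(r, sort_keys=True) + "\n" for r in records).encode("utf-8")


def upload_batch(
    bucket: str, source: str, records: list[dict], batch_id: str, load_date: date | None = None
) -> str:
    import boto3  # imported lazily so the helpers above work without AWS dependencies

    key = build_object_key(source, load_date or date.today(), batch_id)
    boto3.client("s3").put_object(Bucket=bucket, Key=key, Body=to_ndjson(records))
    return key
```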
5. Create Prefect Flow (if Using dlt)
Create flows/<source_name>.py:
```python
"""Prefect flow for <source_name> pipeline."""
from prefect import flow, task, get_run_logger
from prefect.tasks import exponential_backoff


@task(
    name="load-<source-name>",
    retries=3,
    retry_delay_seconds=exponential_backoff(backoff_factor=10),
    retry_jitter_factor=0.5,
)
def load_task():
    """Load <source_name> data to Snowflake."""
    logger = get_run_logger()
    logger.info("Starting <source_name> load")

    from pipelines.<source_name> import run

    load_info = run()
    logger.info(f"Load completed: {load_info}")
    return load_info


@flow(name="<source-name>-daily", log_prints=True)
def daily_flow():
    """Daily <source_name> load."""
    load_info = load_task()
    print(f"<Source> loaded: {load_info}")
    return {"status": "success", "load_info": str(load_info)}


if __name__ == "__main__":
    daily_flow()
```
Add the deployment to prefect.yaml:
```yaml
deployments:
  # ... existing deployments ...
  - name: <source-name>-daily
    entrypoint: flows/<source_name>.py:daily_flow
    description: "Daily <source_name> load"
    work_pool:
      name: production
    schedules:
      - cron: "0 7 * * *"
        timezone: "UTC"
        active: true
    tags:
      - dlt
      - <source-name>
      - daily
```
Deploy:
```bash
prefect deploy --all
```
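After deploying, you can trigger a one-off run without waiting for the 07:00 UTC schedule. Prefect's run_deployment takes a "<flow name>/<deployment name>" reference, and the prefect deployment run CLI command does the same thing. The sketch assumes PREFECT_API_URL points at your workspace. It also assumes <source-name> is <source_name> with underscores replaced by hyphens, which is how the placeholders above appear to relate. The helper names are hypothetical.

```python
"""Trigger a manual run of the new deployment (sketch)."""
from __future__ import annotations


def deployment_ref(source_name: str) -> str:
    """Flow and deployment are both named "<source-name>-daily" in this runbook."""
    slug = source_name.replace("_", "-")
    return f"{slug}-daily/{slug}-daily"


def trigger(source_name: str, wait: bool = False):
    from prefect.deployments import run_deployment  # requires prefect

    # timeout=0 returns as soon as the run is created; None polls until it finishes
    return run_deployment(name=deployment_ref(source_name), timeout=None if wait else 0)


if __name__ == "__main__":
    import sys

    flow_run = trigger(sys.argv[1])
    print(f"Created flow run {flow_run.id} in state {flow_run.state_name}")
```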
6. Add dbt Source and Staging Model
In the dbt-transform repository:
- Create or update _sources.yml in models/staging/<loader>/:
```yaml
version: 2

sources:
  - name: <loader>_<source>
    database: <LOADER>
    schema: <SOURCE>
    # _dlt_load_time is not a default dlt column (dlt adds _dlt_load_id and _dlt_id),
    # so confirm your pipeline populates it. For Airbyte use _airbyte_extracted_at.
    loaded_at_field: _dlt_load_time
    freshness:
      warn_after: { count: 24, period: hour }
      error_after: { count: 48, period: hour }
    tables:
      - name: <table_name>
        description: "<Description of the table>"
```
- Create the staging model models/staging/<loader>/stg_<loader>__<table>.sql:
```sql
with source as (
    select * from {{ source('<loader>_<source>', '<table_name>') }}
),

renamed as (
    select
        -- identifiers
        id as <entity>_id,

        -- dimensions
        column_a,
        column_b,

        -- timestamps
        cast(created_at as timestamp_ntz) as created_at
    from source
)

select * from renamed
```
- Create the model YAML models/staging/<loader>/stg_<loader>__<table>.yml:
```yaml
version: 2

models:
  - name: stg_<loader>__<table>
    description: >
      <Description>. One row per <entity>.
    columns:
      - name: <entity>_id
        description: Primary key.
        tests:
          - unique
          - not_null
```
- Run and test:
```bash
dbt build --select stg_<loader>__<table>
```
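For wide tables, a small generator can produce the staging model skeleton in the source -> renamed CTE pattern above. This helper is hypothetical and not part of dbt. You supply the column mapping by hand, and the timestamp columns are cast to timestamp_ntz as in the example model.

```python
"""Render a dbt staging model skeleton (hypothetical helper, not part of dbt)."""
from __future__ import annotations


def model_name(loader: str, table: str) -> str:
    return f"stg_{loader.lower()}__{table.lower()}"


def render_staging_sql(
    loader: str,
    source: str,
    table: str,
    renames: dict[str, str],
    timestamps: list[str] | None = None,
) -> str:
    """renames maps raw -> staged column names (same name keeps the column as-is)."""
    lines = [f"        {raw} as {new}" if raw != new else f"        {raw}" for raw, new in renames.items()]
    lines += [f"        cast({col} as timestamp_ntz) as {col}" for col in (timestamps or [])]
    columns = ",\n".join(lines)
    # Quadruple braces render as the literal {{ ... }} Jinja delimiters
    source_ref = f"{{{{ source('{loader.lower()}_{source.lower()}', '{table.lower()}') }}}}"
    return (
        "with source as (\n"
        f"    select * from {source_ref}\n"
        "),\n\n"
        "renamed as (\n"
        "    select\n"
        f"{columns}\n"
        "    from source\n"
        ")\n\n"
        "select * from renamed\n"
    )
```

Write the result to models/staging/<loader>/<model_name>.sql. Then add the descriptions and tests in the model YAML by hand.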
Verification
- Snowflake database exists (if new loader):
```sql
SHOW DATABASES LIKE '<LOADER>';
```
- Schema exists:
```sql
SHOW SCHEMAS IN DATABASE <LOADER>;
```
- Reader access chain works:
```sql
USE ROLE ANALYTICS_DEVELOPER;
SELECT * FROM <LOADER>.<SOURCE>.<TABLE> LIMIT 10;
```
- Pipeline runs successfully (Prefect UI or manual run)
- dbt staging model builds and tests pass:
```bash
dbt build --select stg_<loader>__<table>
```
- Source freshness check passes:
```bash
dbt source freshness --select source:<loader>_<source>
```
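The three SQL checks can be scripted against any DB-API connection, for example one from snowflake-connector-python's snowflake.connector.connect. Pass whatever authentication arguments your setup uses. This is a sketch, and the helper names are hypothetical. The schema check uses SHOW SCHEMAS LIKE to target the one schema, and only the reader check switches to ANALYTICS_DEVELOPER, so a broken access chain does not hide the other results. A check passes only if it returns at least one row, which means the reader check also fails on an empty table.

```python
"""Run the SQL verification checks in one pass (sketch)."""
from __future__ import annotations


def verification_checks(loader: str, source: str, table: str) -> list[tuple[str, str | None, str]]:
    """(check name, role to assume or None, SQL) mirroring the manual checks."""
    db, schema, tbl = loader.upper(), source.upper(), table.upper()
    return [
        ("database exists", None, f"SHOW DATABASES LIKE '{db}'"),
        ("schema exists", None, f"SHOW SCHEMAS LIKE '{schema}' IN DATABASE {db}"),
        ("reader chain works", "ANALYTICS_DEVELOPER", f"SELECT * FROM {db}.{schema}.{tbl} LIMIT 10"),
    ]


def run_checks(conn, loader: str, source: str, table: str) -> dict[str, bool]:
    results: dict[str, bool] = {}
    cur = conn.cursor()
    try:
        for name, role, sql in verification_checks(loader, source, table):
            try:
                if role:
                    cur.execute(f"USE ROLE {role}")
                cur.execute(sql)
                results[name] = len(cur.fetchall()) > 0
            except Exception:  # missing object or insufficient privileges
                results[name] = False
    finally:
        cur.close()
    return results
```

For example, `run_checks(conn, "dlt", "hubspot", "contacts")` returns a dict of check name to pass/fail.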
Rollback
If something goes wrong at any stage:
- Terraform changes - Revert the PR in the terraform repository. CI/CD runs terraform apply to remove the resources
- Pipeline code - Revert the PR in the data-pipelines repository
- dbt models - Revert the PR in the dbt-transform repository. dbt does not drop models that are removed from the project (dbt run only creates or replaces them), so drop any partially created views or tables in Snowflake manually
- Snowflake data (manual cleanup, only for objects not managed by Terraform; otherwise revert the Terraform PR so the state does not drift):
```sql
USE ROLE SYSADMIN;
DROP SCHEMA IF EXISTS <LOADER>.<SOURCE> CASCADE;
-- Only if removing the entire loader:
DROP DATABASE IF EXISTS <LOADER>;
```
Escalation
- First contact: Data Engineering team in #data-eng Slack
- Escalation: Infrastructure team lead (for Terraform), pipeline team lead (for ingestion)
See Also
- Databases - Database module patterns
- Schemas - Schema module patterns
- Batch Data Ingestion - Full dlt pipeline guide
- SaaS Ingestion - Full Airbyte guide
- Sources and Staging - dbt staging patterns