Add a New Source and Staging Model
This skill adds a dbt source definition and staging model for a new raw table, following the established conventions.
When to Use
- A new raw table is available in a source database (DLT, SNOWPIPE, AIRBYTE, STREAMING)
- A new schema was added to an existing source database
- Setting up dbt for a newly ingested data source
Before You Start
Gather the following information:
- Source system name - the loader database:
dlt,snowpipe,airbyte, orstreaming - Source database and schema - e.g.
DLT.APPLICATION_DATA - Table name(s) - the raw table name(s)
- Column names and types - query the raw table to see what columns exist
- Primary key column - the column (or columns) that uniquely identify a row
- Freshness requirements -
warn_afteranderror_afterthresholds - Loaded-at field - e.g.
_dlt_load_time,_airbyte_extracted_at,_loaded_at
Reference: Existing Sources
Read models/staging/ to see existing source definitions and staging models:
models/staging/
├── dlt/
│ ├── _sources.yml # DLT.APPLICATION_DATA + DLT.OPEN_EXCHANGE_RATES
│ ├── stg_dlt__products.sql
│ ├── stg_dlt__products.yml
│ ├── stg_dlt__currencies.sql
│ └── stg_dlt__currencies.yml
├── snowpipe/
│ ├── _sources.yml # SNOWPIPE.OPEN_EXCHANGE_RATES
│ ├── stg_snowpipe__exchange_rates.sql
│ └── stg_snowpipe__exchange_rates.yml
└── airbyte/
├── _sources.yml # AIRBYTE.HUBSPOT
├── stg_airbyte__contacts.sql
└── stg_airbyte__contacts.yml
Steps
1. Create or Update Source Definition
If the source system directory does not exist, create it:
mkdir -p models/staging/{source_system}
Create or update models/staging/{source_system}/_sources.yml:
version: 2
sources:
- name: {source_system}_{schema_name}
description: >
Description of what data this source contains and how it is loaded.
database: {DATABASE}
schema: {SCHEMA}
loader: {loader_tool}
loaded_at_field: {timestamp_field}
freshness:
warn_after: {count: 25, period: hour}
error_after: {count: 49, period: hour}
tables:
- name: {table_name}
description: >
Description of this table and what each row represents.
columns:
- name: {primary_key}
description: Primary key.
If the _sources.yml already exists for this source system, add the new source group or table to it.
2. Create Staging Model SQL
Create models/staging/{source_system}/stg_{source_system}__{table_name}.sql:
with source as (
select * from {{ source('{source_system}_{schema}', '{table_name}') }}
),
renamed as (
select
-- Primary key
{pk_column} as {clean_pk_name},
-- Dimensions
{dimension_columns},
-- Timestamps
{timestamp_columns},
-- Metadata
{loaded_at_field} as loaded_at
from source
)
select * from renamed
The staging model should:
- Rename columns to clean
snake_casenames - Cast data types where the loader produces incorrect types
- Remove loader-internal metadata columns (
_dlt_id,_dlt_load_id,_airbyte_raw_id, etc.) - Keep a single
loaded_attimestamp from the loader metadata - Not add business logic - that belongs in intermediate or mart models
3. Create Model YAML
Create models/staging/{source_system}/stg_{source_system}__{table_name}.yml:
version: 2
models:
- name: stg_{source_system}__{table_name}
description: >
Staging model for {table_name} from {source_system}.
Cleans column names, casts types, and removes loader metadata.
columns:
- name: {primary_key}
description: Primary key.
data_tests:
- unique
- not_null
- name: {other_columns}
description: Column description.
Every column must have a description. The primary key must have unique and not_null tests.
4. Validate
dbt run -s stg_{source_system}__{table_name}
dbt test -s stg_{source_system}__{table_name}
dbt source freshness -s source:{source_system}_{schema_name}
Verify:
- Model compiles and runs without errors
- All tests pass
- Source freshness check returns within thresholds
Naming Rules
| Element | Convention | Example |
|---|---|---|
| Model name | stg_{source}__{table} (double underscore) |
stg_dlt__products |
| Source name | {source}_{schema} (single underscore) |
dlt_application_data |
| Column names | snake_case, no loader prefixes |
product_name not PRODUCT_NAME |
| Metadata | Remove all _dlt_*, _airbyte_*, _sdc_* columns except loaded_at |
Keep one timestamp |
Testing Requirements
- Primary key:
unique+not_null - Source freshness:
warn_after+error_after - All columns: description in YAML file
- Additional tests as appropriate:
accepted_values,relationships,dbt_expectations