Sources and Staging Models

On this page, you will:

  • Define sources for all three raw databases (DLT, SNOWPIPE, AIRBYTE)
  • Configure freshness checks so stale data is detected
  • Build staging models that clean and rename raw columns

Overview

The staging layer is the first transformation layer. It sits directly on top of raw source tables and does only light work:

  • Renaming columns to consistent conventions (snake_case, no prefixes)
  • Casting data types where the loader produces incorrect types
  • Deduplicating records where the loader may produce duplicates
  • Removing loader-internal metadata columns

Staging models are materialised as views by default. A view stores no data; it only provides a clean interface to the raw tables, which keeps the staging layer cheap to maintain.

Raw databases          Staging models (views)      Downstream
─────────────          ──────────────────────      ──────────

DLT.APPLICATION_DATA
  products        ────▶  stg_dlt__products    ────────┐
                                                      │
DLT.OPEN_EXCHANGE_RATES                               ├──▶ intermediate
  currencies      ────▶  stg_dlt__currencies          │
                                                      │
SNOWPIPE.OPEN_EXCHANGE_RATES                          │
  exchange_rates  ────▶  stg_snowpipe__exchange_rates ┘

AIRBYTE.HUBSPOT
  contacts        ────▶  stg_airbyte__contacts    ──────▶ intermediate

Define Sources

Sources are declared in _sources.yml files alongside the staging models. Each source group has its own file, co-located with the models that reference it.

DLT Sources

Create models/staging/dlt/_sources.yml:

version: 2

sources:
  - name: dlt_application_data
    description: >
      Products data loaded by dlt from PostgreSQL (Clever Cloud).
      Incremental merge-based load runs daily.
    database: DLT
    schema: APPLICATION_DATA
    loader: dlt
    loaded_at_field: _dlt_load_time
    freshness:
      warn_after: {count: 25, period: hour}
      error_after: {count: 49, period: hour}
    tables:
      - name: products
        description: >
          Type 2 SCD products data from PostgreSQL. Each row represents a version
          of a product with operation type (INSERT, UPDATE, DELETE) and timestamp.
        columns:
          - name: audit_id
            description: Unique row identifier (primary key).
          - name: product_id
            description: Product business key (multiple rows per product).
          - name: product_name
            description: Product name.
          - name: price_usd
            description: Product price in USD.
          - name: operation
            description: Operation type (INSERT, UPDATE, DELETE).
          - name: valid_ts
            description: Timestamp when the change occurred.
          - name: _dlt_load_id
            description: dlt load batch identifier.
          - name: _dlt_load_time
            description: Timestamp when this row was loaded by dlt.

  - name: dlt_open_exchange_rates
    description: >
      Currency reference data loaded by dlt from Open Exchange Rates API.
      Updated weekly.
    database: DLT
    schema: OPEN_EXCHANGE_RATES
    loader: dlt
    loaded_at_field: _dlt_load_time
    freshness:
      warn_after: {count: 8, period: day}
      error_after: {count: 15, period: day}
    tables:
      - name: currencies
        description: >
          Reference list of all available currency codes and names from Open Exchange Rates.
          Updated weekly. One row per currency.
        columns:
          - name: _dlt_id
            description: Internal dlt row identifier.
          - name: _dlt_load_time
            description: Timestamp when this row was loaded by dlt.
          - name: currency_code
            description: ISO 4217 currency code (e.g. GBP, EUR, USD).
          - name: currency_name
            description: Full name of the currency (e.g. British Pound Sterling).
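
As a quick orientation, the sketch below shows how a staging model refers to one of these sources. The compiled form in the trailing comment is an approximation: dbt assembles it from the database, schema, and table name declared above, and the exact quoting and casing depend on your adapter and project settings.

```sql
-- In a model file, reference a source by its source name and table name.
select * from {{ source('dlt_application_data', 'products') }}

-- Roughly what dbt compiles this to for the definition above
-- (quoting and casing may differ in your project):
-- select * from DLT.APPLICATION_DATA.products
```

Referencing raw tables through source() rather than by hard-coded name is what lets dbt track lineage and run freshness checks against them.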

Snowpipe Sources

Create models/staging/snowpipe/_sources.yml:

version: 2

sources:
  - name: snowpipe_open_exchange_rates
    description: >
      Exchange rate data loaded via Snowpipe from S3.
      Files are deposited to S3 by dlt and auto-ingested by Snowpipe within minutes.
    database: SNOWPIPE
    schema: OPEN_EXCHANGE_RATES
    loader: snowpipe
    loaded_at_field: _loaded_at
    freshness:
      warn_after: {count: 25, period: hour}
      error_after: {count: 49, period: hour}
    tables:
      - name: exchange_rates
        description: >
          Daily USD base exchange rates for all currencies from Open Exchange Rates API.
          Loaded via Snowpipe from S3. One row per currency per day.
        columns:
          - name: rate_date
            description: The date for which these exchange rates apply.
          - name: base_currency
            description: The base currency (always USD for this source).
          - name: target_currency
            description: The target currency code (e.g. GBP, EUR).
          - name: rate
            description: Exchange rate from base to target currency.
          - name: _loaded_at
            description: Timestamp when Snowpipe loaded this row.

Airbyte Sources

Create models/staging/airbyte/_sources.yml:

version: 2

sources:
  - name: airbyte_hubspot
    description: >
      HubSpot CRM data loaded by Airbyte. Incremental append+dedup sync.
      Updated daily.
    database: AIRBYTE
    schema: HUBSPOT
    loader: airbyte
    loaded_at_field: _airbyte_extracted_at
    freshness:
      warn_after: {count: 25, period: hour}
      error_after: {count: 49, period: hour}
    tables:
      - name: contacts
        description: >
          HubSpot contacts. One row per contact per sync (append+dedup).
          The most recent record per contact_id is the current version.
        columns:
          - name: _airbyte_raw_id
            description: Unique row identifier assigned by Airbyte.
          - name: _airbyte_extracted_at
            description: Timestamp when Airbyte extracted this record from HubSpot.
          - name: _airbyte_meta
            description: Airbyte sync metadata (changes, sync ID).
          - name: id
            description: HubSpot contact ID. Unique per contact.
          - name: email
            description: Primary email address for the contact.
          - name: firstname
            description: Contact first name.
          - name: lastname
            description: Contact last name.
          - name: createdat
            description: Timestamp when the contact was created in HubSpot.
          - name: lastmodifieddate
            description: Timestamp when the contact was last modified in HubSpot.
          - name: hs_lifecyclestage
            description: HubSpot lifecycle stage (subscriber, lead, customer, etc.)

Build Staging Models

stg_dlt__products

Create models/staging/dlt/stg_dlt__products.sql:

with source as (

    select * from {{ source('dlt_application_data', 'products') }}

),

renamed as (

    select
        -- identifiers
        audit_id,
        product_id,

        -- attributes
        product_name,
        cast(price_usd as float) as price_usd,

        -- metadata
        upper(operation)               as operation,
        cast(valid_ts as timestamp_ntz) as valid_ts,
        _dlt_load_id                   as dlt_load_id,
        cast(_dlt_load_time as timestamp_ntz) as loaded_at

    from source

)

select * from renamed

Create models/staging/dlt/stg_dlt__products.yml:

version: 2

models:
  - name: stg_dlt__products
    description: >
      Type 2 SCD products data from PostgreSQL via dlt.
      Each row represents a version of a product with operation type and timestamp.
    columns:
      - name: audit_id
        description: Unique row identifier (primary key for this table).
        tests:
          - unique
          - not_null
      - name: product_id
        description: Product business key. Multiple rows per product represent history.
        tests:
          - not_null
      - name: product_name
        description: Product name. Null for DELETE operations.
      - name: price_usd
        description: Product price in USD. Null for DELETE operations.
        tests:
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              strictly: true
              row_condition: "operation != 'DELETE'"
      - name: operation
        description: Operation type indicating how this row was created.
        tests:
          - not_null
          - accepted_values:
              values: ['INSERT', 'UPDATE', 'DELETE']
      - name: valid_ts
        description: Timestamp when this product version became valid.
        tests:
          - not_null

stg_dlt__currencies

Create models/staging/dlt/stg_dlt__currencies.sql:

with source as (

    select * from {{ source('dlt_open_exchange_rates', 'currencies') }}

),

renamed as (

    select
        -- identifiers
        _dlt_id                     as dlt_row_id,

        -- dimensions
        upper(currency_code)        as currency_code,
        currency_name,

        -- metadata
        cast(_dlt_load_time as timestamp_ntz) as loaded_at

    from source

)

select * from renamed

Create models/staging/dlt/stg_dlt__currencies.yml:

version: 2

models:
  - name: stg_dlt__currencies
    description: >
      Currency reference data from Open Exchange Rates API via dlt.
      One row per currency. Updated weekly.
    columns:
      - name: currency_code
        description: ISO 4217 currency code (e.g. GBP, EUR, USD).
        tests:
          - unique
          - not_null
      - name: currency_name
        description: Full name of the currency.
        tests:
          - not_null

stg_snowpipe__exchange_rates

Create models/staging/snowpipe/stg_snowpipe__exchange_rates.sql:

with source as (

    select * from {{ source('snowpipe_open_exchange_rates', 'exchange_rates') }}

),

renamed as (

    select
        -- timestamps
        cast(_loaded_at as timestamp_ntz) as loaded_at,
        cast(rate_date as date)           as rate_date,

        -- dimensions
        upper(base_currency)        as base_currency,
        upper(target_currency)      as target_currency,

        -- measures
        cast(rate as float)         as exchange_rate

    from source

)

select * from renamed

Create models/staging/snowpipe/stg_snowpipe__exchange_rates.yml:

version: 2

models:
  - name: stg_snowpipe__exchange_rates
    description: >
      Exchange rates from Open Exchange Rates API loaded via Snowpipe from S3.
      One row per currency per day. Base currency is always USD.
    columns:
      - name: rate_date
        description: The date for which the exchange rate applies.
        tests:
          - not_null
      - name: base_currency
        description: Base currency code. Always USD for this source.
        tests:
          - not_null
          - accepted_values:
              values: ['USD']
      - name: target_currency
        description: Target currency code (e.g. GBP, EUR).
        tests:
          - not_null
      - name: exchange_rate
        description: Exchange rate from USD to target currency.
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              strictly: true
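
The description promises one row per currency per day, but none of the column tests above checks that. One way to check it is a dbt singular test: a SQL file under tests/ that passes when the query returns no rows. The file name below is hypothetical, and this is a sketch rather than part of the original project.

```sql
-- tests/assert_exchange_rates_one_row_per_currency_per_day.sql (hypothetical name)
-- Returns every (rate_date, target_currency) pair that appears more than once.
-- dbt treats any returned row as a test failure.
select
    rate_date,
    target_currency,
    count(*) as row_count
from {{ ref('stg_snowpipe__exchange_rates') }}
group by rate_date, target_currency
having count(*) > 1
```

If this test fails, the staging model likely needs a deduplication step similar to the one used for the Airbyte contacts.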

stg_airbyte__contacts

Airbyte uses the append+dedup sync mode, so the raw table may contain multiple versions of each contact. Keep only the most recently extracted version of each contact, partitioning on the HubSpot id:

Create models/staging/airbyte/stg_airbyte__contacts.sql:

with source as (

    select * from {{ source('airbyte_hubspot', 'contacts') }}

),

-- Airbyte append+dedup: take the most recently extracted version of each contact
deduped as (

    select *
    from source
    qualify row_number() over (
        partition by id
        order by _airbyte_extracted_at desc
    ) = 1

),

renamed as (

    select
        -- identifiers
        id                                          as contact_id,

        -- personal details
        lower(trim(email))                          as email,
        initcap(trim(firstname))                    as first_name,
        initcap(trim(lastname))                     as last_name,

        -- lifecycle
        hs_lifecyclestage                           as lifecycle_stage,

        -- timestamps
        cast(createdat as timestamp_ntz)            as created_at,
        cast(lastmodifieddate as timestamp_ntz)     as last_modified_at,
        cast(_airbyte_extracted_at as timestamp_ntz) as airbyte_extracted_at

    from deduped

)

select * from renamed
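
The deduplication step can be tried in isolation before running the model. The sketch below replaces the raw table with three made-up rows (the ids, emails, and timestamps are illustrative only) and applies the same qualify clause in plain Snowflake SQL.

```sql
-- Stand-in for the raw contacts table: contact 101 has two versions.
with source as (

    select
        column1                 as id,
        column2                 as email,
        column3::timestamp_ntz  as _airbyte_extracted_at
    from values
        (101, 'old@example.com',  '2024-01-01 06:00:00'),
        (101, 'new@example.com',  '2024-01-02 06:00:00'),
        (102, 'only@example.com', '2024-01-01 06:00:00')

)

-- Same pattern as the deduped CTE: newest extraction per id wins.
select *
from source
qualify row_number() over (
    partition by id
    order by _airbyte_extracted_at desc
) = 1
order by id
-- Returns two rows: 101 / new@example.com and 102 / only@example.com
```

Note that if two versions of a contact share the same _airbyte_extracted_at, row_number() picks one of them arbitrarily; adding a second ordering column would make the choice deterministic.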

Create models/staging/airbyte/stg_airbyte__contacts.yml:

version: 2

models:
  - name: stg_airbyte__contacts
    description: >
      HubSpot contacts loaded via Airbyte with deduplication.
      One row per contact (most recent version).
    columns:
      - name: contact_id
        description: HubSpot contact ID.
        tests:
          - unique
          - not_null
      - name: email
        description: Contact email address (lowercased and trimmed).
        tests:
          - not_null
      - name: first_name
        description: Contact first name (title case).
      - name: last_name
        description: Contact last name (title case).
      - name: lifecycle_stage
        description: HubSpot lifecycle stage.
      - name: created_at
        description: When the contact was created in HubSpot.
        tests:
          - not_null
      - name: last_modified_at
        description: When the contact was last modified in HubSpot.

Run the Staging Models

# Run all staging models
dbt run --select staging

# Run a specific model
dbt run --select stg_airbyte__contacts

# Run and test together
dbt build --select staging

Check that the views are created in Snowflake:

USE DATABASE ANALYTICS_DEV;
SHOW SCHEMAS;
-- Should include: DBT_<YOURNAME>_STAGING

USE SCHEMA DBT_<YOURNAME>_STAGING;
SHOW VIEWS;
-- Should include: STG_DLT__PRODUCTS, STG_DLT__CURRENCIES,
--                 STG_SNOWPIPE__EXCHANGE_RATES, STG_AIRBYTE__CONTACTS
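
Beyond confirming that the views exist, a row count comparison gives a quick sanity check on the deduplication. The query below is a sketch that assumes the database and schema names shown above and that id is never null in the raw table.

```sql
-- Replace <YOURNAME> with your own schema suffix before running.
select
    (select count(*)
       from AIRBYTE.HUBSPOT.contacts)                 as raw_rows,
    (select count(distinct id)
       from AIRBYTE.HUBSPOT.contacts)                 as distinct_contact_ids,
    (select count(*)
       from ANALYTICS_DEV.DBT_<YOURNAME>_STAGING.STG_AIRBYTE__CONTACTS) as staged_rows;
-- staged_rows should equal distinct_contact_ids;
-- raw_rows may be larger if the raw table holds several versions per contact.
```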

Check Source Freshness

dbt source freshness

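This queries the maximum loaded_at_field value on each source table and compares its age against the freshness thresholds defined in _sources.yml: data older than warn_after produces a warning, and data older than error_after produces an error. Run this as a separate CI/CD step ahead of the transformations so that stale data is flagged before any models are built.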

Found 4 sources.

Source dlt_application_data.products                ✓ Pass  (loaded 3 hours ago)
Source dlt_open_exchange_rates.currencies           ✓ Pass  (loaded 2 days ago)
Source snowpipe_open_exchange_rates.exchange_rates  ✓ Pass  (loaded 15 minutes ago)
Source airbyte_hubspot.contacts                     ✓ Pass  (loaded 1 hour ago)
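
To make the check less of a black box, the query below approximates it by hand for the Snowpipe source. It is a sketch, not the SQL dbt actually generates. It assumes _loaded_at is stored in UTC and hard-codes the 25 and 49 hour thresholds from _sources.yml.

```sql
-- Approximation of a freshness check for one source table.
with latest as (

    select
        max(_loaded_at) as max_loaded_at,
        sysdate()       as checked_at_utc   -- current time in UTC (timestamp_ntz)
    from SNOWPIPE.OPEN_EXCHANGE_RATES.exchange_rates

),

age as (

    select
        max_loaded_at,
        datediff('second', max_loaded_at, checked_at_utc) / 3600.0 as hours_since_load
    from latest

)

select
    max_loaded_at,
    hours_since_load,
    case
        when max_loaded_at is null then 'no data'
        when hours_since_load > 49 then 'error'   -- error_after: 49 hours
        when hours_since_load > 25 then 'warn'    -- warn_after: 25 hours
        else 'pass'
    end as freshness_status
from age
```

Running this manually can help when a freshness result looks surprising, for example to see whether the loaded_at column holds the values you expect.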

Summary

You've defined sources and built the staging layer:

  • Defined sources for the DLT (products, currencies), SNOWPIPE (exchange rates), and AIRBYTE (contacts) databases
  • Configured freshness thresholds for each source
  • Built staging views with cleaned and renamed columns
  • Handled Airbyte deduplication in stg_airbyte__contacts
  • Preserved the dlt Type 2 SCD history (one row per product version) in stg_dlt__products
  • Added YAML documentation and tests

What's Next

Combine and reshape staging models into the intermediate layer.

Continue to Intermediate Models