Currencies Pipeline
On this page, you will:
- Build a dlt source for the currencies endpoint
- Create a pipeline that loads directly to Snowflake
- Understand how dlt resolves credentials automatically
- Verify data in Snowflake
Overview
The currencies pipeline is your first dlt pipeline. It extracts currency reference data from the Open Exchange Rates API and loads it directly into Snowflake. This is the simplest ingestion pattern: API → Snowflake.
┌─────────────────────────────────────────────────────────────────────────────┐
│                             CURRENCIES PIPELINE                             │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  ┌─────────────────┐         ┌─────────────────┐      ┌─────────────────┐   │
│  │ Open Exchange   │   dlt   │ Snowflake       │      │                 │   │
│  │ Rates API       │  ────▶  │ DLT.OPEN_       │      │ Weekly run      │   │
│  │ /currencies.json│         │ EXCHANGE_RATES  │      │ Full refresh    │   │
│  │                 │         │ .CURRENCIES     │      │                 │   │
│  └─────────────────┘         └─────────────────┘      └─────────────────┘   │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
The Currencies Endpoint
The /currencies.json endpoint returns a mapping of currency codes to names:
{
"AED": "United Arab Emirates Dirham",
"AFN": "Afghan Afghani",
"ALL": "Albanian Lek",
"GBP": "British Pound Sterling",
"USD": "United States Dollar"
}
This is reference data that changes rarely, so a weekly full refresh is sufficient.
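To make the target table shape concrete, here is a minimal sketch of flattening that mapping into one row per currency. It is an illustration only, not the pipeline code: the helper name `to_rows` and the two-entry sample payload are assumptions made for this example.

```python
import json

# Sample payload in the same shape as the /currencies.json response.
SAMPLE = '{"AED": "United Arab Emirates Dirham", "USD": "United States Dollar"}'


def to_rows(mapping: dict[str, str]) -> list[dict[str, str]]:
    """Turn a {code: name} mapping into one row per currency (hypothetical helper)."""
    return [{"code": code, "name": name} for code, name in mapping.items()]


rows = to_rows(json.loads(SAMPLE))
for row in rows:
    print(row)
```

Each key-value pair becomes a row with a `code` column and a `name` column, which is the shape the CURRENCIES table ends up with.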
The dlt Source
Create sources/currencies/__init__.py:
"""Currencies source."""
from .source import currencies
__all__ = ["currencies"]
Create sources/currencies/source.py:
"""dlt source for Open Exchange Rates currencies endpoint."""
from datetime import datetime, timezone
from typing import Iterator

import dlt
import requests

BASE_URL = "https://openexchangerates.org/api"


@dlt.source(section="open_exchange_rates")
def currencies(api_key: str = dlt.secrets.value):
    """
    Source for Open Exchange Rates currencies endpoint.

    Returns the list of all available currencies with their names.

    The api_key is resolved automatically by dlt from:
    - Environment: SOURCES__OPEN_EXCHANGE_RATES__API_KEY
    - secrets.toml: [sources.open_exchange_rates] api_key
    - VaultDocProvider: dlt/open-exchange-rates → {"api_key": "..."}

    Args:
        api_key: Open Exchange Rates App ID (auto-resolved by dlt)
    """
    return currency_list(api_key)


@dlt.resource(
    name="currencies",
    write_disposition="replace",  # Full refresh each time
)
def currency_list(api_key: str) -> Iterator[dict]:
    """
    Fetch the list of available currencies.

    Uses replace disposition since this is reference data
    that should be fully refreshed each run.
    """
    response = requests.get(
        f"{BASE_URL}/currencies.json",
        params={"app_id": api_key},
        timeout=30,
    )
    response.raise_for_status()
    data = response.json()
    # Timezone-aware UTC timestamp shared by every row in this run
    # (datetime.utcnow() is deprecated as of Python 3.12)
    loaded_at = datetime.now(timezone.utc).isoformat()

    # Transform key-value pairs into rows
    for code, name in data.items():
        yield {
            "code": code,
            "name": name,
            "_loaded_at": loaded_at,
        }
Key Design Decisions
- section="open_exchange_rates": This tells dlt to look for configuration under sources.open_exchange_rates, so both the currencies and exchange rates sources share the same API key config path.
- api_key: str = dlt.secrets.value: The dlt.secrets.value sentinel tells dlt to resolve this parameter from the configuration chain automatically. You don't need to pass it explicitly.
- write_disposition="replace": Unlike exchange rates (which accumulate over time), currencies is reference data, so the full list should be refreshed each run. Old codes that are removed from the API disappear from the table.
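The effect of the replace disposition can be illustrated with a toy in-memory model. This is a sketch of the semantics only, not how dlt loads data: the `load` function is hypothetical, and "OLD" is a made-up currency code standing in for one the API stops returning.

```python
def load(table: list[dict], rows: list[dict], write_disposition: str) -> list[dict]:
    """Toy model of a load: returns the table contents after the run."""
    if write_disposition == "replace":
        return list(rows)          # previous contents are discarded
    if write_disposition == "append":
        return table + list(rows)  # previous contents are kept
    raise ValueError(f"unsupported write_disposition: {write_disposition}")


run_1 = [{"code": "GBP"}, {"code": "USD"}, {"code": "OLD"}]
run_2 = [{"code": "GBP"}, {"code": "USD"}]  # "OLD" is no longer returned

replaced = load(load([], run_1, "replace"), run_2, "replace")
appended = load(load([], run_1, "append"), run_2, "append")

print([r["code"] for r in replaced])  # ['GBP', 'USD']
print(len(appended))                  # 5
```

With replace, the second run leaves only the codes the API currently returns. With append, the table would accumulate duplicates and keep the retired code, which is not what you want for reference data.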
The dlt Pipeline
Create pipelines/currencies.py:
"""Currencies pipeline: Open Exchange Rates API → Snowflake."""
import dlt

from sources.currencies import currencies
from utils.vault_provider import register_aws_secrets


def create_pipeline() -> dlt.Pipeline:
    """Create the currencies pipeline with Snowflake destination."""
    # Register AWS Secrets Manager for production credential resolution.
    # Locally, secrets.toml values take priority (provider is never reached).
    register_aws_secrets()
    pipeline = dlt.pipeline(
        pipeline_name="currencies",
        destination="snowflake",
        dataset_name="OPEN_EXCHANGE_RATES",  # Schema in DLT database
    )
    return pipeline


def run():
    """Run the currencies pipeline."""
    pipeline = create_pipeline()
    # api_key is resolved automatically by dlt from the config chain
    source = currencies()
    load_info = pipeline.run(source)
    print(load_info)
    return load_info


if __name__ == "__main__":
    run()
No Explicit Credentials
Notice that the pipeline code doesn't fetch credentials explicitly. dlt resolves destination.snowflake.credentials and sources.open_exchange_rates.api_key from the configuration chain:
- Locally: from .dlt/secrets.toml (your own Snowflake user)
- Production: from the AWSSecretsManagerProvider (SVC_DLT service account)
The register_aws_secrets() call adds the provider to the chain. If secrets.toml exists (local dev), it takes priority and the AWS provider is never queried.
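The "first provider with a value wins" behaviour can be sketched with a simplified model. This is not dlt's implementation: the functions `env_lookup`, `dict_lookup`, and `resolve` are hypothetical, the provider order (environment, then secrets.toml, then AWS) is an assumption based on the description above, and plain dictionaries stand in for the real providers.

```python
from typing import Callable, Optional

Lookup = Callable[[str, str], Optional[str]]


def env_lookup(environ: dict[str, str]) -> Lookup:
    """Environment-style lookup: SOURCES__<SECTION>__<KEY>, upper-cased."""
    def lookup(section: str, key: str) -> Optional[str]:
        return environ.get(f"SOURCES__{section}__{key}".upper())
    return lookup


def dict_lookup(values: dict[str, dict[str, str]], queried: list[str], name: str) -> Lookup:
    """Nested-dict lookup that records each time it is queried."""
    def lookup(section: str, key: str) -> Optional[str]:
        queried.append(name)
        return values.get(section, {}).get(key)
    return lookup


def resolve(section: str, key: str, chain: list[Lookup]) -> str:
    """Return the first value found, trying lookups in order."""
    for lookup in chain:
        value = lookup(section, key)
        if value is not None:
            return value
    raise KeyError(f"{section}.{key} not found in any provider")


queried: list[str] = []
secrets_toml = dict_lookup({"open_exchange_rates": {"api_key": "local-key"}}, queried, "secrets.toml")
aws = dict_lookup({"open_exchange_rates": {"api_key": "prod-key"}}, queried, "aws")

# Local dev: secrets.toml has the key, so the AWS stand-in is never queried.
local_value = resolve("open_exchange_rates", "api_key", [env_lookup({}), secrets_toml, aws])
print(local_value, queried)  # local-key ['secrets.toml']

# Production: no secrets.toml value, so resolution falls through to AWS.
queried.clear()
empty_secrets = dict_lookup({}, queried, "secrets.toml")
prod_value = resolve("open_exchange_rates", "api_key", [env_lookup({}), empty_secrets, aws])
print(prod_value, queried)  # prod-key ['secrets.toml', 'aws']
```

The `queried` list makes the short-circuit visible: in the local case the AWS stand-in never appears in it.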
Run Locally
Test the pipeline locally:
cd ~/projects/data/data-pipelines
# Run the pipeline
python -m pipelines.currencies
Expected output:
Pipeline currencies completed in X.XX seconds
1 load package(s) were loaded to destination snowflake and target schema OPEN_EXCHANGE_RATES
The snowflake destination used snowflake://YOUR_USER@orgname-accountname/DLT location to store data
Verify in Snowflake
Check the loaded data:
-- View currencies
SELECT code, name, _loaded_at
FROM DLT.OPEN_EXCHANGE_RATES.CURRENCIES
ORDER BY code
LIMIT 20;
-- Count total currencies (should be ~170)
SELECT COUNT(*)
FROM DLT.OPEN_EXCHANGE_RATES.CURRENCIES;
-- Check dlt metadata tables
SHOW TABLES IN DLT.OPEN_EXCHANGE_RATES;
-- Should show: CURRENCIES, _DLT_LOADS, _DLT_VERSION, _DLT_PIPELINE_STATE
The _dlt_* tables are managed by dlt:
| Table | Purpose |
|---|---|
| _DLT_LOADS | Load history (load IDs, timestamps, status) |
| _DLT_VERSION | Schema version tracking |
| _DLT_PIPELINE_STATE | Pipeline state for incremental loading and recovery |
Test with DuckDB
During development, you can use DuckDB to test without connecting to Snowflake:
# Override destination via environment variable
DESTINATION__NAME=duckdb python -m pipelines.currencies
This creates a local currencies.duckdb file. You can query it:
# test_duckdb.py
import duckdb
conn = duckdb.connect("currencies.duckdb")
print(conn.sql("SELECT * FROM OPEN_EXCHANGE_RATES.currencies LIMIT 5").df())
python test_duckdb.py
rm test_duckdb.py currencies.duckdb
Testing
Create a test to verify the source works:
# tests/test_currencies.py
from unittest.mock import MagicMock, patch

from sources.currencies import currencies


def test_currency_list_returns_rows():
    """Test that currencies returns one row per currency."""
    mock_response = MagicMock()
    mock_response.json.return_value = {
        "GBP": "British Pound Sterling",
        "USD": "United States Dollar",
        "EUR": "Euro",
    }
    mock_response.raise_for_status = MagicMock()

    # Iterate the source inside the patch so requests.get is still mocked
    with patch("sources.currencies.source.requests.get", return_value=mock_response):
        source = currencies(api_key="test")
        records = list(source)

    assert len(records) == 3
    codes = {r["code"] for r in records}
    assert codes == {"GBP", "USD", "EUR"}
    assert all("name" in r for r in records)
    assert all("_loaded_at" in r for r in records)
pytest tests/test_currencies.py -v
Summary
You've built the currencies pipeline:
- dlt source for the /currencies.json endpoint
- Pipeline loading directly to Snowflake (DLT.OPEN_EXCHANGE_RATES.CURRENCIES)
- Replace write disposition for full refresh of reference data
- Automatic credential resolution via dlt's config chain
- DuckDB testing for local development
What's Next
The next pipeline demonstrates a different pattern: loading to S3 for Snowpipe auto-ingestion. This is useful for high-volume or historical data where you want files staged in S3 before loading.
Continue to Exchange Rates to S3 →