Building a Data Orchestrator with Dagster and dbt
Modern data pipelines require more than just moving data from A to B. They need orchestration, transformation, testing, and observability. In this post, I walk through building a production-grade weather data pipeline using Dagster, dbt, and DuckDB.
What We Are Building
A complete ELT (Extract-Load-Transform) pipeline that fetches weather data from the Open-Meteo API, loads it into DuckDB, and transforms it through staging, intermediate, and marts layers using dbt.
+===========================================================================+
| DATA ORCHESTRATOR |
+===========================================================================+
| |
| +-------------+ +-------------+ +------------------+ |
| | EXTRACT | | LOAD | | TRANSFORM | |
| | (Dagster) | --> | (Dagster) | --> | (dbt) | |
| +-------------+ +-------------+ +------------------+ |
| | | | |
| v v v |
| Open-Meteo API DuckDB Raw Staging/Int/Marts |
| |
+===========================================================================+

Key Features:
- Asset-based orchestration with Dagster (not task-based DAGs)
- SQL transformations with dbt and 19 automated data quality tests
- Zero infrastructure - DuckDB runs as an embedded database
- No API keys required - uses free Open-Meteo weather API
- Fully containerized with Docker Compose
Technology Stack
The stack is chosen for simplicity and production readiness:
- Dagster 1.9+ - Asset-based orchestration with lineage tracking
- dbt 1.9+ - SQL transformations with testing and documentation
- DuckDB 1.1+ - Embedded analytics database (single file)
- Python 3.11+ - With Pydantic for type-safe configuration
- UV - Fast, deterministic dependency management
- Docker - Multi-container deployment
Project Structure
dataOrchestrator/
|-- config/
| +-- cities.yml # Configurable city coordinates
|
|-- data/
| |-- raw/ # JSON extraction files
| +-- warehouse/ # DuckDB database file
|
|-- dagster_project/
| |-- definitions.py # Main entry point
| |-- assets/
| | |-- extract.py # Weather API extraction
| | |-- load.py # DuckDB loading
| | +-- transform.py # dbt integration
| |-- resources/
| | |-- weather_api.py # HTTP client resource
| | +-- duckdb.py # Database resource
| +-- schedules/
| +-- daily.py # 6 AM UTC daily schedule
|
+-- dbt_project/
|-- models/
| |-- staging/ # Clean and rename
| |-- intermediate/ # Add business logic
| +-- marts/ # Analytics-ready tables
    +-- profiles.yml          # DuckDB adapter config
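Before diving in, here is roughly what config/cities.yml and its Pydantic loader could look like. A sketch: the field names follow the lat/lon usage in the extraction code below, and the coordinates are approximate.

# config/cities.yml (sketch)
cities:
  london:
    lat: 51.51
    lon: -0.13
  new_york:
    lat: 40.71
    lon: -74.01

# settings loader (sketch; assumes PyYAML is installed)
import yaml
from pydantic import BaseModel

class CityConfig(BaseModel):
    lat: float
    lon: float

class Settings(BaseModel):
    cities: dict[str, CityConfig]

def get_settings() -> Settings:
    with open("config/cities.yml") as f:
        return Settings(**yaml.safe_load(f))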
Asset-Based Orchestration
Unlike traditional task-based workflows (such as Airflow), Dagster uses assets as the primary abstraction. Each asset represents a data artifact that can be materialized.
+----------------+ +------------------+ +-------------------+
| raw_weather | | staged_weather | | dbt_weather |
| _data | ---> | _data | ---> | _models |
| (JSON files) | | (DuckDB tables) | | (dbt models) |
+----------------+      +------------------+      +-------------------+

Why assets over tasks?
- Data lineage is automatic - see what depends on what
- Re-run specific assets without running the entire pipeline
- Metadata tracking shows row counts, file sizes, timestamps
- The UI visualizes the asset graph
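In code, that chain boils down to declaring dependencies between asset functions. A minimal sketch (bodies elided):

from dagster import asset

@asset
def raw_weather_data():
    ...  # fetch from the API

@asset(deps=[raw_weather_data])
def staged_weather_data():
    ...  # load into DuckDB

Dagster derives the asset graph, and everything downstream of it, from these declarations.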
Extract: Fetching Weather Data
The extraction asset fetches weather data from Open-Meteo for configurable cities. It returns 7 days of history plus 7 days of forecast.
import json
from datetime import datetime, timezone

from dagster import AssetExecutionContext, MaterializeResult, MetadataValue, asset

@asset(
    description="Extract weather data from Open-Meteo API",
    group_name="extract",
)
def raw_weather_data(
    context: AssetExecutionContext,
    weather_api: WeatherAPIClient,
) -> MaterializeResult:
    # get_settings() and RAW_DATA_PATH are project helpers defined elsewhere
    settings = get_settings()
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    all_data = {"cities": {}, "extracted_at": timestamp}
    total_records = 0
    for city_name, city_config in settings.cities.items():
        response = weather_api.fetch_weather(
            latitude=city_config.lat,
            longitude=city_config.lon,
            past_days=7,
            forecast_days=7,
        )
        all_data["cities"][city_name] = response
        total_records += len(response["hourly"]["time"])
    # Save raw JSON
    output_file = RAW_DATA_PATH / f"weather_{timestamp}.json"
    output_file.write_text(json.dumps(all_data, indent=2))
    return MaterializeResult(
        metadata={
            "cities_extracted": len(settings.cities),
            "total_hourly_records": total_records,
            "output_file": MetadataValue.path(str(output_file)),
        }
    )

The WeatherAPIClient is a Dagster resource, allowing for dependency injection and easy testing:
import httpx
from dagster import ConfigurableResource

class WeatherAPIClient(ConfigurableResource):
    base_url: str = "https://api.open-meteo.com/v1/forecast"
    timeout: int = 30

    def fetch_weather(
        self,
        latitude: float,
        longitude: float,
        past_days: int = 7,
        forecast_days: int = 7,
    ) -> dict:
        params = {
            "latitude": latitude,
            "longitude": longitude,
            "hourly": "temperature_2m,relative_humidity_2m,precipitation",
            "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum",
            "past_days": past_days,
            "forecast_days": forecast_days,
        }
        with httpx.Client(timeout=self.timeout) as client:
            response = client.get(self.base_url, params=params)
            response.raise_for_status()
            return response.json()
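Because a ConfigurableResource is a plain Pydantic-backed object, it can be exercised outside of Dagster. A minimal sketch (the coordinates are illustrative):

# Instantiate the resource directly in a test or a REPL
client = WeatherAPIClient(timeout=10)
data = client.fetch_weather(latitude=51.51, longitude=-0.13)  # roughly London
assert "hourly" in data and "daily" in data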
Load: DuckDB Staging Tables
The load asset reads the latest JSON file and upserts data into DuckDB staging tables. Using INSERT OR REPLACE makes the process idempotent - safe to re-run without duplicates.
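To see the idempotency concretely: because the table defines a (city, timestamp) primary key, re-running the same statement overwrites the existing row instead of duplicating it (the values here are illustrative):

-- First run inserts the row; any re-run replaces it in place
INSERT OR REPLACE INTO raw_hourly_weather
    (city, timestamp, temperature_c, humidity_pct, precipitation_mm)
VALUES ('London', '2025-01-15 06:00:00', 3.1, 87.0, 0.2);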
@asset(
    deps=[raw_weather_data],
    description="Load weather data into DuckDB staging tables",
    group_name="load",
)
def staged_weather_data(
    context: AssetExecutionContext,
    duckdb_resource: DuckDBResource,
) -> MaterializeResult:
    raw_file = get_latest_raw_file()  # helper that returns the newest JSON in data/raw/
    data = json.loads(raw_file.read_text())
    hourly_count = 0
    with duckdb_resource.get_connection() as conn:
        for city_name, city_data in data["cities"].items():
            # Upsert hourly data
            for i, timestamp in enumerate(city_data["hourly"]["time"]):
                temp = city_data["hourly"]["temperature_2m"][i]
                humidity = city_data["hourly"]["relative_humidity_2m"][i]
                conn.execute("""
                    INSERT OR REPLACE INTO raw_hourly_weather
                    (city, timestamp, temperature_c, humidity_pct, ...)
                    VALUES (?, ?, ?, ?, ...)
                """, [city_name, timestamp, temp, humidity, ...])
                hourly_count += 1
            # Daily records are upserted the same way (elided here)
    return MaterializeResult(
        metadata={
            "hourly_records_loaded": hourly_count,
            "daily_records_loaded": daily_count,
        }
    )

DuckDB Resource Pattern
The database resource uses a context manager for safe connection handling:
import duckdb
from contextlib import contextmanager

from dagster import ConfigurableResource

class DuckDBResource(ConfigurableResource):
    database_path: str = "data/warehouse/weather.duckdb"

    @contextmanager
    def get_connection(self):
        conn = duckdb.connect(self.database_path)
        try:
            yield conn
        finally:
            conn.close()

    def init_schema(self):
        with self.get_connection() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS raw_hourly_weather (
                    city VARCHAR,
                    timestamp TIMESTAMP,
                    temperature_c DOUBLE,
                    humidity_pct DOUBLE,
                    precipitation_mm DOUBLE,
                    wind_speed_kmh DOUBLE,
                    weather_code INTEGER,
                    PRIMARY KEY (city, timestamp)
                )
            """)

Transform: dbt Medallion Architecture
The transformation layer uses dbt with three model layers following medallion architecture:
+==============================================================================+
| TRANSFORMATION LAYERS |
+==============================================================================+
| |
| +-----------------+ +-------------------+ +------------------+ |
| | STAGING | | INTERMEDIATE | | MARTS | |
| | (views) | -> | (views) | -> | (tables) | |
| +-----------------+ +-------------------+ +------------------+ |
| |
| - Clean columns - Add categories - Aggregations |
| - Rename fields - Business logic - Rankings |
| - Extract dates - Derived fields - Analytics-ready |
| |
+==============================================================================+
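This layer-by-layer materialization can be declared centrally in dbt_project.yml instead of per model (as fct_city_comparison does below with config()). A sketch, where the project key name is an assumption:

# dbt_project.yml (sketch)
models:
  dbt_project:
    staging:
      +materialized: view
    intermediate:
      +materialized: view
    marts:
      +materialized: table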
Staging Layer
Staging models clean and standardize raw data:
-- models/staging/stg_hourly_weather.sql
select
city,
timestamp as observation_timestamp,
date_trunc('day', timestamp) as observation_date,
extract(hour from timestamp) as hour_of_day,
temperature_c,
humidity_pct as relative_humidity,
precipitation_mm,
wind_speed_kmh,
weather_code
from {{ source('raw', 'raw_hourly_weather') }}
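The source() call resolves through a dbt source definition pointing at the table Dagster loaded. A sketch of that declaration (the schema name is an assumption; DuckDB defaults to main):

# models/staging/_sources.yml (sketch)
version: 2
sources:
  - name: raw
    schema: main
    tables:
      - name: raw_hourly_weather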
Intermediate Layer
Intermediate models add business logic and categorizations:
-- models/intermediate/int_hourly_enriched.sql
select
*,
case
when temperature_c < 0 then 'freezing'
when temperature_c < 10 then 'cold'
when temperature_c < 20 then 'mild'
when temperature_c < 30 then 'warm'
else 'hot'
end as temp_category,
case
when precipitation_mm = 0 then 'dry'
when precipitation_mm < 2.5 then 'light'
when precipitation_mm < 7.5 then 'moderate'
else 'heavy'
end as precip_category,
case
when hour_of_day between 6 and 11 then 'morning'
when hour_of_day between 12 and 17 then 'afternoon'
when hour_of_day between 18 and 21 then 'evening'
else 'night'
end as time_of_day
from {{ ref('stg_hourly_weather') }}

Marts Layer
Mart models are materialized as tables for fast analytics queries:
-- models/marts/fct_city_comparison.sql
{{ config(materialized='table') }}
with daily_stats as (
select
city,
avg(temp_avg_c) as avg_temperature,
sum(precipitation_sum_mm) as total_precipitation,
avg(avg_humidity) as avg_humidity,
count(*) as total_days
from {{ ref('fct_daily_weather') }}
group by city
)
select
*,
rank() over (order by avg_temperature desc) as warmest_rank,
rank() over (order by total_precipitation desc) as wettest_rank,
rank() over (order by avg_humidity desc) as humidity_rank
from daily_stats

Data Quality Testing
The pipeline includes 19 automated dbt tests that run on every execution:
# _schema.yml (excerpts from the staging and marts layers)
version: 2

models:
  - name: stg_hourly_weather
    description: "Cleaned hourly weather observations"
    columns:
      - name: city
        tests:
          - not_null
      - name: observation_timestamp
        tests:
          - not_null
      - name: temperature_c
        tests:
          - not_null

  - name: fct_city_comparison
    columns:
      - name: city
        tests:
          - not_null
          - unique

Tests run as part of dbt build, blocking bad data from reaching the marts layer.
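The same suite can be run standalone while iterating on models (the paths assume the project layout above):

# Run models and tests together; a failing test blocks downstream models
uv run dbt build --project-dir dbt_project --profiles-dir dbt_project

# Or run only the tests
uv run dbt test --project-dir dbt_project --profiles-dir dbt_project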
Dagster + dbt Integration
Dagster provides first-class dbt integration. Each dbt model becomes a Dagster asset with automatic dependency tracking:
# dagster_project/assets/transform.py
from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets

dbt_project = DbtProject(
    project_dir=Path(__file__).parent.parent.parent / "dbt_project",
)
dbt_project.prepare_if_dev()  # compiles the manifest during local development

@dbt_assets(
    manifest=dbt_project.manifest_path,
    dagster_dbt_translator=CustomDagsterDbtTranslator(),  # custom translator defined elsewhere
)
def dbt_weather_models(
    context: AssetExecutionContext,
    dbt: DbtCliResource,
):
    yield from dbt.cli(["build"], context=context).stream()

This creates assets for each dbt model (stg_hourly_weather, int_hourly_enriched, fct_daily_weather, etc.) that are visible in the Dagster UI.
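For completeness, here is roughly how definitions.py (the entry point in the project structure) might wire everything together. A sketch; module paths and constructor arguments are assumptions:

# dagster_project/definitions.py (sketch)
from dagster import Definitions
from dagster_dbt import DbtCliResource

from dagster_project.assets.extract import raw_weather_data
from dagster_project.assets.load import staged_weather_data
from dagster_project.assets.transform import dbt_project, dbt_weather_models
from dagster_project.resources.weather_api import WeatherAPIClient
from dagster_project.resources.duckdb import DuckDBResource
from dagster_project.schedules.daily import daily_weather_schedule

defs = Definitions(
    assets=[raw_weather_data, staged_weather_data, dbt_weather_models],
    resources={
        "weather_api": WeatherAPIClient(),
        "duckdb_resource": DuckDBResource(),
        "dbt": DbtCliResource(project_dir=dbt_project),
    },
    schedules=[daily_weather_schedule],
)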
Scheduling
The pipeline runs daily at 6 AM UTC:
# dagster_project/schedules/daily.py
from dagster import AssetSelection, DefaultScheduleStatus, ScheduleDefinition

daily_weather_schedule = ScheduleDefinition(
    name="daily_weather_pipeline",
    cron_schedule="0 6 * * *",  # 6 AM UTC daily
    target=AssetSelection.all(),
    default_status=DefaultScheduleStatus.STOPPED,
)

Enable the schedule from the Dagster UI when ready for production.
Running the Pipeline
Option 1: Docker Compose
# Start the containers
docker-compose up -d
# Access Dagster UI
open http://localhost:3333
# View logs
docker-compose logs -f
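For reference, the Compose file might look roughly like this. A sketch; service names, build context, and volume layout are assumptions:

# docker-compose.yml (sketch)
services:
  dagster-webserver:
    build: .
    command: dagster-webserver -h 0.0.0.0 -p 3000
    ports:
      - "3333:3000"   # UI exposed on localhost:3333
    volumes:
      - ./data:/app/data
  dagster-daemon:
    build: .
    command: dagster-daemon run
    volumes:
      - ./data:/app/data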
Option 2: Local Development
# Install dependencies
uv sync
# Start Dagster development server
uv run dagster dev
# Access UI at http://localhost:3000

Materialize Assets
From the Dagster UI, click Materialize All to run the complete pipeline. You will see:
- Asset graph visualization with dependencies
- Real-time execution logs
- Metadata for each asset (row counts, file paths, timestamps)
- Run history and timing information
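The same run can also be kicked off from the command line with the dagster CLI. A sketch (the module path is an assumption):

# Materialize every asset defined in the project
uv run dagster asset materialize --select "*" -m dagster_project.definitions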
Sample Results
After running the pipeline, query the marts tables:
-- Query city comparison rankings
SELECT
city,
round(avg_temperature, 1) as avg_temp_c,
round(total_precipitation, 1) as total_precip_mm,
warmest_rank,
wettest_rank
FROM fct_city_comparison
ORDER BY warmest_rank;
-- Results:
-- | city | avg_temp_c | total_precip_mm | warmest_rank | wettest_rank |
-- |----------|------------|-----------------|--------------|--------------|
-- | Dubai | 21.3 | 0.1 | 1 | 4 |
-- | Riyadh | 15.1 | 0.0 | 2 | 3 |
-- | London | 3.1 | 2.4 | 3 | 2 |
-- | New York | -0.8       | 19.6            | 4            | 1            |

Key Takeaways
This project demonstrates several modern data engineering practices:
- Asset-based orchestration provides better lineage and observability than task-based DAGs
- Separation of concerns - Dagster handles orchestration, dbt handles transformations
- Idempotent operations with INSERT OR REPLACE allow safe re-runs
- Dependency injection via Dagster resources enables testing
- Data quality testing catches issues before they reach production tables
- Zero infrastructure - DuckDB eliminates database setup
The complete source code is available on GitHub.