بناء «data orchestrator» باستخدام «Dagster» و«dbt»
بنيت خط بيانات للطقس يعمل من دون بنية تحتية تقريبًا: ملف «DuckDB» واحد، و«API» مجاني بلا مفاتيح، و«Dagster» يربط الخطوات في نظام واحد. الفكرة ليست نقل البيانات من نقطة إلى أخرى. الفكرة أن ترى الـ «lineage»، تختبر النتائج، وتعيد تشغيل الجزء الذي تعطّل وحده.
ماذا بنيت
خط «ELT» متكامل. يسحب بيانات الطقس من «Open-Meteo»، يحمّلها في «DuckDB»، ثم يمرّرها عبر طبقات «staging» و«intermediate» و«marts» باستخدام «dbt».
+===========================================================================+
| DATA ORCHESTRATOR |
+===========================================================================+
| |
| +-------------+ +-------------+ +------------------+ |
| | EXTRACT | | LOAD | | TRANSFORM | |
| | (Dagster) | --> | (Dagster) | --> | (dbt) | |
| +-------------+ +-------------+ +------------------+ |
| | | | |
| v v v |
| Open-Meteo API DuckDB Raw Staging/Int/Marts |
| |
+===========================================================================+المزايا الأساسية:
- تنظيم مبني على الـ «assets» في «Dagster»، لا على «task-based DAGs».
- تحويلات «SQL» في «dbt»، ومعها 19 اختبارًا آليًا لجودة البيانات.
- صفر بنية تحتية: «DuckDB» يعمل قاعدة «embedded».
- بلا «API keys»: واجهة «Open-Meteo» المجانية تكفي.
- تشغيل كامل داخل «Docker Compose».
الحزمة التقنية
اخترت الأدوات على معيار واحد: قليلة الحركة، تصلح للـ «production».
- «Dagster 1.9+» — تنظيم مبني على الـ «assets» مع تتبع «lineage».
- «dbt 1.9+» — تحويلات «SQL» مع «testing» وتوثيق.
- «DuckDB 1.1+» — قاعدة تحليلية «embedded» في ملف واحد.
- «Python 3.11+» — ومعه «Pydantic» لإعدادات «type-safe».
- «uv» — إدارة «dependencies» بسرعة وبنتائج ثابتة.
- «Docker» — تشغيل بعدة حاويات.
هيكل المشروع
dataOrchestrator/
|-- config/
| +-- cities.yml # Configurable city coordinates
|
|-- data/
| |-- raw/ # JSON extraction files
| +-- warehouse/ # DuckDB database file
|
|-- dagster_project/
| |-- definitions.py # Main entry point
| |-- assets/
| | |-- extract.py # Weather API extraction
| | |-- load.py # DuckDB loading
| | +-- transform.py # dbt integration
| |-- resources/
| | |-- weather_api.py # HTTP client resource
| | +-- duckdb.py # Database resource
| +-- schedules/
| +-- daily.py # 6 AM UTC daily schedule
|
+-- dbt_project/
|-- models/
| |-- staging/ # Clean and rename
| |-- intermediate/ # Add business logic
| +-- marts/ # Analytics-ready tables
+-- profiles.yml # DuckDB adapter configالتنظيم المبني على الـ «assets»
في الأدوات التقليدية مثل «Airflow»، تفكر في «tasks». في «Dagster» تبدأ من شيء أوضح: الـ «asset». كل «asset» أثر بياناتي يمكن «materialize» له ومتابعته من واجهة واحدة.
+----------------+ +------------------+ +-------------------+
| raw_weather | | staged_weather | | dbt_weather |
| _data | ---> | _data | ---> | _models |
| (JSON files) | | (DuckDB tables) | | (dbt models) |
+----------------+ +------------------+ +-------------------+لماذا «assets» بدل «tasks»؟
- الـ «lineage» يظهر تلقائيًا: تعرف ماذا يعتمد على ماذا.
- تعيد تشغيل «asset» محددًا من دون تشغيل الخط كله.
- الـ «metadata» تعرض «row counts» و«file sizes» و«timestamps».
- الـ «UI» يرسم الـ «asset graph» بدل قائمة خطوات عمياء.
Extract: جلب بيانات الطقس
أول «asset» يجلب بيانات الطقس من «Open-Meteo» لمدن قابلة للتعديل من الإعدادات. يأخذ 7 أيام مضت، و7 أيام توقعات.
@asset(
description="Extract weather data from Open-Meteo API",
group_name="extract",
)
def raw_weather_data(
context: AssetExecutionContext,
weather_api: WeatherAPIClient,
) -> MaterializeResult:
settings = get_settings()
all_data = {"cities": {}, "extracted_at": timestamp}
for city_name, city_config in settings.cities.items():
response = weather_api.fetch_weather(
latitude=city_config.lat,
longitude=city_config.lon,
past_days=7,
forecast_days=7,
)
all_data["cities"][city_name] = response
# Save raw JSON
output_file = RAW_DATA_PATH / f"weather_{timestamp}.json"
output_file.write_text(json.dumps(all_data, indent=2))
return MaterializeResult(
metadata={
"cities_extracted": len(settings.cities),
"total_hourly_records": total_records,
"output_file": MetadataValue.path(str(output_file)),
}
)جعلت WeatherAPIClient «resource» داخل «Dagster». الـ «dependency injection» يصير واضحًا، والـ «testing» أسهل.
class WeatherAPIClient(ConfigurableResource):
base_url: str = "https://api.open-meteo.com/v1/forecast"
timeout: int = 30
def fetch_weather(
self,
latitude: float,
longitude: float,
past_days: int = 7,
forecast_days: int = 7,
) -> dict:
params = {
"latitude": latitude,
"longitude": longitude,
"hourly": "temperature_2m,relative_humidity_2m,precipitation",
"daily": "temperature_2m_max,temperature_2m_min,precipitation_sum",
"past_days": past_days,
"forecast_days": forecast_days,
}
with httpx.Client(timeout=self.timeout) as client:
response = client.get(self.base_url, params=params)
response.raise_for_status()
return response.json()Load: جداول «DuckDB» الأولية
الـ «asset» الثاني يقرأ آخر ملف «JSON»، ثم يكتب البيانات في جداول «staging» داخل «DuckDB». استخدمت INSERT OR REPLACE ليبقى التشغيل «idempotent». تعيد الخط عشر مرات، ولا تظهر صفوف مكررة.
@asset(
deps=[raw_weather_data],
description="Load weather data into DuckDB staging tables",
group_name="load",
)
def staged_weather_data(
context: AssetExecutionContext,
duckdb_resource: DuckDBResource,
) -> MaterializeResult:
raw_file = get_latest_raw_file()
data = json.loads(raw_file.read_text())
with duckdb_resource.get_connection() as conn:
for city_name, city_data in data["cities"].items():
# Upsert hourly data
for i, timestamp in enumerate(city_data["hourly"]["time"]):
conn.execute("""
INSERT OR REPLACE INTO raw_hourly_weather
(city, timestamp, temperature_c, humidity_pct, ...)
VALUES (?, ?, ?, ?, ...)
""", [city_name, timestamp, temp, humidity, ...])
return MaterializeResult(
metadata={
"hourly_records_loaded": hourly_count,
"daily_records_loaded": daily_count,
}
)نمط الـ «resource» في «DuckDB»
الـ «resource» الخاص بقاعدة البيانات يستخدم «context manager» حتى لا تبقى الاتصالات مفتوحة بعد التنفيذ.
class DuckDBResource(ConfigurableResource):
database_path: str = "data/warehouse/weather.duckdb"
@contextmanager
def get_connection(self):
conn = duckdb.connect(self.database_path)
try:
yield conn
finally:
conn.close()
def init_schema(self):
with self.get_connection() as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS raw_hourly_weather (
city VARCHAR,
timestamp TIMESTAMP,
temperature_c DOUBLE,
humidity_pct DOUBLE,
precipitation_mm DOUBLE,
PRIMARY KEY (city, timestamp)
)
""")Transform: «dbt» و«medallion architecture»
طبقة التحويل مبنية في «dbt» على ثلاث طبقات. كل طبقة لها وظيفة صغيرة وواضحة.
+==============================================================================+
| TRANSFORMATION LAYERS |
+==============================================================================+
| |
| +-----------------+ +-------------------+ +------------------+ |
| | STAGING | | INTERMEDIATE | | MARTS | |
| | (views) | -> | (views) | -> | (tables) | |
| +-----------------+ +-------------------+ +------------------+ |
| |
| - Clean columns - Add categories - Aggregations |
| - Rename fields - Business logic - Rankings |
| - Extract dates - Derived fields - Analytics-ready |
| |
+==============================================================================+طبقة الـ «staging»
هنا أنظف البيانات وأسمي الحقول بأسماء قابلة للقراءة.
-- models/staging/stg_hourly_weather.sql
select
city,
timestamp as observation_timestamp,
date_trunc('day', timestamp) as observation_date,
extract(hour from timestamp) as hour_of_day,
temperature_c,
humidity_pct as relative_humidity,
precipitation_mm,
wind_speed_kmh,
weather_code
from {{ source('raw', 'raw_hourly_weather') }}طبقة الـ «intermediate»
هذه الطبقة تضيف التصنيفات والمنطق الذي لا تريد تكراره في كل استعلام.
-- models/intermediate/int_hourly_enriched.sql
select
*,
case
when temperature_c < 0 then 'freezing'
when temperature_c < 10 then 'cold'
when temperature_c < 20 then 'mild'
when temperature_c < 30 then 'warm'
else 'hot'
end as temp_category,
case
when precipitation_mm = 0 then 'dry'
when precipitation_mm < 2.5 then 'light'
when precipitation_mm < 7.5 then 'moderate'
else 'heavy'
end as precip_category,
case
when hour_of_day between 6 and 11 then 'morning'
when hour_of_day between 12 and 17 then 'afternoon'
when hour_of_day between 18 and 21 then 'evening'
else 'night'
end as time_of_day
from {{ ref('stg_hourly_weather') }}طبقة الـ «marts»
في النهاية أكتب جداول جاهزة للتحليل. اخترت «tables» بدل «views» حتى تكون الاستعلامات أسرع.
-- models/marts/fct_city_comparison.sql
{{ config(materialized='table') }}
with daily_stats as (
select
city,
avg(temp_avg_c) as avg_temperature,
sum(precipitation_sum_mm) as total_precipitation,
avg(avg_humidity) as avg_humidity,
count(*) as total_days
from {{ ref('fct_daily_weather') }}
group by city
)
select
*,
rank() over (order by avg_temperature desc) as warmest_rank,
rank() over (order by total_precipitation desc) as wettest_rank,
rank() over (order by avg_humidity desc) as humidity_rank
from daily_statsاختبارات جودة البيانات
أضفت 19 اختبارًا آليًا في «dbt». تعمل مع كل تشغيل، قبل أن تصل البيانات إلى جداول الـ «marts».
# models/staging/_schema.yml
version: 2
models:
- name: stg_hourly_weather
description: "Cleaned hourly weather observations"
columns:
- name: city
tests:
- not_null
- name: observation_timestamp
tests:
- not_null
- name: temperature_c
tests:
- not_null
- name: fct_city_comparison
columns:
- name: city
tests:
- not_null
- uniqueمع dbt build لا تمر البيانات السيئة بصمت. إما تنجح الاختبارات، أو يتوقف الخط.
دمج «Dagster» مع «dbt»
قوة «Dagster» هنا أنه لا يعامل «dbt» كأمر خارجي. كل «dbt model» يتحول إلى «asset» داخل «Dagster»، ومعه تتبع تلقائي للعلاقات.
# dagster_project/assets/transform.py
from dagster_dbt import DbtCliResource, dbt_assets, DbtProject
dbt_project = DbtProject(
project_dir=Path(__file__).parent.parent.parent / "dbt_project",
)
@dbt_assets(
manifest=dbt_project.manifest_path,
dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def dbt_weather_models(
context: AssetExecutionContext,
dbt: DbtCliResource,
):
yield from dbt.cli(["build"], context=context).stream()النتيجة أن نماذج مثل stg_hourly_weather وint_hourly_enriched وfct_daily_weather تظهر في الـ «UI» كـ «assets» يمكن تتبعها وتشغيلها.
الجدولة
جدولت الخط ليعمل يوميًا عند السادسة صباحًا بتوقيت «UTC».
# dagster_project/schedules/daily.py
from dagster import ScheduleDefinition, AssetSelection
daily_weather_schedule = ScheduleDefinition(
name="daily_weather_pipeline",
cron_schedule="0 6 * * *", # 6 AM UTC daily
target=AssetSelection.all(),
default_status=DefaultScheduleStatus.STOPPED,
)عندما يجهز الخط للـ «production»، فعّل الـ «schedule» من واجهة «Dagster».
تشغيل الخط
الخيار الأول: «Docker Compose»
# Start the containers
docker-compose up -d
# Access Dagster UI
open http://localhost:3333
# View logs
docker-compose logs -fالخيار الثاني: تشغيل محلي
# Install dependencies
uv sync
# Start Dagster development server
uv run dagster dev
# Access UI at http://localhost:3000تشغيل الـ «assets»
من واجهة «Dagster»، اضغط Materialize All ليبدأ الخط كاملًا. ستشاهد:
- رسم الـ «asset graph» مع الـ «dependencies».
- سجلات تنفيذ مباشرة لحظة بلحظة.
- «metadata» لكل «asset»: «row counts» و«file paths» و«timestamps».
- تاريخ التشغيل ومدة كل خطوة.
نتائج عيّنة
بعد التشغيل، استعلم من جداول الـ «marts» مباشرة.
-- Query city comparison rankings
SELECT
city,
round(avg_temperature, 1) as avg_temp_c,
round(total_precipitation, 1) as total_precip_mm,
warmest_rank,
wettest_rank
FROM fct_city_comparison
ORDER BY warmest_rank;
-- Results:
-- | city | avg_temp_c | total_precip_mm | warmest_rank | wettest_rank |
-- |----------|------------|-----------------|--------------|--------------|
-- | Dubai | 21.3 | 0.1 | 1 | 4 |
-- | Riyadh | 15.1 | 0.0 | 2 | 3 |
-- | London | 3.1 | 2.4 | 3 | 2 |
-- | New York | -0.8 | 19.6 | 4 | 1 |الخلاصة
هذا المشروع يلخص ما أريده من هندسة البيانات الحديثة: نظام صغير، واضح، وقابل للإعادة.
- التنظيم المبني على الـ «assets» يعطي «lineage» وملاحظة أوضح من «task-based DAGs».
- فصل المسؤوليات: «Dagster» يدير التشغيل، و«dbt» يدير التحويلات.
- تشغيل «idempotent» عبر
INSERT OR REPLACEيسمح بالإعادة من دون خوف. - «Dependency injection» عبر «resources» في «Dagster» يجعل الاختبار أبسط.
- اختبارات جودة البيانات توقف الأخطاء قبل جداول الـ «production».
- صفر بنية تحتية: «DuckDB» يلغي إعداد قاعدة بيانات كاملة.
الـ «code» كامل متاح على GitHub.