بناء «Data Lakehouse» باستخدام «DuckDB» و«dbt»
بنيت «Data Lakehouse» كامل على اللابتوب. بلا «Snowflake». بلا «BigQuery». بلا سيرفر ينتظرني في مكان ما. ملف واحد من «DuckDB»، ومشروع «dbt»، وبيانات «NYC Taxi» تكفي لتفضح أي تصميم كسول.
الفكرة بسيطة. نأخذ بيانات الرحلات، ونمررها على «medallion architecture». طبقة «bronze» تستقبل. طبقة «silver» تنظف وتغني. طبقة «gold» تعطيك جداول جاهزة للتحليل.
في النهاية يصير عندك خط بيانات يعمل محليا، فيه معالجة «incremental»، واختبارات جودة البيانات، و«CI/CD» على «GitHub Actions». الحجم حقيقي. البيانات بالملايين. لكن البنية التحتية؟ صفر.
لماذا «DuckDB» مع «dbt»؟
مستودعات البيانات الكبيرة ممتازة. «Snowflake» و«BigQuery» يعرفون شغلهم. لكنك أحيانا لا تحتاج منصة كاملة عشان تتعلم نمط، أو تبني نموذج أولي، أو تشغل تحليل بحجم معقول.
هنا يدخل «DuckDB». قاعدة بيانات بملف واحد. تقرأ «Parquet» مباشرة. تكتب «SQL» طبيعي. تستخدم «window functions» و«CTEs» وتجميعات بدون لف ودوران.
- بلا بنية تحتية: ملف واحد. لا سيرفر. لا إعدادات طويلة.
- سريع للتحليل: تخزين عمودي مناسب للاستعلامات الثقيلة.
- يتعامل مع «Parquet»: تقرأ الملفات من مكانها.
- «SQL» كامل: دوال، تجميعات، ونوافذ. كلها تعمل.
أضف «dbt» فوقه، وتأخذ الجزء الذي يهم في هندسة البيانات الحديثة: نماذج قابلة للمراجعة، تحولات محفوظة في «git»، توثيق، واختبارات.
تهيئة المشروع
المتطلبات
- «Python» 3.11 أو أحدث.
- مدير حزم «UV»، أو «pip» إذا تفضل الطريق المعروف. «UV» موجود هنا: github.com/astral-sh/uv.
- مساحة قرص تقارب 10 قيقا لملفات البيانات.
التثبيت
انسخ المشروع وثبت الاعتمادات. لا يوجد إعداد مخفي.
# Clone the repository
git clone https://github.com/AlharbiAbdullah/data-lakehouse
cd data-lakehouse
# Install dependencies with UV
uv sync
# Or with pip
pip install -r requirements.txtهيكل المشروع
فصلت جلب البيانات عن التحولات. «Python» يجلب الملفات ويجهز «warehouse». «dbt» يتولى النماذج من «staging» إلى «marts».
data-lakehouse/
|-- data/
| |-- raw/ # Parquet files (gitignored)
| +-- warehouse/ # DuckDB database
|
|-- dbt_project/
| |-- models/
| | |-- staging/ # Bronze layer
| | |-- intermediate/ # Silver layer
| | +-- marts/ # Gold layer
| |-- macros/ # Reusable SQL
| +-- seeds/ # Reference data
|
|-- scripts/
| |-- download_data.py # Fetch NYC Taxi data
| |-- setup_warehouse.py # Initialize DuckDB
| +-- run_pipeline.py # Orchestrate everything
|
+-- .github/workflows/
+-- dbt_ci.yml # CI/CD pipelineالمعمارية: «medallion architecture»
اخترت «medallion architecture» لأنها تفرض عقدا واضحا بين الطبقات. كل طبقة لها وظيفة. وكل طبقة تترك البيانات أنظف من التي قبلها.
+==============================================================================+
| MEDALLION LAYERS |
+==============================================================================+
| |
| +--------------+ +------------------+ +--------------+ |
| | BRONZE | | SILVER | | GOLD | |
| | (staging) | ----> | (intermediate) | ----> | (marts) | |
| +--------------+ +------------------+ +--------------+ |
| |
| Raw ingestion Cleaned & Analytics-ready |
| Type casting Validated Aggregated |
| Hash-based IDs Zone enrichment Incremental |
| Calculated fields Daily metrics |
| |
+==============================================================================+«bronze»: استقبال أولي. تحويل أنواع. توحيد أسماء الأعمدة. وتوليد معرفات ثابتة.
«silver»: تنظيف وإثراء. ربط مع جداول المناطق. حسابات مشتقة. حذف الرحلات غير المنطقية.
«gold»: جداول جاهزة للتحليل. تجميعات يومية. مقاييس تصلح للـ«BI» واللوحات.
طبقة «bronze»: استقبال البيانات
مصادر البيانات
استخدمت بيانات «NYC Taxi & Limousine Commission» ليناير وفبراير ومارس 2024. ليست عينة صغيرة للتجميل. البيانات كبيرة بما يكفي لتظهر مشاكل الأداء، والتكرار، وتأخر السجلات.
- Yellow Taxi: نحو 3 إلى 4 ملايين رحلة شهريا. أغلبها حول مانهاتن.
- Green Taxi: نحو 300 إلى 500 ألف رحلة شهريا. أكثرها خارج المركز.
- FHV: نحو 5 إلى 6 ملايين رحلة شهريا. خدمات شبيهة بالـ«rideshare».
سكربت download_data.py يجلب الملفات بشكل غير متزامن. ثم setup_warehouse.py يجهز ملف «DuckDB».
# Download all data
uv run python scripts/download_data.py
# Set up DuckDB warehouse
uv run python scripts/setup_warehouse.pyنماذج «staging»
كل نوع رحلة له نموذج «staging» خاص. هذا نموذج رحلات التاكسي الأصفر. خذ ملاحظتين: لا أغير أكثر من اللازم، وأولد trip_id من حقول الرحلة نفسها.
{{
config(
materialized='view'
)
}}
with source as (
select * from {{ source('raw', 'yellow_tripdata') }}
),
with_base_hash as (
select
-- Deterministic ID from key fields
{{ generate_trip_id(
'tpep_pickup_datetime',
'tpep_dropoff_datetime',
'PULocationID',
'DOLocationID',
'fare_amount',
'trip_distance',
'passenger_count'
) }} as base_hash,
-- Trip type identifier
'yellow' as trip_type,
-- Timestamps
tpep_pickup_datetime as pickup_datetime,
tpep_dropoff_datetime as dropoff_datetime,
-- Locations
PULocationID as pickup_zone_id,
DOLocationID as dropoff_zone_id,
-- Trip details
cast(passenger_count as integer) as passenger_count,
cast(trip_distance as double) as trip_distance,
-- Fare components
cast(fare_amount as double) as fare_amount,
cast(tip_amount as double) as tip_amount,
cast(total_amount as double) as total_amount,
-- Metadata
current_timestamp as loaded_at
from source
where tpep_pickup_datetime is not null
and tpep_dropoff_datetime is not null
),
staged as (
select
-- Unique trip ID: base_hash + row number for duplicates
base_hash || '_' || cast(row_number() over (
partition by base_hash order by pickup_datetime
) as varchar) as trip_id,
* exclude (base_hash)
from with_base_hash
)
select * from stagedالنمط المهم: معرفات ثابتة
بيانات الرحلات لا تعطيك دائما معرفا نظيفا تعتمد عليه. فأنا صنعت واحدا بنفسي. أستخدم «MD5» على حقول الرحلة الأساسية. الوقت، المناطق، السعر، المسافة، وعدد الركاب.
{% macro generate_trip_id(pickup_datetime, dropoff_datetime, pickup_zone,
dropoff_zone, extra_field_1, extra_field_2, extra_field_3) %}
md5(
coalesce(cast({{ pickup_datetime }} as varchar), '') ||
'|' ||
coalesce(cast({{ dropoff_datetime }} as varchar), '') ||
'|' ||
coalesce(cast({{ pickup_zone }} as varchar), '') ||
'|' ||
coalesce(cast({{ dropoff_zone }} as varchar), '') ||
'|' ||
coalesce(cast({{ extra_field_1 }} as varchar), '') ||
'|' ||
coalesce(cast({{ extra_field_2 }} as varchar), '') ||
'|' ||
coalesce(cast({{ extra_field_3 }} as varchar), '')
)
{% endmacro %}النتيجة مهمة: الرحلة نفسها تأخذ المعرف نفسه في كل تشغيل. هذا يجعل المعالجة «incremental» أسهل، ويجعل المطابقة بين التشغيلات ممكنة.
طبقة «silver»: تنظيف وإثراء
هنا تبدأ البيانات تصير مفيدة. أوحد أنواع الرحلات الثلاثة في جدول واحد عبر int_trips_unioned.sql. بعدها أربطها بمرجع المناطق، وأحسب حقولا لا تأتي جاهزة من المصدر.
{{
config(
materialized='view'
)
}}
with trips as (
select * from {{ ref('int_trips_unioned') }}
),
zones as (
select * from {{ ref('stg_taxi_zones') }}
),
enriched as (
select
t.trip_id,
t.trip_type,
t.pickup_datetime,
t.dropoff_datetime,
t.pickup_zone_id,
t.dropoff_zone_id,
t.passenger_count,
t.trip_distance,
t.fare_amount,
t.tip_amount,
t.total_amount,
t.loaded_at,
-- Pickup zone info
pz.zone_name as pickup_zone_name,
pz.borough as pickup_borough,
-- Dropoff zone info
dz.zone_name as dropoff_zone_name,
dz.borough as dropoff_borough,
-- Calculated fields
datediff('minute', t.pickup_datetime, t.dropoff_datetime)
as trip_duration_minutes,
-- Average speed (mph)
case
when t.trip_distance > 0
and datediff('minute', t.pickup_datetime, t.dropoff_datetime) > 0
then t.trip_distance / (datediff('minute', t.pickup_datetime,
t.dropoff_datetime) / 60.0)
else null
end as avg_speed_mph,
-- Tip percentage
case
when t.fare_amount > 0 and t.tip_amount is not null
then (t.tip_amount / t.fare_amount) * 100
else null
end as tip_percentage
from trips t
left join zones pz on t.pickup_zone_id = pz.zone_id
left join zones dz on t.dropoff_zone_id = dz.zone_id
)
select * from enrichedالتحقق من البيانات
النموذج int_trips_validated.sql يفلتر القيم الشاذة قبل أن تصل إلى «gold». لا أريد رحلة مدتها 14 ساعة تدخل متوسط اليوم.
- مدة الرحلة بين دقيقة و180 دقيقة.
- الأجرة موجبة لرحلات الأصفر والأخضر.
- معرفات المناطق موجودة وصحيحة.
- المسافة صفر أو أكثر.
طبقة «gold»: جاهزة للتحليل
طبقة «gold» ليست مكانا لرمي كل شيء. هي المكان الذي أسأل منه. كم رحلة في اليوم؟ أي منطقة أنشط؟ كيف تغير متوسط السعر؟ لذلك بنيت جداول مجمعة، وأهمها fct_daily_trips.sql.
{{
config(
materialized='incremental',
unique_key=['trip_date', 'pickup_borough', 'trip_type'],
incremental_strategy='merge'
)
}}
with trips as (
select * from {{ ref('int_trips_validated') }}
{% if is_incremental() %}
-- 3-day lookback for late-arriving data
where pickup_datetime >= (
select dateadd('day', -3, max(trip_date))
from {{ this }}
)
{% endif %}
),
daily_aggregates as (
select
cast(date_trunc('day', pickup_datetime) as date) as trip_date,
pickup_borough,
trip_type,
-- Trip counts
count(*) as total_trips,
sum(coalesce(passenger_count, 0)) as total_passengers,
-- Distance metrics
sum(coalesce(trip_distance, 0)) as total_distance_miles,
avg(trip_distance) as avg_distance_miles,
-- Fare metrics
sum(coalesce(fare_amount, 0)) as total_fare,
avg(fare_amount) as avg_fare,
sum(coalesce(tip_amount, 0)) as total_tips,
avg(tip_percentage) as avg_tip_percentage,
-- Duration metrics
avg(trip_duration_minutes) as avg_duration_minutes,
-- Metadata
current_timestamp as updated_at
from trips
where pickup_borough is not null
group by 1, 2, 3
)
select * from daily_aggregatesالنمط المهم: نافذة 3 أيام
لاحظ شرط is_incremental(). بعد أول تشغيل، لا أعيد معالجة كل البيانات. أرجع فقط 3 أيام من آخر تاريخ موجود. هذا يمسك السجلات المتأخرة بدون ما يحرق وقت التشغيل.
بدون هذا النمط عندك خياران سيئان. إما تفوتك بيانات متأخرة، أو تعيد تشغيل كل شيء كل مرة.
اختبارات جودة البيانات
اختبارات «dbt» هي الحاجز قبل انتشار الخطأ. أكتبها في ملفات «YAML»، وأخلي كل طبقة تثبت الحد الأدنى من الصحة قبل أن تعتمد عليها الطبقة التالية.
version: 2
models:
- name: stg_yellow_trips
columns:
- name: trip_id
tests:
- unique
- not_null
- name: pickup_datetime
tests:
- not_null
- name: dropoff_datetime
tests:
- not_null
- name: trip_type
tests:
- accepted_values:
values: ['yellow']تشغيل الاختبارات مباشر:
cd dbt_project
uv run dbt testهذه الاختبارات تمسك المعرفات المكررة، والأوقات الفارغة، وأنواع الرحلات الخاطئة، والقيم الخارجة عن النطاق. الخطأ يوقف هنا.
«CI/CD» على «GitHub Actions»
كل push أو pull request يشغل المسار كامل. في «CI» لا أحتاج كل الشهور. أستخدم عينة يناير، أبني «warehouse»، أشغل «dbt»، ثم أختبر.
name: dbt CI
on:
push:
branches: [main]
pull_request:
branches: [main]
jobs:
dbt-test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install UV
run: curl -LsSf https://astral.sh/uv/install.sh | sh
- name: Install dependencies
run: uv sync
- name: Download January data (CI subset)
run: uv run python scripts/download_data.py --months 1
- name: Setup warehouse
run: uv run python scripts/setup_warehouse.py
- name: Run dbt
run: |
cd dbt_project
uv run dbt deps
uv run dbt seed
uv run dbt run
uv run dbt testتشغيل الخط
إذا أردت تشغيل كل شيء مرة واحدة، استخدم السكربت الرئيسي. وإذا أردت تفهم كل خطوة، شغلها يدوي.
# Full pipeline
uv run python scripts/run_pipeline.py
# Or step by step:
uv run python scripts/download_data.py # ~10GB download
uv run python scripts/setup_warehouse.py
cd dbt_project
uv run dbt deps # Install dbt packages
uv run dbt seed # Load reference data
uv run dbt run # Execute transformations
uv run dbt test # Validate data qualityاستعلامات سريعة
بعد التشغيل، اسأل طبقة «gold». هذا مثال لملخص الرحلات اليومي، ومثال للمناطق الأكثر نشاطا.
-- Daily trip summary by borough
SELECT
trip_date,
pickup_borough,
trip_type,
total_trips,
avg_fare,
avg_duration_minutes
FROM marts.fct_daily_trips
WHERE trip_date >= '2024-01-01'
ORDER BY trip_date, total_trips DESC;
-- Busiest zones
SELECT
z.zone_name,
z.borough,
SUM(m.pickups) as total_pickups
FROM marts.fct_zone_metrics m
JOIN marts.dim_zones z ON m.zone_id = z.zone_id
GROUP BY z.zone_name, z.borough
ORDER BY total_pickups DESC
LIMIT 10;الخلاصة
بنيت «Data Lakehouse» محليا على «DuckDB» و«dbt». البيانات تدخل «bronze»، تنظف في «silver»، وتخرج من «gold» كجداول جاهزة للتحليل.
- «medallion architecture» بعقود واضحة بين الطبقات.
- معرفات ثابتة عبر «MD5» لمعالجة قابلة للإعادة.
- معالجة «incremental» مع نافذة 3 أيام للسجلات المتأخرة.
- اختبارات جودة البيانات في كل طبقة.
- «CI/CD» على «GitHub Actions» للتحقق المستمر.
الكود الكامل موجود على «GitHub». وإذا كبر الحجم، انقل نفس النمط إلى مستودع سحابي. التصميم هو الجزء المهم هنا، لا مكان التشغيل.