بناء «Data Lakehouse» باستخدام «DuckDB» و«dbt»

15 دقيقة قراءة
Data EngineeringdbtDuckDBTutorial

بنيت «Data Lakehouse» كامل على اللابتوب. بلا «Snowflake». بلا «BigQuery». بلا سيرفر ينتظرني في مكان ما. ملف واحد من «DuckDB»، ومشروع «dbt»، وبيانات «NYC Taxi» تكفي لتفضح أي تصميم كسول.

الفكرة بسيطة. نأخذ بيانات الرحلات، ونمررها على «medallion architecture». طبقة «bronze» تستقبل. طبقة «silver» تنظف وتغني. طبقة «gold» تعطيك جداول جاهزة للتحليل.

في النهاية يصير عندك خط بيانات يعمل محليا، فيه معالجة «incremental»، واختبارات جودة البيانات، و«CI/CD» على «GitHub Actions». الحجم حقيقي. البيانات بالملايين. لكن البنية التحتية؟ صفر.

لماذا «DuckDB» مع «dbt»؟

مستودعات البيانات الكبيرة ممتازة. «Snowflake» و«BigQuery» يعرفون شغلهم. لكنك أحيانا لا تحتاج منصة كاملة عشان تتعلم نمط، أو تبني نموذج أولي، أو تشغل تحليل بحجم معقول.

هنا يدخل «DuckDB». قاعدة بيانات بملف واحد. تقرأ «Parquet» مباشرة. تكتب «SQL» طبيعي. تستخدم «window functions» و«CTEs» وتجميعات بدون لف ودوران.

  • بلا بنية تحتية: ملف واحد. لا سيرفر. لا إعدادات طويلة.
  • سريع للتحليل: تخزين عمودي مناسب للاستعلامات الثقيلة.
  • يتعامل مع «Parquet»: تقرأ الملفات من مكانها.
  • «SQL» كامل: دوال، تجميعات، ونوافذ. كلها تعمل.

أضف «dbt» فوقه، وتأخذ الجزء الذي يهم في هندسة البيانات الحديثة: نماذج قابلة للمراجعة، تحولات محفوظة في «git»، توثيق، واختبارات.

تهيئة المشروع

المتطلبات

  • «Python» 3.11 أو أحدث.
  • مدير حزم «UV»، أو «pip» إذا تفضل الطريق المعروف. «UV» موجود هنا: github.com/astral-sh/uv.
  • مساحة قرص تقارب 10 قيقا لملفات البيانات.

التثبيت

انسخ المشروع وثبت الاعتمادات. لا يوجد إعداد مخفي.

bash
# Clone the repository
git clone https://github.com/AlharbiAbdullah/data-lakehouse
cd data-lakehouse

# Install dependencies with UV
uv sync

# Or with pip
pip install -r requirements.txt

هيكل المشروع

فصلت جلب البيانات عن التحولات. «Python» يجلب الملفات ويجهز «warehouse». «dbt» يتولى النماذج من «staging» إلى «marts».

text
data-lakehouse/
|-- data/
|   |-- raw/                    # Parquet files (gitignored)
|   +-- warehouse/              # DuckDB database
|
|-- dbt_project/
|   |-- models/
|   |   |-- staging/            # Bronze layer
|   |   |-- intermediate/       # Silver layer
|   |   +-- marts/              # Gold layer
|   |-- macros/                 # Reusable SQL
|   +-- seeds/                  # Reference data
|
|-- scripts/
|   |-- download_data.py        # Fetch NYC Taxi data
|   |-- setup_warehouse.py      # Initialize DuckDB
|   +-- run_pipeline.py         # Orchestrate everything
|
+-- .github/workflows/
    +-- dbt_ci.yml              # CI/CD pipeline

المعمارية: «medallion architecture»

اخترت «medallion architecture» لأنها تفرض عقدا واضحا بين الطبقات. كل طبقة لها وظيفة. وكل طبقة تترك البيانات أنظف من التي قبلها.

text
+==============================================================================+
|                            MEDALLION LAYERS                                  |
+==============================================================================+
|                                                                              |
|    +--------------+       +------------------+       +--------------+        |
|    |    BRONZE    |       |      SILVER      |       |     GOLD     |        |
|    |   (staging)  | ----> |  (intermediate)  | ----> |    (marts)   |        |
|    +--------------+       +------------------+       +--------------+        |
|                                                                              |
|     Raw ingestion          Cleaned &                  Analytics-ready        |
|     Type casting           Validated                  Aggregated             |
|     Hash-based IDs         Zone enrichment            Incremental            |
|                            Calculated fields          Daily metrics          |
|                                                                              |
+==============================================================================+

«bronze»: استقبال أولي. تحويل أنواع. توحيد أسماء الأعمدة. وتوليد معرفات ثابتة.

«silver»: تنظيف وإثراء. ربط مع جداول المناطق. حسابات مشتقة. حذف الرحلات غير المنطقية.

«gold»: جداول جاهزة للتحليل. تجميعات يومية. مقاييس تصلح للـ«BI» واللوحات.

طبقة «bronze»: استقبال البيانات

مصادر البيانات

استخدمت بيانات «NYC Taxi & Limousine Commission» ليناير وفبراير ومارس 2024. ليست عينة صغيرة للتجميل. البيانات كبيرة بما يكفي لتظهر مشاكل الأداء، والتكرار، وتأخر السجلات.

  • Yellow Taxi: نحو 3 إلى 4 ملايين رحلة شهريا. أغلبها حول مانهاتن.
  • Green Taxi: نحو 300 إلى 500 ألف رحلة شهريا. أكثرها خارج المركز.
  • FHV: نحو 5 إلى 6 ملايين رحلة شهريا. خدمات شبيهة بالـ«rideshare».

سكربت download_data.py يجلب الملفات بشكل غير متزامن. ثم setup_warehouse.py يجهز ملف «DuckDB».

bash
# Download all data
uv run python scripts/download_data.py

# Set up DuckDB warehouse
uv run python scripts/setup_warehouse.py

نماذج «staging»

كل نوع رحلة له نموذج «staging» خاص. هذا نموذج رحلات التاكسي الأصفر. خذ ملاحظتين: لا أغير أكثر من اللازم، وأولد trip_id من حقول الرحلة نفسها.

stg_yellow_trips.sql
{{
    config(
        materialized='view'
    )
}}

with source as (
    select * from {{ source('raw', 'yellow_tripdata') }}
),

with_base_hash as (
    select
        -- Deterministic ID from key fields
        {{ generate_trip_id(
            'tpep_pickup_datetime',
            'tpep_dropoff_datetime',
            'PULocationID',
            'DOLocationID',
            'fare_amount',
            'trip_distance',
            'passenger_count'
        ) }} as base_hash,

        -- Trip type identifier
        'yellow' as trip_type,

        -- Timestamps
        tpep_pickup_datetime as pickup_datetime,
        tpep_dropoff_datetime as dropoff_datetime,

        -- Locations
        PULocationID as pickup_zone_id,
        DOLocationID as dropoff_zone_id,

        -- Trip details
        cast(passenger_count as integer) as passenger_count,
        cast(trip_distance as double) as trip_distance,

        -- Fare components
        cast(fare_amount as double) as fare_amount,
        cast(tip_amount as double) as tip_amount,
        cast(total_amount as double) as total_amount,

        -- Metadata
        current_timestamp as loaded_at

    from source
    where tpep_pickup_datetime is not null
      and tpep_dropoff_datetime is not null
),

staged as (
    select
        -- Unique trip ID: base_hash + row number for duplicates
        base_hash || '_' || cast(row_number() over (
            partition by base_hash order by pickup_datetime
        ) as varchar) as trip_id,
        * exclude (base_hash)
    from with_base_hash
)

select * from staged

النمط المهم: معرفات ثابتة

بيانات الرحلات لا تعطيك دائما معرفا نظيفا تعتمد عليه. فأنا صنعت واحدا بنفسي. أستخدم «MD5» على حقول الرحلة الأساسية. الوقت، المناطق، السعر، المسافة، وعدد الركاب.

macros/generate_trip_id.sql
{% macro generate_trip_id(pickup_datetime, dropoff_datetime, pickup_zone,
                          dropoff_zone, extra_field_1, extra_field_2, extra_field_3) %}
    md5(
        coalesce(cast({{ pickup_datetime }} as varchar), '') ||
        '|' ||
        coalesce(cast({{ dropoff_datetime }} as varchar), '') ||
        '|' ||
        coalesce(cast({{ pickup_zone }} as varchar), '') ||
        '|' ||
        coalesce(cast({{ dropoff_zone }} as varchar), '') ||
        '|' ||
        coalesce(cast({{ extra_field_1 }} as varchar), '') ||
        '|' ||
        coalesce(cast({{ extra_field_2 }} as varchar), '') ||
        '|' ||
        coalesce(cast({{ extra_field_3 }} as varchar), '')
    )
{% endmacro %}

النتيجة مهمة: الرحلة نفسها تأخذ المعرف نفسه في كل تشغيل. هذا يجعل المعالجة «incremental» أسهل، ويجعل المطابقة بين التشغيلات ممكنة.

طبقة «silver»: تنظيف وإثراء

هنا تبدأ البيانات تصير مفيدة. أوحد أنواع الرحلات الثلاثة في جدول واحد عبر int_trips_unioned.sql. بعدها أربطها بمرجع المناطق، وأحسب حقولا لا تأتي جاهزة من المصدر.

int_trips_enriched.sql
{{
    config(
        materialized='view'
    )
}}

with trips as (
    select * from {{ ref('int_trips_unioned') }}
),

zones as (
    select * from {{ ref('stg_taxi_zones') }}
),

enriched as (
    select
        t.trip_id,
        t.trip_type,
        t.pickup_datetime,
        t.dropoff_datetime,
        t.pickup_zone_id,
        t.dropoff_zone_id,
        t.passenger_count,
        t.trip_distance,
        t.fare_amount,
        t.tip_amount,
        t.total_amount,
        t.loaded_at,

        -- Pickup zone info
        pz.zone_name as pickup_zone_name,
        pz.borough as pickup_borough,

        -- Dropoff zone info
        dz.zone_name as dropoff_zone_name,
        dz.borough as dropoff_borough,

        -- Calculated fields
        datediff('minute', t.pickup_datetime, t.dropoff_datetime)
            as trip_duration_minutes,

        -- Average speed (mph)
        case
            when t.trip_distance > 0
                 and datediff('minute', t.pickup_datetime, t.dropoff_datetime) > 0
            then t.trip_distance / (datediff('minute', t.pickup_datetime,
                                             t.dropoff_datetime) / 60.0)
            else null
        end as avg_speed_mph,

        -- Tip percentage
        case
            when t.fare_amount > 0 and t.tip_amount is not null
            then (t.tip_amount / t.fare_amount) * 100
            else null
        end as tip_percentage

    from trips t
    left join zones pz on t.pickup_zone_id = pz.zone_id
    left join zones dz on t.dropoff_zone_id = dz.zone_id
)

select * from enriched

التحقق من البيانات

النموذج int_trips_validated.sql يفلتر القيم الشاذة قبل أن تصل إلى «gold». لا أريد رحلة مدتها 14 ساعة تدخل متوسط اليوم.

  • مدة الرحلة بين دقيقة و180 دقيقة.
  • الأجرة موجبة لرحلات الأصفر والأخضر.
  • معرفات المناطق موجودة وصحيحة.
  • المسافة صفر أو أكثر.

طبقة «gold»: جاهزة للتحليل

طبقة «gold» ليست مكانا لرمي كل شيء. هي المكان الذي أسأل منه. كم رحلة في اليوم؟ أي منطقة أنشط؟ كيف تغير متوسط السعر؟ لذلك بنيت جداول مجمعة، وأهمها fct_daily_trips.sql.

fct_daily_trips.sql
{{
    config(
        materialized='incremental',
        unique_key=['trip_date', 'pickup_borough', 'trip_type'],
        incremental_strategy='merge'
    )
}}

with trips as (
    select * from {{ ref('int_trips_validated') }}

    {% if is_incremental() %}
    -- 3-day lookback for late-arriving data
    where pickup_datetime >= (
        select dateadd('day', -3, max(trip_date))
        from {{ this }}
    )
    {% endif %}
),

daily_aggregates as (
    select
        cast(date_trunc('day', pickup_datetime) as date) as trip_date,
        pickup_borough,
        trip_type,

        -- Trip counts
        count(*) as total_trips,
        sum(coalesce(passenger_count, 0)) as total_passengers,

        -- Distance metrics
        sum(coalesce(trip_distance, 0)) as total_distance_miles,
        avg(trip_distance) as avg_distance_miles,

        -- Fare metrics
        sum(coalesce(fare_amount, 0)) as total_fare,
        avg(fare_amount) as avg_fare,
        sum(coalesce(tip_amount, 0)) as total_tips,
        avg(tip_percentage) as avg_tip_percentage,

        -- Duration metrics
        avg(trip_duration_minutes) as avg_duration_minutes,

        -- Metadata
        current_timestamp as updated_at

    from trips
    where pickup_borough is not null
    group by 1, 2, 3
)

select * from daily_aggregates

النمط المهم: نافذة 3 أيام

لاحظ شرط is_incremental(). بعد أول تشغيل، لا أعيد معالجة كل البيانات. أرجع فقط 3 أيام من آخر تاريخ موجود. هذا يمسك السجلات المتأخرة بدون ما يحرق وقت التشغيل.

بدون هذا النمط عندك خياران سيئان. إما تفوتك بيانات متأخرة، أو تعيد تشغيل كل شيء كل مرة.

اختبارات جودة البيانات

اختبارات «dbt» هي الحاجز قبل انتشار الخطأ. أكتبها في ملفات «YAML»، وأخلي كل طبقة تثبت الحد الأدنى من الصحة قبل أن تعتمد عليها الطبقة التالية.

_staging.yml
version: 2

models:
  - name: stg_yellow_trips
    columns:
      - name: trip_id
        tests:
          - unique
          - not_null
      - name: pickup_datetime
        tests:
          - not_null
      - name: dropoff_datetime
        tests:
          - not_null
      - name: trip_type
        tests:
          - accepted_values:
              values: ['yellow']

تشغيل الاختبارات مباشر:

bash
cd dbt_project
uv run dbt test

هذه الاختبارات تمسك المعرفات المكررة، والأوقات الفارغة، وأنواع الرحلات الخاطئة، والقيم الخارجة عن النطاق. الخطأ يوقف هنا.

«CI/CD» على «GitHub Actions»

كل push أو pull request يشغل المسار كامل. في «CI» لا أحتاج كل الشهور. أستخدم عينة يناير، أبني «warehouse»، أشغل «dbt»، ثم أختبر.

.github/workflows/dbt_ci.yml
name: dbt CI

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  dbt-test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install UV
        run: curl -LsSf https://astral.sh/uv/install.sh | sh

      - name: Install dependencies
        run: uv sync

      - name: Download January data (CI subset)
        run: uv run python scripts/download_data.py --months 1

      - name: Setup warehouse
        run: uv run python scripts/setup_warehouse.py

      - name: Run dbt
        run: |
          cd dbt_project
          uv run dbt deps
          uv run dbt seed
          uv run dbt run
          uv run dbt test

تشغيل الخط

إذا أردت تشغيل كل شيء مرة واحدة، استخدم السكربت الرئيسي. وإذا أردت تفهم كل خطوة، شغلها يدوي.

bash
# Full pipeline
uv run python scripts/run_pipeline.py

# Or step by step:
uv run python scripts/download_data.py   # ~10GB download
uv run python scripts/setup_warehouse.py

cd dbt_project
uv run dbt deps     # Install dbt packages
uv run dbt seed     # Load reference data
uv run dbt run      # Execute transformations
uv run dbt test     # Validate data quality

استعلامات سريعة

بعد التشغيل، اسأل طبقة «gold». هذا مثال لملخص الرحلات اليومي، ومثال للمناطق الأكثر نشاطا.

sql
-- Daily trip summary by borough
SELECT
    trip_date,
    pickup_borough,
    trip_type,
    total_trips,
    avg_fare,
    avg_duration_minutes
FROM marts.fct_daily_trips
WHERE trip_date >= '2024-01-01'
ORDER BY trip_date, total_trips DESC;

-- Busiest zones
SELECT
    z.zone_name,
    z.borough,
    SUM(m.pickups) as total_pickups
FROM marts.fct_zone_metrics m
JOIN marts.dim_zones z ON m.zone_id = z.zone_id
GROUP BY z.zone_name, z.borough
ORDER BY total_pickups DESC
LIMIT 10;

الخلاصة

بنيت «Data Lakehouse» محليا على «DuckDB» و«dbt». البيانات تدخل «bronze»، تنظف في «silver»، وتخرج من «gold» كجداول جاهزة للتحليل.

  • «medallion architecture» بعقود واضحة بين الطبقات.
  • معرفات ثابتة عبر «MD5» لمعالجة قابلة للإعادة.
  • معالجة «incremental» مع نافذة 3 أيام للسجلات المتأخرة.
  • اختبارات جودة البيانات في كل طبقة.
  • «CI/CD» على «GitHub Actions» للتحقق المستمر.

الكود الكامل موجود على «GitHub». وإذا كبر الحجم، انقل نفس النمط إلى مستودع سحابي. التصميم هو الجزء المهم هنا، لا مكان التشغيل.