
Building an ETL Pipeline with Python, Prefect, and DuckDB

7 min read

A deep dive into production-ready data extraction, loading, and orchestration


This is Article 5 in a 7-part series on building a complete analytics platform for Claude Code conversation logs. In Article 4, we explored the high-level architecture of our analytics module, including the medallion pattern (Bronze, Silver, Gold layers) and how dbt transformations create business-ready data models.

Now we are going to get our hands dirty with the Python code that powers the ELT pipeline. We will walk through every module in the analytics/analytics/ package, examining how each component fulfills a single responsibility while working together as a cohesive system.

By the end of this article, you will understand how to:

  • Build type-safe configuration with Pydantic Settings
  • Extract data from MongoDB with incremental high-water-mark tracking
  • Load Parquet files into DuckDB with upsert semantics
  • Orchestrate everything with Prefect flows and tasks
  • Validate data quality with Great Expectations

Let us dive in.


Module Structure Overview

Before examining individual files, let us understand how the package is organized. Each module has a single, clear responsibility:

analytics/
├── analytics/
│   ├── __init__.py          # Package initialization, exports
│   ├── config.py            # Configuration management (Pydantic)
│   ├── extractor.py         # MongoDB → Parquet extraction
│   ├── loader.py            # Parquet → DuckDB loading
│   ├── quality.py           # Great Expectations integration
│   ├── cli.py               # Command-line interface (Typer)
│   └── flows/
│       ├── __init__.py      # Flow exports
│       ├── main_pipeline.py # Prefect flow definitions
│       └── deployment.py    # Deployment helpers
├── prefect.yaml             # Prefect deployment config
└── pyproject.toml           # Package metadata

The data flows through these modules in a clear sequence:

[Mermaid diagram: data flow from config.py through extractor.py, loader.py, flows/, and cli.py]

The config.py module provides settings to all other modules. The extractor.py pulls from MongoDB and writes Parquet. The loader.py reads Parquet and loads into DuckDB. The flows/ package orchestrates these steps with Prefect. And cli.py provides human-friendly commands for all operations.


Configuration Management with Pydantic Settings

Good configuration management is the foundation of any production system. The config.py module uses Pydantic Settings to provide type-safe, validated configuration with automatic environment variable loading.

Nested Settings Classes

Rather than one monolithic settings class, the configuration is organized into focused, nested classes:

from pydantic import Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict


class MongoSettings(BaseSettings):
    """MongoDB source configuration."""

    model_config = SettingsConfigDict(
        env_prefix="MONGO_",
        env_file=".env.analytics",
        env_file_encoding="utf-8",
        extra="ignore",
    )

    uri: str = Field(
        default="mongodb://localhost:27017",
        description="MongoDB connection URI",
    )
    db: str = Field(
        default="claude_logs",
        description="Database name containing conversation logs",
    )
    collection: str = Field(
        default="conversations",
        description="Collection name with conversation entries",
    )

Each settings class has its own environment variable prefix. The MongoSettings class reads MONGO_URI, MONGO_DB, and MONGO_COLLECTION from the environment. This prevents naming collisions and makes configuration self-documenting.
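
As a quick sketch of how the prefix works, the overrides below are hypothetical values set directly in the process environment; in practice they would live in .env.analytics:

import os

# Hypothetical overrides; fields without an override fall back to their defaults
os.environ["MONGO_URI"] = "mongodb://mongo.internal:27017"
os.environ["MONGO_DB"] = "claude_logs"

settings = MongoSettings()
print(settings.uri)         # mongodb://mongo.internal:27017
print(settings.collection)  # "conversations" — the field default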

The pattern continues for other concerns:

from pathlib import Path


class DuckDBSettings(BaseSettings):
    """DuckDB target configuration."""

    model_config = SettingsConfigDict(env_prefix="DUCKDB_")

    path: Path = Field(
        default=Path("/duckdb/analytics.db"),
        description="Path to DuckDB database file",
    )
    threads: int = Field(
        default=4,
        ge=1,
        le=32,
        description="Number of threads for DuckDB queries",
    )

Notice the ge=1, le=32 constraints on threads. Pydantic validates these at load time, failing fast if someone configures an invalid value.
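
Here is a minimal sketch of the fail-fast behavior, assuming the DuckDBSettings class above is importable and using a deliberately out-of-range value:

import os
from pydantic import ValidationError

os.environ["DUCKDB_THREADS"] = "64"  # outside the allowed 1–32 range

try:
    DuckDBSettings()
except ValidationError as exc:
    # Pydantic reports which field failed and why, before any pipeline code runs
    print(exc)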

The Main Settings Aggregator

All nested settings come together in the main Settings class:

class Settings(BaseSettings):
    """Main settings class that aggregates all configuration sections."""

    model_config = SettingsConfigDict(
        env_file=".env.analytics",
        env_file_encoding="utf-8",
        extra="ignore",
    )

    # Nested settings
    mongo: MongoSettings = Field(default_factory=MongoSettings)
    duckdb: DuckDBSettings = Field(default_factory=DuckDBSettings)
    data: DataSettings = Field(default_factory=DataSettings)
    pipeline: PipelineSettings = Field(default_factory=PipelineSettings)
    prefect: PrefectSettings = Field(default_factory=PrefectSettings)
    dbt: DbtSettings = Field(default_factory=DbtSettings)
    great_expectations: GreatExpectationsSettings = Field(
        default_factory=GreatExpectationsSettings
    )
    logging: LoggingSettings = Field(default_factory=LoggingSettings)
    alerting: AlertingSettings = Field(default_factory=AlertingSettings)

    def setup(self) -> None:
        """Initialize settings: create directories, configure logging."""
        self.data.ensure_directories()

Singleton Pattern with lru_cache

Settings should be loaded once and reused. The get_settings() function uses lru_cache to ensure this:

from functools import lru_cache


@lru_cache
def get_settings() -> Settings:
    """Get cached settings instance."""
    settings = Settings()
    settings.setup()
    return settings

Any module can call get_settings() and receive the same cached instance. This avoids repeated file I/O and ensures consistent configuration across the application.
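
A quick illustration of the caching behavior, assuming get_settings is imported from analytics.config:

from analytics.config import get_settings

a = get_settings()
b = get_settings()
assert a is b  # lru_cache hands back the same Settings object on every call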


Extractor Deep Dive: MongoDB to Parquet

The extractor.py module is responsible for pulling data from MongoDB and writing it to Parquet files. This is the “E” in our ELT pipeline. Let us examine its key components.

PyArrow Schema Definition

The extractor defines a strict schema for the output Parquet files:

import pyarrow as pa

CONVERSATION_SCHEMA = pa.schema([
    # Primary identifiers
    ("_id", pa.string()),
    ("type", pa.string()),
    ("session_id", pa.string()),
    ("project_id", pa.string()),
    # Timestamps
    ("timestamp", pa.timestamp("us", tz="UTC")),
    ("ingested_at", pa.timestamp("us", tz="UTC")),
    ("extracted_at", pa.timestamp("us", tz="UTC")),
    # Message content (flattened)
    ("message_role", pa.string()),
    ("message_content", pa.string()),
    ("message_raw", pa.string()),
    # Source tracking
    ("source_file", pa.string()),
    # Partitioning
    ("date", pa.date32()),
])

Defining the schema upfront has several benefits:

  1. Type safety: PyArrow validates data against the schema at write time
  2. Documentation: The schema serves as a contract between extraction and loading
  3. Performance: DuckDB can read typed Parquet more efficiently than schemaless JSON

Document Transformation

MongoDB documents are flexible, but Parquet needs flat, typed columns. The DocumentTransformer class handles this translation:

import json
from typing import Any


class DocumentTransformer:
    """Transforms MongoDB documents into flat structure for Parquet."""

    @staticmethod
    def flatten_message(message: Any) -> tuple[str | None, str | None, str | None]:
        """Flatten the message field into role, content, and raw JSON."""
        if message is None:
            return None, None, None
        if isinstance(message, str):
            return None, message, None
        if isinstance(message, dict):
            role = message.get("role")
            content = message.get("content")
            # Handle content that might be a list of blocks
            if isinstance(content, list):
                text_parts = []
                for block in content:
                    if isinstance(block, dict):
                        if block.get("type") == "text":
                            text_parts.append(block.get("text", ""))
                        elif block.get("type") == "tool_use":
                            text_parts.append(f"[tool_use: {block.get('name', 'unknown')}]")
                content = "\n".join(text_parts) if text_parts else None
            raw_json = json.dumps(message, default=str) if message else None
            return role, content, raw_json
        return None, None, json.dumps(message, default=str)

The transformer handles the three common message formats:

  1. Simple strings: Stored directly as content
  2. Objects with role/content: Extracted to separate columns
  3. Content block arrays: Text blocks joined, tool usage summarized

The original message is also preserved in message_raw for cases where downstream analysis needs the full structure.
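
To make the three cases concrete, here is a small sketch of what flatten_message returns for each shape, assuming the DocumentTransformer shown above:

t = DocumentTransformer()

# 1. Simple string: stored as content only
print(t.flatten_message("fix the failing test"))
# (None, 'fix the failing test', None)

# 2. Object with role/content: split into columns, raw JSON preserved
role, content, raw = t.flatten_message({"role": "user", "content": "hi"})

# 3. Content block array: text joined, tool usage summarized
msg = {
    "role": "assistant",
    "content": [
        {"type": "text", "text": "Running the tests now."},
        {"type": "tool_use", "name": "Bash"},
    ],
}
role, content, raw = t.flatten_message(msg)
print(content)  # text block and "[tool_use: Bash]" joined by a newline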

High Water Mark Tracking

For incremental extraction, we need to track the last successfully extracted timestamp. The HighWaterMark class manages this:

class HighWaterMark:
    """Tracks last extraction timestamp for incremental extraction."""

    def __init__(self, file_path: Path):
        self.file_path = file_path

    def get(self) -> datetime | None:
        """Get the last extraction timestamp."""
        if not self.file_path.exists():
            return None
        try:
            data = json.loads(self.file_path.read_text())
            ts = data.get("last_extracted_at")
            if ts:
                return datetime.fromisoformat(ts)
        except (json.JSONDecodeError, ValueError) as e:
            logger.warning(f"Failed to read high water mark: {e}")
        return None

    def set(self, timestamp: datetime) -> None:
        """Set the last extraction timestamp."""
        self.file_path.parent.mkdir(parents=True, exist_ok=True)
        data = {
            "last_extracted_at": timestamp.isoformat(),
            "updated_at": datetime.now(timezone.utc).isoformat(),
        }
        self.file_path.write_text(json.dumps(data, indent=2))

This simple file-based approach provides durability without requiring an external state store. If the pipeline crashes, the next run picks up from where it left off.
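
Usage is deliberately simple. A sketch, with a hypothetical state-file path that would normally come from PipelineSettings:

from datetime import datetime, timezone
from pathlib import Path

hwm = HighWaterMark(Path("/data/state/extraction_hwm.json"))  # hypothetical path

print(hwm.get())                     # None on the very first run
hwm.set(datetime.now(timezone.utc))  # record the latest extracted timestamp
print(hwm.get())                     # picked up by the next incremental run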

The MongoExtractor Class

The main extractor class ties everything together:

class MongoExtractor:
    """Extracts conversation data from MongoDB to Parquet files."""

    def __init__(self, settings: Settings | None = None):
        self.settings = settings or get_settings()
        self.transformer = DocumentTransformer()
        self.high_water_mark = HighWaterMark(
            self.settings.pipeline.high_water_mark_file
        )
        self._client: MongoClient | None = None

    def extract(
        self,
        full_backfill: bool = False,
        output_dir: Path | None = None,
    ) -> list[Path]:
        """Extract data from MongoDB and write to Parquet files."""
        output_dir = output_dir or self.settings.data.raw_dir

        # Determine start time
        since = None
        if not full_backfill:
            since = self.high_water_mark.get()
            if since:
                logger.info(f"Incremental extraction since {since.isoformat()}")

        extracted_at = datetime.now(timezone.utc)
        records_by_date: dict[str, list[dict]] = {}
        written_files: list[Path] = []

        try:
            self.connect()
            for doc in self._fetch_documents(since=since):
                record = self.transformer.transform(doc, extracted_at)
                # Group by partition date
                date_key = record["date"].isoformat()
                if date_key not in records_by_date:
                    records_by_date[date_key] = []
                records_by_date[date_key].append(record)

            # Write partitions
            for date_key, records in records_by_date.items():
                partition_date = datetime.fromisoformat(date_key)
                written_files.append(
                    self._write_partition(records, partition_date, output_dir)
                )
        finally:
            self.disconnect()

        return written_files

Key design decisions:

  • Lazy connection: MongoDB client is created on first use
  • Batched writes: Records are grouped by date before writing
  • Partition directories: Output follows Hive-style date=YYYY-MM-DD partitioning
  • Guaranteed cleanup: finally block ensures connection closes even on error
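
Putting the extractor to work is a one-liner per run. A sketch, assuming default settings from .env.analytics and a reachable MongoDB:

extractor = MongoExtractor()

# Incremental run: only documents newer than the stored high-water mark
files = extractor.extract(full_backfill=False)
print(f"Wrote {len(files)} Parquet file(s)")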

Date-Partitioned Output

The _write_partition method writes records with Snappy compression:

def _write_partition(
    self,
    records: list[dict],
    partition_date: datetime,
    output_dir: Path,
) -> Path:
    """Write records to a Parquet file in date-partitioned directory."""
    date_str = partition_date.strftime("%Y-%m-%d")
    partition_dir = output_dir / f"date={date_str}"
    partition_dir.mkdir(parents=True, exist_ok=True)

    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_%f")
    file_path = partition_dir / f"conversations_{timestamp}.parquet"

    table = pa.Table.from_pylist(records, schema=CONVERSATION_SCHEMA)
    pq.write_table(
        table,
        file_path,
        compression="snappy",
        write_statistics=True,
    )
    return file_path

Hive-style partitioning (date=2024-01-15/) enables DuckDB to read only relevant partitions when filtering by date, dramatically improving query performance.
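
As a rough illustration of the payoff, a DuckDB query that filters on the partition column only scans the matching date= directories. A sketch; the /data/raw path mirrors the CLI output shown later:

import duckdb

con = duckdb.connect()  # an in-memory connection is enough for ad-hoc checks
count = con.execute("""
    SELECT COUNT(*)
    FROM read_parquet('/data/raw/**/*.parquet', hive_partitioning = true)
    WHERE date = DATE '2024-01-15'   -- prunes every other date= partition
""").fetchone()[0]
print(count)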


Loader Deep Dive: Parquet to DuckDB

The loader.py module handles the “L” in ELT, taking extracted Parquet files and loading them into DuckDB. DuckDB is an excellent choice here because it can read Parquet files natively without intermediate steps.

Schema Creation

The loader creates the target schema and table if they do not exist:

CREATE_RAW_SCHEMA = "CREATE SCHEMA IF NOT EXISTS raw;"

CREATE_CONVERSATIONS_TABLE = """
CREATE TABLE IF NOT EXISTS raw.conversations (
    _id VARCHAR PRIMARY KEY,
    type VARCHAR,
    session_id VARCHAR,
    project_id VARCHAR,
    timestamp TIMESTAMP WITH TIME ZONE,
    ingested_at TIMESTAMP WITH TIME ZONE,
    extracted_at TIMESTAMP WITH TIME ZONE,
    message_role VARCHAR,
    message_content VARCHAR,
    message_raw VARCHAR,
    source_file VARCHAR,
    date DATE
);
"""

CREATE_INDEXES = [
    "CREATE INDEX IF NOT EXISTS idx_conversations_project_id ON raw.conversations(project_id);",
    "CREATE INDEX IF NOT EXISTS idx_conversations_session_id ON raw.conversations(session_id);",
    "CREATE INDEX IF NOT EXISTS idx_conversations_date ON raw.conversations(date);",
    "CREATE INDEX IF NOT EXISTS idx_conversations_type ON raw.conversations(type);",
    "CREATE INDEX IF NOT EXISTS idx_conversations_timestamp ON raw.conversations(timestamp);",
]

Indexes are created on columns commonly used in WHERE clauses and GROUP BY operations. The _id column serves as the primary key for upsert operations.

Upsert with ON CONFLICT

The magic happens in the load SQL. DuckDB supports PostgreSQL-style upsert syntax:

LOAD_FROM_PARQUET = """
INSERT INTO raw.conversations
SELECT * FROM read_parquet('{parquet_path}', hive_partitioning=true)
ON CONFLICT (_id) DO UPDATE SET
    type = EXCLUDED.type,
    session_id = EXCLUDED.session_id,
    project_id = EXCLUDED.project_id,
    timestamp = EXCLUDED.timestamp,
    ingested_at = EXCLUDED.ingested_at,
    extracted_at = EXCLUDED.extracted_at,
    message_role = EXCLUDED.message_role,
    message_content = EXCLUDED.message_content,
    message_raw = EXCLUDED.message_raw,
    source_file = EXCLUDED.source_file,
    date = EXCLUDED.date;
"""

This pattern is incredibly powerful:

  1. New records: Inserted normally
  2. Existing records: Updated with new values (idempotent reprocessing)
  3. Native Parquet reading: read_parquet() with hive_partitioning=true understands our directory structure

The DuckDBLoader Class

class DuckDBLoader:
    """Loads Parquet files into DuckDB database."""

    def __init__(self, settings: Settings | None = None):
        self.settings = settings or get_settings()
        self._conn: duckdb.DuckDBPyConnection | None = None

    def connect(self) -> duckdb.DuckDBPyConnection:
        """Establish connection to DuckDB database."""
        if self._conn is not None:
            return self._conn
        db_path = self.settings.duckdb.path
        db_path.parent.mkdir(parents=True, exist_ok=True)
        self._conn = duckdb.connect(str(db_path))
        self._conn.execute(f"SET threads TO {self.settings.duckdb.threads};")
        return self._conn

    def load_from_parquet(
        self,
        parquet_path: Path | str,
        full_refresh: bool = False,
    ) -> int:
        """Load Parquet files into DuckDB."""
        parquet_path = Path(parquet_path)
        self.create_database()

        if parquet_path.is_dir():
            glob_pattern = str(parquet_path / "**" / "*.parquet")
        else:
            glob_pattern = str(parquet_path)

        count_before = self._get_row_count()
        if full_refresh:
            self.conn.execute("DELETE FROM raw.conversations;")

        load_sql = LOAD_FROM_PARQUET.format(parquet_path=glob_pattern)
        self.conn.execute(load_sql)

        count_after = self._get_row_count()
        return count_after - count_before if not full_refresh else count_after

The loader is intentionally simple. DuckDB handles the heavy lifting of reading Parquet, and the upsert pattern handles both initial loads and incremental updates.
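
In isolation, a load run looks like this. A sketch, assuming the raw Parquet directory produced by the extraction step and the loader's disconnect() cleanup method used later in the Prefect tasks:

loader = DuckDBLoader()
try:
    new_rows = loader.load_from_parquet("/data/raw")  # upsert every partition found
    print(f"{new_rows} rows added")
finally:
    loader.disconnect()  # same cleanup pattern as in the Prefect load task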

Statistics Gathering

The loader also provides diagnostic information:

def get_table_stats(self) -> dict[str, Any]:
    """Get statistics about the loaded data."""
    stats = {"row_count": self._get_row_count()}

    # Date range
    date_range = self.conn.execute("""
        SELECT MIN(date), MAX(date), COUNT(DISTINCT date)
        FROM raw.conversations
    """).fetchone()
    if date_range and date_range[0]:
        stats["date_range"] = {
            "min": str(date_range[0]),
            "max": str(date_range[1]),
            "count": date_range[2],
        }

    # Type distribution
    type_counts = self.conn.execute("""
        SELECT type, COUNT(*) as count
        FROM raw.conversations
        GROUP BY type
        ORDER BY count DESC
    """).fetchall()
    stats["type_distribution"] = [
        {"type": row[0], "count": row[1]}
        for row in type_counts
    ]
    return stats

These statistics are invaluable for monitoring pipeline health and understanding data characteristics.
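
Continuing the loader sketch above, the stats are just a dictionary you can log or print (this assumes the loader has an open connection, as in the load example):

loader = DuckDBLoader()
loader.connect()
stats = loader.get_table_stats()
print(stats["row_count"])
print(stats.get("date_range"))         # {"min": ..., "max": ..., "count": ...}
print(stats.get("type_distribution"))  # list of {"type": ..., "count": ...} rows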


Prefect Flows: Orchestrating the Pipeline

With extraction and loading in place, we need orchestration. Prefect provides a Python-native way to define workflows with built-in retry logic, observability, and scheduling.

Task Definitions

Each pipeline step is a Prefect task with retry configuration:

import subprocess
from datetime import datetime
from pathlib import Path
from typing import Optional

from prefect import flow, task, get_run_logger

RETRY_DELAYS = [30, 60, 120]  # Exponential backoff


@task(
    name="extract-mongodb",
    description="Extract data from MongoDB to Parquet files",
    retries=3,
    retry_delay_seconds=RETRY_DELAYS,
)
def extract_task(
    full_backfill: bool = False,
    since: Optional[datetime] = None,
) -> dict:
    """Extract data from MongoDB and write to Parquet files."""
    logger = get_run_logger()
    settings = get_settings()
    logger.info("Starting MongoDB extraction")

    extractor = MongoExtractor(settings=settings)
    try:
        if full_backfill:
            stats = extractor.full_extract()
        else:
            stats = extractor.incremental_extract(since=since)
        return stats
    finally:
        extractor.disconnect()

The retry configuration uses exponential backoff (30s, 60s, 120s) to handle transient failures gracefully.

Load and Transform Tasks

@task(
    name="load-duckdb",
    description="Load Parquet files into DuckDB",
    retries=3,
    retry_delay_seconds=RETRY_DELAYS,
)
def load_task(
    extraction_stats: dict,
    full_refresh: bool = False,
) -> dict:
    """Load Parquet files into DuckDB."""
    logger = get_run_logger()
    settings = get_settings()

    loader = DuckDBLoader(settings=settings)
    try:
        loader.create_database()
        parquet_path = Path(settings.data.raw_dir)
        if full_refresh:
            stats = loader.load_from_parquet(str(parquet_path))
        else:
            stats = loader.upsert_incremental(str(parquet_path))
        return {"load_stats": stats, "table_stats": loader.get_table_stats()}
    finally:
        loader.disconnect()


@task(
    name="transform-dbt",
    description="Run dbt transformations",
    retries=2,
    retry_delay_seconds=RETRY_DELAYS,
)
def transform_task(
    load_stats: dict,
    full_refresh: bool = False,
    select: Optional[str] = None,
) -> dict:
    """Run dbt transformations."""
    logger = get_run_logger()
    settings = get_settings()

    cmd = [
        "dbt", "build",
        "--project-dir", str(settings.dbt.project_dir),
        "--profiles-dir", str(settings.dbt.profiles_dir),
        "--target", settings.dbt.target,
    ]
    if full_refresh:
        cmd.append("--full-refresh")

    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"dbt build failed: {result.stdout}")
    return {"build_success": True, "output": result.stdout}

Notice how load_task takes extraction_stats as a parameter. This creates an explicit dependency in Prefect, ensuring extraction completes before loading begins.

The Main Flow

The flow orchestrates all tasks:

@flow(
    name="claude-analytics-pipeline",
    description="Main ELT pipeline for Claude conversation analytics",
    version="1.0.0",
)
def analytics_pipeline(
    full_backfill: bool = False,
    full_refresh: bool = False,
    skip_extract: bool = False,
    skip_load: bool = False,
    skip_transform: bool = False,
    dbt_select: Optional[str] = None,
) -> dict:
    """Main analytics pipeline orchestrating extract, load, and transform."""
    logger = get_run_logger()
    results = {}
    logger.info("Starting Claude Analytics Pipeline")

    # Step 1: Extract
    if not skip_extract:
        extraction_stats = extract_task(full_backfill=full_backfill)
        results["extraction"] = extraction_stats
    else:
        results["extraction"] = {"skipped": True}

    # Step 2: Load
    if not skip_load:
        load_stats = load_task(
            extraction_stats=results["extraction"],
            full_refresh=full_refresh,
        )
        results["load"] = load_stats
    else:
        results["load"] = {"skipped": True}

    # Step 3: Transform
    if not skip_transform:
        transform_stats = transform_task(
            load_stats=results["load"],
            full_refresh=full_refresh,
            select=dbt_select,
        )
        results["transform"] = transform_stats
    else:
        results["transform"] = {"skipped": True}

    return results

The flow supports skip flags for partial runs, useful during development or when reprocessing specific stages.
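
For example, during development you can re-run only the transform step (a sketch, calling the flow as a regular Python function):

# Re-run only the dbt transformations, e.g. after fixing a model
result = analytics_pipeline(skip_extract=True, skip_load=True)
print(result["transform"])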

Pipeline Flow Diagram

[Mermaid diagram: extract → load → transform task sequence inside the analytics_pipeline flow]

Deployment Configuration

The prefect.yaml file defines multiple deployment variants:

name: claude-analytics

deployments:
  # Hourly incremental pipeline
  - name: hourly-analytics
    version: "1.0.0"
    tags: [analytics, scheduled, hourly]
    description: "Hourly incremental analytics pipeline"
    entrypoint: analytics/flows/main_pipeline.py:scheduled_pipeline
    work_pool:
      name: analytics-pool
    schedules:
      - interval: 3600  # 1 hour

  # Daily full refresh at 2 AM
  - name: daily-full-refresh
    version: "1.0.0"
    tags: [analytics, scheduled, daily]
    description: "Daily full refresh of analytics (2 AM)"
    entrypoint: analytics/flows/main_pipeline.py:analytics_pipeline
    work_pool:
      name: analytics-pool
    schedules:
      - cron: "0 2 * * *"
    parameters:
      full_backfill: false
      full_refresh: true

  # Manual full backfill
  - name: full-backfill
    version: "1.0.0"
    tags: [analytics, manual, backfill]
    description: "Full historical backfill"
    entrypoint: analytics/flows/main_pipeline.py:analytics_pipeline
    work_pool:
      name: analytics-pool
    parameters:
      full_backfill: true
      full_refresh: true

This gives us:

  • Hourly incremental: Keeps data fresh throughout the day
  • Daily full refresh: Rebuilds all dbt models nightly
  • Manual backfill: For initial setup or recovery scenarios

CLI Interface: Developer Experience Matters

The cli.py module provides a polished command-line interface using Typer and Rich. A good CLI makes the difference between a tool developers avoid and one they reach for daily.

Command Structure

import typer
from rich.console import Console
from rich.table import Table
app = typer.Typer(
name="claude-analytics",
help="Claude Analytics Platform - ELT pipeline for conversation logs",
add_completion=False,
)
console = Console()

Available Commands

The CLI exposes six primary commands:

1. config - Display current configuration:

$ claude-analytics config
Current Configuration
MongoDB URI: mongodb://localhost:27017
MongoDB DB: claude_logs
DuckDB Path: /duckdb/analytics.db
Batch Size: 10000

2. extract - Run MongoDB extraction:

$ claude-analytics extract --full-backfill --verbose
MongoDB Extraction
Source: mongodb://localhost:27017/claude_logs.conversations
Mode: Full Backfill
Output: /data/raw
Extraction complete!
Written 5 Parquet file(s):
┌────────────────────────────────────┬─────────────────┐
│ File                               │ Partition       │
├────────────────────────────────────┼─────────────────┤
│ conversations_20240115_143022.parq │ date=2024-01-15 │
│ conversations_20240115_143022.parq │ date=2024-01-14 │
└────────────────────────────────────┴─────────────────┘

3. load - Load into DuckDB:

$ claude-analytics load --stats
DuckDB Loading
Database: /duckdb/analytics.db
Source: /data/raw
Mode: Upsert
Loading complete!
Rows loaded/updated: 15234
Table Statistics:
Total rows: 45892
Date range: 2024-01-01 to 2024-01-15 (15 days)

4. transform - Run dbt models:

$ claude-analytics transform --models "+fct_messages"

5. pipeline - Run the complete ELT pipeline:

$ claude-analytics pipeline --full-backfill --prefect
Claude Analytics Pipeline
Mode: Full Backfill
Refresh: Incremental
Steps: Prefect
Running via Prefect...
Pipeline complete!

6. validate - Run data quality checks:

$ claude-analytics validate --build-docs

Implementation Example

Here is the pipeline command implementation showing how options are handled:

@app.command()
def pipeline(
    full_backfill: bool = typer.Option(
        False, "--full-backfill",
        help="Run full historical backfill",
    ),
    full_refresh: bool = typer.Option(
        False, "--full-refresh",
        help="Rebuild all data and dbt models",
    ),
    skip_extract: bool = typer.Option(
        False, "--skip-extract",
        help="Skip extraction step",
    ),
    use_prefect: bool = typer.Option(
        False, "--prefect",
        help="Run via Prefect orchestration",
    ),
    verbose: bool = typer.Option(
        False, "--verbose", "-v",
        help="Enable verbose logging",
    ),
) -> None:
    """Run the complete analytics pipeline."""
    setup_logging("DEBUG" if verbose else "INFO")
    console.print("[bold blue]Claude Analytics Pipeline[/bold blue]\n")
    console.print(f"  Mode: {'Full Backfill' if full_backfill else 'Incremental'}")

    if use_prefect:
        from analytics.flows import analytics_pipeline as prefect_pipeline

        result = prefect_pipeline(
            full_backfill=full_backfill,
            full_refresh=full_refresh,
        )
        console.print("[bold green]Pipeline complete![/bold green]")
    else:
        # Direct execution without Prefect
        # ... step-by-step execution with progress output

The CLI supports both Prefect-orchestrated runs (for production) and direct execution (for development and debugging).


Data Quality with Great Expectations

Data pipelines are only as good as the data they produce. The quality.py module integrates Great Expectations for automated data validation.

The DataQualityValidator Class

class DataQualityValidator:
    """Data quality validator using Great Expectations."""

    def __init__(self, ge_project_dir: Path | None = None):
        self.settings = get_settings()
        self.ge_project_dir = ge_project_dir or Path(
            self.settings.great_expectations.project_dir
        )
        self._context = None

    @property
    def context(self) -> Any:
        """Get or create Great Expectations data context."""
        if self._context is None:
            try:
                import great_expectations as gx

                self._context = gx.get_context(
                    context_root_dir=str(self.ge_project_dir)
                )
            except ImportError:
                logger.warning("Great Expectations not installed")
                return None
        return self._context

The validator lazily loads the Great Expectations context, gracefully handling cases where the library is not installed.

Bronze Layer Expectations

The bronze (raw) layer validates fundamental data integrity:

{
  "expectation_suite_name": "bronze_expectations",
  "expectations": [
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": { "column": "_id" },
      "meta": { "notes": "Primary key must not be null" }
    },
    {
      "expectation_type": "expect_column_values_to_be_unique",
      "kwargs": { "column": "_id" },
      "meta": { "notes": "Primary key must be unique" }
    },
    {
      "expectation_type": "expect_column_values_to_be_in_set",
      "kwargs": {
        "column": "type",
        "value_set": ["user", "assistant", "tool_use", "tool_result"]
      },
      "meta": { "notes": "Entry types must be valid" }
    }
  ]
}

These checks catch data corruption early, before it propagates through the pipeline.

Silver Layer Expectations

The silver (intermediate) layer validates business logic:

{
  "expectation_suite_name": "silver_expectations",
  "expectations": [
    {
      "expectation_type": "expect_column_values_to_be_in_set",
      "kwargs": {
        "column": "task_category",
        "value_set": ["bug_fix", "feature", "refactor", "testing", "documentation", "review", "other"]
      }
    },
    {
      "expectation_type": "expect_column_values_to_be_in_set",
      "kwargs": {
        "column": "time_of_day",
        "value_set": ["morning", "afternoon", "evening", "night"]
      }
    },
    {
      "expectation_type": "expect_column_values_to_be_between",
      "kwargs": {
        "column": "hour_of_day",
        "min_value": 0,
        "max_value": 23
      }
    }
  ]
}

These expectations verify that dbt transformations produce valid categorizations and derived fields.

Running Validations

def validate_pipeline_data(
    validate_bronze: bool = True,
    validate_silver: bool = True,
) -> dict[str, Any]:
    """Validate pipeline data."""
    validator = DataQualityValidator()
    results = {}

    if validate_bronze:
        results["bronze"] = validator.validate_bronze()
    if validate_silver:
        results["silver"] = validator.validate_silver()

    results["success"] = all(
        r.get("success", False) for r in results.values()
        if isinstance(r, dict)
    )
    return results

This function provides a simple interface for validating data at any point in the pipeline.
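
Calling it from a script or a Prefect task is straightforward. A sketch, assuming the module is importable as analytics.quality:

from analytics.quality import validate_pipeline_data

results = validate_pipeline_data(validate_bronze=True, validate_silver=True)
if not results["success"]:
    raise SystemExit("Data quality validation failed")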


Putting It All Together

We have covered a lot of ground. Let us see how to use this pipeline in practice.

Initial Setup

# Install the package
cd analytics
pip install -e ".[all]"
# Configure environment
cp .env.analytics.example .env.analytics
# Edit .env.analytics with your MongoDB URI
# Initialize DuckDB schema
claude-analytics load --init-only
# Run initial backfill
claude-analytics pipeline --full-backfill --full-refresh

Daily Operations

# Run incremental update
claude-analytics pipeline
# Check data quality
claude-analytics validate
# View statistics
claude-analytics load --stats

Production Deployment

# Start Prefect and deploy flows
make up
make deploy
# Flows will run on schedule:
# - hourly-analytics: Every hour
# - daily-full-refresh: 2 AM daily
# Trigger manual run
prefect deployment run 'claude-analytics-pipeline/adhoc-analytics'

What We Built

In this article, we explored the Python modules that power our analytics ELT pipeline:

  1. config.py: Type-safe configuration with Pydantic Settings
  2. extractor.py: MongoDB extraction with incremental high-water-mark tracking
  3. loader.py: DuckDB loading with native Parquet support and upsert semantics
  4. flows/: Prefect orchestration with retries and scheduling
  5. cli.py: Developer-friendly command-line interface
  6. quality.py: Great Expectations integration for data validation

Each module follows the single responsibility principle, making the system easy to understand, test, and maintain.

In Article 6, we will dive deep into the dbt transformations that turn this raw data into business-ready analytics models. We will explore the medallion architecture implementation, incremental models, and the fact/dimension tables that power our Metabase dashboards.


Have questions or suggestions? Share your thoughts in the comments below. If you found this useful, follow for more articles in this series.

Suggested Tags: Python, ETL, Data Engineering, Prefect, DuckDB


Written by

Farshad Akbari

Software engineer writing about Java, Kotlin, TypeScript, Python, data systems, and AI
