Building an ELT Pipeline with Python, Prefect, and DuckDB
A deep dive into production-ready data extraction, loading, and orchestration
This is Article 5 in a 7-part series on building a complete analytics platform for Claude Code conversation logs. In Article 4, we explored the high-level architecture of our analytics module, including the medallion pattern (Bronze, Silver, Gold layers) and how dbt transformations create business-ready data models.
Now we are going to get our hands dirty with the Python code that powers the ELT pipeline. We will walk through every module in the analytics/analytics/ package, examining how each component fulfills a single responsibility while working together as a cohesive system.
By the end of this article, you will understand how to:
- Build type-safe configuration with Pydantic Settings
- Extract data from MongoDB with incremental high-water-mark tracking
- Load Parquet files into DuckDB with upsert semantics
- Orchestrate everything with Prefect flows and tasks
- Validate data quality with Great Expectations
Let us dive in.
Module Structure Overview
Before examining individual files, let us understand how the package is organized. Each module has a single, clear responsibility:
analytics/
├── analytics/
│   ├── __init__.py          # Package initialization, exports
│   ├── config.py            # Configuration management (Pydantic)
│   ├── extractor.py         # MongoDB → Parquet extraction
│   ├── loader.py            # Parquet → DuckDB loading
│   ├── quality.py           # Great Expectations integration
│   ├── cli.py               # Command-line interface (Typer)
│   └── flows/
│       ├── __init__.py      # Flow exports
│       ├── main_pipeline.py # Prefect flow definitions
│       └── deployment.py    # Deployment helpers
├── prefect.yaml             # Prefect deployment config
└── pyproject.toml           # Package metadata

The data flows through these modules in a clear sequence:
The config.py module provides settings to all other modules. The extractor.py pulls from MongoDB and writes Parquet. The loader.py reads Parquet and loads into DuckDB. The flows/ package orchestrates these steps with Prefect. And cli.py provides human-friendly commands for all operations.
Configuration Management with Pydantic Settings
Good configuration management is the foundation of any production system. The config.py module uses Pydantic Settings to provide type-safe, validated configuration with automatic environment variable loading.
Nested Settings Classes
Rather than one monolithic settings class, the configuration is organized into focused, nested classes:
from pydantic import Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
class MongoSettings(BaseSettings):
    """MongoDB source configuration."""

    model_config = SettingsConfigDict(
        env_prefix="MONGO_",
        env_file=".env.analytics",
        env_file_encoding="utf-8",
        extra="ignore",
    )

    uri: str = Field(
        default="mongodb://localhost:27017",
        description="MongoDB connection URI",
    )
    db: str = Field(
        default="claude_logs",
        description="Database name containing conversation logs",
    )
    collection: str = Field(
        default="conversations",
        description="Collection name with conversation entries",
    )

Each settings class has its own environment variable prefix. The MongoSettings class reads MONGO_URI, MONGO_DB, and MONGO_COLLECTION from the environment. This prevents naming collisions and makes configuration self-documenting.
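To make the prefix behavior concrete, here is a minimal sketch of how the prefixed environment variables override the defaults (the hostname below is made up for illustration):

import os

# pydantic-settings picks up the MONGO_-prefixed variables automatically
# when MongoSettings is instantiated.
os.environ["MONGO_URI"] = "mongodb://mongo.internal:27017"   # hypothetical host
os.environ["MONGO_DB"] = "claude_logs"

settings = MongoSettings()
print(settings.uri)         # mongodb://mongo.internal:27017
print(settings.collection)  # conversations (falls back to the default)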
The pattern continues for other concerns:
class DuckDBSettings(BaseSettings):
    """DuckDB target configuration."""

    model_config = SettingsConfigDict(env_prefix="DUCKDB_")

    path: Path = Field(
        default=Path("/duckdb/analytics.db"),
        description="Path to DuckDB database file",
    )
    threads: int = Field(
        default=4,
        ge=1,
        le=32,
        description="Number of threads for DuckDB queries",
    )

Notice the ge=1, le=32 constraints on threads. Pydantic validates these at load time, failing fast if someone configures an invalid value.
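A quick way to see the fail-fast behavior is to feed it an out-of-range value. This sketch assumes the DuckDBSettings class above is importable:

import os
from pydantic import ValidationError

os.environ["DUCKDB_THREADS"] = "64"  # outside the declared 1-32 range

try:
    DuckDBSettings()
except ValidationError as exc:
    # Fails at load time, long before any query runs
    print(exc)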
The Main Settings Aggregator
All nested settings come together in the main Settings class:
class Settings(BaseSettings):
    """Main settings class that aggregates all configuration sections."""

    model_config = SettingsConfigDict(
        env_file=".env.analytics",
        env_file_encoding="utf-8",
        extra="ignore",
    )

    # Nested settings
    mongo: MongoSettings = Field(default_factory=MongoSettings)
    duckdb: DuckDBSettings = Field(default_factory=DuckDBSettings)
    data: DataSettings = Field(default_factory=DataSettings)
    pipeline: PipelineSettings = Field(default_factory=PipelineSettings)
    prefect: PrefectSettings = Field(default_factory=PrefectSettings)
    dbt: DbtSettings = Field(default_factory=DbtSettings)
    great_expectations: GreatExpectationsSettings = Field(
        default_factory=GreatExpectationsSettings
    )
    logging: LoggingSettings = Field(default_factory=LoggingSettings)
    alerting: AlertingSettings = Field(default_factory=AlertingSettings)

    def setup(self) -> None:
        """Initialize settings: create directories, configure logging."""
        self.data.ensure_directories()

Singleton Pattern with lru_cache
Settings should be loaded once and reused. The get_settings() function uses lru_cache to ensure this:
from functools import lru_cache
@lru_cache
def get_settings() -> Settings:
    """Get cached settings instance."""
    settings = Settings()
    settings.setup()
    return settings

Any module can call get_settings() and receive the same cached instance. This avoids repeated file I/O and ensures consistent configuration across the application.
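Because the cache key is the (empty) argument tuple, every caller shares one object. A quick sanity check, assuming the config module above:

# Every import site gets the same object back, so .env.analytics is parsed once.
a = get_settings()
b = get_settings()
assert a is b

# Nested sections are plain attributes on the cached instance.
print(a.mongo.db, a.duckdb.threads)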
Extractor Deep Dive: MongoDB to Parquet
The extractor.py module is responsible for pulling data from MongoDB and writing it to Parquet files. This is the “E” in our ELT pipeline. Let us examine its key components.
PyArrow Schema Definition
The extractor defines a strict schema for the output Parquet files:
import pyarrow as pa
CONVERSATION_SCHEMA = pa.schema([
    # Primary identifiers
    ("_id", pa.string()),
    ("type", pa.string()),
    ("session_id", pa.string()),
    ("project_id", pa.string()),

    # Timestamps
    ("timestamp", pa.timestamp("us", tz="UTC")),
    ("ingested_at", pa.timestamp("us", tz="UTC")),
    ("extracted_at", pa.timestamp("us", tz="UTC")),

    # Message content (flattened)
    ("message_role", pa.string()),
    ("message_content", pa.string()),
    ("message_raw", pa.string()),

    # Source tracking
    ("source_file", pa.string()),

    # Partitioning
    ("date", pa.date32()),
])

Defining the schema upfront has several benefits:
- Type safety: PyArrow validates data against the schema at write time (see the short sketch after this list)
- Documentation: The schema serves as a contract between extraction and loading
- Performance: DuckDB can read typed Parquet more efficiently than schemaless JSON
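To illustrate the first point, here is a tiny, self-contained sketch using a toy one-column schema (not the real CONVERSATION_SCHEMA) that shows PyArrow rejecting a value that does not fit the declared type:

import pyarrow as pa

schema = pa.schema([("hour_of_day", pa.int32())])

# A conforming record converts cleanly.
pa.Table.from_pylist([{"hour_of_day": 13}], schema=schema)

# A non-conforming record is rejected before anything hits disk.
try:
    pa.Table.from_pylist([{"hour_of_day": "noon"}], schema=schema)
except (pa.ArrowInvalid, pa.ArrowTypeError) as exc:
    print(f"rejected at write time: {exc}")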
Document Transformation
MongoDB documents are flexible, but Parquet needs flat, typed columns. The DocumentTransformer class handles this translation:
class DocumentTransformer:
    """Transforms MongoDB documents into flat structure for Parquet."""

    @staticmethod
    def flatten_message(message: Any) -> tuple[str | None, str | None, str | None]:
        """Flatten the message field into role, content, and raw JSON."""
        if message is None:
            return None, None, None

        if isinstance(message, str):
            return None, message, None

        if isinstance(message, dict):
            role = message.get("role")
            content = message.get("content")

            # Handle content that might be a list of blocks
            if isinstance(content, list):
                text_parts = []
                for block in content:
                    if isinstance(block, dict):
                        if block.get("type") == "text":
                            text_parts.append(block.get("text", ""))
                        elif block.get("type") == "tool_use":
                            text_parts.append(f"[tool_use: {block.get('name', 'unknown')}]")
                content = "\n".join(text_parts) if text_parts else None

            raw_json = json.dumps(message, default=str) if message else None
            return role, content, raw_json

        return None, None, json.dumps(message, default=str)

The transformer handles the three common message formats:
- Simple strings: Stored directly as content
- Objects with role/content: Extracted to separate columns
- Content block arrays: Text blocks joined, tool usage summarized
The original message is also preserved in message_raw for cases where downstream analysis needs the full structure.
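A few hypothetical inputs make the three branches easier to see (the sample messages below are invented for illustration):

transformer = DocumentTransformer()

# 1. Simple string -> stored directly as content
print(transformer.flatten_message("plain text"))
# (None, 'plain text', None)

# 2. Object with role/content -> split into columns, raw JSON preserved
print(transformer.flatten_message({"role": "user", "content": "fix the failing test"}))
# ('user', 'fix the failing test', '{"role": "user", "content": "fix the failing test"}')

# 3. Content block array -> text joined, tool usage summarized
blocks = {"role": "assistant", "content": [
    {"type": "text", "text": "Done."},
    {"type": "tool_use", "name": "Bash"},
]}
role, content, _raw = transformer.flatten_message(blocks)
print((role, content))
# ('assistant', 'Done.\n[tool_use: Bash]')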
High Water Mark Tracking
For incremental extraction, we need to track the last successfully extracted timestamp. The HighWaterMark class manages this:
class HighWaterMark:
    """Tracks last extraction timestamp for incremental extraction."""

    def __init__(self, file_path: Path):
        self.file_path = file_path

    def get(self) -> datetime | None:
        """Get the last extraction timestamp."""
        if not self.file_path.exists():
            return None

        try:
            data = json.loads(self.file_path.read_text())
            ts = data.get("last_extracted_at")
            if ts:
                return datetime.fromisoformat(ts)
        except (json.JSONDecodeError, ValueError) as e:
            logger.warning(f"Failed to read high water mark: {e}")

        return None

    def set(self, timestamp: datetime) -> None:
        """Set the last extraction timestamp."""
        self.file_path.parent.mkdir(parents=True, exist_ok=True)
        data = {
            "last_extracted_at": timestamp.isoformat(),
            "updated_at": datetime.now(timezone.utc).isoformat(),
        }
        self.file_path.write_text(json.dumps(data, indent=2))

This simple file-based approach provides durability without requiring an external state store. If the pipeline crashes, the next run picks up from where it left off.
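Used on its own, the class behaves like a tiny key-value store. A quick sketch (the path below is hypothetical; the real one comes from PipelineSettings):

from datetime import datetime, timezone
from pathlib import Path

hwm = HighWaterMark(Path("/tmp/analytics/high_water_mark.json"))

print(hwm.get())                        # None on the very first run
hwm.set(datetime.now(timezone.utc))     # writes the JSON state file
print(hwm.get())                        # the timestamp we just persisted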
The MongoExtractor Class
The main extractor class ties everything together:
class MongoExtractor:
    """Extracts conversation data from MongoDB to Parquet files."""

    def __init__(self, settings: Settings | None = None):
        self.settings = settings or get_settings()
        self.transformer = DocumentTransformer()
        self.high_water_mark = HighWaterMark(
            self.settings.pipeline.high_water_mark_file
        )
        self._client: MongoClient | None = None

    def extract(
        self,
        full_backfill: bool = False,
        output_dir: Path | None = None,
    ) -> list[Path]:
        """Extract data from MongoDB and write to Parquet files."""
        output_dir = output_dir or self.settings.data.raw_dir

        # Determine start time
        since = None
        if not full_backfill:
            since = self.high_water_mark.get()
            if since:
                logger.info(f"Incremental extraction since {since.isoformat()}")

        extracted_at = datetime.now(timezone.utc)
        records_by_date: dict[str, list[dict]] = {}
        written_files: list[Path] = []

        try:
            self.connect()

            for doc in self._fetch_documents(since=since):
                record = self.transformer.transform(doc, extracted_at)

                # Group by partition date
                date_key = record["date"].isoformat()
                if date_key not in records_by_date:
                    records_by_date[date_key] = []
                records_by_date[date_key].append(record)

            # Write partitions
            for date_key, records in records_by_date.items():
                partition_date = datetime.fromisoformat(date_key)
                written_files.append(
                    self._write_partition(records, partition_date, output_dir)
                )

            # Advance the high water mark once all partitions are written
            if written_files:
                self.high_water_mark.set(extracted_at)
        finally:
            self.disconnect()

        return written_files

Key design decisions:
- Lazy connection: MongoDB client is created on first use
- Batched writes: Records are grouped by date before writing
- Partition directories: Output follows Hive-style date=YYYY-MM-DD partitioning
- Guaranteed cleanup: The finally block ensures the connection closes even on error
Date-Partitioned Output
The _write_partition method writes records with Snappy compression:
def _write_partition(
    self,
    records: list[dict],
    partition_date: datetime,
    output_dir: Path,
) -> Path:
    """Write records to a Parquet file in date-partitioned directory."""
    date_str = partition_date.strftime("%Y-%m-%d")
    partition_dir = output_dir / f"date={date_str}"
    partition_dir.mkdir(parents=True, exist_ok=True)

    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_%f")
    file_path = partition_dir / f"conversations_{timestamp}.parquet"

    table = pa.Table.from_pylist(records, schema=CONVERSATION_SCHEMA)
    pq.write_table(
        table,
        file_path,
        compression="snappy",
        write_statistics=True,
    )

    return file_path

Hive-style partitioning (date=2024-01-15/) enables DuckDB to read only relevant partitions when filtering by date, dramatically improving query performance.
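Here is a hedged, self-contained sketch of that pruning from the DuckDB side. It builds a toy partitioned dataset under /tmp (an assumption, not the article's real /data/raw directory) so the example runs anywhere:

import duckdb

con = duckdb.connect()

# Build a toy Hive-partitioned dataset: the partition key lives in the
# directory name (date=...), not inside the Parquet files themselves.
con.execute("""
    COPY (
        SELECT range AS n,
               CASE WHEN range % 2 = 0
                    THEN DATE '2024-01-14'
                    ELSE DATE '2024-01-15' END AS date
        FROM range(10)
    ) TO '/tmp/toy_raw' (FORMAT parquet, PARTITION_BY (date))
""")

# The WHERE clause prunes every directory except date=2024-01-15.
print(con.execute("""
    SELECT count(*)
    FROM read_parquet('/tmp/toy_raw/*/*.parquet', hive_partitioning=true)
    WHERE date = DATE '2024-01-15'
""").fetchone())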
Loader Deep Dive: Parquet to DuckDB
The loader.py module handles the “L” in ELT, taking extracted Parquet files and loading them into DuckDB. DuckDB is an excellent choice here because it can read Parquet files natively without intermediate steps.
Schema Creation
The loader creates the target schema and table if they do not exist:
CREATE_RAW_SCHEMA = "CREATE SCHEMA IF NOT EXISTS raw;"
CREATE_CONVERSATIONS_TABLE = """
CREATE TABLE IF NOT EXISTS raw.conversations (
    _id VARCHAR PRIMARY KEY,
    type VARCHAR,
    session_id VARCHAR,
    project_id VARCHAR,
    timestamp TIMESTAMP WITH TIME ZONE,
    ingested_at TIMESTAMP WITH TIME ZONE,
    extracted_at TIMESTAMP WITH TIME ZONE,
    message_role VARCHAR,
    message_content VARCHAR,
    message_raw VARCHAR,
    source_file VARCHAR,
    date DATE
);
"""
CREATE_INDEXES = [
    "CREATE INDEX IF NOT EXISTS idx_conversations_project_id ON raw.conversations(project_id);",
    "CREATE INDEX IF NOT EXISTS idx_conversations_session_id ON raw.conversations(session_id);",
    "CREATE INDEX IF NOT EXISTS idx_conversations_date ON raw.conversations(date);",
    "CREATE INDEX IF NOT EXISTS idx_conversations_type ON raw.conversations(type);",
    "CREATE INDEX IF NOT EXISTS idx_conversations_timestamp ON raw.conversations(timestamp);",
]

Indexes are created on columns commonly used in WHERE clauses and GROUP BY operations. The _id column serves as the primary key for upsert operations.
Upsert with ON CONFLICT
The magic happens in the load SQL. DuckDB supports PostgreSQL-style upsert syntax:
LOAD_FROM_PARQUET = """
INSERT INTO raw.conversations
SELECT * FROM read_parquet('{parquet_path}', hive_partitioning=true)
ON CONFLICT (_id) DO UPDATE SET
    type = EXCLUDED.type,
    session_id = EXCLUDED.session_id,
    project_id = EXCLUDED.project_id,
    timestamp = EXCLUDED.timestamp,
    ingested_at = EXCLUDED.ingested_at,
    extracted_at = EXCLUDED.extracted_at,
    message_role = EXCLUDED.message_role,
    message_content = EXCLUDED.message_content,
    message_raw = EXCLUDED.message_raw,
    source_file = EXCLUDED.source_file,
    date = EXCLUDED.date;
"""

This pattern is incredibly powerful:
- New records: Inserted normally
- Existing records: Updated with new values (idempotent reprocessing, demonstrated below)
- Native Parquet reading: read_parquet() with hive_partitioning=true understands our directory structure
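Here is a minimal, self-contained demonstration of the upsert semantics on a toy table (not the real raw.conversations schema):

import duckdb

con = duckdb.connect()
con.execute("CREATE TABLE t (_id VARCHAR PRIMARY KEY, val VARCHAR)")

upsert = "INSERT INTO t VALUES (?, ?) ON CONFLICT (_id) DO UPDATE SET val = EXCLUDED.val"
con.execute(upsert, ["a", "first"])   # new key: inserted
con.execute(upsert, ["a", "second"])  # same key: updated instead of raising

print(con.execute("SELECT * FROM t").fetchall())
# [('a', 'second')]  -- still one row, value updated in place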
The DuckDBLoader Class
class DuckDBLoader:
    """Loads Parquet files into DuckDB database."""

    def __init__(self, settings: Settings | None = None):
        self.settings = settings or get_settings()
        self._conn: duckdb.DuckDBPyConnection | None = None

    @property
    def conn(self) -> duckdb.DuckDBPyConnection:
        # Convenience accessor used by the methods below
        # (not shown in the original excerpt).
        return self.connect()

    def connect(self) -> duckdb.DuckDBPyConnection:
        """Establish connection to DuckDB database."""
        if self._conn is not None:
            return self._conn

        db_path = self.settings.duckdb.path
        db_path.parent.mkdir(parents=True, exist_ok=True)

        self._conn = duckdb.connect(str(db_path))
        self._conn.execute(f"SET threads TO {self.settings.duckdb.threads};")

        return self._conn
    def load_from_parquet(
        self,
        parquet_path: Path | str,
        full_refresh: bool = False,
    ) -> int:
        """Load Parquet files into DuckDB."""
        parquet_path = Path(parquet_path)

        self.create_database()

        if parquet_path.is_dir():
            glob_pattern = str(parquet_path / "**" / "*.parquet")
        else:
            glob_pattern = str(parquet_path)

        count_before = self._get_row_count()

        if full_refresh:
            self.conn.execute("DELETE FROM raw.conversations;")

        load_sql = LOAD_FROM_PARQUET.format(parquet_path=glob_pattern)
        self.conn.execute(load_sql)

        count_after = self._get_row_count()
        return count_after - count_before if not full_refresh else count_after

The loader is intentionally simple. DuckDB handles the heavy lifting of reading Parquet, and the upsert pattern handles both initial loads and incremental updates.
Statistics Gathering
The loader also provides diagnostic information:
def get_table_stats(self) -> dict[str, Any]:
    """Get statistics about the loaded data."""
    stats = {"row_count": self._get_row_count()}

    # Date range
    date_range = self.conn.execute("""
        SELECT MIN(date), MAX(date), COUNT(DISTINCT date)
        FROM raw.conversations
    """).fetchone()

    if date_range and date_range[0]:
        stats["date_range"] = {
            "min": str(date_range[0]),
            "max": str(date_range[1]),
            "count": date_range[2],
        }

    # Type distribution
    type_counts = self.conn.execute("""
        SELECT type, COUNT(*) as count
        FROM raw.conversations
        GROUP BY type
        ORDER BY count DESC
    """).fetchall()

    stats["type_distribution"] = [
        {"type": row[0], "count": row[1]} for row in type_counts
    ]

    return stats

These statistics are invaluable for monitoring pipeline health and understanding data characteristics.
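As one hypothetical use, a freshness check can be built directly on top of the returned dictionary (the keys match get_table_stats() above; the staleness threshold is an assumption):

from datetime import date, timedelta

loader = DuckDBLoader()
stats = loader.get_table_stats()

# Warn if the newest loaded partition is more than a day behind.
if "date_range" in stats:
    newest = date.fromisoformat(stats["date_range"]["max"])
    if newest < date.today() - timedelta(days=1):
        print(f"warning: latest loaded date is {newest}, pipeline may be stale")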
Prefect Flows: Orchestrating the Pipeline
With extraction and loading in place, we need orchestration. Prefect provides a Python-native way to define workflows with built-in retry logic, observability, and scheduling.
Task Definitions
Each pipeline step is a Prefect task with retry configuration:
from prefect import flow, task, get_run_logger
RETRY_DELAYS = [30, 60, 120] # Exponential backoff
@task( name="extract-mongodb", description="Extract data from MongoDB to Parquet files", retries=3, retry_delay_seconds=RETRY_DELAYS,)def extract_task( full_backfill: bool = False, since: Optional[datetime] = None,) -> dict: """Extract data from MongoDB and write to Parquet files.""" logger = get_run_logger() settings = get_settings()
logger.info("Starting MongoDB extraction") extractor = MongoExtractor(settings=settings)
try: if full_backfill: stats = extractor.full_extract() else: stats = extractor.incremental_extract(since=since) return stats finally: extractor.disconnect()The retry configuration uses exponential backoff (30s, 60s, 120s) to handle transient failures gracefully.
Load and Transform Tasks
@task( name="load-duckdb", description="Load Parquet files into DuckDB", retries=3, retry_delay_seconds=RETRY_DELAYS,)def load_task( extraction_stats: dict, full_refresh: bool = False,) -> dict: """Load Parquet files into DuckDB.""" logger = get_run_logger() settings = get_settings()
loader = DuckDBLoader(settings=settings)
try: loader.create_database() parquet_path = Path(settings.data.raw_dir)
if full_refresh: stats = loader.load_from_parquet(str(parquet_path)) else: stats = loader.upsert_incremental(str(parquet_path))
return {"load_stats": stats, "table_stats": loader.get_table_stats()} finally: loader.disconnect()
@task( name="transform-dbt", description="Run dbt transformations", retries=2, retry_delay_seconds=RETRY_DELAYS,)def transform_task( load_stats: dict, full_refresh: bool = False, select: Optional[str] = None,) -> dict: """Run dbt transformations.""" logger = get_run_logger() settings = get_settings()
cmd = [ "dbt", "build", "--project-dir", str(settings.dbt.project_dir), "--profiles-dir", str(settings.dbt.profiles_dir), "--target", settings.dbt.target, ]
if full_refresh: cmd.append("--full-refresh")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0: raise RuntimeError(f"dbt build failed: {result.stdout}")
return {"build_success": True, "output": result.stdout}Notice how load_task takes extraction_stats as a parameter. This creates an explicit dependency in Prefect, ensuring extraction completes before loading begins.
The Main Flow
The flow orchestrates all tasks:
@flow( name="claude-analytics-pipeline", description="Main ELT pipeline for Claude conversation analytics", version="1.0.0",)def analytics_pipeline( full_backfill: bool = False, full_refresh: bool = False, skip_extract: bool = False, skip_load: bool = False, skip_transform: bool = False, dbt_select: Optional[str] = None,) -> dict: """Main analytics pipeline orchestrating extract, load, and transform.""" logger = get_run_logger() results = {}
logger.info("Starting Claude Analytics Pipeline")
# Step 1: Extract if not skip_extract: extraction_stats = extract_task(full_backfill=full_backfill) results["extraction"] = extraction_stats else: results["extraction"] = {"skipped": True}
# Step 2: Load if not skip_load: load_stats = load_task( extraction_stats=results["extraction"], full_refresh=full_refresh, ) results["load"] = load_stats else: results["load"] = {"skipped": True}
# Step 3: Transform if not skip_transform: transform_stats = transform_task( load_stats=results["load"], full_refresh=full_refresh, select=dbt_select, ) results["transform"] = transform_stats else: results["transform"] = {"skipped": True}
return resultsThe flow supports skip flags for partial runs, useful during development or when reprocessing specific stages.
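Because a Prefect flow is still an ordinary Python function, partial runs are easy to trigger locally. A hypothetical development session might look like this:

# Re-run only load + transform against Parquet files that were already extracted,
# limiting dbt to one model and its upstream dependencies.
if __name__ == "__main__":
    results = analytics_pipeline(
        skip_extract=True,
        dbt_select="+fct_messages",
    )
    print(results["extraction"])                          # {'skipped': True}
    print(results["load"]["table_stats"]["row_count"])    # rows now in raw.conversations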
Pipeline Flow Diagram

extract_task (MongoDB → Parquet) → load_task (Parquet → DuckDB) → transform_task (dbt build)
Deployment Configuration
The prefect.yaml file defines multiple deployment variants:
name: claude-analytics
deployments:
  # Hourly incremental pipeline
  - name: hourly-analytics
    version: "1.0.0"
    tags: [analytics, scheduled, hourly]
    description: "Hourly incremental analytics pipeline"
    entrypoint: analytics/flows/main_pipeline.py:scheduled_pipeline
    work_pool:
      name: analytics-pool
    schedules:
      - interval: 3600  # 1 hour

  # Daily full refresh at 2 AM
  - name: daily-full-refresh
    version: "1.0.0"
    tags: [analytics, scheduled, daily]
    description: "Daily full refresh of analytics (2 AM)"
    entrypoint: analytics/flows/main_pipeline.py:analytics_pipeline
    work_pool:
      name: analytics-pool
    schedules:
      - cron: "0 2 * * *"
    parameters:
      full_backfill: false
      full_refresh: true

  # Manual full backfill
  - name: full-backfill
    version: "1.0.0"
    tags: [analytics, manual, backfill]
    description: "Full historical backfill"
    entrypoint: analytics/flows/main_pipeline.py:analytics_pipeline
    work_pool:
      name: analytics-pool
    parameters:
      full_backfill: true
      full_refresh: true

This gives us:
- Hourly incremental: Keeps data fresh throughout the day
- Daily full refresh: Rebuilds all dbt models nightly
- Manual backfill: For initial setup or recovery scenarios
CLI Interface: Developer Experience Matters
The cli.py module provides a polished command-line interface using Typer and Rich. A good CLI makes the difference between a tool developers avoid and one they reach for daily.
Command Structure
import typer
from rich.console import Console
from rich.table import Table

app = typer.Typer(
    name="claude-analytics",
    help="Claude Analytics Platform - ELT pipeline for conversation logs",
    add_completion=False,
)
console = Console()

Available Commands
The CLI exposes six primary commands:
1. config - Display current configuration:
$ claude-analytics config
Current Configuration
MongoDB URI:  mongodb://localhost:27017
MongoDB DB:   claude_logs
DuckDB Path:  /duckdb/analytics.db
Batch Size:   10000

2. extract - Run MongoDB extraction:
$ claude-analytics extract --full-backfill --verbose
MongoDB Extraction
Source: mongodb://localhost:27017/claude_logs.conversations
Mode:   Full Backfill
Output: /data/raw

Extraction complete!
Written 5 Parquet file(s):
┌─────────────────────────────────────┬─────────────────┐
│ File                                │ Partition       │
├─────────────────────────────────────┼─────────────────┤
│ conversations_20240115_143022.parq  │ date=2024-01-15 │
│ conversations_20240115_143022.parq  │ date=2024-01-14 │
└─────────────────────────────────────┴─────────────────┘

3. load - Load into DuckDB:
$ claude-analytics load --stats
DuckDB Loading
Database: /duckdb/analytics.db
Source:   /data/raw
Mode:     Upsert

Loading complete!
Rows loaded/updated: 15234

Table Statistics:
  Total rows: 45892
  Date range: 2024-01-01 to 2024-01-15 (15 days)

4. transform - Run dbt models:
$ claude-analytics transform --models "+fct_messages"

5. pipeline - Run the complete ELT pipeline:
$ claude-analytics pipeline --full-backfill --prefect
Claude Analytics Pipeline
Mode:    Full Backfill
Refresh: Incremental
Steps:   Prefect

Running via Prefect...
Pipeline complete!

6. validate - Run data quality checks:
$ claude-analytics validate --build-docs

Implementation Example
Here is the pipeline command implementation showing how options are handled:
@app.command()
def pipeline(
    full_backfill: bool = typer.Option(
        False,
        "--full-backfill",
        help="Run full historical backfill",
    ),
    full_refresh: bool = typer.Option(
        False,
        "--full-refresh",
        help="Rebuild all data and dbt models",
    ),
    skip_extract: bool = typer.Option(
        False,
        "--skip-extract",
        help="Skip extraction step",
    ),
    use_prefect: bool = typer.Option(
        False,
        "--prefect",
        help="Run via Prefect orchestration",
    ),
    verbose: bool = typer.Option(
        False,
        "--verbose",
        "-v",
        help="Enable verbose logging",
    ),
) -> None:
    """Run the complete analytics pipeline."""
    setup_logging("DEBUG" if verbose else "INFO")

    console.print("[bold blue]Claude Analytics Pipeline[/bold blue]\n")
    console.print(f"  Mode: {'Full Backfill' if full_backfill else 'Incremental'}")

    if use_prefect:
        from analytics.flows import analytics_pipeline as prefect_pipeline

        result = prefect_pipeline(
            full_backfill=full_backfill,
            full_refresh=full_refresh,
        )
        console.print("[bold green]Pipeline complete![/bold green]")
    else:
        # Direct execution without Prefect
        # ... step-by-step execution with progress output
        ...

The CLI supports both Prefect-orchestrated runs (for production) and direct execution (for development and debugging).
Data Quality with Great Expectations
Data pipelines are only as good as the data they produce. The quality.py module integrates Great Expectations for automated data validation.
The DataQualityValidator Class
class DataQualityValidator:
    """Data quality validator using Great Expectations."""

    def __init__(self, ge_project_dir: Path | None = None):
        self.settings = get_settings()
        self.ge_project_dir = ge_project_dir or Path(
            self.settings.great_expectations.project_dir
        )
        self._context = None

    @property
    def context(self) -> Any:
        """Get or create Great Expectations data context."""
        if self._context is None:
            try:
                import great_expectations as gx

                self._context = gx.get_context(
                    context_root_dir=str(self.ge_project_dir)
                )
            except ImportError:
                logger.warning("Great Expectations not installed")
                return None
        return self._context

The validator lazily loads the Great Expectations context, gracefully handling cases where the library is not installed.
Bronze Layer Expectations
The bronze (raw) layer validates fundamental data integrity:
{ "expectation_suite_name": "bronze_expectations", "expectations": [ { "expectation_type": "expect_column_values_to_not_be_null", "kwargs": { "column": "_id" }, "meta": { "notes": "Primary key must not be null" } }, { "expectation_type": "expect_column_values_to_be_unique", "kwargs": { "column": "_id" }, "meta": { "notes": "Primary key must be unique" } }, { "expectation_type": "expect_column_values_to_be_in_set", "kwargs": { "column": "type", "value_set": ["user", "assistant", "tool_use", "tool_result"] }, "meta": { "notes": "Entry types must be valid" } } ]}These checks catch data corruption early, before it propagates through the pipeline.
Silver Layer Expectations
The silver (intermediate) layer validates business logic:
{ "expectation_suite_name": "silver_expectations", "expectations": [ { "expectation_type": "expect_column_values_to_be_in_set", "kwargs": { "column": "task_category", "value_set": ["bug_fix", "feature", "refactor", "testing", "documentation", "review", "other"] } }, { "expectation_type": "expect_column_values_to_be_in_set", "kwargs": { "column": "time_of_day", "value_set": ["morning", "afternoon", "evening", "night"] } }, { "expectation_type": "expect_column_values_to_be_between", "kwargs": { "column": "hour_of_day", "min_value": 0, "max_value": 23 } } ]}These expectations verify that dbt transformations produce valid categorizations and derived fields.
Running Validations
def validate_pipeline_data(
    validate_bronze: bool = True,
    validate_silver: bool = True,
) -> dict[str, Any]:
    """Validate pipeline data."""
    validator = DataQualityValidator()
    results = {}

    if validate_bronze:
        results["bronze"] = validator.validate_bronze()

    if validate_silver:
        results["silver"] = validator.validate_silver()

    results["success"] = all(
        r.get("success", False) for r in results.values() if isinstance(r, dict)
    )

    return results

This function provides a simple interface for validating data at any point in the pipeline.
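One natural extension, not shown in the module above (so treat the task name and wiring as an assumption), is to wrap this call in a Prefect task so a failed expectation suite fails the flow run:

from prefect import task, get_run_logger


@task(name="validate-quality")
def validate_task(transform_stats: dict) -> dict:
    """Run the expectation suites after dbt has built the models."""
    logger = get_run_logger()
    results = validate_pipeline_data(validate_bronze=True, validate_silver=True)

    if not results["success"]:
        raise RuntimeError(f"Data quality validation failed: {results}")

    logger.info("All expectation suites passed")
    return results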
Putting It All Together
We have covered a lot of ground. Let us see how to use this pipeline in practice.
Initial Setup
# Install the package
cd analytics
pip install -e ".[all]"

# Configure environment
cp .env.analytics.example .env.analytics
# Edit .env.analytics with your MongoDB URI

# Initialize DuckDB schema
claude-analytics load --init-only

# Run initial backfill
claude-analytics pipeline --full-backfill --full-refresh

Daily Operations
# Run incremental update
claude-analytics pipeline

# Check data quality
claude-analytics validate

# View statistics
claude-analytics load --stats

Production Deployment
# Start Prefect and deploy flows
make up
make deploy

# Flows will run on schedule:
#  - hourly-analytics: Every hour
#  - daily-full-refresh: 2 AM daily

# Trigger manual run
prefect deployment run 'claude-analytics-pipeline/adhoc-analytics'

What We Built
In this article, we explored the Python modules that power our analytics ELT pipeline:
- config.py: Type-safe configuration with Pydantic Settings
- extractor.py: MongoDB extraction with incremental high-water-mark tracking
- loader.py: DuckDB loading with native Parquet support and upsert semantics
- flows/: Prefect orchestration with retries and scheduling
- cli.py: Developer-friendly command-line interface
- quality.py: Great Expectations integration for data validation
Each module follows the single responsibility principle, making the system easy to understand, test, and maintain.
In Article 6, we will dive deep into the dbt transformations that turn this raw data into business-ready analytics models. We will explore the medallion architecture implementation, incremental models, and the fact/dimension tables that power our Metabase dashboards.
Have questions or suggestions? Share your thoughts in the comments below. If you found this useful, follow for more articles in this series.
Suggested Tags: Python, ETL, Data Engineering, Prefect, DuckDB