Data Transformation with dbt: From Raw Logs to Business Intelligence
Transform messy conversation logs into a polished analytics warehouse using dbt’s medallion architecture pattern
You have data. Lots of it. Raw JSON lines streaming into a database, timestamped entries piling up, and somewhere in that chaos lies the answer to questions you haven’t even thought to ask yet. The gap between “we have data” and “we have insights” is where most analytics projects stall.
In our previous articles, we built the foundation: Article 4 introduced the analytics architecture that extracts conversation data from MongoDB, and Article 5 covered how the loader gets that data into DuckDB. But raw data in a database is like crude oil in a barrel. It needs refining before it becomes useful.
This is where dbt enters the picture. dbt (data build tool) reframes transformation as software engineering: instead of one-off scripts or brittle stored procedures, we get version-controlled SQL models, automated testing, and documentation that lives alongside the code. In this article, we'll walk through a complete dbt project that transforms raw Claude Code conversation logs into a star schema ready for business intelligence dashboards.
The dbt Project Structure
Every dbt project follows a predictable structure. Understanding this layout is your first step toward building maintainable transformation pipelines.
```
analytics/dbt/
├── dbt_project.yml      # Project configuration
├── profiles.yml         # Connection profiles
├── packages.yml         # External dependencies
├── models/
│   ├── staging/         # Bronze layer
│   ├── intermediate/    # Silver layer
│   └── marts/           # Gold layer
├── seeds/               # Static reference data
├── macros/              # Reusable SQL snippets
├── snapshots/           # SCD Type 2 tracking
└── tests/               # Custom data tests
```

Project Configuration: dbt_project.yml
The dbt_project.yml file is the heart of your project. It defines how models should be materialized, which schemas they belong to, and project-wide variables.
```yaml
name: 'claude_analytics'
version: '1.0.0'
config-version: 2

profile: 'claude_analytics'

model-paths: ["models"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
test-paths: ["tests"]

models:
  claude_analytics:
    # Staging layer (Bronze) - Views for quick iteration
    staging:
      +materialized: view
      +schema: staging
      +tags: ['staging', 'bronze']

    # Intermediate layer (Silver) - Enriched data
    intermediate:
      +materialized: view
      +schema: intermediate
      +tags: ['intermediate', 'silver']

    # Marts layer (Gold) - Tables for performance
    marts:
      +materialized: table
      +schema: marts
      +tags: ['marts', 'gold']

      aggregates:
        +materialized: table
        +tags: ['aggregates']
```

Notice the materialization strategy: staging and intermediate layers use views for fast iteration during development, while marts use tables for query performance. This pattern lets you experiment quickly in early layers while ensuring your BI tools hit optimized tables.
The configuration also includes task classification patterns as variables, making it easy to adjust business logic without modifying SQL:
```yaml
vars:
  task_patterns:
    bug_fix: ['bug', 'fix', 'error', 'issue', 'broken']
    feature: ['add', 'create', 'implement', 'new', 'build']
    refactor: ['refactor', 'restructure', 'clean up', 'simplify']
```
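To see the payoff, here's a sketch (not from the project itself) of a model consuming task_patterns with Jinja, so adding a keyword means editing YAML rather than SQL:

```sql
-- Hypothetical sketch: generate the classification CASE expression
-- from the task_patterns variable instead of hardcoding keywords
{% set patterns = var('task_patterns') %}

select
    message_content,
    case
        {% for category, keywords in patterns.items() %}
        when {% for kw in keywords %}{% if not loop.first %} or {% endif %}lower(message_content) like '%{{ kw }}%'{% endfor %}
            then '{{ category }}'
        {% endfor %}
        else 'other'
    end as task_category
from {{ ref('stg_messages') }}
```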
Connection Profiles: profiles.yml

The profiles.yml file defines how dbt connects to your data warehouse. For our DuckDB-based analytics, we configure three environments:
```yaml
claude_analytics:
  target: dev

  outputs:
    dev:
      type: duckdb
      path: "{{ env_var('DUCKDB_PATH', '/duckdb/analytics.db') }}"
      threads: 4
      extensions:
        - parquet

    prod:
      type: duckdb
      path: "{{ env_var('DUCKDB_PATH', '/duckdb/analytics.db') }}"
      threads: 8
      extensions:
        - parquet

    test:
      type: duckdb
      path: ":memory:"
      threads: 1
      extensions:
        - parquet
```

The test profile uses an in-memory database for fast CI/CD runs. Environment variables keep sensitive paths out of version control.
External Packages: packages.yml
dbt’s package ecosystem provides battle-tested utilities. We use two essential packages:
```yaml
packages:
  - package: dbt-labs/dbt_utils
    version: ">=1.1.0"

  - package: calogica/dbt_date
    version: ">=0.10.0"
```

dbt_utils provides surrogate key generation, pivot operations, and schema testing utilities. dbt_date simplifies date dimension creation. Install them with dbt deps.
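For a sense of what these packages buy you, here's a sketch (not from the project) using dbt_utils.generate_surrogate_key to build a stable hashed key from natural key columns, the kind of key the mart layer's dimensions rely on:

```sql
-- Sketch: hash the natural key columns into one surrogate key.
-- dbt_utils handles null coalescing and delimiting for you.
select
    {{ dbt_utils.generate_surrogate_key(['session_id', 'project_id']) }} as session_key,
    session_id,
    project_id
from {{ ref('stg_conversations') }}
group by session_id, project_id
```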
The Medallion Architecture
Think of data transformation like refining precious metals. You start with raw ore (Bronze), process it into standardized ingots (Silver), and finally craft finished products (Gold). Each layer adds value and removes impurities.
Bronze (Staging): Minimal transformation. Clean column names, handle nulls, cast types. The goal is a reliable foundation, not business logic.
Silver (Intermediate): Apply business rules. Classify tasks, compute session metrics, enrich with reference data. This layer answers “what happened” with context.
Gold (Marts): Optimize for consumption. Star schema design enables fast BI queries. Aggregates pre-compute expensive operations.
Why this pattern works: each layer has a single responsibility. When business logic changes, you modify the Silver layer. When new sources arrive, you add to Bronze. When dashboards need new metrics, you extend Gold. Changes stay isolated.
Staging Models: The Bronze Layer
The staging layer is your data’s first stop. Here we define sources, clean up raw data, and establish the foundation for everything downstream.
Defining Sources
Before transforming data, we declare where it comes from. The _sources.yml file documents our raw data and enables freshness checks:
```yaml
version: 2

sources:
  - name: raw
    description: "Raw data loaded from MongoDB via Parquet extraction"
    schema: raw

    freshness:
      warn_after: {count: 24, period: hour}
      error_after: {count: 48, period: hour}
    loaded_at_field: extracted_at

    tables:
      - name: conversations
        description: |
          Raw conversation entries extracted from MongoDB.
          Each row represents a single message, tool call, or
          entry from a Claude Code conversation session.

        columns:
          - name: _id
            description: "Unique identifier from MongoDB"
            tests:
              - unique
              - not_null

          - name: type
            description: "Entry type: user, assistant, tool_use, tool_result"
            tests:
              - not_null
```

The freshness configuration alerts us when a pipeline stalls: if no new data arrives within 24 hours, dbt source freshness reports a warning; after 48 hours it reports an error, which CI can treat as a failed build.
stg_conversations: The Foundation Model
This model cleans and standardizes every conversation entry:
```sql
-- Staging model for conversations
-- Basic cleaning, type casting, and null handling

{{
  config(
    materialized='view',
    tags=['staging', 'bronze']
  )
}}

with source as (
    select * from {{ source('raw', 'conversations') }}
),

cleaned as (
    select
        -- Primary key
        _id as conversation_id,

        -- Core attributes with null handling
        coalesce(type, 'unknown') as entry_type,
        session_id,
        project_id,

        -- Timestamps with fallback chain
        timestamp as original_timestamp,
        coalesce(timestamp, ingested_at, extracted_at) as effective_timestamp,
        ingested_at,
        extracted_at,

        -- Message content
        message_role,
        message_content,
        message_raw,

        -- Source tracking
        source_file,
        date as partition_date,

        -- Computed fields
        case
            when message_content is not null then length(message_content)
            else 0
        end as content_length,

        -- Boolean flags for common queries
        type in ('user', 'assistant') as is_message,
        type in ('tool_use', 'tool_result') as is_tool_related

    from source
    where _id is not null
)

select * from cleaned
```

Key patterns here: we rename _id to conversation_id for clarity, create an effective_timestamp with a fallback chain, and add boolean flags that simplify downstream filtering. The coalesce function handles missing values gracefully.
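The flags pay off immediately downstream. A hypothetical query against the staging model shows how filters read:

```sql
-- Count real messages from the last 7 days; no type-string
-- comparisons needed thanks to the boolean flags
select count(*) as recent_messages
from {{ ref('stg_conversations') }}
where is_message
  and effective_timestamp >= current_date - interval '7 days'
```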
stg_messages: Extracting Conversations
This model filters to actual messages and adds sequence information:
```sql
{{
  config(
    materialized='view',
    tags=['staging', 'bronze']
  )
}}

with conversations as (
    select * from {{ ref('stg_conversations') }}
),

messages_only as (
    select
        conversation_id,
        session_id,
        project_id,
        effective_timestamp,
        partition_date,
        entry_type,
        coalesce(message_role, entry_type) as role,
        message_content,
        content_length,

        -- Sequence number within session
        row_number() over (
            partition by session_id
            order by effective_timestamp
        ) as message_sequence,

        -- Previous message context
        lag(message_role) over (
            partition by session_id
            order by effective_timestamp
        ) as previous_role,

        -- Response time calculation
        extract(epoch from (
            effective_timestamp - lag(effective_timestamp) over (
                partition by session_id
                order by effective_timestamp
            )
        )) as seconds_since_previous,

        source_file

    from conversations
    where is_message = true
      and message_content is not null
)

select
    *,
    -- Classify conversation turn type
    case
        when previous_role is null then 'conversation_start'
        when role = 'user' and previous_role = 'assistant' then 'follow_up'
        when role = 'assistant' and previous_role = 'user' then 'response'
        else 'continuation'
    end as turn_type
from messages_only
```

Window functions shine here. row_number() gives us message ordering within sessions. lag() lets us reference the previous message to calculate response times and classify turn types.
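Those derived columns feed directly into latency analysis. An illustrative downstream query (not part of the project) built on them:

```sql
-- Average wait before each assistant response, per session
select
    session_id,
    avg(seconds_since_previous) as avg_response_seconds
from {{ ref('stg_messages') }}
where turn_type = 'response'
  and seconds_since_previous > 0
group by session_id
order by avg_response_seconds desc
```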
stg_tool_calls: Parsing AI Tool Usage
Tool calls require pattern matching to extract tool names from content:
```sql
with tool_entries as (
    select * from {{ ref('stg_conversations') }}
    where is_tool_related = true
),

parsed_tools as (
    select
        conversation_id,
        session_id,
        project_id,
        effective_timestamp,
        partition_date,
        entry_type,
        message_content as tool_content,  -- kept for downstream parsing (file paths, etc.)

        -- Extract tool name via pattern matching
        case
            when message_content like '%Read%' then 'Read'
            when message_content like '%Write%' then 'Write'
            when message_content like '%Edit%' then 'Edit'
            when message_content like '%Bash%' then 'Bash'
            when message_content like '%Glob%' then 'Glob'
            when message_content like '%Grep%' then 'Grep'
            -- ... additional tools
            else 'unknown'
        end as tool_name,

        entry_type = 'tool_use' as is_invocation,
        entry_type = 'tool_result' as is_result,

        row_number() over (
            partition by session_id
            order by effective_timestamp
        ) as tool_sequence

    from tool_entries
)

select
    *,
    -- Pair invocations with results for timing
    lead(effective_timestamp) over (
        partition by session_id, tool_name
        order by effective_timestamp
    ) as next_same_tool_timestamp
from parsed_tools
```

The lead() function looks ahead to find the next occurrence of the same tool, enabling execution time estimation when we pair tool_use with its corresponding tool_result. Note the tool_content passthrough: the intermediate layer parses file paths out of it later.
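That lookahead makes a rough duration estimate possible downstream. A sketch, assuming the next same-tool entry is the matching result:

```sql
-- Estimated execution time: gap between a tool invocation and
-- the next entry for the same tool within the session
select
    session_id,
    tool_name,
    extract(epoch from (next_same_tool_timestamp - effective_timestamp))
        as est_duration_seconds
from {{ ref('stg_tool_calls') }}
where is_invocation
  and next_same_tool_timestamp is not null
```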
Intermediate Models: The Silver Layer
The Silver layer is where business logic lives. We enrich data with classifications, compute aggregates, and join with reference data.
int_messages_enriched: Task Classification
This model adds temporal features and classifies tasks based on message content:
```sql
{{
  config(
    materialized='view',
    tags=['intermediate', 'silver']
  )
}}

with messages as (
    select * from {{ ref('stg_messages') }}
),

enriched as (
    select
        conversation_id,
        session_id,
        project_id,
        role,
        message_content,
        content_length,
        message_sequence,
        turn_type,
        effective_timestamp,
        partition_date,

        -- Temporal features
        extract(hour from effective_timestamp) as hour_of_day,
        extract(dow from effective_timestamp) as day_of_week,

        -- Time-of-day classification
        case
            when extract(hour from effective_timestamp) between 6 and 11 then 'morning'
            when extract(hour from effective_timestamp) between 12 and 17 then 'afternoon'
            when extract(hour from effective_timestamp) between 18 and 21 then 'evening'
            else 'night'
        end as time_of_day,

        case
            when extract(dow from effective_timestamp) in (0, 6) then 'weekend'
            else 'weekday'
        end as day_type,

        -- Task category classification
        case
            when lower(message_content) like '%fix%'
                or lower(message_content) like '%bug%'
                or lower(message_content) like '%error%'
                then 'bug_fix'
            when lower(message_content) like '%implement%'
                or lower(message_content) like '%create%'
                or lower(message_content) like '%add%feature%'
                then 'feature'
            when lower(message_content) like '%refactor%'
                or lower(message_content) like '%clean up%'
                then 'refactor'
            when lower(message_content) like '%test%'
                or lower(message_content) like '%spec%'
                then 'testing'
            when lower(message_content) like '%document%'
                or lower(message_content) like '%readme%'
                then 'documentation'
            else 'other'
        end as task_category,

        -- Content indicators
        message_content like '%```%' as has_code_block,
        message_content like '%?%' as is_question

    from messages
)

select * from enriched
```

The task classification uses pattern matching on message content. This heuristic approach works surprisingly well for understanding what developers are asking an AI assistant to do.
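Heuristics deserve a sanity check before dashboards consume them. An illustrative distribution query:

```sql
-- Eyeball the classifier: how do messages spread across categories?
select
    task_category,
    count(*) as messages,
    round(100.0 * count(*) / sum(count(*)) over (), 1) as pct
from {{ ref('int_messages_enriched') }}
group by task_category
order by messages desc
```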
int_sessions_computed: Session-Level Metrics
This model aggregates everything we know about a session:
```sql
with conversations as (
    select * from {{ ref('stg_conversations') }}
),

messages as (
    select * from {{ ref('stg_messages') }}
),

tool_calls as (
    select * from {{ ref('stg_tool_calls') }}
),

-- Session boundaries
session_boundaries as (
    select
        session_id,
        project_id,
        min(effective_timestamp) as session_start,
        max(effective_timestamp) as session_end,
        count(*) as total_entries
    from conversations
    where session_id is not null
    group by session_id, project_id
),

-- Message statistics per session
message_stats as (
    select
        session_id,
        count(*) as message_count,
        sum(case when role = 'user' then 1 else 0 end) as user_message_count,
        sum(case when role = 'assistant' then 1 else 0 end) as assistant_message_count,
        avg(seconds_since_previous) filter (where seconds_since_previous > 0)
            as avg_response_time_seconds
    from messages
    group by session_id
),

-- Tool statistics per session
tool_stats as (
    select
        session_id,
        count(*) as tool_call_count,
        count(distinct tool_name) as unique_tools_used,
        mode() within group (order by tool_name) as primary_tool
    from tool_calls
    group by session_id
),

-- Combine all metrics
sessions_computed as (
    select
        sb.session_id,
        sb.project_id,
        sb.session_start,
        sb.session_end,

        -- Duration
        extract(epoch from (sb.session_end - sb.session_start)) as duration_seconds,
        extract(epoch from (sb.session_end - sb.session_start)) / 60.0 as duration_minutes,

        -- Messages
        coalesce(ms.message_count, 0) as message_count,
        coalesce(ms.user_message_count, 0) as user_message_count,
        coalesce(ms.assistant_message_count, 0) as assistant_message_count,

        -- Tools
        coalesce(ts.tool_call_count, 0) as tool_call_count,
        coalesce(ts.unique_tools_used, 0) as unique_tools_used,
        ts.primary_tool,

        -- Classifications
        case
            when extract(epoch from (sb.session_end - sb.session_start)) < 60 then 'quick'
            when extract(epoch from (sb.session_end - sb.session_start)) < 600 then 'short'
            when extract(epoch from (sb.session_end - sb.session_start)) < 3600 then 'medium'
            else 'long'
        end as session_duration_category,

        case
            when coalesce(ms.message_count, 0) <= 5 then 'minimal'
            when coalesce(ms.message_count, 0) <= 20 then 'light'
            when coalesce(ms.message_count, 0) <= 50 then 'moderate'
            else 'heavy'
        end as activity_level

    from session_boundaries sb
    left join message_stats ms on sb.session_id = ms.session_id
    left join tool_stats ts on sb.session_id = ts.session_id
)

select * from sessions_computed
```

The mode() aggregate function finds the most frequently used tool per session. This tells us what the primary activity was, whether file editing, shell commands, or web searches.
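With primary_tool in place, questions like "what characterizes long sessions?" become one-liners. An illustrative query:

```sql
-- Which primary tools dominate long sessions?
select
    primary_tool,
    count(*) as long_sessions
from {{ ref('int_sessions_computed') }}
where session_duration_category = 'long'
group by primary_tool
order by long_sessions desc
```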
int_tool_usage: Enriched Tool Data
This model joins tool calls with our seed data for category classification:
```sql
with tool_calls as (
    select * from {{ ref('stg_tool_calls') }}
),

tool_categories as (
    select * from {{ ref('tool_categories') }}
),

enriched_tools as (
    select
        tc.conversation_id,
        tc.session_id,
        tc.tool_name,
        tc.is_invocation,
        tc.is_result,
        tc.effective_timestamp,

        -- Join with seed data
        coalesce(cat.tool_category, 'unknown') as tool_category,
        cat.description as tool_description,

        -- Extract file paths from content
        case
            when tc.tool_name in ('Read', 'Write', 'Edit', 'Glob')
                and tc.tool_content like '%/%'
                then regexp_extract(tc.tool_content, '([/][a-zA-Z0-9_./-]+)', 1)
            else null
        end as file_path,

        -- Classify operation types
        tc.tool_name in ('Read', 'Write', 'Edit', 'NotebookEdit', 'MultiEdit') as is_file_operation,
        tc.tool_name in ('Glob', 'Grep', 'WebSearch') as is_search_operation,
        tc.tool_name = 'Bash' as is_shell_command

    from tool_calls tc
    left join tool_categories cat on tc.tool_name = cat.tool_name
)

select * from enriched_tools
```

The join with tool_categories seed data adds human-readable descriptions and standardized categories without hardcoding them in SQL.
Mart Models: The Gold Layer
The Gold layer delivers a star schema optimized for BI tools. Dimension tables describe entities; fact tables record events.
dim_date: The Date Dimension
Every analytics warehouse needs a date dimension. DuckDB’s generate_series makes this straightforward:
```sql
{{
  config(
    materialized='table',
    tags=['marts', 'gold', 'dimension']
  )
}}

with date_boundaries as (
    select
        min(partition_date) as min_date,
        max(partition_date) as max_date
    from {{ ref('stg_conversations') }}
),

date_spine as (
    select generate_series::date as date_day
    from date_boundaries,
        generate_series(
            date_boundaries.min_date,
            date_boundaries.max_date,
            interval '1 day'
        )
),

dim_date as (
    select
        date_day as date_key,

        -- ISO components
        extract(year from date_day) as year,
        extract(quarter from date_day) as quarter,
        extract(month from date_day) as month,
        extract(dow from date_day) as day_of_week,

        -- Readable labels
        strftime(date_day, '%Y-%m') as year_month,
        strftime(date_day, '%B') as month_name,
        strftime(date_day, '%A') as day_name,

        -- Week boundaries
        date_trunc('week', date_day)::date as week_start_date,

        -- Boolean flags
        extract(dow from date_day) in (0, 6) as is_weekend,
        date_day = current_date as is_today,

        -- Relative indicators
        current_date - date_day as days_ago

    from date_spine
)

select * from dim_date
order by date_key
```

The dimension generates one row per day between the first and last data points. Boolean flags like is_weekend and is_today simplify common filter patterns in dashboards.
Fact Tables with Incremental Materialization
Fact tables use incremental materialization to avoid reprocessing all history:
```sql
{{
  config(
    materialized='incremental',
    unique_key='message_key',
    tags=['marts', 'gold', 'fact']
  )
}}

with messages as (
    select * from {{ ref('int_messages_enriched') }}
    {% if is_incremental() %}
    where partition_date >= (select max(date_key) - interval '1 day' from {{ this }})
    {% endif %}
),

dim_sessions as (
    select session_key, session_id from {{ ref('dim_sessions') }}
),

fct_messages as (
    select
        md5(m.conversation_id) as message_key,

        -- Dimension foreign keys
        ds.session_key,
        m.partition_date as date_key,
        m.project_id,

        -- Message attributes
        m.role,
        m.turn_type,
        m.message_sequence,
        m.content_length,
        m.task_category,
        m.time_of_day,

        -- Flags
        m.has_code_block,
        m.is_question,

        m.effective_timestamp

    from messages m
    left join dim_sessions ds on m.session_id = ds.session_id
)

select * from fct_messages
```

The {% if is_incremental() %} block only processes recent data after the initial load. The one-day overlap (max(date_key) - interval '1 day') handles late-arriving data gracefully.
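Conceptually, a unique_key tells the adapter to replace matching rows and append the rest. The exact SQL is adapter-generated and strategy-dependent, but a delete+insert run boils down to something like this sketch (new_batch stands in for the model's filtered SELECT):

```sql
-- Pseudocode for one incremental run; dbt generates the real thing
delete from fct_messages
where message_key in (select message_key from new_batch);

insert into fct_messages
select * from new_batch;
```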
Aggregate Tables: Pre-Computed Metrics
Aggregate tables trade storage for query speed. They pre-compute metrics that dashboards request repeatedly.
agg_daily_summary: Time-Series Metrics
```sql
{{
  config(
    materialized='table',
    tags=['marts', 'gold', 'aggregate']
  )
}}

with sessions as (
    select * from {{ ref('dim_sessions') }}
),

messages as (
    select * from {{ ref('fct_messages') }}
),

tool_calls as (
    select * from {{ ref('fct_tool_calls') }}
),

dim_date as (
    select * from {{ ref('dim_date') }}
),

daily_sessions as (
    select
        date_key,
        count(*) as session_count,
        count(distinct project_id) as active_projects,
        sum(duration_minutes) as total_session_minutes,
        avg(duration_minutes) as avg_session_minutes
    from sessions
    group by date_key
),

daily_messages as (
    select
        date_key,
        count(*) as message_count,
        sum(case when role = 'user' then 1 else 0 end) as user_messages,
        sum(case when has_code_block then 1 else 0 end) as messages_with_code
    from messages
    group by date_key
),

daily_tools as (
    select
        date_key,
        count(*) as tool_call_count,
        count(distinct tool_name) as unique_tools_used
    from tool_calls
    group by date_key
)

select
    d.date_key,
    d.year,
    d.month,
    d.day_name,
    d.is_weekend,

    coalesce(ds.session_count, 0) as session_count,
    coalesce(ds.active_projects, 0) as active_projects,
    coalesce(dm.message_count, 0) as message_count,
    coalesce(dt.tool_call_count, 0) as tool_call_count,

    coalesce(ds.session_count, 0) > 0 as has_activity,
    current_timestamp as computed_at

from dim_date d
left join daily_sessions ds on d.date_key = ds.date_key
left join daily_messages dm on d.date_key = dm.date_key
left join daily_tools dt on d.date_key = dt.date_key
order by d.date_key desc
```

This aggregate powers daily trend charts. The computed_at timestamp helps debug stale data issues.
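Queries on top of it stay trivially cheap. An illustrative dashboard query:

```sql
-- 7-day rolling average of daily message volume
select
    date_key,
    message_count,
    avg(message_count) over (
        order by date_key
        rows between 6 preceding and current row
    ) as message_count_7d_avg
from {{ ref('agg_daily_summary') }}
order by date_key
```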
agg_hourly_activity: Heatmap Data
For work pattern analysis, we need a 24x7 activity matrix:
```sql
-- Create complete hour x day matrix
hour_day_matrix as (
    select h.hour_of_day, d.day_of_week
    from (select unnest(generate_series(0, 23)) as hour_of_day) h
    cross join (select unnest(generate_series(0, 6)) as day_of_week) d
),

hourly_activity as (
    select
        hdm.hour_of_day,
        hdm.day_of_week,

        case hdm.day_of_week
            when 0 then 'Sunday'
            when 1 then 'Monday'
            -- ... etc
        end as day_name,

        lpad(hdm.hour_of_day::text, 2, '0') || ':00' as hour_label,

        coalesce(hm.message_count, 0)
            + coalesce(ht.tool_call_count, 0) as total_activity

    from hour_day_matrix hdm
    left join hourly_messages hm
        on hdm.hour_of_day = hm.hour_of_day
        and hdm.day_of_week = hm.day_of_week
    left join hourly_tools ht
        on hdm.hour_of_day = ht.hour_of_day
        and hdm.day_of_week = ht.day_of_week
)
```

The cross join ensures every hour-day combination exists, even with zero activity. This prevents gaps in heatmap visualizations.
Seeds: Static Reference Data
Seeds are CSV files that dbt loads as tables. They’re perfect for reference data that changes infrequently.
tool_categories.csv
```csv
tool_name,tool_category,description
Read,file_operations,Read file contents
Write,file_operations,Write file contents
Edit,file_operations,Edit file contents
NotebookEdit,file_operations,Edit Jupyter notebook cells
MultiEdit,file_operations,Multiple file edits in one operation
Bash,shell,Execute shell commands
Glob,search,Search files by glob pattern
Grep,search,Search file contents with regex
Task,agent,Spawn sub-agent for complex tasks
TodoRead,planning,Read todo list
TodoWrite,planning,Write/update todo list
WebFetch,network,Fetch web page content
WebSearch,network,Search the web
AskFollowupQuestion,interaction,Ask user for clarification
AttemptCompletion,interaction,Signal task completion
```

The _seeds.yml file documents and tests seed data:
```yaml
version: 2

seeds:
  - name: tool_categories
    description: "Reference table mapping Claude Code tool names to categories"

    columns:
      - name: tool_name
        tests:
          - unique
          - not_null

      - name: tool_category
        tests:
          - not_null
          - accepted_values:
              values:
                - file_operations
                - shell
                - search
                - agent
                - planning
                - network
                - interaction
```

Load seeds with dbt seed. They reload on every run, making updates simple: edit the CSV and run dbt seed again.
Testing Strategy
dbt tests validate data quality at build time. Tests are defined in schema YAML files alongside model documentation.
Schema Tests
Each model has tests defined in its _schema.yml:
```yaml
version: 2

models:
  - name: stg_conversations
    columns:
      - name: conversation_id
        tests:
          - unique
          - not_null

      - name: entry_type
        tests:
          - not_null
          - accepted_values:
              values:
                - user
                - assistant
                - tool_use
                - tool_result
                - system
                - summary
                - unknown
```

Relationship Tests
Referential integrity between dimensions and facts:
```yaml
  - name: fct_messages
    columns:
      - name: session_key
        tests:
          - relationships:
              to: ref('dim_sessions')
              field: session_key

      - name: date_key
        tests:
          - relationships:
              to: ref('dim_date')
              field: date_key
```

Running Tests
```bash
# Test everything
dbt test

# Test specific model
dbt test --select stg_conversations

# Test only marts layer
dbt test --select marts
```

When a test fails, dbt reports how many rows violate the constraint and leaves behind the compiled test query, so you can inspect the offending rows directly.
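Beyond schema tests, the tests/ directory holds singular tests: plain SELECT statements that pass only when they return zero rows. A hypothetical example guarding session math:

```sql
-- tests/assert_no_negative_durations.sql (hypothetical)
-- Fails the build if any session ends before it starts
select
    session_id,
    duration_seconds
from {{ ref('int_sessions_computed') }}
where duration_seconds < 0
```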
DuckDB Integration
DuckDB provides an excellent development experience for analytics workloads. Its columnar storage and vectorized execution make it fast, while file-based databases eliminate infrastructure overhead.
DuckDB-Specific Functions
Our models use several DuckDB functions:
- generate_series() creates date spines without helper tables
- strftime() formats dates as strings
- regexp_extract() pulls patterns out of text
- mode() within group finds the most frequent value
- percentile_cont() calculates percentiles
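The last one shows up in ad-hoc analysis more than in the models above. An illustrative one-liner:

```sql
-- Median session length across all sessions
select
    percentile_cont(0.5) within group (order by duration_minutes)
        as median_session_minutes
from {{ ref('int_sessions_computed') }}
```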
Profile Configuration
The development profile enables the Parquet extension for reading source data:
```yaml
dev:
  type: duckdb
  path: "{{ env_var('DUCKDB_PATH', '/duckdb/analytics.db') }}"
  threads: 4
  extensions:
    - parquet
```

Multiple threads parallelize query execution. For production, increase this based on available CPU cores.
Putting It All Together
With all models in place, building the entire warehouse is a single command:
```bash
# Install packages
dbt deps

# Load seed data
dbt seed

# Build all models
dbt run

# Run all tests
dbt test

# Generate documentation
dbt docs generate
dbt docs serve
```

The dependency graph ensures models build in the correct order: staging first, then intermediate, then marts. dbt figures this out from the {{ ref() }} calls in your SQL.
The medallion architecture provides a clean mental model for organizing transformation logic. Bronze cleans, Silver enriches, Gold optimizes. Each layer has one job.
dbt makes this pattern practical by handling dependencies, testing, and documentation. Your transformation logic lives in version control, reviewed like application code, tested before deployment.
The star schema at the end powers dashboards that answer real questions: When do developers use AI assistants most? Which tools drive productivity? How do session patterns change over time?
Start with your own data. Identify the Bronze-Silver-Gold layers. Define your dimensions and facts. Let dbt handle the rest.