Building a Robust File-to-MongoDB Sync Service in TypeScript
How to build a crash-resistant data pipeline that never loses a line
This is Article 2 of a 7-part series on building a complete log analytics platform. In [Article 1], we covered the high-level architecture of our system. Today, we are rolling up our sleeves and diving deep into the sync service—the component that watches files, buffers changes durably, and syncs them to MongoDB.
The Problem: Files That Never Stop Growing
Imagine you have an application that continuously appends JSON lines to log files. Maybe it is an AI coding assistant logging every conversation, a web server writing access logs, or an IoT device streaming sensor readings. These files grow constantly, and you need to get that data into MongoDB for querying and analysis.
Sounds simple, right? Just tail the file and insert each line into the database.
But here is where it gets interesting. What happens when:
- Your service crashes mid-sync? How do you know where you left off?
- MongoDB goes down for maintenance? Do you lose all the data generated during that window?
- You restart the service? Does it re-process the entire file, creating duplicates?
The naive approach—keeping file positions in memory—fails spectacularly in all of these scenarios. We need something more robust.
The Solution: A Three-Layer Architecture
Our sync service solves these problems with three distinct components, each with a single responsibility:
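Sketched roughly, data flows one way through the three components described in detail below:

```
JSONL files on disk
      |  chokidar add/change events
      v
   Watcher      -- parses new lines from the last known byte offset
      |
      v
SQLite Buffer   -- pending_entries queue + file_positions table
      |  MongoSync drains batches on a timer (insertMany)
      v
   MongoDB      -- conversations collection
```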
Here is the key insight: SQLite sits in the middle as a durable buffer. If MongoDB is down, entries queue up in SQLite. If the service crashes, SQLite remembers exactly which byte position we reached in each file. This design makes the entire pipeline crash-resistant.
Let me walk you through each component, showing you the actual TypeScript code that makes this work.
The Type Foundation
Before diving into the components, let us establish our type contracts. These interfaces define the shape of data as it flows through our system:
```typescript
export interface ClaudeEntry {
  type: string;
  sessionId?: string;
  timestamp?: string;
  message?: unknown;
  [key: string]: unknown;
}

export interface BufferedEntry {
  id: number;
  project_id: string;
  session_id: string | null;
  source_file: string;
  entry_json: string;
  created_at: string;
  synced: number;
}

export interface MongoDocument extends ClaudeEntry {
  projectId: string;
  sourceFile: string;
  ingestedAt: Date;
}

export interface SyncStats {
  pending: number;
  synced: number;
  lastSyncAt: Date | null;
  mongoConnected: boolean;
}
```

Notice how BufferedEntry represents the SQLite row structure (snake_case, as is conventional for SQL), while MongoDocument extends our base entry with metadata (camelCase for JavaScript/MongoDB). The SyncStats interface powers our health endpoint, giving us visibility into the pipeline state.
Deep Dive: File Watching with Chokidar
The Watcher class is our eyes on the file system. It uses chokidar, a battle-tested file watching library, to detect when JSONL files change.
The Configuration That Matters
```typescript
import chokidar, { FSWatcher } from 'chokidar';
import fs from 'fs';
import path from 'path';
import readline from 'readline';
import { Buffer } from './buffer';
import { ClaudeEntry } from './types';

export class Watcher {
  private watcher: FSWatcher | null = null;
  private buffer: Buffer;
  private watchDir: string;
  private processing = new Set<string>();

  constructor(watchDir: string, buffer: Buffer) {
    this.watchDir = watchDir;
    this.buffer = buffer;
  }

  start(): void {
    this.watcher = chokidar.watch(`${this.watchDir}/**/*.jsonl`, {
      persistent: true,
      ignoreInitial: false,
      awaitWriteFinish: {
        stabilityThreshold: 300,
        pollInterval: 100,
      },
      usePolling: false,
      alwaysStat: true,
    });

    this.watcher.on('add', (filePath) => this.processFile(filePath));
    this.watcher.on('change', (filePath) => this.processFile(filePath));
    this.watcher.on('error', (error) => console.error('Watcher error:', error));
  }

  // ... more methods
}
```

Let me explain the key configuration options:
- `ignoreInitial: false`: When the service starts, it processes all existing files. Combined with our position tracking, this means we pick up exactly where we left off.
- `awaitWriteFinish`: This is crucial for log files. Applications often write in bursts, and we do not want to process a half-written line. The `stabilityThreshold` of 300ms ensures the file has stopped changing before we read it.
- `alwaysStat: true`: We always get file stats with each event, which we need to compare against our stored position.
The Processing Lock Pattern
Here is a subtle but important detail—the processing Set:
```typescript
private processing = new Set<string>();

private async processFile(filePath: string): Promise<void> {
  // Prevent concurrent processing of same file
  if (this.processing.has(filePath)) return;
  this.processing.add(filePath);

  try {
    // ... process the file
  } finally {
    this.processing.delete(filePath);
  }
}
```

Why do we need this? File system events can fire rapidly. If a file receives multiple writes in quick succession, we might get overlapping change events. Without this lock, we could end up with two concurrent reads of the same file, leading to duplicate entries or corrupted state.
The Set gives us O(1) lookups, and the try/finally ensures we always release the lock, even if processing throws an error.
Stream-Based Line Parsing
Here is where the magic of crash recovery happens:
```typescript
private async processFile(filePath: string): Promise<void> {
  if (this.processing.has(filePath)) return;
  this.processing.add(filePath);

  try {
    const projectId = this.extractProjectId(filePath);
    const startPos = this.buffer.getFilePosition(filePath);

    let stats: fs.Stats;
    try {
      stats = fs.statSync(filePath);
    } catch {
      return; // File deleted
    }

    // Skip if file hasn't grown
    if (stats.size <= startPos) return;

    // Start reading from where we left off
    const stream = fs.createReadStream(filePath, {
      start: startPos,
      encoding: 'utf8',
    });

    const rl = readline.createInterface({
      input: stream,
      crlfDelay: Infinity,
    });

    const entries: Array<{
      projectId: string;
      sessionId?: string;
      sourceFile: string;
      data: ClaudeEntry;
    }> = [];

    for await (const line of rl) {
      if (!line.trim()) continue;
      try {
        const data = JSON.parse(line) as ClaudeEntry;
        entries.push({
          projectId,
          sessionId: data.sessionId,
          sourceFile: filePath,
          data,
        });
      } catch (e) {
        console.error(`Parse error in ${path.basename(filePath)}:`, (e as Error).message);
      }
    }

    if (entries.length > 0) {
      this.buffer.insertEntries(entries);
      console.log(`Buffered ${entries.length} entries from ${path.basename(filePath)}`);
    }

    // Update position AFTER successful processing
    this.buffer.updateFilePosition(filePath, stats.size);
  } finally {
    this.processing.delete(filePath);
  }
}
```

The critical insight here is the `start` parameter in `createReadStream`. We ask SQLite for the last known position, then tell Node.js to start reading from that byte offset. This means after a crash, we resume exactly where we stopped—no duplicates, no missed lines.
Notice also that we update the file position after successfully buffering entries. This ordering is intentional: if we crash between buffering and updating the position, we will re-read those lines on restart. But that is fine—duplicates are handled downstream. The alternative (updating position first) could lose data.
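For that downstream handling to work, MongoDB needs a unique index on something deterministic about each entry, since duplicate-key errors only fire against unique indexes; the index setup shown later does not include one. Here is a hedged sketch of one way to add it, using a hypothetical contentHash field derived from the source file and the raw line:

```typescript
import { createHash } from 'crypto';

// Hypothetical helper: a deterministic hash per line, so re-reading the same
// bytes after a crash produces the same key and collides instead of duplicating.
function contentHash(sourceFile: string, rawLine: string): string {
  return createHash('sha256').update(`${sourceFile}\n${rawLine}`).digest('hex');
}

// At connect time, alongside the other createIndex calls (an assumption, not
// part of the article's shown code):
// await collection.createIndex({ contentHash: 1 }, { unique: true });
```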
Deep Dive: SQLite as a Durable Buffer
The Buffer class is the heart of our crash-resistance strategy. It uses SQLite with carefully chosen configuration to provide durability without sacrificing performance.
Why SQLite and Not In-Memory?
You might wonder: why not just use an in-memory queue? Three reasons:
- Crash survival: In-memory state vanishes on restart. SQLite persists to disk.
- MongoDB outages: If MongoDB is down for hours, you need somewhere to store the backlog. Memory would eventually overflow.
- Simplicity: SQLite is a single file, requires no separate server, and has excellent Node.js bindings via `better-sqlite3`.
WAL Mode and Performance Pragmas
```typescript
import Database, { Database as DatabaseType, Statement } from 'better-sqlite3';
import path from 'path';
import os from 'os';
import fs from 'fs';

export class Buffer {
  private db: DatabaseType;

  constructor(dbPath?: string) {
    const defaultPath = path.join(os.homedir(), '.claude-sync', 'buffer.db');
    const finalPath = dbPath || defaultPath;

    fs.mkdirSync(path.dirname(finalPath), { recursive: true });

    this.db = new Database(finalPath);
    this.db.pragma('journal_mode = WAL');
    this.db.pragma('synchronous = NORMAL');

    this.initSchema();
    this.statements = this.prepareStatements();
  }

  // ... more methods
}
```

Two pragmas make a huge difference here:
- `journal_mode = WAL` (Write-Ahead Logging): Instead of overwriting pages in place, SQLite appends changes to a separate log file. This allows concurrent reads while writing and provides better crash recovery. For a sync service that reads pending entries while writing new ones, WAL is essential.
- `synchronous = NORMAL`: This is a tradeoff between safety and speed. `FULL` would sync after every transaction (slowest, safest). `OFF` would never sync (fastest, dangerous). `NORMAL` syncs at critical moments but not after every transaction—a good balance for our use case where occasional data loss is acceptable if the machine loses power.
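If you want to confirm the pragmas actually took effect, better-sqlite3 can read them back. A quick standalone check, sketched with a throwaway database path:

```typescript
import Database from 'better-sqlite3';

const db = new Database('/tmp/pragma-check.db');
db.pragma('journal_mode = WAL');
db.pragma('synchronous = NORMAL');

// { simple: true } returns just the value instead of an array of rows.
console.log(db.pragma('journal_mode', { simple: true })); // 'wal'
console.log(db.pragma('synchronous', { simple: true }));  // 1, i.e. NORMAL

db.close();
```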
The Schema Design
```typescript
private initSchema(): void {
  // exec() runs multi-statement SQL; run() belongs to prepared statements
  this.db.exec(`
    CREATE TABLE IF NOT EXISTS file_positions (
      file_path TEXT PRIMARY KEY,
      position INTEGER NOT NULL,
      updated_at TEXT NOT NULL
    );

    CREATE TABLE IF NOT EXISTS pending_entries (
      id INTEGER PRIMARY KEY AUTOINCREMENT,
      project_id TEXT NOT NULL,
      session_id TEXT,
      source_file TEXT NOT NULL,
      entry_json TEXT NOT NULL,
      created_at TEXT NOT NULL,
      synced INTEGER DEFAULT 0
    );

    CREATE INDEX IF NOT EXISTS idx_pending_synced ON pending_entries(synced);
    CREATE INDEX IF NOT EXISTS idx_pending_project ON pending_entries(project_id);
  `);
}
```

We have two tables:
- `file_positions`: Tracks the byte offset for each watched file. The `file_path` is the primary key since each file has exactly one position.
- `pending_entries`: The queue of entries waiting to be synced. The `synced` flag (0 or 1) marks whether an entry has been successfully written to MongoDB. We keep synced entries around for 7 days for debugging purposes.
The indexes are carefully chosen: idx_pending_synced speeds up our main query (fetching unsynced entries), and idx_pending_project helps if you ever need to query by project.
Prepared Statements for Performance
Here is a pattern that makes a significant performance difference:
```typescript
private statements: {
  getPosition: Statement<[string]>;
  upsertPosition: Statement<[string, number]>;
  insertEntry: Statement<[string, string | null, string, string]>;
  getPending: Statement<[number]>;
  markSynced: Statement<[number]>;
  cleanupSynced: Statement<[]>;
  countPending: Statement<[]>;
  countSynced: Statement<[]>;
};

private prepareStatements() {
  return {
    getPosition: this.db.prepare(
      'SELECT position FROM file_positions WHERE file_path = ?'
    ),
    upsertPosition: this.db.prepare(`
      INSERT INTO file_positions (file_path, position, updated_at)
      VALUES (?, ?, datetime('now'))
      ON CONFLICT(file_path) DO UPDATE SET
        position = excluded.position,
        updated_at = datetime('now')
    `),
    insertEntry: this.db.prepare(`
      INSERT INTO pending_entries (project_id, session_id, source_file, entry_json, created_at)
      VALUES (?, ?, ?, ?, datetime('now'))
    `),
    getPending: this.db.prepare(`
      SELECT id, entry_json, project_id, source_file, session_id
      FROM pending_entries
      WHERE synced = 0
      ORDER BY id
      LIMIT ?
    `),
    markSynced: this.db.prepare(
      'UPDATE pending_entries SET synced = 1 WHERE id = ?'
    ),
    cleanupSynced: this.db.prepare(`
      DELETE FROM pending_entries
      WHERE synced = 1 AND created_at < datetime('now', '-7 days')
    `),
    countPending: this.db.prepare(
      'SELECT COUNT(*) as count FROM pending_entries WHERE synced = 0'
    ),
    countSynced: this.db.prepare(
      'SELECT COUNT(*) as count FROM pending_entries WHERE synced = 1'
    ),
  };
}
```

Why prepare statements upfront? Two reasons:
- Parse once, run many: SQLite parses and compiles the SQL into bytecode once during `prepare()`. Each subsequent `run()` or `get()` skips parsing entirely.
- Type safety: The `Statement<[string, number]>` generic tells TypeScript exactly what parameters this statement expects. Try to pass the wrong types, and you get a compile error.
The upsertPosition statement uses SQLite’s ON CONFLICT clause for atomic upserts—no need for separate “check if exists, then insert or update” logic.
Transaction Wrapping for Batch Inserts
When inserting multiple entries, we wrap them in a transaction:
```typescript
insertEntries(entries: Array<{
  projectId: string;
  sessionId?: string;
  sourceFile: string;
  data: ClaudeEntry;
}>): void {
  const insertMany = this.db.transaction((items: typeof entries) => {
    for (const entry of items) {
      this.statements.insertEntry.run(
        entry.projectId,
        entry.sessionId || null,
        entry.sourceFile,
        JSON.stringify(entry.data)
      );
    }
  });
  insertMany(entries);
}
```

The better-sqlite3 transaction wrapper ensures all inserts succeed or all fail together. It also provides a massive performance boost—instead of committing after each insert, we commit once at the end.
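The rest of the Buffer API used elsewhere in this article (getFilePosition, updateFilePosition, getPendingEntries, markAsSynced, getStats, cleanup, close) is not shown above. A minimal sketch of those methods, assuming they are thin wrappers over the prepared statements and the BufferedEntry type from the types section:

```typescript
// Inside the Buffer class: public wrappers around the prepared statements (sketch)
getFilePosition(filePath: string): number {
  const row = this.statements.getPosition.get(filePath) as { position: number } | undefined;
  return row?.position ?? 0;
}

updateFilePosition(filePath: string, position: number): void {
  this.statements.upsertPosition.run(filePath, position);
}

getPendingEntries(limit: number): Pick<BufferedEntry,
  'id' | 'entry_json' | 'project_id' | 'source_file' | 'session_id'>[] {
  return this.statements.getPending.all(limit) as Pick<BufferedEntry,
    'id' | 'entry_json' | 'project_id' | 'source_file' | 'session_id'>[];
}

markAsSynced(ids: number[]): void {
  // One transaction so a whole batch is marked atomically.
  const markMany = this.db.transaction((batch: number[]) => {
    for (const id of batch) this.statements.markSynced.run(id);
  });
  markMany(ids);
}

getStats(): { pending: number; synced: number } {
  const pending = (this.statements.countPending.get() as { count: number }).count;
  const synced = (this.statements.countSynced.get() as { count: number }).count;
  return { pending, synced };
}

cleanup(): void {
  this.statements.cleanupSynced.run();
}

close(): void {
  this.db.close();
}
```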
Deep Dive: The MongoDB Sync Engine
The MongoSync class handles the final hop from SQLite to MongoDB. Its design prioritizes resilience over strict consistency.
Connection Handling
```typescript
import { MongoClient, Collection, Db } from 'mongodb';
import { Buffer } from './buffer';
import { MongoDocument, SyncStats } from './types';

export class MongoSync {
  private client: MongoClient | null = null;
  private collection: Collection<MongoDocument> | null = null;
  private buffer: Buffer;
  private uri: string;
  private dbName: string;
  private batchSize: number;
  private syncInterval: number;
  private intervalId: NodeJS.Timeout | null = null;
  private lastSyncAt: Date | null = null;

  async connect(): Promise<boolean> {
    try {
      if (this.client) return true;

      this.client = await MongoClient.connect(this.uri, {
        serverSelectionTimeoutMS: 5000,
        connectTimeoutMS: 10000,
      });

      const db: Db = this.client.db(this.dbName);
      this.collection = db.collection<MongoDocument>('conversations');

      // Create indexes for common query patterns
      await this.collection.createIndex({ projectId: 1, timestamp: -1 });
      await this.collection.createIndex({ sessionId: 1 });
      await this.collection.createIndex({ ingestedAt: -1 });
      await this.collection.createIndex({ message: 'text' });

      console.log('MongoDB connected');
      return true;
    } catch (err) {
      console.error('MongoDB connection failed:', (err as Error).message);
      this.client = null;
      this.collection = null;
      return false;
    }
  }

  // ... more methods
}
```

The connection logic is defensive:
- Short timeouts (5s for server selection, 10s for connection) prevent the service from hanging indefinitely if MongoDB is unreachable.
- Index creation happens on every connect. MongoDB’s `createIndex` is idempotent—if the index exists, it returns immediately.
- Graceful failure: If connection fails, we log and return `false`. The caller can decide what to do (in our case, buffer the entries and retry later).
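One piece not shown here is the constructor. Based on how main() instantiates MongoSync later in the article, a plausible shape looks like this (an assumption, not the actual source):

```typescript
// Hypothetical options type, mirroring the object passed from main()
interface MongoSyncOptions {
  uri: string;
  dbName: string;
  batchSize: number;
  syncIntervalMs: number;
}

// Inside the MongoSync class:
constructor(buffer: Buffer, options: MongoSyncOptions) {
  this.buffer = buffer;
  this.uri = options.uri;
  this.dbName = options.dbName;
  this.batchSize = options.batchSize;
  this.syncInterval = options.syncIntervalMs;
}
```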
The Sync Cycle
Here is where things get interesting:
```typescript
async sync(): Promise<number> {
  const connected = await this.connect();
  if (!connected || !this.collection) {
    const stats = this.buffer.getStats();
    if (stats.pending > 0) {
      console.log(`MongoDB unavailable, ${stats.pending} entries buffered`);
    }
    return 0;
  }

  const pending = this.buffer.getPendingEntries(this.batchSize);
  if (pending.length === 0) return 0;

  const docs: MongoDocument[] = pending.map((row) => {
    const entry = JSON.parse(row.entry_json);
    return {
      ...entry,
      projectId: row.project_id,
      sourceFile: row.source_file,
      ingestedAt: new Date(),
    };
  });

  try {
    await this.collection.insertMany(docs, { ordered: false });
    this.buffer.markAsSynced(pending.map((r) => r.id));
    this.lastSyncAt = new Date();
    console.log(`Synced ${docs.length} entries to MongoDB`);
    return docs.length;
  } catch (err: unknown) {
    const mongoErr = err as { code?: number; writeErrors?: Array<{ code: number }> };

    // Handle duplicate key errors (partial success)
    if (mongoErr.code === 11000 || mongoErr.writeErrors?.some(e => e.code === 11000)) {
      this.buffer.markAsSynced(pending.map((r) => r.id));
      console.log('Handled duplicates, marked as synced');
      return docs.length;
    }

    console.error('Sync error:', (err as Error).message);
    await this.disconnect();
    return 0;
  }
}
```

The ordered: false Strategy
This single option—{ ordered: false }—is perhaps the most important design decision in the entire sync engine:
```typescript
await this.collection.insertMany(docs, { ordered: false });
```

By default, insertMany is ordered: if document 5 out of 100 fails, MongoDB stops and documents 6-100 are never inserted. With ordered: false, MongoDB attempts to insert all documents, collecting errors along the way.
Why does this matter? Consider this scenario:
- We sync 100 entries to MongoDB
- Entry 23 already exists (duplicate key)
- With `ordered: true`: entries 24-100 are lost until next sync
- With `ordered: false`: entries 24-100 are inserted, only 23 fails
Combined with our duplicate handling code, this means we can safely retry syncs without worrying about partial failures:
```typescript
if (mongoErr.code === 11000 || mongoErr.writeErrors?.some(e => e.code === 11000)) {
  this.buffer.markAsSynced(pending.map((r) => r.id));
  console.log('Handled duplicates, marked as synced');
  return docs.length;
}
```

Error code 11000 means “duplicate key.” When we see it, we know the data is already in MongoDB, so we mark it as synced and move on. No data loss, no infinite retry loops.
The Sync State Machine
The periodic sync runs on a configurable interval:
```typescript
startPeriodicSync(): void {
  // Initial sync
  this.sync();

  // Periodic sync
  this.intervalId = setInterval(() => this.sync(), this.syncInterval);
  console.log(`Sync interval: ${this.syncInterval}ms`);
}

stopPeriodicSync(): void {
  if (this.intervalId) {
    clearInterval(this.intervalId);
    this.intervalId = null;
  }
}
```

Deep Dive: Bootstrap and Graceful Shutdown
The index.ts file ties everything together with proper lifecycle management.
Configuration Loading
```typescript
import { config as loadEnv } from 'dotenv';
import path from 'path';
import os from 'os';
import http from 'http';
import { Buffer } from './buffer';
import { Watcher } from './watcher';
import { MongoSync } from './sync';

// Load .env before reading process.env
loadEnv();

function expandTilde(filePath: string): string {
  if (filePath.startsWith('~/')) {
    return path.join(os.homedir(), filePath.slice(2));
  }
  return filePath;
}

const config = {
  claudeDir: expandTilde(process.env.CLAUDE_DIR || path.join(os.homedir(), '.claude', 'projects')),
  mongoUri: process.env.MONGO_URI || 'mongodb://localhost:27017',
  dbName: process.env.MONGO_DB || 'claude_logs',
  sqlitePath: expandTilde(process.env.SQLITE_PATH || path.join(os.homedir(), '.claude-sync', 'buffer.db')),
  syncIntervalMs: parseInt(process.env.SYNC_INTERVAL_MS || '5000', 10),
  batchSize: parseInt(process.env.BATCH_SIZE || '100', 10),
  cleanupIntervalMs: parseInt(process.env.CLEANUP_INTERVAL_MS || '3600000', 10),
  healthPort: parseInt(process.env.HEALTH_PORT || '9090', 10),
};
```

The expandTilde helper handles ~/ paths, making configuration more user-friendly. All settings come from environment variables with sensible defaults.
Component Initialization
```typescript
async function main(): Promise<void> {
  console.log('Starting Claude MongoDB Sync');
  console.log(`Watch dir: ${config.claudeDir}`);
  console.log(`Buffer: ${config.sqlitePath}`);
  console.log(`MongoDB: ${config.mongoUri}/${config.dbName}`);

  // Initialize components in dependency order
  buffer = new Buffer(config.sqlitePath);

  watcher = new Watcher(config.claudeDir, buffer);
  watcher.start();

  mongoSync = new MongoSync(buffer, {
    uri: config.mongoUri,
    dbName: config.dbName,
    batchSize: config.batchSize,
    syncIntervalMs: config.syncIntervalMs,
  });
  mongoSync.startPeriodicSync();

  // Periodic cleanup of old synced entries
  cleanupIntervalId = setInterval(() => {
    buffer.cleanup();
    console.log('Cleanup completed');
  }, config.cleanupIntervalMs);

  // ... health endpoint setup
}
```

The initialization order matters: Buffer first (it is a dependency of both Watcher and MongoSync), then Watcher, then MongoSync. The cleanup interval removes synced entries older than 7 days to prevent unbounded SQLite growth.
Health Endpoint
```typescript
healthServer = http.createServer((req, res) => {
  if (req.url === '/health' || req.url === '/') {
    const stats = mongoSync.getStats();
    const response = {
      status: 'ok',
      ...stats,
      uptime: process.uptime(),
    };
    res.writeHead(200, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify(response, null, 2));
  } else if (req.url === '/stats') {
    const stats = mongoSync.getStats();
    res.writeHead(200, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify(stats, null, 2));
  } else {
    res.writeHead(404);
    res.end('Not found');
  }
});

healthServer.listen(config.healthPort, () => {
  console.log(`Health endpoint: http://localhost:${config.healthPort}/health`);
});
```

A simple health endpoint that returns:

```json
{
  "status": "ok",
  "pending": 42,
  "synced": 1337,
  "lastSyncAt": "2024-01-15T10:30:00.000Z",
  "mongoConnected": true,
  "uptime": 3600.5
}
```

This is invaluable for monitoring. You can set up alerts when pending grows too large or mongoConnected goes false.
Graceful Shutdown
```typescript
async function shutdown(signal: string): Promise<void> {
  console.log(`\n${signal} received, shutting down gracefully...`);

  // Stop accepting new work
  watcher?.stop();
  mongoSync?.stopPeriodicSync();
  clearInterval(cleanupIntervalId);

  // Final sync attempt
  try {
    console.log('Final sync...');
    await mongoSync?.sync();
  } catch (e) {
    console.error('Final sync failed:', (e as Error).message);
  }

  // Cleanup
  await mongoSync?.disconnect();
  buffer?.close();
  healthServer?.close();

  console.log('Shutdown complete');
  process.exit(0);
}

// Graceful shutdown handlers
process.on('SIGINT', () => shutdown('SIGINT'));
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('uncaughtException', (err) => {
  console.error('Uncaught exception:', err);
  shutdown('uncaughtException');
});
```

The shutdown sequence is carefully ordered:
- Stop accepting new work: The watcher stops detecting file changes, and the sync interval is cleared.
- Final sync: We attempt one last sync to minimize data left in the buffer.
- Close connections: MongoDB connection and SQLite database are properly closed.
- Exit: Clean exit with code 0.
This ensures that Ctrl+C or a kill command does not result in data loss.
Key Design Patterns Summary
Let us recap the patterns that make this service robust:
| Pattern | Problem Solved | Implementation |
|---|---|---|
| SQLite as durable buffer | Crash recovery, MongoDB outages | Buffer class with WAL mode |
| Prepared statements | SQL parsing overhead | Pre-compiled at startup |
| Processing lock (Set) | Concurrent file access | processing Set in Watcher |
| Unordered inserts | Partial failures | { ordered: false } in insertMany |
| Position tracking | Resume after restart | file_positions table |
| Graceful shutdown | Data loss on termination | Signal handlers with final sync |
Configuration Reference
Create a .env file with these options:
| Variable | Default | Description |
|---|---|---|
| MONGO_URI | mongodb://localhost:27017 | MongoDB connection string |
| MONGO_DB | claude_logs | Database name |
| CLAUDE_DIR | ~/.claude/projects | Directory to watch for JSONL files |
| SQLITE_PATH | ~/.claude-sync/buffer.db | SQLite buffer file location |
| SYNC_INTERVAL_MS | 5000 | How often to sync (milliseconds) |
| BATCH_SIZE | 100 | Entries per sync batch |
| CLEANUP_INTERVAL_MS | 3600000 | How often to purge old synced entries (milliseconds) |
| HEALTH_PORT | 9090 | Port for health endpoint |
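For reference, here is a .env that simply spells out those defaults; adjust the values to your environment:

```
# .env (all values shown are the defaults)
MONGO_URI=mongodb://localhost:27017
MONGO_DB=claude_logs
CLAUDE_DIR=~/.claude/projects
SQLITE_PATH=~/.claude-sync/buffer.db
SYNC_INTERVAL_MS=5000
BATCH_SIZE=100
CLEANUP_INTERVAL_MS=3600000
HEALTH_PORT=9090
```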
Production Deployment with PM2
For production, we use PM2 to manage the Node.js process. Here is our ecosystem.config.js:
```javascript
module.exports = {
  apps: [
    {
      name: 'claude-mongo-sync',
      script: './dist/index.js',
      instances: 1,
      autorestart: true,
      watch: false,
      max_memory_restart: '200M',

      env: {
        NODE_ENV: 'development',
      },
      env_production: {
        NODE_ENV: 'production',
      },

      // Restart behavior
      exp_backoff_restart_delay: 100,
      max_restarts: 10,
      min_uptime: '10s',

      // Logging
      log_date_format: 'YYYY-MM-DD HH:mm:ss Z',
      error_file: '~/.pm2/logs/claude-mongo-sync-error.log',
      out_file: '~/.pm2/logs/claude-mongo-sync-out.log',
      merge_logs: true,

      // Graceful shutdown
      kill_timeout: 10000,
      listen_timeout: 5000,
      shutdown_with_message: true,
    },
  ],
};
```

Key PM2 settings explained:
- `max_memory_restart: '200M'`: Automatic restart if memory exceeds 200MB, preventing memory leaks from becoming outages.
- `exp_backoff_restart_delay`: Exponential backoff on crashes prevents rapid restart loops.
- `kill_timeout: 10000`: Gives our graceful shutdown handler 10 seconds to complete the final sync.
To deploy:
```bash
# Build TypeScript
npm run build

# Start with PM2
pm2 start ecosystem.config.js --env production

# Check status
pm2 status

# View logs
pm2 logs claude-mongo-sync

# Monitor health endpoint
curl localhost:9090/health
```

Wrapping Up
We have built a sync service that solves the three core problems of file-to-database synchronization:
- Crash recovery: SQLite stores file positions, so we always know where we left off.
- Batching for efficiency: Configurable batch sizes balance throughput and latency.
- Duplicate handling: Unordered inserts with error code 11000 handling make retries safe.
The code is straightforward—no complex frameworks, no magic. Just TypeScript, SQLite, and MongoDB, composed with careful attention to failure modes.
In the next article, we will build a Next.js UI to browse and search these synced conversations. But that is a story for another day.
Want to try it yourself? The complete source code is available on GitHub. Clone it, configure your .env, and run npm run dev. Watch as your JSONL files flow seamlessly into MongoDB.
Have questions or improvements? Drop a comment below. I read every one.
Suggested Tags: TypeScript, MongoDB, Node.js, Data Engineering, Backend Development