# Local Development Guide

## Quick Start

### Prerequisites
- Docker & Docker Compose
- Python 3.12+
- uv (recommended) or pip
### 1. Start Emulators

```bash
docker compose up -d
```

This starts:

- Firestore emulator on `localhost:8080` (for Firestore storage mode)
- Pub/Sub emulator on `localhost:8085` (for distributed queue mode)
For GCS + BigQuery mode, you can run a GCS emulator:

```bash
docker run -d -p 9023:9023 --name gcs-emulator \
  fsouza/fake-gcs-server -scheme http

export STORAGE_EMULATOR_HOST=http://localhost:9023
```

See tests/integration/README.md for the full emulator setup.
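Before moving on, it can help to confirm the emulators are actually accepting connections. A minimal sketch using only the standard library (`wait_for_port` is an illustrative helper, not part of eventkit; ports match the compose setup above):

```python
import socket
import time

def wait_for_port(host: str, port: int, timeout: float = 10.0) -> bool:
    """Poll until a TCP port accepts connections, or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            time.sleep(0.2)
    return False

if __name__ == "__main__":
    for name, port in [("firestore", 8080), ("pubsub", 8085)]:
        status = "up" if wait_for_port("localhost", port, timeout=2.0) else "DOWN"
        print(f"{name} emulator: {status}")
```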
### 2. Install Dependencies

```bash
uv sync
```

### 3. Run the API Server

**Option A: GCS + BigQuery Mode** (production pattern with emulator)
```bash
export STORAGE_EMULATOR_HOST="http://localhost:9023"
export GCP_PROJECT_ID="test-project"
export GCP_GCS_BUCKET="test-events"
export GCP_BIGQUERY_DATASET="events"
export GCP_BIGQUERY_TABLE="raw_events"
export EVENTKIT_EVENT_STORE="gcs"
export EVENTKIT_WAREHOUSE_ENABLED="true"
export EVENTKIT_QUEUE_MODE="async"
export EVENTKIT_ASYNC_WORKERS="4"
export EVENTKIT_RING_BUFFER_DB_PATH="./eventkit_ring_buffer.db"
uv run uvicorn eventkit.api.app:app --reload --port 8000
```

**Option B: Firestore Mode** (fast local development)
```bash
export FIRESTORE_EMULATOR_HOST="localhost:8080"
export GCP_PROJECT_ID="test-project"
export EVENTKIT_EVENT_STORE="firestore"
export EVENTKIT_QUEUE_MODE="async"
export EVENTKIT_ASYNC_WORKERS="4"
export EVENTKIT_RING_BUFFER_DB_PATH="./eventkit_ring_buffer.db"
uv run uvicorn eventkit.api.app:app --reload --port 8000
```

**Option C: Pub/Sub Queue Mode** (distributed workers)
```bash
export FIRESTORE_EMULATOR_HOST="localhost:8080"
export PUBSUB_EMULATOR_HOST="localhost:8085"
export GCP_PROJECT_ID="test-project"
export EVENTKIT_EVENT_STORE="firestore"
export EVENTKIT_QUEUE_MODE="pubsub"
export EVENTKIT_PUBSUB_WORKERS="4"
export EVENTKIT_RING_BUFFER_DB_PATH="./eventkit_ring_buffer.db"
uv run uvicorn eventkit.api.app:app --reload --port 8000
```

Note: The ring buffer (Write-Ahead Log) is always enabled for durability. Events are persisted to SQLite before processing, ensuring no data loss even if the service crashes.
The API will be available at http://localhost:8000.
### 4. Test the API

Health Check:

```bash
curl http://localhost:8000/health
# {"status": "ok"}
```

Send an Event:

```bash
curl -X POST http://localhost:8000/collect \
  -H "Content-Type: application/json" \
  -d '{"type": "track", "event": "button_click", "userId": "user_123"}'
# {"status": "accepted"}
```
## Running Tests

```bash
# Start emulators
docker compose up -d

# Run all tests (unit + integration)
export FIRESTORE_EMULATOR_HOST="localhost:8080"
export PUBSUB_EMULATOR_HOST="localhost:8085"
export GCP_PROJECT_ID="test-project"
uv run pytest --cov=src/eventkit

# Run only unit tests (fast, no emulator needed)
uv run pytest tests/unit/

# Run only integration tests
uv run pytest tests/integration/

# Stop emulators
docker compose down
```

### Test Isolation
Docker emulators keep data in memory between test runs (while containers are running). This is useful for fast iteration, but means:
- First test run: Clean slate ✅
- Second test run (without restarting): Data from previous run persists ⚠️
To get a clean slate for integration tests:

```bash
# Option 1: Restart emulators (fast, keeps containers)
docker compose restart firestore-emulator pubsub-emulator

# Option 2: Full restart (slower, recreates containers)
docker compose down
docker compose up -d --wait
```

In CI: Each workflow run starts fresh containers, so tests are always isolated.
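A third option: the Firestore emulator exposes an emulator-only HTTP endpoint that deletes all documents, which avoids restarting the container entirely. A sketch (the endpoint path follows the Firebase emulator documentation; host and project should match your setup):

```python
import urllib.request

def emulator_reset_url(host: str = "localhost:8080", project: str = "test-project") -> str:
    """Emulator-only endpoint that deletes every document in the default database."""
    return f"http://{host}/emulator/v1/projects/{project}/databases/(default)/documents"

def clear_firestore_emulator(host: str = "localhost:8080", project: str = "test-project") -> int:
    """Issue the DELETE; requires the emulator to be running. Returns the HTTP status."""
    req = urllib.request.Request(emulator_reset_url(host, project), method="DELETE")
    with urllib.request.urlopen(req) as resp:
        return resp.status
```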
## Configuration

See src/eventkit/config.py for all available settings.

Key Settings:
| Variable | Default | Description |
|---|---|---|
| `GCP_PROJECT_ID` | required | GCP project ID |
| `FIRESTORE_EMULATOR_HOST` | - | Firestore emulator address (e.g., `localhost:8080`) |
| `PUBSUB_EMULATOR_HOST` | - | Pub/Sub emulator address (e.g., `localhost:8085`) |
| `STORAGE_EMULATOR_HOST` | - | GCS emulator address (e.g., `http://localhost:9023`) |
| **Storage Mode** | | |
| `EVENTKIT_EVENT_STORE` | `"gcs"` | Storage backend: `gcs`, `firestore` |
| `GCP_GCS_BUCKET` | required for GCS | GCS bucket name for event storage |
| `GCP_BIGQUERY_DATASET` | required for GCS | BigQuery dataset name |
| `GCP_BIGQUERY_TABLE` | required for GCS | BigQuery table name |
| `EVENTKIT_WAREHOUSE_ENABLED` | `true` | Enable background warehouse loader |
| `EVENTKIT_WAREHOUSE_LOADER_INTERVAL` | `300.0` | Seconds between loader polls (5 min) |
| **Queue Mode** | | |
| `EVENTKIT_QUEUE_MODE` | `"async"` | Queue mode: `async`, `pubsub` |
| `EVENTKIT_ASYNC_WORKERS` | `4` | Number of async workers (async mode) |
| `EVENTKIT_PUBSUB_WORKERS` | `4` | Number of Pub/Sub workers (pubsub mode) |
| **Ring Buffer (Write-Ahead Log)** | | |
| `EVENTKIT_RING_BUFFER_MODE` | `"sqlite"` | Ring buffer implementation (currently: `sqlite`) |
| `EVENTKIT_RING_BUFFER_DB_PATH` | `"eventkit_ring_buffer.db"` | Path to SQLite database file |
| `EVENTKIT_RING_BUFFER_MAX_SIZE` | `100000` | Max published events to keep (size-based cleanup) |
| `EVENTKIT_RING_BUFFER_RETENTION_HOURS` | `24` | Max age for published events (time-based cleanup) |
| `EVENTKIT_RING_BUFFER_PUBLISHER_BATCH_SIZE` | `100` | Events per publisher batch |
| `EVENTKIT_RING_BUFFER_PUBLISHER_POLL_INTERVAL` | `0.1` | Seconds between ring buffer polls |
| `EVENTKIT_RING_BUFFER_CLEANUP_INTERVAL` | `3600.0` | Seconds between cleanup runs (1 hour) |
| **EventLoader** | | |
| `EVENTKIT_EVENTLOADER_BATCH_SIZE` | adaptive | Events per batch (1000 for GCS, 100 for Firestore) |
| `EVENTKIT_EVENTLOADER_FLUSH_INTERVAL` | adaptive | Flush interval in seconds (60 for GCS, 5 for Firestore) |
| `EVENTKIT_BUFFER_SIZE` | `100` | Events per partition before flush (deprecated) |
| `EVENTKIT_BUFFER_MAX_SIZE` | `1000` | Hard limit per partition |
| `EVENTKIT_BUFFER_TIMEOUT` | `5.0` | Max seconds before flush (deprecated) |
## Ring Buffer (Durability Layer)
The ring buffer provides Write-Ahead Log (WAL) durability:
- **Events are never lost**: written to SQLite before processing
- **Survives crashes**: SQLite WAL mode ensures durability
- **Automatic cleanup**: old published events are removed based on time/size limits
- **Background publisher**: moves events from the ring buffer to the queue asynchronously
Architecture:

```
API → ring buffer (durable) → publisher → queue → workers → Firestore
```

Why SQLite?
- Local durability (no network calls on hot path)
- WAL mode for concurrent reads/writes
- Zero dependencies (built into Python)
- Production-proven (similar to Lytics’ BoltDB approach)
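The durability guarantee boils down to "commit the event to SQLite in WAL mode before doing anything else with it". A stripped-down sketch of that idea (illustrative only; eventkit's real ring buffer schema and queries will differ):

```python
import json
import sqlite3

def open_ring_buffer(path: str) -> sqlite3.Connection:
    """Open the ring buffer database with WAL journaling enabled."""
    conn = sqlite3.connect(path)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute(
        "CREATE TABLE IF NOT EXISTS events ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " payload TEXT NOT NULL,"
        " published INTEGER NOT NULL DEFAULT 0)"
    )
    return conn

def append_event(conn: sqlite3.Connection, event: dict) -> int:
    """Persist the event before any processing happens; returns its row id."""
    cur = conn.execute("INSERT INTO events (payload) VALUES (?)", (json.dumps(event),))
    conn.commit()
    return cur.lastrowid

def next_unpublished(conn: sqlite3.Connection, batch_size: int = 100) -> list:
    """What the background publisher would poll for on each pass."""
    rows = conn.execute(
        "SELECT id, payload FROM events WHERE published = 0 ORDER BY id LIMIT ?",
        (batch_size,),
    ).fetchall()
    return [(row_id, json.loads(payload)) for row_id, payload in rows]
```

After a batch is handed to the queue, the publisher would mark those rows `published = 1`; cleanup then prunes published rows by age or count, matching the `EVENTKIT_RING_BUFFER_*` settings above.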
## Logging

eventkit uses structured logging via structlog for production observability.

### Development (Pretty Logs)

By default, logs are colored and human-readable:
```bash
export EVENTKIT_LOG_LEVEL="DEBUG"  # or INFO (default), WARNING, ERROR
export EVENTKIT_JSON_LOGS="false"  # default
uv run uvicorn eventkit.api.app:app --reload --port 8000
```

Example output:
```
2026-01-12T10:30:45.123Z [info ] request_received method=POST path=/collect/users event_type=identify request_id=abc-123 stream=users
2026-01-12T10:30:45.125Z [debug ] event_received event_type=identify stream=users
2026-01-12T10:30:45.126Z [debug ] event_adapted event_type=identify
2026-01-12T10:30:45.127Z [debug ] event_sequenced partition=7
2026-01-12T10:30:45.128Z [info ] request_completed status_code=202 duration_ms=5.23 request_id=abc-123 stream=users
```

### Production (JSON Logs)
For log aggregation systems (e.g., GCP Logging, Datadog):
```bash
export EVENTKIT_LOG_LEVEL="INFO"
export EVENTKIT_JSON_LOGS="true"
uv run uvicorn eventkit.api.app:app --port 8000
```

Example output (single-line JSON per log):
{"event": "request_received", "method": "POST", "path": "/collect/users", "event_type": "identify", "request_id": "abc-123", "stream": "users", "level": "info", "timestamp": "2026-01-12T10:30:45.123Z", "logger": "eventkit.api.router"}
{"event": "request_completed", "status_code": 202, "duration_ms": 5.23, "request_id": "abc-123", "stream": "users", "level": "info", "timestamp": "2026-01-12T10:30:45.128Z", "logger": "eventkit.api.router"}Log Levels
| Level | When to Use | Example Operations |
|---|---|---|
| DEBUG | Development debugging | Event adaptation details, sequencer routing |
| INFO | Normal operations | API requests, buffer flushes, store writes |
| WARNING | Recoverable errors | Validation failures, retries |
| ERROR | Unrecoverable errors | Store failures, ring buffer failures |
### What’s Logged
Always Logged:
- API requests: method, path, status_code, duration_ms
- Events: stream, event_type (NOT full payload for PII safety)
- EventLoader flushes: partition, event_count, duration_ms
- Store writes: event_count, duration_ms
Never Logged:
- Full event payloads (PII risk)
- User identifiers in plaintext
- Auth tokens or secrets
### Context Propagation
All logs within a request include the same request_id and stream for tracing:
```bash
# Example: Trace a single event through the entire pipeline
cat logs.json | jq 'select(.request_id == "abc-123")'
```
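If jq isn't available, the same trace filter is a few lines of stdlib Python over the JSON-lines log format shown above (`filter_by_request_id` is an illustrative helper, not shipped with eventkit):

```python
import json
import sys

def filter_by_request_id(lines, request_id: str):
    """Yield parsed log records whose request_id matches, skipping non-JSON lines."""
    for line in lines:
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue
        if record.get("request_id") == request_id:
            yield record

if __name__ == "__main__":
    # Usage: python trace.py abc-123 < logs.json
    for record in filter_by_request_id(sys.stdin, sys.argv[1]):
        print(json.dumps(record))
```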
## Stopping Services

```bash
# Stop API server: Ctrl+C

# Stop emulators
docker compose down
```