eventkit is in active development. APIs may change.

Local Development Guide

Quick Start

Prerequisites

  • Docker & Docker Compose
  • Python 3.12+
  • uv (recommended) or pip

1. Start Emulators

docker compose up -d

This starts:

  • Firestore emulator on localhost:8080 (for Firestore storage mode)
  • Pub/Sub emulator on localhost:8085 (for distributed queue mode)

For GCS + BigQuery mode, you can run a GCS emulator:

docker run -d -p 9023:9023 --name gcs-emulator \
  fsouza/fake-gcs-server -scheme http

export STORAGE_EMULATOR_HOST=http://localhost:9023

See tests/integration/README.md for full emulator setup.
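Emulators can take a few seconds to accept connections after docker compose up -d. A minimal readiness poll in Python (a sketch; host and port values match the emulator defaults above):

```python
import socket
import time

def wait_for_emulator(host: str = "localhost", port: int = 8080,
                      timeout: float = 30.0) -> bool:
    """Poll until the emulator accepts TCP connections, or raise TimeoutError."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means the emulator is listening.
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            time.sleep(0.5)  # not up yet; back off briefly and retry
    raise TimeoutError(f"emulator not reachable on {host}:{port}")
```

Useful at the top of an integration-test session or a dev script, before any client code touches the emulators.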

2. Install Dependencies

uv sync

3. Run the API Server

Option A: GCS + BigQuery Mode (production pattern with emulator)

export STORAGE_EMULATOR_HOST="http://localhost:9023"
export GCP_PROJECT_ID="test-project"
export GCP_GCS_BUCKET="test-events"
export GCP_BIGQUERY_DATASET="events"
export GCP_BIGQUERY_TABLE="raw_events"
export EVENTKIT_EVENT_STORE="gcs"
export EVENTKIT_WAREHOUSE_ENABLED="true"
export EVENTKIT_QUEUE_MODE="async"
export EVENTKIT_ASYNC_WORKERS="4"
export EVENTKIT_RING_BUFFER_DB_PATH="./eventkit_ring_buffer.db"

uv run uvicorn eventkit.api.app:app --reload --port 8000

Option B: Firestore Mode (fast local development)

export FIRESTORE_EMULATOR_HOST="localhost:8080"
export GCP_PROJECT_ID="test-project"
export EVENTKIT_EVENT_STORE="firestore"
export EVENTKIT_QUEUE_MODE="async"
export EVENTKIT_ASYNC_WORKERS="4"
export EVENTKIT_RING_BUFFER_DB_PATH="./eventkit_ring_buffer.db"

uv run uvicorn eventkit.api.app:app --reload --port 8000

Option C: Pub/Sub Queue Mode (distributed workers)

export FIRESTORE_EMULATOR_HOST="localhost:8080"
export PUBSUB_EMULATOR_HOST="localhost:8085"
export GCP_PROJECT_ID="test-project"
export EVENTKIT_EVENT_STORE="firestore"
export EVENTKIT_QUEUE_MODE="pubsub"
export EVENTKIT_PUBSUB_WORKERS="4"
export EVENTKIT_RING_BUFFER_DB_PATH="./eventkit_ring_buffer.db"

uv run uvicorn eventkit.api.app:app --reload --port 8000

Note: The ring buffer (Write-Ahead Log) is always enabled for durability. Events are persisted to SQLite before processing, ensuring no data loss even if the service crashes.

The API will be available at http://localhost:8000.

4. Test the API

Health Check:

curl http://localhost:8000/health
# {"status": "ok"}

Send an Event:

curl -X POST http://localhost:8000/collect \
  -H "Content-Type: application/json" \
  -d '{"type": "track", "event": "button_click", "userId": "user_123"}'
# {"status": "accepted"}
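The same request from Python, as a standard-library sketch (the /collect endpoint and payload shape are taken from the curl example; requires the server from step 3 to be running when send_event is called):

```python
import json
import urllib.request

COLLECT_URL = "http://localhost:8000/collect"

def build_event(event_type: str, event: str, user_id: str) -> bytes:
    """Serialize a minimal track-style event in the shape /collect expects."""
    return json.dumps(
        {"type": event_type, "event": event, "userId": user_id}
    ).encode()

def send_event(body: bytes, url: str = COLLECT_URL) -> dict:
    """POST the event and decode the JSON response, e.g. {"status": "accepted"}."""
    req = urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)

# Usage (with the API server running):
#   send_event(build_event("track", "button_click", "user_123"))
```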

Running Tests

# Start emulators
docker compose up -d

# Run all tests (unit + integration)
export FIRESTORE_EMULATOR_HOST="localhost:8080"
export PUBSUB_EMULATOR_HOST="localhost:8085"
export GCP_PROJECT_ID="test-project"
uv run pytest --cov=src/eventkit

# Run only unit tests (fast, no emulator needed)
uv run pytest tests/unit/

# Run only integration tests
uv run pytest tests/integration/

# Stop emulators
docker compose down

Test Isolation

Docker emulators keep data in memory between test runs (while containers are running). This is useful for fast iteration, but means:

  • First test run: Clean slate ✅
  • Second test run (without restarting): Data from previous run persists ⚠️

To get a clean slate for integration tests:

# Option 1: Restart emulators (fast, keeps containers)
docker compose restart firestore-emulator pubsub-emulator

# Option 2: Full restart (slower, recreates containers)
docker compose down
docker compose up -d --wait
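For Firestore specifically, a third option is the emulator's clear-data REST endpoint, which deletes all documents without restarting anything. A sketch (assumes the emulator defaults and the test-project project ID used above):

```python
import urllib.request

def emulator_clear_url(host: str = "localhost:8080",
                       project: str = "test-project") -> str:
    """URL of the Firestore emulator's clear-data endpoint."""
    return (f"http://{host}/emulator/v1/projects/{project}"
            "/databases/(default)/documents")

def clear_firestore_emulator(host: str = "localhost:8080",
                             project: str = "test-project") -> None:
    """Issue the DELETE; raises if the emulator is not reachable."""
    req = urllib.request.Request(emulator_clear_url(host, project), method="DELETE")
    urllib.request.urlopen(req)
```

This fits naturally in a pytest fixture that runs before each integration-test session.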

In CI: Each workflow run starts fresh containers, so tests are always isolated.

Configuration

See src/eventkit/config.py for all available settings.

Key Settings:

| Variable | Default | Description |
| --- | --- | --- |
| GCP_PROJECT_ID | required | GCP project ID |
| FIRESTORE_EMULATOR_HOST | - | Firestore emulator address (e.g., localhost:8080) |
| PUBSUB_EMULATOR_HOST | - | Pub/Sub emulator address (e.g., localhost:8085) |
| STORAGE_EMULATOR_HOST | - | GCS emulator address (e.g., http://localhost:9023) |

Storage Mode

| Variable | Default | Description |
| --- | --- | --- |
| EVENTKIT_EVENT_STORE | "gcs" | Storage backend: gcs, firestore |
| GCP_GCS_BUCKET | required for GCS | GCS bucket name for event storage |
| GCP_BIGQUERY_DATASET | required for GCS | BigQuery dataset name |
| GCP_BIGQUERY_TABLE | required for GCS | BigQuery table name |
| EVENTKIT_WAREHOUSE_ENABLED | true | Enable background warehouse loader |
| EVENTKIT_WAREHOUSE_LOADER_INTERVAL | 300.0 | Seconds between loader polls (5 min) |

Queue Mode

| Variable | Default | Description |
| --- | --- | --- |
| EVENTKIT_QUEUE_MODE | "async" | Queue mode: async, pubsub |
| EVENTKIT_ASYNC_WORKERS | 4 | Number of async workers (async mode) |
| EVENTKIT_PUBSUB_WORKERS | 4 | Number of Pub/Sub workers (pubsub mode) |

Ring Buffer (Write-Ahead Log)

| Variable | Default | Description |
| --- | --- | --- |
| EVENTKIT_RING_BUFFER_MODE | "sqlite" | Ring buffer implementation (currently: sqlite) |
| EVENTKIT_RING_BUFFER_DB_PATH | "eventkit_ring_buffer.db" | Path to SQLite database file |
| EVENTKIT_RING_BUFFER_MAX_SIZE | 100000 | Max published events to keep (size-based cleanup) |
| EVENTKIT_RING_BUFFER_RETENTION_HOURS | 24 | Max age for published events (time-based cleanup) |
| EVENTKIT_RING_BUFFER_PUBLISHER_BATCH_SIZE | 100 | Events per publisher batch |
| EVENTKIT_RING_BUFFER_PUBLISHER_POLL_INTERVAL | 0.1 | Seconds between ring buffer polls |
| EVENTKIT_RING_BUFFER_CLEANUP_INTERVAL | 3600.0 | Seconds between cleanup runs (1 hour) |

EventLoader

| Variable | Default | Description |
| --- | --- | --- |
| EVENTKIT_EVENTLOADER_BATCH_SIZE | adaptive | Events per batch (1000 for GCS, 100 for Firestore) |
| EVENTKIT_EVENTLOADER_FLUSH_INTERVAL | adaptive | Flush interval seconds (60 for GCS, 5 for Firestore) |
| EVENTKIT_BUFFER_SIZE | 100 | Events per partition before flush (deprecated) |
| EVENTKIT_BUFFER_MAX_SIZE | 1000 | Hard limit per partition |
| EVENTKIT_BUFFER_TIMEOUT | 5.0 | Max seconds before flush (deprecated) |
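As an illustration of how the queue-mode variables resolve, a minimal sketch using the names and defaults from the table (eventkit's actual loader lives in src/eventkit/config.py and may differ):

```python
import os
from dataclasses import dataclass

@dataclass(frozen=True)
class QueueSettings:
    mode: str
    async_workers: int
    pubsub_workers: int

def load_queue_settings(env=None) -> QueueSettings:
    """Read queue-related variables, falling back to the documented defaults."""
    env = os.environ if env is None else env
    return QueueSettings(
        mode=env.get("EVENTKIT_QUEUE_MODE", "async"),
        async_workers=int(env.get("EVENTKIT_ASYNC_WORKERS", "4")),
        pubsub_workers=int(env.get("EVENTKIT_PUBSUB_WORKERS", "4")),
    )
```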

Ring Buffer (Durability Layer)

The ring buffer provides Write-Ahead Log (WAL) durability:

  • Events are never lost - Written to SQLite before processing
  • Survives crashes - SQLite WAL mode ensures durability
  • Automatic cleanup - Old published events are removed based on time/size limits
  • Background publisher - Moves events from ring buffer to queue asynchronously

Architecture:

API → ring buffer (durable) → publisher → queue → workers → Firestore

Why SQLite?

  • Local durability (no network calls on hot path)
  • WAL mode for concurrent reads/writes
  • Zero dependencies (built into Python)
  • Production-proven (similar to Lytics’ BoltDB approach)
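A minimal sketch of the mechanics described above, using SQLite in WAL mode. The schema and function names are illustrative only, not eventkit's actual implementation:

```python
import json
import sqlite3
import time

def open_ring_buffer(path: str = ":memory:") -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute("PRAGMA journal_mode=WAL")  # durable, concurrent reads/writes
    conn.execute("""CREATE TABLE IF NOT EXISTS events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        payload TEXT NOT NULL,
        published INTEGER NOT NULL DEFAULT 0,
        created_at REAL NOT NULL)""")
    return conn

def append(conn: sqlite3.Connection, event: dict) -> int:
    """Write-ahead: the event is committed before any downstream processing."""
    cur = conn.execute("INSERT INTO events (payload, created_at) VALUES (?, ?)",
                       (json.dumps(event), time.time()))
    conn.commit()
    return cur.lastrowid

def next_batch(conn: sqlite3.Connection, size: int = 100):
    """What the background publisher would pull on each poll."""
    rows = conn.execute("SELECT id, payload FROM events WHERE published = 0 "
                        "ORDER BY id LIMIT ?", (size,)).fetchall()
    return [(rid, json.loads(payload)) for rid, payload in rows]

def mark_published(conn: sqlite3.Connection, ids) -> None:
    conn.executemany("UPDATE events SET published = 1 WHERE id = ?",
                     [(i,) for i in ids])
    conn.commit()

def cleanup(conn: sqlite3.Connection, max_size: int = 100_000,
            retention_hours: float = 24) -> None:
    """Time- and size-based removal of already-published events."""
    cutoff = time.time() - retention_hours * 3600
    conn.execute("DELETE FROM events WHERE published = 1 AND created_at < ?",
                 (cutoff,))
    conn.execute("""DELETE FROM events WHERE published = 1 AND id NOT IN (
        SELECT id FROM events WHERE published = 1
        ORDER BY id DESC LIMIT ?)""", (max_size,))
    conn.commit()
```

Unpublished events are never touched by cleanup, which is what makes a crash between append and publish recoverable.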

Logging

eventkit uses structured logging via structlog for production observability.

Development (Pretty Logs)

By default, logs are colored and human-readable:

export EVENTKIT_LOG_LEVEL="DEBUG"   # or INFO (default), WARNING, ERROR
export EVENTKIT_JSON_LOGS="false"   # default
uv run uvicorn eventkit.api.app:app --reload --port 8000

Example output:

2026-01-12T10:30:45.123Z [info ] request_received method=POST path=/collect/users event_type=identify request_id=abc-123 stream=users
2026-01-12T10:30:45.125Z [debug ] event_received event_type=identify stream=users
2026-01-12T10:30:45.126Z [debug ] event_adapted event_type=identify
2026-01-12T10:30:45.127Z [debug ] event_sequenced partition=7
2026-01-12T10:30:45.128Z [info ] request_completed status_code=202 duration_ms=5.23 request_id=abc-123 stream=users

Production (JSON Logs)

For log aggregation systems (e.g., GCP Logging, Datadog):

export EVENTKIT_LOG_LEVEL="INFO"
export EVENTKIT_JSON_LOGS="true"
uv run uvicorn eventkit.api.app:app --port 8000

Example output (single-line JSON per log):

{"event": "request_received", "method": "POST", "path": "/collect/users", "event_type": "identify", "request_id": "abc-123", "stream": "users", "level": "info", "timestamp": "2026-01-12T10:30:45.123Z", "logger": "eventkit.api.router"}
{"event": "request_completed", "status_code": 202, "duration_ms": 5.23, "request_id": "abc-123", "stream": "users", "level": "info", "timestamp": "2026-01-12T10:30:45.128Z", "logger": "eventkit.api.router"}

Log Levels

| Level | When to Use | Example Operations |
| --- | --- | --- |
| DEBUG | Development debugging | Event adaptation details, sequencer routing |
| INFO | Normal operations | API requests, buffer flushes, store writes |
| WARNING | Recoverable errors | Validation failures, retries |
| ERROR | Unrecoverable errors | Store failures, ring buffer failures |

What’s Logged

Always Logged:

  • API requests: method, path, status_code, duration_ms
  • Events: stream, event_type (NOT full payload for PII safety)
  • EventLoader flushes: partition, event_count, duration_ms
  • Store writes: event_count, duration_ms

Never Logged:

  • Full event payloads (PII risk)
  • User identifiers in plaintext
  • Auth tokens or secrets

Context Propagation

All logs within a request include the same request_id and stream for tracing:

# Example: Trace a single event through the entire pipeline
cat logs.json | jq 'select(.request_id == "abc-123")'
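The same filter in plain Python, for environments without jq (a sketch; assumes newline-delimited JSON logs as in the production example):

```python
import json

def trace_request(lines, request_id: str) -> list[dict]:
    """Return the parsed log records belonging to a single request."""
    records = []
    for line in lines:
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip non-JSON lines (e.g., startup banners)
        if record.get("request_id") == request_id:
            records.append(record)
    return records

# Usage:
#   with open("logs.json") as f:
#       for rec in trace_request(f, "abc-123"):
#           print(rec["event"])
```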

Stopping Services

# Stop API server: Ctrl+C

# Stop emulators
docker compose down