eventkit is in active development. APIs may change.

Core Concepts

eventkit is built around a few core concepts that enable flexible, scalable event processing.

Design Philosophy

1. Flexible Ingestion, Strict Processing

Problem: Schema changes break ingestion in most systems.

Solution: Accept any JSON at the edge and validate downstream. Invalid events go to an error store instead of being dropped.

# RawEvent - Accept anything
raw = RawEvent(payload={"custom": "field", "nested": {"data": 123}})

# Adapter - Validate downstream
result = adapter.adapt(raw)
if result.ok:
    typed_event = result.event       # TypedEvent with validation
else:
    error_store.write(result.error)  # Dead letter queue

2. Stream-Based Isolation

Problem: Webhook spikes block user event processing.

Solution: Route events to named streams, each processed independently.

/collect/users    → users stream    → Real-time processing
/collect/webhooks → webhooks stream → Batch processing (resilient to spikes)
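
Isolation starts at ingestion: a RawEvent carries an optional stream name. A minimal sketch (the payload values here are hypothetical):

from datetime import datetime, timezone

# Route a webhook payload to the isolated "webhooks" stream
raw = RawEvent(
    payload={"type": "invoice.paid", "source": "stripe"},
    received_at=datetime.now(timezone.utc),
    stream="webhooks",
)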

3. Hash-Based Routing (Sequencer)

Problem: Race conditions when events for the same user are processed out of order.

Solution: Hash userId to a consistent partition, so the same user's events are always processed in order by the same worker.

# Events with same userId go to same partition
partition = sequencer.get_partition_id(event)
# userId="user_123" → partition 7
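
A minimal sketch of what hash-based partition selection can look like (a hypothetical helper, not necessarily eventkit's implementation; a stable digest matters because Python's built-in hash() is salted per process):

import hashlib

def get_partition_id(user_id: str, num_partitions: int = 16) -> int:
    # Stable digest so the same userId always maps to the same
    # partition, across processes and restarts
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions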

Benefits:

  • Sequential processing per user
  • Foundation for deduplication
  • Hot users spread across partitions instead of a single shared queue

The Event Pipeline

Phase 1: Ingestion

class RawEvent(BaseModel):
    payload: dict[str, Any]      # Original JSON
    received_at: datetime        # Ingestion timestamp
    stream: str | None = None    # Routing/isolation

Accept any JSON; never reject at the edge.

Phase 2: Validation & Adaptation

# ValidationPipeline: Composable validators
pipeline = ValidationPipeline([
    RequiredFieldsValidator(["type", "userId"]),
    TypeCheckValidator({"type": str, "userId": str}),
    TimestampValidator(),
])

# SchemaAdapter: Transform RawEvent → TypedEvent
result = adapter.adapt(raw_event)
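
A composable validator can be as small as a class that reports problems for a payload. This sketch assumes a validate-returns-errors convention, which may differ from eventkit's actual interface:

class RequiredFieldsValidator:
    """Reject payloads missing any of the given fields."""

    def __init__(self, fields: list[str]) -> None:
        self.fields = fields

    def validate(self, payload: dict) -> list[str]:
        # An empty list means the payload passed this validator
        return [f"missing required field: {name}"
                for name in self.fields if name not in payload]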

Phase 3: Sequencing

# Hash-based routing for consistency
partition_id = sequencer.get_partition_id(typed_event)
await event_loader.add(partition_id, typed_event)

Phase 4: Batching & Storage

# EventLoader: Batch events for efficiency
# - Size-based: Flush when buffer reaches N events
# - Time-based: Flush every N seconds
# - Shutdown: Flush all buffers on graceful shutdown
await event_loader.flush(partition_id)
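
The size-based path boils down to roughly this (a simplified, hypothetical sketch; the real EventLoader also handles the time-based and shutdown flushes listed above):

class PartitionBuffer:
    def __init__(self, store, max_size: int = 500) -> None:
        self.store = store
        self.max_size = max_size
        self.events: list[TypedEvent] = []

    async def add(self, event: TypedEvent) -> None:
        self.events.append(event)
        if len(self.events) >= self.max_size:  # size-based flush
            await self.flush()

    async def flush(self) -> None:
        if self.events:
            await self.store.store_batch(self.events)
            self.events = []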

Component Overview

  • Architecture - Complete system architecture and design decisions
  • RawEvent - Flexible container for any JSON payload
  • ValidationPipeline - Composable validators
  • SchemaAdapter - Transform and validate events
  • TypedEvent - Type-safe event models (Identify, Track, Page)
  • Sequencer - Hash-based partition routing
  • EventLoader - Per-partition buffering and batching
  • EventStore - Pluggable storage interface
  • ErrorStore - Dead letter queue for failures

Storage Options

GCS + BigQuery (Production)

Write events to Google Cloud Storage as Parquet files, then batch-load them into BigQuery.

Benefits:

  • Cost-efficient (~50% cheaper than BigQuery active storage)
  • Raw events available for reprocessing
  • Proven at petabyte scale
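
For context, the batch-load step maps onto a standard BigQuery load job from a GCS URI (an illustrative use of the google-cloud-bigquery client; bucket and table names are hypothetical):

from google.cloud import bigquery

client = bigquery.Client()
job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.PARQUET)

# Load every Parquet file under the prefix into one table
load_job = client.load_table_from_uri(
    "gs://my-bucket/events/users/*.parquet",
    "my-project.analytics.events",
    job_config=job_config,
)
load_job.result()  # block until the load completes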

Firestore (Development)

Managed NoSQL storage with emulator support.

Benefits:

  • Fast local development
  • No GCP account needed (emulator)
  • Good for moderate throughput
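
Pointing the standard google-cloud-firestore client at a local emulator only takes an environment variable (the project id is arbitrary when talking to the emulator):

import os
from google.cloud import firestore

# Target a local emulator instead of real GCP
os.environ["FIRESTORE_EMULATOR_HOST"] = "localhost:8080"
client = firestore.Client(project="demo-project")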

Custom Storage

Implement the EventStore protocol for any backend:

class MyCustomStore(EventStore):
    async def store(self, event: TypedEvent) -> None: ...

    async def store_batch(self, events: list[TypedEvent]) -> None: ...

    def health_check(self) -> bool: ...
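
For example, a throwaway in-memory store for tests could satisfy the protocol like this (illustrative):

class InMemoryStore(EventStore):
    def __init__(self) -> None:
        self.events: list[TypedEvent] = []

    async def store(self, event: TypedEvent) -> None:
        self.events.append(event)

    async def store_batch(self, events: list[TypedEvent]) -> None:
        self.events.extend(events)

    def health_check(self) -> bool:
        return True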

Queue Modes

AsyncQueue (Default)

In-process async workers with asyncio.Queue.

Use when:

  • Single instance deployment
  • Simple architecture
  • Good for most use cases
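
The underlying pattern is the standard asyncio worker loop (a simplified sketch; process() is a hypothetical handler):

import asyncio

queue: asyncio.Queue = asyncio.Queue()

async def worker() -> None:
    while True:
        event = await queue.get()
        try:
            await process(event)  # hypothetical handler
        finally:
            queue.task_done()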

PubSubQueue

Distributed workers with Google Pub/Sub.

Use when:

  • Multiple instances
  • Horizontal scaling needed
  • Cloud Run, GKE deployments
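
Publishing to the distributed queue goes through the standard Pub/Sub client (illustrative; project and topic names are hypothetical):

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "eventkit-events")

future = publisher.publish(topic_path, data=b'{"type": "track", "userId": "user_123"}')
print(future.result())  # message ID once the broker acknowledges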

Both modes use a ring buffer (Write-Ahead Log) for durability.
