Core Concepts
eventkit is built around a few core concepts that enable flexible, scalable event processing.
Design Philosophy
1. Flexible Ingestion, Strict Processing
Problem: Schema changes break ingestion in most systems.
Solution: Accept any JSON at the edge and validate downstream. Invalid events go to the error store instead of being dropped.
```python
# RawEvent - Accept anything
raw = RawEvent(payload={"custom": "field", "nested": {"data": 123}})

# Adapter - Validate downstream
result = adapter.adapt(raw)
if result.ok:
    typed_event = result.event       # TypedEvent with validation
else:
    error_store.write(result.error)  # Dead letter queue
```
2. Stream-Based Isolation
Problem: Webhook spikes block user event processing.
Solution: Route events to named streams with independent processing.
```
/collect/users    → users stream    → Real-time processing
/collect/webhooks → webhooks stream → Batch processing (resilient to spikes)
```
3. Hash-Based Routing (Sequencer)
Problem: Race conditions when processing out-of-order events for the same user.
Solution: Hash userId to a consistent partition. The same user is always processed by the same worker, in order.
```python
# Events with the same userId go to the same partition
partition = sequencer.get_partition_id(event)  # userId="user_123" → partition 7
```
Benefits:
- Sequential processing per user
- Foundation for deduplication
- Hot user distribution
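The routing rule above can be sketched with a stable hash. This is a minimal illustration, not eventkit's actual implementation: the MD5 digest and the `NUM_PARTITIONS` constant are assumptions. A cryptographic digest is used instead of Python's built-in `hash()`, which is salted per process and therefore not stable across workers.

```python
import hashlib

NUM_PARTITIONS = 16  # assumed partition count; eventkit's may differ

def get_partition_id(user_id: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a userId to a stable partition so the same user always
    lands on the same worker, preserving per-user ordering."""
    digest = hashlib.md5(user_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions

# The same userId always maps to the same partition:
assert get_partition_id("user_123") == get_partition_id("user_123")
```

Because the mapping depends only on the userId, any worker that owns a partition sees every event for its users, which is what makes per-user sequential processing and deduplication possible.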
The Event Pipeline
Phase 1: Ingestion
```python
class RawEvent(BaseModel):
    payload: dict[str, Any]    # Original JSON
    received_at: datetime      # Ingestion timestamp
    stream: str | None = None  # Routing/isolation
```
Accept any JSON; never reject at the edge.
Phase 2: Validation & Adaptation
```python
# ValidationPipeline: Composable validators
pipeline = ValidationPipeline([
    RequiredFieldsValidator(["type", "userId"]),
    TypeCheckValidator({"type": str, "userId": str}),
    TimestampValidator(),
])
```
```python
# SchemaAdapter: Transform RawEvent → TypedEvent
result = adapter.adapt(raw_event)
```
Phase 3: Sequencing
```python
# Hash-based routing for consistency
partition_id = sequencer.get_partition_id(typed_event)
await event_loader.add(partition_id, typed_event)
```
Phase 4: Batching & Storage
```python
# EventLoader: Batch events for efficiency
# - Size-based: Flush when the buffer reaches N events
# - Time-based: Flush every N seconds
# - Shutdown: Flush all buffers on graceful shutdown
await event_loader.flush(partition_id)
```
Component Overview
- Architecture - Complete system architecture and design decisions
- RawEvent - Flexible container for any JSON payload
- ValidationPipeline - Composable validators
- SchemaAdapter - Transform and validate events
- TypedEvent - Type-safe event models (Identify, Track, Page)
- Sequencer - Hash-based partition routing
- EventLoader - Per-partition buffering and batching
- EventStore - Pluggable storage interface
- ErrorStore - Dead letter queue for failures
Storage Options
GCS + BigQuery (Production)
Write events to Google Cloud Storage as Parquet files, then batch load to BigQuery.
Benefits:
- Cost-efficient (~50% cheaper than BigQuery active storage)
- Raw events available for reprocessing
- Proven at petabyte scale
Firestore (Development)
Managed NoSQL storage with emulator support.
Benefits:
- Fast local development
- No GCP account needed (emulator)
- Good for moderate throughput
Custom Storage
Implement the EventStore protocol for any backend:
```python
class MyCustomStore(EventStore):
    async def store(self, event: TypedEvent) -> None: ...
    async def store_batch(self, events: list[TypedEvent]) -> None: ...
    def health_check(self) -> bool: ...
```
Queue Modes
AsyncQueue (Default)
In-process async workers with asyncio.Queue.
Use when:
- Single instance deployment
- Simple architecture
- Good for most use cases
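The in-process mode can be pictured as workers draining an asyncio.Queue. This is a generic sketch of the pattern, not eventkit's internal code; the None sentinel used for shutdown is an assumption of this example.

```python
import asyncio

async def worker(queue: asyncio.Queue, processed: list) -> None:
    # Drain events until a None sentinel signals shutdown.
    while True:
        event = await queue.get()
        if event is None:
            queue.task_done()
            break
        processed.append(event)  # stand-in for real processing
        queue.task_done()

async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)  # bounded backpressure
    processed: list = []
    task = asyncio.create_task(worker(queue, processed))
    for i in range(3):
        await queue.put({"type": "track", "seq": i})
    await queue.put(None)   # request graceful shutdown
    await queue.join()      # wait until every item is handled
    await task
    return processed

events = asyncio.run(main())
```

The bounded `maxsize` gives natural backpressure: producers await `put()` when workers fall behind, instead of growing memory without limit.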
PubSubQueue
Distributed workers with Google Pub/Sub.
Use when:
- Multiple instances
- Horizontal scaling needed
- Cloud Run, GKE deployments
Both modes use a ring buffer (Write-Ahead Log) for durability.
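Write-ahead logging here means recording each event durably before handing it to a worker, so a restart can replay anything not yet processed. A minimal file-backed sketch (JSON lines in a plain file; eventkit's actual ring buffer is more involved, with bounded size and acknowledgements):

```python
import json
import tempfile
from pathlib import Path
from typing import Any

class WriteAheadLog:
    """Simplified WAL: append events to a JSONL file before any
    in-memory handoff; replay the file after a crash or restart.
    An illustration only, not eventkit's ring buffer."""

    def __init__(self, path: Path) -> None:
        self.path = path

    def append(self, event: dict[str, Any]) -> None:
        # Durably record the event before enqueueing it.
        with self.path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(event) + "\n")
            f.flush()

    def replay(self) -> list[dict[str, Any]]:
        # Recover previously logged events on startup.
        if not self.path.exists():
            return []
        with self.path.open(encoding="utf-8") as f:
            return [json.loads(line) for line in f if line.strip()]

wal = WriteAheadLog(Path(tempfile.mkdtemp()) / "events.jsonl")
wal.append({"type": "track", "userId": "user_123"})
recovered = wal.replay()
```

The key property is the ordering: the append happens before the enqueue, so an event can be lost from memory but never from the log.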
Next Steps
- Architecture - Deep dive into system design
- Local Development - Set up your environment
- API Reference - Explore the collection API