Architecture
eventkit is an event ingestion and processing library built around the principle of flexible ingestion, strict processing. This document describes the complete architecture, component responsibilities, and design decisions.
Overview
Event collection pipelines require a delicate balance:
- Accept everything at the edge (never lose data due to schema changes)
- Validate downstream (ensure type safety for processing)
- Route consistently (same user → same partition → ordered processing)
- Batch efficiently (minimize storage writes, maximize throughput)
- Handle errors gracefully (dead letter queue, retries, observability)
eventkit provides these primitives as a composable, type-safe library.
High-Level Architecture
Key Points:
- Phase 1: Accept any JSON (less than 100ms p95)
- Phase 2: Validate downstream, errors go to dead letter queue
- Phase 3: Hash-based routing ensures consistent partitioning
- Phase 4: Adaptive batching and pluggable storage
- Phase 5: Out of scope for core eventkit
Component Breakdown
Phase 1: Ingestion Edge
RawEvent
File: src/eventkit/schema/raw.py
Flexible container for any JSON payload. Accepts extra fields (ConfigDict(extra="allow")) to ensure no events are rejected at the edge.
```python
class RawEvent(BaseModel):
    payload: dict[str, Any]           # Original JSON
    received_at: datetime             # Ingestion timestamp
    stream: str | None = None         # Routing/isolation
```

Design Decision: Never reject events at ingestion. Schema mismatches are validation concerns, not ingestion failures.
Phase 2: Validation & Adaptation
ValidationPipeline
File: src/eventkit/adapters/validators/pipeline.py
Composable chain of validators. Each validator checks one aspect of the payload.
```python
class ValidationPipeline:
    def __init__(self, validators: list[Validator]):
        self.validators = validators

    def validate(self, payload: dict[str, Any]) -> tuple[bool, str | None]:
        """Run all validators, stop on first failure."""
        for validator in self.validators:
            is_valid, error = validator.validate(payload)
            if not is_valid:
                return False, error
        return True, None
```

Built-in Validators:
- RequiredFieldsValidator - Check for presence of required fields
- TypeCheckValidator - Verify field types
- TimestampValidator - Parse timestamps (ISO 8601, Unix seconds/ms)
Design Decision: Use composition over inheritance for validators. This allows both fixed schemas (Segment) and dynamic schemas (future custom schema engine) to reuse the same validation logic.
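A validator only needs to expose the same validate(payload) contract the pipeline calls. A minimal sketch of a required-fields check (the constructor and error message here are assumptions, not eventkit’s exact implementation):

```python
from typing import Any

# Minimal validator sketch; composes with ValidationPipeline as defined above.
class RequiredFieldsValidator:
    def __init__(self, required_fields: list[str]):
        self.required_fields = required_fields

    def validate(self, payload: dict[str, Any]) -> tuple[bool, str | None]:
        missing = [f for f in self.required_fields if f not in payload]
        if missing:
            return False, f"Missing required fields: {', '.join(missing)}"
        return True, None

# Compose into a pipeline alongside other validators
pipeline = ValidationPipeline([RequiredFieldsValidator(["type", "timestamp"])])
```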
SchemaAdapter
File: src/eventkit/adapters/segment.py
Transforms RawEvent → TypedEvent by applying validation and type routing.
Current Implementation: SegmentSchemaAdapter
- Validates using ValidationPipeline.for_segment_spec()
- Routes by event type (identify, track, page)
- Extracts identity fields (userId, anonymousId)
- Normalizes timestamps
- Supports PostHog-style property updates ($set, $set_once)
```python
class SegmentSchemaAdapter:
    def adapt(self, raw: RawEvent) -> AdapterResult:
        # 1. Validate
        is_valid, error = self.pipeline.validate(raw.payload)
        if not is_valid:
            return AdapterResult.failure(error)

        # 2. Route by type
        event_type = raw.payload.get("type")
        builder = self.type_mapping.get(event_type)
        if builder is None:
            return AdapterResult.failure(f"Unknown event type: {event_type}")

        # 3. Build typed event
        return builder(raw)
```

Future: CustomSchemaAdapter will build ValidationPipeline dynamically from per-account schemas (see eventkit-schema design notes).
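A rough usage sketch (import paths are inferred from the file layout above, and the AdapterResult fields shown here are assumptions):

```python
from datetime import datetime, timezone

from eventkit.adapters.segment import SegmentSchemaAdapter  # path assumed
from eventkit.schema.raw import RawEvent                    # path assumed

adapter = SegmentSchemaAdapter()
raw = RawEvent(
    payload={"type": "track", "userId": "u_123", "event": "Button Clicked"},
    received_at=datetime.now(timezone.utc),
    stream="users",
)
result = adapter.adapt(raw)
if result.is_valid:          # field name is an assumption
    print(result.event)      # a TrackEvent
else:
    print(result.error)      # routed to the dead letter queue in the pipeline
```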
TypedEvent Models
File: src/eventkit/schema/events.py
Type-safe event models for downstream processing.
```python
class TypedEvent(BaseModel):
    event_id: str
    timestamp: datetime
    user_id: str | None
    anonymous_id: str | None
    event_type: str
    properties: dict[str, Any]
    stream: str | None

class IdentifyEvent(TypedEvent):
    event_type: Literal["identify"] = "identify"
    traits: dict[str, Any]
    set: dict | None = Field(None, alias="$set")
    set_once: dict | None = Field(None, alias="$set_once")

class TrackEvent(TypedEvent):
    event_type: Literal["track"] = "track"
    event: str  # Event name (e.g., "Button Clicked")

class PageEvent(TypedEvent):
    event_type: Literal["page"] = "page"
    name: str  # Page name
```

Design Decision: Use Pydantic v2 for validation, serialization, and type safety. Frozen models (frozen=True) prevent accidental mutation.
Phase 3: Routing & Sequencing
Sequencer
File: src/eventkit/processing/sequencer.py
Hash-based routing for consistent partition assignment.
Key Responsibilities:
- Consistent routing - Same userId → same partition
- Load distribution - Events spread across N partitions
- Ordering foundation - Same partition = sequential processing
- Deduplication foundation - Same partition sees all events for a user
Algorithm:
```python
class Sequencer:
    def get_partition_id(self, event: TypedEvent) -> int:
        # Priority: userId → anonymousId → event hash
        routing_key = self._get_routing_key(event)
        hash_value = self._fnv1a_hash(routing_key)
        return hash_value % self.num_partitions
```

Design Decision: Use FNV-1a hash (fast, non-cryptographic, good distribution). Default to 16 partitions (balance between distribution and resource usage).
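For reference, FNV-1a over a UTF-8 routing key looks roughly like this (a standalone sketch using the standard 32-bit FNV constants; the library’s _fnv1a_hash may differ in width or details):

```python
FNV_OFFSET_BASIS_32 = 2166136261
FNV_PRIME_32 = 16777619

def fnv1a_32(key: str) -> int:
    hash_value = FNV_OFFSET_BASIS_32
    for byte in key.encode("utf-8"):
        hash_value ^= byte                                      # XOR the byte first (FNV-1a order)
        hash_value = (hash_value * FNV_PRIME_32) & 0xFFFFFFFF   # multiply, keep 32 bits
    return hash_value

# Example: route a userId to one of 16 partitions
partition_id = fnv1a_32("user-123") % 16
```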
Why This Matters:
- Hot user handling - High-volume users don’t block others (distributed across workers)
- Race condition prevention - Sequential processing per user prevents conflicting updates
- Deduplication - Same partition can track recently seen event IDs
EventLoader
File: src/eventkit/processing/event_loader.py
Per-partition buffering with size and time-based flushing.
Key Responsibilities:
- Batch efficiency - Reduce storage write overhead
- Backpressure handling - Absorb traffic spikes
- Resilience - In-memory buffering with graceful shutdown
Flushing Strategy:
- Size-based - Flush when buffer reaches 100 events (configurable)
- Time-based - Flush every 5 seconds (configurable)
- Shutdown - Flush all buffers on graceful shutdown
```python
class EventLoader:
    def __init__(self, event_store: EventStore, batch_size: int = 100, flush_interval: float = 5.0):
        self.event_store = event_store
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.buffers: dict[int, list[TypedEvent]] = defaultdict(list)

    async def add(self, partition_id: int, event: TypedEvent) -> None:
        self.buffers[partition_id].append(event)
        if len(self.buffers[partition_id]) >= self.batch_size:
            await self._flush(partition_id)
```

Design Decision: Per-partition buffers (not global). This aligns with the sequencer’s consistent routing and allows parallel flushing.
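The time-based flush can be driven by a background task along these lines (a sketch, not the library’s exact implementation; the _shutdown flag is an assumption):

```python
import asyncio

class EventLoader:  # continued sketch of the class above
    async def _flush_loop(self) -> None:
        """Background task: flush non-empty buffers every flush_interval seconds."""
        while not self._shutdown:                 # assumed flag, set by a stop()/shutdown hook
            await asyncio.sleep(self.flush_interval)
            for partition_id in list(self.buffers):
                if self.buffers[partition_id]:
                    await self._flush(partition_id)

    async def _flush(self, partition_id: int) -> None:
        """Swap out the partition's buffer and write it as one batch."""
        batch, self.buffers[partition_id] = self.buffers[partition_id], []
        await self.event_store.store_batch(batch)
```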
Phase 4: Storage
eventkit supports pluggable storage backends via the EventStore protocol. The default is GCS + BigQuery for production deployments.
GCSEventStore (Default)
File: src/eventkit/stores/gcs.py
Writes events to Google Cloud Storage as Parquet files, then loads to BigQuery via a background loader.
Key Design Decisions:
- Hive-style partitioning

  ```
  gs://my-events/
    date=2026-01-13/
      {uuid1}.parquet
      {uuid2}.parquet
    date=2026-01-14/
      {uuid3}.parquet
  ```

  Why: Efficient BigQuery loading, cost-effective lifecycle management.

- Wide schema (sparse columns)
  - Single Parquet file with all event type fields
  - Nullable columns for type-specific fields (e.g., event_name only for track events)
  - Parquet handles sparse data efficiently
  - Simpler queries than separate tables per event type

- Pandas → Parquet → GCS
  - Convert events to DataFrame for columnar representation
  - Serialize to Parquet with PyArrow
  - Upload to GCS with retry logic (sketched at the end of this section)

- Retry logic with tenacity
  - Exponential backoff for transient GCS failures
  - Max 3 retries per operation
Code Pattern:
```python
class GCSEventStore:
    async def store_batch(self, events: list[TypedEvent]) -> None:
        # Group by date for partitioning
        by_date = defaultdict(list)
        for event in events:
            date = event.timestamp.date()
            by_date[date].append(event)

        # Write one file per date
        for date, day_events in by_date.items():
            df = self._events_to_dataframe(day_events)
            path = self._generate_path(date)  # date=YYYY-MM-DD/{uuid}.parquet
            await self._write_parquet(df, path)
```

Why GCS + BigQuery?
- Cost: GCS Standard ($0.010/GB/month)
- Flexibility: Raw events for reprocessing, custom pipelines
- Scalability: Proven at petabyte scale (PostHog, RudderStack, Snowplow)
- Queryability: BigQuery’s SQL engine for analytics
- Pluggable: Easy to add Snowflake, Redshift, etc. via the WarehouseLoader protocol
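The "Pandas → Parquet → GCS" step with tenacity retries could look roughly like the following (a synchronous sketch; the real store is async and likely runs the blocking upload in a thread, and the helper name and retry policy here are illustrative):

```python
import io
import uuid
from datetime import date

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from google.cloud import storage
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, max=10))
def upload_parquet(bucket_name: str, path: str, df: pd.DataFrame) -> None:
    """Serialize a DataFrame to Parquet in memory and upload it to GCS."""
    table = pa.Table.from_pandas(df)      # columnar representation
    buffer = io.BytesIO()
    pq.write_table(table, buffer)         # Parquet bytes via PyArrow
    buffer.seek(0)
    blob = storage.Client().bucket(bucket_name).blob(path)
    blob.upload_from_file(buffer, content_type="application/octet-stream")

# Example Hive-style object path: date=2026-01-13/<uuid>.parquet
path = f"date={date(2026, 1, 13).isoformat()}/{uuid.uuid4()}.parquet"
```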
BigQueryLoader
File: src/eventkit/loaders/bigquery_loader.py
Background service that polls GCS for new Parquet files and loads them to BigQuery.
Key Responsibilities:
- Poll GCS - List new .parquet files every 5 minutes (configurable)
- Filter loaded - Query the _loaded_files metadata table to skip duplicates
- Batch load - Create BigQuery load jobs from GCS URIs
- Track metadata - Record loaded files for idempotency
Lifecycle:
```python
loader = BigQueryLoader(bucket, dataset, table, project_id, poll_interval=300.0)
await loader.start()   # Runs in background
# ... application runs ...
await loader.stop()    # Graceful shutdown
```

Why separate service?
- Independent scaling: API and loader scale independently
- Latency tolerance: Batch loading accepts 5-10 minute delay
- Resource isolation: Loading doesn’t impact API performance
- Deployment flexibility: Run as Cloud Run scheduled job, Kubernetes CronJob, or embedded
Idempotency:
- Metadata table tracks loaded files: _loaded_files(file_path, loaded_at, row_count)
- Prevents duplicate loads if loader restarts
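The load step itself maps onto BigQuery’s standard load-from-GCS job API. A sketch of one polling cycle’s load (project, dataset, and table names are placeholders, and the file discovery and metadata filtering are elided):

```python
from google.cloud import bigquery

client = bigquery.Client(project="my-project")

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)

# New Parquet files found in GCS that are not yet recorded in _loaded_files
uris = ["gs://my-events/date=2026-01-13/abc.parquet"]

load_job = client.load_table_from_uri(
    uris, "my-project.events.raw_events", job_config=job_config
)
load_job.result()  # block until the batch load completes
```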
WarehouseLoader Protocol
File: src/eventkit/loaders/warehouse_loader.py
Pluggable protocol for loading events to different data warehouses.
```python
class WarehouseLoader(Protocol):
    async def start(self) -> None:
        """Start background loading process."""

    async def stop(self) -> None:
        """Stop background loading process."""

    async def load_files(self, file_paths: list[str]) -> None:
        """Load specific files (for manual triggering)."""
```

Implementations:
- BigQueryLoader - Default GCS → BigQuery
- Future: SnowflakeLoader, RedshiftLoader, ClickHouseLoader
Why protocol-based?
- Same interface for all warehouses
- Users bring their own warehouse
- Easy to test (mock loaders)
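Because WarehouseLoader is a Protocol, any class with matching methods satisfies it structurally; a test double might look like this (illustrative, not part of eventkit):

```python
class RecordingLoader:
    """Minimal in-memory stand-in that satisfies the WarehouseLoader protocol."""

    def __init__(self) -> None:
        self.loaded: list[str] = []
        self.running = False

    async def start(self) -> None:
        self.running = True

    async def stop(self) -> None:
        self.running = False

    async def load_files(self, file_paths: list[str]) -> None:
        self.loaded.extend(file_paths)
```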
FirestoreEventStore (Development/Testing)
File: src/eventkit/stores/firestore.py
Managed NoSQL storage for development and testing environments.
Why Firestore for dev?
- Emulator support (no GCP account needed)
- Fast local development
- Good for moderate throughput
- Free tier
Not recommended for production analytics due to:
- Higher costs at scale
- Limited query capabilities (no SQL)
- Not designed for analytical workloads
ErrorStore
File: src/eventkit/stores/firestore.py
Dead letter queue for validation failures and processing errors.
Schema:
```
errors/
  {error_id}:
    payload: {}       # Original RawEvent payload
    error: "..."      # Error message
    timestamp: "..."  # When error occurred
    metadata: {}      # Additional context (stream, etc.)
```

Design Decision: Separate top-level collection (not subcollections under events). This provides clear separation of concerns and simpler error monitoring.
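Writing a dead-letter record with the Firestore async client could look like this (a sketch; the client wiring and helper name are assumptions, field names follow the schema above):

```python
from google.cloud import firestore

db = firestore.AsyncClient(project="my-project")

async def record_error(raw: "RawEvent", error: str) -> None:
    """Store the original payload plus error context in the errors collection."""
    await db.collection("errors").document().set({   # auto-generated error_id
        "payload": raw.payload,
        "error": error,
        "timestamp": raw.received_at.isoformat(),
        "metadata": {"stream": raw.stream},
    })
```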
Phase 5: Processing & Orchestration
Processor
File: src/eventkit/processing/processor.py
Orchestrates the full pipeline: ingest → adapt → sequence → buffer → store.
Responsibilities:
- Wire together all components
- Handle async coordination
- Graceful shutdown (flush buffers)
- Error handling & retries
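A sketch of how these responsibilities fit together (component constructors and the AdapterResult/error-store method names are assumptions, not the exact Processor API):

```python
class Processor:
    def __init__(self, adapter, sequencer, loader, error_store):
        self.adapter = adapter          # RawEvent -> AdapterResult
        self.sequencer = sequencer      # TypedEvent -> partition id
        self.loader = loader            # per-partition buffering + flush
        self.error_store = error_store  # dead letter queue

    async def process(self, raw) -> None:
        result = self.adapter.adapt(raw)
        if not result.is_valid:                                     # assumed field
            await self.error_store.store_error(raw, result.error)   # assumed method
            return
        partition_id = self.sequencer.get_partition_id(result.event)
        await self.loader.add(partition_id, result.event)
```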
HTTP API
File: src/eventkit/api/collect.py
FastAPI endpoints for event collection.
Endpoints:
- POST /collect/{stream} - Flexible ingestion
- POST /v1/identify - Segment-compatible identify
- POST /v1/track - Segment-compatible track
- POST /v1/page - Segment-compatible page
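The flexible ingestion endpoint accepts any JSON body and wraps it in a RawEvent before handing it to the pipeline. A minimal sketch (the import path and handoff are assumptions):

```python
from datetime import datetime, timezone
from typing import Any

from fastapi import FastAPI, Request

from eventkit.schema.raw import RawEvent  # path assumed from the file layout above

app = FastAPI()

@app.post("/collect/{stream}")
async def collect(stream: str, request: Request) -> dict[str, Any]:
    payload = await request.json()    # accept any JSON shape at the edge
    raw = RawEvent(
        payload=payload,
        received_at=datetime.now(timezone.utc),
        stream=stream,
    )
    # await processor.process(raw)    # handoff to the Processor (assumed wiring)
    return {"accepted": True}
```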
Design Philosophy
1. Flexible Ingestion, Strict Processing
Problem: Schema changes break ingestion in most systems.
Solution: Accept any JSON at the edge, validate downstream. Invalid events go to error store, not dropped.
Inspired by: Lytics, RudderStack, Snowplow, and PostHog, which all follow this pattern.
2. Stream-Based Isolation
Problem: Webhook spikes block user event processing.
Solution: Route events to named streams with independent processing. Each stream scales separately.
Example:
```
/collect/users    → users stream    → Real-time processing
/collect/webhooks → webhooks stream → Batch processing (resilient to spikes)
```

Inspired by: Lytics’ custom streams, RudderStack’s source isolation.
3. Hash-Based Routing (Sequencer)
Problem: Race conditions when processing out-of-order events for same user.
Solution: Hash userId → consistent partition. Same user always processed by same worker in order.
Benefits:
- Sequential processing per user
- Foundation for deduplication
- Hot user distribution
Inspired by: Lytics’ sequencer (May 2018), Kafka’s partition key strategy.
4. Composable Validators
Problem: Fixed validation doesn’t work for dynamic schemas.
Solution: ValidationPipeline composes reusable validators. SegmentSchemaAdapter uses fixed pipeline, future CustomSchemaAdapter builds pipeline dynamically.
Design:
```python
# Fixed schema (Segment)
pipeline = ValidationPipeline.for_segment_spec()

# Dynamic schema (future)
pipeline = ValidationPipeline.from_schema(account_schema)
```

Inspired by: Middleware/decorator patterns in web frameworks.
5. Two-Tier Adapter Architecture
Tier 1: Format Adapters (future)
- Parse raw formats (JSON, protobuf, form-encoded)
- Produce RawEvent

Tier 2: Schema Adapters (current)
- Apply schema validation
- Produce TypedEvent
Why: Separation of concerns. Format parsing is orthogonal to schema validation. A JSON payload can conform to Segment spec, custom schema, or multiple schemas.
Inspired by: Lytics’ adapter layering.
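Expressed as protocols, the layering might look like this (an illustrative sketch; eventkit currently ships only schema adapters, so these names and signatures are assumptions):

```python
from typing import Protocol

class FormatAdapter(Protocol):
    def parse(self, body: bytes, content_type: str) -> "RawEvent":
        """Tier 1: decode the wire format into a RawEvent."""

class SchemaAdapter(Protocol):
    def adapt(self, raw: "RawEvent") -> "AdapterResult":
        """Tier 2: validate the payload and build a TypedEvent."""
```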
Future Architecture
Out of Scope for Core eventkit:
These can be built as separate packages using eventkit primitives:
- eventkit-profiles - Profile building and field merging
- eventkit-identity - Identity graph and alias resolution
- eventkit-schema - Dynamic schema engine (custom per-account schemas)
- eventkit-enrichment - Data enrichment and traversal
- eventkit-destinations - Event forwarding to external systems
Why separate?
- Smaller core surface area
- Independent versioning
- Composable (use what you need)
- Easier to maintain and test