eventkit is in active development. APIs may change.

Architecture

eventkit is an event ingestion and processing library built around the principle of flexible ingestion, strict processing. This document describes the complete architecture, component responsibilities, and design decisions.

Overview

Event collection pipelines require a delicate balance:

  • Accept everything at the edge (never lose data due to schema changes)
  • Validate downstream (ensure type safety for processing)
  • Route consistently (same user → same partition → ordered processing)
  • Batch efficiently (minimize storage writes, maximize throughput)
  • Handle errors gracefully (dead letter queue, retries, observability)

eventkit provides these primitives as a composable, type-safe library.

High-Level Architecture

Key Points:

  • Phase 1: Accept any JSON (less than 100ms p95)
  • Phase 2: Validate downstream, errors go to dead letter queue
  • Phase 3: Hash-based routing ensures consistent partitioning
  • Phase 4: Adaptive batching and pluggable storage
  • Phase 5: Out of scope for core eventkit

Component Breakdown

Phase 1: Ingestion Edge

RawEvent

File: src/eventkit/schema/raw.py

Flexible container for any JSON payload. Accepts extra fields (ConfigDict(extra="allow")) to ensure no events are rejected at the edge.

class RawEvent(BaseModel):
    payload: dict[str, Any]      # Original JSON
    received_at: datetime        # Ingestion timestamp
    stream: str | None = None    # Routing/isolation

Design Decision: Never reject events at ingestion. Schema mismatches are validation concerns, not ingestion failures.
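As a minimal illustration of this decision, any JSON object can be wrapped in a RawEvent without schema checks; the payload below is off-spec (no type field, unexpected keys) but is still accepted at the edge:

from datetime import datetime, timezone

# Off-spec payload: no "type", arbitrary nesting. It is still accepted here;
# whether it is *valid* is decided later by the ValidationPipeline.
raw = RawEvent(
    payload={"foo": "bar", "nested": {"any": ["shape", 1, None]}},
    received_at=datetime.now(timezone.utc),
    stream="webhooks",
)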

Phase 2: Validation & Adaptation

ValidationPipeline

File: src/eventkit/adapters/validators/pipeline.py

Composable chain of validators. Each validator checks one aspect of the payload.

class ValidationPipeline:
    def __init__(self, validators: list[Validator]):
        self.validators = validators

    def validate(self, payload: dict[str, Any]) -> tuple[bool, str | None]:
        """Run all validators, stop on first failure."""
        for validator in self.validators:
            is_valid, error = validator.validate(payload)
            if not is_valid:
                return False, error
        return True, None

Built-in Validators:

  • RequiredFieldsValidator - Check for presence of required fields
  • TypeCheckValidator - Verify field types
  • TimestampValidator - Parse timestamps (ISO 8601, Unix seconds/ms)

Design Decision: Use composition over inheritance for validators. This allows both fixed schemas (Segment) and dynamic schemas (future custom schema engine) to reuse the same validation logic.
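A sketch of how such a validator can be composed into a pipeline. The Validator protocol shape and the RequiredFieldsValidator constructor shown here are illustrative, not the exact library code:

from typing import Any, Protocol

class Validator(Protocol):
    def validate(self, payload: dict[str, Any]) -> tuple[bool, str | None]: ...

class RequiredFieldsValidator:
    """Illustrative validator: checks that required keys are present."""
    def __init__(self, required: list[str]):
        self.required = required

    def validate(self, payload: dict[str, Any]) -> tuple[bool, str | None]:
        missing = [field for field in self.required if field not in payload]
        if missing:
            return False, f"missing required fields: {missing}"
        return True, None

# Compose a pipeline from independent, reusable validators
pipeline = ValidationPipeline([RequiredFieldsValidator(["type", "timestamp"])])

Because each validator only implements validate(), the same pieces can be assembled into a fixed Segment pipeline today or a per-account pipeline later.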

SchemaAdapter

File: src/eventkit/adapters/segment.py

Transforms RawEvent → TypedEvent by applying validation and type routing.

Current Implementation: SegmentSchemaAdapter

  • Validates using ValidationPipeline.for_segment_spec()
  • Routes by event type (identify, track, page)
  • Extracts identity fields (userId, anonymousId)
  • Normalizes timestamps
  • Supports PostHog-style property updates ($set, $set_once)
class SegmentSchemaAdapter:
    def adapt(self, raw: RawEvent) -> AdapterResult:
        # 1. Validate
        is_valid, error = self.pipeline.validate(raw.payload)
        if not is_valid:
            return AdapterResult.failure(error)

        # 2. Route by type
        event_type = raw.payload.get("type")
        builder = self.type_mapping.get(event_type)

        # 3. Build typed event
        return builder(raw)

Future: CustomSchemaAdapter will build ValidationPipeline dynamically from per-account schemas (see eventkit-schema design notes).

TypedEvent Models

File: src/eventkit/schema/events.py

Type-safe event models for downstream processing.

class TypedEvent(BaseModel):
    event_id: str
    timestamp: datetime
    user_id: str | None
    anonymous_id: str | None
    event_type: str
    properties: dict[str, Any]
    stream: str | None


class IdentifyEvent(TypedEvent):
    event_type: Literal["identify"] = "identify"
    traits: dict[str, Any]
    set: dict | None = Field(None, alias="$set")
    set_once: dict | None = Field(None, alias="$set_once")


class TrackEvent(TypedEvent):
    event_type: Literal["track"] = "track"
    event: str  # Event name (e.g., "Button Clicked")


class PageEvent(TypedEvent):
    event_type: Literal["page"] = "page"
    name: str  # Page name

Design Decision: Use Pydantic v2 for validation, serialization, and type safety. Frozen models (frozen=True) prevent accidental mutation.
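A small sketch of what the frozen configuration looks like in Pydantic v2 (the example class is illustrative, not one of the models above):

from pydantic import BaseModel, ConfigDict, ValidationError

class FrozenExample(BaseModel):
    model_config = ConfigDict(frozen=True)
    event_id: str

event = FrozenExample(event_id="evt_1")
try:
    event.event_id = "evt_2"   # Mutation attempts are rejected on frozen models
except ValidationError:
    pass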

Phase 3: Routing & Sequencing

Sequencer

File: src/eventkit/processing/sequencer.py

Hash-based routing for consistent partition assignment.

Key Responsibilities:

  1. Consistent routing - Same userId → same partition
  2. Load distribution - Events spread across N partitions
  3. Ordering foundation - Same partition = sequential processing
  4. Deduplication foundation - Same partition sees all events for a user

Algorithm:

class Sequencer:
    def get_partition_id(self, event: TypedEvent) -> int:
        # Priority: userId → anonymousId → event hash
        routing_key = self._get_routing_key(event)
        hash_value = self._fnv1a_hash(routing_key)
        return hash_value % self.num_partitions

Design Decision: Use FNV-1a hash (fast, non-cryptographic, good distribution). Default to 16 partitions (balance between distribution and resource usage).
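A sketch of 64-bit FNV-1a hashing and partition selection. The constants are the standard FNV-1a offset basis and prime; the library's internal implementation may differ in detail:

FNV_OFFSET_BASIS_64 = 0xCBF29CE484222325
FNV_PRIME_64 = 0x100000001B3

def fnv1a_hash(key: str) -> int:
    """64-bit FNV-1a: XOR each byte into the hash, then multiply by the prime."""
    h = FNV_OFFSET_BASIS_64
    for byte in key.encode("utf-8"):
        h ^= byte
        h = (h * FNV_PRIME_64) & 0xFFFFFFFFFFFFFFFF  # keep the hash 64-bit
    return h

def get_partition_id(routing_key: str, num_partitions: int = 16) -> int:
    return fnv1a_hash(routing_key) % num_partitions

# The same routing key always lands on the same partition
assert get_partition_id("user_123") == get_partition_id("user_123")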

Why This Matters:

  • Hot user handling - High-volume users don’t block others (distributed across workers)
  • Race condition prevention - Sequential processing per user prevents conflicting updates
  • Deduplication - Same partition can track recently seen event IDs

EventLoader

File: src/eventkit/processing/event_loader.py

Per-partition buffering with size and time-based flushing.

Key Responsibilities:

  1. Batch efficiency - Reduce storage write overhead
  2. Backpressure handling - Absorb traffic spikes
  3. Resilience - In-memory buffering with graceful shutdown

Flushing Strategy:

  • Size-based - Flush when buffer reaches 100 events (configurable)
  • Time-based - Flush every 5 seconds (configurable)
  • Shutdown - Flush all buffers on graceful shutdown
class EventLoader:
    def __init__(self, event_store: EventStore, batch_size: int = 100, flush_interval: float = 5.0):
        self.event_store = event_store
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.buffers: dict[int, list[TypedEvent]] = defaultdict(list)

    async def add(self, partition_id: int, event: TypedEvent) -> None:
        self.buffers[partition_id].append(event)
        if len(self.buffers[partition_id]) >= self.batch_size:
            await self._flush(partition_id)

Design Decision: Per-partition buffers (not global). This aligns with sequencer’s consistent routing and allows parallel flushing.
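One way the time-based flush could be scheduled with asyncio. This is a sketch only: the start/stop/_flush_all methods here are hypothetical, and it assumes _flush(partition_id) writes one buffer to the EventStore as in the excerpt above:

import asyncio

class BufferedLoaderSketch(EventLoader):
    """Sketch: periodic, per-partition flushing driven by a background task."""

    async def start(self) -> None:
        # Launch the periodic flush loop in the background
        self._flush_task = asyncio.create_task(self._flush_loop())

    async def _flush_loop(self) -> None:
        while True:
            await asyncio.sleep(self.flush_interval)
            await self._flush_all()

    async def stop(self) -> None:
        # Graceful shutdown: cancel the loop, then drain every buffer
        self._flush_task.cancel()
        await self._flush_all()

    async def _flush_all(self) -> None:
        for partition_id, buffer in list(self.buffers.items()):
            if buffer:
                await self._flush(partition_id)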

Phase 4: Storage

eventkit supports pluggable storage backends via the EventStore protocol. The default is GCS + BigQuery for production deployments.

GCSEventStore (Default)

File: src/eventkit/stores/gcs.py

Writes events to Google Cloud Storage as Parquet files, then loads to BigQuery via a background loader.

Key Design Decisions:

  1. Hive-style partitioning

    gs://my-events/
      date=2026-01-13/
        {uuid1}.parquet
        {uuid2}.parquet
      date=2026-01-14/
        {uuid3}.parquet

    Why: Efficient BigQuery loading, cost-effective lifecycle management.

  2. Wide schema (sparse columns)

    • Single Parquet file with all event type fields
    • Nullable columns for type-specific fields (e.g., event_name only for track events)
    • Parquet handles sparse data efficiently
    • Simpler queries than separate tables per event type
  3. Pandas → Parquet → GCS

    • Convert events to DataFrame for columnar representation
    • Serialize to Parquet with PyArrow
    • Upload to GCS with retry logic
  4. Retry logic with tenacity

    • Exponential backoff for transient GCS failures
    • Max 3 retries per operation

Code Pattern:

class GCSEventStore:
    async def store_batch(self, events: list[TypedEvent]) -> None:
        # Group by date for partitioning
        by_date = defaultdict(list)
        for event in events:
            date = event.timestamp.date()
            by_date[date].append(event)

        # Write one file per date
        for date, day_events in by_date.items():
            df = self._events_to_dataframe(day_events)
            path = self._generate_path(date)  # date=YYYY-MM-DD/{uuid}.parquet
            await self._write_parquet(df, path)
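A sketch of the DataFrame → Parquet → GCS step with tenacity retries. The function boundary and bucket handling are illustrative (the real store is async and wraps this in its own helpers); the GCS and PyArrow calls are standard library/client APIs:

import io
import uuid

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from google.cloud import storage
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
def upload_parquet(df: pd.DataFrame, bucket_name: str, date_str: str) -> str:
    """Serialize a DataFrame to Parquet and upload it under a Hive-style date prefix."""
    buf = io.BytesIO()
    pq.write_table(pa.Table.from_pandas(df), buf)
    buf.seek(0)

    path = f"date={date_str}/{uuid.uuid4()}.parquet"
    bucket = storage.Client().bucket(bucket_name)
    bucket.blob(path).upload_from_file(buf, content_type="application/octet-stream")
    return path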

Why GCS + BigQuery?

  • Cost: GCS Standard ($0.020/GB/month) → BigQuery long-term ($0.010/GB/month)
  • Flexibility: Raw events for reprocessing, custom pipelines
  • Scalability: Proven at petabyte scale (PostHog, RudderStack, Snowplow)
  • Queryability: BigQuery’s SQL engine for analytics
  • Pluggable: Easy to add Snowflake, Redshift, etc. via WarehouseLoader protocol

BigQueryLoader

File: src/eventkit/loaders/bigquery_loader.py

Background service that polls GCS for new Parquet files and loads them to BigQuery.

Key Responsibilities:

  1. Poll GCS - List new .parquet files every 5 minutes (configurable)
  2. Filter loaded - Query _loaded_files metadata table to skip duplicates
  3. Batch load - Create BigQuery load jobs from GCS URIs
  4. Track metadata - Record loaded files for idempotency
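A hedged sketch of one poll cycle. Here an in-memory set stands in for the _loaded_files metadata table, and the function boundary is illustrative; the actual loader adds error handling, prefix filtering, and configuration:

from google.cloud import bigquery, storage

def poll_once(bucket_name: str, table_id: str, loaded: set[str]) -> None:
    """One cycle: list Parquet files in GCS, skip already-loaded ones, batch-load the rest."""
    gcs = storage.Client()
    bq = bigquery.Client()

    # 1. Poll GCS for Parquet files
    blobs = [b.name for b in gcs.bucket(bucket_name).list_blobs() if b.name.endswith(".parquet")]

    # 2. Filter out files already recorded (real loader queries _loaded_files)
    new_files = [name for name in blobs if name not in loaded]
    if not new_files:
        return

    # 3. Batch load from GCS URIs into BigQuery
    uris = [f"gs://{bucket_name}/{name}" for name in new_files]
    job = bq.load_table_from_uri(
        uris,
        table_id,
        job_config=bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.PARQUET),
    )
    job.result()  # wait for the load job to complete

    # 4. Track metadata for idempotency
    loaded.update(new_files)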

Lifecycle:

loader = BigQueryLoader(bucket, dataset, table, project_id, poll_interval=300.0)
await loader.start()   # Runs in background

# ... application runs ...

await loader.stop()    # Graceful shutdown

Why separate service?

  • Independent scaling: API and loader scale independently
  • Latency tolerance: Batch loading accepts 5-10 minute delay
  • Resource isolation: Loading doesn’t impact API performance
  • Deployment flexibility: Run as Cloud Run scheduled job, Kubernetes CronJob, or embedded

Idempotency:

  • Metadata table tracks loaded files: _loaded_files(file_path, loaded_at, row_count)
  • Prevents duplicate loads if loader restarts

WarehouseLoader Protocol

File: src/eventkit/loaders/warehouse_loader.py

Pluggable protocol for loading events to different data warehouses.

class WarehouseLoader(Protocol):
    async def start(self) -> None:
        """Start background loading process."""
        ...

    async def stop(self) -> None:
        """Stop background loading process."""
        ...

    async def load_files(self, file_paths: list[str]) -> None:
        """Load specific files (for manual triggering)."""
        ...

Implementations:

  • BigQueryLoader - Default GCS → BigQuery
  • Future: SnowflakeLoader, RedshiftLoader, ClickHouseLoader

Why protocol-based?

  • Same interface for all warehouses
  • Users bring their own warehouse
  • Easy to test (mock loaders)
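Because WarehouseLoader is a structural protocol, a test double needs no inheritance. A minimal in-memory stand-in (an illustrative example, not shipped with the library) might look like:

class InMemoryLoader:
    """Test double: satisfies WarehouseLoader structurally, records calls instead of loading."""
    def __init__(self) -> None:
        self.started = False
        self.loaded: list[str] = []

    async def start(self) -> None:
        self.started = True

    async def stop(self) -> None:
        self.started = False

    async def load_files(self, file_paths: list[str]) -> None:
        self.loaded.extend(file_paths)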

FirestoreEventStore (Development/Testing)

File: src/eventkit/stores/firestore.py

Managed NoSQL storage for development and testing environments.

Why Firestore for dev?

  • Emulator support (no GCP account needed)
  • Fast local development
  • Good for moderate throughput
  • Free tier

Not recommended for production analytics due to:

  • Higher costs at scale
  • Limited query capabilities (no SQL)
  • Not designed for analytical workloads

ErrorStore

File: src/eventkit/stores/firestore.py

Dead letter queue for validation failures and processing errors.

Schema:

errors/
  {error_id}:
    payload: {}       # Original RawEvent payload
    error: "..."      # Error message
    timestamp: "..."  # When error occurred
    metadata: {}      # Additional context (stream, etc.)

Design Decision: Separate top-level collection (not subcollections under events). This provides clear separation of concerns and simpler error monitoring.
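A sketch of writing a dead-letter record with the Firestore async client. The record_error helper is illustrative; the document fields mirror the schema above:

import uuid
from datetime import datetime, timezone

from google.cloud import firestore

from eventkit.schema.raw import RawEvent

async def record_error(db: firestore.AsyncClient, raw: RawEvent, error: str) -> None:
    """Write a dead-letter document to the top-level `errors` collection."""
    await db.collection("errors").document(str(uuid.uuid4())).set({
        "payload": raw.payload,                               # Original RawEvent payload
        "error": error,                                       # Error message
        "timestamp": datetime.now(timezone.utc).isoformat(),  # When the error occurred
        "metadata": {"stream": raw.stream},                   # Additional context
    })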

Phase 5: Processing & Orchestration

Processor

File: src/eventkit/processing/processor.py

Orchestrates the full pipeline: ingest → adapt → sequence → buffer → store.

Responsibilities:

  1. Wire together all components
  2. Handle async coordination
  3. Graceful shutdown (flush buffers)
  4. Error handling & retries

HTTP API

File: src/eventkit/api/collect.py

FastAPI endpoints for event collection.

Endpoints:

  • POST /collect/{stream} - Flexible ingestion
  • POST /v1/identify - Segment-compatible identify
  • POST /v1/track - Segment-compatible track
  • POST /v1/page - Segment-compatible page
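A sketch of the flexible-ingestion endpoint. The handler body is illustrative: the real endpoint hands the RawEvent to the Processor, which is omitted here:

from datetime import datetime, timezone

from fastapi import FastAPI, Request

from eventkit.schema.raw import RawEvent

app = FastAPI()

@app.post("/collect/{stream}")
async def collect(stream: str, request: Request) -> dict[str, str]:
    # Phase 1: accept any JSON body without schema checks
    payload = await request.json()
    raw = RawEvent(payload=payload, received_at=datetime.now(timezone.utc), stream=stream)
    # The real endpoint forwards `raw` into the processing pipeline (omitted for brevity)
    return {"status": "accepted", "stream": raw.stream or ""}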

Design Philosophy

1. Flexible Ingestion, Strict Processing

Problem: Schema changes break ingestion in most systems.

Solution: Accept any JSON at the edge, validate downstream. Invalid events go to error store, not dropped.

Inspired by: Lytics, RudderStack, Snowplow, and PostHog, which all follow this pattern.

2. Stream-Based Isolation

Problem: Webhook spikes block user event processing.

Solution: Route events to named streams with independent processing. Each stream scales separately.

Example:

/collect/users    → users stream    → Real-time processing
/collect/webhooks → webhooks stream → Batch processing (resilient to spikes)

Inspired by: Lytics’ custom streams, RudderStack’s source isolation.

3. Hash-Based Routing (Sequencer)

Problem: Race conditions when processing out-of-order events for same user.

Solution: Hash userId → consistent partition. Same user always processed by same worker in order.

Benefits:

  • Sequential processing per user
  • Foundation for deduplication
  • Hot user distribution

Inspired by: Lytics’ sequencer (May 2018), Kafka’s partition key strategy.

4. Composable Validators

Problem: Fixed validation doesn’t work for dynamic schemas.

Solution: ValidationPipeline composes reusable validators. SegmentSchemaAdapter uses fixed pipeline, future CustomSchemaAdapter builds pipeline dynamically.

Design:

# Fixed schema (Segment)
pipeline = ValidationPipeline.for_segment_spec()

# Dynamic schema (future)
pipeline = ValidationPipeline.from_schema(account_schema)

Inspired by: Middleware/decorator patterns in web frameworks.

5. Two-Tier Adapter Architecture

Tier 1: Format Adapters (future)

  • Parse raw formats (JSON, protobuf, form-encoded)
  • Produce RawEvent

Tier 2: Schema Adapters (current)

  • Apply schema validation
  • Produce TypedEvent

Why: Separation of concerns. Format parsing is orthogonal to schema validation. A JSON payload can conform to Segment spec, custom schema, or multiple schemas.

Inspired by: Lytics’ adapter layering.

Future Architecture

Out of Scope for Core eventkit:

These can be built as separate packages using eventkit primitives:

  • eventkit-profiles - Profile building and field merging
  • eventkit-identity - Identity graph and alias resolution
  • eventkit-schema - Dynamic schema engine (custom per-account schemas)
  • eventkit-enrichment - Data enrichment and traversal
  • eventkit-destinations - Event forwarding to external systems

Why separate?

  • Smaller core surface area
  • Independent versioning
  • Composable (use what you need)
  • Easier to maintain and test