eventkit is in active development. APIs may change.

Testing Guidelines

This document describes the testing strategy, conventions, and tools for eventkit.

Testing Philosophy

Test-Driven Development (TDD): Write tests before implementation.

Test Pyramid:

      /\
     /  \          E2E Tests (few, slow, high confidence)
    /____\
   /      \        Integration Tests (some, medium speed)
  /________\
 /          \      Unit Tests (many, fast, low-level)
/____________\

Coverage Target: >80% line coverage, 100% for critical paths (adapters, processor).

Test Structure

tests/
├── unit/                      # Fast, isolated tests
│   ├── schema/
│   ├── adapters/
│   ├── processing/
│   ├── stores/
│   └── api/
├── integration/               # Multi-component tests
│   ├── test_full_pipeline.py
│   ├── test_firestore_integration.py
│   └── test_api_integration.py
├── performance/               # Throughput and latency tests
│   ├── test_throughput.py
│   └── test_latency.py
└── conftest.py                # Shared fixtures

Running Tests

Quick Start

# Run all tests
pytest

# Run with coverage
pytest --cov=src/eventkit --cov-report=html

# Run specific test file
pytest tests/unit/schema/test_raw.py

# Run tests matching pattern
pytest -k "test_raw_event"

# Run with verbose output
pytest -v

By Test Type

# Unit tests only (fast)
pytest tests/unit/

# Integration tests (requires Firestore emulator)
pytest tests/integration/

# Performance tests
pytest tests/performance/ --benchmark-only

Watch Mode (for TDD)

# Re-run tests on file changes
pytest-watch

# Or use pytest-xdist for parallel execution
pytest -n auto --looponfail

Writing Tests

Unit Tests

Principle: Test one component in isolation. Mock all dependencies.

Example: Testing RawEvent

# tests/unit/schema/test_raw.py
from datetime import datetime, timezone

from eventkit.schema.raw import RawEvent


def test_raw_event_accepts_arbitrary_fields():
    """Story 1: Accept any JSON"""
    event = RawEvent(
        payload={
            "type": "identify",
            "custom_field": "value",
            "nested": {"field": 123}
        },
        stream="users"
    )

    assert event.get("custom_field") == "value"
    assert event.get("nested")["field"] == 123
    assert event.stream == "users"
    assert isinstance(event.received_at, datetime)


def test_raw_event_get_with_default():
    """Test get() helper with missing key"""
    event = RawEvent(payload={"type": "track"}, stream="events")

    assert event.get("missing_key") is None
    assert event.get("missing_key", "default") == "default"

Example: Testing Adapter with Mocks

# tests/unit/adapters/test_segment.py
import pytest

from eventkit.schema.raw import RawEvent
from eventkit.adapters.segment import SegmentAdapter
from eventkit.adapters.base import AdapterResult


@pytest.fixture
def adapter():
    return SegmentAdapter()


def test_adapt_identify_success(adapter):
    """Story 3: Validate and normalize identify events"""
    raw = RawEvent(
        payload={
            "type": "identify",
            "userId": "user_123",
            "traits": {"email": "user@example.com"}
        },
        stream="users"
    )

    result = adapter.adapt(raw)

    assert result.success is True
    assert result.event.type == "identify"
    assert result.event.userId == "user_123"
    assert result.event.traits["email"] == "user@example.com"
    assert result.error is None


def test_adapt_missing_user_id_returns_error(adapter):
    """Story 6: Invalid events return error (not exception)"""
    raw = RawEvent(
        payload={"type": "identify", "traits": {}},
        stream="users"
    )

    result = adapter.adapt(raw)

    assert result.success is False
    assert "Missing required field" in result.error
    assert result.event is None
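The adapter has no collaborators, so the tests above need no mocks. When a unit does depend on another component (for example, a loader that writes to a store), the dependency can be replaced with `unittest.mock.AsyncMock`. The sketch below is illustrative only: `BufferedLoader` and its `add`/`flush` methods are hypothetical stand-ins, not eventkit's actual `EventLoader` API.

```python
import asyncio
from unittest.mock import AsyncMock


class BufferedLoader:
    """Hypothetical stand-in for a component that batches writes to a store."""

    def __init__(self, store, batch_size):
        self.store = store
        self.batch_size = batch_size
        self.buffer = []

    async def add(self, event):
        # Buffer the event and flush once the batch is full.
        self.buffer.append(event)
        if len(self.buffer) >= self.batch_size:
            await self.flush()

    async def flush(self):
        # Write the buffered events in one call, then start a new buffer.
        if self.buffer:
            batch, self.buffer = self.buffer, []
            await self.store.write(batch)


async def check_flushes_when_batch_is_full():
    store = AsyncMock()  # the store is mocked; no real backend is touched
    loader = BufferedLoader(store, batch_size=2)

    await loader.add({"id": 1})
    store.write.assert_not_awaited()  # below batch size: nothing written yet

    await loader.add({"id": 2})
    store.write.assert_awaited_once_with([{"id": 1}, {"id": 2}])
    return store.write.await_count


result = asyncio.run(check_flushes_when_batch_is_full())
```

In a pytest-asyncio suite the coroutine would be an `async def test_...` function marked with `@pytest.mark.asyncio` rather than being driven by `asyncio.run`.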

Integration Tests

Principle: Test multiple components working together. Use real implementations where feasible, but run against emulators/test backends.

Setup: Firestore Emulator

# Start Firestore emulator
gcloud emulators firestore start --host-port=localhost:8080

# In another terminal, set environment variable
export FIRESTORE_EMULATOR_HOST=localhost:8080
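Because the integration tests rely on `FIRESTORE_EMULATOR_HOST`, it can help to check for it explicitly before running them. The following is a minimal sketch of such a guard; `emulator_host` is a hypothetical helper name, and only the environment variable itself comes from the setup steps above.

```python
import os


def emulator_host(env=None):
    """Return the configured Firestore emulator address, or None if unset.

    Hypothetical helper; only the FIRESTORE_EMULATOR_HOST variable name
    comes from the emulator setup steps.
    """
    env = os.environ if env is None else env
    host = env.get("FIRESTORE_EMULATOR_HOST", "").strip()
    return host or None


# Example: the same check against two explicit environments.
configured = emulator_host({"FIRESTORE_EMULATOR_HOST": "localhost:8080"})
missing = emulator_host({})
```

In `conftest.py`, a check like this could feed `pytest.mark.skipif(emulator_host() is None, reason="Firestore emulator not configured")`, so that integration tests are skipped instead of being pointed at a real project by accident.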

Example: End-to-End Pipeline Test

# tests/integration/test_full_pipeline.py
import pytest
import pytest_asyncio

from eventkit.schema.raw import RawEvent
from eventkit.adapters.segment import SegmentAdapter
from eventkit.processing.sequencer import Sequencer
from eventkit.processing.event_loader import EventLoader
from eventkit.processing.processor import Processor
from eventkit.stores.firestore import FirestoreEventStore, FirestoreErrorStore


# Async fixtures need pytest_asyncio.fixture unless asyncio_mode = "auto" is configured.
@pytest_asyncio.fixture
async def processor():
    """Create processor with real Firestore stores (emulator)"""
    event_store = FirestoreEventStore(
        project_id="test-project",
        collection="events"
    )
    error_store = FirestoreErrorStore(
        project_id="test-project",
        collection="errors"
    )

    adapter = SegmentAdapter()
    sequencer = Sequencer(num_partitions=4)
    event_loader = EventLoader(event_store, batch_size=10, flush_interval=1.0)

    processor = Processor(adapter, sequencer, event_loader, error_store)

    yield processor

    # Cleanup: flush buffers (copy the keys in case flushing mutates the dict)
    for partition_id in list(event_loader.buffers.keys()):
        await event_loader._flush_partition(partition_id)


@pytest.mark.asyncio
async def test_valid_event_reaches_event_store(processor):
    """Story 1 + Story 2 + Story 3: Full pipeline"""
    # Arrange
    raw_event = RawEvent(
        payload={
            "type": "identify",
            "userId": "user_123",
            "traits": {"email": "user@example.com"}
        },
        stream="users"
    )

    # Act
    await processor.enqueue(raw_event)
    await processor.event_loader._flush_partition(0)

    # Assert
    events = await processor.event_loader.event_store.read(stream="users", limit=1)
    assert len(events) == 1
    assert events[0].userId == "user_123"
    assert events[0].traits["email"] == "user@example.com"


@pytest.mark.asyncio
async def test_invalid_event_goes_to_error_store(processor):
    """Story 6: Invalid events → error store (not dropped)"""
    # Arrange
    raw_event = RawEvent(
        payload={"type": "identify"},  # Missing userId and anonymousId
        stream="users"
    )

    # Act
    await processor.enqueue(raw_event)

    # Assert
    errors = await processor.error_store.query_errors(limit=1)
    assert len(errors) == 1
    assert "Missing required field" in errors[0]["error"]
    assert errors[0]["raw_payload"]["type"] == "identify"

Performance Tests

Principle: Validate throughput and latency targets.

Example: Throughput Test

# tests/performance/test_throughput.py
import asyncio
import time

import pytest

from eventkit.schema.raw import RawEvent
from eventkit.processing.processor import Processor


# Assumes a `processor` fixture is visible to this module, e.g. defined in tests/conftest.py.
@pytest.mark.asyncio
async def test_throughput_10k_events_per_second(processor):
    """Story 1: Throughput target (10k events/sec)"""
    num_events = 10_000
    events = [
        RawEvent(
            payload={
                "type": "track",
                "event": "test_event",
                "userId": f"user_{i % 1000}"
            },
            stream="events"
        )
        for i in range(num_events)
    ]

    # perf_counter() is a monotonic, high-resolution clock intended for timing
    start = time.perf_counter()

    tasks = [processor.enqueue(event) for event in events]
    await asyncio.gather(*tasks)

    elapsed = time.perf_counter() - start
    throughput = num_events / elapsed

    print(f"Throughput: {throughput:.0f} events/sec")
    assert throughput >= 10_000, f"Only {throughput:.0f} events/sec"
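The test layout also lists `test_latency.py`, for which no example is given. As a rough sketch of how a p95 latency measurement might look, the code below times each call with `time.perf_counter()` and takes the 95th percentile with `statistics.quantiles`. `fake_enqueue` is a hypothetical stand-in for `processor.enqueue`, so the numbers it produces say nothing about eventkit itself.

```python
import asyncio
import statistics
import time


async def fake_enqueue(event):
    """Hypothetical stand-in for processor.enqueue(); just yields to the loop."""
    await asyncio.sleep(0)


async def measure_latencies_ms(handler, events):
    """Await handler(event) sequentially and record each call's duration in ms."""
    latencies = []
    for event in events:
        start = time.perf_counter()
        await handler(event)
        latencies.append((time.perf_counter() - start) * 1000)
    return latencies


def p95(samples):
    """95th percentile: the last of the 19 cut points that split the data into 20 groups."""
    return statistics.quantiles(samples, n=20)[-1]


events = [{"type": "track", "userId": f"user_{i}"} for i in range(200)]
latencies = asyncio.run(measure_latencies_ms(fake_enqueue, events))
p95_ms = p95(latencies)
print(f"p95 latency: {p95_ms:.3f} ms")
```

A real latency test would swap in the actual processor and assert `p95_ms` against the project's target. Measuring calls one at a time, as here, isolates per-call latency; measuring under concurrent load would answer a different question.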

Test Fixtures

Shared Fixtures (conftest.py)

# tests/conftest.py
import pytest
import asyncio

from eventkit.schema.raw import RawEvent
from eventkit.adapters.segment import SegmentAdapter
from eventkit.processing.sequencer import Sequencer


@pytest.fixture(scope="session")
def event_loop():
    """Create event loop for async tests"""
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()


@pytest.fixture
def sample_raw_event():
    """Reusable raw event for tests"""
    return RawEvent(
        payload={
            "type": "identify",
            "userId": "test_user",
            "traits": {"email": "test@example.com"}
        },
        stream="users"
    )


@pytest.fixture
def adapter():
    """Reusable adapter instance"""
    return SegmentAdapter()


@pytest.fixture
def sequencer():
    """Reusable sequencer instance"""
    return Sequencer(num_partitions=16)
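A fixed fixture such as `sample_raw_event` works when the payload does not matter. When tests need variations (a different user, a missing field), a small factory can be more convenient. The sketch below is one possible shape; `make_identify_payload` is a hypothetical helper, not part of eventkit.

```python
import copy

# Baseline payload mirroring the sample_raw_event fixture.
_BASE_IDENTIFY = {
    "type": "identify",
    "userId": "test_user",
    "traits": {"email": "test@example.com"},
}


def make_identify_payload(**overrides):
    """Hypothetical factory: return a fresh identify payload with overrides applied.

    Passing a key with value None removes that key, which makes it easy to
    build invalid payloads (for example, one with no userId).
    """
    payload = copy.deepcopy(_BASE_IDENTIFY)  # never share nested dicts between tests
    for key, value in overrides.items():
        if value is None:
            payload.pop(key, None)
        else:
            payload[key] = value
    return payload


valid = make_identify_payload(userId="user_123")
invalid = make_identify_payload(userId=None)
```

A test could then build its input with `RawEvent(payload=make_identify_payload(userId=None), stream="users")`, keeping the interesting difference visible at the call site.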

Mocking Best Practices

Mock External Services

# tests/unit/stores/test_firestore.py
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from eventkit.stores.firestore import FirestoreEventStore
from eventkit.schema.events import IdentifyEvent


@pytest.fixture
def mock_firestore_client():
    """Mock Firestore client"""
    with patch("google.cloud.firestore.AsyncClient") as mock:
        client = MagicMock()
        mock.return_value = client
        yield client


@pytest.mark.asyncio
async def test_write_batch(mock_firestore_client):
    """Test batch write to Firestore"""
    store = FirestoreEventStore(project_id="test", collection="events")

    events = [
        IdentifyEvent(
            userId=f"user_{i}",
            traits={"index": i},
            timestamp=datetime.now(timezone.utc)
        )
        for i in range(100)
    ]

    await store.write(events)

    assert mock_firestore_client.batch.called
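Patching the client verifies that calls were made, but not what ends up stored. Where a test cares about stored state, an in-memory fake is one alternative to a mock. The sketch below assumes an async `write`/`read` interface loosely modeled on the store calls used in this guide; `InMemoryEventStore` and its exact signatures are hypothetical.

```python
import asyncio
from collections import defaultdict


class InMemoryEventStore:
    """Hypothetical fake exposing async write()/read(), keeping events in memory."""

    def __init__(self):
        self._events = defaultdict(list)  # stream name -> events in write order

    async def write(self, events, stream="events"):
        self._events[stream].extend(events)

    async def read(self, stream, limit=100):
        # .get() avoids creating an empty entry for streams never written to.
        return list(self._events.get(stream, [])[:limit])


async def demo():
    store = InMemoryEventStore()
    await store.write([{"userId": "user_1"}, {"userId": "user_2"}], stream="users")
    first = await store.read(stream="users", limit=1)
    other = await store.read(stream="orders")
    return first, other


first, other = asyncio.run(demo())
```

A fake like this tends to make assertions read in terms of outcomes ("the event is in the store") rather than interactions ("batch was called"), at the cost of maintaining a second implementation of the interface.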

Test Naming Conventions

Unit Tests

# Pattern: test_{method}_{scenario}_{expected_result}
def test_adapt_identify_with_valid_payload_returns_success(): ...
def test_adapt_identify_with_missing_user_id_returns_error(): ...
def test_get_partition_id_with_same_user_id_returns_same_partition(): ...
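The last name above describes a determinism property: the same user ID should always map to the same partition. As an illustration of how a test with that name might read, the sketch below uses a stand-in function. `get_partition_id` here is a hypothetical hash-based implementation, not eventkit's `Sequencer`.

```python
import hashlib


def get_partition_id(user_id, num_partitions):
    """Hypothetical stand-in: map a user ID to a stable partition index."""
    # hashlib gives the same digest in every process, unlike the built-in
    # hash(), which is randomized per process for strings.
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions


def test_get_partition_id_with_same_user_id_returns_same_partition():
    first = get_partition_id("user_123", num_partitions=16)
    second = get_partition_id("user_123", num_partitions=16)
    assert first == second
    assert 0 <= first < 16


# pytest would discover and run the test by name; it is called directly
# here so the sketch can run as a plain script.
test_get_partition_id_with_same_user_id_returns_same_partition()
```

The name states the method, the scenario, and the expected result, so a failure report is readable without opening the test body.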

Integration Tests

# Pattern: test_{feature}_{scenario}
async def test_full_pipeline_valid_event_reaches_event_store(): ...
async def test_full_pipeline_invalid_event_goes_to_error_store(): ...
def test_api_collect_endpoint_accepts_batch_events(): ...

Performance Tests

# Pattern: test_{metric}_{target}
async def test_throughput_10k_events_per_second(): ...
def test_latency_p95_under_100ms(): ...

Debugging Tests

Run Single Test with Output

pytest tests/unit/schema/test_raw.py::test_raw_event_accepts_arbitrary_fields -v -s

Drop into Debugger on Failure

pytest --pdb

Show Print Statements

pytest -s

Show Local Variables on Failure

pytest -l

Coverage Reports

Generate HTML Report

pytest --cov=src/eventkit --cov-report=html

# Open the report (macOS; use xdg-open on Linux)
open htmlcov/index.html

Identify Uncovered Lines

pytest --cov=src/eventkit --cov-report=term-missing

Enforce Minimum Coverage

pytest --cov=src/eventkit --cov-fail-under=80

Checklist: Before Merging PR

  • All tests pass (pytest)
  • Coverage meets the 80% target (pytest --cov=src/eventkit --cov-fail-under=80)
  • Type checking passes (mypy src/eventkit)
  • Linting passes (ruff check src/)
  • Integration tests with Firestore emulator pass
  • Performance tests meet targets (if applicable)
  • New tests added for new functionality
  • Docstrings updated
