Testing Guidelines
This document describes the testing strategy, conventions, and tools for eventkit.
Testing Philosophy
Test-Driven Development (TDD): Write tests before implementation.
Test Pyramid:
/\
/ \ E2E Tests (few, slow, high confidence)
/____\
/ \ Integration Tests (some, medium speed)
/________\
/ \ Unit Tests (many, fast, low-level)
/__________\Coverage Target: >80% line coverage, 100% for critical paths (adapters, processor).
Test Structure
tests/
├── unit/ # Fast, isolated tests
│ ├── schema/
│ ├── adapters/
│ ├── processing/
│ ├── stores/
│ └── api/
├── integration/ # Multi-component tests
│ ├── test_full_pipeline.py
│ ├── test_firestore_integration.py
│ └── test_api_integration.py
├── performance/ # Throughput and latency tests
│ ├── test_throughput.py
│ └── test_latency.py
└── conftest.py # Shared fixturesRunning Tests
Quick Start
# Run all tests
pytest
# Run with coverage
pytest --cov=src/eventkit --cov-report=html
# Run specific test file
pytest tests/unit/schema/test_raw.py
# Run tests matching pattern
pytest -k "test_raw_event"
# Run with verbose output
pytest -vBy Test Type
# Unit tests only (fast)
pytest tests/unit/
# Integration tests (requires Firestore emulator)
pytest tests/integration/
# Performance tests
pytest tests/performance/ --benchmark-onlyWatch Mode (for TDD)
# Re-run tests on file changes
pytest-watch
# Or use pytest-xdist for parallel execution
pytest -n auto --looponfailWriting Tests
Unit Tests
Principle: Test one component in isolation. Mock all dependencies.
Example: Testing RawEvent
# tests/unit/schema/test_raw.py
from datetime import datetime, timezone
from eventkit.schema.raw import RawEvent
def test_raw_event_accepts_arbitrary_fields():
"""Story 1: Accept any JSON"""
event = RawEvent(
payload={
"type": "identify",
"custom_field": "value",
"nested": {"field": 123}
},
stream="users"
)
assert event.get("custom_field") == "value"
assert event.get("nested")["field"] == 123
assert event.stream == "users"
assert isinstance(event.received_at, datetime)
def test_raw_event_get_with_default():
"""Test get() helper with missing key"""
event = RawEvent(payload={"type": "track"}, stream="events")
assert event.get("missing_key") is None
assert event.get("missing_key", "default") == "default"Example: Testing Adapter with Mocks
# tests/unit/adapters/test_segment.py
import pytest
from eventkit.schema.raw import RawEvent
from eventkit.adapters.segment import SegmentAdapter
from eventkit.adapters.base import AdapterResult
@pytest.fixture
def adapter():
return SegmentAdapter()
def test_adapt_identify_success(adapter):
"""Story 3: Validate and normalize identify events"""
raw = RawEvent(
payload={
"type": "identify",
"userId": "user_123",
"traits": {"email": "user@example.com"}
},
stream="users"
)
result = adapter.adapt(raw)
assert result.success is True
assert result.event.type == "identify"
assert result.event.userId == "user_123"
assert result.event.traits["email"] == "user@example.com"
assert result.error is None
def test_adapt_missing_user_id_returns_error(adapter):
"""Story 6: Invalid events return error (not exception)"""
raw = RawEvent(
payload={"type": "identify", "traits": {}},
stream="users"
)
result = adapter.adapt(raw)
assert result.success is False
assert "Missing required field" in result.error
assert result.event is NoneIntegration Tests
Principle: Test multiple components working together. Use real implementations where feasible, but run against emulators/test backends.
Setup: Firestore Emulator
# Start Firestore emulator
gcloud emulators firestore start --host-port=localhost:8080
# In another terminal, set environment variable
export FIRESTORE_EMULATOR_HOST=localhost:8080Example: End-to-End Pipeline Test
# tests/integration/test_full_pipeline.py
import pytest
from eventkit.schema.raw import RawEvent
from eventkit.adapters.segment import SegmentAdapter
from eventkit.processing.sequencer import Sequencer
from eventkit.processing.event_loader import EventLoader
from eventkit.processing.processor import Processor
from eventkit.stores.firestore import FirestoreEventStore, FirestoreErrorStore
@pytest.fixture
async def processor():
"""Create processor with real Firestore stores (emulator)"""
event_store = FirestoreEventStore(
project_id="test-project",
collection="events"
)
error_store = FirestoreErrorStore(
project_id="test-project",
collection="errors"
)
adapter = SegmentAdapter()
sequencer = Sequencer(num_partitions=4)
event_loader = EventLoader(event_store, batch_size=10, flush_interval=1.0)
processor = Processor(adapter, sequencer, event_loader, error_store)
yield processor
# Cleanup: flush buffers
for partition_id in event_loader.buffers.keys():
await event_loader._flush_partition(partition_id)
@pytest.mark.asyncio
async def test_valid_event_reaches_event_store(processor):
"""Story 1 + Story 2 + Story 3: Full pipeline"""
# Arrange
raw_event = RawEvent(
payload={
"type": "identify",
"userId": "user_123",
"traits": {"email": "user@example.com"}
},
stream="users"
)
# Act
await processor.enqueue(raw_event)
await processor.event_loader._flush_partition(0)
# Assert
events = await processor.event_loader.event_store.read(stream="users", limit=1)
assert len(events) == 1
assert events[0].userId == "user_123"
assert events[0].traits["email"] == "user@example.com"
@pytest.mark.asyncio
async def test_invalid_event_goes_to_error_store(processor):
"""Story 6: Invalid events → error store (not dropped)"""
# Arrange
raw_event = RawEvent(
payload={"type": "identify"}, # Missing userId and anonymousId
stream="users"
)
# Act
await processor.enqueue(raw_event)
# Assert
errors = await processor.error_store.query_errors(limit=1)
assert len(errors) == 1
assert "Missing required field" in errors[0]["error"]
assert errors[0]["raw_payload"]["type"] == "identify"Performance Tests
Principle: Validate throughput and latency targets.
Example: Throughput Test
# tests/performance/test_throughput.py
import pytest
import asyncio
import time
from eventkit.schema.raw import RawEvent
from eventkit.processing.processor import Processor
@pytest.mark.asyncio
async def test_throughput_10k_events_per_second(processor):
"""Story 1: Throughput target (10k events/sec)"""
num_events = 10_000
events = [
RawEvent(
payload={
"type": "track",
"event": "test_event",
"userId": f"user_{i % 1000}"
},
stream="events"
)
for i in range(num_events)
]
start = time.time()
tasks = [processor.enqueue(event) for event in events]
await asyncio.gather(*tasks)
elapsed = time.time() - start
throughput = num_events / elapsed
print(f"Throughput: {throughput:.0f} events/sec")
assert throughput >= 10_000, f"Only {throughput:.0f} events/sec"Test Fixtures
Shared Fixtures (conftest.py)
# tests/conftest.py
import pytest
import asyncio
from eventkit.schema.raw import RawEvent
from eventkit.adapters.segment import SegmentAdapter
from eventkit.processing.sequencer import Sequencer
@pytest.fixture(scope="session")
def event_loop():
"""Create event loop for async tests"""
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
@pytest.fixture
def sample_raw_event():
"""Reusable raw event for tests"""
return RawEvent(
payload={
"type": "identify",
"userId": "test_user",
"traits": {"email": "test@example.com"}
},
stream="users"
)
@pytest.fixture
def adapter():
"""Reusable adapter instance"""
return SegmentAdapter()
@pytest.fixture
def sequencer():
"""Reusable sequencer instance"""
return Sequencer(num_partitions=16)Mocking Best Practices
Mock External Services
# tests/unit/stores/test_firestore.py
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from eventkit.stores.firestore import FirestoreEventStore
from eventkit.schema.events import IdentifyEvent
@pytest.fixture
def mock_firestore_client():
"""Mock Firestore client"""
with patch("google.cloud.firestore.AsyncClient") as mock:
client = MagicMock()
mock.return_value = client
yield client
@pytest.mark.asyncio
async def test_write_batch(mock_firestore_client):
"""Test batch write to Firestore"""
store = FirestoreEventStore(project_id="test", collection="events")
events = [
IdentifyEvent(
userId=f"user_{i}",
traits={"index": i},
timestamp=datetime.now(timezone.utc)
)
for i in range(100)
]
await store.write(events)
assert mock_firestore_client.batch.calledTest Naming Conventions
Unit Tests
# Pattern: test_{method}_{scenario}_{expected_result}
def test_adapt_identify_with_valid_payload_returns_success()
def test_adapt_identify_with_missing_user_id_returns_error()
def test_get_partition_id_with_same_user_id_returns_same_partition()Integration Tests
# Pattern: test_{feature}_{scenario}
async def test_full_pipeline_valid_event_reaches_event_store()
async def test_full_pipeline_invalid_event_goes_to_error_store()
def test_api_collect_endpoint_accepts_batch_events()Performance Tests
# Pattern: test_{metric}_{target}
async def test_throughput_10k_events_per_second()
def test_latency_p95_under_100ms()Debugging Tests
Run Single Test with Output
pytest tests/unit/schema/test_raw.py::test_raw_event_accepts_arbitrary_fields -v -sDrop into Debugger on Failure
pytest --pdbShow Print Statements
pytest -sShow Local Variables on Failure
pytest -lCoverage Reports
Generate HTML Report
pytest --cov=src/eventkit --cov-report=html
open htmlcov/index.htmlIdentify Uncovered Lines
pytest --cov=src/eventkit --cov-report=term-missingEnforce Minimum Coverage
pytest --cov=src/eventkit --cov-fail-under=80Checklist: Before Merging PR
- All tests pass (
pytest) - Coverage >80% (
pytest --cov) - Type checking passes (
mypy src/eventkit) - Linting passes (
ruff check src/) - Integration tests with Firestore emulator pass
- Performance tests meet targets (if applicable)
- New tests added for new functionality
- Docstrings updated
Resources
Last updated on