The Shift to Observability-Driven Testing
Traditional testing approaches focus on pre-production validation: unit tests, integration tests, and staging environments. However, modern distributed systems exhibit emergent behaviors that only manifest in production, under real load, real user patterns, and real infrastructure constraints. Observability-driven testing shifts left and right at the same time: it instruments systems comprehensively while testing actively in production using telemetry data, distributed traces, and SLO validation.
This article explores observability-driven testing strategies using OpenTelemetry instrumentation, distributed tracing validation, testing in production with synthetic monitoring, SLO-based testing, and canary analysis, integrated with observability platforms (Prometheus, Jaeger, Datadog). By treating observability as a testing strategy, teams can validate system behavior in production with confidence.
OpenTelemetry Instrumentation for Testing
OpenTelemetry SDK Setup
# app/telemetry.py
import os
import time

from flask import Flask, jsonify, request
from opentelemetry import trace, metrics
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor

def setup_telemetry(service_name: str, service_version: str):
    """Configure OpenTelemetry instrumentation"""
    # Define resource attributes
    resource = Resource.create({
        "service.name": service_name,
        "service.version": service_version,
        "deployment.environment": os.getenv("ENVIRONMENT", "production")
    })

    # Setup tracing
    trace_provider = TracerProvider(resource=resource)
    otlp_trace_exporter = OTLPSpanExporter(
        endpoint="http://otel-collector:4317",
        insecure=True
    )
    trace_provider.add_span_processor(BatchSpanProcessor(otlp_trace_exporter))
    trace.set_tracer_provider(trace_provider)

    # Setup metrics
    metric_reader = PeriodicExportingMetricReader(
        OTLPMetricExporter(endpoint="http://otel-collector:4317", insecure=True),
        export_interval_millis=60000
    )
    meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
    metrics.set_meter_provider(meter_provider)

    # Auto-instrument frameworks
    FlaskInstrumentor().instrument()
    RequestsInstrumentor().instrument()
    SQLAlchemyInstrumentor().instrument()

    print(f"✓ OpenTelemetry configured for {service_name}")

# Usage in application
setup_telemetry("payment-service", "1.2.0")
app = Flask(__name__)  # created after instrument() so Flask auto-instrumentation applies

tracer = trace.get_tracer(__name__)
meter = metrics.get_meter(__name__)

# Custom metrics
payment_counter = meter.create_counter(
    "payments.processed",
    description="Number of payments processed"
)
payment_latency = meter.create_histogram(
    "payments.latency",
    description="Payment processing latency",
    unit="ms"
)

@app.route('/api/payment', methods=['POST'])
def process_payment():
    with tracer.start_as_current_span("process_payment") as span:
        start_time = time.time()
        try:
            # Add custom attributes
            span.set_attribute("payment.amount", request.json.get('amount'))
            span.set_attribute("payment.method", request.json.get('method'))

            # Process payment
            result = payment_processor.process(request.json)

            # Record metrics
            payment_counter.add(1, {"status": "success", "method": request.json.get('method')})
            latency = (time.time() - start_time) * 1000
            payment_latency.record(latency, {"status": "success"})

            span.set_attribute("payment.transaction_id", result['transaction_id'])
            span.set_status(trace.Status(trace.StatusCode.OK))

            return jsonify(result), 200
        except Exception as e:
            span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
            span.record_exception(e)
            payment_counter.add(1, {"status": "error"})
            raise
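The BatchSpanProcessor and PeriodicExportingMetricReader above export telemetry asynchronously, so a test that exercises an instrumented endpoint can query the backend before its spans and metrics have left the process. Below is a minimal sketch of an explicit flush helper for tests; `flush_telemetry` is a name introduced here, and it relies on the SDK providers registered by `setup_telemetry` above.

# tests/conftest.py (illustrative helper)
from opentelemetry import trace, metrics

def flush_telemetry(timeout_ms: int = 5000) -> None:
    # The globally registered providers are the SDK implementations configured
    # in setup_telemetry(), so they expose force_flush(); this drains buffered
    # spans and metrics before a test queries Jaeger or Prometheus.
    trace.get_tracer_provider().force_flush(timeout_millis=timeout_ms)
    metrics.get_meter_provider().force_flush(timeout_millis=timeout_ms)

Calling this at the end of an instrumented test run keeps backend-side assertions from racing the exporters.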
Distributed Tracing Validation
# tests/test_distributed_tracing.py
import pytest
import requests
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

class DistributedTracingTester:
    """Test distributed tracing across microservices"""

    def __init__(self):
        self.span_exporter = InMemorySpanExporter()
        self.tracer_provider = TracerProvider()
        self.tracer_provider.add_span_processor(SimpleSpanProcessor(self.span_exporter))
        trace.set_tracer_provider(self.tracer_provider)
        self.tracer = trace.get_tracer(__name__)

    def test_end_to_end_trace_propagation(self):
        """Test trace context propagates across all services"""
        with self.tracer.start_as_current_span("test_checkout") as span:
            trace_id = span.get_span_context().trace_id

            # Trigger checkout flow
            response = requests.post('http://api-gateway/checkout', json={
                'items': [{'id': '123', 'quantity': 1}],
                'user_id': 'test-user-123'
            })
            assert response.status_code == 200

            # Query Jaeger for the trace
            traces = self.get_traces_from_jaeger(trace_id)

            # Verify trace spans all expected services
            expected_services = [
                'api-gateway',
                'checkout-service',
                'inventory-service',
                'payment-service',
                'notification-service'
            ]
            actual_services = set(span['process']['serviceName'] for span in traces['data'][0]['spans'])
            for service in expected_services:
                assert service in actual_services, f"Service {service} not found in trace"

    def test_span_hierarchy(self):
        """Verify correct parent-child span relationships"""
        with self.tracer.start_as_current_span("test_payment_flow"):
            # Trigger payment
            response = requests.post('http://payment-service/process', json={
                'amount': 100.00,
                'method': 'credit_card'
            })

        spans = self.span_exporter.get_finished_spans()

        # Build span tree
        span_tree = self._build_span_tree(spans)

        # Verify hierarchy
        root_span = span_tree['test_payment_flow']
        assert 'validate_payment_method' in root_span['children']
        assert 'charge_card' in root_span['children']
        assert 'update_transaction_record' in root_span['children']

    def test_span_attributes(self):
        """Verify spans contain expected attributes"""
        response = requests.post('http://payment-service/process', json={
            'amount': 50.00,
            'method': 'paypal'
        })

        spans = self.span_exporter.get_finished_spans()
        payment_span = next(s for s in spans if s.name == 'process_payment')

        # Verify attributes
        assert payment_span.attributes.get('payment.amount') == 50.00
        assert payment_span.attributes.get('payment.method') == 'paypal'
        assert payment_span.attributes.get('http.status_code') == 200

    def test_error_trace_capture(self):
        """Test error traces are properly captured and attributed"""
        with pytest.raises(requests.exceptions.HTTPError):
            requests.post('http://payment-service/process', json={
                'amount': -10.00  # Invalid amount
            }).raise_for_status()

        spans = self.span_exporter.get_finished_spans()
        error_span = next(s for s in spans if s.name == 'process_payment')

        # Verify error recorded
        assert error_span.status.status_code == trace.StatusCode.ERROR
        assert len(error_span.events) > 0

        error_event = next(e for e in error_span.events if e.name == 'exception')
        assert 'Invalid amount' in error_event.attributes.get('exception.message', '')
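The tests above call two helpers that are not shown: `get_traces_from_jaeger` and `_build_span_tree`. A sketch of both follows, written as free functions for brevity (in the class above they would be methods). The Jaeger query URL is an assumption about the deployment; trace propagation to downstream services also assumes outgoing requests from the test are instrumented (for example via RequestsInstrumentor), and the span-level assertions implicitly assume either that the service under test runs in the same process as the in-memory exporter or that spans are read back from the tracing backend, since InMemorySpanExporter only captures spans created locally.

# tests/tracing_helpers.py (illustrative)
import requests

JAEGER_QUERY_URL = "http://jaeger-query:16686"  # assumption: in-cluster Jaeger query service

def get_traces_from_jaeger(trace_id: int, timeout: float = 5.0) -> dict:
    # OpenTelemetry exposes trace IDs as integers; Jaeger's HTTP API expects
    # the 32-character lowercase hex form.
    trace_id_hex = f"{trace_id:032x}"
    response = requests.get(f"{JAEGER_QUERY_URL}/api/traces/{trace_id_hex}", timeout=timeout)
    response.raise_for_status()
    return response.json()

def _build_span_tree(spans) -> dict:
    # Index the finished spans from InMemorySpanExporter by span_id, then hang
    # each span off its parent's 'children' mapping, keyed by span name.
    nodes = {s.context.span_id: {"span": s, "children": {}} for s in spans}
    roots = {}
    for node in nodes.values():
        parent = node["span"].parent
        if parent is not None and parent.span_id in nodes:
            nodes[parent.span_id]["children"][node["span"].name] = node
        else:
            roots[node["span"].name] = node
    return roots

Because exported spans leave each service in batches, a short polling loop against the Jaeger API is usually needed before the full trace becomes queryable.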
Testing in Production with Synthetic Monitoring
# tests/production/synthetic_tests.py
import os
import time

import pytest
import requests
from datadog import initialize, api

class SyntheticMonitoringTests:
    """Synthetic tests that run continuously in production"""

    def __init__(self):
        initialize(
            api_key=os.getenv('DD_API_KEY'),
            app_key=os.getenv('DD_APP_KEY')
        )

    def test_critical_user_journey_checkout(self):
        """Test critical checkout user journey in production"""
        start_time = time.time()

        # Step 1: Browse products
        response = requests.get('https://production.example.com/api/products')
        assert response.status_code == 200
        assert len(response.json()['products']) > 0

        # Step 2: Add to cart
        product_id = response.json()['products'][0]['id']
        response = requests.post('https://production.example.com/api/cart/add', json={
            'product_id': product_id,
            'quantity': 1
        })
        assert response.status_code == 200

        # Step 3: Checkout
        response = requests.post('https://production.example.com/api/checkout', json={
            'cart_id': response.json()['cart_id'],
            'payment_method': 'test_card'
        })
        assert response.status_code == 200
        assert 'order_id' in response.json()

        # Record metrics
        duration = time.time() - start_time

        # Send custom metric to Datadog
        api.Metric.send(
            metric='synthetic.checkout.duration',
            points=[(time.time(), duration)],
            tags=['environment:production', 'journey:checkout']
        )

        # Assert SLA
        assert duration < 3.0, f"Checkout took {duration}s, exceeds 3s SLA"

    def test_api_latency_percentiles(self):
        """Test API latency meets SLO"""
        latencies = []

        # Make 100 requests
        for _ in range(100):
            start = time.time()
            response = requests.get('https://production.example.com/api/products')
            latency = (time.time() - start) * 1000
            latencies.append(latency)
            assert response.status_code == 200

        # Calculate percentiles
        latencies.sort()
        p50 = latencies[49]
        p95 = latencies[94]
        p99 = latencies[98]

        print(f"P50: {p50:.2f}ms, P95: {p95:.2f}ms, P99: {p99:.2f}ms")

        # Assert SLOs
        assert p50 < 100, f"P50 latency {p50}ms exceeds 100ms SLO"
        assert p95 < 500, f"P95 latency {p95}ms exceeds 500ms SLO"
        assert p99 < 1000, f"P99 latency {p99}ms exceeds 1000ms SLO"

    def test_data_freshness(self):
        """Test data freshness in caches and databases"""
        # Write test data
        test_value = f"test-{int(time.time())}"
        response = requests.post('https://production.example.com/api/config', json={
            'key': 'test_key',
            'value': test_value
        })
        assert response.status_code == 200

        # Wait for cache propagation
        time.sleep(2)

        # Read test data
        response = requests.get('https://production.example.com/api/config/test_key')
        assert response.status_code == 200
        assert response.json()['value'] == test_value

        # Cleanup
        requests.delete('https://production.example.com/api/config/test_key')
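The checkout journey above asserts only on the end-to-end duration, so a dashboard cannot tell which step slowed down or failed. One way to get per-step visibility is sketched below, under the assumption that the Datadog client is already initialized as above; the `synthetic_step` helper and metric names are illustrative.

# tests/production/step_metrics.py (illustrative)
import time
from contextlib import contextmanager

from datadog import api

@contextmanager
def synthetic_step(journey: str, step: str):
    # Times one step of a synthetic journey and reports its duration and
    # outcome, so dashboards and monitors can pinpoint the failing step.
    start = time.time()
    status = "success"
    try:
        yield
    except Exception:
        status = "failure"
        raise
    finally:
        tags = [f"journey:{journey}", f"step:{step}", f"status:{status}",
                "environment:production"]
        api.Metric.send(metric="synthetic.step.duration",
                        points=[(time.time(), time.time() - start)], tags=tags)

# Usage inside the checkout journey:
#     with synthetic_step("checkout", "add_to_cart"):
#         response = requests.post('https://production.example.com/api/cart/add', json=...)
#         assert response.status_code == 200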
Service Level Objective (SLO) Testing
# tests/test_slo.py
import pytest
from prometheus_api_client import PrometheusConnect
from datetime import datetime, timedelta

class SLOValidator:
    """Validate Service Level Objectives using Prometheus metrics"""

    def __init__(self, prometheus_url: str):
        self.prom = PrometheusConnect(url=prometheus_url, disable_ssl=True)

    def test_availability_slo(self):
        """Test 99.9% availability SLO"""
        # Query success rate over last 30 days
        query = '''
        sum(rate(http_requests_total{status=~"2.."}[30d]))
        /
        sum(rate(http_requests_total[30d]))
        '''
        result = self.prom.custom_query(query)
        availability = float(result[0]['value'][1])

        print(f"30-day availability: {availability * 100:.3f}%")

        # Assert 99.9% SLO
        assert availability >= 0.999, f"Availability {availability*100:.3f}% below 99.9% SLO"

    def test_latency_slo(self):
        """Test P95 latency SLO (<500ms)"""
        # Query P95 latency
        query = '''
        histogram_quantile(0.95,
            sum(rate(http_request_duration_seconds_bucket[5m])) by (le)
        )
        '''
        result = self.prom.custom_query(query)
        p95_latency_seconds = float(result[0]['value'][1])
        p95_latency_ms = p95_latency_seconds * 1000

        print(f"P95 latency: {p95_latency_ms:.2f}ms")

        # Assert <500ms SLO
        assert p95_latency_ms < 500, f"P95 latency {p95_latency_ms}ms exceeds 500ms SLO"

    def test_error_budget_consumption(self):
        """Test error budget consumption rate"""
        # For a 99.9% SLO, the error budget is 0.1% of requests over 30 days.
        # Fast burn: consuming the monthly budget in <3 days is critical.
        query = '''
        sum(rate(http_requests_total{status=~"5.."}[1h]))
        /
        sum(rate(http_requests_total[1h]))
        '''
        result = self.prom.custom_query(query)
        error_rate_1h = float(result[0]['value'][1])

        # Burn rate: observed error rate relative to the error budget.
        # A sustained burn rate of B exhausts the 30-day budget in 30/B days.
        monthly_budget = 0.001  # 1 - 0.999
        burn_rate = error_rate_1h / monthly_budget

        print(f"Error budget burn rate: {burn_rate:.2f}x")

        # Alert if burning budget >10x (would exhaust it in about 3 days)
        assert burn_rate < 10, f"Critical: Error budget burning at {burn_rate}x normal rate"

    def test_dependency_slo(self):
        """Test external dependency availability"""
        # Query dependency success rate
        query = '''
        sum(rate(external_api_requests_total{status="success"}[5m]))
        /
        sum(rate(external_api_requests_total[5m]))
        '''
        result = self.prom.custom_query(query)

        if result:
            dependency_availability = float(result[0]['value'][1])
            print(f"Dependency availability: {dependency_availability * 100:.2f}%")

            # Lower SLO for external dependencies (99%)
            assert dependency_availability >= 0.99, \
                f"Dependency availability {dependency_availability*100:.2f}% below 99% SLO"
Canary Deployment Testing with Metrics
# tests/test_canary.py
import pytest
from prometheus_api_client import PrometheusConnect
import time

class CanaryAnalyzer:
    """Analyze canary deployment using metrics"""

    def __init__(self, prometheus_url: str):
        self.prom = PrometheusConnect(url=prometheus_url)

    def test_canary_error_rate(self):
        """Compare error rates between baseline and canary"""
        # Get error rate for baseline version
        baseline_query = '''
        sum(rate(http_requests_total{version="v1.0", status=~"5.."}[5m]))
        /
        sum(rate(http_requests_total{version="v1.0"}[5m]))
        '''

        # Get error rate for canary version
        canary_query = '''
        sum(rate(http_requests_total{version="v1.1", status=~"5.."}[5m]))
        /
        sum(rate(http_requests_total{version="v1.1"}[5m]))
        '''

        baseline_result = self.prom.custom_query(baseline_query)
        canary_result = self.prom.custom_query(canary_query)

        baseline_error_rate = float(baseline_result[0]['value'][1]) if baseline_result else 0
        canary_error_rate = float(canary_result[0]['value'][1]) if canary_result else 0

        print(f"Baseline error rate: {baseline_error_rate * 100:.3f}%")
        print(f"Canary error rate: {canary_error_rate * 100:.3f}%")

        # Canary should not have significantly higher error rate
        assert canary_error_rate <= baseline_error_rate * 1.5, \
            "Canary error rate significantly higher than baseline"

    def test_canary_latency_regression(self):
        """Detect latency regressions in canary"""
        # Compare P95 latency
        baseline_query = '''
        histogram_quantile(0.95,
            sum(rate(http_request_duration_seconds_bucket{version="v1.0"}[5m])) by (le)
        )
        '''
        canary_query = '''
        histogram_quantile(0.95,
            sum(rate(http_request_duration_seconds_bucket{version="v1.1"}[5m])) by (le)
        )
        '''

        baseline_result = self.prom.custom_query(baseline_query)
        canary_result = self.prom.custom_query(canary_query)

        baseline_p95 = float(baseline_result[0]['value'][1]) * 1000  # Convert to ms
        canary_p95 = float(canary_result[0]['value'][1]) * 1000

        print(f"Baseline P95: {baseline_p95:.2f}ms")
        print(f"Canary P95: {canary_p95:.2f}ms")

        # Allow 10% latency increase
        assert canary_p95 <= baseline_p95 * 1.10, \
            f"Canary P95 latency {canary_p95}ms exceeds baseline {baseline_p95}ms by >10%"

    def test_canary_business_metrics(self):
        """Validate business metrics for canary"""
        # Compare conversion rates
        baseline_query = 'sum(rate(checkout_completed_total{version="v1.0"}[5m]))'
        canary_query = 'sum(rate(checkout_completed_total{version="v1.1"}[5m]))'

        baseline_result = self.prom.custom_query(baseline_query)
        canary_result = self.prom.custom_query(canary_query)

        baseline_conversions = float(baseline_result[0]['value'][1])
        canary_conversions = float(canary_result[0]['value'][1])

        print(f"Baseline conversions/sec: {baseline_conversions:.3f}")
        print(f"Canary conversions/sec: {canary_conversions:.3f}")

        # Canary should not decrease conversions
        assert canary_conversions >= baseline_conversions * 0.95, \
            "Canary conversion rate dropped >5% compared to baseline"
GitHub Actions Workflow
# .github/workflows/observability-testing.yml
name: Observability Testing

on:
  schedule:
    - cron: '*/15 * * * *'  # Run every 15 minutes
  workflow_dispatch:

jobs:
  synthetic-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install pytest requests datadog prometheus-api-client

      - name: Run synthetic production tests
        env:
          DD_API_KEY: ${{ secrets.DD_API_KEY }}
          DD_APP_KEY: ${{ secrets.DD_APP_KEY }}
        run: |
          pytest tests/production/synthetic_tests.py -v

      - name: Validate SLOs
        env:
          PROMETHEUS_URL: ${{ secrets.PROMETHEUS_URL }}
        run: |
          pytest tests/test_slo.py -v

      - name: Analyze canary deployment
        if: github.event_name == 'workflow_dispatch'
        env:
          PROMETHEUS_URL: ${{ secrets.PROMETHEUS_URL }}
        run: |
          pytest tests/test_canary.py -v
Conclusion
Observability-driven testing represents a shift from purely pre-production validation to continuous production testing using telemetry, traces, and metrics. By implementing OpenTelemetry instrumentation, distributed tracing validation, synthetic monitoring, SLO-based testing, and canary analysis, teams can validate system behavior in production with far greater visibility than pre-production environments alone can provide.
The key is treating observability as a first-class testing strategy: instrument comprehensively, test continuously in production, validate SLOs programmatically, and use metrics to guide deployment decisions. With observability-driven testing, teams can deploy to production confidently while maintaining reliability through data-driven validation.