Observability-driven testing combines test execution with production-grade telemetry to create a feedback loop between testing and system behavior. OpenTelemetry, now one of the CNCF's most active projects, has become the industry standard for instrumenting applications with traces, metrics, and logs. According to the CNCF Survey 2023, 74% of organizations are using or evaluating OpenTelemetry, up from 49% in 2021. A Honeycomb study found that teams using distributed tracing in their testing workflows find root causes of failures 60% faster than those relying on logs alone. For QA engineers, OpenTelemetry enables a new approach: tests generate telemetry just like production code, and that telemetry becomes a powerful verification tool for distributed system behavior.
TL;DR: Observability-driven testing uses OpenTelemetry to instrument tests with distributed traces and metrics, making test failures in distributed systems debuggable via trace analysis rather than log hunting. Add OTel SDK to test code, propagate trace context through test requests, and verify system behavior by querying traces and metrics from Jaeger, Zipkin, or Honeycomb.
The Shift to Observability-Driven Testing
Traditional testing approaches focus on pre-production validation - unit tests, integration tests, and staging environment testing. However, modern distributed systems exhibit emergent behaviors that only manifest in production under real load, with real user patterns, and real infrastructure constraints. Observability-driven testing shifts left and right simultaneously - instrumenting systems comprehensively while testing actively in production using telemetry data, distributed traces, and SLO validation.
This article explores observability-driven testing strategies using OpenTelemetry, distributed tracing validation, testing in production with synthetic monitoring, SLO-based testing, and integration with observability platforms (Prometheus, Jaeger, Grafana). By treating observability as a testing strategy, teams can validate system behavior in production with confidence.
“Observability-driven testing means your test execution itself becomes traceable. When a test fails in a distributed system, instead of guessing which of 15 services had the problem, you query the trace and see exactly what happened.” — Yuri Kan, Senior QA Lead
OpenTelemetry Instrumentation for Testing
OpenTelemetry SDK Setup
# app/telemetry.py
import os
import time

from opentelemetry import trace, metrics
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
def setup_telemetry(service_name: str, service_version: str):
"""Configure OpenTelemetry instrumentation"""
# Define resource attributes
resource = Resource.create({
"service.name": service_name,
"service.version": service_version,
"deployment.environment": os.getenv("ENVIRONMENT", "production")
})
# Setup tracing
trace_provider = TracerProvider(resource=resource)
otlp_trace_exporter = OTLPSpanExporter(
endpoint="http://otel-collector:4317",
insecure=True
)
trace_provider.add_span_processor(BatchSpanProcessor(otlp_trace_exporter))
trace.set_tracer_provider(trace_provider)
# Setup metrics
metric_reader = PeriodicExportingMetricReader(
OTLPMetricExporter(endpoint="http://otel-collector:4317", insecure=True),
export_interval_millis=60000
)
meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
metrics.set_meter_provider(meter_provider)
# Auto-instrument frameworks
FlaskInstrumentor().instrument()
RequestsInstrumentor().instrument()
SQLAlchemyInstrumentor().instrument()
print(f"✓ OpenTelemetry configured for {service_name}")
# Usage in application (assumes a Flask app, request/jsonify, and a payment_processor are defined in the application module)
setup_telemetry("payment-service", "1.2.0")
tracer = trace.get_tracer(__name__)
meter = metrics.get_meter(__name__)
# Custom metrics
payment_counter = meter.create_counter(
"payments.processed",
description="Number of payments processed"
)
payment_latency = meter.create_histogram(
"payments.latency",
description="Payment processing latency",
unit="ms"
)
@app.route('/api/payment', methods=['POST'])
def process_payment():
with tracer.start_as_current_span("process_payment") as span:
start_time = time.time()
try:
# Add custom attributes
span.set_attribute("payment.amount", request.json.get('amount'))
span.set_attribute("payment.method", request.json.get('method'))
# Process payment
result = payment_processor.process(request.json)
# Record metrics
payment_counter.add(1, {"status": "success", "method": request.json.get('method')})
latency = (time.time() - start_time) * 1000
payment_latency.record(latency, {"status": "success"})
span.set_attribute("payment.transaction_id", result['transaction_id'])
span.set_status(trace.Status(trace.StatusCode.OK))
return jsonify(result), 200
except Exception as e:
span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
span.record_exception(e)
payment_counter.add(1, {"status": "error"})
raise
Distributed Tracing Validation
# tests/test_distributed_tracing.py
import pytest
import requests
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from opentelemetry.instrumentation.requests import RequestsInstrumentor
class DistributedTracingTester:
"""Test distributed tracing across microservices"""
def __init__(self):
self.span_exporter = InMemorySpanExporter()
self.tracer_provider = TracerProvider()
        self.tracer_provider.add_span_processor(SimpleSpanProcessor(self.span_exporter))
        trace.set_tracer_provider(self.tracer_provider)
        # Instrument outgoing requests so the test's trace context propagates to the services under test
        RequestsInstrumentor().instrument()
        self.tracer = trace.get_tracer(__name__)
def test_end_to_end_trace_propagation(self):
"""Test trace context propagates across all services"""
with self.tracer.start_as_current_span("test_checkout") as span:
trace_id = span.get_span_context().trace_id
# Trigger checkout flow
response = requests.post('http://api-gateway/checkout', json={
'items': [{'id': '123', 'quantity': 1}],
'user_id': 'test-user-123'
})
assert response.status_code == 200
# Query Jaeger for the trace
traces = self.get_traces_from_jaeger(trace_id)
# Verify trace spans all expected services
expected_services = [
'api-gateway',
'checkout-service',
'inventory-service',
'payment-service',
'notification-service'
]
actual_services = set(span['process']['serviceName'] for span in traces['data'][0]['spans'])
for service in expected_services:
assert service in actual_services, f"Service {service} not found in trace"
def test_span_hierarchy(self):
"""Verify correct parent-child span relationships"""
with self.tracer.start_as_current_span("test_payment_flow"):
# Trigger payment
response = requests.post('http://payment-service/process', json={
'amount': 100.00,
'method': 'credit_card'
})
spans = self.span_exporter.get_finished_spans()
# Build span tree
span_tree = self._build_span_tree(spans)
# Verify hierarchy
root_span = span_tree['test_payment_flow']
assert 'validate_payment_method' in root_span['children']
assert 'charge_card' in root_span['children']
assert 'update_transaction_record' in root_span['children']
def test_span_attributes(self):
"""Verify spans contain expected attributes"""
response = requests.post('http://payment-service/process', json={
'amount': 50.00,
'method': 'paypal'
})
spans = self.span_exporter.get_finished_spans()
payment_span = next(s for s in spans if s.name == 'process_payment')
# Verify attributes
assert payment_span.attributes.get('payment.amount') == 50.00
assert payment_span.attributes.get('payment.method') == 'paypal'
assert payment_span.attributes.get('http.status_code') == 200
def test_error_trace_capture(self):
"""Test error traces are properly captured and attributed"""
with pytest.raises(requests.exceptions.HTTPError):
requests.post('http://payment-service/process', json={
'amount': -10.00 # Invalid amount
}).raise_for_status()
spans = self.span_exporter.get_finished_spans()
error_span = next(s for s in spans if s.name == 'process_payment')
# Verify error recorded
assert error_span.status.status_code == trace.StatusCode.ERROR
assert len(error_span.events) > 0
error_event = next(e for e in error_span.events if e.name == 'exception')
assert 'Invalid amount' in error_event.attributes.get('exception.message', '')
Testing in Production with Synthetic Monitoring
# tests/production/synthetic_tests.py
import os
import time

import pytest
import requests
from datadog import initialize, api
class SyntheticMonitoringTests:
"""Synthetic tests that run continuously in production"""
def __init__(self):
initialize(
api_key=os.getenv('DD_API_KEY'),
app_key=os.getenv('DD_APP_KEY')
)
def test_critical_user_journey_checkout(self):
"""Test critical checkout user journey in production"""
start_time = time.time()
# Step 1: Browse products
response = requests.get('https://production.example.com/api/products')
assert response.status_code == 200
assert len(response.json()['products']) > 0
# Step 2: Add to cart
product_id = response.json()['products'][0]['id']
response = requests.post('https://production.example.com/api/cart/add', json={
'product_id': product_id,
'quantity': 1
})
assert response.status_code == 200
# Step 3: Checkout
response = requests.post('https://production.example.com/api/checkout', json={
'cart_id': response.json()['cart_id'],
'payment_method': 'test_card'
})
assert response.status_code == 200
assert 'order_id' in response.json()
# Record metrics
duration = time.time() - start_time
# Send custom metric to Datadog
api.Metric.send(
metric='synthetic.checkout.duration',
points=[(time.time(), duration)],
tags=['environment:production', 'journey:checkout']
)
# Assert SLA
assert duration < 3.0, f"Checkout took {duration}s, exceeds 3s SLA"
def test_api_latency_percentiles(self):
"""Test API latency meets SLO"""
latencies = []
# Make 100 requests
for _ in range(100):
start = time.time()
response = requests.get('https://production.example.com/api/products')
latency = (time.time() - start) * 1000
latencies.append(latency)
assert response.status_code == 200
# Calculate percentiles
latencies.sort()
p50 = latencies[49]
p95 = latencies[94]
p99 = latencies[98]
print(f"P50: {p50:.2f}ms, P95: {p95:.2f}ms, P99: {p99:.2f}ms")
# Assert SLOs
assert p50 < 100, f"P50 latency {p50}ms exceeds 100ms SLO"
assert p95 < 500, f"P95 latency {p95}ms exceeds 500ms SLO"
assert p99 < 1000, f"P99 latency {p99}ms exceeds 1000ms SLO"
def test_data_freshness(self):
"""Test data freshness in caches and databases"""
# Write test data
test_value = f"test-{int(time.time())}"
response = requests.post('https://production.example.com/api/config', json={
'key': 'test_key',
'value': test_value
})
assert response.status_code == 200
# Wait for cache propagation
time.sleep(2)
# Read test data
response = requests.get('https://production.example.com/api/config/test_key')
assert response.status_code == 200
assert response.json()['value'] == test_value
# Cleanup
requests.delete('https://production.example.com/api/config/test_key')
Service Level Objective (SLO) Testing
# tests/test_slo.py
import pytest
from prometheus_api_client import PrometheusConnect
from datetime import datetime, timedelta
class SLOValidator:
"""Validate Service Level Objectives using Prometheus metrics"""
def __init__(self, prometheus_url: str):
self.prom = PrometheusConnect(url=prometheus_url, disable_ssl=True)
def test_availability_slo(self):
"""Test 99.9% availability SLO"""
# Query success rate over last 30 days
query = '''
sum(rate(http_requests_total{status=~"2.."}[30d]))
/
sum(rate(http_requests_total[30d]))
'''
result = self.prom.custom_query(query)
availability = float(result[0]['value'][1])
print(f"30-day availability: {availability * 100:.3f}%")
# Assert 99.9% SLO
assert availability >= 0.999, f"Availability {availability*100:.3f}% below 99.9% SLO"
def test_latency_slo(self):
"""Test P95 latency SLO (<500ms)"""
# Query P95 latency
query = '''
histogram_quantile(0.95,
sum(rate(http_request_duration_seconds_bucket[5m])) by (le)
)
'''
result = self.prom.custom_query(query)
p95_latency_seconds = float(result[0]['value'][1])
p95_latency_ms = p95_latency_seconds * 1000
print(f"P95 latency: {p95_latency_ms:.2f}ms")
# Assert <500ms SLO
assert p95_latency_ms < 500, f"P95 latency {p95_latency_ms}ms exceeds 500ms SLO"
def test_error_budget_consumption(self):
"""Test error budget consumption rate"""
# Calculate error budget burn rate
# For 99.9% SLO, error budget is 0.1% over 30 days
# Fast burn: consuming monthly budget in <3 days is critical
query = '''
sum(rate(http_requests_total{status=~"5.."}[1h]))
/
sum(rate(http_requests_total[1h]))
'''
result = self.prom.custom_query(query)
error_rate_1h = float(result[0]['value'][1])
        # Burn rate = observed error rate / allowed error rate.
        # A burn rate of 1 consumes the 30-day budget exactly on schedule;
        # at 10x, the monthly budget is gone in about 3 days.
        monthly_budget = 0.001  # 0.1% error budget for a 99.9% SLO
        burn_rate = error_rate_1h / monthly_budget
print(f"Error budget burn rate: {burn_rate:.2f}x")
# Alert if burning budget >10x (will exhaust in <3 days)
assert burn_rate < 10, f"Critical: Error budget burning at {burn_rate}x normal rate"
def test_dependency_slo(self):
"""Test external dependency availability"""
# Query dependency success rate
query = '''
sum(rate(external_api_requests_total{status="success"}[5m]))
/
sum(rate(external_api_requests_total[5m]))
'''
result = self.prom.custom_query(query)
if result:
dependency_availability = float(result[0]['value'][1])
print(f"Dependency availability: {dependency_availability * 100:.2f}%")
# Lower SLO for external dependencies (99%)
assert dependency_availability >= 0.99, \
f"Dependency availability {dependency_availability*100:.2f}% below 99% SLO"
Canary Deployment Testing with Metrics
# tests/test_canary.py
import pytest
from prometheus_api_client import PrometheusConnect
import time
class CanaryAnalyzer:
"""Analyze canary deployment using metrics"""
def __init__(self, prometheus_url: str):
self.prom = PrometheusConnect(url=prometheus_url)
def test_canary_error_rate(self):
"""Compare error rates between baseline and canary"""
# Get error rate for baseline version
baseline_query = '''
sum(rate(http_requests_total{version="v1.0", status=~"5.."}[5m]))
/
sum(rate(http_requests_total{version="v1.0"}[5m]))
'''
# Get error rate for canary version
canary_query = '''
sum(rate(http_requests_total{version="v1.1", status=~"5.."}[5m]))
/
sum(rate(http_requests_total{version="v1.1"}[5m]))
'''
baseline_result = self.prom.custom_query(baseline_query)
canary_result = self.prom.custom_query(canary_query)
baseline_error_rate = float(baseline_result[0]['value'][1]) if baseline_result else 0
canary_error_rate = float(canary_result[0]['value'][1]) if canary_result else 0
print(f"Baseline error rate: {baseline_error_rate * 100:.3f}%")
print(f"Canary error rate: {canary_error_rate * 100:.3f}%")
# Canary should not have significantly higher error rate
assert canary_error_rate <= baseline_error_rate * 1.5, \
"Canary error rate significantly higher than baseline"
def test_canary_latency_regression(self):
"""Detect latency regressions in canary"""
# Compare P95 latency
baseline_query = '''
histogram_quantile(0.95,
sum(rate(http_request_duration_seconds_bucket{version="v1.0"}[5m])) by (le)
)
'''
canary_query = '''
histogram_quantile(0.95,
sum(rate(http_request_duration_seconds_bucket{version="v1.1"}[5m])) by (le)
)
'''
baseline_result = self.prom.custom_query(baseline_query)
canary_result = self.prom.custom_query(canary_query)
baseline_p95 = float(baseline_result[0]['value'][1]) * 1000 # Convert to ms
canary_p95 = float(canary_result[0]['value'][1]) * 1000
print(f"Baseline P95: {baseline_p95:.2f}ms")
print(f"Canary P95: {canary_p95:.2f}ms")
# Allow 10% latency increase
assert canary_p95 <= baseline_p95 * 1.10, \
f"Canary P95 latency {canary_p95}ms exceeds baseline {baseline_p95}ms by >10%"
def test_canary_business_metrics(self):
"""Validate business metrics for canary"""
# Compare conversion rates
baseline_query = 'sum(rate(checkout_completed_total{version="v1.0"}[5m]))'
canary_query = 'sum(rate(checkout_completed_total{version="v1.1"}[5m]))'
baseline_result = self.prom.custom_query(baseline_query)
canary_result = self.prom.custom_query(canary_query)
baseline_conversions = float(baseline_result[0]['value'][1])
canary_conversions = float(canary_result[0]['value'][1])
print(f"Baseline conversions/sec: {baseline_conversions:.3f}")
print(f"Canary conversions/sec: {canary_conversions:.3f}")
# Canary should not decrease conversions
assert canary_conversions >= baseline_conversions * 0.95, \
"Canary conversion rate dropped >5% compared to baseline"
GitHub Actions Workflow
# .github/workflows/observability-testing.yml
name: Observability Testing
on:
schedule:
- cron: '*/15 * * * *' # Run every 15 minutes
workflow_dispatch:
jobs:
synthetic-tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install pytest requests datadog prometheus-api-client
- name: Run synthetic production tests
env:
DD_API_KEY: ${{ secrets.DD_API_KEY }}
DD_APP_KEY: ${{ secrets.DD_APP_KEY }}
run: |
pytest tests/production/synthetic_tests.py -v
- name: Validate SLOs
env:
PROMETHEUS_URL: ${{ secrets.PROMETHEUS_URL }}
run: |
pytest tests/test_slo.py -v
- name: Analyze canary deployment
if: github.event_name == 'workflow_dispatch'
env:
PROMETHEUS_URL: ${{ secrets.PROMETHEUS_URL }}
run: |
pytest tests/test_canary.py -v
Conclusion
Observability-driven testing represents a paradigm shift from purely pre-production validation to continuous production testing using telemetry, traces, and metrics. By implementing OpenTelemetry instrumentation, distributed tracing validation, synthetic monitoring, SLO-based testing, and canary analysis, teams can validate system behavior in production with unprecedented visibility.
The key is treating observability as a first-class testing strategy - instrumenting comprehensively, testing continuously in production, validating SLOs programmatically, and using metrics to guide deployment decisions. With observability-driven testing, teams can confidently deploy to production while maintaining reliability through data-driven validation.
FAQ
What is OpenTelemetry and how does it relate to testing?
OpenTelemetry (OTel) is a vendor-neutral observability framework providing APIs, SDKs, and tools for generating traces, metrics, and logs. For testing, OTel enables: instrumenting test code to generate spans, propagating trace context through test requests so traces show test-triggered vs user-triggered paths, and using post-test trace analysis to verify service interaction patterns.
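A minimal sketch of that propagation step in Python, assuming a pytest-style test calling a service over HTTP (the gateway URL and attribute names are illustrative, not from this article's services):
# Sketch: propagate the test's trace context into an outgoing HTTP request.
# Assumes a TracerProvider has already been configured, as in the setup section above.
import requests
from opentelemetry import trace
from opentelemetry.propagate import inject

tracer = trace.get_tracer("qa.tests")

def test_checkout_is_traceable():
    with tracer.start_as_current_span("test_checkout") as span:
        span.set_attribute("test.name", "test_checkout_is_traceable")  # illustrative test-context attribute
        headers = {}
        inject(headers)  # writes the W3C traceparent header for the current span
        response = requests.post("http://api-gateway/checkout", json={"items": []}, headers=headers)
        assert response.status_code == 200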
How do I add OpenTelemetry to my test code?
Install the OTel SDK for your language (npm install @opentelemetry/sdk-node for Node.js). Create a tracer from the SDK. Wrap test scenarios in spans: const span = tracer.startSpan('test-scenario-name'). Add span attributes for test context (test name, environment, user story). Export spans to a local Jaeger instance for development or to your production observability backend for staging tests.
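Since the examples in this article are Python, here is a rough Python equivalent of the same steps (the OTLP endpoint and attribute values are assumptions):
# Sketch: minimal tracer setup inside a test suite; endpoint and attributes are assumptions.
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

provider = TracerProvider(resource=Resource.create({"service.name": "qa-tests"}))
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True))
)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("qa.tests")

with tracer.start_as_current_span("test-scenario-name") as span:
    span.set_attribute("test.environment", "staging")   # illustrative test-context attributes
    span.set_attribute("test.user_story", "US-1234")
    # ... run the test scenario here ...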
What assertions can I make using OpenTelemetry traces?
Trace-based assertions: verify all expected services were called (check spans for each service name), verify no unexpected services were called (check for extra spans), verify service call order (timestamps on spans), verify error rate below threshold (count error spans vs total spans), and verify latency SLOs (span duration percentiles from test runs).
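A sketch of how those checks can look once spans have been fetched from the backend; the dict fields (serviceName, startTime, durationMs, isError) and the hard-coded service names are assumptions and will differ per backend (Jaeger, Zipkin, Honeycomb):
# Sketch: trace-based assertions over spans fetched from a trace backend.
# Field names are assumptions; adapt them to the JSON your backend actually returns.
def assert_trace_shape(spans, expected_services, max_error_rate=0.0, p95_budget_ms=500):
    called = {s["serviceName"] for s in spans}
    assert set(expected_services) <= called, f"Services never called: {set(expected_services) - called}"
    assert called <= set(expected_services), f"Unexpected services in trace: {called - set(expected_services)}"

    # Call order via span start timestamps (service names are illustrative)
    ordered = [s["serviceName"] for s in sorted(spans, key=lambda s: s["startTime"])]
    assert ordered.index("api-gateway") < ordered.index("payment-service")

    # Error rate across spans
    error_rate = sum(1 for s in spans if s["isError"]) / len(spans)
    assert error_rate <= max_error_rate, f"Error rate {error_rate:.2%} exceeds {max_error_rate:.2%}"

    # Latency SLO on span durations (nearest-rank P95)
    durations = sorted(s["durationMs"] for s in spans)
    p95 = durations[int(0.95 * (len(durations) - 1))]
    assert p95 < p95_budget_ms, f"P95 span duration {p95}ms exceeds {p95_budget_ms}ms"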
How do I debug test failures using distributed traces?
When a test fails: search traces by trace ID (included in test logs), view the waterfall of service calls, identify the first span with error=true status, examine span attributes and events for error details, look for retry patterns (same service called multiple times), and check for missing spans indicating services that failed to respond.
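As a rough sketch against Jaeger's HTTP query API (the query URL is an assumption; the trace ID must be its 32-character hex form):
# Sketch: fetch a trace from Jaeger's query API and locate the first failing span.
# Error detection relies on the common "error" / "otel.status_code" span tags.
import requests

def first_error_span(trace_id_hex, jaeger_url="http://jaeger-query:16686"):
    trace_data = requests.get(f"{jaeger_url}/api/traces/{trace_id_hex}").json()["data"][0]
    for span in sorted(trace_data["spans"], key=lambda s: s["startTime"]):
        tags = {t["key"]: t["value"] for t in span["tags"]}
        if tags.get("error") is True or str(tags.get("otel.status_code", "")).upper() == "ERROR":
            service = trace_data["processes"][span["processID"]]["serviceName"]
            return service, span["operationName"], tags
    return None  # no error span found; check for missing spans from services that never responded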
See Also
- Test Automation Strategy - Building a comprehensive automation approach
- Feature Flags Testing Strategy: LaunchDarkly, Flagsmith, and A/B Testing for QA - LaunchDarkly/Flagsmith integration, flag combinations…
- Continuous Testing in DevOps - Integrating testing into CI/CD pipelines
- API Performance Testing - Load testing and API optimization
- Monitoring and Alerting for QA - Setting up quality observability
- Playwright Framework Guide - Modern E2E testing framework
