Databases are the foundation of most applications, storing critical business data and powering core functionality. Yet database testing often receives less attention than application testing, leading to production issues ranging from data corruption to catastrophic performance degradation. This comprehensive guide covers the essential aspects of database testing—from ensuring data integrity to validating complex migrations—equipping QA engineers with practical strategies for thorough database validation.
Understanding Database Testing
Database testing validates the data layer of applications, focusing on:
- Data integrity and consistency
- Schema correctness
- Transaction behavior (ACID properties)
- Performance under load
- Security and access control
- Backup and recovery mechanisms
- Migration and upgrade processes
Unlike application testing, which validates business logic and user interfaces, database testing examines the foundational data layer that applications depend on.
Why Database Testing Matters
Data corruption is permanent: Unlike application bugs that can be fixed with a code deployment, corrupted data requires extensive recovery efforts and may be irretrievable.
Performance issues compound: Poorly optimized queries become exponentially worse as data volumes grow, turning acceptable performance into system-crippling slowness.
Schema changes are risky: Database migrations in production can cause downtime, data loss, or subtle data corruption that manifests weeks later.
Compliance requirements: Many industries have regulatory requirements for data accuracy, integrity, and auditability (GDPR, HIPAA, SOX).
Data Integrity and Consistency Testing
Data integrity ensures that data remains accurate, complete, and consistent throughout its lifecycle. Consistency ensures that all database constraints and relationships are maintained.
Types of Data Integrity
1. Entity Integrity
Entity integrity requires that each table has a primary key and that the primary key is unique and not null.
What to test:
-- Test 1: Find tables without a primary key
SELECT t.table_name
FROM information_schema.tables t
LEFT JOIN information_schema.table_constraints tc
ON tc.table_schema = t.table_schema
AND tc.table_name = t.table_name
AND tc.constraint_type = 'PRIMARY KEY'
WHERE t.table_schema = 'your_database'
AND t.table_type = 'BASE TABLE'
AND tc.constraint_name IS NULL;
-- Expected: 0 rows (every table should have a PK)
-- Test 2: Verify no NULL primary keys (should be impossible, but check after migrations)
SELECT * FROM users WHERE id IS NULL;
-- Expected: 0 rows
-- Test 3: Verify no duplicate primary keys
SELECT id, COUNT(*) AS cnt
FROM users
GROUP BY id
HAVING cnt > 1;
-- Expected: 0 rows
2. Referential Integrity
Referential integrity ensures that relationships between tables remain consistent. Foreign keys must reference existing records in parent tables.
Common referential integrity issues:
- Orphaned records (child records with no parent)
- Foreign key violations during inserts/updates
- Cascade delete/update behaviors not working correctly
Testing strategies:
-- Test: Find orphaned order records
SELECT o.*
FROM orders o
LEFT JOIN users u ON o.user_id = u.id
WHERE u.id IS NULL;
-- Expected: 0 rows (all orders should have valid users)
-- Test: Verify foreign key constraints exist
SELECT
tc.table_name,
tc.constraint_name,
kcu.column_name,
ccu.table_name AS foreign_table_name,
ccu.column_name AS foreign_column_name
FROM information_schema.table_constraints AS tc
JOIN information_schema.key_column_usage AS kcu
ON tc.constraint_name = kcu.constraint_name
JOIN information_schema.constraint_column_usage AS ccu
ON ccu.constraint_name = tc.constraint_name
WHERE tc.constraint_type = 'FOREIGN KEY'
AND tc.table_schema = 'your_database';
-- Verify all expected foreign keys are present
Testing cascade behaviors:
-- Setup test data
BEGIN;
INSERT INTO users (id, email) VALUES (999, 'test@example.com');
INSERT INTO orders (id, user_id, total) VALUES (9999, 999, 100.00);
-- Test CASCADE DELETE
DELETE FROM users WHERE id = 999;
-- Verify order was also deleted
SELECT * FROM orders WHERE id = 9999;
-- Expected: 0 rows
ROLLBACK;
3. Domain Integrity
Domain integrity ensures that column values adhere to defined constraints (data types, check constraints, not null, unique).
What to test:
-- Test: Verify NOT NULL constraints
-- Attempt to insert NULL where not allowed
BEGIN;
INSERT INTO users (id, email, created_at)
VALUES (1, NULL, NOW());
-- Expected: Error (if email is NOT NULL)
ROLLBACK;
-- Test: Verify CHECK constraints
BEGIN;
INSERT INTO products (id, name, price)
VALUES (1, 'Test Product', -10.00);
-- Expected: Error (if CHECK constraint exists: price >= 0)
ROLLBACK;
-- Test: Verify UNIQUE constraints
BEGIN;
INSERT INTO users (id, email) VALUES (1, 'user@example.com');
INSERT INTO users (id, email) VALUES (2, 'user@example.com');
-- Expected: Error on second insert (duplicate email)
ROLLBACK;
-- Test: Verify data type constraints
-- Find records with invalid data patterns
SELECT * FROM users WHERE email NOT LIKE '%@%';
SELECT * FROM orders WHERE total < 0;
SELECT * FROM timestamps WHERE created_at > NOW();
4. User-Defined Integrity
User-defined integrity enforces business rules specific to the application that aren’t covered by standard integrity constraints.
Examples:
- Order total must equal sum of order items
- Account balance must equal sum of all transactions
- Employee salary must be within grade range
Testing approach:
-- Test: Order totals match sum of line items
SELECT
o.id AS order_id,
o.total AS order_total,
SUM(oi.quantity * oi.unit_price) AS calculated_total,
ABS(o.total - SUM(oi.quantity * oi.unit_price)) AS difference
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
GROUP BY o.id, o.total
HAVING ABS(o.total - SUM(oi.quantity * oi.unit_price)) > 0.01;
-- Expected: 0 rows (within rounding tolerance)
-- Test: Account balances are correct
SELECT
a.id,
a.balance AS recorded_balance,
COALESCE(SUM(t.amount), 0) AS calculated_balance,
a.balance - COALESCE(SUM(t.amount), 0) AS difference
FROM accounts a
LEFT JOIN transactions t ON a.id = t.account_id
GROUP BY a.id, a.balance
HAVING ABS(a.balance - COALESCE(SUM(t.amount), 0)) > 0.01;
-- Expected: 0 rows
Consistency Testing in Distributed Systems
Modern applications often use distributed databases or microservices with separate databases, introducing additional consistency challenges.
Eventual Consistency Testing
In eventually consistent systems, writes may not be immediately visible across all nodes, but consistency is guaranteed “eventually.”
Testing strategies:
# Test eventual consistency
import time
import pytest
def test_eventual_consistency(db_writer, db_reader):
# Write to master
user_id = db_writer.create_user(email="test@example.com")
# Immediately read from replica - may not be present yet
user = db_reader.get_user(user_id)
# Retry with exponential backoff
for attempt in range(5):
if user is None:
time.sleep(2 ** attempt) # 1s, 2s, 4s, 8s, 16s
user = db_reader.get_user(user_id)
else:
break
# After retries, should be consistent
assert user is not None
assert user.email == "test@example.com"
def test_consistency_window(db_writer, db_reader):
"""Verify consistency achieved within SLA window"""
start_time = time.time()
user_id = db_writer.create_user(email="test@example.com")
# Poll until visible on replica
while True:
user = db_reader.get_user(user_id)
if user is not None:
break
if time.time() - start_time > 30: # 30 second SLA
pytest.fail("Consistency not achieved within SLA")
time.sleep(0.5)
consistency_time = time.time() - start_time
print(f"Consistency achieved in {consistency_time:.2f} seconds")
assert consistency_time < 30
Testing Distributed Transactions
Distributed transactions across multiple databases require special testing.
Two-Phase Commit (2PC) testing:
def test_distributed_transaction_success(db1, db2):
"""Test successful distributed transaction"""
with distributed_transaction(db1, db2) as txn:
txn.execute_on_db1("INSERT INTO users (id, name) VALUES (1, 'Alice')")
txn.execute_on_db2("INSERT INTO accounts (user_id, balance) VALUES (1, 100)")
txn.commit()
# Verify both committed
assert db1.get_user(1) is not None
assert db2.get_account(1) is not None
def test_distributed_transaction_rollback(db1, db2):
"""Test distributed transaction rollback on failure"""
try:
with distributed_transaction(db1, db2) as txn:
txn.execute_on_db1("INSERT INTO users (id, name) VALUES (2, 'Bob')")
txn.execute_on_db2("INSERT INTO invalid_table VALUES (2, 100)") # Fails
txn.commit()
except Exception:
pass
# Verify both rolled back
assert db1.get_user(2) is None
assert db2.get_account(2) is None
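The distributed_transaction helper in these tests is not a standard library API. Below is a minimal sketch of what such a helper could look like, assuming db1 and db2 are test fixtures exposing DB-API-style cursor/commit/rollback methods; a production implementation would use real two-phase commit (for example PREPARE TRANSACTION in PostgreSQL or XA in MySQL):
from contextlib import contextmanager

class DistributedTxn:
    """Groups statements across two connections so they commit or roll back together."""
    def __init__(self, conn1, conn2):
        self.conn1, self.conn2 = conn1, conn2

    def execute_on_db1(self, sql, params=None):
        self.conn1.cursor().execute(sql, params or ())

    def execute_on_db2(self, sql, params=None):
        self.conn2.cursor().execute(sql, params or ())

    def commit(self):
        # Sketch only: true 2PC prepares both participants before committing either
        self.conn1.commit()
        self.conn2.commit()

@contextmanager
def distributed_transaction(conn1, conn2):
    txn = DistributedTxn(conn1, conn2)
    try:
        yield txn
    except Exception:
        # Any failure inside the block rolls back both databases
        conn1.rollback()
        conn2.rollback()
        raise
The rollback test above then only relies on the context manager rolling back both connections whenever any statement raises.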
Performance Tuning and Testing
Database performance is critical to application responsiveness and scalability. Performance testing should identify slow queries, missing indexes, and inefficient database access patterns.
Identifying Slow Queries
PostgreSQL:
-- Enable query logging (postgresql.conf)
-- log_min_duration_statement = 1000 # Log queries taking > 1 second
-- View slow queries from pg_stat_statements
SELECT
query,
calls,
total_exec_time,
mean_exec_time,
max_exec_time,
stddev_exec_time,
rows
FROM pg_stat_statements
ORDER BY mean_exec_time DESC
LIMIT 20;
-- Identify queries with high variance (unpredictable performance)
SELECT
query,
calls,
mean_exec_time,
stddev_exec_time,
(stddev_exec_time / mean_exec_time) AS coefficient_of_variation
FROM pg_stat_statements
WHERE calls > 100
AND mean_exec_time > 0
ORDER BY (stddev_exec_time / mean_exec_time) DESC
LIMIT 20;
MySQL:
-- Enable slow query log
SET GLOBAL slow_query_log = 'ON';
SET GLOBAL long_query_time = 1; -- Queries taking > 1 second
-- View slow query summary (requires sys schema)
SELECT
query,
exec_count,
avg_latency,
max_latency,
rows_examined_avg
FROM sys.statement_analysis
ORDER BY avg_latency DESC
LIMIT 20;
-- Find tables with the heaviest read I/O (candidates for full table scans)
SELECT
object_schema,
object_name,
count_read
FROM performance_schema.table_io_waits_summary_by_table
WHERE object_schema NOT IN ('mysql', 'performance_schema', 'sys')
ORDER BY count_read DESC;
Index Optimization
Indexes dramatically improve read performance but slow down writes. Finding the right balance is key.
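That write cost can be measured directly. A rough benchmark sketch, assuming psycopg2, a scratch database, and a hypothetical write_bench table created only for the test (absolute numbers vary with hardware and row width):
import time
import psycopg2

def time_inserts(cursor, n=10000):
    """Time n single-row inserts into the scratch table."""
    start = time.time()
    for i in range(n):
        cursor.execute(
            "INSERT INTO write_bench (email) VALUES (%s)", (f"user{i}@example.com",))
    return time.time() - start

conn = psycopg2.connect("postgresql://user:password@localhost:5432/mydb")
cursor = conn.cursor()
cursor.execute("CREATE TABLE write_bench (id SERIAL PRIMARY KEY, email TEXT)")
baseline = time_inserts(cursor)
cursor.execute("TRUNCATE write_bench")
cursor.execute("CREATE INDEX idx_write_bench_email ON write_bench (email)")
with_index = time_inserts(cursor)
print(f"Without index: {baseline:.2f}s, with index: {with_index:.2f}s")
conn.rollback()  # everything above ran in one transaction; discard it
conn.close()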
Identifying missing indexes:
-- PostgreSQL: Find sequential scans on large tables
SELECT
schemaname,
relname AS tablename,
seq_scan,
seq_tup_read,
idx_scan,
seq_tup_read / seq_scan AS avg_seq_tup,
pg_size_pretty(pg_total_relation_size(relid)) AS size
FROM pg_stat_user_tables
WHERE seq_scan > 0
AND seq_tup_read / seq_scan > 10000 -- Large sequential scans
ORDER BY seq_tup_read DESC
LIMIT 20;
-- MySQL: Find queries not using indexes
SELECT
query,
exec_count,
rows_examined,
rows_sent,
rows_examined / NULLIF(rows_sent, 0) AS rows_examined_ratio
FROM sys.statement_analysis
WHERE rows_examined > 10000
AND rows_sent < 100
AND query NOT LIKE '%information_schema%'
ORDER BY rows_examined DESC;
Testing index effectiveness:
-- Test EXPLAIN plan before adding index
EXPLAIN ANALYZE
SELECT * FROM users WHERE email = 'user@example.com';
-- Seq Scan on users (cost=0.00..2345.00 rows=1 width=100) (actual time=50.234..50.235 rows=1 loops=1)
-- Add index
CREATE INDEX idx_users_email ON users(email);
-- Test EXPLAIN plan after adding index
EXPLAIN ANALYZE
SELECT * FROM users WHERE email = 'user@example.com';
-- Index Scan using idx_users_email (cost=0.42..8.44 rows=1 width=100) (actual time=0.045..0.046 rows=1 loops=1)
Index usage monitoring:
-- PostgreSQL: Find unused indexes
SELECT
schemaname,
relname AS tablename,
indexrelname AS indexname,
idx_scan,
pg_size_pretty(pg_relation_size(indexrelid)) AS index_size
FROM pg_stat_user_indexes
WHERE idx_scan = 0
AND indexrelname NOT LIKE 'pg_%'
ORDER BY pg_relation_size(indexrelid) DESC;
-- Consider dropping unused indexes
Query Optimization Techniques
Use EXPLAIN ANALYZE to understand query execution:
EXPLAIN (ANALYZE, BUFFERS, VERBOSE)
SELECT
u.name,
COUNT(o.id) as order_count,
SUM(o.total) as total_spent
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.created_at > '2024-01-01'
GROUP BY u.id, u.name
HAVING SUM(o.total) > 1000
ORDER BY total_spent DESC
LIMIT 100;
Common optimization techniques:
1. Avoid SELECT *
-- Bad: Fetches all columns
SELECT * FROM users WHERE id = 123;
-- Good: Fetch only needed columns
SELECT id, email, name FROM users WHERE id = 123;
2. Use appropriate JOIN types
-- Use INNER JOIN when you only need matching rows
SELECT u.name, o.total
FROM users u
INNER JOIN orders o ON u.id = o.user_id;
-- Use LEFT JOIN when you need all left table rows
SELECT u.name, COUNT(o.id) as order_count
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
GROUP BY u.id, u.name;
3. Avoid N+1 queries
# Bad: N+1 query problem
users = db.query("SELECT * FROM users")
for user in users:
orders = db.query(f"SELECT * FROM orders WHERE user_id = {user.id}")
# Process orders
# Good: Use JOIN or batch loading
result = db.query("""
SELECT
u.id, u.name,
o.id as order_id, o.total
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
""")
4. Use indexes for WHERE, JOIN, ORDER BY columns
-- Query pattern
SELECT * FROM orders
WHERE user_id = 123 AND status = 'pending'
ORDER BY created_at DESC;
-- Create composite index
CREATE INDEX idx_orders_user_status_created
ON orders(user_id, status, created_at DESC);
Load Testing Database Performance
Database performance should be tested under realistic load conditions.
Example load test script (Python + psycopg2):
import time
import random
import psycopg2
from concurrent.futures import ThreadPoolExecutor, as_completed
def simulate_user_session(conn_string, user_id):
"""Simulate a user session with multiple queries"""
conn = psycopg2.connect(conn_string)
cursor = conn.cursor()
start_time = time.time()
try:
# Read user profile
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
user = cursor.fetchone()
# Read user's orders
cursor.execute("""
SELECT * FROM orders
WHERE user_id = %s
ORDER BY created_at DESC
LIMIT 10
""", (user_id,))
orders = cursor.fetchall()
# Update last_seen timestamp
cursor.execute("""
UPDATE users
SET last_seen = NOW()
WHERE id = %s
""", (user_id,))
conn.commit()
duration = time.time() - start_time
return {'success': True, 'duration': duration}
except Exception as e:
conn.rollback()
duration = time.time() - start_time
return {'success': False, 'duration': duration, 'error': str(e)}
finally:
cursor.close()
conn.close()
def run_load_test(conn_string, num_users=100, num_sessions=1000):
"""Run database load test"""
user_ids = range(1, num_users + 1)
results = []
start_time = time.time()
with ThreadPoolExecutor(max_workers=20) as executor:
futures = [
executor.submit(simulate_user_session, conn_string, random.choice(user_ids))
for _ in range(num_sessions)
]
for future in as_completed(futures):
results.append(future.result())
total_time = time.time() - start_time
# Analyze results
successful = [r for r in results if r['success']]
failed = [r for r in results if not r['success']]
durations = [r['duration'] for r in successful]
print(f"Total time: {total_time:.2f}s")
print(f"Throughput: {len(results) / total_time:.2f} requests/sec")
print(f"Success rate: {len(successful) / len(results) * 100:.2f}%")
print(f"Average response time: {sum(durations) / len(durations):.3f}s")
print(f"P95 response time: {sorted(durations)[int(len(durations) * 0.95)]:.3f}s")
print(f"P99 response time: {sorted(durations)[int(len(durations) * 0.99)]:.3f}s")
if failed:
print(f"\nFailures: {len(failed)}")
for f in failed[:5]:
print(f" - {f['error']}")
if __name__ == "__main__":
conn_string = "postgresql://user:password@localhost:5432/mydb"
run_load_test(conn_string, num_users=100, num_sessions=1000)
Migration Testing
Database migrations are among the riskiest operations in software development. A single faulty migration can corrupt data, cause downtime, or introduce subtle bugs that manifest weeks later.
Types of Database Migrations
1. Schema migrations - Adding/removing tables, columns, indexes, constraints
2. Data migrations - Transforming existing data to new formats
3. Hybrid migrations - Schema + data changes combined
Migration Testing Strategy
1. Test in non-production first (always!)
2. Use migration tools (Flyway, Liquibase, Alembic, Django migrations)
3. Make migrations reversible (provide rollback scripts)
4. Test with production-like data volumes
5. Measure migration duration (estimate production downtime)
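Point 5 can be automated: time the migration against a restored, production-sized snapshot and fail the test if it exceeds the agreed maintenance window. A sketch assuming psycopg2, a hypothetical SNAPSHOT_DSN pointing at that copy, and the plain-SQL migration file used later in this section:
import time
import psycopg2

SNAPSHOT_DSN = "postgresql://user:password@staging-host:5432/prod_snapshot"
MAX_MIGRATION_SECONDS = 300  # agreed maintenance-window budget

def test_migration_duration_within_budget():
    conn = psycopg2.connect(SNAPSHOT_DSN)
    cursor = conn.cursor()
    migration_sql = open('migrations/add_email_verified_column_up.sql').read()
    start = time.time()
    cursor.execute(migration_sql)
    conn.commit()
    duration = time.time() - start
    conn.close()
    print(f"Migration took {duration:.1f}s on production-sized data")
    assert duration < MAX_MIGRATION_SECONDS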
Testing Schema Migrations
Example migration: Adding a column
-- Migration: add_email_verified_column.sql
-- UP
ALTER TABLE users ADD COLUMN email_verified BOOLEAN DEFAULT FALSE NOT NULL;
CREATE INDEX idx_users_email_verified ON users(email_verified);
-- DOWN (rollback)
DROP INDEX idx_users_email_verified;
ALTER TABLE users DROP COLUMN email_verified;
Test checklist:
- ✓ Migration runs without errors
- ✓ Default value is correctly applied to existing rows
- ✓ NOT NULL constraint doesn’t break existing data
- ✓ Index is created successfully
- ✓ Application queries still work
- ✓ Rollback script works correctly
- ✓ Migration duration is acceptable
Automated migration test:
import pytest
def test_add_email_verified_column(db_connection):
# Setup: Create test data
cursor = db_connection.cursor()
cursor.execute("INSERT INTO users (id, email) VALUES (1, 'test@example.com')")
db_connection.commit()
# Run migration UP
cursor.execute(open('migrations/add_email_verified_column_up.sql').read())
db_connection.commit()
# Verify column exists with correct default
cursor.execute("SELECT email_verified FROM users WHERE id = 1")
email_verified = cursor.fetchone()[0]
assert email_verified == False
# Verify index exists
cursor.execute("""
SELECT indexname FROM pg_indexes
WHERE tablename = 'users' AND indexname = 'idx_users_email_verified'
""")
assert cursor.fetchone() is not None
# Test application query still works
cursor.execute("SELECT * FROM users WHERE id = 1")
user = cursor.fetchone()
assert user is not None
# Run migration DOWN (rollback)
cursor.execute(open('migrations/add_email_verified_column_down.sql').read())
db_connection.commit()
# Verify column removed
try:
cursor.execute("SELECT email_verified FROM users WHERE id = 1")
pytest.fail("Column should not exist after rollback")
except Exception as e:
assert 'column "email_verified" does not exist' in str(e)
Testing Data Migrations
Example: Splitting name column into first_name and last_name
-- Migration: split_name_column.sql
-- Step 1: Add new columns
ALTER TABLE users ADD COLUMN first_name VARCHAR(100);
ALTER TABLE users ADD COLUMN last_name VARCHAR(100);
-- Step 2: Migrate data
UPDATE users
SET
first_name = SPLIT_PART(name, ' ', 1),
last_name = SPLIT_PART(name, ' ', 2)
WHERE name IS NOT NULL;
-- Step 3: Verify migration
SELECT COUNT(*) as unmigrated_count
FROM users
WHERE name IS NOT NULL
AND (first_name IS NULL OR last_name IS NULL);
-- Should be 0
-- Step 4: Drop old column (in separate migration after verification)
-- ALTER TABLE users DROP COLUMN name;
Data migration testing:
def test_split_name_column(db_connection):
cursor = db_connection.cursor()
# Setup test data with various name formats
test_cases = [
(1, 'John Doe', 'John', 'Doe'),
(2, 'Jane Smith', 'Jane', 'Smith'),
(3, 'Madonna', 'Madonna', ''), # Single name
(4, 'Jean-Claude Van Damme', 'Jean-Claude', 'Van'),  # SPLIT_PART(name, ' ', 2) captures only the second space-separated token
]
for user_id, name, expected_first, expected_last in test_cases:
cursor.execute("INSERT INTO users (id, name) VALUES (%s, %s)", (user_id, name))
db_connection.commit()
# Run migration
cursor.execute(open('migrations/split_name_column.sql').read())
db_connection.commit()
# Verify each test case
for user_id, name, expected_first, expected_last in test_cases:
cursor.execute("""
SELECT first_name, last_name
FROM users
WHERE id = %s
""", (user_id,))
first_name, last_name = cursor.fetchone()
assert first_name == expected_first, \
f"User {user_id}: expected first_name={expected_first}, got {first_name}"
assert last_name == expected_last, \
f"User {user_id}: expected last_name={expected_last}, got {last_name}"
Testing Large-Scale Migrations
For migrations on large tables (millions of rows), special considerations apply:
1. Batch processing
-- Instead of single UPDATE on millions of rows:
-- UPDATE users SET status = 'active' WHERE status IS NULL; -- Locks table!
-- Use batched approach:
DO $$
DECLARE
batch_size INT := 10000;
rows_updated INT;
BEGIN
LOOP
UPDATE users
SET status = 'active'
WHERE id IN (
SELECT id FROM users
WHERE status IS NULL
LIMIT batch_size
);
GET DIAGNOSTICS rows_updated = ROW_COUNT;
EXIT WHEN rows_updated = 0;
COMMIT; -- Commit each batch
RAISE NOTICE 'Updated % rows', rows_updated;
-- Small delay to avoid overwhelming database
PERFORM pg_sleep(0.1);
END LOOP;
END $$;
2. Online migrations (zero-downtime)
-- Phase 1: Add new column (non-blocking)
ALTER TABLE users ADD COLUMN email_lower VARCHAR(255);
-- Phase 2: Backfill data in batches
-- (Run in background, doesn't block application)
-- Phase 3: Add trigger to keep columns in sync
CREATE OR REPLACE FUNCTION sync_email_lower()
RETURNS TRIGGER AS $$
BEGIN
NEW.email_lower = LOWER(NEW.email);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER users_email_lower_trigger
BEFORE INSERT OR UPDATE OF email ON users
FOR EACH ROW
EXECUTE FUNCTION sync_email_lower();
-- Phase 4: After backfill complete, application uses new column
-- Phase 5: Eventually drop old column and trigger
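Before cutting the application over in Phase 4, verify that the backfill actually finished. A minimal check, assuming psycopg2 and the email_lower column added above:
import psycopg2

def backfill_complete(conn_string):
    """True when every row's email_lower matches LOWER(email)."""
    conn = psycopg2.connect(conn_string)
    cursor = conn.cursor()
    cursor.execute(
        "SELECT COUNT(*) FROM users "
        "WHERE email_lower IS DISTINCT FROM LOWER(email)")
    remaining = cursor.fetchone()[0]
    conn.close()
    return remaining == 0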
NoSQL Database Testing
NoSQL databases (MongoDB, Cassandra, DynamoDB, Redis) have different characteristics and require adapted testing strategies.
MongoDB Testing
Testing document validation:
// Define schema validation rules
db.createCollection("users", {
validator: {
$jsonSchema: {
bsonType: "object",
required: ["email", "createdAt"],
properties: {
email: {
bsonType: "string",
pattern: "^.+@.+$"
},
age: {
bsonType: "int",
minimum: 0,
maximum: 150
},
createdAt: {
bsonType: "date"
}
}
}
}
});
// Test validation
// Should succeed
db.users.insertOne({
email: "user@example.com",
age: 25,
createdAt: new Date()
});
// Should fail (invalid email)
db.users.insertOne({
email: "invalid-email",
createdAt: new Date()
});
// Should fail (age out of range)
db.users.insertOne({
email: "user@example.com",
age: 200,
createdAt: new Date()
});
Testing indexes:
// Create index
db.users.createIndex({ email: 1 }, { unique: true });
// Test index is used
db.users.find({ email: "user@example.com" }).explain("executionStats");
// Verify: Should use index scan, not collection scan
Testing data consistency:
// Test: Verify referential integrity manually (MongoDB doesn't enforce it)
// Find orders with non-existent users
const ordersWithInvalidUsers = db.orders.aggregate([
{
$lookup: {
from: "users",
localField: "userId",
foreignField: "_id",
as: "user"
}
},
{
$match: {
user: { $size: 0 } // No matching user found
}
}
]).toArray();
// Should be empty
assert(ordersWithInvalidUsers.length === 0,
`Found ${ordersWithInvalidUsers.length} orders with invalid users`);
Cassandra Testing
Testing replication and consistency:
import uuid
from cassandra.cluster import Cluster
from cassandra import ConsistencyLevel
cluster = Cluster(['cassandra-node1', 'cassandra-node2'])
session = cluster.connect('my_keyspace')
def test_consistency_levels():
# Write with QUORUM (majority of replicas must acknowledge)
insert_stmt = session.prepare("""
INSERT INTO users (id, email, name)
VALUES (?, ?, ?)
""")
insert_stmt.consistency_level = ConsistencyLevel.QUORUM
session.execute(insert_stmt, (uuid.uuid4(), 'test@example.com', 'Test User'))
# Read with ONE (from any replica) - may return stale data
select_stmt = session.prepare("SELECT * FROM users WHERE email = ?")
select_stmt.consistency_level = ConsistencyLevel.ONE
result = session.execute(select_stmt, ('test@example.com',))
# Read with QUORUM (from majority) - guarantees latest data
select_stmt.consistency_level = ConsistencyLevel.QUORUM
result = session.execute(select_stmt, ('test@example.com',))
Testing partition key design:
# Bad partition key: All data in single partition (hot partition)
# CREATE TABLE events_bad (
# event_type text,
# event_id uuid,
# data text,
# PRIMARY KEY (event_type, event_id)
# );
# Good partition key: Data distributed across partitions
# CREATE TABLE events_good (
# event_date date,
# event_type text,
# event_id uuid,
# data text,
# PRIMARY KEY ((event_date, event_type), event_id)
# );
def test_partition_distribution():
"""Verify data is evenly distributed across partitions"""
query = "SELECT token(event_date, event_type), count(*) FROM events_good GROUP BY token(event_date, event_type)"
results = session.execute(query)
partition_counts = [row.count for row in results]
# Verify no hot partitions (no partition should have >2x average)
avg_count = sum(partition_counts) / len(partition_counts)
max_count = max(partition_counts)
assert max_count < 2 * avg_count, \
f"Hot partition detected: max={max_count}, avg={avg_count}"
Redis Testing
Testing cache expiration:
import redis
import time
r = redis.Redis(host='localhost', port=6379, db=0)
def test_cache_expiration():
# Set value with TTL
r.setex('user:123:profile', 10, '{"name": "John"}')
# Immediately available
assert r.get('user:123:profile') is not None
# Still available after 5 seconds
time.sleep(5)
assert r.get('user:123:profile') is not None
# Expired after 11 seconds
time.sleep(6)
assert r.get('user:123:profile') is None
def test_cache_invalidation():
"""Test cache invalidation on data update"""
# Cache user data
r.set('user:456:profile', '{"name": "Jane", "age": 25}')
# Simulate user update in the primary database (db is the application's SQL connection, not shown here)
db.execute("UPDATE users SET age = 26 WHERE id = 456")
# Invalidate cache
r.delete('user:456:profile')
# Next read should fetch from database
assert r.get('user:456:profile') is None
Conclusion
Database testing is a critical discipline that requires both technical depth and systematic rigor. From validating data integrity to optimizing query performance, from testing complex migrations to adapting strategies for NoSQL systems, comprehensive database testing protects the foundation of your applications.
Key takeaways:
- Data integrity is paramount - Use constraints, validate relationships, test business rules
- Performance problems compound - Optimize early, monitor continuously, test under load
- Migrations are high-risk - Test thoroughly, make reversible, batch large operations
- NoSQL requires different strategies - Understand consistency models, test partition design
- Automate database tests - Integrate them into CI/CD to catch regressions early
- Monitor production databases - Use slow query logs, track metrics, alert on anomalies
Invest in robust database testing practices, and you’ll prevent data corruption, avoid performance crises, and deliver reliable systems that users can trust.