El Desafío del Testing de Bases de Datos en DevOps

Los cambios en bases de datos son frecuentemente la parte más riesgosa de los despliegues de software. Los enfoques tradicionales de gestión de bases de datos - cambios manuales de esquema, entornos inconsistentes y falta de control de versiones - crean cuellos de botella que ralentizan los pipelines CI/CD y aumentan el riesgo de despliegue. Las prácticas modernas de DevOps demandan que los cambios de bases de datos sean probados, versionados y automatizados con el mismo rigor que el código de aplicación.

Este artículo explora estrategias integrales de Database DevOps para automatización de pruebas, cubriendo herramientas de migración (Flyway, Liquibase), validación de esquemas, gestión de datos de prueba, testing de rollback e integración CI/CD.

Testing de Migraciones de Base de Datos con Flyway

Configuración y Testing de Flyway

-- V1__create_users_table.sql
CREATE TABLE users (
    id BIGSERIAL PRIMARY KEY,
    email VARCHAR(255) UNIQUE NOT NULL,
    username VARCHAR(100) UNIQUE NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_users_username ON users(username);
# tests/test_flyway_migrations.py
class FlywayMigrationTester:
    def test_migration_execution(self):
        """Probar que las migraciones se ejecutan exitosamente"""
        self.flyway.clean()
        result = self.flyway.migrate()

        assert result.success
        assert result.migrations_executed > 0

    def test_migration_idempotency(self):
        """Probar que las migraciones son idempotentes"""
        result1 = self.flyway.migrate()
        result2 = self.flyway.migrate()

        # La segunda ejecución no debe aplicar migraciones
        assert result2.migrations_executed == 0

    def test_schema_validation(self):
        """Validar esquema después de migraciones"""
        self.flyway.migrate()

        conn = psycopg2.connect(**self.db_config)
        cursor = conn.cursor()

        # Verificar tablas existen
        cursor.execute("""
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema = 'public'
        """)
        tables = [row[0] for row in cursor.fetchall()]
        assert 'users' in tables

        # Verificar columnas
        cursor.execute("""
            SELECT column_name, data_type, is_nullable
            FROM information_schema.columns
            WHERE table_name = 'users'
        """)
        columns = {row[0]: {'type': row[1], 'nullable': row[2]} for row in cursor.fetchall()}

        assert 'id' in columns
        assert columns['email']['nullable'] == 'NO'

        conn.close()

Integración CI/CD con Flyway

# .gitlab-ci.yml
database_test:
  stage: test
  image: postgres:15
  services:
    - postgres:15
  variables:
    POSTGRES_DB: test_db
    POSTGRES_USER: test_user
    POSTGRES_PASSWORD: test_password
  script:
    - pytest tests/test_flyway_migrations.py -v

migrate_staging:
  stage: migrate
  image: flyway/flyway:latest
  script:
    - flyway migrate
      -url=jdbc:postgresql://$DB_HOST:5432/$DB_NAME
      -user=$DB_USER
      -password=$DB_PASSWORD
      -locations=filesystem:migrations
  environment:
    name: staging
  only:
    - main

Integración y Testing de Liquibase

Changesets de Liquibase

<!-- changesets/001-create-products-table.xml -->
<?xml version="1.0" encoding="UTF-8"?>
<databaseChangeLog
    xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">

    <changeSet id="001" author="qa-team">
        <createTable tableName="products">
            <column name="id" type="BIGINT" autoIncrement="true">
                <constraints primaryKey="true" nullable="false"/>
            </column>
            <column name="name" type="VARCHAR(255)">
                <constraints nullable="false"/>
            </column>
            <column name="price" type="DECIMAL(10,2)">
                <constraints nullable="false"/>
            </column>
            <column name="stock" type="INT" defaultValue="0">
                <constraints nullable="false"/>
            </column>
        </createTable>

        <createIndex indexName="idx_products_name" tableName="products">
            <column name="name"/>
        </createIndex>

        <rollback>
            <dropTable tableName="products"/>
        </rollback>
    </changeSet>
</databaseChangeLog>

Framework de Testing de Liquibase

# tests/test_liquibase_migrations.py
class LiquibaseTester:
    def run_liquibase_command(self, command: str):
        """Ejecutar comando Liquibase"""
        cmd = [
            "liquibase",
            f"--url={self.jdbc_url}",
            f"--username={self.db_config['user']}",
            f"--password={self.db_config['password']}",
            f"--changeLogFile={self.changelog_path}",
            command
        ]
        return subprocess.run(cmd, capture_output=True, text=True)

    def test_changeset_validation(self):
        """Probar que los changesets son válidos"""
        result = self.run_liquibase_command("validate")
        assert result.returncode == 0

    def test_update_execution(self):
        """Probar que update se ejecuta exitosamente"""
        self.run_liquibase_command("dropAll")
        result = self.run_liquibase_command("update")
        assert result.returncode == 0

    def test_rollback_functionality(self):
        """Probar funcionalidad de rollback"""
        self.run_liquibase_command("update")
        self.run_liquibase_command("tag version-1.0")
        result = self.run_liquibase_command("rollback version-1.0")
        assert result.returncode == 0

Detección de Schema Drift

# tests/test_schema_drift.py
class SchemaDriftDetector:
    def get_actual_schema(self):
        """Obtener esquema actual de la base de datos"""
        inspector = inspect(self.engine)
        schema = {'tables': {}, 'indexes': {}}

        for table_name in inspector.get_table_names():
            columns = {}
            for column in inspector.get_columns(table_name):
                columns[column['name']] = {
                    'type': str(column['type']),
                    'nullable': column['nullable']
                }
            schema['tables'][table_name] = columns

            indexes = []
            for index in inspector.get_indexes(table_name):
                indexes.append({
                    'name': index['name'],
                    'columns': index['column_names']
                })
            schema['indexes'][table_name] = indexes

        return schema

    def test_no_schema_drift(self):
        """Verificar que el esquema actual coincide con el esperado"""
        actual = self.get_actual_schema()
        expected = self.expected_schema

        actual_tables = set(actual['tables'].keys())
        expected_tables = set(expected['tables'].keys())

        missing_tables = expected_tables - actual_tables
        extra_tables = actual_tables - expected_tables

        assert not missing_tables, f"Tablas faltantes: {missing_tables}"
        assert not extra_tables, f"Tablas inesperadas: {extra_tables}"

Gestión de Datos de Prueba

# tests/fixtures/test_data_seeder.py
class TestDataSeeder:
    def seed_users(self, count: int = 100):
        """Sembrar datos de usuarios de prueba"""
        session = self.Session()

        for _ in range(count):
            user = {
                'email': self.faker.email(),
                'username': self.faker.user_name(),
                'first_name': self.faker.first_name(),
                'last_name': self.faker.last_name()
            }
            session.execute(
                "INSERT INTO users (email, username, first_name, last_name) "
                "VALUES (:email, :username, :first_name, :last_name)",
                user
            )

        session.commit()
        session.close()

    def clean_all_tables(self):
        """Limpiar todos los datos de prueba"""
        session = self.Session()
        session.execute("SET session_replication_role = 'replica';")

        result = session.execute("""
            SELECT tablename FROM pg_tables
            WHERE schemaname = 'public'
            AND tablename NOT IN ('databasechangelog', 'flyway_schema_history')
        """)

        for row in result:
            session.execute(f"TRUNCATE TABLE {row[0]} CASCADE")

        session.execute("SET session_replication_role = 'origin';")
        session.commit()
        session.close()

@pytest.fixture(scope="function")
def seeded_database(database_url):
    """Fixture de pytest para sembrar y limpiar base de datos"""
    seeder = TestDataSeeder(database_url)
    seeder.seed_users(100)
    seeder.seed_products(50)
    yield seeder
    seeder.clean_all_tables()

Workflow de GitHub Actions

# .github/workflows/database-ci.yml
name: Database CI/CD

on:
  pull_request:
    paths:
      - 'migrations/**'
  push:
    branches:
      - main

jobs:
  test-migrations:
    runs-on: ubuntu-latest

    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_PASSWORD: postgres
          POSTGRES_DB: test_db
        ports:
          - 5432:5432

    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install pytest psycopg2-binary sqlalchemy faker

      - name: Install Flyway
        run: |
          wget -qO- https://download.red-gate.com/maven/release/com/redgate/flyway/flyway-commandline/10.0.0/flyway-commandline-10.0.0-linux-x64.tar.gz | tar -xz
          sudo ln -s $(pwd)/flyway-10.0.0/flyway /usr/local/bin/flyway

      - name: Run Flyway tests
        env:
          DB_HOST: localhost
          DB_NAME: test_db
          DB_USER: postgres
          DB_PASSWORD: postgres
        run: |
          pytest tests/test_flyway_migrations.py -v

      - name: Run schema drift tests
        run: |
          pytest tests/test_schema_drift.py -v

Conclusión

Database DevOps para automatización de pruebas requiere tratar los cambios de base de datos como código con testing integral, control de versiones y estrategias de despliegue automatizadas. Implementando testing de migraciones con Flyway y Liquibase, detección de schema drift, sembrado automatizado de datos de prueba e integración CI/CD, los equipos pueden lograr entrega continua de bases de datos con confianza.

La clave está en establecer validación automatizada para cada cambio de base de datos - desde scripts de migración hasta modificaciones de esquema - asegurando consistencia entre entornos y previniendo problemas en producción.