The Secrets Management Challenge in CI/CD

Modern CI/CD pipelines require access to sensitive credentials - database passwords, API keys, cloud provider tokens, encryption keys, and service account credentials. Traditional approaches of hardcoding secrets in code, storing them in plain text configuration files, or passing them as unencrypted environment variables create severe security vulnerabilities and compliance risks.

Effective secrets management for test automation requires balancing security with usability - secrets must be securely stored, rotated regularly, audited comprehensively, and injected into pipelines without exposing them in logs or artifacts. This article explores comprehensive secrets management strategies using HashiCorp Vault, SOPS (Secrets OPerationS), and secure testing patterns for CI/CD pipelines.

HashiCorp Vault Integration

Vault Configuration for CI/CD

# vault-config.hcl
# Storage backend: a Consul agent on the local host, with Vault data kept
# under the "vault/" key prefix.
storage "consul" {
  address = "127.0.0.1:8500"
  path    = "vault/"
}

# API listener on all interfaces, port 8200. TLS stays enabled and is served
# with the certificate/key pair below.
listener "tcp" {
  address       = "0.0.0.0:8200"
  tls_disable   = false
  tls_cert_file = "/etc/vault/tls/vault.crt"
  tls_key_file  = "/etc/vault/tls/vault.key"
}

api_addr     = "https://vault.example.com:8200"
cluster_addr = "https://vault.example.com:8201"
ui           = true

# Enable audit logging
# Audit devices are not configured in this file: the server configuration has
# no "audit" stanza, so declaring one here would not turn audit logging on.
# Enable the file audit device at runtime, once Vault is initialized and
# unsealed:
#
#   vault audit enable file file_path=/var/log/vault/audit.log

Vault Secret Engine Setup

#!/bin/bash
# scripts/vault-setup.sh
#
# Bootstraps Vault for CI/CD: mounts a KV v2 engine at "secret/", seeds the
# test and staging database secrets, and creates an AppRole ("cicd-role")
# bound to a read-only policy ("cicd-policy").
#
# NOTE(review): assumes the vault CLI is already pointed at the server and
# authenticated with a sufficiently privileged token — confirm before running.

# Enable KV v2 secrets engine
vault secrets enable -path=secret kv-v2

# Create secrets for different environments
# NOTE(review): "secure_password" is a placeholder. Values given as key=value
# arguments end up in shell history and the process list; for real secrets
# prefer feeding them from a file or stdin.
vault kv put secret/test/database \
  username=test_user \
  password=secure_password \
  host=test-db.example.com \
  port=5432

vault kv put secret/staging/database \
  username=staging_user \
  password=secure_password \
  host=staging-db.example.com \
  port=5432

# Enable AppRole auth method for CI/CD
vault auth enable approle

# Create policy for CI/CD access
# KV v2 splits every secret across two API paths:
#   secret/data/<path>      - the secret values (read)
#   secret/metadata/<path>  - key listing and version metadata (list/read)
# "list" therefore has to be granted on the metadata path; granting it on the
# data path has no effect.
vault policy write cicd-policy - <<EOF
path "secret/data/test/*" {
  capabilities = ["read"]
}

path "secret/metadata/test/*" {
  capabilities = ["read", "list"]
}

path "secret/data/staging/*" {
  capabilities = ["read"]
}

path "secret/metadata/staging/*" {
  capabilities = ["read", "list"]
}

path "auth/token/renew-self" {
  capabilities = ["update"]
}
EOF

# Create AppRole for CI/CD
# Tokens live for 1h (renewable up to 4h); each secret ID expires after 24h.
vault write auth/approle/role/cicd-role \
  token_policies="cicd-policy" \
  token_ttl=1h \
  token_max_ttl=4h \
  secret_id_ttl=24h

# Get role ID and secret ID
# Both values are printed to stdout: do not run this step where the output
# is captured in a shared log.
vault read auth/approle/role/cicd-role/role-id
vault write -f auth/approle/role/cicd-role/secret-id

Vault Client for Test Automation

# tests/vault_client.py
import hvac
import os
from typing import Dict, Optional

class VaultClient:
    """Small wrapper around ``hvac.Client`` that logs in with AppRole.

    Connection settings come from the constructor arguments and fall back to
    the ``VAULT_ADDR``, ``VAULT_ROLE_ID`` and ``VAULT_SECRET_ID`` environment
    variables. Secrets are read from the KV v2 engine mounted at ``secret``.
    """

    def __init__(self, vault_addr: Optional[str] = None, approle_id: Optional[str] = None,
                 approle_secret: Optional[str] = None):
        """Create the hvac client and authenticate immediately.

        Raises:
            RuntimeError: if the AppRole credentials are missing or the login
                fails (see ``_authenticate``).
        """
        self.vault_addr = vault_addr or os.getenv('VAULT_ADDR', 'https://vault.example.com:8200')
        self.approle_id = approle_id or os.getenv('VAULT_ROLE_ID')
        self.approle_secret = approle_secret or os.getenv('VAULT_SECRET_ID')

        self.client = hvac.Client(url=self.vault_addr)
        self._authenticate()

    def _authenticate(self) -> None:
        """Authenticate using AppRole and store the resulting client token.

        Raises:
            RuntimeError: if either credential is unset, or if the login call
                or the response handling fails. The original error is kept
                as ``__cause__``.
        """
        # Fail fast with an actionable message instead of sending an empty
        # login request to Vault.
        if not self.approle_id or not self.approle_secret:
            raise RuntimeError(
                "Vault authentication failed: AppRole role ID and secret ID are "
                "required (arguments or VAULT_ROLE_ID / VAULT_SECRET_ID)"
            )

        try:
            response = self.client.auth.approle.login(
                role_id=self.approle_id,
                secret_id=self.approle_secret
            )
            self.client.token = response['auth']['client_token']
            print(f"✓ Authenticated to Vault (token TTL: {response['auth']['lease_duration']}s)")
        except Exception as e:
            raise RuntimeError(f"Vault authentication failed: {str(e)}") from e

    def get_secret(self, path: str, key: Optional[str] = None) -> Dict:
        """Retrieve a secret from the KV v2 engine mounted at ``secret``.

        Args:
            path: Secret path relative to the mount, e.g. ``"test/database"``.
            key: Optional field name. When given (and non-empty), only that
                field's value is returned, or ``None`` if the field is absent.

        Returns:
            The secret's full data mapping, or a single field value when
            ``key`` is given.

        Raises:
            RuntimeError: if the read fails; the original error is kept as
                ``__cause__``.
        """
        try:
            secret = self.client.secrets.kv.v2.read_secret_version(
                path=path,
                mount_point='secret'
            )

            # KV v2 responses nest the payload under data -> data.
            data = secret['data']['data']

            if key:
                return data.get(key)
            return data
        except Exception as e:
            raise RuntimeError(f"Failed to retrieve secret {path}: {str(e)}") from e

    def get_database_credentials(self, environment: str) -> Dict:
        """Return the secret stored at ``<environment>/database``."""
        path = f"{environment}/database"
        return self.get_secret(path)

    def renew_token(self) -> None:
        """Renew the current Vault token (best effort).

        A failed renewal is reported with a warning and not raised.
        """
        try:
            self.client.auth.token.renew_self()
            print("✓ Vault token renewed")
        except Exception as e:
            print(f"⚠ Token renewal failed: {str(e)}")

# Usage in tests
@pytest.fixture(scope="session")
def vault_client():
    """Session-wide fixture yielding one VaultClient built with default settings."""
    # Nothing to tear down: the token is simply left to expire.
    yield VaultClient()

def test_database_connection_with_vault(vault_client):
    """Connect to the test database with credentials read from Vault.

    Reads the ``test/database`` secret, opens a connection and checks that a
    trivial query round-trips.
    """
    # Imported here so the test is self-contained.
    import psycopg2

    db_creds = vault_client.get_database_credentials('test')

    connect_kwargs = {
        'host': db_creds['host'],
        'port': db_creds['port'],
        'user': db_creds['username'],
        'password': db_creds['password'],
    }
    # The secret written by vault-setup.sh only has username/password/host/
    # port, so indexing db_creds['database'] would raise KeyError. Pass the
    # database name only when the secret provides one.
    if 'database' in db_creds:
        connect_kwargs['database'] = db_creds['database']

    conn = psycopg2.connect(**connect_kwargs)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT 1")
        result = cursor.fetchone()

        assert result[0] == 1
    finally:
        # Close even when the query or the assertion fails.
        conn.close()

GitLab CI Integration with Vault

# .gitlab-ci.yml
variables:
  VAULT_ADDR: "https://vault.example.com:8200"

# Reusable before_script: logs in to Vault with the job's JWT and exports the
# test database password for the job's script section.
.vault_auth: &vault_auth
  before_script:
    - apt-get update && apt-get install -y curl jq
    - |
      # Authenticate to Vault using JWT.
      # --fail makes curl exit non-zero on HTTP errors, and `jq -e` exits
      # non-zero when the field is null or missing, so a failed login stops
      # the job instead of exporting the literal string "null".
      # The assignment is kept separate from `export`, because
      # `export VAR=$(...)` always succeeds and would hide the failure.
      # NOTE(review): CI_JOB_JWT is deprecated in newer GitLab releases in
      # favour of `id_tokens` — confirm against your GitLab version.
      VAULT_TOKEN=$(curl --silent --show-error --fail \
        --request POST \
        --data "{\"jwt\": \"${CI_JOB_JWT}\", \"role\": \"cicd-role\"}" \
        "${VAULT_ADDR}/v1/auth/jwt/login" | jq -er '.auth.client_token')
      export VAULT_TOKEN
    - |
      # Retrieve secrets
      DB_PASSWORD=$(curl --silent --show-error --fail \
        --header "X-Vault-Token: ${VAULT_TOKEN}" \
        "${VAULT_ADDR}/v1/secret/data/test/database" | jq -er '.data.data.password')
      export DB_PASSWORD

test_with_vault_secrets:
  stage: test
  <<: *vault_auth
  script:
    - echo "Running tests with secrets from Vault..."
    # Quoted so passwords containing spaces or shell metacharacters survive.
    # NOTE(review): command-line arguments are visible in the process list;
    # DB_PASSWORD is already exported, so the tests could read it from the
    # environment instead.
    - pytest tests/ -v --db-password="$DB_PASSWORD"
  # No after_script cleanup: after_script runs in a separate shell that never
  # sees the variables exported above, so `unset` there would do nothing.
  # The exported values disappear with the job's shell.

SOPS (Secrets OPerationS) for Encrypted Secrets

SOPS Configuration

# .sops.yaml
# One rule per environment directory. Each rule pairs a path pattern with the
# AWS KMS key ARN and the PGP fingerprint listed for files matching it.
# NOTE(review): the account ID and key IDs look like placeholders — replace
# with real values.
creation_rules:
  # Test secrets: any path containing secrets/test/ and ending in .yaml.
  - path_regex: secrets/test/.*\.yaml$
    kms: 'arn:aws:kms:us-east-1:123456789012:key/test-key-id'
    pgp: 'FBC7B9E2A4F9289AC0C1D4843D16CEE4A27381B4'

  # Staging secrets: own KMS key, same PGP fingerprint as test.
  - path_regex: secrets/staging/.*\.yaml$
    kms: 'arn:aws:kms:us-east-1:123456789012:key/staging-key-id'
    pgp: 'FBC7B9E2A4F9289AC0C1D4843D16CEE4A27381B4'

  # Production secrets: own KMS key and a PGP fingerprint not shared with
  # the lower environments.
  - path_regex: secrets/production/.*\.yaml$
    kms: 'arn:aws:kms:us-east-1:123456789012:key/prod-key-id'
    pgp: 'ABC123456789DEF123456789ABC123456789DEF1'

Encrypted Secrets File

# secrets/test/database.enc.yaml (encrypted with SOPS)
database:
    host: ENC[AES256_GCM,data:fHR4c3QtZGIuZXhhbXBsZS5jb20=,iv:xxx,tag:yyy,type:str]
    port: ENC[AES256_GCM,data:NTQzMg==,iv:xxx,tag:yyy,type:int]
    username: ENC[AES256_GCM,data:dGVzdF91c2Vy,iv:xxx,tag:yyy,type:str]
    password: ENC[AES256_GCM,data:c2VjdXJlX3Bhc3N3b3Jk,iv:xxx,tag:yyy,type:str]
sops:
    kms:
    -   arn: arn:aws:kms:us-east-1:123456789012:key/test-key-id
        created_at: "2025-01-15T10:00:00Z"
    pgp:
    -   fp: FBC7B9E2A4F9289AC0C1D4843D16CEE4A27381B4
        created_at: "2025-01-15T10:00:00Z"
    version: 3.7.3

SOPS Integration in Tests

# tests/sops_secrets.py
import subprocess
import yaml
import json
from typing import Dict

class SOPSSecretsManager:
    """Read and sanity-check SOPS-encrypted YAML secret files.

    Files are expected at ``<secrets_path>/<environment>/<name>.enc.yaml``.
    Decryption shells out to the ``sops`` command-line tool.
    """

    def __init__(self, secrets_path: str):
        """Remember the directory that holds the per-environment secret folders."""
        self.secrets_path = secrets_path

    def decrypt_file(self, file_path: str) -> Dict:
        """Decrypt a SOPS-encrypted file and return its parsed YAML content.

        Raises:
            RuntimeError: if ``sops`` exits with a non-zero status; the
                ``CalledProcessError`` is kept as ``__cause__``.
            FileNotFoundError: if the ``sops`` executable is not installed.
        """
        try:
            result = subprocess.run(
                ['sops', '--decrypt', file_path],
                capture_output=True,
                text=True,
                check=True
            )
        except subprocess.CalledProcessError as e:
            raise RuntimeError(f"SOPS decryption failed: {e.stderr}") from e
        return yaml.safe_load(result.stdout)

    def get_secret(self, environment: str, secret_name: str) -> Dict:
        """Decrypt and return ``<secrets_path>/<environment>/<secret_name>.enc.yaml``."""
        file_path = f"{self.secrets_path}/{environment}/{secret_name}.enc.yaml"
        return self.decrypt_file(file_path)

    def validate_encrypted_file(self, file_path: str) -> bool:
        """Check that a YAML file carries SOPS metadata and no plaintext values.

        Returns:
            ``True`` only when the document is a mapping with a top-level
            ``sops`` key and every value outside that key is an ``ENC[...]``
            string. Empty files and non-mapping documents return ``False``.
        """
        with open(file_path, 'r', encoding='utf-8') as f:
            content = yaml.safe_load(f)

        # An empty file parses to None and a bare scalar/list has no metadata
        # block; neither can be a SOPS document.
        if not isinstance(content, dict) or 'sops' not in content:
            return False

        # Exclude sops metadata from check
        data_to_check = {k: v for k, v in content.items() if k != 'sops'}
        return self._is_encrypted(data_to_check)

    @staticmethod
    def _is_encrypted(obj) -> bool:
        """Return True when every leaf value under ``obj`` is an ``ENC[...]`` string."""
        if isinstance(obj, dict):
            return all(SOPSSecretsManager._is_encrypted(v) for v in obj.values())
        if isinstance(obj, list):
            return all(SOPSSecretsManager._is_encrypted(item) for item in obj)
        if isinstance(obj, str):
            return obj.startswith('ENC[')
        # Encrypted values are always strings, so a raw int/float/bool here is
        # plaintext. None holds no data and is let through.
        return obj is None

# Usage in tests
@pytest.fixture(scope="session")
def sops_secrets():
    """Session-wide fixture returning a SOPSSecretsManager rooted at ``secrets``."""
    manager = SOPSSecretsManager('secrets')
    return manager

def test_database_connection_with_sops(sops_secrets):
    """Connect to the test database with SOPS-decrypted credentials.

    Decrypts the ``test``/``database`` secret, opens a connection and checks
    that a trivial query round-trips.
    """
    # Imported here so the test is self-contained.
    import psycopg2

    db_config = sops_secrets.get_secret('test', 'database')
    settings = db_config['database']

    conn = psycopg2.connect(
        host=settings['host'],
        port=settings['port'],
        user=settings['username'],
        password=settings['password']
    )
    try:
        # connect() raises on failure, so "conn is not None" proves nothing;
        # run a real query, as the Vault-based test does.
        cursor = conn.cursor()
        cursor.execute("SELECT 1")
        assert cursor.fetchone()[0] == 1
    finally:
        # Close even when the query or the assertion fails.
        conn.close()

def test_all_secrets_encrypted(sops_secrets):
    """Verify that every ``*.enc.yaml`` file under the secrets directory is encrypted.

    Fails when no files are found, so a wrong working directory or an empty
    checkout cannot make the check pass silently. Reports all offending files
    at once.
    """
    import glob
    import os

    # Search the directory the fixture is configured with.
    pattern = os.path.join(sops_secrets.secrets_path, '**', '*.enc.yaml')
    secret_files = sorted(glob.glob(pattern, recursive=True))

    assert secret_files, \
        f"No *.enc.yaml files found under {sops_secrets.secrets_path!r}"

    not_encrypted = [
        file_path for file_path in secret_files
        if not sops_secrets.validate_encrypted_file(file_path)
    ]
    assert not not_encrypted, \
        f"Files not properly encrypted: {not_encrypted}"

GitHub Actions with SOPS

# .github/workflows/test-with-sops.yml
# Runs the test suite on pull requests and on pushes to main, using secrets
# decrypted with SOPS (AWS KMS credentials come from repository secrets).
name: Test with SOPS Secrets

on:
  pull_request:
  push:
    branches:
      - main

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v3

      - name: Install SOPS
        run: |
          wget https://github.com/mozilla/sops/releases/download/v3.8.1/sops-v3.8.1.linux.amd64
          sudo mv sops-v3.8.1.linux.amd64 /usr/local/bin/sops
          sudo chmod +x /usr/local/bin/sops

      # NOTE(review): repository secrets are normally not exposed to pull
      # requests from forks, so this step would fail there — confirm the
      # intended trigger policy.
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: us-east-1

      - name: Decrypt secrets
        run: |
          # Create the plaintext file readable by the current user only.
          umask 077
          sops --decrypt secrets/test/database.enc.yaml > /tmp/database.yaml

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          # psycopg2 is needed by the database connection tests.
          pip install pytest pyyaml psycopg2-binary

      - name: Run tests
        env:
          SECRETS_PATH: /tmp
        run: |
          pytest tests/ -v

      # Runs even when an earlier step failed, so the plaintext never
      # outlives the job.
      - name: Cleanup secrets
        if: always()
        run: |
          rm -f /tmp/database.yaml

Secret Rotation Testing

# tests/test_secret_rotation.py
import pytest
import time
from datetime import datetime, timedelta

class SecretRotationTester:
    """Checks that secrets served by Vault are rotated and not past their age limit.

    ``vault_client`` must provide ``get_database_credentials``, ``get_secret``
    and a ``client`` attribute (the underlying hvac client), as used below.
    """

    def __init__(self, vault_client):
        self.vault = vault_client

    def test_database_credential_rotation(self):
        """Test database credential rotation process.

        Connects with the current credentials, triggers a rotation, then
        checks that the password changed, that the new credentials work and
        that the old password is rejected.
        """
        # Imported locally because the module-level imports shown for this
        # file do not include psycopg2.
        import psycopg2

        environment = 'test'

        # Get initial credentials
        initial_creds = self.vault.get_database_credentials(environment)
        initial_password = initial_creds['password']

        # Verify connection with initial credentials
        conn1 = psycopg2.connect(
            host=initial_creds['host'],
            user=initial_creds['username'],
            password=initial_password
        )
        assert conn1 is not None
        conn1.close()

        # Trigger credential rotation
        self._rotate_database_credentials(environment)

        # Wait for rotation to complete
        time.sleep(5)

        # Get rotated credentials
        rotated_creds = self.vault.get_database_credentials(environment)
        rotated_password = rotated_creds['password']

        # Verify password changed
        assert rotated_password != initial_password

        # Verify connection with new credentials
        conn2 = psycopg2.connect(
            host=rotated_creds['host'],
            user=rotated_creds['username'],
            password=rotated_password
        )
        assert conn2 is not None
        conn2.close()

        # Verify old credentials are revoked
        with pytest.raises(psycopg2.OperationalError):
            stale_conn = psycopg2.connect(
                host=initial_creds['host'],
                user=initial_creds['username'],
                password=initial_password
            )
            # Only reached when the old password still works: close the
            # connection so the failing test does not leak it.
            stale_conn.close()

    def _rotate_database_credentials(self, environment: str):
        """Ask Vault to rotate the ``<environment>-readonly`` database role.

        NOTE(review): the credentials compared by the rotation test are read
        through ``get_database_credentials``; confirm that path is backed by
        the role rotated here, otherwise the password will not change.
        """
        # This would typically call Vault's database secrets engine
        # to rotate credentials automatically
        self.vault.client.write(
            f'database/rotate-role/{environment}-readonly',
            {}
        )

    def test_api_key_expiration(self):
        """Test that the third-party API key exists and is younger than 30 days."""
        api_key = self.vault.get_secret('test/api-keys', 'third_party_api')
        assert api_key, "API key 'third_party_api' is missing from test/api-keys"

        # Verify API key metadata
        metadata = self.vault.client.secrets.kv.v2.read_secret_metadata(
            path='test/api-keys',
            mount_point='secret'
        )

        written_at = self._parse_vault_timestamp(
            self._current_version_created_time(metadata['data'])
        )

        # Check if key is older than rotation policy (30 days)
        age = datetime.now(written_at.tzinfo) - written_at
        assert age < timedelta(days=30), "API key requires rotation"

    @staticmethod
    def _current_version_created_time(metadata_data) -> str:
        """Return the creation timestamp of the secret's current version.

        The top-level ``created_time`` dates the secret's first version, so a
        key rotated yesterday would still look as old as its first write.
        Falls back to the top-level value when no per-version entry exists.
        """
        versions = metadata_data.get('versions') or {}
        current = versions.get(str(metadata_data.get('current_version'))) or {}
        return current.get('created_time') or metadata_data['created_time']

    @staticmethod
    def _parse_vault_timestamp(raw: str) -> datetime:
        """Parse a Vault RFC 3339 timestamp into a ``datetime``.

        Vault reports nanosecond precision (nine fractional digits), which
        ``datetime.fromisoformat`` rejects on older Python versions; the
        fraction is cut or padded to microseconds first.
        """
        text = raw.replace('Z', '+00:00')
        head, dot, tail = text.partition('.')
        if dot:
            # Split the fractional digits from the trailing UTC offset.
            digit_count = len(tail) - len(tail.lstrip('0123456789'))
            fraction, offset = tail[:digit_count], tail[digit_count:]
            text = f"{head}.{fraction[:6].ljust(6, '0')}{offset}"
        return datetime.fromisoformat(text)

Secret Leak Detection

# tests/test_secret_detection.py
import pytest
import re
import subprocess
from pathlib import Path

class SecretLeakDetector:
    """Regex-based scanner that flags text resembling committed secrets.

    ``scan_file`` checks one file against ``PATTERNS``; ``scan_repository``
    walks a directory tree and aggregates the findings. Each finding is a
    dict with the keys ``file``, ``type``, ``line`` and ``match``.
    """

    # Patterns for common secret types
    PATTERNS = {
        'aws_access_key': r'AKIA[0-9A-Z]{16}',
        # Lookarounds require a run of exactly 40 characters, so the pattern
        # no longer fires on every 40-character window of a longer string.
        # NOTE(review): still noisy - a 40-character git SHA matches too.
        'aws_secret_key': r'(?<![0-9a-zA-Z/+])[0-9a-zA-Z/+]{40}(?![0-9a-zA-Z/+])',
        'github_token': r'ghp_[0-9a-zA-Z]{36}',
        'generic_api_key': r'api[_-]?key[_-]?[=:]\s*["\']?([0-9a-zA-Z]{32,})["\']?',
        'private_key': r'-----BEGIN (RSA |EC |DSA )?PRIVATE KEY-----',
        # "\." is a literal dot. The previous "\\." demanded a backslash in
        # the text, so real JWTs were never detected.
        'jwt': r'eyJ[A-Za-z0-9_-]*\.eyJ[A-Za-z0-9_-]*\.[A-Za-z0-9_-]*',
        'database_url': r'(postgres|mysql|mongodb)://[^:]+:[^@]+@[^/]+/.*'
    }

    # Pattern names matched case-insensitively. The remaining patterns
    # describe token formats whose letter case is significant.
    CASE_INSENSITIVE = frozenset({'generic_api_key', 'database_url'})

    # Characters of a match echoed back in a finding. Kept short so reports
    # and CI logs never contain a usable part of the secret.
    PREVIEW_CHARS = 4

    def scan_file(self, file_path: Path) -> list:
        """Scan one file for potential secrets.

        Args:
            file_path: File to read as UTF-8 text (undecodable bytes are
                ignored).

        Returns:
            One finding per regex match. An unreadable file produces a
            warning on stdout and an empty list.
        """
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
        except OSError as e:
            print(f"Warning: Could not scan {file_path}: {e}")
            return []

        findings = []
        for secret_type, pattern in self.PATTERNS.items():
            flags = re.IGNORECASE if secret_type in self.CASE_INSENSITIVE else 0
            for match in re.finditer(pattern, content, flags):
                findings.append({
                    'file': str(file_path),
                    'type': secret_type,
                    # 1-based line number of the start of the match.
                    'line': content[:match.start()].count('\n') + 1,
                    'match': match.group(0)[:self.PREVIEW_CHARS] + '...'  # Truncate for safety
                })

        return findings

    def scan_repository(self, exclude_patterns=None, root='.') -> list:
        """Scan every file under ``root`` for secrets.

        Args:
            exclude_patterns: Glob patterns, relative to ``root``, for files
                to skip. A pattern such as ``'.git/*'`` excludes the whole
                directory tree, not just its direct children. ``None`` or an
                empty list selects the built-in defaults.
            root: Directory to walk; defaults to the current directory.

        Returns:
            The concatenated findings of ``scan_file`` for every scanned file.
        """
        exclude_patterns = exclude_patterns or [
            '*.enc.yaml',
            '.git/*',
            'node_modules/*',
            'venv/*',
            '*.pyc'
        ]

        root_path = Path(root)
        all_findings = []

        for file_path in root_path.rglob('*'):
            if not file_path.is_file():
                continue

            # Check exclusions against the path relative to the scan root.
            if self._is_excluded(file_path.relative_to(root_path), exclude_patterns):
                continue

            all_findings.extend(self.scan_file(file_path))

        return all_findings

    @staticmethod
    def _is_excluded(relative_path: Path, patterns) -> bool:
        """Return True when ``relative_path`` matches any exclusion pattern."""
        from fnmatch import fnmatchcase

        posix_path = relative_path.as_posix()
        for pattern in patterns:
            # Path.match alone compares only the trailing components, so
            # '.git/*' would skip '.git/config' but not '.git/objects/ab/cd'.
            # fnmatch's '*' also crosses '/', which covers nested files; the
            # '*/' variant keeps matching directories below the root.
            if (relative_path.match(pattern)
                    or fnmatchcase(posix_path, pattern)
                    or fnmatchcase(posix_path, f'*/{pattern}')):
                return True
        return False

def test_no_secrets_in_code():
    """Verify no secrets are committed to repository.

    Scans the working tree with ``SecretLeakDetector`` and fails on any
    finding outside the excluded paths.
    """
    detector = SecretLeakDetector()

    # Passing exclude_patterns replaces the detector's defaults, so the
    # default entries are repeated here; otherwise venv/, node_modules/ and
    # compiled files would be scanned.
    findings = detector.scan_repository(exclude_patterns=[
        '*.enc.yaml',  # Encrypted secrets are OK
        '.git/*',
        'node_modules/*',
        'venv/*',
        '*.pyc',
        'tests/fixtures/*',  # Test fixtures may contain fake secrets
        '*.md'  # Documentation may contain example patterns
    ])

    assert len(findings) == 0, f"Found potential secrets in code: {findings}"

def test_gitleaks_scan():
    """Run Gitleaks to detect secrets in git history.

    Skipped when the ``gitleaks`` executable is not installed; fails when
    Gitleaks exits with a non-zero status.
    """
    try:
        # No --no-git: that flag makes Gitleaks treat the directory as plain
        # files and skip the commit history this test is meant to cover.
        # --redact keeps the detected values out of the captured output,
        # which ends up in the failure message below.
        result = subprocess.run(
            ['gitleaks', 'detect', '--verbose', '--redact'],
            capture_output=True,
            text=True
        )
    except FileNotFoundError:
        pytest.skip("Gitleaks not installed")
    else:
        if result.returncode != 0:
            pytest.fail(f"Gitleaks detected secrets: {result.stdout}")

Conclusion

Secrets management in CI/CD requires a comprehensive approach combining secure storage (HashiCorp Vault, SOPS), automated rotation, strict access controls, and continuous monitoring for leaks. By implementing proper secrets management with Vault integration, SOPS encryption, rotation testing, and leak detection, teams can maintain security while enabling efficient test automation.

The key is treating secrets as dynamic, short-lived credentials that are never hardcoded, always encrypted at rest, audited comprehensively, and automatically rotated. With these secrets management practices, teams can achieve secure CI/CD pipelines that meet compliance requirements while maintaining developer productivity.