Skip to content

Security Model

FireBreak implements defense-in-depth security across all layers of the platform.

Security Principles

  1. Zero Trust Architecture: Never trust, always verify
  2. Least Privilege: Minimal access required for operation
  3. Defense in Depth: Multiple layers of security controls
  4. Data Encryption: Encrypt data at rest and in transit
  5. Tenant Isolation: Complete separation between tenants

Authentication & Authorization

API Authentication

API Key Authentication

GET /api/v1/analyze
Authorization: Bearer <api-key>
X-Tenant-ID: acme-insurance

API keys: - Generated per tenant - Scoped to specific permissions - Rotated every 90 days - Revocable instantly

JWT Token Authentication (Future)

// Token structure
{
  "sub": "user@acme-insurance.com",
  "tenant_id": "acme-insurance",
  "role": "admin",
  "exp": 1735689600,
  "permissions": ["read:assessments", "write:assessments"]
}

Authorization Model

Role-Based Access Control (RBAC)

{
  "roles": {
    "admin": {
      "permissions": ["*"]
    },
    "user": {
      "permissions": [
        "read:assessments",
        "write:assessments",
        "read:reports"
      ]
    },
    "viewer": {
      "permissions": [
        "read:assessments",
        "read:reports"
      ]
    }
  }
}

Permission Checks

def require_permission(permission: str):
    """Decorator to check permission"""
    def decorator(func):
        async def wrapper(request, env):
            user = await get_user_from_request(request)
            if not user.has_permission(permission):
                return Response('Forbidden', status=403)
            return await func(request, env)
        return wrapper
    return decorator

@require_permission('write:assessments')
async def create_assessment(request, env):
    # Handler code
    pass

Data Security

Encryption at Rest

  • D1 Databases: Encrypted by default (AES-256)
  • R2 Storage: Server-side encryption (SSE-S3)
  • KV Storage: Encrypted at rest

Encryption in Transit

  • TLS 1.3: All HTTPS connections
  • Certificate Management: Automated via Cloudflare
  • HSTS: HTTP Strict Transport Security enabled
Strict-Transport-Security: max-age=31536000; includeSubDomains; preload

Sensitive Data Handling

from cryptography.fernet import Fernet

class SecretManager:
    """Handle sensitive data encryption"""

    def __init__(self, key):
        self.cipher = Fernet(key)

    def encrypt(self, data: str) -> str:
        """Encrypt sensitive data"""
        return self.cipher.encrypt(data.encode()).decode()

    def decrypt(self, encrypted: str) -> str:
        """Decrypt sensitive data"""
        return self.cipher.decrypt(encrypted.encode()).decode()

# Never log sensitive data
logger.info(f"User: {user.email}")  # OK
logger.info(f"API Key: {api_key}")  # ❌ NEVER DO THIS
logger.info(f"API Key: ***")         # ✅ OK

Tenant Isolation

Database Isolation

Each tenant has dedicated D1 database:

async def get_tenant_database(tenant_id, env):
    """Get tenant-specific database"""
    db_id = f"tenant-{tenant_id}-db"
    return env.get_database(db_id)

async def query_with_isolation(tenant_id, query, params, env):
    """Execute query with tenant isolation"""
    db = await get_tenant_database(tenant_id, env)

    # Always include tenant_id in WHERE clause
    safe_query = f"{query} AND tenant_id = ?"
    return await db.execute(safe_query, [*params, tenant_id])

Storage Isolation

Separate R2 buckets per tenant:

def get_tenant_bucket(tenant_id, env):
    """Get tenant-specific R2 bucket"""
    bucket_name = f"firestorm-photos-{tenant_id}"
    return env.get_bucket(bucket_name)

async def upload_photo(tenant_id, photo_data, env):
    """Upload photo to tenant bucket"""
    bucket = get_tenant_bucket(tenant_id, env)

    # Ensure filename is scoped to tenant
    filename = f"{tenant_id}/{photo_id}/{file}"
    await bucket.put(filename, photo_data)

Request Validation

async def validate_tenant_access(request, env):
    """Validate user can access requested tenant"""

    # Extract tenant from header
    tenant_id = request.headers.get('X-Tenant-ID')
    if not tenant_id:
        raise UnauthorizedError("Missing tenant ID")

    # Validate tenant exists
    tenant = await get_tenant(tenant_id, env)
    if not tenant:
        raise ForbiddenError("Invalid tenant")

    # Validate user belongs to tenant
    user = await get_user_from_request(request)
    if user.tenant_id != tenant_id:
        raise ForbiddenError("Tenant mismatch")

    return tenant

Input Validation

Request Validation

from pydantic import BaseModel, validator

class AnalyzeRequest(BaseModel):
    """Validate analyze request"""
    latitude: float
    longitude: float
    radius_km: float

    @validator('latitude')
    def validate_latitude(cls, v):
        if not -90 <= v <= 90:
            raise ValueError('Latitude must be between -90 and 90')
        return v

    @validator('longitude')
    def validate_longitude(cls, v):
        if not -180 <= v <= 180:
            raise ValueError('Longitude must be between -180 and 180')
        return v

    @validator('radius_km')
    def validate_radius(cls, v):
        if not 0 < v <= 100:
            raise ValueError('Radius must be between 0 and 100 km')
        return v

async def handle_analyze(request, env):
    """Handle analyze request with validation"""
    try:
        data = await request.json()
        validated = AnalyzeRequest(**data)
        return await analyze(validated, env)
    except ValueError as e:
        return Response(str(e), status=400)

SQL Injection Prevention

Always use parameterized queries:

# ❌ VULNERABLE to SQL injection
query = f"SELECT * FROM users WHERE email = '{email}'"

# ✅ SAFE - parameterized query
query = "SELECT * FROM users WHERE email = ?"
result = await db.execute(query, [email])

XSS Prevention

Sanitize all user input displayed in HTML:

import html

def escape_html(text: str) -> str:
    """Escape HTML to prevent XSS"""
    return html.escape(text)

# In HTML generation
user_input = escape_html(request.form['comment'])
html_output = f"<div>{user_input}</div>"

Rate Limiting

Per-Tenant Rate Limits

async def check_rate_limit(tenant_id, endpoint, env):
    """Check rate limit for tenant endpoint"""

    # Get tenant config
    tenant = await get_tenant(tenant_id, env)
    limit = tenant.limits.api_rate_limit_per_minute

    # Check current usage
    key = f"rate:{tenant_id}:{endpoint}:{current_minute()}"
    current = int(await env.CACHE.get(key) or 0)

    if current >= limit:
        raise RateLimitError(
            f"Rate limit exceeded: {limit} requests per minute"
        )

    # Increment counter
    await env.CACHE.put(key, str(current + 1), expiration=60)

DDoS Protection

Cloudflare provides automatic DDoS protection:

  • Layer ¾ protection
  • Layer 7 (HTTP) protection
  • Rate limiting at edge
  • Challenge pages for suspicious traffic

CORS Security

def get_cors_headers(origin, env):
    """Get CORS headers with validation"""

    # Whitelist of allowed origins
    allowed_origins = [
        'https://firebreakrisk.com',
        'https://acme.firebreakrisk.com',
        'https://wildguard.firebreakrisk.com',
    ]

    if env.ENVIRONMENT == 'local':
        allowed_origins.append('http://localhost:5173')

    # Validate origin
    if origin not in allowed_origins:
        return {}

    return {
        'Access-Control-Allow-Origin': origin,
        'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
        'Access-Control-Allow-Headers': 'Content-Type, Authorization, X-Tenant-ID',
        'Access-Control-Max-Age': '86400',
    }

Secrets Management

Wrangler Secrets

# Set secrets (never commit to git)
wrangler secret put OPENAI_API_KEY --env production
wrangler secret put ANTHROPIC_API_KEY --env production

# List secrets (values are hidden)
wrangler secret list --env production

# Delete secret
wrangler secret delete OLD_KEY --env production

Environment Variables

{
  "vars": {
    "ENVIRONMENT": "production",
    "API_VERSION": "v1"
  }
  // Never put secrets in vars!
}

Secret Rotation

async def rotate_api_key(tenant_id, env):
    """Rotate tenant API key"""

    # Generate new key
    new_key = generate_secure_key()

    # Store new key
    await env.SECRETS.put(
        f"api_key:{tenant_id}",
        new_key,
        metadata={'created_at': timestamp()}
    )

    # Notify tenant
    await send_email(
        tenant.admin_email,
        "API key rotated",
        f"New API key: {new_key}"
    )

    # Keep old key valid for 24h grace period
    await schedule_key_deletion(old_key, delay_hours=24)

Audit Logging

Security Events

async def log_security_event(event_type, tenant_id, user_id, details, env):
    """Log security-relevant events"""

    await env.AUDIT_LOG.put({
        'timestamp': timestamp(),
        'event_type': event_type,
        'tenant_id': tenant_id,
        'user_id': user_id,
        'details': details,
        'ip_address': request.headers.get('CF-Connecting-IP'),
        'user_agent': request.headers.get('User-Agent'),
    })

# Log authentication
await log_security_event('auth_success', tenant_id, user_id, {}, env)
await log_security_event('auth_failure', tenant_id, None, {'reason': 'invalid_key'}, env)

# Log authorization
await log_security_event('access_denied', tenant_id, user_id, {'resource': '/admin'}, env)

Compliance Logging

Track all data access for compliance (GDPR, etc.):

async def log_data_access(resource_type, resource_id, action, user_id, env):
    """Log data access for compliance"""

    await env.COMPLIANCE_LOG.put({
        'timestamp': timestamp(),
        'resource_type': resource_type,
        'resource_id': resource_id,
        'action': action,
        'user_id': user_id,
    })

# Usage
await log_data_access('assessment', assessment_id, 'read', user_id, env)
await log_data_access('assessment', assessment_id, 'update', user_id, env)
await log_data_access('assessment', assessment_id, 'delete', user_id, env)

Vulnerability Management

Dependency Scanning

# Python dependencies
pip-audit

# JavaScript dependencies
npm audit

# Container scanning (if applicable)
trivy scan .

Security Headers

SECURITY_HEADERS = {
    'X-Content-Type-Options': 'nosniff',
    'X-Frame-Options': 'DENY',
    'X-XSS-Protection': '1; mode=block',
    'Referrer-Policy': 'strict-origin-when-cross-origin',
    'Permissions-Policy': 'geolocation=(), microphone=(), camera=()',
    'Content-Security-Policy': "default-src 'self'; script-src 'self' 'unsafe-inline'; style-src 'self' 'unsafe-inline';",
}

def add_security_headers(response):
    """Add security headers to response"""
    for header, value in SECURITY_HEADERS.items():
        response.headers[header] = value
    return response

Incident Response

Security Incident Workflow

  1. Detect: Monitoring alerts, user reports
  2. Contain: Disable affected API keys, block IPs
  3. Investigate: Review logs, assess impact
  4. Remediate: Patch vulnerability, rotate secrets
  5. Document: Write post-mortem, update runbooks

Emergency Procedures

async def emergency_lockdown(tenant_id, env):
    """Emergency lockdown for compromised tenant"""

    # Disable all API keys
    await disable_all_api_keys(tenant_id, env)

    # Revoke all active sessions
    await revoke_all_sessions(tenant_id, env)

    # Enable read-only mode
    await set_tenant_mode(tenant_id, 'read_only', env)

    # Notify admins
    await send_alert(
        'SECURITY: Emergency lockdown',
        f'Tenant {tenant_id} in lockdown mode'
    )

Compliance

GDPR Compliance

  • Right to Access: Users can export their data
  • Right to Deletion: Data can be permanently deleted
  • Data Portability: Export in JSON format
  • Consent Management: Explicit opt-in for data processing

Data Retention

# Retention policies
RETENTION_POLICIES = {
    'assessments': 365,        # 1 year
    'photos': 365,             # 1 year
    'audit_logs': 2555,        # 7 years
    'deleted_users': 90,       # 90 days
}

async def cleanup_old_data(tenant_id, env):
    """Clean up data past retention period"""
    for data_type, days in RETENTION_POLICIES.items():
        cutoff = datetime.now() - timedelta(days=days)
        await delete_data_before(data_type, cutoff, tenant_id, env)

Best Practices

1. Never Log Sensitive Data

# ❌ DON'T
logger.info(f"API Key: {api_key}")
logger.info(f"Password: {password}")

# ✅ DO
logger.info(f"API Key: {api_key[:8]}...")
logger.info("Password authentication successful")

2. Use Secure Defaults

# Default to most secure settings
DEFAULT_CONFIG = {
    'debug': False,
    'allow_http': False,
    'require_https': True,
    'strict_validation': True,
}

3. Validate All Input

# Validate everything from clients
data = await request.json()
validated = validate_schema(data, AnalyzeRequest)
sanitized = sanitize_input(validated)

4. Principle of Least Privilege

# Only grant minimal required permissions
user_permissions = ['read:own_assessments']  # Not 'read:all_assessments'

Security Resources

See Also