Security Model¶
FireBreak implements defense-in-depth security across all layers of the platform.
Security Principles¶
- Zero Trust Architecture: Never trust, always verify
- Least Privilege: Minimal access required for operation
- Defense in Depth: Multiple layers of security controls
- Data Encryption: Encrypt data at rest and in transit
- Tenant Isolation: Complete separation between tenants
Authentication & Authorization¶
API Authentication¶
API Key Authentication¶
API keys: - Generated per tenant - Scoped to specific permissions - Rotated every 90 days - Revocable instantly
JWT Token Authentication (Future)¶
// Token structure
{
"sub": "user@acme-insurance.com",
"tenant_id": "acme-insurance",
"role": "admin",
"exp": 1735689600,
"permissions": ["read:assessments", "write:assessments"]
}
Authorization Model¶
Role-Based Access Control (RBAC)¶
{
"roles": {
"admin": {
"permissions": ["*"]
},
"user": {
"permissions": [
"read:assessments",
"write:assessments",
"read:reports"
]
},
"viewer": {
"permissions": [
"read:assessments",
"read:reports"
]
}
}
}
Permission Checks¶
def require_permission(permission: str):
"""Decorator to check permission"""
def decorator(func):
async def wrapper(request, env):
user = await get_user_from_request(request)
if not user.has_permission(permission):
return Response('Forbidden', status=403)
return await func(request, env)
return wrapper
return decorator
@require_permission('write:assessments')
async def create_assessment(request, env):
# Handler code
pass
Data Security¶
Encryption at Rest¶
- D1 Databases: Encrypted by default (AES-256)
- R2 Storage: Server-side encryption (SSE-S3)
- KV Storage: Encrypted at rest
Encryption in Transit¶
- TLS 1.3: All HTTPS connections
- Certificate Management: Automated via Cloudflare
- HSTS: HTTP Strict Transport Security enabled
Sensitive Data Handling¶
from cryptography.fernet import Fernet
class SecretManager:
"""Handle sensitive data encryption"""
def __init__(self, key):
self.cipher = Fernet(key)
def encrypt(self, data: str) -> str:
"""Encrypt sensitive data"""
return self.cipher.encrypt(data.encode()).decode()
def decrypt(self, encrypted: str) -> str:
"""Decrypt sensitive data"""
return self.cipher.decrypt(encrypted.encode()).decode()
# Never log sensitive data
logger.info(f"User: {user.email}") # OK
logger.info(f"API Key: {api_key}") # ❌ NEVER DO THIS
logger.info(f"API Key: ***") # ✅ OK
Tenant Isolation¶
Database Isolation¶
Each tenant has dedicated D1 database:
async def get_tenant_database(tenant_id, env):
"""Get tenant-specific database"""
db_id = f"tenant-{tenant_id}-db"
return env.get_database(db_id)
async def query_with_isolation(tenant_id, query, params, env):
"""Execute query with tenant isolation"""
db = await get_tenant_database(tenant_id, env)
# Always include tenant_id in WHERE clause
safe_query = f"{query} AND tenant_id = ?"
return await db.execute(safe_query, [*params, tenant_id])
Storage Isolation¶
Separate R2 buckets per tenant:
def get_tenant_bucket(tenant_id, env):
"""Get tenant-specific R2 bucket"""
bucket_name = f"firestorm-photos-{tenant_id}"
return env.get_bucket(bucket_name)
async def upload_photo(tenant_id, photo_data, env):
"""Upload photo to tenant bucket"""
bucket = get_tenant_bucket(tenant_id, env)
# Ensure filename is scoped to tenant
filename = f"{tenant_id}/{photo_id}/{file}"
await bucket.put(filename, photo_data)
Request Validation¶
async def validate_tenant_access(request, env):
"""Validate user can access requested tenant"""
# Extract tenant from header
tenant_id = request.headers.get('X-Tenant-ID')
if not tenant_id:
raise UnauthorizedError("Missing tenant ID")
# Validate tenant exists
tenant = await get_tenant(tenant_id, env)
if not tenant:
raise ForbiddenError("Invalid tenant")
# Validate user belongs to tenant
user = await get_user_from_request(request)
if user.tenant_id != tenant_id:
raise ForbiddenError("Tenant mismatch")
return tenant
Input Validation¶
Request Validation¶
from pydantic import BaseModel, validator
class AnalyzeRequest(BaseModel):
"""Validate analyze request"""
latitude: float
longitude: float
radius_km: float
@validator('latitude')
def validate_latitude(cls, v):
if not -90 <= v <= 90:
raise ValueError('Latitude must be between -90 and 90')
return v
@validator('longitude')
def validate_longitude(cls, v):
if not -180 <= v <= 180:
raise ValueError('Longitude must be between -180 and 180')
return v
@validator('radius_km')
def validate_radius(cls, v):
if not 0 < v <= 100:
raise ValueError('Radius must be between 0 and 100 km')
return v
async def handle_analyze(request, env):
"""Handle analyze request with validation"""
try:
data = await request.json()
validated = AnalyzeRequest(**data)
return await analyze(validated, env)
except ValueError as e:
return Response(str(e), status=400)
SQL Injection Prevention¶
Always use parameterized queries:
# ❌ VULNERABLE to SQL injection
query = f"SELECT * FROM users WHERE email = '{email}'"
# ✅ SAFE - parameterized query
query = "SELECT * FROM users WHERE email = ?"
result = await db.execute(query, [email])
XSS Prevention¶
Sanitize all user input displayed in HTML:
import html
def escape_html(text: str) -> str:
"""Escape HTML to prevent XSS"""
return html.escape(text)
# In HTML generation
user_input = escape_html(request.form['comment'])
html_output = f"<div>{user_input}</div>"
Rate Limiting¶
Per-Tenant Rate Limits¶
async def check_rate_limit(tenant_id, endpoint, env):
"""Check rate limit for tenant endpoint"""
# Get tenant config
tenant = await get_tenant(tenant_id, env)
limit = tenant.limits.api_rate_limit_per_minute
# Check current usage
key = f"rate:{tenant_id}:{endpoint}:{current_minute()}"
current = int(await env.CACHE.get(key) or 0)
if current >= limit:
raise RateLimitError(
f"Rate limit exceeded: {limit} requests per minute"
)
# Increment counter
await env.CACHE.put(key, str(current + 1), expiration=60)
DDoS Protection¶
Cloudflare provides automatic DDoS protection:
- Layer ¾ protection
- Layer 7 (HTTP) protection
- Rate limiting at edge
- Challenge pages for suspicious traffic
CORS Security¶
def get_cors_headers(origin, env):
"""Get CORS headers with validation"""
# Whitelist of allowed origins
allowed_origins = [
'https://firebreakrisk.com',
'https://acme.firebreakrisk.com',
'https://wildguard.firebreakrisk.com',
]
if env.ENVIRONMENT == 'local':
allowed_origins.append('http://localhost:5173')
# Validate origin
if origin not in allowed_origins:
return {}
return {
'Access-Control-Allow-Origin': origin,
'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type, Authorization, X-Tenant-ID',
'Access-Control-Max-Age': '86400',
}
Secrets Management¶
Wrangler Secrets¶
# Set secrets (never commit to git)
wrangler secret put OPENAI_API_KEY --env production
wrangler secret put ANTHROPIC_API_KEY --env production
# List secrets (values are hidden)
wrangler secret list --env production
# Delete secret
wrangler secret delete OLD_KEY --env production
Environment Variables¶
Secret Rotation¶
async def rotate_api_key(tenant_id, env):
"""Rotate tenant API key"""
# Generate new key
new_key = generate_secure_key()
# Store new key
await env.SECRETS.put(
f"api_key:{tenant_id}",
new_key,
metadata={'created_at': timestamp()}
)
# Notify tenant
await send_email(
tenant.admin_email,
"API key rotated",
f"New API key: {new_key}"
)
# Keep old key valid for 24h grace period
await schedule_key_deletion(old_key, delay_hours=24)
Audit Logging¶
Security Events¶
async def log_security_event(event_type, tenant_id, user_id, details, env):
"""Log security-relevant events"""
await env.AUDIT_LOG.put({
'timestamp': timestamp(),
'event_type': event_type,
'tenant_id': tenant_id,
'user_id': user_id,
'details': details,
'ip_address': request.headers.get('CF-Connecting-IP'),
'user_agent': request.headers.get('User-Agent'),
})
# Log authentication
await log_security_event('auth_success', tenant_id, user_id, {}, env)
await log_security_event('auth_failure', tenant_id, None, {'reason': 'invalid_key'}, env)
# Log authorization
await log_security_event('access_denied', tenant_id, user_id, {'resource': '/admin'}, env)
Compliance Logging¶
Track all data access for compliance (GDPR, etc.):
async def log_data_access(resource_type, resource_id, action, user_id, env):
"""Log data access for compliance"""
await env.COMPLIANCE_LOG.put({
'timestamp': timestamp(),
'resource_type': resource_type,
'resource_id': resource_id,
'action': action,
'user_id': user_id,
})
# Usage
await log_data_access('assessment', assessment_id, 'read', user_id, env)
await log_data_access('assessment', assessment_id, 'update', user_id, env)
await log_data_access('assessment', assessment_id, 'delete', user_id, env)
Vulnerability Management¶
Dependency Scanning¶
# Python dependencies
pip-audit
# JavaScript dependencies
npm audit
# Container scanning (if applicable)
trivy scan .
Security Headers¶
SECURITY_HEADERS = {
'X-Content-Type-Options': 'nosniff',
'X-Frame-Options': 'DENY',
'X-XSS-Protection': '1; mode=block',
'Referrer-Policy': 'strict-origin-when-cross-origin',
'Permissions-Policy': 'geolocation=(), microphone=(), camera=()',
'Content-Security-Policy': "default-src 'self'; script-src 'self' 'unsafe-inline'; style-src 'self' 'unsafe-inline';",
}
def add_security_headers(response):
"""Add security headers to response"""
for header, value in SECURITY_HEADERS.items():
response.headers[header] = value
return response
Incident Response¶
Security Incident Workflow¶
- Detect: Monitoring alerts, user reports
- Contain: Disable affected API keys, block IPs
- Investigate: Review logs, assess impact
- Remediate: Patch vulnerability, rotate secrets
- Document: Write post-mortem, update runbooks
Emergency Procedures¶
async def emergency_lockdown(tenant_id, env):
"""Emergency lockdown for compromised tenant"""
# Disable all API keys
await disable_all_api_keys(tenant_id, env)
# Revoke all active sessions
await revoke_all_sessions(tenant_id, env)
# Enable read-only mode
await set_tenant_mode(tenant_id, 'read_only', env)
# Notify admins
await send_alert(
'SECURITY: Emergency lockdown',
f'Tenant {tenant_id} in lockdown mode'
)
Compliance¶
GDPR Compliance¶
- Right to Access: Users can export their data
- Right to Deletion: Data can be permanently deleted
- Data Portability: Export in JSON format
- Consent Management: Explicit opt-in for data processing
Data Retention¶
# Retention policies
RETENTION_POLICIES = {
'assessments': 365, # 1 year
'photos': 365, # 1 year
'audit_logs': 2555, # 7 years
'deleted_users': 90, # 90 days
}
async def cleanup_old_data(tenant_id, env):
"""Clean up data past retention period"""
for data_type, days in RETENTION_POLICIES.items():
cutoff = datetime.now() - timedelta(days=days)
await delete_data_before(data_type, cutoff, tenant_id, env)
Best Practices¶
1. Never Log Sensitive Data¶
# ❌ DON'T
logger.info(f"API Key: {api_key}")
logger.info(f"Password: {password}")
# ✅ DO
logger.info(f"API Key: {api_key[:8]}...")
logger.info("Password authentication successful")
2. Use Secure Defaults¶
# Default to most secure settings
DEFAULT_CONFIG = {
'debug': False,
'allow_http': False,
'require_https': True,
'strict_validation': True,
}
3. Validate All Input¶
# Validate everything from clients
data = await request.json()
validated = validate_schema(data, AnalyzeRequest)
sanitized = sanitize_input(validated)
4. Principle of Least Privilege¶
# Only grant minimal required permissions
user_permissions = ['read:own_assessments'] # Not 'read:all_assessments'