Files
simbarag/blueprints/email/crypto_service.py
Ryan Chen bee63d1c60 feat(01-01): create email blueprint with encrypted Tortoise ORM models
- Add EncryptedTextField for transparent Fernet encryption
- Create EmailAccount model with encrypted IMAP credentials
- Create EmailSyncStatus model for sync state tracking
- Create Email model with 30-day retention logic
- Follow existing blueprint patterns from users/conversation
2026-02-08 09:08:32 -05:00

69 lines
2.2 KiB
Python

"""
Encryption service for email credentials.
Provides transparent Fernet encryption for sensitive fields in the database.
"""
import os
from cryptography.fernet import Fernet
from tortoise import fields
class EncryptedTextField(fields.TextField):
"""
Custom Tortoise ORM field that transparently encrypts/decrypts text values.
Uses Fernet symmetric encryption with a key from FERNET_KEY environment variable.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
# Load encryption key from environment
key = os.getenv("FERNET_KEY")
if not key:
raise ValueError(
"FERNET_KEY environment variable required for encrypted fields. "
'Generate with: python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"'
)
try:
self.fernet = Fernet(key.encode())
except Exception as e:
raise ValueError(f"Invalid FERNET_KEY format: {e}")
def to_db_value(self, value: str, instance) -> str:
"""Encrypt value before storing in database."""
if value is None:
return None
# Encrypt and return as URL-safe base64 string
return self.fernet.encrypt(value.encode()).decode()
def to_python_value(self, value: str) -> str:
"""Decrypt value when loading from database."""
if value is None:
return None
# Decrypt Fernet token
return self.fernet.decrypt(value.encode()).decode()
def validate_fernet_key():
"""
Validate that FERNET_KEY is set and functional.
Raises:
ValueError: If key is missing or invalid
"""
key = os.getenv("FERNET_KEY")
if not key:
raise ValueError("FERNET_KEY environment variable not set")
try:
f = Fernet(key.encode())
# Test encryption/decryption cycle
test_value = b"test_encryption"
encrypted = f.encrypt(test_value)
decrypted = f.decrypt(encrypted)
if decrypted != test_value:
raise ValueError("Encryption/decryption test failed")
except Exception as e:
raise ValueError(f"FERNET_KEY validation failed: {e}")