Users can now receive a unique email address (ask+<token>@domain) and interact with Simba by sending emails. Inbound emails hit a Mailgun webhook, are authenticated via HMAC token lookup, processed through the LangChain agent, and replied to via the Mailgun API. - Extract shared SIMBA_SYSTEM_PROMPT to blueprints/conversation/prompts.py - Add email_enabled and email_hmac_token fields to User model - Create blueprints/email with webhook, signature validation, rate limiting - Add admin endpoints to enable/disable email per user - Update AdminPanel with Email column, toggle, and copy-address button - Add Mailgun env vars to .env.example - Include database migration for new fields Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
52 lines
1.7 KiB
Python
52 lines
1.7 KiB
Python
from tortoise.models import Model
|
|
from tortoise import fields
|
|
|
|
|
|
import bcrypt
|
|
|
|
|
|
class User(Model):
|
|
id = fields.UUIDField(primary_key=True)
|
|
username = fields.CharField(max_length=255)
|
|
password = fields.BinaryField(null=True) # Hashed - nullable for OIDC users
|
|
email = fields.CharField(max_length=100, unique=True)
|
|
whatsapp_number = fields.CharField(max_length=30, unique=True, null=True, index=True)
|
|
|
|
# Email channel fields
|
|
email_enabled = fields.BooleanField(default=False)
|
|
email_hmac_token = fields.CharField(max_length=16, unique=True, null=True, index=True)
|
|
|
|
# OIDC fields
|
|
oidc_subject = fields.CharField(
|
|
max_length=255, unique=True, null=True, index=True
|
|
) # "sub" claim from OIDC
|
|
auth_provider = fields.CharField(
|
|
max_length=50, default="local"
|
|
) # "local" or "oidc"
|
|
ldap_groups = fields.JSONField(default=[]) # LDAP groups from OIDC claims
|
|
|
|
created_at = fields.DatetimeField(auto_now_add=True)
|
|
updated_at = fields.DatetimeField(auto_now=True)
|
|
|
|
class Meta:
|
|
table = "users"
|
|
|
|
def has_group(self, group: str) -> bool:
|
|
"""Check if user belongs to a specific LDAP group."""
|
|
return group in (self.ldap_groups or [])
|
|
|
|
def is_admin(self) -> bool:
|
|
"""Check if user is an admin (member of lldap_admin group)."""
|
|
return self.has_group("lldap_admin")
|
|
|
|
def set_password(self, plain_password: str):
|
|
self.password = bcrypt.hashpw(
|
|
plain_password.encode("utf-8"),
|
|
bcrypt.gensalt(),
|
|
)
|
|
|
|
def verify_password(self, plain_password: str):
|
|
if not self.password:
|
|
return False
|
|
return bcrypt.checkpw(plain_password.encode("utf-8"), self.password)
|