77 lines
2.4 KiB
Python
77 lines
2.4 KiB
Python
"""
|
|
OIDC User Management Service
|
|
"""
|
|
from typing import Dict, Any, Optional
|
|
from uuid import uuid4
|
|
from .models import User
|
|
|
|
|
|
class OIDCUserService:
|
|
"""Service for managing OIDC user authentication and provisioning"""
|
|
|
|
@staticmethod
|
|
async def get_or_create_user_from_oidc(claims: Dict[str, Any]) -> User:
|
|
"""
|
|
Get existing user by OIDC subject, or create new user from OIDC claims
|
|
|
|
Args:
|
|
claims: Decoded OIDC ID token claims
|
|
|
|
Returns:
|
|
User object (existing or newly created)
|
|
"""
|
|
oidc_subject = claims.get("sub")
|
|
if not oidc_subject:
|
|
raise ValueError("No 'sub' claim in ID token")
|
|
|
|
# Try to find existing user by OIDC subject
|
|
user = await User.filter(oidc_subject=oidc_subject).first()
|
|
|
|
if user:
|
|
# Update user info from latest claims (optional)
|
|
user.email = claims.get("email", user.email)
|
|
user.username = (
|
|
claims.get("preferred_username")
|
|
or claims.get("name")
|
|
or user.username
|
|
)
|
|
await user.save()
|
|
return user
|
|
|
|
# Check if user exists by email (migration case)
|
|
email = claims.get("email")
|
|
if email:
|
|
user = await User.filter(email=email, auth_provider="local").first()
|
|
if user:
|
|
# Migrate existing local user to OIDC
|
|
user.oidc_subject = oidc_subject
|
|
user.auth_provider = "oidc"
|
|
user.password = None # Clear password
|
|
await user.save()
|
|
return user
|
|
|
|
# Create new user from OIDC claims
|
|
username = (
|
|
claims.get("preferred_username")
|
|
or claims.get("name")
|
|
or claims.get("email", "").split("@")[0]
|
|
or f"user_{oidc_subject[:8]}"
|
|
)
|
|
|
|
user = await User.create(
|
|
id=uuid4(),
|
|
username=username,
|
|
email=email
|
|
or f"{oidc_subject}@oidc.local", # Fallback if no email claim
|
|
oidc_subject=oidc_subject,
|
|
auth_provider="oidc",
|
|
password=None,
|
|
)
|
|
|
|
return user
|
|
|
|
@staticmethod
|
|
async def find_user_by_oidc_subject(oidc_subject: str) -> Optional[User]:
|
|
"""Find user by OIDC subject ID"""
|
|
return await User.filter(oidc_subject=oidc_subject).first()
|