Adding mkdocs and privileged tools
This commit is contained in:
@@ -60,7 +60,7 @@ async def oidc_login():
|
||||
"client_id": oidc_config.client_id,
|
||||
"response_type": "code",
|
||||
"redirect_uri": oidc_config.redirect_uri,
|
||||
"scope": "openid email profile",
|
||||
"scope": "openid email profile groups",
|
||||
"state": state,
|
||||
"code_challenge": code_challenge,
|
||||
"code_challenge_method": "S256",
|
||||
@@ -115,7 +115,9 @@ async def oidc_callback():
|
||||
token_response = await client.post(token_endpoint, data=token_data)
|
||||
|
||||
if token_response.status_code != 200:
|
||||
return jsonify({"error": f"Failed to exchange code for token: {token_response.text}"}), 400
|
||||
return jsonify(
|
||||
{"error": f"Failed to exchange code for token: {token_response.text}"}
|
||||
), 400
|
||||
|
||||
tokens = token_response.json()
|
||||
|
||||
@@ -141,7 +143,13 @@ async def oidc_callback():
|
||||
return jsonify(
|
||||
access_token=access_token,
|
||||
refresh_token=refresh_token,
|
||||
user={"id": str(user.id), "username": user.username, "email": user.email},
|
||||
user={
|
||||
"id": str(user.id),
|
||||
"username": user.username,
|
||||
"email": user.email,
|
||||
"groups": user.ldap_groups,
|
||||
"is_admin": user.is_admin(),
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
|
||||
26
services/raggr/blueprints/users/decorators.py
Normal file
26
services/raggr/blueprints/users/decorators.py
Normal file
@@ -0,0 +1,26 @@
|
||||
"""
|
||||
Authentication decorators for role-based access control.
|
||||
"""
|
||||
|
||||
from functools import wraps
|
||||
from quart import jsonify
|
||||
from quart_jwt_extended import jwt_refresh_token_required, get_jwt_identity
|
||||
from .models import User
|
||||
|
||||
|
||||
def admin_required(fn):
|
||||
"""
|
||||
Decorator that requires the user to be an admin (member of lldap_admin group).
|
||||
Must be used on async route handlers.
|
||||
"""
|
||||
|
||||
@wraps(fn)
|
||||
@jwt_refresh_token_required
|
||||
async def wrapper(*args, **kwargs):
|
||||
user_id = get_jwt_identity()
|
||||
user = await User.get_or_none(id=user_id)
|
||||
if not user or not user.is_admin():
|
||||
return jsonify({"error": "Admin access required"}), 403
|
||||
return await fn(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
@@ -12,8 +12,13 @@ class User(Model):
|
||||
email = fields.CharField(max_length=100, unique=True)
|
||||
|
||||
# OIDC fields
|
||||
oidc_subject = fields.CharField(max_length=255, unique=True, null=True, index=True) # "sub" claim from OIDC
|
||||
auth_provider = fields.CharField(max_length=50, default="local") # "local" or "oidc"
|
||||
oidc_subject = fields.CharField(
|
||||
max_length=255, unique=True, null=True, index=True
|
||||
) # "sub" claim from OIDC
|
||||
auth_provider = fields.CharField(
|
||||
max_length=50, default="local"
|
||||
) # "local" or "oidc"
|
||||
ldap_groups = fields.JSONField(default=[]) # LDAP groups from OIDC claims
|
||||
|
||||
created_at = fields.DatetimeField(auto_now_add=True)
|
||||
updated_at = fields.DatetimeField(auto_now=True)
|
||||
@@ -21,6 +26,14 @@ class User(Model):
|
||||
class Meta:
|
||||
table = "users"
|
||||
|
||||
def has_group(self, group: str) -> bool:
|
||||
"""Check if user belongs to a specific LDAP group."""
|
||||
return group in (self.ldap_groups or [])
|
||||
|
||||
def is_admin(self) -> bool:
|
||||
"""Check if user is an admin (member of lldap_admin group)."""
|
||||
return self.has_group("lldap_admin")
|
||||
|
||||
def set_password(self, plain_password: str):
|
||||
self.password = bcrypt.hashpw(
|
||||
plain_password.encode("utf-8"),
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
"""
|
||||
OIDC User Management Service
|
||||
"""
|
||||
|
||||
from typing import Dict, Any, Optional
|
||||
from uuid import uuid4
|
||||
from .models import User
|
||||
@@ -31,10 +32,10 @@ class OIDCUserService:
|
||||
# Update user info from latest claims (optional)
|
||||
user.email = claims.get("email", user.email)
|
||||
user.username = (
|
||||
claims.get("preferred_username")
|
||||
or claims.get("name")
|
||||
or user.username
|
||||
claims.get("preferred_username") or claims.get("name") or user.username
|
||||
)
|
||||
# Update LDAP groups from claims
|
||||
user.ldap_groups = claims.get("groups", [])
|
||||
await user.save()
|
||||
return user
|
||||
|
||||
@@ -47,6 +48,7 @@ class OIDCUserService:
|
||||
user.oidc_subject = oidc_subject
|
||||
user.auth_provider = "oidc"
|
||||
user.password = None # Clear password
|
||||
user.ldap_groups = claims.get("groups", [])
|
||||
await user.save()
|
||||
return user
|
||||
|
||||
@@ -58,14 +60,17 @@ class OIDCUserService:
|
||||
or f"user_{oidc_subject[:8]}"
|
||||
)
|
||||
|
||||
# Extract LDAP groups from claims
|
||||
groups = claims.get("groups", [])
|
||||
|
||||
user = await User.create(
|
||||
id=uuid4(),
|
||||
username=username,
|
||||
email=email
|
||||
or f"{oidc_subject}@oidc.local", # Fallback if no email claim
|
||||
email=email or f"{oidc_subject}@oidc.local", # Fallback if no email claim
|
||||
oidc_subject=oidc_subject,
|
||||
auth_provider="oidc",
|
||||
password=None,
|
||||
ldap_groups=groups,
|
||||
)
|
||||
|
||||
return user
|
||||
|
||||
Reference in New Issue
Block a user