132 lines
4.2 KiB
Python
132 lines
4.2 KiB
Python
from functools import wraps
|
|
from flask import request, jsonify, current_app, g
|
|
from authlib.jose import jwt, JoseError, JsonWebKey
|
|
import requests
|
|
from datetime import datetime
|
|
from backend.models import db, User
|
|
|
|
|
|
# Simple in-memory cache for JWKS (in production, use Redis with TTL)
|
|
_jwks_cache = {'data': None, 'timestamp': None}
|
|
|
|
|
|
def get_jwks():
|
|
"""Fetch JWKS from Authelia (with basic caching)"""
|
|
from datetime import timedelta
|
|
|
|
# Check cache (24-hour TTL)
|
|
if _jwks_cache['data'] and _jwks_cache['timestamp']:
|
|
if datetime.utcnow() - _jwks_cache['timestamp'] < timedelta(hours=24):
|
|
return _jwks_cache['data']
|
|
|
|
# Fetch JWKS
|
|
jwks_uri = current_app.config.get('OIDC_JWKS_URI')
|
|
if not jwks_uri:
|
|
# Fetch from discovery document
|
|
issuer = current_app.config['OIDC_ISSUER']
|
|
discovery_url = f"{issuer}/.well-known/openid-configuration"
|
|
discovery = requests.get(discovery_url, timeout=10).json()
|
|
jwks_uri = discovery['jwks_uri']
|
|
|
|
jwks_data = requests.get(jwks_uri, timeout=10).json()
|
|
|
|
# Update cache
|
|
_jwks_cache['data'] = jwks_data
|
|
_jwks_cache['timestamp'] = datetime.utcnow()
|
|
|
|
return jwks_data
|
|
|
|
|
|
def validate_jwt(token):
|
|
"""Validate JWT token and return claims"""
|
|
if not token:
|
|
return None
|
|
|
|
try:
|
|
jwks = get_jwks()
|
|
|
|
# Decode and validate JWT
|
|
claims = jwt.decode(
|
|
token,
|
|
jwks,
|
|
claims_options={
|
|
'iss': {'essential': True, 'value': current_app.config['OIDC_ISSUER']},
|
|
'aud': {'essential': True, 'values': [current_app.config['OIDC_AUDIENCE']]},
|
|
}
|
|
)
|
|
claims.validate()
|
|
return claims
|
|
|
|
except JoseError as e:
|
|
current_app.logger.error(f"JWT validation failed: {e}")
|
|
return None
|
|
except Exception as e:
|
|
current_app.logger.error(f"Unexpected error during JWT validation: {e}")
|
|
return None
|
|
|
|
|
|
def require_auth(f):
|
|
"""Decorator to require authentication on any route"""
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
auth_header = request.headers.get('Authorization')
|
|
|
|
if not auth_header or not auth_header.startswith('Bearer '):
|
|
return jsonify({'error': 'Missing or invalid Authorization header'}), 401
|
|
|
|
token = auth_header.split(' ')[1]
|
|
claims = validate_jwt(token)
|
|
|
|
if not claims:
|
|
return jsonify({'error': 'Invalid or expired token'}), 401
|
|
|
|
# Get or create user from claims
|
|
user = User.query.filter_by(authelia_sub=claims['sub']).first()
|
|
if not user:
|
|
# Auto-create user on first login
|
|
groups_claim = current_app.config.get('OIDC_GROUPS_CLAIM', 'groups')
|
|
user = User(
|
|
authelia_sub=claims['sub'],
|
|
email=claims.get('email'),
|
|
name=claims.get('name'),
|
|
preferred_username=claims.get('preferred_username'),
|
|
groups=claims.get(groups_claim, [])
|
|
)
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
else:
|
|
# Update user info from latest token
|
|
groups_claim = current_app.config.get('OIDC_GROUPS_CLAIM', 'groups')
|
|
user.email = claims.get('email')
|
|
user.name = claims.get('name')
|
|
user.preferred_username = claims.get('preferred_username')
|
|
user.groups = claims.get(groups_claim, [])
|
|
user.last_login = datetime.utcnow()
|
|
db.session.commit()
|
|
|
|
# Check if user is active
|
|
if not user.is_active:
|
|
return jsonify({'error': 'User account is disabled'}), 403
|
|
|
|
# Store user in Flask's g object for access in route handlers
|
|
g.current_user = user
|
|
|
|
return f(*args, **kwargs)
|
|
|
|
return decorated_function
|
|
|
|
|
|
def require_admin(f):
|
|
"""Decorator to require admin role (must be used WITH @require_auth)"""
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if not hasattr(g, 'current_user'):
|
|
return jsonify({'error': 'Authentication required'}), 401
|
|
|
|
if not g.current_user.is_admin:
|
|
return jsonify({'error': 'Admin access required'}), 403
|
|
|
|
return f(*args, **kwargs)
|
|
|
|
return decorated_function
|