from functools import wraps from flask import request, jsonify, current_app, g from authlib.jose import jwt, JoseError, JsonWebKey import requests from datetime import datetime from backend.models import db, User # Simple in-memory cache for JWKS (in production, use Redis with TTL) _jwks_cache = {'data': None, 'timestamp': None} def get_jwks(): """Fetch JWKS from Authelia (with basic caching)""" from datetime import timedelta # Check cache (24-hour TTL) if _jwks_cache['data'] and _jwks_cache['timestamp']: if datetime.utcnow() - _jwks_cache['timestamp'] < timedelta(hours=24): return _jwks_cache['data'] # Fetch JWKS jwks_uri = current_app.config.get('OIDC_JWKS_URI') if not jwks_uri: # Fetch from discovery document issuer = current_app.config['OIDC_ISSUER'] discovery_url = f"{issuer}/.well-known/openid-configuration" discovery = requests.get(discovery_url, timeout=10).json() jwks_uri = discovery['jwks_uri'] jwks_data = requests.get(jwks_uri, timeout=10).json() # Update cache _jwks_cache['data'] = jwks_data _jwks_cache['timestamp'] = datetime.utcnow() return jwks_data def validate_jwt(token): """Validate JWT token and return claims""" if not token: return None try: jwks = get_jwks() # Decode and validate JWT claims = jwt.decode( token, jwks, claims_options={ 'iss': {'essential': True, 'value': current_app.config['OIDC_ISSUER']}, 'aud': {'essential': True, 'values': [current_app.config['OIDC_AUDIENCE']]}, } ) claims.validate() return claims except JoseError as e: current_app.logger.error(f"JWT validation failed: {e}") return None except Exception as e: current_app.logger.error(f"Unexpected error during JWT validation: {e}") return None def require_auth(f): """Decorator to require authentication on any route""" @wraps(f) def decorated_function(*args, **kwargs): auth_header = request.headers.get('Authorization') if not auth_header or not auth_header.startswith('Bearer '): return jsonify({'error': 'Missing or invalid Authorization header'}), 401 token = auth_header.split(' ')[1] claims = validate_jwt(token) if not claims: return jsonify({'error': 'Invalid or expired token'}), 401 # Get or create user from claims user = User.query.filter_by(authelia_sub=claims['sub']).first() if not user: # Auto-create user on first login groups_claim = current_app.config.get('OIDC_GROUPS_CLAIM', 'groups') user = User( authelia_sub=claims['sub'], email=claims.get('email'), name=claims.get('name'), preferred_username=claims.get('preferred_username'), groups=claims.get(groups_claim, []) ) db.session.add(user) db.session.commit() else: # Update user info from latest token groups_claim = current_app.config.get('OIDC_GROUPS_CLAIM', 'groups') user.email = claims.get('email') user.name = claims.get('name') user.preferred_username = claims.get('preferred_username') user.groups = claims.get(groups_claim, []) user.last_login = datetime.utcnow() db.session.commit() # Check if user is active if not user.is_active: return jsonify({'error': 'User account is disabled'}), 403 # Store user in Flask's g object for access in route handlers g.current_user = user return f(*args, **kwargs) return decorated_function def require_admin(f): """Decorator to require admin role (must be used WITH @require_auth)""" @wraps(f) def decorated_function(*args, **kwargs): if not hasattr(g, 'current_user'): return jsonify({'error': 'Authentication required'}), 401 if not g.current_user.is_admin: return jsonify({'error': 'Admin access required'}), 403 return f(*args, **kwargs) return decorated_function