Files
triviathang/backend/routes/auth.py
Ryan Chen 69992f1be9 Add question ownership and sharing
Questions now have a created_by field linking to the user who created them.
Users only see questions they own or that have been shared with them.
Includes a share dialog, user search, and bulk sharing; export/import
respects ownership.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-03 09:43:04 -04:00

156 lines
4.9 KiB
Python

from flask import Blueprint, request, jsonify, redirect, make_response, current_app, g
from backend.auth import oauth
from backend.auth.middleware import require_auth
from backend.models import db, User
from datetime import datetime
# Blueprint that groups the authentication endpoints; every route registered
# on it in this module is served under the /api/auth URL prefix.
bp = Blueprint('auth', __name__, url_prefix='/api/auth')
@bp.route('/login')
def login():
    """Start the OIDC login flow.

    Reads the configured ``OIDC_REDIRECT_URI`` and returns whatever
    ``oauth.authelia.authorize_redirect`` produces for it (the redirect
    that sends the browser to Authelia).
    """
    return oauth.authelia.authorize_redirect(
        current_app.config['OIDC_REDIRECT_URI']
    )
@bp.route('/callback')
def callback():
    """Complete the OIDC flow after Authelia redirects back.

    Exchanges the authorization code for tokens, creates or updates the
    matching ``User`` row (keyed by the ``sub`` claim), stamps
    ``last_login``, and redirects the browser to the frontend with the
    access/ID tokens in the URL fragment.  When the token response carries
    a refresh token it is stored in an HTTP-only cookie.

    Any exception raised along the way is logged and the browser is sent to
    the frontend login page with ``error=auth_failed``.
    """
    frontend_url = current_app.config.get('FRONTEND_URL', 'http://localhost:3000')

    try:
        # Trade the authorization code for the token set.
        token = oauth.authelia.authorize_access_token()

        # Prefer the userinfo embedded in the token response; otherwise
        # fall back to parsing the ID token.
        claims = token.get('userinfo') or oauth.authelia.parse_id_token(token)

        subject = claims['sub']
        profile = {
            'email': claims.get('email'),
            'name': claims.get('name'),
            'preferred_username': claims.get('preferred_username'),
            'groups': claims.get('groups', []),
        }

        user = User.query.filter_by(authelia_sub=subject).first()
        if user:
            # Known user: refresh the locally stored profile fields.
            for field, value in profile.items():
                setattr(user, field, value)
        else:
            # First login for this subject: create the local record.
            user = User(authelia_sub=subject, **profile)
            db.session.add(user)

        user.last_login = datetime.utcnow()
        db.session.commit()

        # Tokens travel in the URL fragment (SPA pattern).
        target = (
            f"{frontend_url}/auth/callback#access_token={token['access_token']}"
            f"&id_token={token['id_token']}"
            f"&expires_in={token.get('expires_in', 900)}"
        )
        response = make_response(redirect(target))

        # The refresh token, when present, goes into an HTTP-only cookie.
        refresh_value = token.get('refresh_token')
        if refresh_value:
            seven_days = 7 * 24 * 60 * 60
            response.set_cookie(
                'refresh_token',
                value=refresh_value,
                httponly=True,
                secure=current_app.config.get('SESSION_COOKIE_SECURE', False),
                samesite='Strict',
                max_age=seven_days,
            )

        return response
    except Exception as e:
        current_app.logger.error(f"OIDC callback error: {e}")
        return redirect(f"{frontend_url}/login?error=auth_failed")
@bp.route('/refresh', methods=['POST'])
def refresh():
    """Exchange the refresh-token cookie for a new access token.

    Returns:
        200 with ``{'access_token', 'expires_in'}`` on success.  If the
        provider also returns a new refresh token, the ``refresh_token``
        cookie is replaced with it.
        401 with ``{'error': ...}`` when the cookie is missing or the
        exchange fails for any reason.
    """
    refresh_token = request.cookies.get('refresh_token')
    if not refresh_token:
        return jsonify({'error': 'No refresh token'}), 401

    try:
        # Exchange refresh token for new access token
        new_token = oauth.authelia.fetch_access_token(
            grant_type='refresh_token',
            refresh_token=refresh_token
        )
        payload = {
            'access_token': new_token['access_token'],
            'expires_in': new_token.get('expires_in', 900)
        }
        rotated_refresh_token = new_token.get('refresh_token')
    except Exception as e:
        current_app.logger.error(f"Token refresh failed: {e}")
        return jsonify({'error': 'Token refresh failed'}), 401

    response = make_response(jsonify(payload), 200)

    # An authorization server may issue a new refresh token on refresh, in
    # which case the client must replace the old one (RFC 6749, section 6).
    # Without this the cookie keeps a token the provider may already have
    # invalidated and the next refresh fails.  Cookie attributes mirror the
    # ones used when the cookie is first issued in the callback handler.
    if rotated_refresh_token:
        response.set_cookie(
            'refresh_token',
            value=rotated_refresh_token,
            httponly=True,
            secure=current_app.config.get('SESSION_COOKIE_SECURE', False),
            samesite='Strict',
            max_age=7*24*60*60  # 7 days
        )

    return response
@bp.route('/logout', methods=['POST'])
def logout():
    """Clear the refresh-token cookie and tell the frontend where to log out.

    This endpoint does not revoke tokens itself; it expires the
    ``refresh_token`` cookie and returns the Authelia logout URL (built
    from ``OIDC_ISSUER``) so the frontend can redirect there.

    Returns:
        200 with ``{'message': 'Logged out', 'logout_url': <url>}`` and a
        ``Set-Cookie`` header that expires the refresh-token cookie.
    """
    # Return Authelia logout URL for frontend to redirect
    authelia_logout_url = f"{current_app.config['OIDC_ISSUER']}/logout"

    response = make_response(jsonify({
        'message': 'Logged out',
        'logout_url': authelia_logout_url
    }), 200)

    # Expire the cookie on the response that is actually returned.
    # Previously the cookie was cleared on a response object that was then
    # discarded, so the browser kept the refresh token after logout.
    # The attributes match the ones used when the cookie is set.
    response.set_cookie(
        'refresh_token',
        '',
        expires=0,
        httponly=True,
        secure=current_app.config.get('SESSION_COOKIE_SECURE', False),
        samesite='Strict'
    )

    return response
@bp.route('/me')
@require_auth
def get_current_user():
    """Return the current user's ``to_dict()`` representation as JSON.

    Guarded by ``require_auth``; reads the user from ``g.current_user``
    (presumably populated by that decorator - confirm in the middleware).
    """
    profile = g.current_user.to_dict()
    return jsonify(profile), 200
@bp.route('/users/search')
@require_auth
def search_users():
    """Search users by name, username, or email for sharing.

    Query parameters:
        - q: Search query (required, min 1 character after trimming
          whitespace).  Matched case-insensitively as a literal substring.

    Returns:
        200 with a JSON list of at most 10 matches, each containing
        ``id``, ``name``, ``username`` and ``email``.  The current user and
        inactive users are excluded.  An empty or missing ``q`` yields an
        empty list.
    """
    query = request.args.get('q', '').strip()
    if not query:
        return jsonify([]), 200

    # Escape LIKE metacharacters so that "%", "_" and "\" typed by the user
    # are matched literally instead of acting as wildcards.  The backslash
    # must be escaped first so the escapes added afterwards are not doubled.
    escaped_query = (
        query.replace('\\', '\\\\')
        .replace('%', '\\%')
        .replace('_', '\\_')
    )
    search_pattern = f'%{escaped_query}%'

    users = User.query.filter(
        User.id != g.current_user.id,
        User.is_active == True,  # noqa: E712 - builds a SQL expression
        db.or_(
            User.name.ilike(search_pattern, escape='\\'),
            User.preferred_username.ilike(search_pattern, escape='\\'),
            User.email.ilike(search_pattern, escape='\\')
        )
    ).limit(10).all()

    return jsonify([{
        'id': u.id,
        'name': u.name,
        'username': u.preferred_username,
        'email': u.email
    } for u in users]), 200