Add question ownership and sharing

Questions now have a created_by field linking to the user who created them.
Users only see questions they own or that have been shared with them.
Includes share dialog, user search, bulk sharing, and export/import
respects ownership.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-03 09:43:04 -04:00
parent 02fcbad9ba
commit 69992f1be9
15 changed files with 836 additions and 70 deletions

View File

@@ -122,3 +122,34 @@ def logout():
def get_current_user():
"""Get current user info (requires auth)"""
return jsonify(g.current_user.to_dict()), 200
@bp.route('/users/search')
@require_auth
def search_users():
"""Search users by name, username, or email for sharing
Query parameters:
- q: Search query (required, min 1 character)
"""
query = request.args.get('q', '').strip()
if not query:
return jsonify([]), 200
search_pattern = f'%{query}%'
users = User.query.filter(
User.id != g.current_user.id,
User.is_active == True,
db.or_(
User.name.ilike(search_pattern),
User.preferred_username.ilike(search_pattern),
User.email.ilike(search_pattern)
)
).limit(10).all()
return jsonify([{
'id': u.id,
'name': u.name,
'username': u.preferred_username,
'email': u.email
} for u in users]), 200

View File

@@ -1,7 +1,7 @@
"""Routes for exporting and importing trivia questions"""
import os
import tempfile
from flask import Blueprint, jsonify, Response, request
from flask import Blueprint, jsonify, Response, request, g
from werkzeug.utils import secure_filename
from backend.auth.middleware import require_auth
@@ -23,8 +23,8 @@ def export_data():
ZIP file containing manifest.json and all media files
"""
try:
# Generate export ZIP
zip_bytes, zip_filename = export_questions_to_zip()
# Generate export ZIP (only questions visible to current user)
zip_bytes, zip_filename = export_questions_to_zip(user_id=g.current_user.id)
# Create response with ZIP file
response = Response(
@@ -82,8 +82,8 @@ def import_data():
file_size = os.path.getsize(temp_path)
print(f"Saved file size: {file_size} bytes")
# Import from ZIP
result = import_questions_from_zip(temp_path)
# Import from ZIP (set ownership to current user)
result = import_questions_from_zip(temp_path, user_id=g.current_user.id)
return jsonify(result), 200

View File

@@ -1,13 +1,28 @@
from flask import Blueprint, request, jsonify, current_app
from backend.models import db, Question, QuestionType
from flask import Blueprint, request, jsonify, current_app, g
from backend.models import db, Question, QuestionType, QuestionShare, User
from backend.services.image_service import save_image, delete_image
from backend.auth.middleware import require_auth
bp = Blueprint('questions', __name__, url_prefix='/api/questions')
def _visible_questions_query():
"""Return a base query filtered to questions visible to the current user."""
return Question.query.filter(
db.or_(
Question.created_by == g.current_user.id,
Question.id.in_(
db.session.query(QuestionShare.question_id)
.filter(QuestionShare.shared_with_user_id == g.current_user.id)
)
)
)
@bp.route('', methods=['GET'])
@require_auth
def list_questions():
"""Get all questions with optional filtering and sorting
"""Get all questions visible to the current user with optional filtering and sorting
Query parameters:
- search: Search in question content and answer (case-insensitive)
@@ -15,8 +30,16 @@ def list_questions():
- type: Filter by question type (text, image, youtube_audio)
- sort_by: Field to sort by (created_at, category, type, question_content, answer)
- sort_order: Sort direction (asc, desc) - default: desc
- owner: Filter by ownership ('mine', 'shared', or omit for all visible)
"""
query = Question.query
query = _visible_questions_query()
# Owner filter
owner = request.args.get('owner', '').strip()
if owner == 'mine':
query = query.filter(Question.created_by == g.current_user.id)
elif owner == 'shared':
query = query.filter(Question.created_by != g.current_user.id)
# Search filter
search = request.args.get('search', '').strip()
@@ -70,13 +93,17 @@ def list_questions():
@bp.route('/<int:question_id>', methods=['GET'])
@require_auth
def get_question(question_id):
"""Get a single question by ID"""
"""Get a single question by ID (must be visible to current user)"""
question = Question.query.get_or_404(question_id)
if not question.is_visible_to(g.current_user):
return jsonify({'error': 'Question not found'}), 404
return jsonify(question.to_dict(include_answer=True)), 200
@bp.route('', methods=['POST'])
@require_auth
def create_question():
"""Create a new question"""
try:
@@ -171,7 +198,8 @@ def create_question():
audio_path=audio_path, # Will be None initially for YouTube
start_time=start_time,
end_time=end_time,
category=category if category else None
category=category if category else None,
created_by=g.current_user.id
)
db.session.add(question)
@@ -207,10 +235,14 @@ def create_question():
@bp.route('/<int:question_id>', methods=['PUT'])
@require_auth
def update_question(question_id):
"""Update an existing question"""
"""Update an existing question (owner only)"""
question = Question.query.get_or_404(question_id)
if not question.is_owned_by(g.current_user):
return jsonify({'error': 'You can only edit questions you created'}), 403
try:
# Handle multipart form data or JSON
if request.content_type and 'multipart/form-data' in request.content_type:
@@ -268,10 +300,14 @@ def update_question(question_id):
@bp.route('/<int:question_id>', methods=['DELETE'])
@require_auth
def delete_question(question_id):
"""Delete a question"""
"""Delete a question (owner only)"""
question = Question.query.get_or_404(question_id)
if not question.is_owned_by(g.current_user):
return jsonify({'error': 'You can only delete questions you created'}), 403
try:
# Delete associated image if exists
if question.image_path:
@@ -293,8 +329,9 @@ def delete_question(question_id):
@bp.route('/random', methods=['GET'])
@require_auth
def get_random_questions():
"""Get random questions by category
"""Get random questions by category (only from visible questions)
Query parameters:
- category: Category name (required)
@@ -312,8 +349,8 @@ def get_random_questions():
return jsonify({'error': 'count must be a valid integer'}), 400
try:
# Get all questions for the category
questions = Question.query.filter_by(category=category).all()
# Get visible questions for the category
questions = _visible_questions_query().filter_by(category=category).all()
if not questions:
return jsonify({'error': f'No questions found for category: {category}'}), 404
@@ -334,6 +371,7 @@ def get_random_questions():
@bp.route('/bulk', methods=['POST'])
@require_auth
def bulk_create_questions():
"""Bulk create questions
@@ -377,7 +415,8 @@ def bulk_create_questions():
question_content=q_data['question_content'],
answer=q_data['answer'],
category=q_data.get('category') if q_data.get('category') else None,
image_path=None # Bulk import doesn't support images
image_path=None, # Bulk import doesn't support images
created_by=g.current_user.id
)
db.session.add(question)
@@ -398,3 +437,139 @@ def bulk_create_questions():
except Exception as e:
db.session.rollback()
return jsonify({'error': str(e)}), 500
# --- Sharing endpoints ---
@bp.route('/<int:question_id>/shares', methods=['GET'])
@require_auth
def get_shares(question_id):
"""Get list of users a question is shared with (owner only)"""
question = Question.query.get_or_404(question_id)
if not question.is_owned_by(g.current_user):
return jsonify({'error': 'Only the question owner can view shares'}), 403
return jsonify([s.to_dict() for s in question.shares]), 200
@bp.route('/<int:question_id>/share', methods=['POST'])
@require_auth
def share_question(question_id):
"""Share a question with another user (owner only)
Expected JSON: { "user_id": int }
"""
question = Question.query.get_or_404(question_id)
if not question.is_owned_by(g.current_user):
return jsonify({'error': 'Only the question owner can share it'}), 403
data = request.get_json()
if not data or 'user_id' not in data:
return jsonify({'error': 'user_id is required'}), 400
target_user_id = data['user_id']
if target_user_id == g.current_user.id:
return jsonify({'error': 'Cannot share a question with yourself'}), 400
target_user = User.query.get(target_user_id)
if not target_user:
return jsonify({'error': 'User not found'}), 404
# Check if already shared
existing = QuestionShare.query.filter_by(
question_id=question_id,
shared_with_user_id=target_user_id
).first()
if existing:
return jsonify({'error': 'Question already shared with this user'}), 409
share = QuestionShare(
question_id=question_id,
shared_with_user_id=target_user_id,
shared_by_user_id=g.current_user.id
)
db.session.add(share)
db.session.commit()
return jsonify(share.to_dict()), 201
@bp.route('/<int:question_id>/share/<int:user_id>', methods=['DELETE'])
@require_auth
def unshare_question(question_id, user_id):
"""Remove sharing of a question with a user (owner only)"""
question = Question.query.get_or_404(question_id)
if not question.is_owned_by(g.current_user):
return jsonify({'error': 'Only the question owner can manage shares'}), 403
share = QuestionShare.query.filter_by(
question_id=question_id,
shared_with_user_id=user_id
).first()
if not share:
return jsonify({'error': 'Share not found'}), 404
db.session.delete(share)
db.session.commit()
return jsonify({'message': 'Share removed'}), 200
@bp.route('/bulk-share', methods=['POST'])
@require_auth
def bulk_share_questions():
"""Share multiple questions with a user (owner only for each)
Expected JSON: { "question_ids": [int, ...], "user_id": int }
"""
data = request.get_json()
if not data or 'question_ids' not in data or 'user_id' not in data:
return jsonify({'error': 'question_ids and user_id are required'}), 400
target_user_id = data['user_id']
question_ids = data['question_ids']
if target_user_id == g.current_user.id:
return jsonify({'error': 'Cannot share questions with yourself'}), 400
target_user = User.query.get(target_user_id)
if not target_user:
return jsonify({'error': 'User not found'}), 404
shared = []
skipped = []
for qid in question_ids:
question = Question.query.get(qid)
if not question or not question.is_owned_by(g.current_user):
skipped.append({'id': qid, 'reason': 'not found or not owned'})
continue
existing = QuestionShare.query.filter_by(
question_id=qid,
shared_with_user_id=target_user_id
).first()
if existing:
skipped.append({'id': qid, 'reason': 'already shared'})
continue
share = QuestionShare(
question_id=qid,
shared_with_user_id=target_user_id,
shared_by_user_id=g.current_user.id
)
db.session.add(share)
shared.append(qid)
db.session.commit()
return jsonify({
'shared': shared,
'skipped': skipped
}), 200