Questions now have a created_by field linking to the user who created them. Users only see questions they own or that have been shared with them. Includes share dialog, user search, bulk sharing, and export/import respects ownership. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
576 lines
19 KiB
Python
576 lines
19 KiB
Python
from flask import Blueprint, request, jsonify, current_app, g
|
|
from backend.models import db, Question, QuestionType, QuestionShare, User
|
|
from backend.services.image_service import save_image, delete_image
|
|
from backend.auth.middleware import require_auth
|
|
|
|
bp = Blueprint('questions', __name__, url_prefix='/api/questions')
|
|
|
|
|
|
def _visible_questions_query():
|
|
"""Return a base query filtered to questions visible to the current user."""
|
|
return Question.query.filter(
|
|
db.or_(
|
|
Question.created_by == g.current_user.id,
|
|
Question.id.in_(
|
|
db.session.query(QuestionShare.question_id)
|
|
.filter(QuestionShare.shared_with_user_id == g.current_user.id)
|
|
)
|
|
)
|
|
)
|
|
|
|
|
|
@bp.route('', methods=['GET'])
|
|
@require_auth
|
|
def list_questions():
|
|
"""Get all questions visible to the current user with optional filtering and sorting
|
|
|
|
Query parameters:
|
|
- search: Search in question content and answer (case-insensitive)
|
|
- category: Filter by category name (exact match, or 'none' for uncategorized)
|
|
- type: Filter by question type (text, image, youtube_audio)
|
|
- sort_by: Field to sort by (created_at, category, type, question_content, answer)
|
|
- sort_order: Sort direction (asc, desc) - default: desc
|
|
- owner: Filter by ownership ('mine', 'shared', or omit for all visible)
|
|
"""
|
|
query = _visible_questions_query()
|
|
|
|
# Owner filter
|
|
owner = request.args.get('owner', '').strip()
|
|
if owner == 'mine':
|
|
query = query.filter(Question.created_by == g.current_user.id)
|
|
elif owner == 'shared':
|
|
query = query.filter(Question.created_by != g.current_user.id)
|
|
|
|
# Search filter
|
|
search = request.args.get('search', '').strip()
|
|
if search:
|
|
search_pattern = f'%{search}%'
|
|
query = query.filter(
|
|
db.or_(
|
|
Question.question_content.ilike(search_pattern),
|
|
Question.answer.ilike(search_pattern)
|
|
)
|
|
)
|
|
|
|
# Category filter
|
|
category = request.args.get('category', '').strip()
|
|
if category:
|
|
if category.lower() == 'none':
|
|
query = query.filter(Question.category.is_(None))
|
|
else:
|
|
query = query.filter(Question.category == category)
|
|
|
|
# Type filter
|
|
question_type = request.args.get('type', '').strip()
|
|
if question_type:
|
|
try:
|
|
query = query.filter(Question.type == QuestionType(question_type))
|
|
except ValueError:
|
|
pass # Invalid type, ignore filter
|
|
|
|
# Sorting
|
|
sort_by = request.args.get('sort_by', 'created_at').strip()
|
|
sort_order = request.args.get('sort_order', 'desc').strip().lower()
|
|
|
|
# Map sort_by to column
|
|
sort_columns = {
|
|
'created_at': Question.created_at,
|
|
'category': Question.category,
|
|
'type': Question.type,
|
|
'question_content': Question.question_content,
|
|
'answer': Question.answer,
|
|
}
|
|
|
|
sort_column = sort_columns.get(sort_by, Question.created_at)
|
|
|
|
if sort_order == 'asc':
|
|
query = query.order_by(sort_column.asc())
|
|
else:
|
|
query = query.order_by(sort_column.desc())
|
|
|
|
questions = query.all()
|
|
return jsonify([q.to_dict(include_answer=True) for q in questions]), 200
|
|
|
|
|
|
@bp.route('/<int:question_id>', methods=['GET'])
|
|
@require_auth
|
|
def get_question(question_id):
|
|
"""Get a single question by ID (must be visible to current user)"""
|
|
question = Question.query.get_or_404(question_id)
|
|
if not question.is_visible_to(g.current_user):
|
|
return jsonify({'error': 'Question not found'}), 404
|
|
return jsonify(question.to_dict(include_answer=True)), 200
|
|
|
|
|
|
@bp.route('', methods=['POST'])
|
|
@require_auth
|
|
def create_question():
|
|
"""Create a new question"""
|
|
try:
|
|
# Check if it's a multipart form (for image uploads) or JSON
|
|
if request.content_type and 'multipart/form-data' in request.content_type:
|
|
# Image question
|
|
data = request.form
|
|
question_type = data.get('type', 'text')
|
|
question_content = data.get('question_content', '')
|
|
answer = data.get('answer', '')
|
|
category = data.get('category', '')
|
|
|
|
image_path = None
|
|
if question_type == 'image':
|
|
if 'image' not in request.files:
|
|
return jsonify({'error': 'Image file required for image questions'}), 400
|
|
|
|
file = request.files['image']
|
|
try:
|
|
image_path = save_image(
|
|
file,
|
|
current_app.config['UPLOAD_FOLDER'],
|
|
current_app.config['ALLOWED_EXTENSIONS']
|
|
)
|
|
except ValueError as e:
|
|
return jsonify({'error': str(e)}), 400
|
|
|
|
youtube_url = None
|
|
audio_path = None
|
|
start_time = None
|
|
end_time = None
|
|
|
|
else:
|
|
# JSON request for text or YouTube audio questions
|
|
data = request.get_json()
|
|
if not data:
|
|
return jsonify({'error': 'No data provided'}), 400
|
|
|
|
question_type = data.get('type', 'text')
|
|
question_content = data.get('question_content', '')
|
|
answer = data.get('answer', '')
|
|
category = data.get('category', '')
|
|
image_path = None
|
|
|
|
# Handle YouTube audio questions
|
|
youtube_url = None
|
|
audio_path = None
|
|
start_time = None
|
|
end_time = None
|
|
|
|
if question_type == 'youtube_audio':
|
|
youtube_url = data.get('youtube_url', '')
|
|
|
|
if not youtube_url:
|
|
return jsonify({'error': 'youtube_url required for YouTube audio questions'}), 400
|
|
|
|
# Validate YouTube URL
|
|
from backend.services.youtube_service import validate_youtube_url, validate_timestamps, get_video_duration
|
|
|
|
is_valid, result = validate_youtube_url(youtube_url)
|
|
if not is_valid:
|
|
return jsonify({'error': result}), 400
|
|
|
|
# Get and validate timestamps
|
|
try:
|
|
start_time = int(data.get('start_time', 0))
|
|
end_time = int(data.get('end_time', 0))
|
|
except ValueError:
|
|
return jsonify({'error': 'start_time and end_time must be integers'}), 400
|
|
|
|
# Validate timestamp range
|
|
video_duration = get_video_duration(youtube_url)
|
|
is_valid, error = validate_timestamps(start_time, end_time, video_duration)
|
|
if not is_valid:
|
|
return jsonify({'error': error}), 400
|
|
|
|
# Note: audio_path will be null until download completes
|
|
|
|
# Validation
|
|
if not question_content:
|
|
return jsonify({'error': 'question_content is required'}), 400
|
|
if not answer:
|
|
return jsonify({'error': 'answer is required'}), 400
|
|
|
|
# Create question
|
|
question = Question(
|
|
type=QuestionType(question_type),
|
|
question_content=question_content,
|
|
answer=answer,
|
|
image_path=image_path,
|
|
youtube_url=youtube_url,
|
|
audio_path=audio_path, # Will be None initially for YouTube
|
|
start_time=start_time,
|
|
end_time=end_time,
|
|
category=category if category else None,
|
|
created_by=g.current_user.id
|
|
)
|
|
|
|
db.session.add(question)
|
|
db.session.commit()
|
|
|
|
# For YouTube audio, start async download
|
|
if question_type == 'youtube_audio':
|
|
from backend.tasks.youtube_tasks import download_youtube_audio
|
|
from backend.models import DownloadJob, DownloadJobStatus
|
|
|
|
# Start Celery task
|
|
task = download_youtube_audio.delay(question.id, youtube_url, start_time, end_time)
|
|
|
|
# Create job tracking record
|
|
job = DownloadJob(
|
|
question_id=question.id,
|
|
celery_task_id=task.id,
|
|
status=DownloadJobStatus.PENDING
|
|
)
|
|
db.session.add(job)
|
|
db.session.commit()
|
|
|
|
return jsonify({
|
|
'question': question.to_dict(include_answer=True),
|
|
'job': job.to_dict()
|
|
}), 202 # 202 Accepted (async processing)
|
|
|
|
return jsonify(question.to_dict(include_answer=True)), 201
|
|
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@bp.route('/<int:question_id>', methods=['PUT'])
|
|
@require_auth
|
|
def update_question(question_id):
|
|
"""Update an existing question (owner only)"""
|
|
question = Question.query.get_or_404(question_id)
|
|
|
|
if not question.is_owned_by(g.current_user):
|
|
return jsonify({'error': 'You can only edit questions you created'}), 403
|
|
|
|
try:
|
|
# Handle multipart form data or JSON
|
|
if request.content_type and 'multipart/form-data' in request.content_type:
|
|
data = request.form
|
|
|
|
# Update fields if provided
|
|
if 'question_content' in data:
|
|
question.question_content = data['question_content']
|
|
if 'answer' in data:
|
|
question.answer = data['answer']
|
|
if 'type' in data:
|
|
question.type = QuestionType(data['type'])
|
|
if 'category' in data:
|
|
question.category = data['category'] if data['category'] else None
|
|
|
|
# Handle new image upload
|
|
if 'image' in request.files:
|
|
file = request.files['image']
|
|
if file and file.filename:
|
|
# Delete old image if exists
|
|
if question.image_path:
|
|
delete_image(question.image_path, current_app.root_path)
|
|
|
|
# Save new image
|
|
try:
|
|
question.image_path = save_image(
|
|
file,
|
|
current_app.config['UPLOAD_FOLDER'],
|
|
current_app.config['ALLOWED_EXTENSIONS']
|
|
)
|
|
except ValueError as e:
|
|
return jsonify({'error': str(e)}), 400
|
|
|
|
else:
|
|
# JSON request
|
|
data = request.get_json()
|
|
if not data:
|
|
return jsonify({'error': 'No data provided'}), 400
|
|
|
|
if 'question_content' in data:
|
|
question.question_content = data['question_content']
|
|
if 'answer' in data:
|
|
question.answer = data['answer']
|
|
if 'type' in data:
|
|
question.type = QuestionType(data['type'])
|
|
if 'category' in data:
|
|
question.category = data['category'] if data['category'] else None
|
|
|
|
db.session.commit()
|
|
return jsonify(question.to_dict(include_answer=True)), 200
|
|
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@bp.route('/<int:question_id>', methods=['DELETE'])
|
|
@require_auth
|
|
def delete_question(question_id):
|
|
"""Delete a question (owner only)"""
|
|
question = Question.query.get_or_404(question_id)
|
|
|
|
if not question.is_owned_by(g.current_user):
|
|
return jsonify({'error': 'You can only delete questions you created'}), 403
|
|
|
|
try:
|
|
# Delete associated image if exists
|
|
if question.image_path:
|
|
delete_image(question.image_path, current_app.root_path)
|
|
|
|
# Delete associated audio if exists
|
|
if question.audio_path:
|
|
from backend.services.audio_service import delete_audio
|
|
delete_audio(question.audio_path, current_app.root_path)
|
|
|
|
db.session.delete(question)
|
|
db.session.commit()
|
|
|
|
return jsonify({'message': 'Question deleted successfully'}), 200
|
|
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@bp.route('/random', methods=['GET'])
|
|
@require_auth
|
|
def get_random_questions():
|
|
"""Get random questions by category (only from visible questions)
|
|
|
|
Query parameters:
|
|
- category: Category name (required)
|
|
- count: Number of random questions to return (default: 5)
|
|
"""
|
|
category = request.args.get('category')
|
|
if not category:
|
|
return jsonify({'error': 'category parameter is required'}), 400
|
|
|
|
try:
|
|
count = int(request.args.get('count', 5))
|
|
if count < 1:
|
|
return jsonify({'error': 'count must be at least 1'}), 400
|
|
except ValueError:
|
|
return jsonify({'error': 'count must be a valid integer'}), 400
|
|
|
|
try:
|
|
# Get visible questions for the category
|
|
questions = _visible_questions_query().filter_by(category=category).all()
|
|
|
|
if not questions:
|
|
return jsonify({'error': f'No questions found for category: {category}'}), 404
|
|
|
|
# Randomly select questions
|
|
import random
|
|
selected = random.sample(questions, min(count, len(questions)))
|
|
|
|
return jsonify({
|
|
'category': category,
|
|
'requested': count,
|
|
'returned': len(selected),
|
|
'questions': [q.to_dict(include_answer=True) for q in selected]
|
|
}), 200
|
|
|
|
except Exception as e:
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@bp.route('/bulk', methods=['POST'])
|
|
@require_auth
|
|
def bulk_create_questions():
|
|
"""Bulk create questions
|
|
|
|
Expected JSON: {
|
|
"questions": [
|
|
{
|
|
"question_content": "What is 2+2?",
|
|
"answer": "4",
|
|
"category": "Math",
|
|
"type": "text"
|
|
},
|
|
...
|
|
]
|
|
}
|
|
"""
|
|
data = request.get_json()
|
|
if not data or 'questions' not in data:
|
|
return jsonify({'error': 'questions array is required'}), 400
|
|
|
|
questions_data = data['questions']
|
|
if not isinstance(questions_data, list):
|
|
return jsonify({'error': 'questions must be an array'}), 400
|
|
|
|
created_questions = []
|
|
errors = []
|
|
|
|
try:
|
|
for idx, q_data in enumerate(questions_data):
|
|
try:
|
|
# Validate required fields
|
|
if not q_data.get('question_content'):
|
|
errors.append({'index': idx, 'error': 'question_content is required'})
|
|
continue
|
|
if not q_data.get('answer'):
|
|
errors.append({'index': idx, 'error': 'answer is required'})
|
|
continue
|
|
|
|
# Create question
|
|
question = Question(
|
|
type=QuestionType(q_data.get('type', 'text')),
|
|
question_content=q_data['question_content'],
|
|
answer=q_data['answer'],
|
|
category=q_data.get('category') if q_data.get('category') else None,
|
|
image_path=None, # Bulk import doesn't support images
|
|
created_by=g.current_user.id
|
|
)
|
|
|
|
db.session.add(question)
|
|
created_questions.append(question)
|
|
|
|
except Exception as e:
|
|
errors.append({'index': idx, 'error': str(e)})
|
|
|
|
db.session.commit()
|
|
|
|
return jsonify({
|
|
'message': f'Successfully created {len(created_questions)} questions',
|
|
'created': len(created_questions),
|
|
'errors': errors,
|
|
'questions': [q.to_dict(include_answer=True) for q in created_questions]
|
|
}), 201
|
|
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
# --- Sharing endpoints ---
|
|
|
|
@bp.route('/<int:question_id>/shares', methods=['GET'])
|
|
@require_auth
|
|
def get_shares(question_id):
|
|
"""Get list of users a question is shared with (owner only)"""
|
|
question = Question.query.get_or_404(question_id)
|
|
|
|
if not question.is_owned_by(g.current_user):
|
|
return jsonify({'error': 'Only the question owner can view shares'}), 403
|
|
|
|
return jsonify([s.to_dict() for s in question.shares]), 200
|
|
|
|
|
|
@bp.route('/<int:question_id>/share', methods=['POST'])
|
|
@require_auth
|
|
def share_question(question_id):
|
|
"""Share a question with another user (owner only)
|
|
|
|
Expected JSON: { "user_id": int }
|
|
"""
|
|
question = Question.query.get_or_404(question_id)
|
|
|
|
if not question.is_owned_by(g.current_user):
|
|
return jsonify({'error': 'Only the question owner can share it'}), 403
|
|
|
|
data = request.get_json()
|
|
if not data or 'user_id' not in data:
|
|
return jsonify({'error': 'user_id is required'}), 400
|
|
|
|
target_user_id = data['user_id']
|
|
|
|
if target_user_id == g.current_user.id:
|
|
return jsonify({'error': 'Cannot share a question with yourself'}), 400
|
|
|
|
target_user = User.query.get(target_user_id)
|
|
if not target_user:
|
|
return jsonify({'error': 'User not found'}), 404
|
|
|
|
# Check if already shared
|
|
existing = QuestionShare.query.filter_by(
|
|
question_id=question_id,
|
|
shared_with_user_id=target_user_id
|
|
).first()
|
|
|
|
if existing:
|
|
return jsonify({'error': 'Question already shared with this user'}), 409
|
|
|
|
share = QuestionShare(
|
|
question_id=question_id,
|
|
shared_with_user_id=target_user_id,
|
|
shared_by_user_id=g.current_user.id
|
|
)
|
|
db.session.add(share)
|
|
db.session.commit()
|
|
|
|
return jsonify(share.to_dict()), 201
|
|
|
|
|
|
@bp.route('/<int:question_id>/share/<int:user_id>', methods=['DELETE'])
|
|
@require_auth
|
|
def unshare_question(question_id, user_id):
|
|
"""Remove sharing of a question with a user (owner only)"""
|
|
question = Question.query.get_or_404(question_id)
|
|
|
|
if not question.is_owned_by(g.current_user):
|
|
return jsonify({'error': 'Only the question owner can manage shares'}), 403
|
|
|
|
share = QuestionShare.query.filter_by(
|
|
question_id=question_id,
|
|
shared_with_user_id=user_id
|
|
).first()
|
|
|
|
if not share:
|
|
return jsonify({'error': 'Share not found'}), 404
|
|
|
|
db.session.delete(share)
|
|
db.session.commit()
|
|
|
|
return jsonify({'message': 'Share removed'}), 200
|
|
|
|
|
|
@bp.route('/bulk-share', methods=['POST'])
|
|
@require_auth
|
|
def bulk_share_questions():
|
|
"""Share multiple questions with a user (owner only for each)
|
|
|
|
Expected JSON: { "question_ids": [int, ...], "user_id": int }
|
|
"""
|
|
data = request.get_json()
|
|
if not data or 'question_ids' not in data or 'user_id' not in data:
|
|
return jsonify({'error': 'question_ids and user_id are required'}), 400
|
|
|
|
target_user_id = data['user_id']
|
|
question_ids = data['question_ids']
|
|
|
|
if target_user_id == g.current_user.id:
|
|
return jsonify({'error': 'Cannot share questions with yourself'}), 400
|
|
|
|
target_user = User.query.get(target_user_id)
|
|
if not target_user:
|
|
return jsonify({'error': 'User not found'}), 404
|
|
|
|
shared = []
|
|
skipped = []
|
|
|
|
for qid in question_ids:
|
|
question = Question.query.get(qid)
|
|
if not question or not question.is_owned_by(g.current_user):
|
|
skipped.append({'id': qid, 'reason': 'not found or not owned'})
|
|
continue
|
|
|
|
existing = QuestionShare.query.filter_by(
|
|
question_id=qid,
|
|
shared_with_user_id=target_user_id
|
|
).first()
|
|
if existing:
|
|
skipped.append({'id': qid, 'reason': 'already shared'})
|
|
continue
|
|
|
|
share = QuestionShare(
|
|
question_id=qid,
|
|
shared_with_user_id=target_user_id,
|
|
shared_by_user_id=g.current_user.id
|
|
)
|
|
db.session.add(share)
|
|
shared.append(qid)
|
|
|
|
db.session.commit()
|
|
|
|
return jsonify({
|
|
'shared': shared,
|
|
'skipped': skipped
|
|
}), 200
|