Questions now have a created_by field linking to the user who created them. Users only see questions they own or that have been shared with them. Includes share dialog, user search, bulk sharing, and export/import respects ownership. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
424 lines
15 KiB
Python
424 lines
15 KiB
Python
"""Service for exporting and importing trivia questions with media files"""
|
|
import os
|
|
import json
|
|
import shutil
|
|
import tempfile
|
|
import zipfile
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Tuple, Optional
|
|
|
|
from flask import current_app
|
|
from backend.models import db, Question, Category, QuestionType, Score, Team, GameQuestion, Game, QuestionShare
|
|
|
|
|
|
def export_questions_to_zip(user_id=None) -> Tuple[bytes, str]:
|
|
"""
|
|
Export questions and categories to a ZIP file with images and audio.
|
|
If user_id is provided, only exports questions owned by or shared with that user.
|
|
|
|
Returns:
|
|
Tuple of (zip_bytes, filename)
|
|
"""
|
|
# Query data filtered by visibility
|
|
if user_id is not None:
|
|
questions = Question.query.filter(
|
|
db.or_(
|
|
Question.created_by == user_id,
|
|
Question.id.in_(
|
|
db.session.query(QuestionShare.question_id)
|
|
.filter(QuestionShare.shared_with_user_id == user_id)
|
|
)
|
|
)
|
|
).order_by(Question.created_at).all()
|
|
else:
|
|
questions = Question.query.order_by(Question.created_at).all()
|
|
categories = Category.query.order_by(Category.name).all()
|
|
|
|
# Create temporary directory
|
|
temp_dir = tempfile.mkdtemp()
|
|
|
|
try:
|
|
# Create subdirectories
|
|
images_dir = os.path.join(temp_dir, 'images')
|
|
audio_dir = os.path.join(temp_dir, 'audio')
|
|
os.makedirs(images_dir, exist_ok=True)
|
|
os.makedirs(audio_dir, exist_ok=True)
|
|
|
|
# Prepare manifest data
|
|
manifest = {
|
|
'export_version': '1.0',
|
|
'export_timestamp': datetime.utcnow().isoformat() + 'Z',
|
|
'questions_count': len(questions),
|
|
'categories_count': len(categories),
|
|
'categories': [],
|
|
'questions': []
|
|
}
|
|
|
|
# Export categories
|
|
for category in categories:
|
|
manifest['categories'].append({
|
|
'name': category.name,
|
|
'created_at': category.created_at.isoformat() if category.created_at else None
|
|
})
|
|
|
|
# Get folders for file operations
|
|
images_folder = Path(current_app.config.get('UPLOAD_FOLDER'))
|
|
audio_folder = Path(current_app.config.get('AUDIO_FOLDER'))
|
|
|
|
# Export questions with media files
|
|
missing_files = []
|
|
|
|
for question in questions:
|
|
question_data = {
|
|
'type': question.type.value,
|
|
'question_content': question.question_content,
|
|
'answer': question.answer,
|
|
'category': question.category,
|
|
'created_at': question.created_at.isoformat() if question.created_at else None,
|
|
'image_path': question.image_path,
|
|
'image_filename': None,
|
|
'audio_path': question.audio_path,
|
|
'audio_filename': None,
|
|
'youtube_url': question.youtube_url,
|
|
'start_time': question.start_time,
|
|
'end_time': question.end_time
|
|
}
|
|
|
|
# Handle image files
|
|
if question.image_path:
|
|
image_filename = os.path.basename(question.image_path)
|
|
question_data['image_filename'] = image_filename
|
|
|
|
# Copy image file to temp directory
|
|
source_path = os.path.join(images_folder, image_filename)
|
|
dest_path = os.path.join(images_dir, image_filename)
|
|
|
|
if os.path.exists(source_path):
|
|
try:
|
|
shutil.copy2(source_path, dest_path)
|
|
except Exception as e:
|
|
print(f"Warning: Failed to copy image {source_path}: {e}")
|
|
missing_files.append(question.image_path)
|
|
else:
|
|
print(f"Warning: Image file not found: {source_path}")
|
|
missing_files.append(question.image_path)
|
|
|
|
# Handle audio files
|
|
if question.audio_path:
|
|
audio_filename = os.path.basename(question.audio_path)
|
|
question_data['audio_filename'] = audio_filename
|
|
|
|
# Copy audio file to temp directory
|
|
source_path = os.path.join(audio_folder, audio_filename)
|
|
dest_path = os.path.join(audio_dir, audio_filename)
|
|
|
|
if os.path.exists(source_path):
|
|
try:
|
|
shutil.copy2(source_path, dest_path)
|
|
except Exception as e:
|
|
print(f"Warning: Failed to copy audio {source_path}: {e}")
|
|
missing_files.append(question.audio_path)
|
|
else:
|
|
print(f"Warning: Audio file not found: {source_path}")
|
|
missing_files.append(question.audio_path)
|
|
|
|
manifest['questions'].append(question_data)
|
|
|
|
# Add missing files info to manifest if any
|
|
if missing_files:
|
|
manifest['missing_files'] = missing_files
|
|
|
|
# Write manifest.json
|
|
manifest_path = os.path.join(temp_dir, 'manifest.json')
|
|
with open(manifest_path, 'w') as f:
|
|
json.dump(manifest, f, indent=2)
|
|
|
|
# Create ZIP file
|
|
timestamp = datetime.now().strftime('%Y-%m-%d_%H%M%S')
|
|
zip_filename = f'trivia-export-{timestamp}.zip'
|
|
zip_path = os.path.join(temp_dir, zip_filename)
|
|
|
|
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
|
|
# Add manifest
|
|
zipf.write(manifest_path, 'manifest.json')
|
|
|
|
# Add all images
|
|
for filename in os.listdir(images_dir):
|
|
file_path = os.path.join(images_dir, filename)
|
|
zipf.write(file_path, f'images/{filename}')
|
|
|
|
# Add all audio files
|
|
for filename in os.listdir(audio_dir):
|
|
file_path = os.path.join(audio_dir, filename)
|
|
zipf.write(file_path, f'audio/{filename}')
|
|
|
|
# Read ZIP into memory
|
|
with open(zip_path, 'rb') as f:
|
|
zip_bytes = f.read()
|
|
|
|
return zip_bytes, zip_filename
|
|
|
|
finally:
|
|
# Clean up temp directory
|
|
try:
|
|
shutil.rmtree(temp_dir)
|
|
except Exception as e:
|
|
print(f"Warning: Failed to clean up temp directory {temp_dir}: {e}")
|
|
|
|
|
|
def validate_import_zip(zip_path: str) -> Tuple[bool, Optional[str]]:
|
|
"""
|
|
Validate an import ZIP file.
|
|
|
|
Args:
|
|
zip_path: Path to ZIP file
|
|
|
|
Returns:
|
|
Tuple of (is_valid, error_message)
|
|
"""
|
|
try:
|
|
# Check if file exists
|
|
if not os.path.exists(zip_path):
|
|
return False, "ZIP file not found"
|
|
|
|
# Check file size
|
|
file_size = os.path.getsize(zip_path)
|
|
print(f"Validating ZIP file: {zip_path}, size: {file_size} bytes")
|
|
|
|
if file_size == 0:
|
|
return False, "ZIP file is empty"
|
|
|
|
# Try to open as ZIP
|
|
try:
|
|
with zipfile.ZipFile(zip_path, 'r') as zipf:
|
|
# Check for manifest.json
|
|
if 'manifest.json' not in zipf.namelist():
|
|
return False, "Missing manifest.json in ZIP"
|
|
|
|
# Read and parse manifest
|
|
with zipf.open('manifest.json') as f:
|
|
manifest = json.load(f)
|
|
|
|
# Validate required fields
|
|
required_fields = ['export_version', 'questions', 'categories']
|
|
for field in required_fields:
|
|
if field not in manifest:
|
|
return False, f"Missing required field in manifest: {field}"
|
|
|
|
# Validate version compatibility
|
|
if manifest['export_version'] != '1.0':
|
|
return False, f"Unsupported export version: {manifest['export_version']}"
|
|
|
|
# Validate each question has required fields
|
|
for i, question in enumerate(manifest['questions']):
|
|
required_q_fields = ['type', 'question_content', 'answer']
|
|
for field in required_q_fields:
|
|
if field not in question:
|
|
return False, f"Question {i} missing required field: {field}"
|
|
|
|
# Validate type is valid
|
|
if question['type'] not in ['text', 'image', 'youtube_audio']:
|
|
return False, f"Question {i} has invalid type: {question['type']}"
|
|
|
|
# Verify referenced files exist in ZIP
|
|
if question.get('image_filename'):
|
|
expected_path = f"images/{question['image_filename']}"
|
|
if expected_path not in zipf.namelist():
|
|
return False, f"Missing referenced image: {expected_path}"
|
|
|
|
if question.get('audio_filename'):
|
|
expected_path = f"audio/{question['audio_filename']}"
|
|
if expected_path not in zipf.namelist():
|
|
return False, f"Missing referenced audio: {expected_path}"
|
|
|
|
return True, None
|
|
|
|
except zipfile.BadZipFile as e:
|
|
print(f"BadZipFile error: {e}")
|
|
return False, f"Invalid ZIP file format: {str(e)}"
|
|
except json.JSONDecodeError as e:
|
|
print(f"JSON decode error: {e}")
|
|
return False, f"Invalid JSON in manifest.json: {str(e)}"
|
|
|
|
except Exception as e:
|
|
print(f"Unexpected validation error: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return False, f"Validation error: {str(e)}"
|
|
|
|
|
|
def clear_all_data():
|
|
"""Clear all questions, categories, games, teams, and scores from database."""
|
|
# Delete in dependency order to avoid FK errors
|
|
Score.query.delete()
|
|
Team.query.delete()
|
|
GameQuestion.query.delete()
|
|
Game.query.delete()
|
|
Question.query.delete()
|
|
Category.query.delete()
|
|
db.session.commit()
|
|
|
|
|
|
def clear_all_media_files():
|
|
"""Delete all images and audio files from static directories."""
|
|
# Get folders from config
|
|
images_dir = str(Path(current_app.config.get('UPLOAD_FOLDER')))
|
|
audio_dir = str(Path(current_app.config.get('AUDIO_FOLDER')))
|
|
|
|
# Clear images
|
|
if os.path.exists(images_dir):
|
|
for filename in os.listdir(images_dir):
|
|
if filename == '.gitkeep':
|
|
continue
|
|
file_path = os.path.join(images_dir, filename)
|
|
try:
|
|
if os.path.isfile(file_path):
|
|
os.remove(file_path)
|
|
except Exception as e:
|
|
print(f"Warning: Failed to delete {file_path}: {e}")
|
|
|
|
# Clear audio
|
|
if os.path.exists(audio_dir):
|
|
for filename in os.listdir(audio_dir):
|
|
if filename == '.gitkeep':
|
|
continue
|
|
file_path = os.path.join(audio_dir, filename)
|
|
try:
|
|
if os.path.isfile(file_path):
|
|
os.remove(file_path)
|
|
except Exception as e:
|
|
print(f"Warning: Failed to delete {file_path}: {e}")
|
|
|
|
|
|
def import_questions_from_zip(zip_path: str, user_id=None) -> dict:
|
|
"""
|
|
Import questions and media from a ZIP file.
|
|
|
|
Args:
|
|
zip_path: Path to ZIP file to import
|
|
user_id: If provided, set created_by on all imported questions
|
|
|
|
Returns:
|
|
Dict with import summary
|
|
|
|
Raises:
|
|
ValueError: If validation fails
|
|
Exception: If import fails
|
|
"""
|
|
temp_dir = None
|
|
|
|
try:
|
|
# Validate ZIP
|
|
is_valid, error_msg = validate_import_zip(zip_path)
|
|
if not is_valid:
|
|
raise ValueError(error_msg)
|
|
|
|
# Extract ZIP to temp directory
|
|
temp_dir = tempfile.mkdtemp()
|
|
|
|
with zipfile.ZipFile(zip_path, 'r') as zipf:
|
|
zipf.extractall(temp_dir)
|
|
|
|
# Load manifest
|
|
manifest_path = os.path.join(temp_dir, 'manifest.json')
|
|
with open(manifest_path, 'r') as f:
|
|
manifest = json.load(f)
|
|
|
|
# Get destination folders from config
|
|
images_dest_dir = str(Path(current_app.config.get('UPLOAD_FOLDER')))
|
|
audio_dest_dir = str(Path(current_app.config.get('AUDIO_FOLDER')))
|
|
|
|
# Ensure destination directories exist
|
|
os.makedirs(images_dest_dir, exist_ok=True)
|
|
os.makedirs(audio_dest_dir, exist_ok=True)
|
|
|
|
# Begin transaction
|
|
# Clear existing data
|
|
clear_all_data()
|
|
|
|
# Clear existing media files
|
|
clear_all_media_files()
|
|
|
|
# Copy media files from extracted ZIP
|
|
images_src_dir = os.path.join(temp_dir, 'images')
|
|
if os.path.exists(images_src_dir):
|
|
for filename in os.listdir(images_src_dir):
|
|
src = os.path.join(images_src_dir, filename)
|
|
dst = os.path.join(images_dest_dir, filename)
|
|
shutil.copy2(src, dst)
|
|
|
|
audio_src_dir = os.path.join(temp_dir, 'audio')
|
|
if os.path.exists(audio_src_dir):
|
|
for filename in os.listdir(audio_src_dir):
|
|
src = os.path.join(audio_src_dir, filename)
|
|
dst = os.path.join(audio_dest_dir, filename)
|
|
shutil.copy2(src, dst)
|
|
|
|
# Import categories
|
|
for cat_data in manifest['categories']:
|
|
category = Category(name=cat_data['name'])
|
|
# Preserve created_at if provided
|
|
if cat_data.get('created_at'):
|
|
try:
|
|
category.created_at = datetime.fromisoformat(cat_data['created_at'].replace('Z', '+00:00'))
|
|
except:
|
|
pass # Use default if parsing fails
|
|
db.session.add(category)
|
|
|
|
# Import questions
|
|
for q_data in manifest['questions']:
|
|
# Reconstruct paths from filenames
|
|
image_path = None
|
|
if q_data.get('image_filename'):
|
|
image_path = f"/static/images/{q_data['image_filename']}"
|
|
|
|
audio_path = None
|
|
if q_data.get('audio_filename'):
|
|
audio_path = f"/static/audio/{q_data['audio_filename']}"
|
|
|
|
question = Question(
|
|
type=QuestionType(q_data['type']),
|
|
question_content=q_data['question_content'],
|
|
answer=q_data['answer'],
|
|
category=q_data.get('category'),
|
|
image_path=image_path,
|
|
audio_path=audio_path,
|
|
youtube_url=q_data.get('youtube_url'),
|
|
start_time=q_data.get('start_time'),
|
|
end_time=q_data.get('end_time'),
|
|
created_by=user_id
|
|
)
|
|
|
|
# Preserve created_at if provided
|
|
if q_data.get('created_at'):
|
|
try:
|
|
question.created_at = datetime.fromisoformat(q_data['created_at'].replace('Z', '+00:00'))
|
|
except:
|
|
pass # Use default if parsing fails
|
|
|
|
db.session.add(question)
|
|
|
|
# Commit transaction
|
|
db.session.commit()
|
|
|
|
return {
|
|
'success': True,
|
|
'questions_imported': len(manifest['questions']),
|
|
'categories_imported': len(manifest['categories'])
|
|
}
|
|
|
|
except Exception as e:
|
|
# Rollback database changes
|
|
db.session.rollback()
|
|
# Note: File operations cannot be rolled back
|
|
raise
|
|
|
|
finally:
|
|
# Clean up temp directory
|
|
if temp_dir:
|
|
try:
|
|
shutil.rmtree(temp_dir)
|
|
except Exception as e:
|
|
print(f"Warning: Failed to clean up temp directory {temp_dir}: {e}")
|