This commit is contained in:
2026-01-12 21:09:57 -05:00
parent 00e9eb8986
commit 557f440256
3 changed files with 501 additions and 1 deletions

View File

@@ -50,7 +50,7 @@ def create_app(config_name=None):
init_oauth(app)
# Register blueprints
from backend.routes import questions, games, teams, admin, categories, download_jobs, auth
from backend.routes import questions, games, teams, admin, categories, download_jobs, auth, export_import
app.register_blueprint(auth.bp)
app.register_blueprint(questions.bp)
app.register_blueprint(games.bp)
@@ -58,6 +58,7 @@ def create_app(config_name=None):
app.register_blueprint(admin.bp)
app.register_blueprint(categories.bp)
app.register_blueprint(download_jobs.bp)
app.register_blueprint(export_import.bp)
# Register socket events
from backend.sockets import events

View File

@@ -0,0 +1,102 @@
"""Routes for exporting and importing trivia questions"""
import os
import tempfile
from flask import Blueprint, jsonify, Response, request
from werkzeug.utils import secure_filename
from backend.auth.middleware import require_auth
from backend.services.export_import_service import (
export_questions_to_zip,
import_questions_from_zip
)
bp = Blueprint('export_import', __name__, url_prefix='/api/admin')
@bp.route('/export', methods=['POST'])
@require_auth
def export_data():
"""
Export all questions and categories to a ZIP file.
Returns:
ZIP file containing manifest.json and all media files
"""
try:
# Generate export ZIP
zip_bytes, zip_filename = export_questions_to_zip()
# Create response with ZIP file
response = Response(
zip_bytes,
mimetype='application/zip',
headers={
'Content-Disposition': f'attachment; filename={zip_filename}',
'Content-Length': str(len(zip_bytes))
}
)
return response
except Exception as e:
return jsonify({'error': f'Export failed: {str(e)}'}), 500
@bp.route('/import', methods=['POST'])
@require_auth
def import_data():
"""
Import questions and categories from a ZIP file.
Expects:
multipart/form-data with 'file' key containing ZIP file
Returns:
JSON with import summary
"""
# Check if file was uploaded
if 'file' not in request.files:
return jsonify({'error': 'No file provided'}), 400
file = request.files['file']
# Check if filename is present
if file.filename == '':
return jsonify({'error': 'No file selected'}), 400
# Check if file is a ZIP
if not file.filename.lower().endswith('.zip'):
return jsonify({'error': 'File must be a ZIP archive'}), 400
temp_path = None
try:
# Save uploaded file to temporary location
temp_dir = tempfile.mkdtemp()
temp_path = os.path.join(temp_dir, secure_filename(file.filename))
file.save(temp_path)
# Import from ZIP
result = import_questions_from_zip(temp_path)
return jsonify(result), 200
except ValueError as e:
# Validation errors
return jsonify({'error': str(e)}), 400
except Exception as e:
# Other errors
return jsonify({'error': f'Import failed: {str(e)}'}), 500
finally:
# Clean up uploaded file
if temp_path and os.path.exists(temp_path):
try:
os.remove(temp_path)
# Remove temp directory if empty
temp_dir = os.path.dirname(temp_path)
if os.path.exists(temp_dir) and not os.listdir(temp_dir):
os.rmdir(temp_dir)
except Exception as e:
print(f"Warning: Failed to clean up temp file {temp_path}: {e}")

View File

@@ -0,0 +1,397 @@
"""Service for exporting and importing trivia questions with media files"""
import os
import json
import shutil
import tempfile
import zipfile
from datetime import datetime
from pathlib import Path
from typing import Tuple, Optional
from flask import current_app
from backend.models import db, Question, Category, QuestionType, Score, Team, GameQuestion, Game
def export_questions_to_zip() -> Tuple[bytes, str]:
"""
Export all questions and categories to a ZIP file with images and audio.
Returns:
Tuple of (zip_bytes, filename)
"""
# Query all data
questions = Question.query.order_by(Question.created_at).all()
categories = Category.query.order_by(Category.name).all()
# Create temporary directory
temp_dir = tempfile.mkdtemp()
try:
# Create subdirectories
images_dir = os.path.join(temp_dir, 'images')
audio_dir = os.path.join(temp_dir, 'audio')
os.makedirs(images_dir, exist_ok=True)
os.makedirs(audio_dir, exist_ok=True)
# Prepare manifest data
manifest = {
'export_version': '1.0',
'export_timestamp': datetime.utcnow().isoformat() + 'Z',
'questions_count': len(questions),
'categories_count': len(categories),
'categories': [],
'questions': []
}
# Export categories
for category in categories:
manifest['categories'].append({
'name': category.name,
'created_at': category.created_at.isoformat() if category.created_at else None
})
# Get base folder for file operations
base_folder = Path(current_app.config.get('UPLOAD_FOLDER')).parent.parent
# Export questions with media files
missing_files = []
for question in questions:
question_data = {
'type': question.type.value,
'question_content': question.question_content,
'answer': question.answer,
'category': question.category,
'created_at': question.created_at.isoformat() if question.created_at else None,
'image_path': question.image_path,
'image_filename': None,
'audio_path': question.audio_path,
'audio_filename': None,
'youtube_url': question.youtube_url,
'start_time': question.start_time,
'end_time': question.end_time
}
# Handle image files
if question.image_path:
image_filename = os.path.basename(question.image_path)
question_data['image_filename'] = image_filename
# Copy image file to temp directory
source_path = os.path.join(base_folder, question.image_path.lstrip('/'))
dest_path = os.path.join(images_dir, image_filename)
if os.path.exists(source_path):
try:
shutil.copy2(source_path, dest_path)
except Exception as e:
print(f"Warning: Failed to copy image {source_path}: {e}")
missing_files.append(question.image_path)
else:
print(f"Warning: Image file not found: {source_path}")
missing_files.append(question.image_path)
# Handle audio files
if question.audio_path:
audio_filename = os.path.basename(question.audio_path)
question_data['audio_filename'] = audio_filename
# Copy audio file to temp directory
source_path = os.path.join(base_folder, question.audio_path.lstrip('/'))
dest_path = os.path.join(audio_dir, audio_filename)
if os.path.exists(source_path):
try:
shutil.copy2(source_path, dest_path)
except Exception as e:
print(f"Warning: Failed to copy audio {source_path}: {e}")
missing_files.append(question.audio_path)
else:
print(f"Warning: Audio file not found: {source_path}")
missing_files.append(question.audio_path)
manifest['questions'].append(question_data)
# Add missing files info to manifest if any
if missing_files:
manifest['missing_files'] = missing_files
# Write manifest.json
manifest_path = os.path.join(temp_dir, 'manifest.json')
with open(manifest_path, 'w') as f:
json.dump(manifest, f, indent=2)
# Create ZIP file
timestamp = datetime.now().strftime('%Y-%m-%d_%H%M%S')
zip_filename = f'trivia-export-{timestamp}.zip'
zip_path = os.path.join(temp_dir, zip_filename)
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
# Add manifest
zipf.write(manifest_path, 'manifest.json')
# Add all images
for filename in os.listdir(images_dir):
file_path = os.path.join(images_dir, filename)
zipf.write(file_path, f'images/{filename}')
# Add all audio files
for filename in os.listdir(audio_dir):
file_path = os.path.join(audio_dir, filename)
zipf.write(file_path, f'audio/{filename}')
# Read ZIP into memory
with open(zip_path, 'rb') as f:
zip_bytes = f.read()
return zip_bytes, zip_filename
finally:
# Clean up temp directory
try:
shutil.rmtree(temp_dir)
except Exception as e:
print(f"Warning: Failed to clean up temp directory {temp_dir}: {e}")
def validate_import_zip(zip_path: str) -> Tuple[bool, Optional[str]]:
"""
Validate an import ZIP file.
Args:
zip_path: Path to ZIP file
Returns:
Tuple of (is_valid, error_message)
"""
try:
# Check if file exists
if not os.path.exists(zip_path):
return False, "ZIP file not found"
# Try to open as ZIP
try:
with zipfile.ZipFile(zip_path, 'r') as zipf:
# Check for manifest.json
if 'manifest.json' not in zipf.namelist():
return False, "Missing manifest.json in ZIP"
# Read and parse manifest
with zipf.open('manifest.json') as f:
manifest = json.load(f)
# Validate required fields
required_fields = ['export_version', 'questions', 'categories']
for field in required_fields:
if field not in manifest:
return False, f"Missing required field in manifest: {field}"
# Validate version compatibility
if manifest['export_version'] != '1.0':
return False, f"Unsupported export version: {manifest['export_version']}"
# Validate each question has required fields
for i, question in enumerate(manifest['questions']):
required_q_fields = ['type', 'question_content', 'answer']
for field in required_q_fields:
if field not in question:
return False, f"Question {i} missing required field: {field}"
# Validate type is valid
if question['type'] not in ['text', 'image', 'youtube_audio']:
return False, f"Question {i} has invalid type: {question['type']}"
# Verify referenced files exist in ZIP
if question.get('image_filename'):
expected_path = f"images/{question['image_filename']}"
if expected_path not in zipf.namelist():
return False, f"Missing referenced image: {expected_path}"
if question.get('audio_filename'):
expected_path = f"audio/{question['audio_filename']}"
if expected_path not in zipf.namelist():
return False, f"Missing referenced audio: {expected_path}"
return True, None
except zipfile.BadZipFile:
return False, "Invalid ZIP file format"
except json.JSONDecodeError:
return False, "Invalid JSON in manifest.json"
except Exception as e:
return False, f"Validation error: {str(e)}"
def clear_all_data():
"""Clear all questions, categories, games, teams, and scores from database."""
# Delete in dependency order to avoid FK errors
Score.query.delete()
Team.query.delete()
GameQuestion.query.delete()
Game.query.delete()
Question.query.delete()
Category.query.delete()
db.session.commit()
def clear_all_media_files():
"""Delete all images and audio files from static directories."""
base_folder = Path(current_app.config.get('UPLOAD_FOLDER')).parent.parent
# Clear images
images_dir = os.path.join(base_folder, 'backend', 'static', 'images')
if os.path.exists(images_dir):
for filename in os.listdir(images_dir):
if filename == '.gitkeep':
continue
file_path = os.path.join(images_dir, filename)
try:
if os.path.isfile(file_path):
os.remove(file_path)
except Exception as e:
print(f"Warning: Failed to delete {file_path}: {e}")
# Clear audio
audio_dir = os.path.join(base_folder, 'backend', 'static', 'audio')
if os.path.exists(audio_dir):
for filename in os.listdir(audio_dir):
if filename == '.gitkeep':
continue
file_path = os.path.join(audio_dir, filename)
try:
if os.path.isfile(file_path):
os.remove(file_path)
except Exception as e:
print(f"Warning: Failed to delete {file_path}: {e}")
def import_questions_from_zip(zip_path: str) -> dict:
"""
Import questions and media from a ZIP file.
Args:
zip_path: Path to ZIP file to import
Returns:
Dict with import summary
Raises:
ValueError: If validation fails
Exception: If import fails
"""
temp_dir = None
try:
# Validate ZIP
is_valid, error_msg = validate_import_zip(zip_path)
if not is_valid:
raise ValueError(error_msg)
# Extract ZIP to temp directory
temp_dir = tempfile.mkdtemp()
with zipfile.ZipFile(zip_path, 'r') as zipf:
zipf.extractall(temp_dir)
# Load manifest
manifest_path = os.path.join(temp_dir, 'manifest.json')
with open(manifest_path, 'r') as f:
manifest = json.load(f)
# Get base folder for file operations
base_folder = Path(current_app.config.get('UPLOAD_FOLDER')).parent.parent
images_dest_dir = os.path.join(base_folder, 'backend', 'static', 'images')
audio_dest_dir = os.path.join(base_folder, 'backend', 'static', 'audio')
# Ensure destination directories exist
os.makedirs(images_dest_dir, exist_ok=True)
os.makedirs(audio_dest_dir, exist_ok=True)
# Begin transaction
# Clear existing data
clear_all_data()
# Clear existing media files
clear_all_media_files()
# Copy media files from extracted ZIP
images_src_dir = os.path.join(temp_dir, 'images')
if os.path.exists(images_src_dir):
for filename in os.listdir(images_src_dir):
src = os.path.join(images_src_dir, filename)
dst = os.path.join(images_dest_dir, filename)
shutil.copy2(src, dst)
audio_src_dir = os.path.join(temp_dir, 'audio')
if os.path.exists(audio_src_dir):
for filename in os.listdir(audio_src_dir):
src = os.path.join(audio_src_dir, filename)
dst = os.path.join(audio_dest_dir, filename)
shutil.copy2(src, dst)
# Import categories
for cat_data in manifest['categories']:
category = Category(name=cat_data['name'])
# Preserve created_at if provided
if cat_data.get('created_at'):
try:
category.created_at = datetime.fromisoformat(cat_data['created_at'].replace('Z', '+00:00'))
except:
pass # Use default if parsing fails
db.session.add(category)
# Import questions
for q_data in manifest['questions']:
# Reconstruct paths from filenames
image_path = None
if q_data.get('image_filename'):
image_path = f"/static/images/{q_data['image_filename']}"
audio_path = None
if q_data.get('audio_filename'):
audio_path = f"/static/audio/{q_data['audio_filename']}"
question = Question(
type=QuestionType(q_data['type']),
question_content=q_data['question_content'],
answer=q_data['answer'],
category=q_data.get('category'),
image_path=image_path,
audio_path=audio_path,
youtube_url=q_data.get('youtube_url'),
start_time=q_data.get('start_time'),
end_time=q_data.get('end_time')
)
# Preserve created_at if provided
if q_data.get('created_at'):
try:
question.created_at = datetime.fromisoformat(q_data['created_at'].replace('Z', '+00:00'))
except:
pass # Use default if parsing fails
db.session.add(question)
# Commit transaction
db.session.commit()
return {
'success': True,
'questions_imported': len(manifest['questions']),
'categories_imported': len(manifest['categories'])
}
except Exception as e:
# Rollback database changes
db.session.rollback()
# Note: File operations cannot be rolled back
raise
finally:
# Clean up temp directory
if temp_dir:
try:
shutil.rmtree(temp_dir)
except Exception as e:
print(f"Warning: Failed to clean up temp directory {temp_dir}: {e}")