398 lines
14 KiB
Python
398 lines
14 KiB
Python
"""Service for exporting and importing trivia questions with media files"""
|
|
import os
|
|
import json
|
|
import shutil
|
|
import tempfile
|
|
import zipfile
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Tuple, Optional
|
|
|
|
from flask import current_app
|
|
from backend.models import db, Question, Category, QuestionType, Score, Team, GameQuestion, Game
|
|
|
|
|
|
def export_questions_to_zip() -> Tuple[bytes, str]:
|
|
"""
|
|
Export all questions and categories to a ZIP file with images and audio.
|
|
|
|
Returns:
|
|
Tuple of (zip_bytes, filename)
|
|
"""
|
|
# Query all data
|
|
questions = Question.query.order_by(Question.created_at).all()
|
|
categories = Category.query.order_by(Category.name).all()
|
|
|
|
# Create temporary directory
|
|
temp_dir = tempfile.mkdtemp()
|
|
|
|
try:
|
|
# Create subdirectories
|
|
images_dir = os.path.join(temp_dir, 'images')
|
|
audio_dir = os.path.join(temp_dir, 'audio')
|
|
os.makedirs(images_dir, exist_ok=True)
|
|
os.makedirs(audio_dir, exist_ok=True)
|
|
|
|
# Prepare manifest data
|
|
manifest = {
|
|
'export_version': '1.0',
|
|
'export_timestamp': datetime.utcnow().isoformat() + 'Z',
|
|
'questions_count': len(questions),
|
|
'categories_count': len(categories),
|
|
'categories': [],
|
|
'questions': []
|
|
}
|
|
|
|
# Export categories
|
|
for category in categories:
|
|
manifest['categories'].append({
|
|
'name': category.name,
|
|
'created_at': category.created_at.isoformat() if category.created_at else None
|
|
})
|
|
|
|
# Get base folder for file operations
|
|
base_folder = Path(current_app.config.get('UPLOAD_FOLDER')).parent.parent
|
|
|
|
# Export questions with media files
|
|
missing_files = []
|
|
|
|
for question in questions:
|
|
question_data = {
|
|
'type': question.type.value,
|
|
'question_content': question.question_content,
|
|
'answer': question.answer,
|
|
'category': question.category,
|
|
'created_at': question.created_at.isoformat() if question.created_at else None,
|
|
'image_path': question.image_path,
|
|
'image_filename': None,
|
|
'audio_path': question.audio_path,
|
|
'audio_filename': None,
|
|
'youtube_url': question.youtube_url,
|
|
'start_time': question.start_time,
|
|
'end_time': question.end_time
|
|
}
|
|
|
|
# Handle image files
|
|
if question.image_path:
|
|
image_filename = os.path.basename(question.image_path)
|
|
question_data['image_filename'] = image_filename
|
|
|
|
# Copy image file to temp directory
|
|
source_path = os.path.join(base_folder, question.image_path.lstrip('/'))
|
|
dest_path = os.path.join(images_dir, image_filename)
|
|
|
|
if os.path.exists(source_path):
|
|
try:
|
|
shutil.copy2(source_path, dest_path)
|
|
except Exception as e:
|
|
print(f"Warning: Failed to copy image {source_path}: {e}")
|
|
missing_files.append(question.image_path)
|
|
else:
|
|
print(f"Warning: Image file not found: {source_path}")
|
|
missing_files.append(question.image_path)
|
|
|
|
# Handle audio files
|
|
if question.audio_path:
|
|
audio_filename = os.path.basename(question.audio_path)
|
|
question_data['audio_filename'] = audio_filename
|
|
|
|
# Copy audio file to temp directory
|
|
source_path = os.path.join(base_folder, question.audio_path.lstrip('/'))
|
|
dest_path = os.path.join(audio_dir, audio_filename)
|
|
|
|
if os.path.exists(source_path):
|
|
try:
|
|
shutil.copy2(source_path, dest_path)
|
|
except Exception as e:
|
|
print(f"Warning: Failed to copy audio {source_path}: {e}")
|
|
missing_files.append(question.audio_path)
|
|
else:
|
|
print(f"Warning: Audio file not found: {source_path}")
|
|
missing_files.append(question.audio_path)
|
|
|
|
manifest['questions'].append(question_data)
|
|
|
|
# Add missing files info to manifest if any
|
|
if missing_files:
|
|
manifest['missing_files'] = missing_files
|
|
|
|
# Write manifest.json
|
|
manifest_path = os.path.join(temp_dir, 'manifest.json')
|
|
with open(manifest_path, 'w') as f:
|
|
json.dump(manifest, f, indent=2)
|
|
|
|
# Create ZIP file
|
|
timestamp = datetime.now().strftime('%Y-%m-%d_%H%M%S')
|
|
zip_filename = f'trivia-export-{timestamp}.zip'
|
|
zip_path = os.path.join(temp_dir, zip_filename)
|
|
|
|
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
|
|
# Add manifest
|
|
zipf.write(manifest_path, 'manifest.json')
|
|
|
|
# Add all images
|
|
for filename in os.listdir(images_dir):
|
|
file_path = os.path.join(images_dir, filename)
|
|
zipf.write(file_path, f'images/{filename}')
|
|
|
|
# Add all audio files
|
|
for filename in os.listdir(audio_dir):
|
|
file_path = os.path.join(audio_dir, filename)
|
|
zipf.write(file_path, f'audio/{filename}')
|
|
|
|
# Read ZIP into memory
|
|
with open(zip_path, 'rb') as f:
|
|
zip_bytes = f.read()
|
|
|
|
return zip_bytes, zip_filename
|
|
|
|
finally:
|
|
# Clean up temp directory
|
|
try:
|
|
shutil.rmtree(temp_dir)
|
|
except Exception as e:
|
|
print(f"Warning: Failed to clean up temp directory {temp_dir}: {e}")
|
|
|
|
|
|
def validate_import_zip(zip_path: str) -> Tuple[bool, Optional[str]]:
|
|
"""
|
|
Validate an import ZIP file.
|
|
|
|
Args:
|
|
zip_path: Path to ZIP file
|
|
|
|
Returns:
|
|
Tuple of (is_valid, error_message)
|
|
"""
|
|
try:
|
|
# Check if file exists
|
|
if not os.path.exists(zip_path):
|
|
return False, "ZIP file not found"
|
|
|
|
# Try to open as ZIP
|
|
try:
|
|
with zipfile.ZipFile(zip_path, 'r') as zipf:
|
|
# Check for manifest.json
|
|
if 'manifest.json' not in zipf.namelist():
|
|
return False, "Missing manifest.json in ZIP"
|
|
|
|
# Read and parse manifest
|
|
with zipf.open('manifest.json') as f:
|
|
manifest = json.load(f)
|
|
|
|
# Validate required fields
|
|
required_fields = ['export_version', 'questions', 'categories']
|
|
for field in required_fields:
|
|
if field not in manifest:
|
|
return False, f"Missing required field in manifest: {field}"
|
|
|
|
# Validate version compatibility
|
|
if manifest['export_version'] != '1.0':
|
|
return False, f"Unsupported export version: {manifest['export_version']}"
|
|
|
|
# Validate each question has required fields
|
|
for i, question in enumerate(manifest['questions']):
|
|
required_q_fields = ['type', 'question_content', 'answer']
|
|
for field in required_q_fields:
|
|
if field not in question:
|
|
return False, f"Question {i} missing required field: {field}"
|
|
|
|
# Validate type is valid
|
|
if question['type'] not in ['text', 'image', 'youtube_audio']:
|
|
return False, f"Question {i} has invalid type: {question['type']}"
|
|
|
|
# Verify referenced files exist in ZIP
|
|
if question.get('image_filename'):
|
|
expected_path = f"images/{question['image_filename']}"
|
|
if expected_path not in zipf.namelist():
|
|
return False, f"Missing referenced image: {expected_path}"
|
|
|
|
if question.get('audio_filename'):
|
|
expected_path = f"audio/{question['audio_filename']}"
|
|
if expected_path not in zipf.namelist():
|
|
return False, f"Missing referenced audio: {expected_path}"
|
|
|
|
return True, None
|
|
|
|
except zipfile.BadZipFile:
|
|
return False, "Invalid ZIP file format"
|
|
except json.JSONDecodeError:
|
|
return False, "Invalid JSON in manifest.json"
|
|
|
|
except Exception as e:
|
|
return False, f"Validation error: {str(e)}"
|
|
|
|
|
|
def clear_all_data():
|
|
"""Clear all questions, categories, games, teams, and scores from database."""
|
|
# Delete in dependency order to avoid FK errors
|
|
Score.query.delete()
|
|
Team.query.delete()
|
|
GameQuestion.query.delete()
|
|
Game.query.delete()
|
|
Question.query.delete()
|
|
Category.query.delete()
|
|
db.session.commit()
|
|
|
|
|
|
def clear_all_media_files():
|
|
"""Delete all images and audio files from static directories."""
|
|
base_folder = Path(current_app.config.get('UPLOAD_FOLDER')).parent.parent
|
|
|
|
# Clear images
|
|
images_dir = os.path.join(base_folder, 'backend', 'static', 'images')
|
|
if os.path.exists(images_dir):
|
|
for filename in os.listdir(images_dir):
|
|
if filename == '.gitkeep':
|
|
continue
|
|
file_path = os.path.join(images_dir, filename)
|
|
try:
|
|
if os.path.isfile(file_path):
|
|
os.remove(file_path)
|
|
except Exception as e:
|
|
print(f"Warning: Failed to delete {file_path}: {e}")
|
|
|
|
# Clear audio
|
|
audio_dir = os.path.join(base_folder, 'backend', 'static', 'audio')
|
|
if os.path.exists(audio_dir):
|
|
for filename in os.listdir(audio_dir):
|
|
if filename == '.gitkeep':
|
|
continue
|
|
file_path = os.path.join(audio_dir, filename)
|
|
try:
|
|
if os.path.isfile(file_path):
|
|
os.remove(file_path)
|
|
except Exception as e:
|
|
print(f"Warning: Failed to delete {file_path}: {e}")
|
|
|
|
|
|
def import_questions_from_zip(zip_path: str) -> dict:
|
|
"""
|
|
Import questions and media from a ZIP file.
|
|
|
|
Args:
|
|
zip_path: Path to ZIP file to import
|
|
|
|
Returns:
|
|
Dict with import summary
|
|
|
|
Raises:
|
|
ValueError: If validation fails
|
|
Exception: If import fails
|
|
"""
|
|
temp_dir = None
|
|
|
|
try:
|
|
# Validate ZIP
|
|
is_valid, error_msg = validate_import_zip(zip_path)
|
|
if not is_valid:
|
|
raise ValueError(error_msg)
|
|
|
|
# Extract ZIP to temp directory
|
|
temp_dir = tempfile.mkdtemp()
|
|
|
|
with zipfile.ZipFile(zip_path, 'r') as zipf:
|
|
zipf.extractall(temp_dir)
|
|
|
|
# Load manifest
|
|
manifest_path = os.path.join(temp_dir, 'manifest.json')
|
|
with open(manifest_path, 'r') as f:
|
|
manifest = json.load(f)
|
|
|
|
# Get base folder for file operations
|
|
base_folder = Path(current_app.config.get('UPLOAD_FOLDER')).parent.parent
|
|
images_dest_dir = os.path.join(base_folder, 'backend', 'static', 'images')
|
|
audio_dest_dir = os.path.join(base_folder, 'backend', 'static', 'audio')
|
|
|
|
# Ensure destination directories exist
|
|
os.makedirs(images_dest_dir, exist_ok=True)
|
|
os.makedirs(audio_dest_dir, exist_ok=True)
|
|
|
|
# Begin transaction
|
|
# Clear existing data
|
|
clear_all_data()
|
|
|
|
# Clear existing media files
|
|
clear_all_media_files()
|
|
|
|
# Copy media files from extracted ZIP
|
|
images_src_dir = os.path.join(temp_dir, 'images')
|
|
if os.path.exists(images_src_dir):
|
|
for filename in os.listdir(images_src_dir):
|
|
src = os.path.join(images_src_dir, filename)
|
|
dst = os.path.join(images_dest_dir, filename)
|
|
shutil.copy2(src, dst)
|
|
|
|
audio_src_dir = os.path.join(temp_dir, 'audio')
|
|
if os.path.exists(audio_src_dir):
|
|
for filename in os.listdir(audio_src_dir):
|
|
src = os.path.join(audio_src_dir, filename)
|
|
dst = os.path.join(audio_dest_dir, filename)
|
|
shutil.copy2(src, dst)
|
|
|
|
# Import categories
|
|
for cat_data in manifest['categories']:
|
|
category = Category(name=cat_data['name'])
|
|
# Preserve created_at if provided
|
|
if cat_data.get('created_at'):
|
|
try:
|
|
category.created_at = datetime.fromisoformat(cat_data['created_at'].replace('Z', '+00:00'))
|
|
except:
|
|
pass # Use default if parsing fails
|
|
db.session.add(category)
|
|
|
|
# Import questions
|
|
for q_data in manifest['questions']:
|
|
# Reconstruct paths from filenames
|
|
image_path = None
|
|
if q_data.get('image_filename'):
|
|
image_path = f"/static/images/{q_data['image_filename']}"
|
|
|
|
audio_path = None
|
|
if q_data.get('audio_filename'):
|
|
audio_path = f"/static/audio/{q_data['audio_filename']}"
|
|
|
|
question = Question(
|
|
type=QuestionType(q_data['type']),
|
|
question_content=q_data['question_content'],
|
|
answer=q_data['answer'],
|
|
category=q_data.get('category'),
|
|
image_path=image_path,
|
|
audio_path=audio_path,
|
|
youtube_url=q_data.get('youtube_url'),
|
|
start_time=q_data.get('start_time'),
|
|
end_time=q_data.get('end_time')
|
|
)
|
|
|
|
# Preserve created_at if provided
|
|
if q_data.get('created_at'):
|
|
try:
|
|
question.created_at = datetime.fromisoformat(q_data['created_at'].replace('Z', '+00:00'))
|
|
except:
|
|
pass # Use default if parsing fails
|
|
|
|
db.session.add(question)
|
|
|
|
# Commit transaction
|
|
db.session.commit()
|
|
|
|
return {
|
|
'success': True,
|
|
'questions_imported': len(manifest['questions']),
|
|
'categories_imported': len(manifest['categories'])
|
|
}
|
|
|
|
except Exception as e:
|
|
# Rollback database changes
|
|
db.session.rollback()
|
|
# Note: File operations cannot be rolled back
|
|
raise
|
|
|
|
finally:
|
|
# Clean up temp directory
|
|
if temp_dir:
|
|
try:
|
|
shutil.rmtree(temp_dir)
|
|
except Exception as e:
|
|
print(f"Warning: Failed to clean up temp directory {temp_dir}: {e}")
|