Questions now have a created_by field linking to the user who created them. Users only see questions they own or that have been shared with them. Includes share dialog, user search, bulk sharing, and export/import respects ownership. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
331 lines
13 KiB
Python
331 lines
13 KiB
Python
from datetime import datetime
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from sqlalchemy import Enum
|
|
import enum
|
|
|
|
db = SQLAlchemy()
|
|
|
|
|
|
class User(db.Model):
|
|
"""User model for authenticated users via Authelia OIDC"""
|
|
__tablename__ = 'users'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
authelia_sub = db.Column(db.String(255), unique=True, nullable=False) # OIDC 'sub' claim
|
|
email = db.Column(db.String(255), nullable=True) # Email may not always be provided by OIDC
|
|
name = db.Column(db.String(255), nullable=True)
|
|
preferred_username = db.Column(db.String(100), nullable=True)
|
|
groups = db.Column(db.JSON, default=list, nullable=False) # Array of group names from OIDC
|
|
is_active = db.Column(db.Boolean, default=True)
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
last_login = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
|
|
|
|
@property
|
|
def is_admin(self):
|
|
"""Check if user is in trivia-admins group"""
|
|
return 'trivia-admins' in (self.groups or [])
|
|
|
|
def to_dict(self):
|
|
"""Convert user to dictionary"""
|
|
return {
|
|
'id': self.id,
|
|
'email': self.email,
|
|
'name': self.name,
|
|
'username': self.preferred_username,
|
|
'groups': self.groups,
|
|
'is_admin': self.is_admin,
|
|
'last_login': self.last_login.isoformat() if self.last_login else None
|
|
}
|
|
|
|
|
|
class QuestionType(enum.Enum):
|
|
"""Enum for question types"""
|
|
TEXT = "text"
|
|
IMAGE = "image"
|
|
YOUTUBE_AUDIO = "youtube_audio"
|
|
|
|
|
|
class Category(db.Model):
|
|
"""Category model for organizing questions"""
|
|
__tablename__ = 'categories'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
name = db.Column(db.String(100), nullable=False, unique=True)
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
|
|
def to_dict(self):
|
|
"""Convert category to dictionary"""
|
|
return {
|
|
'id': self.id,
|
|
'name': self.name,
|
|
'created_at': self.created_at.isoformat() if self.created_at else None
|
|
}
|
|
|
|
|
|
class Question(db.Model):
|
|
"""Question model for trivia questions"""
|
|
__tablename__ = 'questions'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
type = db.Column(Enum(QuestionType), nullable=False, default=QuestionType.TEXT)
|
|
question_content = db.Column(db.Text, nullable=False)
|
|
answer = db.Column(db.Text, nullable=False)
|
|
image_path = db.Column(db.String(255), nullable=True) # For image questions
|
|
category = db.Column(db.String(100), nullable=True) # Question category (e.g., "History", "Science")
|
|
|
|
# YouTube audio fields
|
|
youtube_url = db.Column(db.String(500), nullable=True) # YouTube video URL
|
|
audio_path = db.Column(db.String(255), nullable=True) # Path to trimmed audio file
|
|
start_time = db.Column(db.Integer, nullable=True) # Start time in seconds
|
|
end_time = db.Column(db.Integer, nullable=True) # End time in seconds
|
|
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
created_by = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=True)
|
|
|
|
# Relationships
|
|
creator = db.relationship('User', backref='questions')
|
|
game_questions = db.relationship('GameQuestion', back_populates='question', cascade='all, delete-orphan')
|
|
shares = db.relationship('QuestionShare', backref='question', cascade='all, delete-orphan')
|
|
|
|
def is_visible_to(self, user):
|
|
"""Check if this question is visible to the given user"""
|
|
if self.created_by == user.id:
|
|
return True
|
|
return any(s.shared_with_user_id == user.id for s in self.shares)
|
|
|
|
def is_owned_by(self, user):
|
|
"""Check if this question is owned by the given user"""
|
|
return self.created_by == user.id
|
|
|
|
def to_dict(self, include_answer=False):
|
|
"""Convert question to dictionary"""
|
|
data = {
|
|
'id': self.id,
|
|
'type': self.type.value,
|
|
'question_content': self.question_content,
|
|
'image_path': self.image_path,
|
|
'youtube_url': self.youtube_url,
|
|
'audio_path': self.audio_path,
|
|
'start_time': self.start_time,
|
|
'end_time': self.end_time,
|
|
'category': self.category,
|
|
'created_at': self.created_at.isoformat() if self.created_at else None,
|
|
'created_by': self.created_by,
|
|
'creator_name': (self.creator.name or self.creator.preferred_username) if self.creator else None,
|
|
}
|
|
if include_answer:
|
|
data['answer'] = self.answer
|
|
return data
|
|
|
|
|
|
class QuestionShare(db.Model):
|
|
"""Junction table for sharing questions between users"""
|
|
__tablename__ = 'question_shares'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
question_id = db.Column(db.Integer, db.ForeignKey('questions.id'), nullable=False)
|
|
shared_with_user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
|
|
shared_by_user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
|
|
shared_with_user = db.relationship('User', foreign_keys=[shared_with_user_id], backref='shared_questions')
|
|
shared_by_user = db.relationship('User', foreign_keys=[shared_by_user_id])
|
|
|
|
__table_args__ = (
|
|
db.UniqueConstraint('question_id', 'shared_with_user_id', name='unique_question_share'),
|
|
)
|
|
|
|
def to_dict(self):
|
|
return {
|
|
'id': self.id,
|
|
'question_id': self.question_id,
|
|
'shared_with_user_id': self.shared_with_user_id,
|
|
'shared_with_user_name': (self.shared_with_user.name or self.shared_with_user.preferred_username) if self.shared_with_user else None,
|
|
'shared_by_user_id': self.shared_by_user_id,
|
|
'created_at': self.created_at.isoformat() if self.created_at else None,
|
|
}
|
|
|
|
|
|
class Game(db.Model):
|
|
"""Game model representing a trivia game session"""
|
|
__tablename__ = 'games'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
name = db.Column(db.String(200), nullable=False)
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
is_active = db.Column(db.Boolean, default=False) # Only one game active at a time
|
|
current_question_index = db.Column(db.Integer, default=0) # Track current question
|
|
is_template = db.Column(db.Boolean, default=False) # Mark as reusable template
|
|
current_turn_team_id = db.Column(db.Integer, db.ForeignKey('teams.id'), nullable=True) # Track whose turn it is
|
|
completed_at = db.Column(db.DateTime, nullable=True) # When game ended
|
|
winners = db.Column(db.JSON, nullable=True) # [{"team_name": str, "score": int}, ...]
|
|
|
|
# Relationships
|
|
teams = db.relationship('Team', back_populates='game', cascade='all, delete-orphan',
|
|
foreign_keys='Team.game_id')
|
|
game_questions = db.relationship('GameQuestion', back_populates='game',
|
|
cascade='all, delete-orphan',
|
|
order_by='GameQuestion.order')
|
|
scores = db.relationship('Score', back_populates='game', cascade='all, delete-orphan')
|
|
current_turn_team = db.relationship('Team', foreign_keys=[current_turn_team_id], post_update=True)
|
|
|
|
@classmethod
|
|
def get_active(cls):
|
|
"""Get the currently active game"""
|
|
return cls.query.filter_by(is_active=True).first()
|
|
|
|
def get_current_question(self):
|
|
"""Get the current question in the game"""
|
|
if 0 <= self.current_question_index < len(self.game_questions):
|
|
return self.game_questions[self.current_question_index].question
|
|
return None
|
|
|
|
def to_dict(self, include_questions=False, include_teams=False):
|
|
"""Convert game to dictionary"""
|
|
data = {
|
|
'id': self.id,
|
|
'name': self.name,
|
|
'created_at': self.created_at.isoformat() if self.created_at else None,
|
|
'is_active': self.is_active,
|
|
'current_question_index': self.current_question_index,
|
|
'total_questions': len(self.game_questions),
|
|
'is_template': self.is_template,
|
|
'current_turn_team_id': self.current_turn_team_id,
|
|
'current_turn_team_name': self.current_turn_team.name if self.current_turn_team else None,
|
|
'completed_at': self.completed_at.isoformat() if self.completed_at else None,
|
|
'winners': self.winners
|
|
}
|
|
|
|
if include_questions:
|
|
data['questions'] = [gq.to_dict() for gq in self.game_questions]
|
|
|
|
if include_teams:
|
|
data['teams'] = [team.to_dict() for team in self.teams]
|
|
|
|
return data
|
|
|
|
|
|
class GameQuestion(db.Model):
|
|
"""Junction table linking games to questions with ordering"""
|
|
__tablename__ = 'game_questions'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
game_id = db.Column(db.Integer, db.ForeignKey('games.id'), nullable=False)
|
|
question_id = db.Column(db.Integer, db.ForeignKey('questions.id'), nullable=False)
|
|
order = db.Column(db.Integer, nullable=False) # Question order in game
|
|
|
|
# Relationships
|
|
game = db.relationship('Game', back_populates='game_questions')
|
|
question = db.relationship('Question', back_populates='game_questions')
|
|
|
|
__table_args__ = (
|
|
db.UniqueConstraint('game_id', 'order', name='unique_game_question_order'),
|
|
)
|
|
|
|
def to_dict(self):
|
|
"""Convert to dictionary"""
|
|
return {
|
|
'order': self.order,
|
|
'question': self.question.to_dict()
|
|
}
|
|
|
|
|
|
class Team(db.Model):
|
|
"""Team model representing a trivia team"""
|
|
__tablename__ = 'teams'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
name = db.Column(db.String(100), nullable=False)
|
|
game_id = db.Column(db.Integer, db.ForeignKey('games.id'), nullable=False)
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
phone_a_friend_count = db.Column(db.Integer, default=5) # Number of phone-a-friend lifelines
|
|
|
|
# Relationships
|
|
game = db.relationship('Game', back_populates='teams', foreign_keys=[game_id])
|
|
scores = db.relationship('Score', back_populates='team', cascade='all, delete-orphan')
|
|
|
|
@property
|
|
def total_score(self):
|
|
"""Calculate total score for the team"""
|
|
return sum(score.points for score in self.scores)
|
|
|
|
def to_dict(self):
|
|
"""Convert team to dictionary"""
|
|
return {
|
|
'id': self.id,
|
|
'name': self.name,
|
|
'game_id': self.game_id,
|
|
'total_score': self.total_score,
|
|
'phone_a_friend_count': self.phone_a_friend_count,
|
|
'created_at': self.created_at.isoformat() if self.created_at else None
|
|
}
|
|
|
|
|
|
class Score(db.Model):
|
|
"""Score model tracking points per team per question"""
|
|
__tablename__ = 'scores'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
team_id = db.Column(db.Integer, db.ForeignKey('teams.id'), nullable=False)
|
|
game_id = db.Column(db.Integer, db.ForeignKey('games.id'), nullable=False)
|
|
question_index = db.Column(db.Integer, nullable=False) # Which question this score is for
|
|
points = db.Column(db.Integer, default=0)
|
|
awarded_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
|
|
# Relationships
|
|
team = db.relationship('Team', back_populates='scores')
|
|
game = db.relationship('Game', back_populates='scores')
|
|
|
|
__table_args__ = (
|
|
db.UniqueConstraint('team_id', 'question_index', name='unique_team_question_score'),
|
|
)
|
|
|
|
def to_dict(self):
|
|
"""Convert score to dictionary"""
|
|
return {
|
|
'id': self.id,
|
|
'team_id': self.team_id,
|
|
'game_id': self.game_id,
|
|
'question_index': self.question_index,
|
|
'points': self.points,
|
|
'awarded_at': self.awarded_at.isoformat() if self.awarded_at else None
|
|
}
|
|
|
|
|
|
class DownloadJobStatus(enum.Enum):
|
|
"""Enum for download job statuses"""
|
|
PENDING = "pending"
|
|
PROCESSING = "processing"
|
|
COMPLETED = "completed"
|
|
FAILED = "failed"
|
|
|
|
|
|
class DownloadJob(db.Model):
|
|
"""Track YouTube audio download jobs"""
|
|
__tablename__ = 'download_jobs'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
question_id = db.Column(db.Integer, db.ForeignKey('questions.id'), nullable=False)
|
|
celery_task_id = db.Column(db.String(255), nullable=False, unique=True)
|
|
status = db.Column(Enum(DownloadJobStatus), default=DownloadJobStatus.PENDING, nullable=False)
|
|
progress = db.Column(db.Integer, default=0) # 0-100
|
|
error_message = db.Column(db.Text, nullable=True)
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
completed_at = db.Column(db.DateTime, nullable=True)
|
|
|
|
# Relationships
|
|
question = db.relationship('Question', backref='download_job')
|
|
|
|
def to_dict(self):
|
|
"""Convert download job to dictionary"""
|
|
return {
|
|
'id': self.id,
|
|
'question_id': self.question_id,
|
|
'task_id': self.celery_task_id,
|
|
'status': self.status.value,
|
|
'progress': self.progress,
|
|
'error_message': self.error_message,
|
|
'created_at': self.created_at.isoformat() if self.created_at else None,
|
|
'completed_at': self.completed_at.isoformat() if self.completed_at else None
|
|
}
|