Files
triviathang/backend/services/game_service.py
Ryan Chen 46d45eebb6 Add right/wrong marking with steal mechanism and question stats
Adds Correct/Wrong buttons to admin panel that track question
answer stats across games and implement a steal system where
the next team gets one chance to answer if the first team is wrong.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-03 14:40:58 -04:00

457 lines
14 KiB
Python

from backend.models import db, Game, Score, Team
from flask_socketio import emit
def get_ordered_teams(game):
"""Get teams sorted by ID for consistent turn ordering"""
return sorted(game.teams, key=lambda t: t.id)
def get_game_state(game):
"""Get current game state with all necessary information"""
current_question = game.get_current_question()
state = {
'game_id': game.id,
'game_name': game.name,
'current_question_index': game.current_question_index,
'total_questions': len(game.game_questions),
'is_active': game.is_active,
'teams': [team.to_dict() for team in game.teams],
'current_turn_team_id': game.current_turn_team_id,
'current_turn_team_name': game.current_turn_team.name if game.current_turn_team else None,
'steal_active': game.steal_active
}
if current_question:
state['current_question'] = current_question.to_dict(include_answer=False)
return state
def get_admin_game_state(game):
"""Get game state with answer (for admin only)"""
state = get_game_state(game)
current_question = game.get_current_question()
if current_question:
state['current_question'] = current_question.to_dict(include_answer=True)
return state
def start_game(game, socketio_instance):
"""Start/activate a game"""
# Deactivate any other active games
active_games = Game.query.filter_by(is_active=True).all()
for g in active_games:
if g.id != game.id:
g.is_active = False
game.is_active = True
game.current_question_index = 0
game.steal_active = False
# Set initial turn to the first team (by ID order)
ordered_teams = get_ordered_teams(game)
if ordered_teams:
game.current_turn_team_id = ordered_teams[0].id
else:
game.current_turn_team_id = None
db.session.commit()
# Emit game_started event
socketio_instance.emit('game_started', {
'game_id': game.id,
'game_name': game.name,
'total_questions': len(game.game_questions)
}, room=f'game_{game.id}_contestant')
socketio_instance.emit('game_started', {
'game_id': game.id,
'game_name': game.name,
'total_questions': len(game.game_questions)
}, room=f'game_{game.id}_admin')
# Emit first question
broadcast_question_change(game, socketio_instance)
# Emit initial turn
broadcast_turn_change(game, socketio_instance)
def next_question(game, socketio_instance):
"""Move to next question"""
if game.current_question_index < len(game.game_questions) - 1:
game.current_question_index += 1
game.steal_active = False
db.session.commit()
broadcast_question_change(game, socketio_instance)
broadcast_steal_state(game, socketio_instance)
return True
return False
def previous_question(game, socketio_instance):
"""Move to previous question"""
if game.current_question_index > 0:
game.current_question_index -= 1
game.steal_active = False
db.session.commit()
broadcast_question_change(game, socketio_instance)
broadcast_steal_state(game, socketio_instance)
return True
return False
def broadcast_question_change(game, socketio_instance):
"""Broadcast question change to all connected clients"""
current_question = game.get_current_question()
if not current_question:
return
# Determine previous category for category slide transitions
prev_category = None
if game.current_question_index > 0:
prev_gq = game.game_questions[game.current_question_index - 1]
prev_category = prev_gq.question.category
current_category = current_question.category
category_changed = current_category != prev_category
# Emit to contestant room (without answer)
socketio_instance.emit('question_changed', {
'question_index': game.current_question_index,
'question': current_question.to_dict(include_answer=False),
'total_questions': len(game.game_questions),
'previous_category': prev_category,
'category_changed': category_changed
}, room=f'game_{game.id}_contestant')
# Emit to admin room (with answer)
socketio_instance.emit('question_with_answer', {
'question_index': game.current_question_index,
'question': current_question.to_dict(include_answer=True),
'total_questions': len(game.game_questions),
'previous_category': prev_category,
'category_changed': category_changed
}, room=f'game_{game.id}_admin')
def award_points(game, team, points, socketio_instance):
"""Award points to a team for the current question"""
# Check if score already exists for this team and question
existing_score = Score.query.filter_by(
team_id=team.id,
question_index=game.current_question_index
).first()
if existing_score:
# Add to existing score
existing_score.points += points
else:
# Create new score
score = Score(
team_id=team.id,
game_id=game.id,
question_index=game.current_question_index,
points=points
)
db.session.add(score)
db.session.commit()
# Get all team scores with full data including lifelines
all_scores = [t.to_dict() for t in game.teams]
# Broadcast score update
score_data = {
'team_id': team.id,
'team_name': team.name,
'new_score': team.total_score,
'points_awarded': points,
'all_scores': all_scores
}
socketio_instance.emit('score_updated', score_data, room=f'game_{game.id}_contestant')
socketio_instance.emit('score_updated', score_data, room=f'game_{game.id}_admin')
def toggle_answer_visibility(game, show_answer, socketio_instance):
"""Toggle answer visibility on contestant screen"""
current_question = game.get_current_question()
if not current_question:
return
answer_data = {
'show_answer': show_answer
}
if show_answer:
answer_data['answer'] = current_question.answer
# Broadcast to contestant room only
socketio_instance.emit('answer_visibility_changed', answer_data, room=f'game_{game.id}_contestant')
def toggle_timer_pause(game, paused, socketio_instance):
"""Pause or resume the timer"""
# Broadcast timer pause state to both rooms
socketio_instance.emit('timer_paused', {
'paused': paused
}, room=f'game_{game.id}_contestant')
socketio_instance.emit('timer_paused', {
'paused': paused
}, room=f'game_{game.id}_admin')
def reset_timer(game, socketio_instance):
"""Reset the timer to 30 seconds"""
# Broadcast timer reset to both rooms
socketio_instance.emit('timer_reset', {
'seconds': 30
}, room=f'game_{game.id}_contestant')
socketio_instance.emit('timer_reset', {
'seconds': 30
}, room=f'game_{game.id}_admin')
def end_game(game, socketio_instance):
"""End/deactivate a game and record winners"""
from datetime import datetime
# Calculate winners (team(s) with highest score)
if game.teams:
team_scores = [(team.name, team.total_score) for team in game.teams]
if team_scores:
max_score = max(score for _, score in team_scores)
winners = [
{"team_name": name, "score": score}
for name, score in team_scores
if score == max_score
]
game.winners = winners
game.is_active = False
game.completed_at = datetime.utcnow()
db.session.commit()
# Emit game_ended event with winners
end_data = {
'game_id': game.id,
'game_name': game.name,
'winners': game.winners
}
socketio_instance.emit('game_ended', end_data, room=f'game_{game.id}_contestant')
socketio_instance.emit('game_ended', end_data, room=f'game_{game.id}_admin')
def restart_game(game, socketio_instance):
"""Restart a game (clear scores and reset to waiting state)"""
# Clear all scores for this game
Score.query.filter_by(game_id=game.id).delete()
# Reset game state
game.is_active = False
game.current_question_index = 0
game.current_turn_team_id = None # Reset turn
game.steal_active = False
# Reset phone-a-friend lifelines for all teams
for team in game.teams:
team.phone_a_friend_count = 5
db.session.commit()
# Emit game_ended event to reset contestant view
socketio_instance.emit('game_ended', {
'game_id': game.id,
'game_name': game.name
}, room=f'game_{game.id}_contestant')
# Emit score update to show cleared scores and reset lifelines
socketio_instance.emit('score_updated', {
'team_id': None,
'team_name': None,
'new_score': 0,
'points_awarded': 0,
'all_scores': [t.to_dict() for t in game.teams]
}, room=f'game_{game.id}_contestant')
socketio_instance.emit('score_updated', {
'team_id': None,
'team_name': None,
'new_score': 0,
'points_awarded': 0,
'all_scores': [t.to_dict() for t in game.teams]
}, room=f'game_{game.id}_admin')
def broadcast_lifeline_update(game, team, socketio_instance):
"""Broadcast phone-a-friend lifeline update"""
# Get all team scores with updated lifeline counts
all_scores = [t.to_dict() for t in game.teams]
lifeline_data = {
'team_id': team.id,
'team_name': team.name,
'phone_a_friend_count': team.phone_a_friend_count,
'all_scores': all_scores
}
socketio_instance.emit('lifeline_updated', lifeline_data, room=f'game_{game.id}_contestant')
socketio_instance.emit('lifeline_updated', lifeline_data, room=f'game_{game.id}_admin')
def broadcast_audio_play(game, socketio_instance):
"""Broadcast audio play command to contestants"""
socketio_instance.emit('audio_play', {
'game_id': game.id
}, room=f'game_{game.id}_contestant')
def broadcast_audio_pause(game, socketio_instance):
"""Broadcast audio pause command to contestants"""
socketio_instance.emit('audio_pause', {
'game_id': game.id
}, room=f'game_{game.id}_contestant')
def broadcast_audio_stop(game, socketio_instance):
"""Broadcast audio stop command to contestants"""
socketio_instance.emit('audio_stop', {
'game_id': game.id
}, room=f'game_{game.id}_contestant')
def broadcast_audio_seek(game, position, socketio_instance):
"""Broadcast audio seek command to contestants"""
socketio_instance.emit('audio_seek', {
'game_id': game.id,
'position': position
}, room=f'game_{game.id}_contestant')
def advance_turn(game, socketio_instance):
"""Advance to the next team's turn (cycles through teams by ID order)"""
ordered_teams = get_ordered_teams(game)
if not ordered_teams:
return None
if game.current_turn_team_id is None:
# No current turn, set to first team
next_team = ordered_teams[0]
else:
# Find current team index and advance to next
current_index = None
for i, team in enumerate(ordered_teams):
if team.id == game.current_turn_team_id:
current_index = i
break
if current_index is None:
# Current team not found, set to first
next_team = ordered_teams[0]
else:
# Cycle to next team (modulo for wrap-around)
next_index = (current_index + 1) % len(ordered_teams)
next_team = ordered_teams[next_index]
game.current_turn_team_id = next_team.id
db.session.commit()
# Broadcast turn change to both rooms
broadcast_turn_change(game, socketio_instance)
return next_team
def mark_correct(game, socketio_instance):
"""Mark the current question as answered correctly by the current turn team.
Awards 1 point, increments question stats, resets steal state."""
current_question = game.get_current_question()
if not current_question:
return None
team = game.current_turn_team
if not team:
return None
# Increment question stats
current_question.times_correct += 1
# Reset steal state
game.steal_active = False
db.session.commit()
# Award 1 point
award_points(game, team, 1, socketio_instance)
# Broadcast steal state reset
broadcast_steal_state(game, socketio_instance)
return team
def mark_wrong(game, socketio_instance):
"""Mark the current question as answered incorrectly.
If steal not active: enters steal mode, advances turn.
If steal active: both teams failed, resets steal."""
current_question = game.get_current_question()
if not current_question:
return {'steal_active': False, 'both_failed': True}
# Increment question stats
current_question.times_incorrect += 1
if not game.steal_active:
# First wrong answer - enter steal mode, advance turn
game.steal_active = True
db.session.commit()
advance_turn(game, socketio_instance)
broadcast_steal_state(game, socketio_instance)
return {'steal_active': True, 'both_failed': False}
else:
# Second wrong answer - both teams failed
game.steal_active = False
db.session.commit()
broadcast_steal_state(game, socketio_instance)
return {'steal_active': False, 'both_failed': True}
def broadcast_steal_state(game, socketio_instance):
"""Broadcast steal state to all connected clients"""
steal_data = {
'steal_active': game.steal_active,
'current_turn_team_id': game.current_turn_team_id,
'current_turn_team_name': game.current_turn_team.name if game.current_turn_team else None,
}
socketio_instance.emit('steal_state_changed', steal_data, room=f'game_{game.id}_contestant')
socketio_instance.emit('steal_state_changed', steal_data, room=f'game_{game.id}_admin')
def broadcast_turn_change(game, socketio_instance):
"""Broadcast turn change to all connected clients"""
ordered_teams = get_ordered_teams(game)
turn_data = {
'current_turn_team_id': game.current_turn_team_id,
'current_turn_team_name': game.current_turn_team.name if game.current_turn_team else None,
'all_teams': [{'id': t.id, 'name': t.name} for t in ordered_teams]
}
socketio_instance.emit('turn_changed', turn_data, room=f'game_{game.id}_contestant')
socketio_instance.emit('turn_changed', turn_data, room=f'game_{game.id}_admin')