SOULGATE/server/game_loop.py
2026-05-04 03:42:47 +02:00

558 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# game_loop.py : boucle 20Hz du serveur (mouvement, projectiles, ennemis, vagues, broadcast)
import asyncio
import logging
import time
from boss_ai import nearest_player, update_boss_ai
from buff_system import player_damage_mult, player_has_buff, player_is_immune, player_speed_mult, update_buffs
from combat import circle_overlap, normalize
from displacement import handle_displacement
from waves import WaveManager
from constants import (
KAEL_MELEE_RADIUS, KAEL_MELEE_DAMAGE,
KAEL_SLAM_RADIUS, KAEL_SLAM_TICKS, KAEL_SLAM_DMG_PER_TICK,
KAEL_STORM_RADIUS, KAEL_STORM_TICKS, KAEL_STORM_DMG_PER_TICK,
SERIS_FAN_COUNT, SERIS_FAN_SPREAD, SERIS_FAN_DAMAGE,
SERIS_VOID_RADIUS, SERIS_VOID_TICKS, SERIS_VOID_DMG_PER_TICK,
ALDRIC_HEAL_RADIUS, ALDRIC_HEAL_AMOUNT,
ALDRIC_PULSE_RADIUS, ALDRIC_PULSE_TICKS, ALDRIC_PULSE_DMG_PER_TICK,
ABILITY_COOLDOWNS, ARENA_HEIGHT, ARENA_WIDTH,
ATTACK_COOLDOWN,
ENEMY_ATTACK_COOLDOWN, ENEMY_MAIN_SPAWN_X, ENEMY_MAIN_SPAWN_Y, ENEMY_STATS,
PLAYER_HITBOX_RADIUS, PLAYER_MAX_HP, PLAYER_SPAWN_POSITIONS, PLAYER_SPEED,
PROJECTILE_DAMAGE, PROJECTILE_RADIUS, PROJECTILE_SPEED, PROJECTILE_TTL,
SOULGATE_HITBOX_RADIUS, SOULGATE_MAX_HP, SOULGATE_X, SOULGATE_Y, TICK_DURATION,
SOUL_REWARDS, UPGRADE_CATALOG,
VEXARIS_HP, MORVETH_HP,
)
from game_state import AoeZone, EnemyState, GameState, PlayerState, ProjectileState, SoulgateState, WaveState
logger = logging.getLogger(__name__)
class GameLoop:
def __init__(self, lobby, broadcast):
self.lobby = lobby
self._broadcast = broadcast
self.state = self._init_state()
self._running = False
self._inputs = {} # {conn_id: (dx, dy)} dernier input de mouvement
self._proj_counter = 0
self._enemy_counter = 0
self._aoe_counter = 0
self._wave_manager = WaveManager()
self._pending_spawns = [] # spawns au prochain tick (eviter de modif la liste pendant l'iteration)
self._debug_invincible = False
self._debug_one_shot = False
def _init_state(self):
players = [
PlayerState(
id=p.conn_id,
class_name=p.player_class,
username=p.username,
x=float(PLAYER_SPAWN_POSITIONS[i][0]),
y=float(PLAYER_SPAWN_POSITIONS[i][1]),
hp=PLAYER_MAX_HP[p.player_class],
max_hp=PLAYER_MAX_HP[p.player_class],
alive=True,
souls=50, # ames de depart pour pouvoir acheter qq upgrades dans la 1ere prep
)
for i, p in enumerate(self.lobby.players)
]
return GameState(
tick=0,
players=players,
enemies=[],
projectiles=[],
soulgate=SoulgateState(hp=SOULGATE_MAX_HP, max_hp=SOULGATE_MAX_HP),
wave=WaveState(number=1, phase=0, enemies_remaining=0, state="preparation"),
effects=[],
)
def _get_player(self, conn_id):
return next((p for p in self.state.players if p.id == conn_id), None)
# --- inputs (appeles depuis main.py au reception ws) ---
def handle_input(self, conn_id, dx, dy):
self._inputs[conn_id] = (dx, dy)
def handle_attack(self, conn_id, tx, ty):
# melee Kael / projectile Seris+Aldric
player = self._get_player(conn_id)
if not player or not player.alive:
return
if player_has_buff(player, "casting") or player.cooldowns.get("attack", 0) > 0:
return
dx, dy = normalize(tx - player.x, ty - player.y)
if dx == 0.0 and dy == 0.0:
return
cls = player.class_name
cd = ATTACK_COOLDOWN * (1 - 0.20 * player.upgrades.get("cooldown_down", 0))
if cls == "kael":
self._kael_melee(player, dx, dy)
else:
damage = int(PROJECTILE_DAMAGE[cls] * (1 + 0.25 * player.upgrades.get("damage_up", 0)))
damage = int(damage * player_damage_mult(player))
self._proj_counter += 1
self.state.projectiles.append(ProjectileState(
id=f"pr_{self._proj_counter}",
owner_id=conn_id,
x=player.x, y=player.y,
vx=dx * PROJECTILE_SPEED[cls],
vy=dy * PROJECTILE_SPEED[cls],
damage=damage,
radius=PROJECTILE_RADIUS,
ttl=PROJECTILE_TTL[cls],
))
player.cooldowns["attack"] = cd
def _kael_melee(self, player, dx, dy):
# frappe melee Kael, demi-cercle devant
import math
# FIXME le bonus de damage_up je sais pas si c'est bien 25% par stack
damage = int(KAEL_MELEE_DAMAGE * (1 + 0.25 * player.upgrades.get("damage_up", 0)))
damage = int(damage * player_damage_mult(player))
dead = set()
for e in self.state.enemies:
ex = e.x - player.x
ey = e.y - player.y
d = math.sqrt(ex*ex + ey*ey)
if d > KAEL_MELEE_RADIUS:
continue
# check qu'on tape devant et pas derriere (produit scalaire)
if d > 0:
if dx * (ex/d) + dy * (ey/d) < 0:
continue
e.hp = max(0, e.hp - damage)
if e.hp <= 0:
dead.add(e.id)
player.souls += SOUL_REWARDS.get(e.type, 10)
if dead:
self.state.enemies = [x for x in self.state.enemies if x.id not in dead]
def handle_ability(self, conn_id, ability_id, tx, ty):
# touches 1 et 2 (skill par classe)
player = self._get_player(conn_id)
if not player or not player.alive:
return
key = f"ability_{ability_id}"
if player.cooldowns.get(key, 0) > 0:
return
cooldown_list = ABILITY_COOLDOWNS.get(player.class_name, [8.0, 12.0])
idx = ability_id - 1
if not (0 <= idx < len(cooldown_list)):
return
cls = player.class_name
if cls == "kael":
if ability_id == 1: self._kael_slam(player)
elif ability_id == 2: self._kael_storm(player)
elif cls == "seris":
if ability_id == 1: self._seris_fan(player, tx, ty)
elif ability_id == 2: self._seris_void(player)
elif cls == "aldric":
if ability_id == 1: self._aldric_heal(player)
elif ability_id == 2: self._aldric_pulse(player)
player.cooldowns[key] = cooldown_list[idx]
def _kael_slam(self, player):
self._aoe_counter += 1
dmg = int(KAEL_SLAM_DMG_PER_TICK * (1 + 0.25 * player.upgrades.get("damage_up", 0)))
dmg = int(dmg * player_damage_mult(player))
self.state.aoe_zones.append(AoeZone(
id=f"aoe_{self._aoe_counter}",
owner_id=player.id,
zone_type="slam",
x=player.x, y=player.y,
radius=KAEL_SLAM_RADIUS,
damage_per_tick=dmg,
ticks_remaining=KAEL_SLAM_TICKS,
max_ticks=KAEL_SLAM_TICKS,
))
def _kael_storm(self, player):
# tempete : grosse zone aoe sur plusieurs ticks
self._aoe_counter += 1
dmg = int(KAEL_STORM_DMG_PER_TICK * (1 + 0.25 * player.upgrades.get("damage_up", 0)))
dmg = int(dmg * player_damage_mult(player))
self.state.aoe_zones.append(AoeZone(
id=f"aoe_{self._aoe_counter}",
owner_id=player.id,
zone_type="storm",
x=player.x, y=player.y,
radius=KAEL_STORM_RADIUS,
damage_per_tick=dmg,
ticks_remaining=KAEL_STORM_TICKS,
max_ticks=KAEL_STORM_TICKS,
))
def _seris_fan(self, player, tx, ty):
# 3 dagues en eventail vers la souris
import math
dx, dy = normalize(tx - player.x, ty - player.y)
if dx == 0.0 and dy == 0.0:
return
base_angle = math.atan2(dy, dx)
dmg = int(SERIS_FAN_DAMAGE * (1 + 0.25 * player.upgrades.get("damage_up", 0)))
dmg = int(dmg * player_damage_mult(player))
for i in range(SERIS_FAN_COUNT):
# repartition symetrique : -spread, 0, +spread (avec count impair)
offset = SERIS_FAN_SPREAD * (i - (SERIS_FAN_COUNT - 1) / 2)
angle = base_angle + offset
vx = math.cos(angle) * PROJECTILE_SPEED["seris"]
vy = math.sin(angle) * PROJECTILE_SPEED["seris"]
self._proj_counter += 1
self.state.projectiles.append(ProjectileState(
id=f"pr_{self._proj_counter}",
owner_id=player.id,
x=player.x, y=player.y,
vx=vx, vy=vy,
damage=dmg,
radius=PROJECTILE_RADIUS,
ttl=PROJECTILE_TTL["seris"],
))
def _seris_void(self, player):
self._aoe_counter += 1
dmg = int(SERIS_VOID_DMG_PER_TICK * (1 + 0.25 * player.upgrades.get("damage_up", 0)))
dmg = int(dmg * player_damage_mult(player))
self.state.aoe_zones.append(AoeZone(
id=f"aoe_{self._aoe_counter}",
owner_id=player.id,
zone_type="void",
x=player.x, y=player.y,
radius=SERIS_VOID_RADIUS,
damage_per_tick=dmg,
ticks_remaining=SERIS_VOID_TICKS,
max_ticks=SERIS_VOID_TICKS,
))
def _aldric_heal(self, player):
# halo de soin sur tous les joueurs dans le rayon
for t in self.state.players:
if not t.alive:
continue
if circle_overlap(player.x, player.y, ALDRIC_HEAL_RADIUS, t.x, t.y, PLAYER_HITBOX_RADIUS):
t.hp = min(t.hp + ALDRIC_HEAL_AMOUNT, t.max_hp)
def _aldric_pulse(self, player):
self._aoe_counter += 1
dmg = int(ALDRIC_PULSE_DMG_PER_TICK * (1 + 0.25 * player.upgrades.get("damage_up", 0)))
dmg = int(dmg * player_damage_mult(player))
self.state.aoe_zones.append(AoeZone(
id=f"aoe_{self._aoe_counter}",
owner_id=player.id,
zone_type="pulse",
x=player.x, y=player.y,
radius=ALDRIC_PULSE_RADIUS,
damage_per_tick=dmg,
ticks_remaining=ALDRIC_PULSE_TICKS,
max_ticks=ALDRIC_PULSE_TICKS,
))
def _process_aoe_zones(self):
import math
if not self.state.aoe_zones:
return
dead = set()
owner_map = {p.id: p for p in self.state.players}
for zone in self.state.aoe_zones:
owner = owner_map.get(zone.owner_id)
for e in self.state.enemies:
if e.id in dead:
continue
dx = e.x - zone.x
dy = e.y - zone.y
if math.sqrt(dx*dx + dy*dy) <= zone.radius:
e.hp = max(0, e.hp - zone.damage_per_tick)
if e.hp <= 0:
dead.add(e.id)
if owner:
owner.souls += SOUL_REWARDS.get(e.type, 10)
zone.ticks_remaining -= 1
if dead:
self.state.enemies = [e for e in self.state.enemies if e.id not in dead]
self.state.aoe_zones = [z for z in self.state.aoe_zones if z.ticks_remaining > 0]
def handle_displacement(self, conn_id, tx, ty):
# touche E, on delegue a displacement.py
p = self._get_player(conn_id)
if p:
handle_displacement(p, tx, ty, self.state)
def spawn_enemy(self, enemy_type):
self._enemy_counter += 1
eid = f"en_{self._enemy_counter}"
if enemy_type == "vexaris":
self.state.enemies.append(EnemyState(
id=eid, type="vexaris", x=0.0, y=12.0,
hp=VEXARIS_HP, max_hp=VEXARIS_HP, is_boss=True,
))
elif enemy_type == "morveth":
# TODO peut etre baisser ses HP si c trop dur
self.state.enemies.append(EnemyState(
id=eid, type="morveth", x=0.0, y=12.0,
hp=MORVETH_HP, max_hp=MORVETH_HP, is_boss=True,
))
else:
stats = ENEMY_STATS[enemy_type]
self.state.enemies.append(EnemyState(
id=eid, type=enemy_type,
x=float(ENEMY_MAIN_SPAWN_X), y=float(ENEMY_MAIN_SPAWN_Y),
hp=stats["hp"], max_hp=stats["hp"],
))
def _spawn_enemy_at(self, enemy_type, x, y):
# spawn a une position precise (pour les generaux de Morveth)
stats = ENEMY_STATS[enemy_type]
self._enemy_counter += 1
self.state.enemies.append(EnemyState(
id=f"en_{self._enemy_counter}", type=enemy_type,
x=x, y=y, hp=stats["hp"], max_hp=stats["hp"],
))
def _update_enemies(self):
sg_x, sg_y = float(SOULGATE_X), float(SOULGATE_Y)
alive_players = [p for p in self.state.players if p.alive]
surviving = []
for enemy in self.state.enemies:
if enemy.frozen_timer > 0:
# Ennemi gelé (sort divin Seris) → il ne bouge pas ce tick
surviving.append(enemy)
continue
if enemy.is_boss:
# Boss → IA spéciale dans boss_ai.py
update_boss_ai(enemy, self.state.players, self.state.soulgate, self._pending_spawns)
surviving.append(enemy) # les boss ne "meurent" pas via keep=False
continue
# Ennemi normal
keep = self._update_single_enemy(enemy, sg_x, sg_y, alive_players)
if keep:
surviving.append(enemy) # False = ennemi mort (a touché le Soulgate)
self.state.enemies = surviving # remplacer par la liste filtrée
# Spawner les ennemis en attente (ex: généraux de Morveth)
for etype, ex, ey in self._pending_spawns:
self._spawn_enemy_at(etype, ex, ey)
self._pending_spawns.clear() # vider la file d'attente
def _update_single_enemy(self, enemy, sg_x, sg_y, alive_players):
# retourne False si l'ennemi doit etre supprime (mort)
stats = ENEMY_STATS[enemy.type]
speed, e_radius = stats["speed"], stats["radius"]
if enemy.attack_cooldown > 0:
enemy.attack_cooldown = max(0.0, enemy.attack_cooldown - TICK_DURATION)
if stats["sg_damage"] > 0:
# vise le Soulgate (ex fracture)
return self._enemy_target_soulgate(enemy, sg_x, sg_y, e_radius, stats)
# vise les joueurs (rampant, eclat)
return self._enemy_target_player(enemy, alive_players, e_radius, speed, stats)
def _enemy_target_soulgate(self, enemy, sg_x, sg_y, e_radius, stats):
t_radius = float(SOULGATE_HITBOX_RADIUS)
if circle_overlap(enemy.x, enemy.y, e_radius, sg_x, sg_y, t_radius):
if enemy.attack_cooldown <= 0:
self.state.soulgate.hp = max(0, self.state.soulgate.hp - stats["sg_damage"])
return False
else:
dx, dy = normalize(sg_x - enemy.x, sg_y - enemy.y)
enemy.x += dx * stats["speed"] * TICK_DURATION
enemy.y += dy * stats["speed"] * TICK_DURATION
return True
def _enemy_target_player(self, enemy, alive_players, e_radius, speed, stats):
target = nearest_player(enemy.x, enemy.y, alive_players)
if target is None:
return True
t_radius = float(PLAYER_HITBOX_RADIUS)
in_contact = circle_overlap(enemy.x, enemy.y, e_radius, target.x, target.y, t_radius)
if in_contact and enemy.attack_cooldown <= 0:
if not player_is_immune(target):
target.hp = max(0, target.hp - stats["damage"])
if target.hp <= 0:
target.alive = False
enemy.attack_cooldown = ENEMY_ATTACK_COOLDOWN
if not in_contact:
dx, dy = normalize(target.x - enemy.x, target.y - enemy.y)
enemy.x += dx * speed * TICK_DURATION
enemy.y += dy * speed * TICK_DURATION
return True # l'ennemi reste vivant (il ne se consume pas)
def debug_kill_all(self):
self.state.enemies.clear()
self.state.projectiles.clear()
def debug_toggle_invincible(self):
self._debug_invincible = not self._debug_invincible
def debug_toggle_one_shot(self):
self._debug_one_shot = not self._debug_one_shot
def debug_revive_all(self):
for p in self.state.players:
p.hp = p.max_hp
p.alive = True
def handle_upgrade(self, conn_id, upgrade_id):
# achat d'upgrade pendant la phase de prep
if self._wave_manager.state != "preparation":
return
p = self._get_player(conn_id)
if not p or upgrade_id not in UPGRADE_CATALOG:
return
spec = UPGRADE_CATALOG[upgrade_id]
stacks = p.upgrades.get(upgrade_id, 0)
if stacks >= spec["max_stacks"] or p.souls < spec["cost"]:
return
p.souls -= spec["cost"]
p.upgrades[upgrade_id] = stacks + 1
if upgrade_id == "hp_up":
# cas special : augmente les hp max + soigne du delta
p.max_hp += 30
p.hp = min(p.hp + 30, p.max_hp)
def _apply_movement(self):
half_w, half_h = ARENA_WIDTH / 2, ARENA_HEIGHT / 2
for p in self.state.players:
if not p.alive or player_has_buff(p, "casting"):
continue
dx, dy = self._inputs.get(p.id, (0.0, 0.0))
if dx == 0.0 and dy == 0.0:
continue
speed = PLAYER_SPEED * (1 + 0.15 * p.upgrades.get("speed_up", 0))
speed *= player_speed_mult(p)
# clamp dans l'arene
p.x = max(-half_w, min(half_w, p.x + dx * speed * TICK_DURATION))
p.y = max(-half_h, min(half_h, p.y + dy * speed * TICK_DURATION))
def _update_projectiles(self):
half_w, half_h = ARENA_WIDTH / 2, ARENA_HEIGHT / 2
dead_enemy_ids = set()
surviving = []
for proj in self.state.projectiles:
# Déplacer le projectile : position += vitesse × durée tick
proj.x += proj.vx * TICK_DURATION
proj.y += proj.vy * TICK_DURATION
proj.ttl -= TICK_DURATION # réduire le temps de vie restant
# Vérifier si le projectile doit disparaître
if proj.ttl <= 0 or abs(proj.x) > half_w or abs(proj.y) > half_h:
continue # TTL écoulé ou hors arène → ne pas ajouter à surviving
# Vérifier les collisions avec les ennemis
hit = self._check_proj_hit(proj, dead_enemy_ids)
if not hit:
surviving.append(proj) # pas de collision → projectile continue
self.state.projectiles = surviving # remplacer par les projectiles encore en vol
# Supprimer les ennemis tués par des projectiles ce tick
if dead_enemy_ids:
self.state.enemies = [e for e in self.state.enemies if e.id not in dead_enemy_ids]
def _check_proj_hit(self, proj, dead_ids):
for e in self.state.enemies:
if e.id in dead_ids:
continue
r = ENEMY_STATS[e.type]["radius"]
if not circle_overlap(proj.x, proj.y, proj.radius, e.x, e.y, r):
continue
# collision
e.hp = 0 if self._debug_one_shot else e.hp - proj.damage
if e.hp <= 0:
dead_ids.add(e.id)
owner = self._get_player(proj.owner_id)
if owner:
owner.souls += SOUL_REWARDS.get(e.type, 0)
owner.enemies_killed += 1
return True
return False
def _update_cooldowns(self):
for p in self.state.players:
for k, v in p.cooldowns.items():
if v > 0:
p.cooldowns[k] = max(0.0, v - TICK_DURATION)
def _update_wave(self):
wm = self._wave_manager
wm.tick(self.spawn_enemy, len(self.state.enemies))
self.state.wave.number = wm.wave_number
self.state.wave.phase = wm.phase_number
self.state.wave.state = wm.state
self.state.wave.enemies_remaining = len(self.state.enemies) + wm.enemies_remaining()
self.state.wave.prep_timer = wm.prep_timer_remaining
boss = next((e for e in self.state.enemies if e.is_boss), None)
self.state.wave.boss_hp = boss.hp if boss else 0
self.state.wave.boss_max_hp = boss.max_hp if boss else 0
self.state.wave.boss_name = boss.type if boss else ""
def _tick(self):
# 1 tick = 1 frame de jeu (20Hz)
self.state.tick += 1
self._apply_movement()
self._update_projectiles()
self._process_aoe_zones()
self._update_enemies()
self._update_cooldowns()
update_buffs(self.state)
self._update_wave()
if self._debug_invincible:
for p in self.state.players:
p.hp = p.max_hp
p.alive = True
async def run(self):
self._running = True
logger.info("Game loop demarree — lobby %s", self.lobby.code)
while self._running:
start = time.monotonic()
self._tick()
await self._broadcast(self.state.to_dict())
elapsed = time.monotonic() - start
# si le tick a pris plus de 50ms on rattrape pas, on enchaine
await asyncio.sleep(max(0.0, TICK_DURATION - elapsed))
def stop(self):
self._running = False