"""game_loop.py: 20 Hz server loop (movement, projectiles, enemies, waves, broadcast)."""

import asyncio
import logging
import math
import time

from boss_ai import nearest_player, update_boss_ai
from buff_system import (
    player_damage_mult,
    player_has_buff,
    player_is_immune,
    player_speed_mult,
    update_buffs,
)
from combat import circle_overlap, normalize
from displacement import handle_displacement
from waves import WaveManager
from constants import (
    KAEL_MELEE_RADIUS, KAEL_MELEE_DAMAGE,
    KAEL_SLAM_RADIUS, KAEL_SLAM_TICKS, KAEL_SLAM_DMG_PER_TICK,
    KAEL_STORM_RADIUS, KAEL_STORM_TICKS, KAEL_STORM_DMG_PER_TICK,
    SERIS_FAN_COUNT, SERIS_FAN_SPREAD, SERIS_FAN_DAMAGE,
    SERIS_VOID_RADIUS, SERIS_VOID_TICKS, SERIS_VOID_DMG_PER_TICK,
    ALDRIC_HEAL_RADIUS, ALDRIC_HEAL_AMOUNT,
    ALDRIC_PULSE_RADIUS, ALDRIC_PULSE_TICKS, ALDRIC_PULSE_DMG_PER_TICK,
    ABILITY_COOLDOWNS,
    ARENA_HEIGHT,
    ARENA_WIDTH,
    ATTACK_COOLDOWN,
    ENEMY_ATTACK_COOLDOWN,
    ENEMY_MAIN_SPAWN_X,
    ENEMY_MAIN_SPAWN_Y,
    ENEMY_STATS,
    PLAYER_HITBOX_RADIUS,
    PLAYER_MAX_HP,
    PLAYER_SPAWN_POSITIONS,
    PLAYER_SPEED,
    PROJECTILE_DAMAGE,
    PROJECTILE_RADIUS,
    PROJECTILE_SPEED,
    PROJECTILE_TTL,
    SOULGATE_HITBOX_RADIUS,
    SOULGATE_MAX_HP,
    SOULGATE_X,
    SOULGATE_Y,
    TICK_DURATION,
    SOUL_REWARDS,
    UPGRADE_CATALOG,
    VEXARIS_HP,
    MORVETH_HP,
)
from game_state import (
    AoeZone,
    EnemyState,
    GameState,
    PlayerState,
    ProjectileState,
    SoulgateState,
    WaveState,
)

logger = logging.getLogger(__name__)

# Boss types and their hit points; every other type is looked up in ENEMY_STATS.
# TODO: maybe lower Morveth's HP if the fight turns out to be too hard.
_BOSS_HP = {
    "vexaris": VEXARIS_HP,
    "morveth": MORVETH_HP,
}


class GameLoop:
    """Authoritative simulation for one lobby.

    ``handle_*`` methods record or apply player actions; ``run`` advances the
    simulation one tick at a time and sends ``state.to_dict()`` to the
    ``broadcast`` coroutine after every tick.
    """

    def __init__(self, lobby, broadcast):
        """Build the initial state from ``lobby.players``.

        Args:
            lobby: object exposing ``players`` and ``code``.
            broadcast: awaitable callable that receives the serialized state.
        """
        self.lobby = lobby
        self._broadcast = broadcast
        self.state = self._init_state()
        self._running = False
        self._inputs = {}  # {conn_id: (dx, dy)} latest movement input
        self._proj_counter = 0
        self._enemy_counter = 0
        self._aoe_counter = 0
        self._wave_manager = WaveManager()
        # Spawns applied at the end of the enemy update, so the enemy list is
        # never modified while it is being iterated.
        self._pending_spawns = []
        self._debug_invincible = False
        self._debug_one_shot = False

    def _init_state(self):
        """Return a fresh GameState with one PlayerState per lobby player."""
        players = [
            PlayerState(
                id=p.conn_id,
                class_name=p.player_class,
                username=p.username,
                x=float(PLAYER_SPAWN_POSITIONS[i][0]),
                y=float(PLAYER_SPAWN_POSITIONS[i][1]),
                hp=PLAYER_MAX_HP[p.player_class],
                max_hp=PLAYER_MAX_HP[p.player_class],
                alive=True,
                # Starting souls so a few upgrades can be bought during the
                # first preparation phase.
                souls=50,
            )
            for i, p in enumerate(self.lobby.players)
        ]
        # NOTE(review): aoe_zones is not passed here but is used below;
        # presumably GameState defaults it to an empty list — confirm.
        return GameState(
            tick=0,
            players=players,
            enemies=[],
            projectiles=[],
            soulgate=SoulgateState(hp=SOULGATE_MAX_HP, max_hp=SOULGATE_MAX_HP),
            wave=WaveState(number=1, phase=0, enemies_remaining=0, state="preparation"),
            effects=[],
        )

    def _get_player(self, conn_id):
        """Return the PlayerState whose id is ``conn_id``, or None."""
        return next((p for p in self.state.players if p.id == conn_id), None)

    # --- shared combat helpers ---

    def _scaled_damage(self, player, base_damage):
        """Return ``base_damage`` scaled by damage_up stacks, then by buffs.

        The value is truncated to int after each of the two multiplications.
        """
        # FIXME: not sure the damage_up bonus is really meant to be 25% per stack.
        stacked = int(base_damage * (1 + 0.25 * player.upgrades.get("damage_up", 0)))
        return int(stacked * player_damage_mult(player))

    def _damage_enemy(self, enemy, amount):
        """Apply ``amount`` damage to ``enemy``; return True if it is now dead.

        With the one-shot debug toggle on, any hit sets hp to 0.
        """
        enemy.hp = 0 if self._debug_one_shot else max(0, enemy.hp - amount)
        return enemy.hp <= 0

    def _credit_kill(self, owner, enemy, fallback_reward):
        """Give ``owner`` the soul reward and a kill for ``enemy``.

        Does nothing when ``owner`` is falsy (e.g. the player is gone).
        NOTE(review): callers pass different fallbacks for enemy types missing
        from SOUL_REWARDS (10 for melee/AoE, 0 for projectiles); kept as found.
        """
        if not owner:
            return
        owner.souls += SOUL_REWARDS.get(enemy.type, fallback_reward)
        owner.enemies_killed += 1

    def _spawn_aoe(self, player, zone_type, radius, base_dmg_per_tick, ticks):
        """Append an AoeZone centered on ``player`` to the state."""
        self._aoe_counter += 1
        self.state.aoe_zones.append(AoeZone(
            id=f"aoe_{self._aoe_counter}",
            owner_id=player.id,
            zone_type=zone_type,
            x=player.x,
            y=player.y,
            radius=radius,
            damage_per_tick=self._scaled_damage(player, base_dmg_per_tick),
            ticks_remaining=ticks,
            max_ticks=ticks,
        ))

    # --- inputs (called from main.py when a websocket message arrives) ---

    def handle_input(self, conn_id, dx, dy):
        """Store the latest movement input for ``conn_id``.

        The values come from the client, so each axis is coerced to float and
        clamped to [-1, 1]: ``_apply_movement`` multiplies them directly by the
        player's speed. Malformed input is ignored (previous input is kept).
        """
        try:
            dx, dy = float(dx), float(dy)
        except (TypeError, ValueError):
            return
        if not (math.isfinite(dx) and math.isfinite(dy)):
            return
        self._inputs[conn_id] = (
            max(-1.0, min(1.0, dx)),
            max(-1.0, min(1.0, dy)),
        )

    def handle_attack(self, conn_id, tx, ty):
        """Basic attack toward (tx, ty): melee for Kael, projectile otherwise."""
        player = self._get_player(conn_id)
        if not player or not player.alive:
            return
        if player_has_buff(player, "casting") or player.cooldowns.get("attack", 0) > 0:
            return
        dx, dy = normalize(tx - player.x, ty - player.y)
        if dx == 0.0 and dy == 0.0:
            return  # target is on the player: no direction to attack in

        cls = player.class_name
        cd = ATTACK_COOLDOWN * (1 - 0.20 * player.upgrades.get("cooldown_down", 0))
        if cls == "kael":
            self._kael_melee(player, dx, dy)
        else:
            self._proj_counter += 1
            self.state.projectiles.append(ProjectileState(
                id=f"pr_{self._proj_counter}",
                owner_id=conn_id,
                x=player.x,
                y=player.y,
                vx=dx * PROJECTILE_SPEED[cls],
                vy=dy * PROJECTILE_SPEED[cls],
                damage=self._scaled_damage(player, PROJECTILE_DAMAGE[cls]),
                radius=PROJECTILE_RADIUS,
                ttl=PROJECTILE_TTL[cls],
            ))
        player.cooldowns["attack"] = cd

    def _kael_melee(self, player, dx, dy):
        """Kael's melee strike: hits enemies in the half-disc facing (dx, dy)."""
        damage = self._scaled_damage(player, KAEL_MELEE_DAMAGE)
        dead = set()
        for e in self.state.enemies:
            ex = e.x - player.x
            ey = e.y - player.y
            d = math.hypot(ex, ey)
            if d > KAEL_MELEE_RADIUS:
                continue
            # Dot product with the facing direction: negative means the enemy
            # is behind the player. An enemy at distance 0 is always hit.
            if d > 0 and dx * (ex / d) + dy * (ey / d) < 0:
                continue
            if self._damage_enemy(e, damage):
                dead.add(e.id)
                self._credit_kill(player, e, 10)
        if dead:
            self.state.enemies = [x for x in self.state.enemies if x.id not in dead]

    def handle_ability(self, conn_id, ability_id, tx, ty):
        """Cast class ability 1 or 2 (keys 1 and 2) and start its cooldown.

        The cooldown only starts when an ability was actually cast; an unknown
        class/ability pair or a fan with no direction leaves it untouched.
        """
        player = self._get_player(conn_id)
        if not player or not player.alive:
            return
        key = f"ability_{ability_id}"
        if player.cooldowns.get(key, 0) > 0:
            return
        cooldown_list = ABILITY_COOLDOWNS.get(player.class_name, [8.0, 12.0])
        idx = ability_id - 1
        if not (0 <= idx < len(cooldown_list)):
            return

        cls = player.class_name
        fired = True
        if cls == "kael" and ability_id == 1:
            self._kael_slam(player)
        elif cls == "kael" and ability_id == 2:
            self._kael_storm(player)
        elif cls == "seris" and ability_id == 1:
            fired = self._seris_fan(player, tx, ty)
        elif cls == "seris" and ability_id == 2:
            self._seris_void(player)
        elif cls == "aldric" and ability_id == 1:
            self._aldric_heal(player)
        elif cls == "aldric" and ability_id == 2:
            self._aldric_pulse(player)
        else:
            fired = False

        if fired:
            player.cooldowns[key] = cooldown_list[idx]

    def _kael_slam(self, player):
        """Kael ability 1: "slam" damage zone centered on the player."""
        self._spawn_aoe(player, "slam", KAEL_SLAM_RADIUS,
                        KAEL_SLAM_DMG_PER_TICK, KAEL_SLAM_TICKS)

    def _kael_storm(self, player):
        """Kael ability 2: "storm" damage zone lasting several ticks."""
        self._spawn_aoe(player, "storm", KAEL_STORM_RADIUS,
                        KAEL_STORM_DMG_PER_TICK, KAEL_STORM_TICKS)

    def _seris_fan(self, player, tx, ty):
        """Seris ability 1: SERIS_FAN_COUNT daggers fanned toward (tx, ty).

        Returns:
            True if the daggers were fired, False when (tx, ty) gives no
            direction (nothing is spawned).
        """
        dx, dy = normalize(tx - player.x, ty - player.y)
        if dx == 0.0 and dy == 0.0:
            return False
        base_angle = math.atan2(dy, dx)
        dmg = self._scaled_damage(player, SERIS_FAN_DAMAGE)
        speed = PROJECTILE_SPEED["seris"]
        for i in range(SERIS_FAN_COUNT):
            # Symmetric spread around the aim direction, e.g. for 3 daggers:
            # -spread, 0, +spread.
            angle = base_angle + SERIS_FAN_SPREAD * (i - (SERIS_FAN_COUNT - 1) / 2)
            self._proj_counter += 1
            self.state.projectiles.append(ProjectileState(
                id=f"pr_{self._proj_counter}",
                owner_id=player.id,
                x=player.x,
                y=player.y,
                vx=math.cos(angle) * speed,
                vy=math.sin(angle) * speed,
                damage=dmg,
                radius=PROJECTILE_RADIUS,
                ttl=PROJECTILE_TTL["seris"],
            ))
        return True

    def _seris_void(self, player):
        """Seris ability 2: "void" damage zone centered on the player."""
        self._spawn_aoe(player, "void", SERIS_VOID_RADIUS,
                        SERIS_VOID_DMG_PER_TICK, SERIS_VOID_TICKS)

    def _aldric_heal(self, player):
        """Aldric ability 1: heal every living player within the heal radius."""
        for t in self.state.players:
            if not t.alive:
                continue
            if circle_overlap(player.x, player.y, ALDRIC_HEAL_RADIUS,
                              t.x, t.y, PLAYER_HITBOX_RADIUS):
                t.hp = min(t.hp + ALDRIC_HEAL_AMOUNT, t.max_hp)

    def _aldric_pulse(self, player):
        """Aldric ability 2: "pulse" damage zone centered on the player."""
        self._spawn_aoe(player, "pulse", ALDRIC_PULSE_RADIUS,
                        ALDRIC_PULSE_DMG_PER_TICK, ALDRIC_PULSE_TICKS)

    def _process_aoe_zones(self):
        """Apply one tick of every AoE zone, then drop expired zones."""
        if not self.state.aoe_zones:
            return
        dead = set()
        owner_map = {p.id: p for p in self.state.players}
        for zone in self.state.aoe_zones:
            owner = owner_map.get(zone.owner_id)
            for e in self.state.enemies:
                if e.id in dead:
                    continue  # already killed by another zone this tick
                if math.hypot(e.x - zone.x, e.y - zone.y) > zone.radius:
                    continue
                if self._damage_enemy(e, zone.damage_per_tick):
                    dead.add(e.id)
                    self._credit_kill(owner, e, 10)
            zone.ticks_remaining -= 1
        if dead:
            self.state.enemies = [e for e in self.state.enemies if e.id not in dead]
        self.state.aoe_zones = [z for z in self.state.aoe_zones if z.ticks_remaining > 0]

    def handle_displacement(self, conn_id, tx, ty):
        """E key: delegate to ``displacement.handle_displacement``.

        The call below resolves to the imported module-level function, not to
        this method (method names are not in the function's scope).
        """
        p = self._get_player(conn_id)
        if p:
            handle_displacement(p, tx, ty, self.state)

    def spawn_enemy(self, enemy_type):
        """Spawn one enemy: bosses at (0, 12), others at the main spawn point."""
        self._enemy_counter += 1
        eid = f"en_{self._enemy_counter}"
        if enemy_type in _BOSS_HP:
            hp = _BOSS_HP[enemy_type]
            self.state.enemies.append(EnemyState(
                id=eid, type=enemy_type,
                x=0.0, y=12.0,
                hp=hp, max_hp=hp,
                is_boss=True,
            ))
        else:
            stats = ENEMY_STATS[enemy_type]
            self.state.enemies.append(EnemyState(
                id=eid, type=enemy_type,
                x=float(ENEMY_MAIN_SPAWN_X), y=float(ENEMY_MAIN_SPAWN_Y),
                hp=stats["hp"], max_hp=stats["hp"],
            ))

    def _spawn_enemy_at(self, enemy_type, x, y):
        """Spawn a regular enemy at an exact position (e.g. Morveth's generals)."""
        stats = ENEMY_STATS[enemy_type]
        self._enemy_counter += 1
        self.state.enemies.append(EnemyState(
            id=f"en_{self._enemy_counter}",
            type=enemy_type,
            x=x, y=y,
            hp=stats["hp"], max_hp=stats["hp"],
        ))

    def _update_enemies(self):
        """Advance every enemy by one tick, then apply queued spawns."""
        sg_x, sg_y = float(SOULGATE_X), float(SOULGATE_Y)
        alive_players = [p for p in self.state.players if p.alive]
        surviving = []
        for enemy in self.state.enemies:
            if enemy.frozen_timer > 0:
                # Frozen enemy: it does not act this tick.
                # NOTE(review): frozen_timer is not decremented in this file;
                # presumably handled elsewhere (update_buffs?) — confirm.
                surviving.append(enemy)
                continue
            if enemy.is_boss:
                # Bosses use the dedicated AI in boss_ai.py and are never
                # removed through the keep=False path.
                update_boss_ai(enemy, self.state.players, self.state.soulgate,
                               self._pending_spawns)
                surviving.append(enemy)
                continue
            if self._update_single_enemy(enemy, sg_x, sg_y, alive_players):
                surviving.append(enemy)
        self.state.enemies = surviving

        # Enemies queued during the update (e.g. Morveth's generals).
        for etype, ex, ey in self._pending_spawns:
            self._spawn_enemy_at(etype, ex, ey)
        self._pending_spawns.clear()

    def _update_single_enemy(self, enemy, sg_x, sg_y, alive_players):
        """Update one regular enemy; return False if it must be removed."""
        stats = ENEMY_STATS[enemy.type]
        speed, e_radius = stats["speed"], stats["radius"]
        if enemy.attack_cooldown > 0:
            enemy.attack_cooldown = max(0.0, enemy.attack_cooldown - TICK_DURATION)
        if stats["sg_damage"] > 0:
            # Enemy types with Soulgate damage go for the Soulgate.
            return self._enemy_target_soulgate(enemy, sg_x, sg_y, e_radius, stats)
        # All other types chase the nearest living player.
        return self._enemy_target_player(enemy, alive_players, e_radius, speed, stats)

    def _enemy_target_soulgate(self, enemy, sg_x, sg_y, e_radius, stats):
        """Move toward the Soulgate, or hit it on contact.

        Returns:
            False when the enemy damaged the Soulgate (it is consumed by the
            hit), True otherwise.
        """
        t_radius = float(SOULGATE_HITBOX_RADIUS)
        if circle_overlap(enemy.x, enemy.y, e_radius, sg_x, sg_y, t_radius):
            if enemy.attack_cooldown <= 0:
                self.state.soulgate.hp = max(0, self.state.soulgate.hp - stats["sg_damage"])
                return False
        else:
            dx, dy = normalize(sg_x - enemy.x, sg_y - enemy.y)
            enemy.x += dx * stats["speed"] * TICK_DURATION
            enemy.y += dy * stats["speed"] * TICK_DURATION
        return True

    def _enemy_target_player(self, enemy, alive_players, e_radius, speed, stats):
        """Chase the nearest living player and attack on contact.

        Always returns True: these enemies are not consumed by attacking.
        """
        target = nearest_player(enemy.x, enemy.y, alive_players)
        if target is None:
            return True
        t_radius = float(PLAYER_HITBOX_RADIUS)
        in_contact = circle_overlap(enemy.x, enemy.y, e_radius,
                                    target.x, target.y, t_radius)
        if in_contact and enemy.attack_cooldown <= 0:
            if not player_is_immune(target):
                target.hp = max(0, target.hp - stats["damage"])
                if target.hp <= 0:
                    target.alive = False
            enemy.attack_cooldown = ENEMY_ATTACK_COOLDOWN
        if not in_contact:
            dx, dy = normalize(target.x - enemy.x, target.y - enemy.y)
            enemy.x += dx * speed * TICK_DURATION
            enemy.y += dy * speed * TICK_DURATION
        return True

    # --- debug helpers ---

    def debug_kill_all(self):
        """Remove every enemy and every projectile."""
        self.state.enemies.clear()
        self.state.projectiles.clear()

    def debug_toggle_invincible(self):
        """Toggle the flag that refills and revives all players every tick."""
        self._debug_invincible = not self._debug_invincible

    def debug_toggle_one_shot(self):
        """Toggle the flag that makes any player hit kill its target."""
        self._debug_one_shot = not self._debug_one_shot

    def debug_revive_all(self):
        """Restore every player to full hp and mark them alive."""
        for p in self.state.players:
            p.hp = p.max_hp
            p.alive = True

    def handle_upgrade(self, conn_id, upgrade_id):
        """Buy one stack of ``upgrade_id``; only allowed during preparation."""
        if self._wave_manager.state != "preparation":
            return
        p = self._get_player(conn_id)
        if not p or upgrade_id not in UPGRADE_CATALOG:
            return
        spec = UPGRADE_CATALOG[upgrade_id]
        stacks = p.upgrades.get(upgrade_id, 0)
        if stacks >= spec["max_stacks"] or p.souls < spec["cost"]:
            return
        p.souls -= spec["cost"]
        p.upgrades[upgrade_id] = stacks + 1
        if upgrade_id == "hp_up":
            # Special case: raises max hp and heals by the same amount.
            p.max_hp += 30
            p.hp = min(p.hp + 30, p.max_hp)

    # --- per-tick updates ---

    def _apply_movement(self):
        """Move each living, non-casting player by its latest input."""
        half_w, half_h = ARENA_WIDTH / 2, ARENA_HEIGHT / 2
        for p in self.state.players:
            if not p.alive or player_has_buff(p, "casting"):
                continue
            dx, dy = self._inputs.get(p.id, (0.0, 0.0))
            if dx == 0.0 and dy == 0.0:
                continue
            speed = PLAYER_SPEED * (1 + 0.15 * p.upgrades.get("speed_up", 0))
            speed *= player_speed_mult(p)
            # Clamp the new position inside the arena.
            p.x = max(-half_w, min(half_w, p.x + dx * speed * TICK_DURATION))
            p.y = max(-half_h, min(half_h, p.y + dy * speed * TICK_DURATION))

    def _update_projectiles(self):
        """Advance projectiles, resolve hits, and drop expired ones."""
        half_w, half_h = ARENA_WIDTH / 2, ARENA_HEIGHT / 2
        dead_enemy_ids = set()
        surviving = []
        for proj in self.state.projectiles:
            proj.x += proj.vx * TICK_DURATION
            proj.y += proj.vy * TICK_DURATION
            proj.ttl -= TICK_DURATION
            # Lifetime elapsed or outside the arena: the projectile disappears.
            if proj.ttl <= 0 or abs(proj.x) > half_w or abs(proj.y) > half_h:
                continue
            if not self._check_proj_hit(proj, dead_enemy_ids):
                surviving.append(proj)  # no collision: keeps flying
        self.state.projectiles = surviving

        # Remove the enemies killed by projectiles during this tick.
        if dead_enemy_ids:
            self.state.enemies = [e for e in self.state.enemies
                                  if e.id not in dead_enemy_ids]

    def _check_proj_hit(self, proj, dead_ids):
        """Resolve ``proj`` against the first overlapping enemy.

        Killed enemy ids are added to ``dead_ids``.

        Returns:
            True if the projectile hit an enemy (and is consumed), else False.
        """
        for e in self.state.enemies:
            if e.id in dead_ids:
                continue
            r = ENEMY_STATS[e.type]["radius"]
            if not circle_overlap(proj.x, proj.y, proj.radius, e.x, e.y, r):
                continue
            if self._damage_enemy(e, proj.damage):
                dead_ids.add(e.id)
                self._credit_kill(self._get_player(proj.owner_id), e, 0)
            return True
        return False

    def _update_cooldowns(self):
        """Decrease every positive player cooldown by one tick, floored at 0."""
        for p in self.state.players:
            # Only values of existing keys are rewritten, so iterating the
            # dict while assigning is safe.
            for k, v in p.cooldowns.items():
                if v > 0:
                    p.cooldowns[k] = max(0.0, v - TICK_DURATION)

    def _update_wave(self):
        """Tick the wave manager and mirror its status into ``state.wave``."""
        wm = self._wave_manager
        wm.tick(self.spawn_enemy, len(self.state.enemies))
        wave = self.state.wave
        wave.number = wm.wave_number
        wave.phase = wm.phase_number
        wave.state = wm.state
        wave.enemies_remaining = len(self.state.enemies) + wm.enemies_remaining()
        wave.prep_timer = wm.prep_timer_remaining
        boss = next((e for e in self.state.enemies if e.is_boss), None)
        wave.boss_hp = boss.hp if boss else 0
        wave.boss_max_hp = boss.max_hp if boss else 0
        wave.boss_name = boss.type if boss else ""

    def _tick(self):
        """Run one simulation tick (one game frame at 20 Hz)."""
        self.state.tick += 1
        self._apply_movement()
        self._update_projectiles()
        self._process_aoe_zones()
        self._update_enemies()
        self._update_cooldowns()
        update_buffs(self.state)
        self._update_wave()
        if self._debug_invincible:
            for p in self.state.players:
                p.hp = p.max_hp
                p.alive = True

    async def run(self):
        """Tick and broadcast until ``stop`` is called.

        Any exception from a tick or a broadcast is logged and re-raised;
        ``_running`` is reset to False whenever the loop exits.
        """
        self._running = True
        logger.info("Game loop demarree — lobby %s", self.lobby.code)
        try:
            while self._running:
                start = time.monotonic()
                self._tick()
                await self._broadcast(self.state.to_dict())
                elapsed = time.monotonic() - start
                # If the tick overran TICK_DURATION we do not try to catch up:
                # the next tick simply starts right away.
                await asyncio.sleep(max(0.0, TICK_DURATION - elapsed))
        except Exception:
            logger.exception("Game loop crashed — lobby %s", self.lobby.code)
            raise
        finally:
            self._running = False

    def stop(self):
        """Ask ``run`` to exit after the current iteration."""
        self._running = False