From 0c27846c724835ec277c335bace333ec6556958d Mon Sep 17 00:00:00 2001 From: Felix Draxler Date: Fri, 12 Feb 2021 22:56:23 +0100 Subject: [PATCH] Update to single-process code, and different timing and callbacks --- agent_code/peaceful_agent/callbacks.py | 10 + agent_code/random_agent/callbacks.py | 12 +- .../callbacks.py | 119 +-- agent_code/tpl_agent/callbacks.py | 79 ++ agent_code/tpl_agent/train.py | 98 ++ agent_code/user_agent/callbacks.py | 18 +- agents.py | 463 +++++---- environment.py | 887 ++++++++---------- events.py | 20 + fallbacks.py | 21 + items.py | 90 +- main.py | 188 ++-- replay.py | 96 ++ settings.py | 121 +-- 14 files changed, 1266 insertions(+), 956 deletions(-) create mode 100644 agent_code/peaceful_agent/callbacks.py rename agent_code/{simple_agent => rule_based_agent}/callbacks.py (65%) create mode 100644 agent_code/tpl_agent/callbacks.py create mode 100644 agent_code/tpl_agent/train.py create mode 100644 events.py create mode 100644 fallbacks.py create mode 100644 replay.py diff --git a/agent_code/peaceful_agent/callbacks.py b/agent_code/peaceful_agent/callbacks.py new file mode 100644 index 000000000..02147f0d7 --- /dev/null +++ b/agent_code/peaceful_agent/callbacks.py @@ -0,0 +1,10 @@ +import numpy as np + + +def setup(self): + np.random.seed() + + +def act(agent, game_state: dict): + agent.logger.info('Pick action at random, but no bombs.') + agent.next_action = np.random.choice(['RIGHT', 'LEFT', 'UP', 'DOWN']) diff --git a/agent_code/random_agent/callbacks.py b/agent_code/random_agent/callbacks.py index c5d74083e..af555d044 100644 --- a/agent_code/random_agent/callbacks.py +++ b/agent_code/random_agent/callbacks.py @@ -1,16 +1,10 @@ - import numpy as np -def setup(agent): +def setup(self): np.random.seed() -def act(agent): + +def act(agent, game_state: dict): agent.logger.info('Pick action at random') agent.next_action = np.random.choice(['RIGHT', 'LEFT', 'UP', 'DOWN', 'BOMB'], p=[.23, .23, .23, .23, .08]) - -def reward_update(agent): - pass - -def end_of_episode(agent): - pass diff --git a/agent_code/simple_agent/callbacks.py b/agent_code/rule_based_agent/callbacks.py similarity index 65% rename from agent_code/simple_agent/callbacks.py rename to agent_code/rule_based_agent/callbacks.py index 3aee47847..5b507e203 100644 --- a/agent_code/simple_agent/callbacks.py +++ b/agent_code/rule_based_agent/callbacks.py @@ -1,10 +1,7 @@ - -import numpy as np -from random import shuffle -from time import time, sleep from collections import deque +from random import shuffle -from settings import s +import numpy as np def look_for_targets(free_space, start, targets, logger=None): @@ -42,7 +39,7 @@ def look_for_targets(free_space, start, targets, logger=None): break # Add unexplored free neighboring tiles to the queue in a random order x, y = current - neighbors = [(x,y) for (x,y) in [(x+1,y), (x-1,y), (x,y+1), (x,y-1)] if free_space[x,y]] + neighbors = [(x, y) for (x, y) in [(x + 1, y), (x - 1, y), (x, y + 1), (x, y - 1)] if free_space[x, y]] shuffle(neighbors) for neighbor in neighbors: if neighbor not in parent_dict: @@ -75,10 +72,11 @@ def setup(self): self.ignore_others_timer = 0 -def act(self): - """Called each game step to determine the agent's next action. +def act(self, game_state): + """ + Called each game step to determine the agent's next action. - You can find out about the state of the game environment via self.game_state, + You can find out about the state of the game environment via game_state, which is a dictionary. Consult 'get_state_for_agent' in environment.py to see what it contains. @@ -91,42 +89,42 @@ def act(self): self.logger.info('Picking action according to rule set') # Gather information about the game state - arena = self.game_state['arena'] - x, y, _, bombs_left, score = self.game_state['self'] - bombs = self.game_state['bombs'] - bomb_xys = [(x,y) for (x,y,t) in bombs] - others = [(x,y) for (x,y,n,b,s) in self.game_state['others']] - coins = self.game_state['coins'] + arena = game_state['field'] + _, score, bombs_left, (x, y) = game_state['self'] + bombs = game_state['bombs'] + bomb_xys = [xy for (xy, t) in bombs] + others = [xy for (n, s, b, xy) in game_state['others']] + coins = game_state['coins'] bomb_map = np.ones(arena.shape) * 5 - for xb,yb,t in bombs: - for (i,j) in [(xb+h, yb) for h in range(-3,4)] + [(xb, yb+h) for h in range(-3,4)]: + for (xb, yb), t in bombs: + for (i, j) in [(xb + h, yb) for h in range(-3, 4)] + [(xb, yb + h) for h in range(-3, 4)]: if (0 < i < bomb_map.shape[0]) and (0 < j < bomb_map.shape[1]): - bomb_map[i,j] = min(bomb_map[i,j], t) + bomb_map[i, j] = min(bomb_map[i, j], t) # If agent has been in the same location three times recently, it's a loop - if self.coordinate_history.count((x,y)) > 2: + if self.coordinate_history.count((x, y)) > 2: self.ignore_others_timer = 5 else: self.ignore_others_timer -= 1 - self.coordinate_history.append((x,y)) + self.coordinate_history.append((x, y)) # Check which moves make sense at all - directions = [(x,y), (x+1,y), (x-1,y), (x,y+1), (x,y-1)] + directions = [(x, y), (x + 1, y), (x - 1, y), (x, y + 1), (x, y - 1)] valid_tiles, valid_actions = [], [] for d in directions: if ((arena[d] == 0) and - (self.game_state['explosions'][d] <= 1) and - (bomb_map[d] > 0) and - (not d in others) and - (not d in bomb_xys)): + (game_state['explosion_map'][d] <= 1) and + (bomb_map[d] > 0) and + (not d in others) and + (not d in bomb_xys)): valid_tiles.append(d) - if (x-1,y) in valid_tiles: valid_actions.append('LEFT') - if (x+1,y) in valid_tiles: valid_actions.append('RIGHT') - if (x,y-1) in valid_tiles: valid_actions.append('UP') - if (x,y+1) in valid_tiles: valid_actions.append('DOWN') - if (x,y) in valid_tiles: valid_actions.append('WAIT') + if (x - 1, y) in valid_tiles: valid_actions.append('LEFT') + if (x + 1, y) in valid_tiles: valid_actions.append('RIGHT') + if (x, y - 1) in valid_tiles: valid_actions.append('UP') + if (x, y + 1) in valid_tiles: valid_actions.append('DOWN') + if (x, y) in valid_tiles: valid_actions.append('WAIT') # Disallow the BOMB action if agent dropped a bomb in the same spot recently - if (bombs_left > 0) and (x,y) not in self.bomb_history: valid_actions.append('BOMB') + if (bombs_left > 0) and (x, y) not in self.bomb_history: valid_actions.append('BOMB') self.logger.debug(f'Valid actions: {valid_actions}') # Collect basic action proposals in a queue @@ -135,9 +133,9 @@ def act(self): shuffle(action_ideas) # Compile a list of 'targets' the agent should head towards - dead_ends = [(x,y) for x in range(1,16) for y in range(1,16) if (arena[x,y] == 0) - and ([arena[x+1,y], arena[x-1,y], arena[x,y+1], arena[x,y-1]].count(0) == 1)] - crates = [(x,y) for x in range(1,16) for y in range(1,16) if (arena[x,y] == 1)] + dead_ends = [(x, y) for x in range(1, 16) for y in range(1, 16) if (arena[x, y] == 0) + and ([arena[x + 1, y], arena[x - 1, y], arena[x, y + 1], arena[x, y - 1]].count(0) == 1)] + crates = [(x, y) for x in range(1, 16) for y in range(1, 16) if (arena[x, y] == 1)] targets = coins + dead_ends + crates # Add other agents as targets if in hunting mode or no crates/coins left if self.ignore_others_timer <= 0 or (len(crates) + len(coins) == 0): @@ -151,36 +149,36 @@ def act(self): if self.ignore_others_timer > 0: for o in others: free_space[o] = False - d = look_for_targets(free_space, (x,y), targets, self.logger) - if d == (x,y-1): action_ideas.append('UP') - if d == (x,y+1): action_ideas.append('DOWN') - if d == (x-1,y): action_ideas.append('LEFT') - if d == (x+1,y): action_ideas.append('RIGHT') + d = look_for_targets(free_space, (x, y), targets, self.logger) + if d == (x, y - 1): action_ideas.append('UP') + if d == (x, y + 1): action_ideas.append('DOWN') + if d == (x - 1, y): action_ideas.append('LEFT') + if d == (x + 1, y): action_ideas.append('RIGHT') if d is None: self.logger.debug('All targets gone, nothing to do anymore') action_ideas.append('WAIT') # Add proposal to drop a bomb if at dead end - if (x,y) in dead_ends: + if (x, y) in dead_ends: action_ideas.append('BOMB') # Add proposal to drop a bomb if touching an opponent if len(others) > 0: if (min(abs(xy[0] - x) + abs(xy[1] - y) for xy in others)) <= 1: action_ideas.append('BOMB') # Add proposal to drop a bomb if arrived at target and touching crate - if d == (x,y) and ([arena[x+1,y], arena[x-1,y], arena[x,y+1], arena[x,y-1]].count(1) > 0): + if d == (x, y) and ([arena[x + 1, y], arena[x - 1, y], arena[x, y + 1], arena[x, y - 1]].count(1) > 0): action_ideas.append('BOMB') # Add proposal to run away from any nearby bomb about to blow - for xb,yb,t in bombs: - if (xb == x) and (abs(yb-y) < 4): + for (xb, yb), t in bombs: + if (xb == x) and (abs(yb - y) < 4): # Run away if (yb > y): action_ideas.append('UP') if (yb < y): action_ideas.append('DOWN') # If possible, turn a corner action_ideas.append('LEFT') action_ideas.append('RIGHT') - if (yb == y) and (abs(xb-x) < 4): + if (yb == y) and (abs(xb - x) < 4): # Run away if (xb > x): action_ideas.append('LEFT') if (xb < x): action_ideas.append('RIGHT') @@ -188,7 +186,7 @@ def act(self): action_ideas.append('UP') action_ideas.append('DOWN') # Try random direction if directly on top of a bomb - for xb,yb,t in bombs: + for (xb, yb), t in bombs: if xb == x and yb == y: action_ideas.extend(action_ideas[:4]) @@ -196,31 +194,8 @@ def act(self): while len(action_ideas) > 0: a = action_ideas.pop() if a in valid_actions: - self.next_action = a - break - - # Keep track of chosen action for cycle detection - if self.next_action == 'BOMB': - self.bomb_history.append((x,y)) + # Keep track of chosen action for cycle detection + if a == 'BOMB': + self.bomb_history.append((x, y)) - -def reward_update(self): - """Called once per step to allow intermediate rewards based on game events. - - When this method is called, self.events will contain a list of all game - events relevant to your agent that occured during the previous step. Consult - settings.py to see what events are tracked. You can hand out rewards to your - agent based on these events and your knowledge of the (new) game state. In - contrast to act, this method has no time limit. - """ - self.logger.debug(f'Encountered {len(self.events)} game event(s)') - - -def end_of_episode(self): - """Called at the end of each game to hand out final rewards and do training. - - This is similar to reward_update, except it is only called at the end of a - game. self.events will contain all events that occured during your agent's - final step. You should place your actual learning code in this method. - """ - self.logger.debug(f'Encountered {len(self.events)} game event(s) in final step') + return a diff --git a/agent_code/tpl_agent/callbacks.py b/agent_code/tpl_agent/callbacks.py new file mode 100644 index 000000000..6cd687728 --- /dev/null +++ b/agent_code/tpl_agent/callbacks.py @@ -0,0 +1,79 @@ +import os +import pickle +import random + +import numpy as np + + +ACTIONS = ['UP', 'RIGHT', 'DOWN', 'LEFT', 'WAIT', 'BOMB'] + + +def setup(self): + """ + Setup your code. This is called once when loading each agent. + Make sure that you prepare everything such that act(...) can be called. + + When in training mode, the separate `setup_training` in train.py is called + after this method. This separation allows you to share your trained agent + with other students, without revealing your training code. + + In this example, our model is a set of probabilities over actions + that are is independent of the game state. + + :param self: This object is passed to all callbacks and you can set arbitrary values. + """ + if self.train or not os.path.isfile("my-saved-model.pt"): + self.logger.info("Setting up model from scratch.") + weights = np.random.rand(len(ACTIONS)) + self.model = weights / weights.sum() + else: + self.logger.info("Loading model from saved state.") + with open("my-saved-model.pt", "rb") as file: + self.model = pickle.load(file) + + +def act(self, game_state: dict) -> str: + """ + Your agent should parse the input, think, and take a decision. + When not in training mode, the maximum execution time for this method is 0.5s. + + :param self: The same object that is passed to all of your callbacks. + :param game_state: The dictionary that describes everything on the board. + :return: The action to take as a string. + """ + # todo Exploration vs exploitation + random_prob = .1 + if self.train and random.random() < random_prob: + self.logger.debug("Choosing action purely at random.") + # 80%: walk in any direction. 10% wait. 10% bomb. + return np.random.choice(ACTIONS, p=[.2, .2, .2, .2, .1, .1]) + + self.logger.debug("Querying model for action.") + return np.random.choice(ACTIONS, p=self.model) + + +def state_to_features(game_state: dict) -> np.array: + """ + *This is not a required function, but an idea to structure your code.* + + Converts the game state to the input of your model, i.e. + a feature vector. + + You can find out about the state of the game environment via game_state, + which is a dictionary. Consult 'get_state_for_agent' in environment.py to see + what it contains. + + :param game_state: A dictionary describing the current game board. + :return: np.array + """ + # This is the dict before the game begins and after it ends + if game_state is None: + return None + + # For example, you could construct several channels of equal shape, ... + channels = [] + channels.append(...) + # concatenate them as a feature tensor (they must have the same shape), ... + stacked_channels = np.stack(channels) + # and return them as a vector + return stacked_channels.reshape(-1) diff --git a/agent_code/tpl_agent/train.py b/agent_code/tpl_agent/train.py new file mode 100644 index 000000000..e5e871b94 --- /dev/null +++ b/agent_code/tpl_agent/train.py @@ -0,0 +1,98 @@ +import pickle +import random +from collections import namedtuple, deque +from typing import List + +import events as e +from .callbacks import state_to_features + +# This is only an example! +Transition = namedtuple('Transition', + ('state', 'action', 'next_state', 'reward')) + +# Hyper parameters -- DO modify +TRANSITION_HISTORY_SIZE = 3 # keep only ... last transitions +RECORD_ENEMY_TRANSITIONS = 1.0 # record enemy transitions with probability ... + +# Events +PLACEHOLDER_EVENT = "PLACEHOLDER" + + +def setup_training(self): + """ + Initialise self for training purpose. + + This is called after `setup` in callbacks.py. + + :param self: This object is passed to all callbacks and you can set arbitrary values. + """ + # Example: Setup an array that will note transition tuples + # (s, a, r, s') + self.transitions = deque(maxlen=TRANSITION_HISTORY_SIZE) + + +def game_events_occurred(self, old_game_state: dict, self_action: str, new_game_state: dict, events: List[str]): + """ + Called once per step to allow intermediate rewards based on game events. + + When this method is called, self.events will contain a list of all game + events relevant to your agent that occurred during the previous step. Consult + settings.py to see what events are tracked. You can hand out rewards to your + agent based on these events and your knowledge of the (new) game state. + + This is *one* of the places where you could update your agent. + + :param self: This object is passed to all callbacks and you can set arbitrary values. + :param old_game_state: The state that was passed to the last call of `act`. + :param self_action: The action that you took. + :param new_game_state: The state the agent is in now. + :param events: The events that occurred when going from `old_game_state` to `new_game_state` + """ + self.logger.debug(f'Encountered game event(s) {", ".join(map(repr, events))} in step {new_game_state["step"]}') + + # Idea: Add your own events to hand out rewards + if ...: + events.append(PLACEHOLDER_EVENT) + + # state_to_features is defined in callbacks.py + self.transitions.append(Transition(state_to_features(old_game_state), self_action, state_to_features(new_game_state), reward_from_events(self, events))) + + +def end_of_round(self, last_game_state: dict, last_action: str, events: List[str]): + """ + Called at the end of each game or when the agent died to hand out final rewards. + + This is similar to reward_update. self.events will contain all events that + occurred during your agent's final step. + + This is *one* of the places where you could update your agent. + This is also a good place to store an agent that you updated. + + :param self: The same object that is passed to all of your callbacks. + """ + self.logger.debug(f'Encountered event(s) {", ".join(map(repr, events))} in final step') + self.transitions.append(Transition(state_to_features(last_game_state), last_action, None, reward_from_events(self, events))) + + # Store the model + with open("my-saved-model.pt", "wb") as file: + pickle.dump(self.model, file) + + +def reward_from_events(self, events: List[str]) -> int: + """ + *This is not a required function, but an idea to structure your code.* + + Here you can modify the rewards your agent get so as to en/discourage + certain behavior. + """ + game_rewards = { + e.COIN_COLLECTED: 1, + e.KILLED_OPPONENT: 5, + PLACEHOLDER_EVENT: -.1 # idea: the custom event is bad + } + reward_sum = 0 + for event in events: + if event in game_rewards: + reward_sum += game_rewards[event] + self.logger.info(f"Awarded {reward_sum} for events {', '.join(events)}") + return reward_sum diff --git a/agent_code/user_agent/callbacks.py b/agent_code/user_agent/callbacks.py index 2559851b2..dd6e8972e 100644 --- a/agent_code/user_agent/callbacks.py +++ b/agent_code/user_agent/callbacks.py @@ -1,17 +1,7 @@ - -import numpy as np -from time import sleep - - -def setup(agent): +def setup(self): pass -def act(agent): - agent.logger.info('Pick action according to pressed key') - agent.next_action = agent.game_state['user_input'] -def reward_update(agent): - pass - -def learn(agent): - pass +def act(self, game_state: dict): + self.logger.info('Pick action according to pressed key') + return game_state['user_input'] diff --git a/agents.py b/agents.py index 22440eba8..39d4ea981 100644 --- a/agents.py +++ b/agents.py @@ -1,214 +1,152 @@ - -from time import time, sleep -import os, signal -from types import SimpleNamespace -import multiprocessing as mp import importlib import logging -import pygame -from pygame.locals import * -from pygame.transform import smoothscale - -from items import * -from settings import s, e - - -class IgnoreKeyboardInterrupt(object): - """Context manager that protects enclosed code from Interrupt signals.""" - def __enter__(self): - self.old_handler = signal.signal(signal.SIGINT, self.handler) - def handler(self, sig, frame): - pass - def __exit__(self, type, value, traceback): - signal.signal(signal.SIGINT, self.old_handler) - - -class AgentProcess(mp.Process): - """Wrapper class that runs custom agent code in a separate process.""" - - def __init__(self, pipe_to_world, ready_flag, name, agent_dir, train_flag): - super(AgentProcess, self).__init__(name=name) - self.pipe_to_world = pipe_to_world - self.ready_flag = ready_flag - self.agent_dir = agent_dir - self.train_flag = train_flag - - def run(self): - # Persistent 'self' object to pass to callback methods - self.fake_self = SimpleNamespace(name=self.name) - - # Set up individual loggers for the wrapper and the custom code - self.wlogger = logging.getLogger(self.name + '_wrapper') - self.wlogger.setLevel(s.log_agent_wrapper) - self.fake_self.logger = logging.getLogger(self.name + '_code') - self.fake_self.logger.setLevel(s.log_agent_code) - log_dir = f'agent_code/{self.agent_dir}/logs/' - if not os.path.exists(log_dir): os.makedirs(log_dir) - handler = logging.FileHandler(f'{log_dir}{self.name}.log', mode='w') - handler.setLevel(logging.DEBUG) - formatter = logging.Formatter('%(asctime)s [%(name)s] %(levelname)s: %(message)s') - handler.setFormatter(formatter) - self.wlogger.addHandler(handler) - self.fake_self.logger.addHandler(handler) +import multiprocessing as mp +import os +import queue +from inspect import signature +from logging.handlers import RotatingFileHandler +from time import time +from types import SimpleNamespace +from typing import Tuple, Any - # Import custom code for the agent from provided script - self.wlogger.info(f'Import agent code from "agent_code/{self.agent_dir}/callbacks.py"') - self.code = importlib.import_module('agent_code.' + self.agent_dir + '.callbacks') +import numpy as np - # Make agent directory the working directory for this process - os.chdir(f'agent_code/{self.agent_dir}/') +import settings as s +from fallbacks import pygame - # Initialize custom code - self.wlogger.info('Initialize agent code') - try: - self.code.setup(self.fake_self) - except Exception as e: - self.wlogger.exception(f'Error in callback function: {e}') - self.wlogger.debug('Set flag to indicate readiness') - self.ready_flag.set() - - # Play one game after the other until global exit message is received - while True: - # Receive round number and check for exit message - self.wlogger.debug('Wait for new round') - self.round = self.pipe_to_world.recv() - if self.round is None: - self.wlogger.info('Received global exit message') - break - self.wlogger.info(f'STARTING ROUND #{self.round}') - - # Take steps until exit message for current round is received - while True: - # Receive new game state and check for exit message - self.wlogger.debug('Receive game state') - self.fake_self.game_state = self.pipe_to_world.recv() - if self.fake_self.game_state['exit']: - self.ready_flag.set() - self.wlogger.info('Received exit message for round') - break - self.wlogger.info(f'STARTING STEP {self.fake_self.game_state["step"]}') - - # Process game events for rewards if in training mode - if self.train_flag.is_set(): - self.wlogger.debug('Receive event queue') - self.fake_self.events = self.pipe_to_world.recv() - self.wlogger.debug(f'Received event queue {self.fake_self.events}') - try: - if self.fake_self.game_state['step'] > 1: - self.wlogger.info('Process intermediate rewards') - self.code.reward_update(self.fake_self) - except Exception as e: - self.wlogger.exception(f'Error in callback function: {e}') - self.wlogger.debug('Set flag to indicate readiness') - self.ready_flag.set() - - # Come up with an action to perform - self.wlogger.debug('Begin choosing an action') - self.fake_self.next_action = 'WAIT' - t = time() - try: - self.code.act(self.fake_self) - except KeyboardInterrupt: - self.wlogger.warn(f'Got interrupted by timeout') - except Exception as e: - self.wlogger.exception(f'Error in callback function: {e}') - - # Send action and time taken back to main process - with IgnoreKeyboardInterrupt(): - t = time() - t - self.wlogger.info(f'Chose action {self.fake_self.next_action} after {t:.3f}s of thinking') - self.wlogger.debug('Send action and time to main process') - self.pipe_to_world.send((self.fake_self.next_action, t)) - while self.ready_flag.is_set(): - sleep(0.01) - self.wlogger.debug('Set flag to indicate readiness') - self.ready_flag.set() - - # Process final events and learn from episode if in training mode - if self.train_flag.is_set(): - self.wlogger.info('Finalize agent\'s training') - self.wlogger.debug('Receive final event queue') - self.fake_self.events = self.pipe_to_world.recv() - self.wlogger.debug(f'Received final event queue {self.fake_self.events}') - try: - self.code.end_of_episode(self.fake_self) - except Exception as e: - self.wlogger.exception(f'Error in callback function: {e}') - self.ready_flag.set() - - self.wlogger.info(f'Round #{self.round} finished') - - self.wlogger.info('SHUT DOWN') - - -class Agent(object): - """Class representing agents as game objects.""" - - coin_trophy = smoothscale(pygame.image.load('assets/coin.png'), (15,15)) - suicide_trophy = smoothscale(pygame.image.load('assets/explosion_2.png'), (15,15)) - time_trophy = pygame.image.load('assets/hourglass.png') - - def __init__(self, process, pipe_to_agent, ready_flag, color, train_flag): - """Set up agent, process for custom code and inter-process communication.""" - self.name = process.name - self.process = process - self.pipe = pipe_to_agent - self.ready_flag = ready_flag - self.color = color - self.train_flag = train_flag +AGENT_API = { + "callbacks": { + "setup": ["self"], + "act": ["self", "game_state: dict"], + }, + "train": { + "setup_training": ["self"], + "game_events_occurred": ["self", "old_game_state: dict", "self_action: str", "new_game_state: dict", "events: List[str]"], + # "enemy_game_events_occurred": ["self", "enemy_name: str", "old_enemy_game_state: dict", "enemy_action: str", "enemy_game_state: dict", "enemy_events: List[str]"], + "end_of_round": ["self", "last_game_state: dict", "last_action: str", "events: List[str]"] + } +} + + +class Agent: + """ + The Agent game object. + + Architecture: + In the game process, there is an Agent object that holds the state of the player. + Via an object of subclassing AgentBackend, it is connected to an AgentRunner instance. + + The Agent calls the callbacks in callbacks.py in the specified code folder by + calling events on its AgentBackend. + """ + + def __init__(self, color, agent_name, code_name, train: bool, backend: "AgentBackend"): + self.backend = backend # Load custom avatar or standard robot avatar of assigned color + self.color = color try: - self.avatar = pygame.image.load(f'agent_code/{self.process.agent_dir}/avatar.png') - assert self.avatar.get_size() == (30,30) + self.avatar = pygame.image.load(f'agent_code/{code_name}/avatar.png') + assert self.avatar.get_size() == (30, 30) except Exception as e: self.avatar = pygame.image.load(f'assets/robot_{self.color}.png') # Load custom bomb sprite try: - self.bomb_sprite = pygame.image.load(f'agent_code/{self.process.agent_dir}/bomb.png') - assert self.bomb_sprite.get_size() == (30,30) + self.bomb_sprite = pygame.image.load(f'agent_code/{code_name}/bomb.png') + assert self.bomb_sprite.get_size() == (30, 30) except Exception as e: self.bomb_sprite = None - # Prepare overlay that will indicate dead agent on the scoreboard - self.shade = pygame.Surface((30,30), SRCALPHA) - self.shade.fill((0,0,0,208)) + self.shade = pygame.Surface((30, 30), pygame.SRCALPHA) + self.shade.fill((0, 0, 0, 208)) + + self.name = agent_name + self.code_name = code_name + self.train = train - self.x, self.y = 1, 1 self.total_score = 0 - self.bomb_timer = s.bomb_timer + 1 - self.explosion_timer = s.explosion_timer + 1 - self.bomb_power = s.bomb_power - self.bomb_type = Bomb - - self.reset() - - def reset(self, current_round=None): - """Make agent ready for a new game round.""" - if current_round: - self.pipe.send(current_round) - self.times = [] - self.mean_time = 0 + + self.dead = None + self.score = None + self.trophies = None + + self.events = None + self.available_think_time = None + + self.x = None + self.y = None + self.bombs_left = None + + self.last_game_state = None + self.last_action = None + + self.setup() + + def setup(self): + # Call setup on backend + self.backend.send_event("setup") + self.backend.get("setup") + if self.train: + self.backend.send_event("setup_training") + self.backend.get("setup_training") + + def __str__(self): + return f"Agent {self.name} under control of {self.code_name}" + + def start_round(self): self.dead = False self.score = 0 - self.events = [] - self.bombs_left = 1 self.trophies = [] + self.events = [] + self.available_think_time = s.TIMEOUT + + self.bombs_left = True + + self.last_game_state = None + self.last_action = None + + def add_event(self, event): + self.events.append(event) + def get_state(self): """Provide information about this agent for the global game state.""" - return (self.x, self.y, self.name, self.bombs_left, self.score) + return self.name, self.score, self.bombs_left, (self.x, self.y) def update_score(self, delta): """Add delta to both the current round's score and the total score.""" self.score += delta self.total_score += delta - def make_bomb(self): - """Create a new Bomb object at current agent position.""" - return self.bomb_type((self.x, self.y), self, - self.bomb_timer, self.bomb_power, self.color, - custom_sprite=self.bomb_sprite) + def process_game_events(self, game_state): + self.backend.send_event("game_events_occurred", self.last_game_state, self.last_action, game_state, self.events) + + def wait_for_game_event_processing(self): + self.backend.get("game_events_occurred") + +# def process_enemy_game_events(self, enemy_game_state, enemy: "Agent"): +# self.backend.send_event("enemy_game_events_occurred", enemy.name, enemy.last_game_state, enemy.last_action, enemy_game_state, enemy.events) +# +# def wait_for_enemy_game_event_processing(self): +# self.backend.get("enemy_game_events_occurred") + + def store_game_state(self, game_state): + self.last_game_state = game_state + + def reset_game_events(self): + self.events = [] + + def act(self, game_state): + self.backend.send_event("act", game_state) + + def wait_for_act(self): + action, think_time = self.backend.get_with_time("act") + self.last_action = action + return action, think_time + + def round_ended(self): + self.backend.send_event("end_of_round", self.last_game_state, self.last_action, self.events) + self.backend.get("end_of_round") def render(self, screen, x, y): """Draw the agent's avatar to the screen at the given coordinates.""" @@ -217,27 +155,150 @@ def render(self, screen, x, y): screen.blit(self.shade, (x, y)) +class AgentRunner: + """ + Agent callback runner (called by backend). + """ + + def __init__(self, train, agent_name, code_name, result_queue): + self.agent_name = agent_name + self.code_name = code_name + self.result_queue = result_queue + + self.callbacks = importlib.import_module('agent_code.' + self.code_name + '.callbacks') + if train: + self.train = importlib.import_module('agent_code.' + self.code_name + '.train') + for module_name in ["callbacks"] + (["train"] if train else []): + module = getattr(self, module_name) + for event_name, event_args in AGENT_API[module_name].items(): + proper_signature = f"def {event_name}({', '.join(event_args)}):\n\tpass" + + if not hasattr(module, event_name): + raise NotImplementedError(f"Agent code {self.code_name} does not provide callback for {event_name}.\nAdd this function to your code in {module_name}.py:\n\n{proper_signature}") + actual_arg_count = len(signature(getattr(module, event_name)).parameters) + event_arg_count = len(event_args) + if actual_arg_count != event_arg_count: + raise TypeError(f"Agent code {self.code_name}'s {event_name!r} has {actual_arg_count} arguments, but {event_arg_count} are required.\nChange your function's signature to the following:\n\n{proper_signature}") + + self.fake_self = SimpleNamespace() + self.fake_self.train = train + + self.wlogger = logging.getLogger(self.agent_name + '_wrapper') + self.wlogger.setLevel(s.LOG_AGENT_WRAPPER) + self.fake_self.logger = logging.getLogger(self.agent_name + '_code') + self.fake_self.logger.setLevel(s.LOG_AGENT_CODE) + log_dir = f'agent_code/{self.code_name}/logs/' + if not os.path.exists(log_dir): os.makedirs(log_dir) + handler = logging.FileHandler(f'{log_dir}{self.agent_name}.log', mode="w") + handler.setLevel(logging.DEBUG) + formatter = logging.Formatter('%(asctime)s [%(name)s] %(levelname)s: %(message)s') + handler.setFormatter(formatter) + self.wlogger.addHandler(handler) + self.fake_self.logger.addHandler(handler) -class ReplayAgent(Agent): - """Agents class specifically for playing back pre-recorded games.""" + def process_event(self, event_name, *event_args): + module_name = None + for module_candidate in AGENT_API: + if event_name in AGENT_API[module_candidate]: + module_name = module_candidate + break + if module_name is None: + raise ValueError(f"No information on event {event_name!r} is available") + module = getattr(self, module_name) - def __init__(self, name, color, x, y): - """Recreate the agent as it was at the beginning of the original game.""" - self.name = name - self.x, self.y = x, y - self.color = color + try: + self.wlogger.debug(f"Calling {event_name} on callback.") + start_time = time() + event_result = getattr(module, event_name)(self.fake_self, *event_args) + duration = time() - start_time + self.wlogger.debug(f"Got result from callback#{event_name} in {duration:.3f}s.") - # Load standard robot avatar of assigned color - self.avatar = pygame.image.load(f'assets/robot_{self.color}.png') - self.bomb_sprite = None - # Prepare overlay that will indicate dead agent on the scoreboard - self.shade = pygame.Surface((30,30), SRCALPHA) - self.shade.fill((0,0,0,208)) + self.result_queue.put((event_name, duration, event_result)) + except Exception as e: + self.wlogger.error(f"An exception occurred while calling {event_name}: {e}") + self.result_queue.put((event_name, 0, e)) - self.total_score = 0 - self.bomb_timer = s.bomb_timer + 1 - self.explosion_timer = s.explosion_timer + 1 - self.bomb_power = s.bomb_power - self.bomb_type = Bomb - self.reset() +class AgentBackend: + """ + Base class connecting the agent to a callback implementation. + """ + + def __init__(self, train, agent_name, code_name, result_queue): + self.train = train + self.code_name = code_name + self.agent_name = agent_name + + self.result_queue = result_queue + + def start(self): + raise NotImplementedError() + + def send_event(self, event_name, *event_args): + raise NotImplementedError() + + def get(self, expect_name: str, block=True, timeout=None): + return self.get_with_time(expect_name, block, timeout)[0] + + def get_with_time(self, expect_name: str, block=True, timeout=None) -> Tuple[Any, float]: + try: + event_name, compute_time, result = self.result_queue.get(block, timeout) + if event_name != expect_name: + raise ValueError(f"Logic error: Expected result from event {expect_name}, but found {event_name}") + if isinstance(result, Exception): + raise result + return result, compute_time + except queue.Empty: + raise + + +class SequentialAgentBackend(AgentBackend): + """ + AgentConnector realised by a separate thread (easy debugging). + """ + + def __init__(self, train, agent_name, code_name): + super().__init__(train, agent_name, code_name, queue.Queue()) + self.runner = None + + def start(self): + self.runner = AgentRunner(self.train, self.agent_name, self.code_name, self.result_queue) + + def send_event(self, event_name, *event_args): + prev_cwd = os.getcwd() + os.chdir(os.path.dirname(__file__) + f'/agent_code/{self.code_name}/') + try: + self.runner.process_event(event_name, *event_args) + finally: + os.chdir(prev_cwd) + + +QUIT = "quit" + + +def run_in_agent_runner(train: bool, agent_name: str, code_name: str, wta_queue: mp.Queue, atw_queue: mp.Queue): + runner = AgentRunner(train, agent_name, code_name, atw_queue) + while True: + event_name, event_args = wta_queue.get() + if event_name == QUIT: + break + runner.process_event(event_name, *event_args) + + +class ProcessAgentBackend(AgentBackend): + """ + AgentConnector realised by a separate process (fast and safe mode). + """ + + def __init__(self, train, agent_name, code_name): + super().__init__(train, agent_name, code_name, mp.Queue()) + + self.wta_queue = mp.Queue() + + self.process = mp.Process(target=run_in_agent_runner, args=(self.train, self.agent_name, self.code_name, self.wta_queue, self.result_queue)) + + def start(self): + self.process.start() + + def send_event(self, event_name, *event_args): + self.wta_queue.put((event_name, event_args)) diff --git a/environment.py b/environment.py index bd66a4ecd..916670ba9 100644 --- a/environment.py +++ b/environment.py @@ -1,287 +1,120 @@ - -from time import time +import logging +import pickle +import random +from collections import namedtuple from datetime import datetime -import multiprocessing as mp +from logging.handlers import RotatingFileHandler +from os.path import dirname +from threading import Event +from time import time +from typing import List, Union + import numpy as np -import random -import pygame -import pickle -from pygame.locals import * -from pygame.transform import smoothscale -import logging +import events as e +import settings as s +from agents import Agent, SequentialAgentBackend +from fallbacks import pygame +from items import Coin, Explosion, Bomb -from agents import * -from items import * -from settings import s, e +WorldArgs = namedtuple("WorldArgs", + ["no_gui", "fps", "turn_based", "update_interval", "save_replay", "replay", "make_video", "continue_without_training"]) -class BombeRLeWorld(object): +class Trophy: + coin_trophy = pygame.transform.smoothscale(pygame.image.load('assets/coin.png'), (15, 15)) + suicide_trophy = pygame.transform.smoothscale(pygame.image.load('assets/explosion_2.png'), (15, 15)) + time_trophy = pygame.image.load('assets/hourglass.png') - def __init__(self, agents): + +class GenericWorld: + logger: logging.Logger + + running: bool = False + step: int + + agents: List[Agent] + active_agents: List[Agent] + arena: np.ndarray + coins: List[Coin] + bombs: List[Bomb] + explosions: List[Explosion] + + gui: Union[None, "GUI"] + round_id: str + + def __init__(self, args: WorldArgs): self.setup_logging() - if s.gui: - self.setup_gui() + self.args = args + if self.args.no_gui: + self.gui = None + else: + self.gui = GUI(args, self) - # Available robot colors - self.colors = ['blue', 'green', 'yellow', 'pink'] - self.setup_agents(agents) + self.colors = s.AGENT_COLORS - # Get the game going self.round = 0 self.running = False - self.ready_for_restart_flag = mp.Event() - self.new_round() - + self.ready_for_restart_flag = Event() def setup_logging(self): self.logger = logging.getLogger('BombeRLeWorld') - self.logger.setLevel(s.log_game) - handler = logging.FileHandler('logs/game.log', mode='w') + self.logger.setLevel(s.LOG_GAME) + handler = logging.FileHandler('logs/game.log', mode="w") handler.setLevel(logging.DEBUG) formatter = logging.Formatter('%(asctime)s [%(name)s] %(levelname)s: %(message)s') handler.setFormatter(formatter) self.logger.addHandler(handler) self.logger.info('Initializing game world') - - def setup_gui(self): - # Initialize screen - self.screen = pygame.display.set_mode((s.width, s.height)) - pygame.display.set_caption('BombeRLe') - icon = pygame.image.load(f'assets/bomb_yellow.png') - pygame.display.set_icon(icon) - - # Background and tiles - self.background = pygame.Surface((s.width, s.height)) - self.background = self.background.convert() - self.background.fill((0,0,0)) - self.t_wall = pygame.image.load('assets/brick.png') - self.t_crate = pygame.image.load('assets/crate.png') - - # Font for scores and such - font_name = 'assets/emulogic.ttf' - self.fonts = { - 'huge': pygame.font.Font(font_name, 20), - 'big': pygame.font.Font(font_name, 16), - 'medium': pygame.font.Font(font_name, 10), - 'small': pygame.font.Font(font_name, 8), - } - - - def setup_agents(self, agents): - # Add specified agents and start their subprocesses - self.agents = [] - for agent_dir, train in agents: - if list([d for d,t in agents]).count(agent_dir) > 1: - name = agent_dir + '_' + str(list([a.process.agent_dir for a in self.agents]).count(agent_dir)) - else: - name = agent_dir - self.add_agent(agent_dir, name, train=train) - - def new_round(self): - if self.running: - self.logger.warn('New round requested while still running') - self.end_round() - - self.round += 1 - self.logger.info(f'STARTING ROUND #{self.round}') - pygame.display.set_caption(f'BombeRLe | Round #{self.round}') - - # Bookkeeping - self.step = 0 - self.active_agents = [] - self.bombs = [] - self.explosions = [] - self.round_id = f'Replay {datetime.now().strftime("%Y-%m-%d %H-%M-%S")}' - - # Arena with wall and crate layout - self.arena = (np.random.rand(s.cols, s.rows) < s.crate_density).astype(int) - self.arena[:1, :] = -1 - self.arena[-1:,:] = -1 - self.arena[:, :1] = -1 - self.arena[:,-1:] = -1 - for x in range(s.cols): - for y in range(s.rows): - if (x+1)*(y+1) % 2 == 1: - self.arena[x,y] = -1 - - # Starting positions - self.start_positions = [(1,1), (1,s.rows-2), (s.cols-2,1), (s.cols-2,s.rows-2)] - random.shuffle(self.start_positions) - for (x,y) in self.start_positions: - for (xx,yy) in [(x,y), (x-1,y), (x+1,y), (x,y-1), (x,y+1)]: - if self.arena[xx,yy] == 1: - self.arena[xx,yy] = 0 - - # Distribute coins evenly - self.coins = [] - for i in range(3): - for j in range(3): - n_crates = (self.arena[1+5*i:6+5*i, 1+5*j:6+5*j] == 1).sum() - while True: - x, y = np.random.randint(1+5*i,6+5*i), np.random.randint(1+5*j,6+5*j) - if n_crates == 0 and self.arena[x,y] == 0: - self.coins.append(Coin((x,y))) - self.coins[-1].collectable = True - break - elif self.arena[x,y] == 1: - self.coins.append(Coin((x,y))) - break - - # Reset agents and distribute starting positions - for agent in self.agents: - agent.reset(self.round) - self.active_agents.append(agent) - agent.x, agent.y = self.start_positions.pop() - - self.replay = { - 'arena': np.array(self.arena), - 'coins': [c.get_state() for c in self.coins], - 'agents': [a.get_state() for a in self.agents], - 'actions': dict([(a.name, []) for a in self.agents]), - 'permutations': [] - } - - self.running = True - + raise NotImplementedError() def add_agent(self, agent_dir, name, train=False): - if len(self.agents) < s.max_agents: - # Set up a new process to run the agent's code - pipe_to_world, pipe_to_agent = mp.Pipe() - ready_flag = mp.Event() - train_flag = mp.Event() - if train: - train_flag.set() - p = AgentProcess(pipe_to_world, ready_flag, name, agent_dir, train_flag) - self.logger.info(f'Starting process for agent <{name}>') - p.start() - - # Create the agent container object - agent = Agent(p, pipe_to_agent, ready_flag, self.colors.pop(), train_flag) - self.agents.append(agent) - - # Make sure process setup is finished - self.logger.debug(f'Waiting for setup of agent <{agent.name}>') - agent.ready_flag.wait() - agent.ready_flag.clear() - self.logger.debug(f'Setup finished for agent <{agent.name}>') - - - def get_state_for_agent(self, agent, exit=False): - state = {} - state['step'] = self.step - state['arena'] = np.array(self.arena) - state['self'] = agent.get_state() - state['train'] = agent.train_flag.is_set() - state['others'] = [other.get_state() for other in self.active_agents if other is not agent] - state['bombs'] = [bomb.get_state() for bomb in self.bombs] - state['coins'] = [coin.get_state() for coin in self.coins if coin.collectable] - explosion_map = np.zeros(self.arena.shape) - for e in self.explosions: - for (x,y) in e.blast_coords: - explosion_map[x,y] = max(explosion_map[x,y], e.timer) - state['explosions'] = explosion_map - state['user_input'] = self.user_input - state['exit'] = exit - return state + assert len(self.agents) < s.MAX_AGENTS + + # if self.args.single_process: + backend = SequentialAgentBackend(train, name, agent_dir) + # else: + # backend = ProcessAgentBackend(train, name, agent_dir) + backend.start() + agent = Agent(self.colors.pop(), name, agent_dir, train, backend) + self.agents.append(agent) def tile_is_free(self, x, y): - is_free = (self.arena[x,y] == 0) + is_free = (self.arena[x, y] == 0) if is_free: for obstacle in self.bombs + self.active_agents: is_free = is_free and (obstacle.x != x or obstacle.y != y) return is_free - - def perform_agent_action(self, agent, action): + def perform_agent_action(self, agent: Agent, action: str): # Perform the specified action if possible, wait otherwise if action == 'UP' and self.tile_is_free(agent.x, agent.y - 1): agent.y -= 1 - agent.events.append(e.MOVED_UP) + agent.add_event(e.MOVED_UP) elif action == 'DOWN' and self.tile_is_free(agent.x, agent.y + 1): agent.y += 1 - agent.events.append(e.MOVED_DOWN) + agent.add_event(e.MOVED_DOWN) elif action == 'LEFT' and self.tile_is_free(agent.x - 1, agent.y): agent.x -= 1 - agent.events.append(e.MOVED_LEFT) + agent.add_event(e.MOVED_LEFT) elif action == 'RIGHT' and self.tile_is_free(agent.x + 1, agent.y): agent.x += 1 - agent.events.append(e.MOVED_RIGHT) - elif action == 'BOMB' and agent.bombs_left > 0: + agent.add_event(e.MOVED_RIGHT) + elif action == 'BOMB' and agent.bombs_left: self.logger.info(f'Agent <{agent.name}> drops bomb at {(agent.x, agent.y)}') - self.bombs.append(agent.make_bomb()) - agent.bombs_left -= 1 - agent.events.append(e.BOMB_DROPPED) + self.bombs.append(Bomb((agent.x, agent.y), agent, s.BOMB_TIMER, s.BOMB_POWER, agent.color, custom_sprite=agent.bomb_sprite)) + agent.bombs_left = False + agent.add_event(e.BOMB_DROPPED) elif action == 'WAIT': - agent.events.append(e.WAITED) + agent.add_event(e.WAITED) else: - agent.events.append(e.INVALID_ACTION) - + agent.add_event(e.INVALID_ACTION) def poll_and_run_agents(self): - # Send world state to all agents - for a in self.active_agents: - self.logger.debug(f'Sending game state to agent <{a.name}>') - a.pipe.send(self.get_state_for_agent(a)) - - # Send events to all agents that expect them, then reset and wait for them - for a in self.active_agents: - if a.train_flag.is_set(): - self.logger.debug(f'Sending event queue {a.events} to agent <{a.name}>') - a.pipe.send(a.events) - a.events = [] - for a in self.active_agents: - if a.train_flag.is_set(): - self.logger.debug(f'Waiting for agent <{a.name}> to process events') - a.ready_flag.wait() - self.logger.debug(f'Clearing flag for agent <{a.name}>') - a.ready_flag.clear() - - # Give agents time to decide and set their ready flags; interrupt after time limit - deadline = time() + s.timeout - for a in self.active_agents: - if not a.ready_flag.wait(deadline - time()): - self.logger.warn(f'Interrupting agent <{a.name}>') - if os.name == 'posix': - if not a.ready_flag.is_set(): - os.kill(a.process.pid, signal.SIGINT) - else: - # Special case for Windows - if not a.ready_flag.is_set(): - os.kill(a.process.pid, signal.CTRL_C_EVENT) - a.events.append(e.INTERRUPTED) - - # Perform decided agent actions - perm = np.random.permutation(len(self.active_agents)) - self.replay['permutations'].append(perm) - for i in perm: - a = self.active_agents[i] - self.logger.debug(f'Collecting action from agent <{a.name}>') - (action, t) = a.pipe.recv() - self.logger.info(f'Agent <{a.name}> chose action {action} in {t:.2f}s.') - a.times.append(t) - a.mean_time = np.mean(a.times) - self.replay['actions'][a.name].append(action) - - self.perform_agent_action(a, action) - - # Reset agent flags - for a in self.active_agents: - self.logger.debug(f'Clearing flag for agent <{a.name}>') - a.ready_flag.clear() - - - def put_down_agent(self, agent): - # Send exit message to end round for this agent - self.logger.debug(f'Send exit message to end round for {agent.name}') - agent.pipe.send(self.get_state_for_agent(agent, exit=True)) - agent.ready_flag.wait() - agent.ready_flag.clear() - + raise NotImplementedError() def do_step(self, user_input='WAIT'): self.step += 1 @@ -292,45 +125,62 @@ def do_step(self, user_input='WAIT'): self.poll_and_run_agents() - # Coins + self.collect_coins() + self.update_bombs() + self.evaluate_explosions() + + if self.time_to_stop(): + self.end_round() + + def collect_coins(self): for coin in self.coins: if coin.collectable: for a in self.active_agents: if a.x == coin.x and a.y == coin.y: coin.collectable = False self.logger.info(f'Agent <{a.name}> picked up coin at {(a.x, a.y)} and receives 1 point') - a.update_score(s.reward_coin) - a.events.append(e.COIN_COLLECTED) - a.trophies.append(Agent.coin_trophy) + a.update_score(s.REWARD_COIN) + a.add_event(e.COIN_COLLECTED) + a.trophies.append(Trophy.coin_trophy) + + def update_bombs(self): + """ + Count down bombs placed + Explode bombs at zero timer. - # Bombs + :return: + """ for bomb in self.bombs: - # Explode when timer is finished if bomb.timer <= 0: + # Explode when timer is finished self.logger.info(f'Agent <{bomb.owner.name}>\'s bomb at {(bomb.x, bomb.y)} explodes') - bomb.owner.events.append(e.BOMB_EXPLODED) + bomb.owner.add_event(e.BOMB_EXPLODED) blast_coords = bomb.get_blast_coords(self.arena) + # Clear crates - for (x,y) in blast_coords: - if self.arena[x,y] == 1: - self.arena[x,y] = 0 - bomb.owner.events.append(e.CRATE_DESTROYED) + for (x, y) in blast_coords: + if self.arena[x, y] == 1: + self.arena[x, y] = 0 + bomb.owner.add_event(e.CRATE_DESTROYED) # Maybe reveal a coin for c in self.coins: - if (c.x,c.y) == (x,y): + if (c.x, c.y) == (x, y): c.collectable = True - self.logger.info(f'Coin found at {(x,y)}') - bomb.owner.events.append(e.COIN_FOUND) + self.logger.info(f'Coin found at {(x, y)}') + bomb.owner.add_event(e.COIN_FOUND) + # Create explosion - screen_coords = [(s.grid_offset[0] + s.grid_size*x, s.grid_offset[1] + s.grid_size*y) for (x,y) in blast_coords] - self.explosions.append(Explosion(blast_coords, screen_coords, bomb.owner)) + screen_coords = [(s.GRID_OFFSET[0] + s.GRID_SIZE * x, s.GRID_OFFSET[1] + s.GRID_SIZE * y) for (x, y) in + blast_coords] + self.explosions.append(Explosion(blast_coords, screen_coords, bomb.owner, s.EXPLOSION_TIMER)) bomb.active = False - bomb.owner.bombs_left += 1 - # Progress countdown + bomb.owner.bombs_left = True else: + # Progress countdown bomb.timer -= 1 self.bombs = [b for b in self.bombs if b.active] + def evaluate_explosions(self): # Explosions agents_hit = set() for explosion in self.explosions: @@ -342,94 +192,277 @@ def do_step(self, user_input='WAIT'): # Note who killed whom, adjust scores if a is explosion.owner: self.logger.info(f'Agent <{a.name}> blown up by own bomb') - a.events.append(e.KILLED_SELF) - explosion.owner.trophies.append(Agent.suicide_trophy) + a.add_event(e.KILLED_SELF) + explosion.owner.trophies.append(Trophy.suicide_trophy) else: self.logger.info(f'Agent <{a.name}> blown up by agent <{explosion.owner.name}>\'s bomb') self.logger.info(f'Agent <{explosion.owner.name}> receives 1 point') - explosion.owner.update_score(s.reward_kill) - explosion.owner.events.append(e.KILLED_OPPONENT) - explosion.owner.trophies.append(smoothscale(a.avatar, (15,15))) + explosion.owner.update_score(s.REWARD_KILL) + explosion.owner.add_event(e.KILLED_OPPONENT) + explosion.owner.trophies.append(pygame.transform.smoothscale(a.avatar, (15, 15))) # Show smoke for a little longer if explosion.timer <= 0: explosion.active = False + # Progress countdown explosion.timer -= 1 for a in agents_hit: a.dead = True self.active_agents.remove(a) - a.events.append(e.GOT_KILLED) + a.add_event(e.GOT_KILLED) for aa in self.active_agents: if aa is not a: - aa.events.append(e.OPPONENT_ELIMINATED) - self.put_down_agent(a) - self.explosions = [e for e in self.explosions if e.active] - - if self.time_to_stop(): - self.end_round() + aa.add_event(e.OPPONENT_ELIMINATED) + self.explosions = [exp for exp in self.explosions if exp.active] + def end_round(self): + raise NotImplementedError() def time_to_stop(self): # Check round stopping criteria if len(self.active_agents) == 0: self.logger.info(f'No agent left alive, wrap up round') return True + if (len(self.active_agents) == 1 - and (self.arena == 1).sum() == 0 - and all([not c.collectable for c in self.coins]) - and len(self.bombs) + len(self.explosions) == 0): + and (self.arena == 1).sum() == 0 + and all([not c.collectable for c in self.coins]) + and len(self.bombs) + len(self.explosions) == 0): self.logger.info(f'One agent left alive with nothing to do, wrap up round') return True - if s.stop_if_not_training: - if not any([a.train_flag.is_set() for a in self.active_agents]): + + if any(a.train for a in self.agents) and not self.args.continue_without_training: + if not any([a.train for a in self.active_agents]): self.logger.info('No training agent left alive, wrap up round') return True - if self.step >= s.max_steps: + + if self.step >= s.MAX_STEPS: self.logger.info('Maximum number of steps reached, wrap up round') return True return False + def render(self): + self.gui.render() - def end_round(self): + # Save screenshot + if self.args.make_video: + self.logger.debug(f'Saving screenshot for frame {self.gui.frame}') + pygame.image.save(self.gui.screen, f'screenshots/{self.round_id}_{self.gui.frame:05d}.png') + + def end(self): + # Turn screenshots into videos + if self.args.make_video: + self.logger.debug(f'Turning screenshots into video files') + import subprocess, os, glob + subprocess.call(['ffmpeg', '-y', '-framerate', f'{self.args.fps}', + '-f', 'image2', '-pattern_type', 'glob', '-i', f'screenshots/{self.round_id}_*.png', + '-preset', 'veryslow', '-tune', 'animation', '-crf', '5', '-c:v', 'libx264', '-pix_fmt', + 'yuv420p', + f'screenshots/{self.round_id}_video.mp4']) + subprocess.call(['ffmpeg', '-y', '-framerate', f'{self.args.fps}', + '-f', 'image2', '-pattern_type', 'glob', '-i', f'screenshots/{self.round_id}_*.png', + '-threads', '2', '-tile-columns', '2', '-frame-parallel', '0', '-g', '100', '-speed', '1', + '-pix_fmt', 'yuv420p', '-qmin', '0', '-qmax', '10', '-crf', '5', '-b:v', '2M', '-c:v', + 'libvpx-vp9', + f'screenshots/{self.round_id}_video.webm']) + for f in glob.glob(f'screenshots/{self.round_id}_*.png'): + os.remove(f) + + +class BombeRLeWorld(GenericWorld): + def __init__(self, args: WorldArgs, agents): + super().__init__(args) + + self.setup_agents(agents) + self.new_round() + + def setup_agents(self, agents): + # Add specified agents and start their subprocesses + self.agents = [] + for agent_dir, train in agents: + if list([d for d, t in agents]).count(agent_dir) > 1: + name = agent_dir + '_' + str(list([a.code_name for a in self.agents]).count(agent_dir)) + else: + name = agent_dir + self.add_agent(agent_dir, name, train=train) + + def new_round(self): if self.running: - # Wait in case there is still a game step running - sleep(s.update_interval) - - self.logger.info(f'WRAPPING UP ROUND #{self.round}') - # Clean up survivors - for a in self.active_agents: - a.events.append(e.SURVIVED_ROUND) - self.put_down_agent(a) - # Send final event queue to agents that expect them - for a in self.agents: - if a.train_flag.is_set(): - self.logger.debug(f'Sending final event queue {a.events} to agent <{a.name}>') - a.pipe.send(a.events) - a.events = [] - a.ready_flag.wait() - a.ready_flag.clear() - # Penalty for agent who spent most time thinking - if len(self.agents) > 1: - self.replay['times'] = [a.mean_time for a in self.agents] - slowest = max(self.agents, key=lambda a: a.mean_time) - self.logger.info(f'Agent <{slowest.name}> loses 1 point for being slowest (avg. {slowest.mean_time:.3f}s)') - slowest.update_score(s.reward_slow) - slowest.trophies.append(Agent.time_trophy) - # Save course of the game for future replay - if s.save_replay: - self.replay['n_steps'] = self.step - with open(f'replays/{self.round_id}.pt', 'wb') as f: - pickle.dump(self.replay, f) - # Mark round as ended - self.running = False - else: - self.logger.warn('End-of-round requested while no round was running') + self.logger.warning('New round requested while still running') + self.end_round() + + self.round += 1 + self.logger.info(f'STARTING ROUND #{self.round}') + pygame.display.set_caption(f'BombeRLe | Round #{self.round}') + + # Bookkeeping + self.step = 0 + self.active_agents = [] + self.bombs = [] + self.explosions = [] + self.round_id = f'Replay {datetime.now().strftime("%Y-%m-%d %H-%M-%S")}' + + # Arena with wall and crate layout + self.arena = (np.random.rand(s.COLS, s.ROWS) < s.CRATE_DENSITY).astype(int) + self.arena[:1, :] = -1 + self.arena[-1:, :] = -1 + self.arena[:, :1] = -1 + self.arena[:, -1:] = -1 + for x in range(s.COLS): + for y in range(s.ROWS): + if (x + 1) * (y + 1) % 2 == 1: + self.arena[x, y] = -1 + + # Starting positions + start_positions = [(1, 1), (1, s.ROWS - 2), (s.COLS - 2, 1), (s.COLS - 2, s.ROWS - 2)] + random.shuffle(start_positions) + for (x, y) in start_positions: + for (xx, yy) in [(x, y), (x - 1, y), (x + 1, y), (x, y - 1), (x, y + 1)]: + if self.arena[xx, yy] == 1: + self.arena[xx, yy] = 0 + + # Distribute coins evenly + self.coins = [] + """coin_pattern = np.array([ + [1, 1, 1], + [0, 0, 1], + ]) + coins = np.zeros_like(self.arena) + for x in range(1, s.COLS - 2, coin_pattern.shape[0]): + for i in range(coin_pattern.shape[0]): + for j in range(coin_pattern.shape[1]): + if coin_pattern[i, j] == 1: + self.coins.append(Coin((x + i, x + j), self.arena[x+i,x+j] == 0)) + coins[x + i, x + j] += 1""" + for i in range(3): + for j in range(3): + n_crates = (self.arena[1 + 5 * i:6 + 5 * i, 1 + 5 * j:6 + 5 * j] == 1).sum() + while True: + x, y = np.random.randint(1 + 5 * i, 6 + 5 * i), np.random.randint(1 + 5 * j, 6 + 5 * j) + if n_crates == 0 and self.arena[x, y] == 0: + self.coins.append(Coin((x, y))) + self.coins[-1].collectable = True + break + elif self.arena[x, y] == 1: + self.coins.append(Coin((x, y))) + break + + # Reset agents and distribute starting positions + for agent in self.agents: + agent.start_round() + self.active_agents.append(agent) + agent.x, agent.y = start_positions.pop() + + self.replay = { + 'round': self.round, + 'arena': np.array(self.arena), + 'coins': [c.get_state() for c in self.coins], + 'agents': [a.get_state() for a in self.agents], + 'actions': dict([(a.name, []) for a in self.agents]), + 'permutations': [] + } + + self.running = True + + def get_state_for_agent(self, agent: Agent): + state = { + 'round': self.round, + 'step': self.step, + 'field': np.array(self.arena), + 'self': agent.get_state(), + 'others': [other.get_state() for other in self.active_agents if other is not agent], + 'bombs': [bomb.get_state() for bomb in self.bombs], + 'coins': [coin.get_state() for coin in self.coins if coin.collectable], + 'user_input': self.user_input, + } + + explosion_map = np.zeros(self.arena.shape) + for exp in self.explosions: + for (x, y) in exp.blast_coords: + explosion_map[x, y] = max(explosion_map[x, y], exp.timer) + state['explosion_map'] = explosion_map + + return state + + def send_training_events(self): + # Send events to all agents that expect them, then reset and wait for them + for a in self.agents: + if a.train: + if not a.dead: + a.process_game_events(self.get_state_for_agent(a)) + for enemy in self.active_agents: + if enemy is not a: + pass + # a.process_enemy_game_events(self.get_state_for_agent(enemy), enemy) + for a in self.agents: + if a.train: + if not a.dead: + a.wait_for_game_event_processing() + for enemy in self.active_agents: + if enemy is not a: + pass + # a.wait_for_enemy_game_event_processing() + for a in self.active_agents: + a.store_game_state(self.get_state_for_agent(a)) + a.reset_game_events() + + def poll_and_run_agents(self): + self.send_training_events() + + # Tell agents to act + for a in self.active_agents: + if a.available_think_time > 0: + a.act(self.get_state_for_agent(a)) + + # Give agents time to decide + perm = np.random.permutation(len(self.active_agents)) + self.replay['permutations'].append(perm) + for i in perm: + a = self.active_agents[i] + if a.available_think_time > 0: + action, think_time = a.wait_for_act() + self.logger.info(f'Agent <{a.name}> chose action {action} in {think_time:.2f}s.') + if think_time > a.available_think_time: + self.logger.warning(f'Agent <{a.name}> exceeded think time by {s.TIMEOUT - think_time}s. Setting action to "WAIT" and decreasing available time for next round.') + action = "WAIT" + a.available_think_time = s.TIMEOUT - (think_time - a.available_think_time) + else: + self.logger.warning(f'Agent <{a.name}> stayed within acceptable think time.') + a.available_think_time = s.TIMEOUT + else: + self.logger.info(f'Skipping agent <{a.name}> because of last slow think time.') + a.available_think_time += s.TIMEOUT + action = "WAIT" + + self.replay['actions'][a.name].append(action) + self.perform_agent_action(a, action) + + def end_round(self): + assert self.running, "End of round requested while not running" + + self.logger.info(f'WRAPPING UP ROUND #{self.round}') + # Clean up survivors + for a in self.active_agents: + a.add_event(e.SURVIVED_ROUND) + + # Send final event to agents that expect them + for a in self.agents: + if a.train: + a.round_ended() + + # Save course of the game for future replay + if self.args.save_replay: + self.replay['n_steps'] = self.step + with open(f'replays/{self.round_id}.pt', 'wb') as f: + pickle.dump(self.replay, f) + + # Mark round as ended + self.running = False self.logger.debug('Setting ready_for_restart_flag') self.ready_for_restart_flag.set() - def end(self): if self.running: self.end_round() @@ -437,205 +470,115 @@ def end(self): for a in self.agents: # Send exit message to shut down agent self.logger.debug(f'Sending exit message to agent <{a.name}>') - a.pipe.send(None) - def render_text(self, text, x, y, color, halign='left', valign='top', - size='medium', aa=False): - if not s.gui: return + + +class GUI: + def __init__(self, args: WorldArgs, world: GenericWorld): + self.args = args + self.world = world + + # Initialize screen + self.screen = pygame.display.set_mode((s.WIDTH, s.HEIGHT)) + pygame.display.set_caption('BombeRLe') + icon = pygame.image.load(f'assets/bomb_yellow.png') + pygame.display.set_icon(icon) + + # Background and tiles + self.background = pygame.Surface((s.WIDTH, s.HEIGHT)) + self.background = self.background.convert() + self.background.fill((0, 0, 0)) + self.t_wall = pygame.image.load('assets/brick.png') + self.t_crate = pygame.image.load('assets/crate.png') + + # Font for scores and such + font_name = dirname(__file__) + '/assets/emulogic.ttf' + self.fonts = { + 'huge': pygame.font.Font(font_name, 20), + 'big': pygame.font.Font(font_name, 16), + 'medium': pygame.font.Font(font_name, 10), + 'small': pygame.font.Font(font_name, 8), + } + + self.frame = 0 + + def render_text(self, text, x, y, color, halign='left', valign='top', size='medium', aa=False): text_surface = self.fonts[size].render(text, aa, color) text_rect = text_surface.get_rect() - if halign == 'left': text_rect.left = x + if halign == 'left': text_rect.left = x if halign == 'center': text_rect.centerx = x - if halign == 'right': text_rect.right = x - if valign == 'top': text_rect.top = y + if halign == 'right': text_rect.right = x + if valign == 'top': text_rect.top = y if valign == 'center': text_rect.centery = y - if valign == 'bottom': text_rect.bottom = y + if valign == 'bottom': text_rect.bottom = y self.screen.blit(text_surface, text_rect) - def render(self): - if not s.gui: return - self.screen.blit(self.background, (0,0)) + self.frame += 1 + self.screen.blit(self.background, (0, 0)) # World - for x in range(self.arena.shape[1]): - for y in range(self.arena.shape[0]): - if self.arena[x,y] == -1: - self.screen.blit(self.t_wall, (s.grid_offset[0] + s.grid_size*x, s.grid_offset[1] + s.grid_size*y)) - if self.arena[x,y] == 1: - self.screen.blit(self.t_crate, (s.grid_offset[0] + s.grid_size*x, s.grid_offset[1] + s.grid_size*y)) - self.render_text(f'Step {self.step:d}', s.grid_offset[0], s.height - s.grid_offset[1]/2, (64,64,64), + for x in range(self.world.arena.shape[1]): + for y in range(self.world.arena.shape[0]): + if self.world.arena[x, y] == -1: + self.screen.blit(self.t_wall, + (s.GRID_OFFSET[0] + s.GRID_SIZE * x, s.GRID_OFFSET[1] + s.GRID_SIZE * y)) + if self.world.arena[x, y] == 1: + self.screen.blit(self.t_crate, + (s.GRID_OFFSET[0] + s.GRID_SIZE * x, s.GRID_OFFSET[1] + s.GRID_SIZE * y)) + self.render_text(f'Step {self.world.step:d}', s.GRID_OFFSET[0], s.HEIGHT - s.GRID_OFFSET[1] / 2, (64, 64, 64), valign='center', halign='left', size='medium') # Items - for bomb in self.bombs: - bomb.render(self.screen, s.grid_offset[0] + s.grid_size*bomb.x, s.grid_offset[1] + s.grid_size*bomb.y) - for coin in self.coins: + for bomb in self.world.bombs: + bomb.render(self.screen, s.GRID_OFFSET[0] + s.GRID_SIZE * bomb.x, s.GRID_OFFSET[1] + s.GRID_SIZE * bomb.y) + for coin in self.world.coins: if coin.collectable: - coin.render(self.screen, s.grid_offset[0] + s.grid_size*coin.x, s.grid_offset[1] + s.grid_size*coin.y) + coin.render(self.screen, s.GRID_OFFSET[0] + s.GRID_SIZE * coin.x, + s.GRID_OFFSET[1] + s.GRID_SIZE * coin.y) # Agents - for agent in self.active_agents: - agent.render(self.screen, s.grid_offset[0] + s.grid_size*agent.x, s.grid_offset[1] + s.grid_size*agent.y) + for agent in self.world.active_agents: + agent.render(self.screen, s.GRID_OFFSET[0] + s.GRID_SIZE * agent.x, + s.GRID_OFFSET[1] + s.GRID_SIZE * agent.y) # Explosions - for explosion in self.explosions: + for explosion in self.world.explosions: explosion.render(self.screen) # Scores # agents = sorted(self.agents, key=lambda a: (a.score, -a.mean_time), reverse=True) - agents = self.agents - leading = max(self.agents, key=lambda a: (a.score, -a.mean_time)) - y_base = s.grid_offset[1] + 15 + agents = self.world.agents + leading = max(agents, key=lambda a: (a.score, a.name)) + y_base = s.GRID_OFFSET[1] + 15 for i, a in enumerate(agents): - bounce = 0 if (a is not leading or self.running) else np.abs(10*np.sin(5*time())) - a.render(self.screen, 600, y_base + 50*i - 15 - bounce) - self.render_text(a.name, 650, y_base + 50*i, - (64,64,64) if a.dead else (255,255,255), + bounce = 0 if (a is not leading or self.world.running) else np.abs(10 * np.sin(5 * time())) + a.render(self.screen, 600, y_base + 50 * i - 15 - bounce) + self.render_text(a.name, 650, y_base + 50 * i, + (64, 64, 64) if a.dead else (255, 255, 255), valign='center', size='small') for j, trophy in enumerate(a.trophies): - self.screen.blit(trophy, (660 + 10*j, y_base + 50*i + 12)) - self.render_text(f'{a.score:d}', 830, y_base + 50*i, (255,255,255), + self.screen.blit(trophy, (660 + 10 * j, y_base + 50 * i + 12)) + self.render_text(f'{a.score:d}', 830, y_base + 50 * i, (255, 255, 255), valign='center', halign='right', size='big') - self.render_text(f'{a.total_score:d}', 890, y_base + 50*i, (64,64,64), + self.render_text(f'{a.total_score:d}', 890, y_base + 50 * i, (64, 64, 64), valign='center', halign='right', size='big') - self.render_text(f'({a.mean_time:.3f})', 930, y_base + 50*i, (128,128,128), - valign='center', size='small') # End of round info - if not self.running: - x_center = (s.width - s.grid_offset[0] - s.cols * s.grid_size) / 2 + s.grid_offset[0] + s.cols * s.grid_size - color = np.int_((255*(np.sin(3*time())/3 + .66), - 255*(np.sin(4*time()+np.pi/3)/3 + .66), - 255*(np.sin(5*time()-np.pi/3)/3 + .66))) + if not self.world.running: + x_center = (s.WIDTH - s.GRID_OFFSET[0] - s.COLS * s.GRID_SIZE) / 2 + s.GRID_OFFSET[0] + s.COLS * s.GRID_SIZE + color = np.int_((255 * (np.sin(3 * time()) / 3 + .66), + 255 * (np.sin(4 * time() + np.pi / 3) / 3 + .66), + 255 * (np.sin(5 * time() - np.pi / 3) / 3 + .66))) self.render_text(leading.name, x_center, 320, color, valign='top', halign='center', size='huge') self.render_text('has won the round!', x_center, 350, color, valign='top', halign='center', size='big') - leading_total = max(self.agents, key=lambda a: (a.total_score, -a.mean_time)) + leading_total = max(self.world.agents, key=lambda a: (a.total_score, a.name)) if leading_total is leading: - self.render_text(f'{leading_total.name} is also in the lead.', x_center, 390, (128,128,128), + self.render_text(f'{leading_total.name} is also in the lead.', x_center, 390, (128, 128, 128), valign='top', halign='center', size='medium') else: - self.render_text(f'But {leading_total.name} is in the lead.', x_center, 390, (128,128,128), + self.render_text(f'But {leading_total.name} is in the lead.', x_center, 390, (128, 128, 128), valign='top', halign='center', size='medium') - - - -class ReplayWorld(BombeRLeWorld): - - def __init__(self, replay_file): - assert s.gui, 'Replay only makes sense with active GUI.' - self.setup_logging() - self.setup_gui() - - self.logger.info(f'Loading replay file "{replay_file}"') - self.replay_file = replay_file - with open(f'replays/{replay_file}.pt', 'rb') as f: - self.replay = pickle.load(f) - if not 'n_steps' in self.replay: - self.replay['n_steps'] = s.max_steps - - # Recreate the agents - self.colors = ['blue', 'green', 'yellow', 'pink'] - self.agents = [ReplayAgent(name, self.colors.pop(), x, y) - for (x,y,name,b,s) in self.replay['agents']] - for i,t in enumerate(self.replay['times']): - self.agents[i].mean_time = t - - # Get the game going - self.round = 1 - self.ready_for_restart_flag = mp.Event() - self.new_round() - - - def new_round(self): - self.logger.info('STARTING REPLAY') - pygame.display.set_caption(f'{self.replay_file}') - - # Bookkeeping - self.step = 0 - self.bombs = [] - self.explosions = [] - self.running = True - self.frame = 0 - - # Game world and objects - self.arena = np.array(self.replay['arena']) - self.coins = [Coin(xy) for xy in self.replay['coins']] - self.active_agents = [a for a in self.agents] - for i, agent in enumerate(self.agents): - agent.reset() - agent.x, agent.y = self.replay['agents'][i][:2] - agent.total_score = 0 - - - def poll_and_run_agents(self): - # Perform recorded agent actions - perm = self.replay['permutations'][self.step-1] - for i in perm: - a = self.active_agents[i] - self.logger.debug(f'Repeating action from agent <{a.name}>') - action = self.replay['actions'][a.name][self.step-1] - self.logger.info(f'Agent <{a.name}> chose action {action}.') - self.perform_agent_action(a, action) - - - def time_to_stop(self): - time_to_stop = super().time_to_stop() - if self.step == self.replay['n_steps']: - self.logger.info('Replay ends here, wrap up round') - time_to_stop = True - return time_to_stop - - - def end_round(self): - if self.running: - self.running = False - # Wait in case there is still a game step running - sleep(s.update_interval) - - self.logger.info(f'WRAPPING UP REPLAY') - # Penalty for agent who spent most time thinking - if len(self.agents) > 1: - slowest = max(self.agents, key=lambda a: a.mean_time) - self.logger.info(f'Agent <{slowest.name}> loses 1 point for being slowest (avg. {slowest.mean_time:.3f}s)') - slowest.update_score(s.reward_slow) - slowest.trophies.append(Agent.time_trophy) - else: - self.logger.warn('End-of-round requested while no round was running') - - self.logger.debug('Setting ready_for_restart_flag') - self.ready_for_restart_flag.set() - - - def render(self): - super().render() - - # Save screenshot - if s.make_video_from_replay: - self.logger.debug(f'Saving screenshot for frame {self.frame}') - pygame.image.save(self.screen, f'screenshots/{self.replay_file}_{self.frame:05d}.png') - self.frame += 1 - - - def end(self): - # Turn screenshots into videos - if s.make_video_from_replay: - self.logger.debug(f'Turning screenshots into video files') - import subprocess, os, glob - subprocess.call(['ffmpeg', '-y', '-framerate', f'{s.fps}', - '-f', 'image2', '-pattern_type', 'glob', '-i', f'screenshots/{self.replay_file}_*.png', - '-preset', 'veryslow', '-tune', 'animation', '-crf', '5', '-c:v', 'libx264', '-pix_fmt', 'yuv420p', - f'screenshots/{self.replay_file}_video.mp4']) - subprocess.call(['ffmpeg', '-y', '-framerate', f'{s.fps}', - '-f', 'image2', '-pattern_type', 'glob', '-i', f'screenshots/{self.replay_file}_*.png', - '-threads', '2', '-tile-columns', '2', '-frame-parallel', '0', '-g', '100', '-speed', '1', - '-pix_fmt', 'yuv420p', '-qmin', '0', '-qmax', '10', '-crf', '5', '-b:v', '2M', '-c:v', 'libvpx-vp9', - f'screenshots/{self.replay_file}_video.webm']) - for f in glob.glob(f'screenshots/{self.replay_file}_*.png'): - os.remove(f) - - - def put_down_agent(self, agent): pass diff --git a/events.py b/events.py new file mode 100644 index 000000000..7e4f9fa34 --- /dev/null +++ b/events.py @@ -0,0 +1,20 @@ +MOVED_LEFT = 'MOVED_LEFT' +MOVED_RIGHT = 'MOVED_RIGHT' +MOVED_UP = 'MOVED_UP' +MOVED_DOWN = 'MOVED_DOWN' +WAITED = 'WAITED' +INVALID_ACTION = 'INVALID_ACTION' + +BOMB_DROPPED = 'BOMB_DROPPED' +BOMB_EXPLODED = 'BOMB_EXPLODED' + +CRATE_DESTROYED = 'CRATE_DESTROYED' +COIN_FOUND = 'COIN_FOUND' +COIN_COLLECTED = 'COIN_COLLECTED' + +KILLED_OPPONENT = 'KILLED_OPPONENT' +KILLED_SELF = 'KILLED_SELF' + +GOT_KILLED = 'GOT_KILLED' +OPPONENT_ELIMINATED = 'OPPONENT_ELIMINATED' +SURVIVED_ROUND = 'SURVIVED_ROUND' diff --git a/fallbacks.py b/fallbacks.py new file mode 100644 index 000000000..31c85bdf7 --- /dev/null +++ b/fallbacks.py @@ -0,0 +1,21 @@ +import contextlib + + +class QuietFallback: + def __getattr__(self, item): + return self + + def __call__(self, *args, **kwargs): + return self + + +try: + with contextlib.redirect_stdout(None): + import pygame +except ModuleNotFoundError: + pygame = QuietFallback() + +try: + from tqdm import tqdm +except ModuleNotFoundError: + tqdm = lambda iterable, *args, **kwargs: iterable diff --git a/items.py b/items.py index 8870847a0..64c45af08 100644 --- a/items.py +++ b/items.py @@ -1,33 +1,39 @@ - -import pygame -from pygame.locals import * -from pygame.transform import rotate +from functools import cached_property from time import time +import settings as s +from fallbacks import pygame -class Item(object): +class Item(object): def __init__(self): pass + def avatar(self): + raise NotImplementedError() + def render(self, screen, x, y): screen.blit(self.avatar, (x, y)) + def get_state(self) -> tuple: + raise NotImplementedError() + class Coin(Item): + avatar = pygame.image.load('assets/coin.png') - def __init__(self, pos): + def __init__(self, pos, collectable=False): super(Coin, self).__init__() self.x = pos[0] self.y = pos[1] - self.avatar = pygame.image.load('assets/coin.png') - self.collectable = False + self.collectable = collectable def get_state(self): - return (self.x, self.y) + return self.x, self.y class Bomb(Item): + DEFAULT_AVATARS = {color: pygame.image.load(f'assets/bomb_{color}.png') for color in s.AGENT_COLORS} def __init__(self, pos, owner, timer, power, color, custom_sprite=None): super(Bomb, self).__init__() @@ -37,51 +43,59 @@ def __init__(self, pos, owner, timer, power, color, custom_sprite=None): self.timer = timer self.power = power - if custom_sprite is None: - self.avatar = pygame.image.load(f'assets/bomb_{color}.png') - else: - self.avatar = custom_sprite - self.active = True + self.color = color + self.custom_sprite = custom_sprite + + @cached_property + def avatar(self): + if self.custom_sprite: + return self.custom_sprite + return Bomb.DEFAULT_AVATARS[self.color] + def get_state(self): - # return ((self.x, self.y), self.timer, self.power, self.active, self.owner.name) - return (self.x, self.y, self.timer) + return (self.x, self.y), self.timer def get_blast_coords(self, arena): x, y = self.x, self.y - blast_coords = [(x,y)] - - for i in range(1, self.power+1): - if arena[x+i,y] == -1: break - blast_coords.append((x+i,y)) - for i in range(1, self.power+1): - if arena[x-i,y] == -1: break - blast_coords.append((x-i,y)) - for i in range(1, self.power+1): - if arena[x,y+i] == -1: break - blast_coords.append((x,y+i)) - for i in range(1, self.power+1): - if arena[x,y-i] == -1: break - blast_coords.append((x,y-i)) + blast_coords = [(x, y)] + + for i in range(1, self.power + 1): + if arena[x + i, y] == -1: + break + blast_coords.append((x + i, y)) + for i in range(1, self.power + 1): + if arena[x - i, y] == -1: + break + blast_coords.append((x - i, y)) + for i in range(1, self.power + 1): + if arena[x, y + i] == -1: + break + blast_coords.append((x, y + i)) + for i in range(1, self.power + 1): + if arena[x, y - i] == -1: + break + blast_coords.append((x, y - i)) return blast_coords class Explosion(Item): + STAGES = [pygame.image.load(f'assets/explosion_{i}.png') for i in range(6)] - def __init__(self, blast_coords, screen_coords, owner): + def __init__(self, blast_coords, screen_coords, owner, timer): + super().__init__() self.blast_coords = blast_coords self.screen_coords = screen_coords self.owner = owner - self.timer = owner.explosion_timer + self.timer = timer self.active = True + self.stages = Explosion.STAGES - self.stages = [pygame.image.load(f'assets/explosion_{i}.png') for i in range(6)] - - def render(self, screen): - img = rotate(self.stages[self.timer], (-50*time()) % 360) + def render(self, screen, **kwargs): + img = pygame.transform.rotate(self.stages[self.timer], (-50 * time()) % 360) rect = img.get_rect() - for (x,y) in self.screen_coords: - rect.center = x+15, y+15 + for (x, y) in self.screen_coords: + rect.center = x + 15, y + 15 screen.blit(img, rect.topleft) diff --git a/main.py b/main.py index db8dc8cf6..1b5e008de 100644 --- a/main.py +++ b/main.py @@ -1,114 +1,160 @@ +import sys +from argparse import ArgumentParser +from time import sleep, time -from time import time, sleep -import contextlib -from time import time - -with contextlib.redirect_stdout(None): - import pygame -from pygame.locals import * -import numpy as np -import multiprocessing as mp import threading -from environment import BombeRLeWorld, ReplayWorld -from settings import s +from fallbacks import pygame, tqdm +from environment import BombeRLeWorld, GenericWorld +import settings as s # Function to run the game logic in a separate thread -def game_logic(world, user_inputs): +from replay import ReplayWorld + + +def game_logic(world: GenericWorld, user_inputs, args): last_update = time() while True: - # Game logic - if (s.turn_based and len(user_inputs) == 0): + now = time() + if args.turn_based and len(user_inputs) == 0: sleep(0.1) - elif (s.gui and (time()-last_update < s.update_interval)): - sleep(s.update_interval - (time() - last_update)) - else: - last_update = time() - if world.running: - try: - world.do_step(user_inputs.pop(0) if len(user_inputs) else 'WAIT') - except Exception as e: - world.end_round() - raise + continue + elif world.gui is not None and (now - last_update < args.update_interval): + sleep(args.update_interval - (now - last_update)) + continue + + last_update = now + if world.running: + world.do_step(user_inputs.pop(0) if len(user_inputs) else 'WAIT') + + +def main(args): + parser = ArgumentParser() + + subparsers = parser.add_subparsers(dest='command_name', required=True) + + # Run arguments + play_parser = subparsers.add_parser("play") + agent_group = play_parser.add_mutually_exclusive_group() + agent_group.add_argument("--my-agent", type=str, help="Play agent of name ... against three rule_based_agents") + agent_group.add_argument("--agents", type=str, nargs="+", default=["rule_based_agent"] * s.MAX_AGENTS, help="Explicitly set the agent names in the game") + play_parser.add_argument("--train", default=0, type=int, choices=[0, 1, 2, 3, 4], + help="First … agents should be set to training mode") + play_parser.add_argument("--continue-without-training", default=False, action="store_true") + # play_parser.add_argument("--single-process", default=False, action="store_true") + + play_parser.add_argument("--n-rounds", type=int, default=10, help="How many rounds to play") + play_parser.add_argument("--save-replay", default=False, action="store_true", help="Store the game as .pt for a replay") + play_parser.add_argument("--no-gui", default=False, action="store_true", help="Deactivate the user interface and play as fast as possible.") + + # Replay arguments + replay_parser = subparsers.add_parser("replay") + replay_parser.add_argument("replay", help="File to load replay from") + + # Interaction + for sub in [play_parser, replay_parser]: + sub.add_argument("--fps", type=int, default=15, help="FPS of the GUI (does not change game)") + sub.add_argument("--turn-based", default=False, action="store_true", + help="Wait for key press until next movement") + sub.add_argument("--update-interval", type=float, default=0.1, + help="How often agents take steps (ignored without GUI)") + + # Video? + sub.add_argument("--make-video", default=False, action="store_true", + help="Make a video from the game") + + args = parser.parse_args() + if args.command_name == "replay": + args.no_gui = False + args.n_rounds = 1 + + has_gui = not args.no_gui + if has_gui: + pygame.init() - -def main(): - pygame.init() + # Initialize environment and agents + if args.command_name == "play": + agents = [] + if args.train == 0 and not args.continue_without_training: + args.continue_without_training = True + if args.my_agent: + agents.append((args.my_agent, len(agents) < args.train)) + args.agents = ["rule_based_agent"] * (s.MAX_AGENTS - 1) + for agent_name in args.agents: + agents.append((agent_name, len(agents) < args.train)) + + world = BombeRLeWorld(args, agents) + elif args.command_name == "replay": + world = ReplayWorld(args) + else: + raise ValueError(f"Unknown command {args.command_name}") # Emulate Windows process spawning behaviour under Unix (for testing) # mp.set_start_method('spawn') - # Initialize environment and agents - world = BombeRLeWorld([ - ('simple_agent', False), - ('simple_agent', False), - ('simple_agent', False), - ('simple_agent', False) - ]) - # world = ReplayWorld('Replay 2019-01-30 16:57:42') user_inputs = [] # Start game logic thread - t = threading.Thread(target=game_logic, args=(world, user_inputs)) + t = threading.Thread(target=game_logic, args=(world, user_inputs, args), name="Game Logic") t.daemon = True t.start() # Run one or more games - for i in range(s.n_rounds): + for _ in tqdm(range(args.n_rounds)): if not world.running: world.ready_for_restart_flag.wait() world.ready_for_restart_flag.clear() world.new_round() # First render - if s.gui: + if has_gui: world.render() pygame.display.flip() round_finished = False - last_update = time() last_frame = time() user_inputs.clear() # Main game loop while not round_finished: - # Grab events - key_pressed = None - for event in pygame.event.get(): - if event.type == QUIT: - world.end_round() - world.end() - return - elif event.type == KEYDOWN: - key_pressed = event.key - if key_pressed in (K_q, K_ESCAPE): - world.end_round() - if not world.running: - round_finished = True - # Convert keyboard input into actions - if s.input_map.get(key_pressed): - if s.turn_based: - user_inputs.clear() - user_inputs.append(s.input_map.get(key_pressed)) - - if not world.running and not s.gui: + if has_gui: + # Grab GUI events + for event in pygame.event.get(): + if event.type == pygame.QUIT: + if world.running: + world.end_round() + world.end() + return + elif event.type == pygame.KEYDOWN: + key_pressed = event.key + if key_pressed in (pygame.K_q, pygame.K_ESCAPE): + world.end_round() + if not world.running: + round_finished = True + # Convert keyboard input into actions + if s.INPUT_MAP.get(key_pressed): + if args.turn_based: + user_inputs.clear() + user_inputs.append(s.INPUT_MAP.get(key_pressed)) + + # Render only once in a while + if time() - last_frame >= 1 / args.fps: + world.render() + pygame.display.flip() + last_frame = time() + else: + sleep_time = 1 / args.fps - (time() - last_frame) + if sleep_time > 0: + sleep(sleep_time) + elif not world.running: round_finished = True - - # Rendering - if s.gui and (time()-last_frame >= 1/s.fps): - world.render() - pygame.display.flip() - last_frame = time() else: - sleep_time = 1/s.fps - (time() - last_frame) - if sleep_time > 0: - sleep(sleep_time) - if not s.gui: - last_frame = time() + # Non-gui mode, check for round end in 1ms + sleep(0.001) world.end() if __name__ == '__main__': - main() + main(sys.argv) diff --git a/replay.py b/replay.py new file mode 100644 index 000000000..5ee423ff2 --- /dev/null +++ b/replay.py @@ -0,0 +1,96 @@ +import pickle +from time import sleep + +import numpy as np + +import settings as s +from agents import Agent +from environment import GenericWorld, WorldArgs +from fallbacks import pygame +from items import Coin + + +class ReplayWorld(GenericWorld): + def __init__(self, args: WorldArgs): + super().__init__(args) + + replay_file = args.replay + self.logger.info(f'Loading replay file "{replay_file}"') + self.replay_file = replay_file + with open(replay_file, 'rb') as f: + self.replay = pickle.load(f) + if not 'n_steps' in self.replay: + self.replay['n_steps'] = s.MAX_STEPS + + pygame.display.set_caption(f'{replay_file}') + + # Recreate the agents + self.agents = [ReplayAgent(name, self.colors.pop()) + for (name, s, b, xy) in self.replay['agents']] + self.new_round() + + def new_round(self): + self.logger.info('STARTING REPLAY') + + # Bookkeeping + self.step = 0 + self.bombs = [] + self.explosions = [] + self.running = True + self.frame = 0 + + # Game world and objects + self.arena = np.array(self.replay['arena']) + self.coins = [Coin(xy) for xy in self.replay['coins']] + self.active_agents = [a for a in self.agents] + for i, agent in enumerate(self.agents): + agent.start_round() + agent.x, agent.y = self.replay['agents'][i][-1] + agent.total_score = 0 + + def poll_and_run_agents(self): + # Perform recorded agent actions + perm = self.replay['permutations'][self.step - 1] + for i in perm: + a = self.active_agents[i] + self.logger.debug(f'Repeating action from agent <{a.name}>') + action = self.replay['actions'][a.name][self.step - 1] + self.logger.info(f'Agent <{a.name}> chose action {action}.') + self.perform_agent_action(a, action) + + def time_to_stop(self): + time_to_stop = super().time_to_stop() + if self.step == self.replay['n_steps']: + self.logger.info('Replay ends here, wrap up round') + time_to_stop = True + return time_to_stop + + def end_round(self): + if self.running: + self.running = False + # Wait in case there is still a game step running + sleep(self.args.update_interval) + else: + self.logger.warning('End-of-round requested while no round was running') + + self.logger.debug('Setting ready_for_restart_flag') + self.ready_for_restart_flag.set() + + +class ReplayAgent(Agent): + """ + Agents class firing off a predefined sequence of actions. + """ + + def __init__(self, name, color): + """Recreate the agent as it was at the beginning of the original game.""" + super().__init__(color, name, None, False, None) + + def setup(self): + pass + + def act(self, game_state): + pass + + def wait_for_act(self): + return 0, self.actions.popleft() diff --git a/settings.py b/settings.py index 171127602..ce99126ca 100644 --- a/settings.py +++ b/settings.py @@ -1,83 +1,46 @@ - -from collections import namedtuple -import pygame -from pygame.locals import * import logging - -settings = { - # Display - 'width': 1000, - 'height': 600, - 'gui': True, - 'fps': 15, - - # Main loop - 'update_interval': 0.1, # 0.33, - 'turn_based': False, - 'n_rounds': 10, - 'save_replay': False, - 'make_video_from_replay': False, - - # Game properties - 'cols': 17, - 'rows': 17, - 'grid_size': 30, - 'crate_density': 0.75, - 'actions': ['UP', 'DOWN', 'LEFT', 'RIGHT', 'BOMB', 'WAIT'], - 'max_agents': 4, - 'max_steps': 400, - 'stop_if_not_training': False, - 'bomb_power': 3, - 'bomb_timer': 4, - 'explosion_timer': 2, - - # Rules for agents - 'timeout': 5.0, - 'reward_kill': 5, - 'reward_coin': 1, - 'reward_slow': -1, - - # User input - 'input_map': { - K_UP: 'UP', - K_DOWN: 'DOWN', - K_LEFT: 'LEFT', - K_RIGHT: 'RIGHT', - K_RETURN: 'WAIT', - K_SPACE: 'BOMB', - }, - - # Logging levels - 'log_game': logging.INFO, - 'log_agent_wrapper': logging.INFO, - 'log_agent_code': logging.DEBUG, +from fallbacks import pygame + +# Game properties +COLS = 17 +ROWS = 17 +CRATE_DENSITY = 0.75 +MAX_AGENTS = 4 + +# Round properties +MAX_STEPS = 400 + +# GUI properties +GRID_SIZE = 30 +WIDTH = 1000 +HEIGHT = 600 +GRID_OFFSET = [(HEIGHT - ROWS * GRID_SIZE) // 2] * 2 + +AGENT_COLORS = ['blue', 'green', 'yellow', 'pink'] + +# Game rules +BOMB_POWER = 3 +BOMB_TIMER = 4 +EXPLOSION_TIMER = 2 + +# Rules for agents +TIMEOUT = 0.5 +REWARD_KILL = 5 +REWARD_COIN = 1 + +# User input +INPUT_MAP = { + pygame.K_UP: 'UP', + pygame.K_DOWN: 'DOWN', + pygame.K_LEFT: 'LEFT', + pygame.K_RIGHT: 'RIGHT', + pygame.K_RETURN: 'WAIT', + pygame.K_SPACE: 'BOMB', } -settings['grid_offset'] = [(settings['height'] - settings['rows']*settings['grid_size'])//2] * 2 -s = namedtuple("Settings", settings.keys())(*settings.values()) - - -events = [ - 'MOVED_LEFT', - 'MOVED_RIGHT', - 'MOVED_UP', - 'MOVED_DOWN', - 'WAITED', - 'INTERRUPTED', - 'INVALID_ACTION', - - 'BOMB_DROPPED', - 'BOMB_EXPLODED', - - 'CRATE_DESTROYED', - 'COIN_FOUND', - 'COIN_COLLECTED', - - 'KILLED_OPPONENT', - 'KILLED_SELF', - 'GOT_KILLED', - 'OPPONENT_ELIMINATED', - 'SURVIVED_ROUND', -] -e = namedtuple('Events', events)(*range(len(events))) +# Logging levels +LOG_GAME = logging.INFO +LOG_AGENT_WRAPPER = logging.DEBUG +LOG_AGENT_CODE = logging.DEBUG +LOG_MAX_FILE_SIZE = 100 * 1024 * 1024 # 100 MB