diff --git a/train_tetris.ipynb b/train_tetris.ipynb new file mode 100644 index 000000000..14767326b --- /dev/null +++ b/train_tetris.ipynb @@ -0,0 +1,58 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "try:\n", + " import botris\n", + "except:\n", + " # botris-interface, the environment for tetris was not installed\n", + " print(\"botris-interface was not found, attemping install...\")\n", + " !pip install botris-interface==0.1.21\n", + "\n", + "try:\n", + " import lzero, ding\n", + "except:\n", + " # LightZero, the repository for training was not installed\n", + " print(\"LightZero was not found, attemping install from relative directory...\")\n", + " !pip install -e ." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from zoo.botris.config.botris_efficientzero_config import main_config, create_config, max_env_step\n", + "from lzero.entry import train_muzero\n", + "\n", + "train_muzero([main_config, create_config], seed=0, model_path=main_config.policy.model_path, max_env_step=max_env_step)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.14" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/zoo/botris/__init__.py b/zoo/botris/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/zoo/botris/config/__init__.py b/zoo/botris/config/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/zoo/botris/config/botris_5move_efficientzero_config.py b/zoo/botris/config/botris_5move_efficientzero_config.py new file mode 100644 index 000000000..425e14c96 --- /dev/null +++ b/zoo/botris/config/botris_5move_efficientzero_config.py @@ -0,0 +1,90 @@ +from easydict import EasyDict +from zoo.botris.envs.modals import ENCODED_INPUT_SHAPE, OBSERVATION_SPACE_SIZE +from zoo.botris.envs.botris_5move_env import ACTION_SPACE_SIZE + +# ============================================================== +# begin of the most frequently changed config specified by the user +# ============================================================== +env_id = 'botris-5move' +collector_env_num = 8 +n_episode = 8 +evaluator_env_num = 4 +num_simulations = 50 +update_per_collect = None +batch_size = 64 +max_env_step = int(5e6) +reanalyze_ratio = 0. 
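+# (float) replay_ratio below is the rough target ratio of gradient updates to collected transitions;
+# it is typically used by LightZero to derive update_per_collect when update_per_collect is None (assumed convention).
+# (int) max_episode_len caps the length of each collected episode (it is passed to env.max_episode_steps below).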
+replay_ratio = 0.25 +max_episode_len=500 +# ============================================================== +# end of the most frequently changed config specified by the user +# ============================================================== +botris_efficientzero_config = dict( + exp_name=f'data_ez/botris_5move_efficientzero_ns{num_simulations}_upc{update_per_collect}_rer{reanalyze_ratio}_seed0', + env=dict( + max_episode_steps=max_episode_len, + env_id=env_id, + obs_type='dict_encoded_board', + collector_env_num=collector_env_num, + evaluator_env_num=evaluator_env_num, + n_evaluator_episode=evaluator_env_num, + manager=dict(shared_memory=False, ), + max_score=None + ), + policy=dict( + model=dict( + observation_shape=OBSERVATION_SPACE_SIZE, + action_space_size=ACTION_SPACE_SIZE, + model_type='mlp', + lstm_hidden_size=256, + latent_state_dim=256, + discrete_action_encoding_type='one_hot', + norm_type='BN', + self_supervised_learning_loss=True, + ), + # (str) The path of the pretrained model. If None, the model will be initialized by the default model. + model_path=None, + cuda=True, + device='cuda', + env_type='not_board_games', + action_type='fixed_action_space', + game_segment_length=50, + update_per_collect=update_per_collect, + batch_size=batch_size, + optim_type='Adam', + lr_piecewise_constant_decay=True, + learning_rate=0.003, + num_simulations=num_simulations, + reanalyze_ratio=reanalyze_ratio, + n_episode=n_episode, + eval_freq=int(2e2), + replay_buffer_size=int(1e6), # the size/capacity of replay_buffer, in the terms of transitions. + collector_env_num=collector_env_num, + evaluator_env_num=evaluator_env_num, + target_update_freq=100, + use_priority=False, + ssl_loss_weight=2, + ), +) + +botris_efficientzero_config = EasyDict(botris_efficientzero_config) +main_config = botris_efficientzero_config + +botris_efficientzero_create_config = dict( + env=dict( + type='botris-5move', + import_names=['zoo.botris.envs.botris_5move_env'], + ), + env_manager=dict(type='subprocess'), + policy=dict( + type='efficientzero', + import_names=['lzero.policy.efficientzero'], + ), +) +botris_efficientzero_create_config = EasyDict(botris_efficientzero_create_config) +create_config = botris_efficientzero_create_config + +if __name__ == "__main__": + from lzero.entry import train_muzero + + train_muzero([main_config, create_config], seed=0, model_path=main_config.policy.model_path, max_env_step=max_env_step) diff --git a/zoo/botris/config/botris_alphazero_sp_mode_config.py b/zoo/botris/config/botris_alphazero_sp_mode_config.py new file mode 100644 index 000000000..10adfc66f --- /dev/null +++ b/zoo/botris/config/botris_alphazero_sp_mode_config.py @@ -0,0 +1,100 @@ +from easydict import EasyDict +from zoo.botris.envs.modals import ACTION_SPACE_SIZE, ENCODED_INPUT_SHAPE, OBSERVATION_SPACE_SIZE + +# ============================================================== +# begin of the most frequently changed config specified by the user +# ============================================================== +collector_env_num = 32 +n_episode = 32 +evaluator_env_num = 5 +num_simulations = 50 +update_per_collect = 50 +batch_size = 256 +max_env_step = int(5e5) +mcts_ctree = True +# ============================================================== +# end of the most frequently changed config specified by the user +# ============================================================== +gomoku_alphazero_config = dict( + exp_name= + f'data_az_ctree/gomoku_alphazero_sp-mode_ns{num_simulations}_upc{update_per_collect}_seed0', + env=dict( + 
collector_env_num=collector_env_num, + evaluator_env_num=evaluator_env_num, + n_evaluator_episode=evaluator_env_num, + manager=dict(shared_memory=False, ), + # ============================================================== + # for the creation of simulation env + render_mode=None, + replay_path=None, + alphazero_mcts_ctree=mcts_ctree, + # ============================================================== + ), + policy=dict( + mcts_ctree=mcts_ctree, + # ============================================================== + # for the creation of simulation env + simulation_env_id='botris-versus', + simulation_env_config_type='self_play', + # ============================================================== + torch_compile=False, + tensor_float_32=False, + model=dict( + observation_shape=ENCODED_INPUT_SHAPE, + action_space_size=ACTION_SPACE_SIZE, + ), + cuda=True, + update_per_collect=update_per_collect, + batch_size=batch_size, + optim_type='Adam', + lr_piecewise_constant_decay=False, + learning_rate=0.003, + manual_temperature_decay=True, + grad_clip_value=0.5, + value_weight=1.0, + entropy_weight=0.0, + n_episode=n_episode, + eval_freq=int(2e3), + mcts=dict(num_simulations=num_simulations), + collector_env_num=collector_env_num, + evaluator_env_num=evaluator_env_num, + ), +) + +gomoku_alphazero_config = EasyDict(gomoku_alphazero_config) +main_config = gomoku_alphazero_config + +gomoku_alphazero_create_config = dict( + env=dict( + type='botris-versus', + import_names=['zoo.botris.envs.botris_versus_lightzero_env'], + ), + env_manager=dict(type='subprocess'), + policy=dict( + type='alphazero', + import_names=['lzero.policy.alphazero'], + ), + collector=dict( + type='episode_alphazero', + import_names=['lzero.worker.alphazero_collector'], + ), + evaluator=dict( + type='alphazero', + import_names=['lzero.worker.alphazero_evaluator'], + ) +) +gomoku_alphazero_create_config = EasyDict(gomoku_alphazero_create_config) +create_config = gomoku_alphazero_create_config + +if __name__ == '__main__': + if main_config.policy.tensor_float_32: + import torch + + # The flag below controls whether to allow TF32 on matmul. This flag defaults to False + # in PyTorch 1.12 and later. + torch.backends.cuda.matmul.allow_tf32 = True + # The flag below controls whether to allow TF32 on cuDNN. This flag defaults to True. + torch.backends.cudnn.allow_tf32 = True + + from lzero.entry import train_alphazero + train_alphazero([main_config, create_config], seed=0, max_env_step=max_env_step) diff --git a/zoo/botris/config/botris_efficientzero_config.py b/zoo/botris/config/botris_efficientzero_config.py new file mode 100644 index 000000000..c2103fd12 --- /dev/null +++ b/zoo/botris/config/botris_efficientzero_config.py @@ -0,0 +1,89 @@ +from easydict import EasyDict +from zoo.botris.envs.modals import ACTION_SPACE_SIZE, ENCODED_INPUT_SHAPE, OBSERVATION_SPACE_SIZE + +# ============================================================== +# begin of the most frequently changed config specified by the user +# ============================================================== +env_id = 'botris' +collector_env_num = 8 +n_episode = 8 +evaluator_env_num = 4 +num_simulations = 50 +update_per_collect = None +batch_size = 256 +max_env_step = int(5e7) +reanalyze_ratio = 0. 
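+# (float) reanalyze_ratio = 0. above disables MuZero-style reanalysis, i.e. stored value/policy targets
+# are not refreshed with the current model before being used for training.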
+replay_ratio = 0.25 +# ============================================================== +# end of the most frequently changed config specified by the user +# ============================================================== + +botris_efficientzero_config = dict( + exp_name=f'data_ez/botris_efficientzero_ns{num_simulations}_upc{update_per_collect}_rer{reanalyze_ratio}_seed0', + env=dict( + max_episode_steps=max_env_step, + env_id=env_id, + obs_type='dict_encoded_board', + collector_env_num=collector_env_num, + evaluator_env_num=evaluator_env_num, + n_evaluator_episode=evaluator_env_num, + manager=dict(shared_memory=False, ), + max_score=None + ), + policy=dict( + model=dict( + observation_shape=OBSERVATION_SPACE_SIZE, + action_space_size=ACTION_SPACE_SIZE, + model_type='mlp', + lstm_hidden_size=256, + latent_state_dim=256, + discrete_action_encoding_type='one_hot', + norm_type='BN', + self_supervised_learning_loss=True, + ), + # (str) The path of the pretrained model. If None, the model will be initialized by the default model. + model_path=None, + cuda=True, + device='cuda', + env_type='not_board_games', + action_type='varied_action_space', + game_segment_length=50, + update_per_collect=update_per_collect, + batch_size=batch_size, + optim_type='Adam', + lr_piecewise_constant_decay=True, + learning_rate=0.003, + num_simulations=num_simulations, + reanalyze_ratio=reanalyze_ratio, + n_episode=n_episode, + eval_freq=int(2e2), + replay_buffer_size=int(1e6), # the size/capacity of replay_buffer, in the terms of transitions. + collector_env_num=collector_env_num, + evaluator_env_num=evaluator_env_num, + target_update_freq=100, + use_priority=False, + ssl_loss_weight=2, + ), +) + +botris_efficientzero_config = EasyDict(botris_efficientzero_config) +main_config = botris_efficientzero_config + +botris_efficientzero_create_config = dict( + env=dict( + type='botris', + import_names=['zoo.botris.envs.botris_lightzero_env'], + ), + env_manager=dict(type='subprocess'), + policy=dict( + type='efficientzero', + import_names=['lzero.policy.efficientzero'], + ), +) +botris_efficientzero_create_config = EasyDict(botris_efficientzero_create_config) +create_config = botris_efficientzero_create_config + +if __name__ == "__main__": + from lzero.entry import train_muzero + + train_muzero([main_config, create_config], seed=0, model_path=main_config.policy.model_path, max_env_step=max_env_step) diff --git a/zoo/botris/config/botris_unizero_config.py b/zoo/botris/config/botris_unizero_config.py new file mode 100644 index 000000000..b6f802f29 --- /dev/null +++ b/zoo/botris/config/botris_unizero_config.py @@ -0,0 +1,100 @@ +from easydict import EasyDict + +from zoo.botris.envs.modals import ACTION_SPACE_SIZE, ENCODED_INPUT_SHAPE, OBSERVATION_SPACE_SIZE + + +# ============================================================== +# begin of the most frequently changed config specified by the user +# ============================================================== +env_id = 'botris' +action_space_size = ACTION_SPACE_SIZE +update_per_collect = None +replay_ratio = 0.25 +collector_env_num = 8 +n_episode = 8 +evaluator_env_num = 3 +num_simulations = 50 +max_env_step = int(5e5) +reanalyze_ratio = 0. 
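+# The next three values size the UniZero transformer world model: batch_size for training,
+# num_unroll_steps for the unroll horizon of the training targets, and infer_context_length for the
+# number of past steps attended to at inference time (they also set max_blocks, max_tokens and
+# context_length in world_model_cfg below).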
+batch_size = 64 +num_unroll_steps = 10 +infer_context_length = 4 +# ============================================================== +# end of the most frequently changed config specified by the user +# ============================================================== + +botris_unizero_config = dict( + env=dict( + stop_value=int(1e6), + env_id=env_id, + obs_type='dict_encoded_board', + collector_env_num=collector_env_num, + evaluator_env_num=evaluator_env_num, + n_evaluator_episode=evaluator_env_num, + manager=dict(shared_memory=False, ), + ), + policy=dict( + model=dict( + observation_shape=OBSERVATION_SPACE_SIZE, + action_space_size=action_space_size, + model_type='mlp', + # NOTE: whether to use the self_supervised_learning_loss. default is False + self_supervised_learning_loss=True, + discrete_action_encoding_type='one_hot', + norm_type='BN', + world_model_cfg=dict( + max_blocks=num_unroll_steps, + max_tokens=2 * num_unroll_steps, + context_length=2 * infer_context_length, + context_length_for_recurrent=2 * infer_context_length, + device='cpu', + action_space_size=ACTION_SPACE_SIZE, + num_layers=4, + num_heads=8, + embed_dim=768, + env_num=max(collector_env_num, evaluator_env_num), + collector_env_num=collector_env_num, + evaluator_env_num=evaluator_env_num, + obs_type='vector', + norm_type='BN', + ), + ), + # (str) The path of the pretrained model. If None, the model will be initialized by the default model. + model_path=None, + num_unroll_steps=num_unroll_steps, + update_per_collect=update_per_collect, + replay_ratio=replay_ratio, + batch_size=batch_size, + optim_type='AdamW', + num_simulations=num_simulations, + reanalyze_ratio=reanalyze_ratio, + n_episode=n_episode, + replay_buffer_size=int(1e6), + collector_env_num=collector_env_num, + evaluator_env_num=evaluator_env_num, + ), +) +botris_unizero_config = EasyDict(botris_unizero_config) +main_config = botris_unizero_config + +botris_unizero_create_config = dict( + env=dict( + type='botris', + import_names=['zoo.botris.envs.botris_lightzero_env'], + ), + env_manager=dict(type='subprocess'), + policy=dict( + type='unizero', + import_names=['lzero.policy.unizero'], + ), +) +botris_unizero_create_config = EasyDict(botris_unizero_create_config) +create_config = botris_unizero_create_config + +if __name__ == "__main__": + seeds = [0] # You can add more seed values here + for seed in seeds: + # Update exp_name to include the current seed + main_config.exp_name = f'data_unizero/{env_id[:-14]}_stack1_unizero_upc{update_per_collect}-rr{replay_ratio}_H{num_unroll_steps}_bs{batch_size}_seed{seed}' + from lzero.entry import train_unizero + train_unizero([main_config, create_config], seed=seed, model_path=main_config.policy.model_path, max_env_step=max_env_step) diff --git a/zoo/botris/entry/__init__.py b/zoo/botris/entry/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/zoo/botris/entry/botris_5move_eval.py b/zoo/botris/entry/botris_5move_eval.py new file mode 100644 index 000000000..759d4e992 --- /dev/null +++ b/zoo/botris/entry/botris_5move_eval.py @@ -0,0 +1,78 @@ +from lzero.entry import eval_muzero +import numpy as np + +if __name__ == "__main__": + """ + Overview: + Main script to evaluate the MuZero model on Botris games. The script will loop over multiple seeds, + evaluating a certain number of episodes per seed. Results are aggregated and printed. + + Variables: + - model_path (:obj:`Optional[str]`): The pretrained model path, pointing to the ckpt file of the pretrained model. 
+          The path is usually something like ``exp_name/ckpt/ckpt_best.pth.tar``.
+        - seeds (:obj:`List[int]`): List of seeds to use for the evaluations.
+        - num_episodes_each_seed (:obj:`int`): Number of episodes to evaluate for each seed.
+        - total_test_episodes (:obj:`int`): Total number of test episodes, calculated as num_episodes_each_seed * len(seeds).
+        - returns_mean_seeds (:obj:`np.array`): Array of mean return values for each seed.
+        - returns_seeds (:obj:`np.array`): Array of all return values for each seed.
+    """
+    # Import the configuration for the botris 5-move EfficientZero setup from the zoo directory.
+    from zoo.botris.config.botris_5move_efficientzero_config import main_config, create_config
+
+    # model_path is the path to the trained MuZero model checkpoint.
+    # If no path is provided, the script will use the default model.
+    model_path = r'/home/nate/blocc_lzero/data_ez/botris_5move_efficientzero_ns50_upcNone_rer0.25_seed0/ckpt/ckpt_best.pth.tar'
+
+    # seeds is a list of seed values for the random number generator, used to initialize the environment.
+    seeds = [0, 1, 2, 3]
+    # num_episodes_each_seed is the number of episodes to run for each seed.
+    num_episodes_each_seed = 1
+    # total_test_episodes is the total number of test episodes, calculated as the product of the number of seeds and the number of episodes per seed.
+    total_test_episodes = num_episodes_each_seed * len(seeds)
+
+    # Set the type of the environment manager to 'base' for visualization purposes.
+    create_config.env_manager.type = 'base'
+    # The number of environments to evaluate concurrently. Set to 1 for visualization purposes.
+    main_config.env.evaluator_env_num = 1
+    # The total number of evaluation episodes that should be run.
+    main_config.env.n_evaluator_episode = total_test_episodes
+    # A boolean flag indicating whether to render the environments in real time.
+    main_config.env.render_mode_human = False
+
+    # Render mode used to save a gif of the episode instead of displaying it in real time.
+    main_config.env.render_mode = 'image_savefile_mode'
+    # The path where the recorded gif will be saved.
+    main_config.env.replay_path = './gif'
+    # The maximum number of steps for each episode during evaluation. This may need to be adjusted based on the specific characteristics of the environment.
+    main_config.env.eval_max_episode_steps = int(1000)
+    main_config.env.is_collect = False
+
+    main_config.policy.num_simulations = 1000
+
+    # These lists will store the mean and total rewards for each seed.
+    returns_mean_seeds = []
+    returns_seeds = []
+
+    # The main evaluation loop. For each seed, the MuZero model is evaluated and the mean and total rewards are recorded.
+    for seed in seeds:
+        returns_mean, returns = eval_muzero(
+            [main_config, create_config],
+            seed=seed,
+            num_episodes_each_seed=num_episodes_each_seed,
+            print_seed_details=False,
+            model_path=model_path
+        )
+        print(returns_mean, returns)
+        returns_mean_seeds.append(returns_mean)
+        returns_seeds.append(returns)
+
+    # Convert the lists of mean and total rewards into numpy arrays for easier statistical analysis.
+    returns_mean_seeds = np.array(returns_mean_seeds)
+    returns_seeds = np.array(returns_seeds)
+
+    # Print the evaluation results: the mean return for each seed, followed by the mean reward across all seeds.
+    print("=" * 20)
+    print(f"We evaluated a total of {len(seeds)} seeds.
For each seed, we evaluated {num_episodes_each_seed} episode(s).") + print(f"For seeds {seeds}, the mean returns are {returns_mean_seeds}, and the returns are {returns_seeds}.") + print("Across all seeds, the mean reward is:", returns_mean_seeds.mean()) + print("=" * 20) \ No newline at end of file diff --git a/zoo/botris/envs/__init__.py b/zoo/botris/envs/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/zoo/botris/envs/botris_5move_env.py b/zoo/botris/envs/botris_5move_env.py new file mode 100644 index 000000000..fcf2a12f1 --- /dev/null +++ b/zoo/botris/envs/botris_5move_env.py @@ -0,0 +1,378 @@ +import copy +import logging +import os +import sys +from typing import List, Literal + +import gymnasium as gym +import imageio +import matplotlib.font_manager as fm +import matplotlib.pyplot as plt +import numpy as np +from ding.envs import BaseEnvTimestep +from ding.torch_utils import to_ndarray +from ding.utils import ENV_REGISTRY +from easydict import EasyDict +from gymnasium import spaces +from gymnasium.utils import seeding + +from .modals import NUMBER_OF_COLS, NUMBER_OF_ROWS, ENCODED_BOARD_SHAPE, MAX_MOVE_SCORE, ENCODED_INPUT_SHAPE +from .env_5move import GameEnvironment5Move, ACTION_SPACE_SIZE + + +@ENV_REGISTRY.register('botris-5move') +class Botris5MoveEnv(gym.Env): + """ + Overview: + The BotrisEnv is a gym environment implementation of Botris. The environment provides an interface to interact with + the game and receive observations, rewards, and game status information. + + Interfaces: + - reset(init_board=None, add_random_tile_flag=True): + Resets the game state and starts a new episode. It returns the initial observation of the game. + - step(action): + Advances the game by one step based on the provided action. It returns the new observation, reward, game status, + and additional information. + - render(mode='human'): + Renders the current state of the game for visualization purposes. + MDP Definition: + - Observation Space: + NOT ACCURATE!!!!!!!!!!!!!1 + The observation space is a 4x4 grid representing the game board. Each cell in the grid can contain a number from + 0 to 2048. The observation can be in different formats based on the 'obs_type' parameter in the environment configuration. + - If 'obs_type' is set to 'encode_observation' (default): + The observation is a 3D numpy array of shape (4, 4, 16). Each cell in the array is represented as a one-hot vector + encoding the value of the tile in that cell. The one-hot vector has a length of 16, representing the possible tile + values from 0 to 2048. The first element in the one-hot vector corresponds to an empty cell (0 value). + - If 'obs_type' is set to 'dict_encoded_board': + The observation is a dictionary with the following keys: + - 'observation': A 3D numpy array representing the game board as described above. + - 'action_mask': A binary mask representing the legal actions that can be taken in the current state. + - 'to_play': A placeholder value (-1) indicating the current player (not applicable in this game). + - Action Space: + NOT ACCURATE!!!!!!!!!!!!!1 + The action space is a discrete space with 4 possible actions: + - 0: Move Up + - 1: Move Right + - 2: Move Down + - 3: Move Left + - Reward: + The reward depends on the 'reward_type' parameter in the environment configuration. + - If 'reward_type' is set to 'raw': + The reward is a floating-point number representing the immediate reward obtained from the last action. 
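+                For Botris, this is the change in ``gameenv.get_score()`` produced by the chosen placement
+                (see the ``move()`` method below).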
+ - Done: + The game ends when one of the following conditions is met: + - The maximum score (configured by 'max_score') is reached. + - There are no legal moves left. + - The number of steps in the episode exceeds the maximum episode steps (configured by 'max_episode_steps'). + - Additional Information: + The 'info' dictionary returned by the 'step' method contains additional information about the current state. + The following keys are included in the dictionary: + - 'raw_reward': The raw reward obtained from the last action. + - Rendering: + The render method provides a way to visually represent the current state of the game. It offers four distinct rendering modes: + When set to None, the game state is not rendered. + In 'state_realtime_mode', the game state is illustrated in a text-based format directly in the console. + The 'image_realtime_mode' displays the game as an RGB image in real-time. + With 'image_savefile_mode', the game is rendered as an RGB image but not displayed in real-time. Instead, the image is saved to a designated file. + Please note that the default rendering mode is set to None. + """ + + # The default_config for Botris env. + config = dict( + # (str) The name of the environment registered in the environment registry. + env_id="botris", + # (str) The render mode. Options are 'None', 'state_realtime_mode', 'image_realtime_mode' or 'image_savefile_mode'. + # If None, then the game will not be rendered. + render_mode=None, + # (str) The format in which to save the replay. 'gif' is a popular choice. + replay_format='gif', + # (str) A suffix for the replay file name to distinguish it from other files. + replay_name_suffix='eval', + # (str or None) The directory in which to save the replay file. If None, the file is saved in the current directory. + replay_path=None, + # (bool) Whether to scale the actions. If True, actions are divided by the action space size. + act_scale=True, + # (str) The type of observation to use. Options are 'raw_encoded_board' and 'dict_encoded_board'. + obs_type='dict_encoded_board', + # (bool) Whether to normalize rewards. If True, rewards are divided by the maximum possible reward. + reward_normalize=False, + # (float) The factor to scale rewards by when reward normalization is used. + reward_norm_scale=100, + # (str) The type of reward to use. 'raw' means the raw game score.. + reward_type='raw', + # (int) The maximum score in the game. A game is won when this score is reached. + max_score=int(10_000), + # (int) The number of steps to delay rewards by. If > 0, the agent only receives a reward every this many steps. + delay_reward_step=0, + # (float) The probability that a random agent is used instead of the learning agent. + prob_random_agent=0., + # (int) The maximum number of steps in an episode. + max_episode_steps=int(1e6), + # (bool) Whether to collect data during the game. + is_collect=True, + # (bool) Whether to ignore legal actions. If True, the agent can take any action, even if it's not legal. 
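+        # Note: this 5-move variant always reports an all-ones action mask (see reset() and step() below),
+        # so every action is treated as legal regardless of this flag.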
+ ignore_legal_actions=False, + ) + + @classmethod + def default_config(cls: type) -> EasyDict: + cfg: EasyDict = EasyDict(copy.deepcopy(cls.config)) + cfg.cfg_type = cls.__name__ + 'Dict' + return cfg + + def __init__(self, cfg: dict) -> None: + self._cfg: dict = cfg + self._init_flag: bool = False + self._env_id: str = cfg.env_id + self.replay_format: str = cfg.replay_format + self.replay_name_suffix: str = cfg.replay_name_suffix + self.replay_path: str = cfg.replay_path + self.render_mode: Literal['state_realtime_mode', 'image_realtime_mode', 'image_savefile_mode'] | None = cfg.render_mode + + self.obs_type: Literal['raw_encoded_board', 'dict_encoded_board'] = cfg.obs_type + self.reward_type: Literal['raw'] = cfg.reward_type + self.reward_normalize: bool = cfg.reward_normalize + self.reward_norm_scale: int = cfg.reward_norm_scale + assert self.reward_type in ['raw'] + assert self.reward_type == 'raw' + self.max_score: int = cfg.max_score + # Define the maximum score that will end the game (e.g. 1_000). None means no limit. + # This does not affect the state returned. + assert self.max_score is None or isinstance(self.max_score, int) + + self.max_episode_steps: int = cfg.max_episode_steps + self.is_collect: bool = cfg.is_collect + self.ignore_legal_actions: bool = cfg.ignore_legal_actions + self.w: int = NUMBER_OF_COLS + self.h: int = NUMBER_OF_ROWS + self.episode_return: int = 0 + # Members for gym implementation: + self._action_space = spaces.Discrete(ACTION_SPACE_SIZE) + self._observation_space = spaces.Box(0, 1, ENCODED_INPUT_SHAPE, dtype=int) + self._reward_range = (0., MAX_MOVE_SCORE) + + # Initialise the random seed of the gym environment. + self.seed() + self.frames = [] + + def reset(self): + """Reset the game.""" + self.episode_length = 0 + self.gameenv: GameEnvironment5Move = GameEnvironment5Move(20, 0.1) + + self.episode_return = 0 + self._final_eval_reward = 0.0 + # Create a mask for legal actions + action_mask = np.ones(ACTION_SPACE_SIZE, np.int8) + + # Encode the board, ensure correct datatype and shape + observation = self.gameenv.get_input_encoding() + observation = observation.astype(np.float32) + + # Based on the observation type, create the appropriate observation object + if self.obs_type == 'dict_encoded_board': + observation = { + 'observation': observation, + 'action_mask': action_mask, + 'to_play': -1, + } + elif self.obs_type == 'raw_encoded_board': + observation = observation + else: + raise NotImplementedError + + # Render the beginning state of the game. + if self.render_mode is not None: + self.render(self.render_mode) + + return observation + + def step(self, action): + """ + Overview: + Perform one step of the game. This involves making a move, and updating the game state. + The rewards are calculated based on the game configuration ('raw'). + The observations are also returned based on the game configuration ('raw_encoded_board' or 'dict_encoded_board'). + Arguments: + - action (:obj:`int`): The action to be performed. + Returns: + - BaseEnvTimestep: Contains the new state observation, reward, and other game information. 
+ """ + + # Increment the total episode length + self.episode_length += 1 + + # Check if the action is legal, otherwise choose a random legal action + raw_reward = float(self.move(action)) + + # Update total reward and add new tile + self.episode_return += raw_reward + + + # Convert rewards to float + if self.reward_type == 'raw': + raw_reward = float(raw_reward) + + # Prepare the game state observation + observation = self.gameenv.get_input_encoding() + observation = observation.astype(np.float32) + + # Return the observation based on the observation type + action_mask = np.ones(ACTION_SPACE_SIZE, np.int8) + if self.obs_type == 'dict_encoded_board': + observation = {'observation': observation, 'action_mask': action_mask, 'to_play': -1} + elif self.obs_type == 'raw_encoded_board': + observation = observation + else: + raise NotImplementedError + + # Check if the game has ended + done = self.is_done() + + # End the game if the maximum steps have been reached + if self.episode_length >= self.max_episode_steps: + done = True + + # Normalize the reward if necessary + if self.reward_normalize: + reward_normalize = raw_reward / self.reward_norm_scale + reward = reward_normalize + else: + reward = raw_reward + + self._final_eval_reward += raw_reward + + # Convert the reward to ndarray + if self.reward_type == 'raw': + reward = to_ndarray([reward]).astype(np.float32) + + # Prepare information to return + info = {"raw_reward": raw_reward} + + # Render the new step. + if self.render_mode is not None: + self.render(self.render_mode) + + # If the game has ended, save additional information and the replay if necessary + if done: + info['eval_episode_return'] = self._final_eval_reward + if self.render_mode == 'image_savefile_mode': + self.save_render_output(replay_name_suffix=self.replay_name_suffix, replay_path=self.replay_path, + format=self.replay_format) + + return BaseEnvTimestep(observation, reward, done, info) + + def move(self, action): + """ + Overview: + Perform one move in the game. The game board can be shifted in one of four directions: up (0), right (1), down (2), or left (3). + This method manages the shifting process and combines similar adjacent elements. It also returns the reward generated from the move. + Arguments: + - direction (:obj:`int`): The direction of the move. + - trial (:obj:`bool`): If true, this move is only simulated and does not change the actual game state. + """ + # TODO(pu): different transition dynamics + pre_move_score = self.gameenv.get_score() + + self.gameenv.step(action) + + move_reward = self.gameenv.get_score() - pre_move_score + return move_reward + + def is_done(self): + """Has the game ended. Game ends if there is a tile equal to the limit + or there are no legal moves. If there are empty spaces then there + must be legal moves.""" + if self.gameenv.terminal: + return True + elif (self.max_score is not None) and (self.gameenv.get_score() >= self.max_score): + return True + else: + return False + + def seed(self, seed=None, seed1=None): + """Set the random seed for the gym environment.""" + self.np_random, seed = seeding.np_random(seed) + return [seed] + + def render(self, mode: str = None): + """ + Overview: + Renders the Botris game environment. + Arguments: + - mode (:obj:`str`): The rendering mode. Options are None, 'state_realtime_mode', 'image_realtime_mode' or 'image_savefile_mode'. + When set to None, the game state is not rendered. + In 'state_realtime_mode', the game state is illustrated in a text-based format directly in the console. 
+ The 'image_realtime_mode' displays the game as an RGB image in real-time. + With 'image_savefile_mode', the game is rendered as an RGB image but not displayed in real-time. Instead, the image is saved to a designated file. + Please note that the default rendering mode is set to None. + """ + if mode == 'state_realtime_mode': + s = 'Current Return: {}, '.format(self.episode_return) + print(s) + self.gameenv.render() + else: + pil_board = self.gameenv.draw() + + # Instead of returning the image, we display it using pyplot + if mode == 'image_realtime_mode': + plt.imshow(np.asarray(pil_board)) + plt.draw() + # plt.pause(0.001) + elif mode == 'image_savefile_mode': + # Append the frame to frames for gif + self.frames.append(np.asarray(pil_board)) + + def save_render_output(self, replay_name_suffix: str = '', replay_path=None, format='gif'): + # At the end of the episode, save the frames to a gif or mp4 file + if replay_path is None: + filename = f'botris_{replay_name_suffix}.{format}' + else: + if not os.path.exists(replay_path): + os.makedirs(replay_path) + filename = replay_path + f'/botris_{replay_name_suffix}.{format}' + + if format == 'gif': + imageio.mimsave(filename, self.frames, 'GIF') + elif format == 'mp4': + imageio.mimsave(filename, self.frames, fps=30, codec='mpeg4') + + else: + raise ValueError("Unsupported format: {}".format(format)) + + logging.info("Saved output to {}".format(filename)) + self.frames = [] + + @property + def observation_space(self) -> gym.spaces.Space: + return self._observation_space + + @property + def action_space(self) -> gym.spaces.Space: + return self._action_space + + @property + def reward_space(self) -> gym.spaces.Space: + return self._reward_range + + @staticmethod + def create_collector_env_cfg(cfg: dict) -> List[dict]: + collector_env_num = cfg.pop('collector_env_num') + cfg = copy.deepcopy(cfg) + # when in collect phase, sometimes we need to normalize the reward + # reward_normalize is determined by the config. + cfg.is_collect = True + return [cfg for _ in range(collector_env_num)] + + @staticmethod + def create_evaluator_env_cfg(cfg: dict) -> List[dict]: + evaluator_env_num = cfg.pop('evaluator_env_num') + cfg = copy.deepcopy(cfg) + # when in evaluate phase, we don't need to normalize the reward. + cfg.reward_normalize = False + cfg.is_collect = False + return [cfg for _ in range(evaluator_env_num)] + + def __repr__(self) -> str: + return "LightZero game botris Env." \ No newline at end of file diff --git a/zoo/botris/envs/botris_lightzero_env.py b/zoo/botris/envs/botris_lightzero_env.py new file mode 100644 index 000000000..0b37851f9 --- /dev/null +++ b/zoo/botris/envs/botris_lightzero_env.py @@ -0,0 +1,389 @@ +import copy +import logging +import os +import sys +from typing import List, Literal + +import gymnasium as gym +import imageio +import matplotlib.font_manager as fm +import matplotlib.pyplot as plt +import numpy as np +from PIL import Image, ImageDraw, ImageFont +from ding.envs import BaseEnvTimestep +from ding.torch_utils import to_ndarray +from ding.utils import ENV_REGISTRY +from easydict import EasyDict +from gymnasium import spaces +from gymnasium.utils import seeding + +from .modals import NUMBER_OF_COLS, NUMBER_OF_ROWS, ENCODED_BOARD_SHAPE, ACTION_SPACE_SIZE, MAX_MOVE_SCORE, ENCODED_INPUT_SHAPE +from .env import GameEnvironment + +@ENV_REGISTRY.register('botris') +class BotrisEnv(gym.Env): + """ + Overview: + The BotrisEnv is a gym environment implementation of Botris. 
The environment provides an interface to interact with + the game and receive observations, rewards, and game status information. + + Interfaces: + - reset(init_board=None, add_random_tile_flag=True): + Resets the game state and starts a new episode. It returns the initial observation of the game. + - step(action): + Advances the game by one step based on the provided action. It returns the new observation, reward, game status, + and additional information. + - render(mode='human'): + Renders the current state of the game for visualization purposes. + MDP Definition: + - Observation Space: + NOT ACCURATE!!!!!!!!!!!!!1 + The observation space is a 4x4 grid representing the game board. Each cell in the grid can contain a number from + 0 to 2048. The observation can be in different formats based on the 'obs_type' parameter in the environment configuration. + - If 'obs_type' is set to 'encode_observation' (default): + The observation is a 3D numpy array of shape (4, 4, 16). Each cell in the array is represented as a one-hot vector + encoding the value of the tile in that cell. The one-hot vector has a length of 16, representing the possible tile + values from 0 to 2048. The first element in the one-hot vector corresponds to an empty cell (0 value). + - If 'obs_type' is set to 'dict_encoded_board': + The observation is a dictionary with the following keys: + - 'observation': A 3D numpy array representing the game board as described above. + - 'action_mask': A binary mask representing the legal actions that can be taken in the current state. + - 'to_play': A placeholder value (-1) indicating the current player (not applicable in this game). + - Action Space: + NOT ACCURATE!!!!!!!!!!!!!1 + The action space is a discrete space with 4 possible actions: + - 0: Move Up + - 1: Move Right + - 2: Move Down + - 3: Move Left + - Reward: + The reward depends on the 'reward_type' parameter in the environment configuration. + - If 'reward_type' is set to 'raw': + The reward is a floating-point number representing the immediate reward obtained from the last action. + - Done: + The game ends when one of the following conditions is met: + - The maximum score (configured by 'max_score') is reached. + - There are no legal moves left. + - The number of steps in the episode exceeds the maximum episode steps (configured by 'max_episode_steps'). + - Additional Information: + The 'info' dictionary returned by the 'step' method contains additional information about the current state. + The following keys are included in the dictionary: + - 'raw_reward': The raw reward obtained from the last action. + - Rendering: + The render method provides a way to visually represent the current state of the game. It offers four distinct rendering modes: + When set to None, the game state is not rendered. + In 'state_realtime_mode', the game state is illustrated in a text-based format directly in the console. + The 'image_realtime_mode' displays the game as an RGB image in real-time. + With 'image_savefile_mode', the game is rendered as an RGB image but not displayed in real-time. Instead, the image is saved to a designated file. + Please note that the default rendering mode is set to None. + """ + + # The default_config for Botris env. + config = dict( + # (str) The name of the environment registered in the environment registry. + env_id="botris", + # (str) The render mode. Options are 'None', 'state_realtime_mode', 'image_realtime_mode' or 'image_savefile_mode'. + # If None, then the game will not be rendered. 
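+        # In 'image_savefile_mode' the frames collected by render() are written out by save_render_output()
+        # once the episode ends.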
+ render_mode=None, + # (str) The format in which to save the replay. 'gif' is a popular choice. + replay_format='gif', + # (str) A suffix for the replay file name to distinguish it from other files. + replay_name_suffix='eval', + # (str or None) The directory in which to save the replay file. If None, the file is saved in the current directory. + replay_path=None, + # (bool) Whether to scale the actions. If True, actions are divided by the action space size. + act_scale=True, + # (str) The type of observation to use. Options are 'raw_encoded_board' and 'dict_encoded_board'. + obs_type='dict_encoded_board', + # (bool) Whether to normalize rewards. If True, rewards are divided by the maximum possible reward. + reward_normalize=False, + # (float) The factor to scale rewards by when reward normalization is used. + reward_norm_scale=100, + # (str) The type of reward to use. 'raw' means the raw game score.. + reward_type='raw', + # (int) The maximum score in the game. A game is won when this score is reached. + max_score=int(10_000), + # (int) The number of steps to delay rewards by. If > 0, the agent only receives a reward every this many steps. + delay_reward_step=0, + # (float) The probability that a random agent is used instead of the learning agent. + prob_random_agent=0., + # (int) The maximum number of steps in an episode. + max_episode_steps=int(1e6), + # (bool) Whether to collect data during the game. + is_collect=True, + # (bool) Whether to ignore legal actions. If True, the agent can take any action, even if it's not legal. + ignore_legal_actions=False, + ) + + @classmethod + def default_config(cls: type) -> EasyDict: + cfg: EasyDict = EasyDict(copy.deepcopy(cls.config)) + cfg.cfg_type = cls.__name__ + 'Dict' + return cfg + + def __init__(self, cfg: dict) -> None: + self._cfg: dict = cfg + self._init_flag: bool = False + self._env_id: str = cfg.env_id + self.replay_format: str = cfg.replay_format + self.replay_name_suffix: str = cfg.replay_name_suffix + self.replay_path: str = cfg.replay_path + self.render_mode: Literal['state_realtime_mode', 'image_realtime_mode', 'image_savefile_mode'] | None = cfg.render_mode + + self.obs_type: Literal['raw_encoded_board', 'dict_encoded_board'] = cfg.obs_type + self.reward_type: Literal['raw'] = cfg.reward_type + self.reward_normalize: bool = cfg.reward_normalize + self.reward_norm_scale: int = cfg.reward_norm_scale + assert self.reward_type in ['raw'] + assert self.reward_type == 'raw' + self.max_score: int = cfg.max_score + # Define the maximum score that will end the game (e.g. 1_000). None means no limit. + # This does not affect the state returned. + assert self.max_score is None or isinstance(self.max_score, int) + + self.max_episode_steps: int = cfg.max_episode_steps + self.is_collect: bool = cfg.is_collect + self.ignore_legal_actions: bool = cfg.ignore_legal_actions + self.w: int = NUMBER_OF_COLS + self.h: int = NUMBER_OF_ROWS + self.episode_return: int = 0 + # Members for gym implementation: + self._action_space = spaces.Discrete(ACTION_SPACE_SIZE) + self._observation_space = spaces.Box(0, 1, ENCODED_INPUT_SHAPE, dtype=int) + self._reward_range = (0., MAX_MOVE_SCORE) + + # Initialise the random seed of the gym environment. 
+ self.seed() + self.frames = [] + + def reset(self): + """Reset the game.""" + self.episode_length = 0 + self.gameenv: GameEnvironment = GameEnvironment() + + self.episode_return = 0 + self._final_eval_reward = 0.0 + self.should_done = False + # Create a mask for legal actions + self.action_mask = self.gameenv.legal_moves_mask() + + # Encode the board, ensure correct datatype and shape + observation = self.gameenv.get_input_encoding() + observation = observation.astype(np.float32) + + # Based on the observation type, create the appropriate observation object + if self.obs_type == 'dict_encoded_board': + observation = { + 'observation': observation, + 'action_mask': self.action_mask.astype(np.int8), + 'to_play': -1, + } + elif self.obs_type == 'raw_encoded_board': + observation = observation + else: + raise NotImplementedError + + # Render the beginning state of the game. + if self.render_mode is not None: + self.render(self.render_mode) + + return observation + + def step(self, action): + """ + Overview: + Perform one step of the game. This involves making a move, and updating the game state. + The rewards are calculated based on the game configuration ('raw'). + The observations are also returned based on the game configuration ('raw_encoded_board' or 'dict_encoded_board'). + Arguments: + - action (:obj:`int`): The action to be performed. + Returns: + - BaseEnvTimestep: Contains the new state observation, reward, and other game information. + """ + + # Increment the total episode length + self.episode_length += 1 + + # Check if the action is legal, otherwise choose a random legal action + if not self.action_mask[action]: + logging.warning( + f"Illegal action: {action}. Legal actions: {self.action_mask}. " + "Choosing a random action from legal actions." + ) + action = np.random.choice(np.where(self.action_mask == 1)[0]) + raw_reward = float(self.move(action)) + + # Update total reward and add new tile + self.episode_return += raw_reward + + + # Convert rewards to float + if self.reward_type == 'raw': + raw_reward = float(raw_reward) + + # Prepare the game state observation + observation = self.gameenv.get_input_encoding() + observation = observation.astype(np.float32) + + # Return the observation based on the observation type + if self.obs_type == 'dict_encoded_board': + observation = {'observation': observation, 'action_mask': self.action_mask.astype(np.int8), 'to_play': -1} + elif self.obs_type == 'raw_encoded_board': + observation = observation + else: + raise NotImplementedError + + # Check if the game has ended + done = self.is_done() + + # End the game if the maximum steps have been reached + if self.episode_length >= self.max_episode_steps: + done = True + + # Normalize the reward if necessary + if self.reward_normalize: + reward_normalize = raw_reward / self.reward_norm_scale + reward = reward_normalize + else: + reward = raw_reward + + self._final_eval_reward += raw_reward + + # Convert the reward to ndarray + if self.reward_type == 'raw': + reward = to_ndarray([reward]).astype(np.float32) + + # Prepare information to return + info = {"raw_reward": raw_reward} + + # Render the new step. 
+ if self.render_mode is not None: + self.render(self.render_mode) + + # If the game has ended, save additional information and the replay if necessary + if done: + info['eval_episode_return'] = self._final_eval_reward + if self.render_mode == 'image_savefile_mode': + self.save_render_output(replay_name_suffix=self.replay_name_suffix, replay_path=self.replay_path, + format=self.replay_format) + + return BaseEnvTimestep(observation, reward, done, info) + + def move(self, action): + """ + Overview: + Perform one move in the game. The game board can be shifted in one of four directions: up (0), right (1), down (2), or left (3). + This method manages the shifting process and combines similar adjacent elements. It also returns the reward generated from the move. + Arguments: + - direction (:obj:`int`): The direction of the move. + - trial (:obj:`bool`): If true, this move is only simulated and does not change the actual game state. + """ + # TODO(pu): different transition dynamics + pre_move_score = self.gameenv.get_score() + + self.gameenv.step_action(action) + + move_reward = self.gameenv.get_score() - pre_move_score + return move_reward + + def is_done(self): + """Has the game ended. Game ends if there is a tile equal to the limit + or there are no legal moves. If there are empty spaces then there + must be legal moves.""" + + self.action_mask = self.gameenv.legal_moves_mask() + if self.max_score is not None and self.gameenv.get_score() >= self.max_score: + return True + elif not self.action_mask.any(): + # the agent don't have legal_actions to move, so the episode is done + return True + elif self.should_done: + return True + else: + return False + + def seed(self, seed=None, seed1=None): + """Set the random seed for the gym environment.""" + self.np_random, seed = seeding.np_random(seed) + return [seed] + + def render(self, mode: str = None): + """ + Overview: + Renders the Botris game environment. + Arguments: + - mode (:obj:`str`): The rendering mode. Options are None, 'state_realtime_mode', 'image_realtime_mode' or 'image_savefile_mode'. + When set to None, the game state is not rendered. + In 'state_realtime_mode', the game state is illustrated in a text-based format directly in the console. + The 'image_realtime_mode' displays the game as an RGB image in real-time. + With 'image_savefile_mode', the game is rendered as an RGB image but not displayed in real-time. Instead, the image is saved to a designated file. + Please note that the default rendering mode is set to None. 
+ """ + if mode == 'state_realtime_mode': + s = 'Current Return: {}, '.format(self.episode_return) + print(s) + self.gameenv.render() + else: + pil_board = self.gameenv.draw() + + # Instead of returning the image, we display it using pyplot + if mode == 'image_realtime_mode': + plt.imshow(np.asarray(pil_board)) + plt.draw() + # plt.pause(0.001) + elif mode == 'image_savefile_mode': + # Append the frame to frames for gif + self.frames.append(np.asarray(pil_board)) + + def save_render_output(self, replay_name_suffix: str = '', replay_path=None, format='gif'): + # At the end of the episode, save the frames to a gif or mp4 file + if replay_path is None: + filename = f'botris_{replay_name_suffix}.{format}' + else: + if not os.path.exists(replay_path): + os.makedirs(replay_path) + filename = replay_path + f'/botris_{replay_name_suffix}.{format}' + + if format == 'gif': + imageio.mimsave(filename, self.frames, 'GIF') + elif format == 'mp4': + imageio.mimsave(filename, self.frames, fps=30, codec='mpeg4') + + else: + raise ValueError("Unsupported format: {}".format(format)) + + logging.info("Saved output to {}".format(filename)) + self.frames = [] + + @property + def observation_space(self) -> gym.spaces.Space: + return self._observation_space + + @property + def action_space(self) -> gym.spaces.Space: + return self._action_space + + @property + def reward_space(self) -> gym.spaces.Space: + return self._reward_range + + @staticmethod + def create_collector_env_cfg(cfg: dict) -> List[dict]: + collector_env_num = cfg.pop('collector_env_num') + cfg = copy.deepcopy(cfg) + # when in collect phase, sometimes we need to normalize the reward + # reward_normalize is determined by the config. + cfg.is_collect = True + return [cfg for _ in range(collector_env_num)] + + @staticmethod + def create_evaluator_env_cfg(cfg: dict) -> List[dict]: + evaluator_env_num = cfg.pop('evaluator_env_num') + cfg = copy.deepcopy(cfg) + # when in evaluate phase, we don't need to normalize the reward. + cfg.reward_normalize = False + cfg.is_collect = False + return [cfg for _ in range(evaluator_env_num)] + + def __repr__(self) -> str: + return "LightZero game botris Env." \ No newline at end of file diff --git a/zoo/botris/envs/botris_versus_lightzero_env.py b/zoo/botris/envs/botris_versus_lightzero_env.py new file mode 100644 index 000000000..7f401ece8 --- /dev/null +++ b/zoo/botris/envs/botris_versus_lightzero_env.py @@ -0,0 +1,376 @@ +import copy +import logging +import os +import sys +from typing import List, Literal, Tuple + +import gymnasium as gym +import imageio +import matplotlib.font_manager as fm +import matplotlib.pyplot as plt +import numpy as np +from PIL import Image, ImageDraw, ImageFont +from ding.envs import BaseEnvTimestep +from ding.torch_utils import to_ndarray +from ding.utils import ENV_REGISTRY +from easydict import EasyDict +from gymnasium import spaces +from gymnasium.utils import seeding + +from .modals import NUMBER_OF_COLS, NUMBER_OF_ROWS, ENCODED_BOARD_SHAPE, ACTION_SPACE_SIZE, MAX_MOVE_SCORE, ENCODED_INPUT_SHAPE +from .env_versus import GameEnvironment + +@ENV_REGISTRY.register('botris-versus') +class BotrisEnv(gym.Env): + """ + Overview: + The BotrisEnv is a gym environment implementation of Botris. The environment provides an interface to interact with + the game and receive observations, rewards, and game status information. + + Interfaces: + - reset(init_board=None, add_random_tile_flag=True): + Resets the game state and starts a new episode. It returns the initial observation of the game. 
+ - step(action): + Advances the game by one step based on the provided action. It returns the new observation, reward, game status, + and additional information. + - render(mode='human'): + Renders the current state of the game for visualization purposes. + MDP Definition: + - Observation Space: + NOT ACCURATE!!!!!!!!!!!!!1 + The observation space is a 4x4 grid representing the game board. Each cell in the grid can contain a number from + 0 to 2048. The observation can be in different formats based on the 'obs_type' parameter in the environment configuration. + - If 'obs_type' is set to 'encode_observation' (default): + The observation is a 3D numpy array of shape (4, 4, 16). Each cell in the array is represented as a one-hot vector + encoding the value of the tile in that cell. The one-hot vector has a length of 16, representing the possible tile + values from 0 to 2048. The first element in the one-hot vector corresponds to an empty cell (0 value). + - If 'obs_type' is set to 'dict_encoded_board': + The observation is a dictionary with the following keys: + - 'observation': A 3D numpy array representing the game board as described above. + - 'action_mask': A binary mask representing the legal actions that can be taken in the current state. + - 'to_play': A placeholder value (-1) indicating the current player (not applicable in this game). + - Action Space: + NOT ACCURATE!!!!!!!!!!!!!1 + The action space is a discrete space with 4 possible actions: + - 0: Move Up + - 1: Move Right + - 2: Move Down + - 3: Move Left + - Reward: + The reward depends on the 'reward_type' parameter in the environment configuration. + - If 'reward_type' is set to 'raw': + The reward is a floating-point number representing the immediate reward obtained from the last action. + - Done: + The game ends when one of the following conditions is met: + - The maximum score (configured by 'max_score') is reached. + - There are no legal moves left. + - The number of steps in the episode exceeds the maximum episode steps (configured by 'max_episode_steps'). + - Additional Information: + The 'info' dictionary returned by the 'step' method contains additional information about the current state. + The following keys are included in the dictionary: + - 'raw_reward': The raw reward obtained from the last action. + - Rendering: + The render method provides a way to visually represent the current state of the game. It offers four distinct rendering modes: + When set to None, the game state is not rendered. + In 'state_realtime_mode', the game state is illustrated in a text-based format directly in the console. + The 'image_realtime_mode' displays the game as an RGB image in real-time. + With 'image_savefile_mode', the game is rendered as an RGB image but not displayed in real-time. Instead, the image is saved to a designated file. + Please note that the default rendering mode is set to None. + """ + + # The default_config for Botris env. + config = dict( + # (str) The name of the environment registered in the environment registry. + env_id="botris", + # (str) The render mode. Options are 'None', 'state_realtime_mode', 'image_realtime_mode' or 'image_savefile_mode'. + # If None, then the game will not be rendered. + render_mode=None, + # (str) The format in which to save the replay. 'gif' is a popular choice. + replay_format='gif', + # (str) A suffix for the replay file name to distinguish it from other files. + replay_name_suffix='eval', + # (str or None) The directory in which to save the replay file. 
If None, the file is saved in the current directory. + replay_path=None, + # (bool) Whether to scale the actions. If True, actions are divided by the action space size. + act_scale=True, + # (str) The type of observation to use. Options are 'raw_encoded_board' and 'dict_encoded_board'. + obs_type='dict_encoded_board', + # (bool) Whether to normalize rewards. If True, rewards are divided by the maximum possible reward. + reward_normalize=False, + # (float) The factor to scale rewards by when reward normalization is used. + reward_norm_scale=100, + # (str) The type of reward to use. 'raw' means the raw game score.. + reward_type='raw', + # (int) The maximum score in the game. A game is won when this score is reached. + max_score=int(10_000), + # (int) The number of steps to delay rewards by. If > 0, the agent only receives a reward every this many steps. + delay_reward_step=0, + # (float) The probability that a random agent is used instead of the learning agent. + prob_random_agent=0., + # (int) The maximum number of steps in an episode. + max_episode_steps=int(1e6), + # (bool) Whether to collect data during the game. + is_collect=True, + # (bool) Whether to ignore legal actions. If True, the agent can take any action, even if it's not legal. + ignore_legal_actions=False, + ) + + @classmethod + def default_config(cls: type) -> EasyDict: + cfg: EasyDict = EasyDict(copy.deepcopy(cls.config)) + cfg.cfg_type = cls.__name__ + 'Dict' + return cfg + + def __init__(self, cfg: dict) -> None: + self._cfg: dict = cfg + self._init_flag: bool = False + self._env_id: str = cfg.env_id + self.replay_format: str = cfg.replay_format + self.replay_name_suffix: str = cfg.replay_name_suffix + self.replay_path: str = cfg.replay_path + self.render_mode: Literal['state_realtime_mode', 'image_realtime_mode', 'image_savefile_mode'] | None = cfg.render_mode + + self.obs_type: Literal['raw_encoded_board', 'dict_encoded_board'] = cfg.obs_type + self.reward_type: Literal['raw'] = cfg.reward_type + self.reward_normalize: bool = cfg.reward_normalize + self.reward_norm_scale: int = cfg.reward_norm_scale + assert self.reward_type in ['raw'] + assert self.reward_type == 'raw' + self.max_score: int = cfg.max_score + # Define the maximum score that will end the game (e.g. 1_000). None means no limit. + # This does not affect the state returned. + assert self.max_score is None or isinstance(self.max_score, int) + + self.max_episode_steps: int = cfg.max_episode_steps + self.is_collect: bool = cfg.is_collect + self.ignore_legal_actions: bool = cfg.ignore_legal_actions + self.w: int = NUMBER_OF_COLS + self.h: int = NUMBER_OF_ROWS + self.episode_return: int = 0 + # Members for gym implementation: + self._action_space = spaces.Discrete(ACTION_SPACE_SIZE) + self._observation_space = spaces.Box(0, 1, ENCODED_INPUT_SHAPE, dtype=int) + self._reward_space = gym.spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32) + + # Initialise the random seed of the gym environment. 
+ + def reset(self): + """Reset the game.""" + self.episode_length = 0 + self.gameenv: GameEnvironment = GameEnvironment() + obs = self.observe() + return obs + + def observe(self) -> dict: + self.action_mask = self.gameenv.legal_moves_mask().astype(np.int8) + return {"observation": self.current_state(), + "action_mask": self.action_mask, + "to_play": self.current_player + } + + def current_state(self) -> np.ndarray: + """ + Overview: + Obtain the encoded state from the view of the current player. The encoding is a flat float32 vector of \ + length OBSERVATION_SPACE_SIZE that concatenates the board occupancy with one-hot encodings of the piece \ + queue, the current piece and the held piece (see GameEnvironment.get_input_encoding). + Returns: + - current_state (:obj:`np.ndarray`): The encoded observation vector for the current player. + """ + observation = self.gameenv.get_input_encoding() + observation = observation.astype(np.float32) + return observation + + def get_done_winner(self) -> Tuple[bool, int]: + """ + Overview: + Check if the game is done and find the winner. + Returns: + - outputs (:obj:`Tuple`): Tuple containing 'done' and 'winner', + - if a player has won, 'done' = True and 'winner' is that player's index, + - if draw, 'done' = True, 'winner' = -1, + - if game is not over, 'done' = False, 'winner' = -1 + """ + done = self.gameenv.is_terminal() + winner = self.gameenv.get_winner() + if winner is None: + # The underlying game reports None while the game is still running; normalise to -1. + winner = -1 + return done, winner + + def _player_step(self, action: int) -> BaseEnvTimestep: + """ + Overview: + A function that implements the transition of the environment's state. \ + After taking an action in the environment, the function transitions the environment to the next state \ + and returns the relevant information for the next time step. + Arguments: + - action (:obj:`int`): An index in [0, ACTION_SPACE_SIZE) that encodes a (piece, rotation, row, column) placement. + Returns: + - timestep (:obj:`BaseEnvTimestep`): A namedtuple that records the observation and obtained reward after taking the action, \ + whether the game is terminated, and some other information. + """ + if self.action_mask[action]: + self.gameenv.step_action(action) + else: + logging.warning( + f"You input illegal action: {action}, the legal actions are {self.legal_actions}. " + f"Now we randomly choose an action from the legal ones." + ) + action = np.random.choice(self.legal_actions) + self.gameenv.step_action(action) + + done, winner = self.get_done_winner() + if winner != -1: + reward = np.array(1).astype(np.float32) + else: + reward = np.array(0).astype(np.float32) + + info = {} + + obs = self.observe() + + # Render the new step. + if self.render_mode is not None: + self.render(self.render_mode) + if done: + info['eval_episode_return'] = reward + if self.render_mode == 'image_savefile_mode': + self.save_render_output(replay_name_suffix=self.replay_name_suffix, replay_path=self.replay_path, + format=self.replay_format) + + return BaseEnvTimestep(obs, reward, done, info) + + def step(self, action): + """ + Overview: + Perform one step of the game. This involves making a move and updating the game state. + The rewards are calculated based on the game configuration ('raw').
The observations are also returned based on the game configuration ('raw_encoded_board' or 'dict_encoded_board'). + Arguments: + - action (:obj:`int`): The action to be performed. + Returns: + - BaseEnvTimestep: Contains the new state observation, reward, and other game information. + """ + timestep = self._player_step(action) + + if timestep.done: + # The ``eval_episode_return`` is calculated from player 1's perspective. + timestep.info['eval_episode_return'] = -timestep.reward if timestep.obs[ + 'to_play'] == 1 else timestep.reward + + return timestep + + + @property + def observation_space(self) -> gym.spaces.Space: + return self._observation_space + + @property + def action_space(self) -> gym.spaces.Space: + return self._action_space + + @property + def reward_space(self) -> gym.spaces.Space: + return self._reward_space + + def seed(self, seed=None, seed1=None): + """Set the random seed for the gym environment.""" + self.np_random, seed = seeding.np_random(seed) + return [seed] + + def render(self, mode: str = None): + """ + Overview: + Renders the Botris game environment. + Arguments: + - mode (:obj:`str`): The rendering mode. Options are None, 'state_realtime_mode', 'image_realtime_mode' or 'image_savefile_mode'. + When set to None, the game state is not rendered. + In 'state_realtime_mode', the game state is illustrated in a text-based format directly in the console. + The 'image_realtime_mode' displays the game as an RGB image in real-time. + With 'image_savefile_mode', the game is rendered as an RGB image but not displayed in real-time. Instead, the image is saved to a designated file. + Please note that the default rendering mode is set to None. + """ + if mode is None: + return + if mode == 'state_realtime_mode': + s = 'Current Return: {}, '.format(self.episode_return) + print(s) + self.gameenv.render() + else: + pil_board = self.gameenv.draw() + + # Instead of returning the image, we display it using pyplot + if mode == 'image_realtime_mode': + plt.imshow(np.asarray(pil_board)) + plt.draw() + # plt.pause(0.001) + elif mode == 'image_savefile_mode': + # Append the frame to frames for gif + self.frames.append(np.asarray(pil_board)) + + def save_render_output(self, replay_name_suffix: str = '', replay_path=None, format='gif'): + # At the end of the episode, save the frames to a gif or mp4 file + if replay_path is None: + filename = f'botris_{replay_name_suffix}.{format}' + else: + if not os.path.exists(replay_path): + os.makedirs(replay_path) + filename = replay_path + f'/botris_{replay_name_suffix}.{format}' + + if format == 'gif': + imageio.mimsave(filename, self.frames, 'GIF') + elif format == 'mp4': + imageio.mimsave(filename, self.frames, fps=30, codec='mpeg4') + else: + raise ValueError("Unsupported format: {}".format(format)) + + logging.info("Saved output to {}".format(filename)) + self.frames = [] + + @property + def legal_actions(self) -> List[int]: + # Indices of the currently legal placements, derived from the mask computed in observe(). + return np.where(self.action_mask == 1)[0] + + @property + def current_player(self): + return self.gameenv.current_player + + @property + def next_player(self): + return 1 - self.current_player + + + def simulate_action(self, action): + """ + Overview: + Execute ``action`` on a deep copy of the environment and return the resulting environment; used in AlphaZero-style search. + Returns: + - next_simulator_env: A copy of this Botris env advanced by ``action``. + """ + if not self.action_mask[action]: + raise ValueError("action {0} is not legal in the current state".format(action)) + next_simulator_env = copy.deepcopy(self) + next_simulator_env.step(action) + return next_simulator_env
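+ # Illustrative sketch of how simulate_action() is meant to be used by AlphaZero-style search
+ # (hypothetical caller code, not part of the LightZero entry points):
+ #   child_env = env.simulate_action(a)   # deep copy of the env advanced by action `a`
+ #   obs = child_env.observe()            # query the child state for the policy/value network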
+ """ + if not self.action_mask[action]: + raise ValueError("action {0} on board {1} is not legal".format(action, self.board)) + next_simulator_env = copy.deepcopy(self) + next_simulator_env.step(action) + return next_simulator_env + + @staticmethod + def create_collector_env_cfg(cfg: dict) -> List[dict]: + collector_env_num = cfg.pop('collector_env_num') + cfg = copy.deepcopy(cfg) + return [cfg for _ in range(collector_env_num)] + + @staticmethod + def create_evaluator_env_cfg(cfg: dict) -> List[dict]: + evaluator_env_num = cfg.pop('evaluator_env_num') + cfg = copy.deepcopy(cfg) + # In eval phase, we use ``eval_mode`` to make agent play with the built-in bot to + # evaluate the performance of the current agent. + cfg.battle_mode = 'eval_mode' + return [cfg for _ in range(evaluator_env_num)] + + def __repr__(self) -> str: + return "LightZero Botris Env" \ No newline at end of file diff --git a/zoo/botris/envs/env.py b/zoo/botris/envs/env.py new file mode 100644 index 000000000..6c2b162fb --- /dev/null +++ b/zoo/botris/envs/env.py @@ -0,0 +1,95 @@ +from __future__ import annotations +from botris import TetrisGame +from .utils import encode_input, encode_move_index, encode_board, decode_move_index, encode_piece_coordinates, dencode_piece_coordinates, decode_queue, decode_board +from .modals import Piece, EncodedInput, Rotation, EncodedMove, ENCODED_MOVE_SHAPE, EncodedBoard, NUMBER_OF_ROWS, NUMBER_OF_COLS, NUMBER_OF_PIECES +from typing import Deque, List, Tuple +from botris.engine import Board, PieceData +from botris.engine import Piece as BotrisPiece +import numpy as np +from collections import deque +from PIL import Image + +class GameEnvironment: + def __init__(self, score_scale: int | None = 5, piece_reward: int | None = 1) -> None: + options = {} + if score_scale is not None: + options['attack_table'] = { + "single": score_scale, + "double": score_scale * 2, + "triple": score_scale * 4, + "quad": score_scale * 8, + "ass": score_scale * 4, + "asd": score_scale * 8, + "ast": score_scale * 12, + "pc": score_scale * 20, + "b2b": score_scale * 2, + } + options['combo_table'] = [score_scale, score_scale, score_scale * 2, score_scale * 2, score_scale * 2, score_scale * 4, score_scale * 4, score_scale * 6, score_scale * 6, score_scale * 8] + self.game: TetrisGame = TetrisGame(options=options) + self.piece_reward: int | None = piece_reward + + def copy(self) -> GameEnvironment: + new_env = GameEnvironment() + new_env.game = self.game.copy() + return new_env + + def reset(self) -> None: + self.game.reset() + + def is_terminal(self) -> bool: + return self.game.dead + + def get_input_encoding(self) -> EncodedInput: + _board: Board = self.game.board + board: EncodedBoard = encode_board(_board) + + _queue: Deque[BotrisPiece] = self.game.queue + queue: List[Piece] = [piece.index for piece in list(_queue)] + + _current_piece: BotrisPiece = self.game.current.piece + current_piece: Piece = _current_piece.index + + _held_piece: BotrisPiece = self.game.held + held_piece: Piece = _held_piece.index if _held_piece is not None else Piece.NONE + + garbage_queued: int = len(self.game.garbage_queue) + combo: int = self.game.combo + b2b: bool = self.game.b2b + + return encode_input(board, queue, current_piece, held_piece, garbage_queued, combo, b2b) + + def step(self, move: Tuple[Piece, Rotation, int, int]) -> None: + piece_type, rotation, row, col = move + botris_piece: BotrisPiece = BotrisPiece.from_index(piece_type) + x, y = dencode_piece_coordinates(botris_piece, rotation, row, col) + piece_data = 
PieceData(botris_piece, x, y, rotation) + self.game.dangerously_drop_piece(piece_data) + + def step_action(self, action) -> None: + move = decode_move_index(action) + self.step(move) + + def get_score(self, terminal_score=None) -> int: + if self.game.dead and terminal_score is not None: + return terminal_score + if self.piece_reward is not None: + return self.game.score + self.game.pieces_placed * self.piece_reward + return self.game.score + + def legal_moves_mask(self) -> EncodedMove: + legal_moves_dict = self.game.generate_moves() + legal_moves = np.zeros(ENCODED_MOVE_SHAPE, dtype=bool) + for piece_data in legal_moves_dict.keys(): + piece, rotation = piece_data.piece.index, piece_data.rotation + col, row = encode_piece_coordinates(piece_data) + if (col < 0) or (col >= NUMBER_OF_COLS) or (row < 0) or (row >= NUMBER_OF_ROWS): + continue + move_idx = encode_move_index(piece, rotation, row, col) + legal_moves[move_idx] = True + return legal_moves + + def render(self, render_current=False) -> None: + self.game.render_board(render_current=render_current) + + def draw(self) -> Image: + return self.game.draw_board() \ No newline at end of file diff --git a/zoo/botris/envs/env_5move.py b/zoo/botris/envs/env_5move.py new file mode 100644 index 000000000..f7dd91297 --- /dev/null +++ b/zoo/botris/envs/env_5move.py @@ -0,0 +1,84 @@ +from __future__ import annotations +from botris import TetrisGame +from .utils import encode_input, encode_move_index, encode_board, decode_move_index, encode_piece_coordinates, dencode_piece_coordinates, decode_queue, decode_board +from .modals import Piece, EncodedInput, Rotation, EncodedBoard, NUMBER_OF_ROWS, NUMBER_OF_COLS, NUMBER_OF_PIECES +from typing import Deque, List, Tuple +from botris.engine import Board, PieceData, Move +from botris.engine import Piece as BotrisPiece +from botris.engine.utils import place_piece +import numpy as np +from collections import deque +from PIL import Image + +MOVES = np.array([Move.hold, Move.move_left, Move.move_right, Move.rotate_cw, Move.rotate_ccw, Move.drop, Move.sonic_drop, Move.sonic_left, Move.sonic_right, Move.hard_drop]) +ACTION_SPACE_SIZE: int = MOVES.size + + +class GameEnvironment5Move: + def __init__(self, score_scale: int | None = 5, piece_reward: int | None = 1) -> None: + options = {} + if score_scale is not None: + options['attack_table'] = { + "single": score_scale, + "double": score_scale * 2, + "triple": score_scale * 4, + "quad": score_scale * 8, + "ass": score_scale * 4, + "asd": score_scale * 8, + "ast": score_scale * 12, + "pc": score_scale * 20, + "b2b": score_scale * 2, + } + options['combo_table'] = [score_scale, score_scale, score_scale * 2, score_scale * 2, score_scale * 2, score_scale * 4, score_scale * 4, score_scale * 6, score_scale * 6, score_scale * 8] + options['board_width'] = NUMBER_OF_COLS + options['board_height'] = NUMBER_OF_ROWS + self.game: TetrisGame = TetrisGame(options=options) + self.piece_reward: int | None = piece_reward + + def copy(self) -> GameEnvironment5Move: + new_env = GameEnvironment5Move() + new_env.game = self.game.copy() + return new_env + + def reset(self) -> None: + self.game.reset() + + @property + def terminal(self) -> bool: + return self.game.dead + + def get_input_encoding(self) -> EncodedInput: + _board: Board = self.game.board #place_piece(self.game.board, self.game.current, self.game.options.board_width) + board: EncodedBoard = encode_board(_board) + + _queue: Deque[BotrisPiece] = self.game.queue + queue: List[Piece] = [piece.index for piece in list(_queue)] + + 
_current_piece: BotrisPiece = self.game.current.piece + current_piece: Piece = _current_piece.index + + _held_piece: BotrisPiece = self.game.held + held_piece: Piece = _held_piece.index if _held_piece is not None else Piece.NONE + + garbage_queued: int = len(self.game.garbage_queue) + combo: int = self.game.combo + b2b: bool = self.game.b2b + + return encode_input(board, queue, current_piece, held_piece, garbage_queued, combo, b2b) + + def step(self, move_idx: int) -> None: + move: Move = MOVES[move_idx] + self.game.execute_move(move) + + def get_score(self, terminal_score=None) -> int: + if self.game.dead and terminal_score is not None: + return terminal_score + if self.piece_reward is not None: + return self.game.score + self.game.pieces_placed * self.piece_reward + return self.game.score + + def render(self, render_current=False) -> None: + self.game.render_board(render_current=render_current) + + def draw(self) -> Image: + return self.game.draw_board() \ No newline at end of file diff --git a/zoo/botris/envs/env_versus.py b/zoo/botris/envs/env_versus.py new file mode 100644 index 000000000..1fbf6090b --- /dev/null +++ b/zoo/botris/envs/env_versus.py @@ -0,0 +1,100 @@ +from __future__ import annotations +from botris import TetrisGame +from .utils import encode_input, encode_move_index, encode_board, decode_move_index, encode_piece_coordinates, dencode_piece_coordinates, decode_queue, decode_board +from .modals import Piece, EncodedInput, Rotation, EncodedMove, ENCODED_MOVE_SHAPE, EncodedBoard, NUMBER_OF_ROWS, NUMBER_OF_COLS, NUMBER_OF_PIECES +from typing import Deque, List, Tuple +from botris.engine import Board, PieceData, generate_garbage +from botris.engine import Piece as BotrisPiece +import numpy as np +from collections import deque +from PIL import Image + +class GameEnvironment: + def __init__(self) -> None: + self.game1: TetrisGame = TetrisGame() + self.game2: TetrisGame = TetrisGame() + self.current_player: int = 0 + + def copy(self) -> GameEnvironment: + new_env = GameEnvironment() + new_env.game1 = self.game1.copy() + new_env.game2 = self.game2.copy() + new_env.current_player = self.current_player + return new_env + + def reset(self) -> None: + self.game1.reset() + self.game2.reset() + self.current_player = 0 + + def is_terminal(self) -> bool: + return self.game1.dead or self.game2.dead + + def get_input_encoding(self) -> EncodedInput: + game = self.game2 if self.current_player else self.game1 + _board: Board = game.board + board: EncodedBoard = encode_board(_board) + + _queue: Deque[BotrisPiece] = game.queue + queue: List[Piece] = [piece.index for piece in list(_queue)] + + _current_piece: BotrisPiece = game.current.piece + current_piece: Piece = _current_piece.index + + _held_piece: BotrisPiece = game.held + held_piece: Piece = _held_piece.index if _held_piece is not None else Piece.NONE + + garbage_queued: int = len(game.garbage_queue) + combo: int = game.combo + b2b: bool = game.b2b + + return encode_input(board, queue, current_piece, held_piece, garbage_queued, combo, b2b) + + def step(self, move: Tuple[Piece, Rotation, int, int]) -> None: + piece_type, rotation, row, col = move + botris_piece: BotrisPiece = BotrisPiece.from_index(piece_type) + x, y = dencode_piece_coordinates(botris_piece, rotation, row, col) + piece_data = PieceData(botris_piece, x, y, rotation) + game = self.game2 if self.current_player else self.game1 + events = game.dangerously_drop_piece(piece_data) + self.current_player = 1 - self.current_player + other_game = self.game2 if self.current_player 
else self.game1 + for event in events: + if event.type == "clear": + attack: int = event.attack + other_game.queue_attack(attack) + + def step_action(self, action) -> None: + move = decode_move_index(action) + self.step(move) + + def get_winner(self) -> int: + if self.game1.dead and self.game2.dead: + return -1 + if self.game1.dead: + return 1 + if self.game2.dead: + return 0 + return None + + def legal_moves_mask(self) -> EncodedMove: + game = self.game2 if self.current_player else self.game1 + + legal_moves_dict = game.generate_moves() + legal_moves = np.zeros(ENCODED_MOVE_SHAPE, dtype=bool) + for piece_data in legal_moves_dict.keys(): + piece, rotation = piece_data.piece.index, piece_data.rotation + col, row = encode_piece_coordinates(piece_data) + if (col < 0) or (col >= NUMBER_OF_COLS) or (row < 0) or (row >= NUMBER_OF_ROWS): + continue + move_idx = encode_move_index(piece, rotation, row, col) + legal_moves[move_idx] = True + return legal_moves + + def render(self, render_current=False) -> None: + game = self.game2 if self.current_player else self.game1 + game.render_board(render_current=render_current) + + def draw(self) -> Image: + game = self.game2 if self.current_player else self.game1 + return game.draw_board() \ No newline at end of file diff --git a/zoo/botris/envs/modals.py b/zoo/botris/envs/modals.py new file mode 100644 index 000000000..6366113d6 --- /dev/null +++ b/zoo/botris/envs/modals.py @@ -0,0 +1,45 @@ +from __future__ import annotations +from typing import Tuple, Literal, Annotated +import numpy as np +from numpy.typing import NDArray + +class Piece(int): + I: Piece = 0 + O: Piece = 1 + J: Piece = 2 + L: Piece = 3 + S: Piece = 4 + Z: Piece = 5 + T: Piece = 6 + NONE: Piece = 7 + +NUMBER_OF_PIECES: int = 8 +PIECES: Tuple[Piece] = (Piece.I, Piece.O, Piece.J, Piece.L, Piece.S, Piece.Z, Piece.T, Piece.NONE) + +NUMBER_OF_ROWS: int = 8 +NUMBER_OF_COLS: int = 10 + +Rotation = Literal[0, 1, 2, 3] +NUMBER_OF_ROTATIONS: int = 4 + +QUEUE_SIZE: int = 6 +INCLUDE_CURRENT_PIECE: Literal[0, 1] = 1 +INCLUDE_HELD_PIECE: Literal[0, 1] = 1 +INCLUDE_GARBAGE_QUEUED: Literal[0, 1] = 0 +INCLUDE_COMBO: Literal[0, 1] = 0 +INCLUDE_B2B: Literal[0, 1] = 0 + +MAX_GARBAGE_QUEUED: int = 15 +MAX_COMBO: int = 11 +MAX_MOVE_SCORE: int = 100 + +ACTION_SPACE_SIZE: int = NUMBER_OF_PIECES * NUMBER_OF_ROTATIONS * NUMBER_OF_ROWS * NUMBER_OF_COLS +OBSERVATION_SPACE_SIZE: int = NUMBER_OF_ROWS * NUMBER_OF_COLS + QUEUE_SIZE * NUMBER_OF_PIECES + NUMBER_OF_PIECES * INCLUDE_CURRENT_PIECE + NUMBER_OF_PIECES * INCLUDE_HELD_PIECE + MAX_GARBAGE_QUEUED * INCLUDE_GARBAGE_QUEUED + MAX_COMBO * INCLUDE_COMBO + INCLUDE_B2B + +ENCODED_MOVE_SHAPE: Tuple[int] = (ACTION_SPACE_SIZE,) +ENCODED_BOARD_SHAPE: Tuple[int] = (NUMBER_OF_ROWS, NUMBER_OF_COLS,) +ENCODED_INPUT_SHAPE: Tuple[int] = (OBSERVATION_SPACE_SIZE,) + +EncodedMove = Annotated[NDArray[np.int8], ENCODED_MOVE_SHAPE] +EncodedBoard = Annotated[NDArray[np.int8], ENCODED_BOARD_SHAPE] +EncodedInput = Annotated[NDArray[np.int8], ENCODED_INPUT_SHAPE] \ No newline at end of file diff --git a/zoo/botris/envs/utils.py b/zoo/botris/envs/utils.py new file mode 100644 index 000000000..89f8ecc8c --- /dev/null +++ b/zoo/botris/envs/utils.py @@ -0,0 +1,134 @@ +import numpy as np +from .modals import (NUMBER_OF_COLS, NUMBER_OF_PIECES, NUMBER_OF_ROWS, NUMBER_OF_ROTATIONS, Rotation, + Piece, EncodedBoard, EncodedMove, QUEUE_SIZE, ENCODED_MOVE_SHAPE, ENCODED_INPUT_SHAPE, + ENCODED_BOARD_SHAPE, ACTION_SPACE_SIZE, INCLUDE_GARBAGE_QUEUED, INCLUDE_CURRENT_PIECE, + INCLUDE_HELD_PIECE, 
INCLUDE_COMBO, INCLUDE_B2B, MAX_COMBO, MAX_GARBAGE_QUEUED) +from botris.engine import Board, get_piece_border, PieceData +from botris.engine import Piece as BotrisPiece +from typing import Tuple, List + + +def encode_move_index(piece_type: Piece, rotation: Rotation, row: int, col: int) -> int: + return piece_type * NUMBER_OF_ROTATIONS * NUMBER_OF_ROWS * NUMBER_OF_COLS + rotation * NUMBER_OF_ROWS * NUMBER_OF_COLS + row * NUMBER_OF_COLS + col + +def decode_move_index(move_idx: int) -> Tuple[Piece, Rotation, int, int]: + piece_type = move_idx // (NUMBER_OF_ROTATIONS * NUMBER_OF_ROWS * NUMBER_OF_COLS) + move_idx -= piece_type * NUMBER_OF_ROTATIONS * NUMBER_OF_ROWS * NUMBER_OF_COLS + rotation = move_idx // (NUMBER_OF_ROWS * NUMBER_OF_COLS) + move_idx -= rotation * NUMBER_OF_ROWS * NUMBER_OF_COLS + row = move_idx // NUMBER_OF_COLS + col = move_idx % NUMBER_OF_COLS + return piece_type, rotation, row, col + + +def encode_move(piece_type: Piece, rotation: Rotation, row: int, col: int) -> EncodedMove: + move_encoding = np.zeros(ENCODED_MOVE_SHAPE, dtype=np.int8) + move_index: int = encode_move_index(piece_type, rotation, row, col) + move_encoding[move_index] = 1 + return move_encoding + +def decode_move(move_encoding: EncodedMove) -> Tuple[Piece, Rotation, int, int]: + move_idx = np.argmax(move_encoding) + return decode_move_index(move_idx) + + +def encode_input(binary_plane: EncodedBoard, queue: List[Piece], current_piece: Piece, held_piece: Piece, garbage_queued: int, combo: int, b2b: bool): + + queue_encoded = np.zeros((QUEUE_SIZE, NUMBER_OF_PIECES), dtype=np.int8) + for i, piece_type in enumerate(queue[:QUEUE_SIZE]): + queue_encoded[i, piece_type] = 1 + + # Encode current piece type + if INCLUDE_CURRENT_PIECE: + current_piece_encoded = np.zeros(NUMBER_OF_PIECES, dtype=np.int8) + current_piece_encoded[current_piece] = 1 + else: + current_piece_encoded = np.array([]) + + # Encode held piece type + if INCLUDE_HELD_PIECE: + held_piece_encoded = np.zeros(NUMBER_OF_PIECES, dtype=np.int8) + held_piece_encoded[held_piece] = 1 + else: + held_piece_encoded = np.array([]) + + # Encode garbage queued as a one-hot over 1..MAX_GARBAGE_QUEUED; zero garbage leaves the block empty + if INCLUDE_GARBAGE_QUEUED: + garbage_queued = min(garbage_queued, MAX_GARBAGE_QUEUED) + garbage_queued_encoded = np.zeros(MAX_GARBAGE_QUEUED, dtype=np.int8) + if garbage_queued > 0: + garbage_queued_encoded[garbage_queued - 1] = 1 + else: + garbage_queued_encoded = np.array([]) + + # Encode combo as a one-hot over 1..MAX_COMBO; no active combo leaves the block empty + if INCLUDE_COMBO: + combo = min(combo, MAX_COMBO) + combo_encoded = np.zeros(MAX_COMBO, dtype=np.int8) + if combo > 0: + combo_encoded[combo - 1] = 1 + else: + combo_encoded = np.array([]) + + # Encode b2b + if INCLUDE_B2B: + b2b_encoded = np.array([int(b2b)], dtype=np.int8) + else: + b2b_encoded = np.array([]) + + # Concatenate all encodings + input_encoding = np.concatenate([ + binary_plane.flatten(), + queue_encoded.flatten(), + current_piece_encoded, + held_piece_encoded, + garbage_queued_encoded, + combo_encoded, + b2b_encoded + ]) + + return input_encoding + +def encode_board(board: Board) -> EncodedBoard: + binary_plane = np.zeros(ENCODED_BOARD_SHAPE, dtype=np.int8) + for row in range(NUMBER_OF_ROWS): + if row >= len(board): + continue + for col in range(NUMBER_OF_COLS): + if col >= len(board[row]): + continue + if board[row][col] is not None: + binary_plane[row, col] = 1 + return binary_plane + +def decode_board(binary_plane: EncodedBoard) -> Board: + board = [] + for row in range(NUMBER_OF_ROWS): + board_row = [] + for col in range(NUMBER_OF_COLS): + board_row.append(BotrisPiece.I if binary_plane[row,
col] else None) + board.append(board_row) + return board + +def decode_queue(queue: List[Piece]) -> List[BotrisPiece]: + return [BotrisPiece.from_index(piece_type) for piece_type in queue] + +def softmax(x, temperature=1.0): + x = x / temperature + e_x = np.exp(x - np.max(x)) + return e_x / e_x.sum() + +def softmax_sample(policy, mask, temperature=1.0): + policy_probs = softmax(policy, temperature) + masked_probs = policy_probs * mask + masked_probs /= masked_probs.sum() + selected_move = np.random.choice(ACTION_SPACE_SIZE, p=masked_probs) + return selected_move + +def encode_piece_coordinates(piece_data: PieceData) -> Tuple[int, int]: + lowest_x, highest_x, lowest_y, highest_y = get_piece_border(piece_data.piece, piece_data.rotation) + return piece_data.x + lowest_x, piece_data.y - highest_y + +def dencode_piece_coordinates(piece: BotrisPiece, rotation: Rotation, row: int, col: int) -> Tuple[int, int]: + lowest_x, highest_x, lowest_y, highest_y = get_piece_border(piece, rotation) + return col - lowest_x, row + highest_y \ No newline at end of file
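A minimal smoke-test sketch for the environment pieces added above (illustrative only, not part of the diff): it uses just the GameEnvironment class from zoo/botris/envs/env.py together with the placement-index encoding from zoo/botris/envs/utils.py, and assumes botris-interface is installed.

import numpy as np
from zoo.botris.envs.env import GameEnvironment

env = GameEnvironment()
for _ in range(20):
    if env.is_terminal():
        break
    mask = env.legal_moves_mask()          # boolean vector over the 2560 placement indices
    legal = np.flatnonzero(mask)
    if legal.size == 0:
        break
    action = int(np.random.choice(legal))  # any legal (piece, rotation, row, col) index
    env.step_action(action)                # decode_move_index + dangerously_drop_piece under the hood
print("score after random rollout:", env.get_score())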