diff --git a/src/__init__.py b/src/__init__.py index 9923f5b..131777a 100644 --- a/src/__init__.py +++ b/src/__init__.py @@ -1,16 +1,21 @@ -__version__ = '0.3.1' +__version__ = '0.3.1.dev1' # core classes -from .randvar import RV, Seq - -# core decorators -from .randvar import anydice_casting, max_func_depth +from .randvar import RV +from .seq import Seq # core functions -from .randvar import output, roll, settings_set, myrange +from .roller import myrange +from .settings import settings_set +from .output import output +from .blackrv import BlankRV + +# core decorators +from .decorators import anydice_casting, max_func_depth # helpful functions -from .randvar import roller, settings_reset +from .settings import settings_reset +from .roller import roll, roller # function library from .funclib import absolute as absolute_X, contains as X_contains_X, count_in as count_X_in_X, explode as explode_X, highest_N_of_D as highest_X_of_X @@ -22,7 +27,7 @@ __all__ = [ - 'RV', 'Seq', 'anydice_casting', 'max_func_depth', 'output', 'roll', 'settings_set', 'myrange', + 'RV', 'Seq', 'anydice_casting', 'BlankRV', 'max_func_depth', 'output', 'roll', 'settings_set', 'myrange', 'roller', 'settings_reset', 'absolute_X', 'X_contains_X', 'count_X_in_X', 'explode_X', 'highest_X_of_X', 'lowest_X_of_X', 'middle_X_of_X', 'highest_of_X_and_X', 'lowest_of_X_and_X', 'maximum_of_X', 'reverse_X', 'sort_X', 'myMatmul', 'myLen', 'myInvert', 'myAnd', 'myOr' diff --git a/src/altcast.py b/src/altcast.py index 5537db4..3a99d52 100644 --- a/src/altcast.py +++ b/src/altcast.py @@ -1,4 +1,4 @@ -from .randvar import roll +from .roller import roll class __D_BASE: diff --git a/src/blackrv.py b/src/blackrv.py new file mode 100644 index 0000000..44f71eb --- /dev/null +++ b/src/blackrv.py @@ -0,0 +1,175 @@ +from typing import Iterable + +from . import randvar as rv +from . import seq +from .typings import T_ifs, T_is, T_ifsr +from . import output + + +class BlankRV: + def __init__(self, _special_null=False): + self._special_null = _special_null # makes it such that it's _special_null, in operations like (X**2 + 1) still is blank (X). see https://anydice.com/program/395da + + def mean(self): + return 0 + + def std(self): + return 0 + + def output(self, *args, **kwargs): + return output.output(self, *args, **kwargs) + + def __matmul__(self, other: T_ifs): + # ( self:RV @ other ) thus not allowed, + raise TypeError(f'A position selector must be either a number or a sequence, but you provided "{other}"') + + def __rmatmul__(self, other: T_is): + if self._special_null: + return 0 if other != 1 else self + return self + + def __add__(self, other: T_ifsr): + if self._special_null: + return self + return other + + def __radd__(self, other: T_ifsr): + if self._special_null: + return self + return other + + def __sub__(self, other: T_ifsr): + if self._special_null: + return self + if isinstance(other, Iterable): + other = seq.Seq(*other).sum() + return (-other) + + def __rsub__(self, other: T_ifsr): + if self._special_null: + return self + return other + + def __mul__(self, other: T_ifsr): + return self + + def __rmul__(self, other: T_ifsr): + return self + + def __floordiv__(self, other: T_ifsr): + return self + + def __rfloordiv__(self, other: T_ifsr): + return self + + def __truediv__(self, other: T_ifsr): + return self + + def __rtruediv__(self, other: T_ifsr): + return self + + def __pow__(self, other: T_ifsr): + return self + + def __rpow__(self, other: T_ifsr): + return self + + def __mod__(self, other: T_ifsr): + return self + + def __rmod__(self, other: T_ifsr): + return self + + # comparison operators + def __eq__(self, other: T_ifsr): + if self._special_null: + return 1 + return self + + def __ne__(self, other: T_ifsr): + if self._special_null: + return 1 + return self + + def __lt__(self, other: T_ifsr): + if self._special_null: + return 1 + return self + + def __le__(self, other: T_ifsr): + if self._special_null: + return 1 + return self + + def __gt__(self, other: T_ifsr): + if self._special_null: + return 1 + return self + + def __ge__(self, other: T_ifsr): + if self._special_null: + return 1 + return self + + # boolean operators + def __or__(self, other: T_ifsr): + if self._special_null: + return 1 + return self if isinstance(other, BlankRV) else other + + def __ror__(self, other: T_ifsr): + if self._special_null: + return 1 + return self if isinstance(other, BlankRV) else other + + def __and__(self, other: T_ifsr): + if self._special_null: + return 1 + return self + + def __rand__(self, other: T_ifsr): + if self._special_null: + return 1 + return self + + def __bool__(self): + raise TypeError('Boolean values can only be numbers, but you provided RV') + + def __len__(self): + if self._special_null: + return 1 + return 0 + + def __pos__(self): + return self + + def __neg__(self): + return self + + def __invert__(self): + if self._special_null: + return 1 + return self + + def __abs__(self): + return self + + def __round__(self, n=0): + return self + + def __floor__(self): + return self + + def __ceil__(self): + return self + + def __trunc__(self): + return self + + def __str__(self): + if self._special_null: + return 'd{?}' + return 'd{}' + + def __repr__(self): + return output.output(self, print_=False) diff --git a/src/decorators.py b/src/decorators.py new file mode 100644 index 0000000..878a1c3 --- /dev/null +++ b/src/decorators.py @@ -0,0 +1,154 @@ +import inspect +import logging +import math +from itertools import product +from typing import Iterable, Union + +from .typings import T_ifsr +from .settings import SETTINGS +from . import randvar as rv +from . import seq +from . import blackrv + + +logger = logging.getLogger(__name__) + + +def anydice_casting(verbose=False): # noqa: C901 + # verbose = True + # in the documenation of the anydice language https://anydice.com/docs/functions + # it states that "The behavior of a function depends on what type of value it expects and what type of value it actually receives." + # Thus there are 9 scenarios for each parameters + # expect: int, actual: int = no change + # expect: int, actual: seq = seq.sum() + # expect: int, actual: rv = MUST CALL FUNCTION WITH EACH VALUE OF RV ("If a die is provided, then the function will be invoked for all numbers on the die – or the sums of a collection of dice – and the result will be a new die.") + # expect: seq, actual: int = [int] + # expect: seq, actual: seq = no change + # expect: seq, actual: rv = MUST CALL FUNCTION WITH SEQUENCE OF EVERY ROLL OF THE RV ("If Expecting a sequence and dice are provided, then the function will be invoked for all possible sequences that can be made by rolling those dice. In that case the result will be a new die.") # noqa: E501 + # expect: rv, actual: int = dice([int]) + # expect: rv, actual: seq = dice(seq) + # expect: rv, actual: rv = no change + def decorator(func): + def wrapper(*args, **kwargs): + args, kwargs = list(args), dict(kwargs) + fullspec = inspect.getfullargspec(func) + arg_names = fullspec.args # list of arg names for args (not kwargs) + param_annotations = fullspec.annotations # (arg_names): (arg_type) that have been annotated + + hard_params = {} # update parameters that are easy to update, keep the hard ones for later + combined_args = list(enumerate(args)) + list(kwargs.items()) + if verbose: + logger.debug(f'#args {len(combined_args)}') + for k, arg_val in combined_args: + arg_name = k if isinstance(k, str) else (arg_names[k] if k < len(arg_names) else None) # get the name of the parameter (args or kwargs) + if arg_name not in param_annotations: # only look for annotated parameters + if verbose: + logger.debug(f'no anot {k}') + continue + expected_type = param_annotations[arg_name] + actual_type = type(arg_val) + new_val = None + if expected_type not in (int, seq.Seq, rv.RV): + if verbose: + logger.debug(f'not int seq rv {k}') + continue + if isinstance(arg_val, blackrv.BlankRV): # EDGE CASE abort calling if casting int/Seq to BlankRV (https://github.com/Ar-Kareem/PythonDice/issues/11) + if expected_type in (int, seq.Seq): + if verbose: + logger.debug(f'abort calling func due to BlankRV! {k}') + return blackrv.BlankRV(_special_null=True) + continue # casting BlankRV to RV means the function IS called and nothing changes + casted_iter_to_seq = False + if isinstance(arg_val, Iterable) and not isinstance(arg_val, seq.Seq): # if val is iter then need to convert to Seq + arg_val = seq.Seq(*arg_val) + new_val = arg_val + actual_type = seq.Seq + casted_iter_to_seq = True + if (expected_type, actual_type) == (int, seq.Seq): + new_val = arg_val.sum() + elif (expected_type, actual_type) == (int, rv.RV): + hard_params[k] = (arg_val, expected_type) + continue + elif (expected_type, actual_type) == (seq.Seq, int): + new_val = seq.Seq([arg_val]) + elif (expected_type, actual_type) == (seq.Seq, rv.RV): + hard_params[k] = (arg_val, expected_type) + if verbose: + logger.debug(f'EXPL {k}') + continue + elif (expected_type, actual_type) == (rv.RV, int): + new_val = rv.RV.from_const(arg_val) # type: ignore + elif (expected_type, actual_type) == (rv.RV, seq.Seq): + new_val = rv.RV.from_seq(arg_val) + elif not casted_iter_to_seq: # no cast made and one of the two types is not known, no casting needed + if verbose: + logger.debug(f'no cast, {k}, {expected_type}, {actual_type}') + continue + if isinstance(k, str): + kwargs[k] = new_val + else: + args[k] = new_val + if verbose: + logger.debug('cast {k}') + if verbose: + logger.debug(f'hard {[(k, v[1]) for k, v in hard_params.items()]}') + if not hard_params: + return func(*args, **kwargs) + + var_name = tuple(hard_params.keys()) + all_rolls_and_probs = [] + for k in var_name: + v, expected_type = hard_params[k] + assert isinstance(v, rv.RV), 'expected type RV' + if expected_type == seq.Seq: + r, p = v._get_expanded_possible_rolls() + elif expected_type == int: + r, p = v.vals, v.probs + else: + raise ValueError(f'casting RV to {expected_type} not supported') + all_rolls_and_probs.append(zip(r, p)) + # FINALLY take product of all possible rolls + all_rolls_and_probs = product(*all_rolls_and_probs) + + res_vals: list[Union[rv.RV, blackrv.BlankRV, seq.Seq, int, float, None]] = [] + res_probs: list[int] = [] + for rolls_and_prob in all_rolls_and_probs: + rolls = tuple(r for r, _ in rolls_and_prob) + prob = math.prod(p for _, p in rolls_and_prob) + # will update args and kwargs with each possible roll using var_name + for k, v in zip(var_name, rolls): + if isinstance(k, str): + kwargs[k] = v + else: + args[k] = v + val: T_ifsr = func(*args, **kwargs) # single result of the function call + if isinstance(val, Iterable): + if not isinstance(val, seq.Seq): + val = seq.Seq(*val) + val = val.sum() + if verbose: + logger.debug(f'val {val} prob {prob}') + res_vals.append(val) + res_probs.append(prob) + return rv.RV.from_rvs(rvs=res_vals, weights=res_probs) + return wrapper + return decorator + + +def max_func_depth(): + # decorator to limit the depth of the function calls + def decorator(func): + def wrapper(*args, **kwargs): + if SETTINGS['INTERNAL_CURR_DEPTH'] >= SETTINGS['maximum function depth']: + msg = 'The maximum function depth was exceeded, results are truncated.' + if not SETTINGS['INTERNAL_CURR_DEPTH_WARNING_PRINTED']: + logger.warning(msg) + print(msg) + SETTINGS['INTERNAL_CURR_DEPTH_WARNING_PRINTED'] = True + return blackrv.BlankRV() + SETTINGS['INTERNAL_CURR_DEPTH'] += 1 + res = func(*args, **kwargs) + SETTINGS['INTERNAL_CURR_DEPTH'] -= 1 + return res if res is not None else blackrv.BlankRV() + return wrapper + return decorator diff --git a/src/funclib.py b/src/funclib.py index 169df86..a5ace4a 100644 --- a/src/funclib.py +++ b/src/funclib.py @@ -1,9 +1,13 @@ # recreations of functions in https://anydice.com/docs/function-library/ -from .randvar import Seq, RV, roll, anydice_casting, SETTINGS - +from .randvar import RV +from .seq import Seq +from .settings import SETTINGS +from .roller import roll +from .decorators import anydice_casting # BASE FUNCTIONS + @anydice_casting() def absolute(NUMBER: int, *args, **kwargs): if NUMBER < 0: diff --git a/src/output.py b/src/output.py new file mode 100644 index 0000000..8c996ae --- /dev/null +++ b/src/output.py @@ -0,0 +1,53 @@ +from typing import Union, Iterable + +from .typings import T_isr +from .settings import SETTINGS +from . import randvar +from . import seq +from . import blackrv + + +def output(rv: Union[T_isr, None], named=None, show_pdf=True, blocks_width=None, print_=True, print_fn=None, cdf_cut=0): + if isinstance(rv, seq.Seq) and len(rv) == 0: # empty sequence plotted as empty + rv = blackrv.BlankRV() + if isinstance(rv, int) or isinstance(rv, Iterable) or isinstance(rv, bool): + rv = randvar.RV.from_seq([rv]) + if blocks_width is None: + blocks_width = SETTINGS['DEFAULT_OUTPUT_WIDTH'] + + result = '' + if named is not None: + result += named + ' ' + + if rv is None or isinstance(rv, blackrv.BlankRV): + result += '\n' + '-' * (blocks_width + 8) + if print_: + if print_fn is None: + SETTINGS['DEFAULT_PRINT_FN'](result) + else: + print_fn(result) + return + else: + return result + assert isinstance(rv, randvar.RV), f'rv must be a RV {rv}' + + mean = rv.mean() + mean = round(mean, 2) if mean is not None else None + std = rv.std() + std = round(std, 2) if std is not None else None + result += f'{mean} ± {std}' + if show_pdf: + vp = rv.get_vals_probs(cdf_cut / 100) + max_val_len = max(len(str(v)) for v, _ in vp) + blocks = max(0, blocks_width - max_val_len) + for v, p in vp: + result += '\n' + f"{v:>{max_val_len}}: {100 * p:>5.2f} " + ('█' * round(p * blocks)) + result += '\n' + '-' * (blocks_width + 8) + if print_: + if print_fn is None: + SETTINGS['DEFAULT_PRINT_FN'](result) + else: + print_fn(result) + return + else: + return result diff --git a/src/parser/parse_and_exec.py b/src/parser/parse_and_exec.py index 65069eb..ff86fb4 100644 --- a/src/parser/parse_and_exec.py +++ b/src/parser/parse_and_exec.py @@ -45,7 +45,12 @@ def _get_lib(): import itertools import random import functools - from ..randvar import RV, Seq, anydice_casting, max_func_depth, output, roll, settings_set, myrange + from ..randvar import RV + from ..seq import Seq + from ..settings import settings_set + from ..decorators import anydice_casting, max_func_depth + from ..output import output + from ..roller import roll, myrange from ..utils import mymatmul as myMatmul, mylen as myLen, myinvert as myInvert, myand as myAnd, myor as myOr from ..funclib import absolute as absolute_X, contains as X_contains_X, count_in as count_X_in_X, explode as explode_X, highest_N_of_D as highest_X_of_X from ..funclib import lowest_N_of_D as lowest_X_of_X, middle_N_of_D as middle_X_of_X, highest_of_N_and_N as highest_of_X_and_X, lowest_of_N_and_N as lowest_of_X_and_X diff --git a/src/randvar.py b/src/randvar.py index c25a3b3..383c043 100644 --- a/src/randvar.py +++ b/src/randvar.py @@ -3,72 +3,21 @@ import operator import math from typing import Callable, Iterable, Union -from itertools import zip_longest, product, combinations_with_replacement, accumulate -import inspect +from itertools import combinations_with_replacement, accumulate from collections import defaultdict import logging -import random +from .typings import T_if, T_ifs, T_is, T_ifsr, T_s +from .settings import SETTINGS +from . import seq +from . import blackrv from . import utils - +from . import output +from . import decorators logger = logging.getLogger(__name__) -# TYPE DEFINITIONS -T_if = Union[int, float] -T_ifs = Union[T_if, Iterable['T_ifs']] # recursive type -T_is = Union[int, Iterable['T_is']] # recursive type - -T_isr = Union[T_is, 'RV', 'BlankRV'] -T_ifr = Union[T_if, 'RV', 'BlankRV'] -T_ifsr = Union[T_ifs, 'RV', 'BlankRV'] - -T_s = Iterable['T_ifs'] # same as T_ifs but excludes int and float (not iterable) - - -# SETTINGS -DEFAULT_SETTINGS = { - 'RV_TRUNC': False, # if True, then RV will automatically truncate values to ints (replicate anydice behavior) - 'RV_IGNORE_ZERO_PROBS': False, # if True, then RV remove P=0 vals when creating RVs (False by default in anydice) - 'DEFAULT_OUTPUT_WIDTH': 180, # default width of output - 'DEFAULT_PRINT_FN': print, # default print function - 'INTERNAL_CURR_DEPTH': 0, # internal use only, for max_func_depth decorator - 'INTERNAL_CURR_DEPTH_WARNING_PRINTED': False, # used with the above - - 'position order': 'highest first', # 'highest first' or 'lowest first' - 'explode depth': 2, # can only be set to a positive integer (the default is 2) - 'maximum function depth': 10 # can only be set to a positive integer (the default is 10) -} -SETTINGS = DEFAULT_SETTINGS.copy() - - -def settings_set(name, value): - if name == "position order": - assert value in ("highest first", "lowest first"), 'position order must be "highest first" or "lowest first"' - elif name == "explode depth": - assert isinstance(value, int) and value > 0, '"explode depth" can only be set to a positive integer (the default is 2) got ' + str(value) - elif name == "maximum function depth": - assert isinstance(value, int) and value > 0, '"maximum function depth" can only be set to a positive integer (the default is 10) got ' + str(value) - elif name in ('RV_TRUNC', 'RV_IGNORE_ZERO_PROBS'): - if isinstance(value, str): - assert value.lower() in ('true', 'false'), 'value must be "True" or "False"' - value = value.lower() == 'true' - assert isinstance(value, bool), 'value must be a boolean' - elif name == 'DEFAULT_OUTPUT_WIDTH': - assert isinstance(value, int) and value > 0, 'DEFAULT_OUTPUT_WIDTH must be a positive integer' - elif name == 'DEFAULT_PRINT_FN': - assert callable(value), 'DEFAULT_PRINT_FN must be a callable' - else: - assert False, f'invalid setting name: {name}' - SETTINGS[name] = value - - -def settings_reset(): - SETTINGS.clear() - SETTINGS.update(DEFAULT_SETTINGS) - - class RV: def __init__(self, vals: Iterable[float], probs: Iterable[int], truncate=None): vals, probs = list(vals), tuple(probs) @@ -115,23 +64,23 @@ def from_const(val: T_if): @staticmethod def from_seq(s: T_s): - if not isinstance(s, Seq): - s = Seq(*s) + if not isinstance(s, seq.Seq): + s = seq.Seq(*s) if len(s) == 0: return RV([0], [1]) return RV(s._seq, [1] * len(s)) @staticmethod - def from_rvs(rvs: Iterable[Union['int', 'float', 'Seq', 'RV', 'BlankRV', None]], weights: Union[Iterable[int], None] = None) -> Union['RV', 'BlankRV']: + def from_rvs(rvs: Iterable[Union['int', 'float', 'seq.Seq', 'RV', 'blackrv.BlankRV', None]], weights: Union[Iterable[int], None] = None) -> Union['RV', 'blackrv.BlankRV']: rvs = tuple(rvs) if weights is None: weights = [1] * len(rvs) weights = tuple(weights) - blank_inds = set(i for i, x in enumerate(rvs) if isinstance(x, BlankRV) or x is None) + blank_inds = set(i for i, x in enumerate(rvs) if isinstance(x, blackrv.BlankRV) or x is None) rvs = tuple(x for i, x in enumerate(rvs) if i not in blank_inds) weights = tuple(w for i, w in enumerate(weights) if i not in blank_inds) if len(rvs) == 0: - return BlankRV() + return blackrv.BlankRV() assert len(rvs) == len(weights) prob_sums = tuple(sum(r.probs) if isinstance(r, RV) else 1 for r in rvs) PROD = math.prod(prob_sums) # to normalize probabilities such that the probabilities for each individual RV sum to const (PROD) and every probability is an int @@ -165,8 +114,8 @@ def std(self): var = EX2 - EX**2 # E[X^2] - E[X]^2 return math.sqrt(var) if var >= 0 else 0 - def filter(self, seq: T_ifsr): - to_filter = set(Seq(seq)) + def filter(self, obj: T_ifsr): + to_filter = set(seq.Seq(obj)) vp = tuple((v, p) for v, p in zip(self.vals, self.probs) if v not in to_filter) if len(vp) == 0: return RV.from_const(0) @@ -192,7 +141,7 @@ def get_cdf(self): return RV(cdf_vals, cdf_probs) def output(self, *args, **kwargs): - return output(self, *args, **kwargs) + return output.output(self, *args, **kwargs) def _get_sum_probs(self, force=False): if self.sum_probs is None or force: @@ -202,12 +151,12 @@ def _get_sum_probs(self, force=False): def _get_expanded_possible_rolls(self): N, D = self._source_roll, self._source_die # N rolls of D if N == 1: # answer is simple (ALSO cannot use simplified formula for probs and bottom code WILL cause errors) - return tuple(Seq(i) for i in D.vals), D.probs + return tuple(seq.Seq(i) for i in D.vals), D.probs pdf_dict = {v: p for v, p in zip(D.vals, D.probs)} vals, probs = [], [] FACTORIAL_N = utils.factorial(N) for roll in combinations_with_replacement(D.vals[::-1], N): - vals.append(Seq(_INTERNAL_SEQ_VALUE=roll)) + vals.append(seq.Seq(_INTERNAL_SEQ_VALUE=roll)) counts = defaultdict(int) # fast counts cur_roll_probs = 1 # this is p(x_1)*...*p(x_n) where [x_1,...,x_n] is the current roll, if D is a uniform then this = 1 and is not needed. comb_with_repl_denominator = 1 @@ -227,11 +176,11 @@ def _apply_operation(self, operation: Callable[[float], float]): return RV([operation(v) for v in self.vals], self.probs) def _convolve(self, other: T_ifsr, operation: Callable[[float, float], float]): - if isinstance(other, BlankRV): # let BlankRV handle the operation + if isinstance(other, blackrv.BlankRV): # let BlankRV handle the operation return NotImplemented if isinstance(other, Iterable): - if not isinstance(other, Seq): - other = Seq(*other) + if not isinstance(other, seq.Seq): + other = seq.Seq(*other) other = other.sum() if not isinstance(other, RV): return RV([operation(v, other) for v in self.vals], self.probs) @@ -242,12 +191,12 @@ def _convolve(self, other: T_ifsr, operation: Callable[[float, float], float]): return res def _rconvolve(self, other: T_ifsr, operation: Callable[[float, float], float]): - if isinstance(other, BlankRV): # let BlankRV handle the operation + if isinstance(other, blackrv.BlankRV): # let BlankRV handle the operation return NotImplemented assert not isinstance(other, RV) if isinstance(other, Iterable): - if not isinstance(other, Seq): - other = Seq(*other) + if not isinstance(other, seq.Seq): + other = seq.Seq(*other) other = other.sum() return RV([operation(other, v) for v in self.vals], self.probs) @@ -259,7 +208,7 @@ def __rmatmul__(self, other: T_is): # ( other @ self:RV ) # DOCUMENTATION: https://anydice.com/docs/introspection/ look for "Accessing" -> "Collections of dice" and "A single die" assert not isinstance(other, RV), 'unsupported operand type(s) for @: RV and RV' - other = Seq([other]) + other = seq.Seq([other]) assert all(isinstance(i, int) for i in other._seq), 'indices must be integers' if len(other) == 1: # only one index, return the value at that index k: int = other._seq[0] # type: ignore @@ -414,12 +363,12 @@ def __str__(self): return LHS + 'd{?}' def __repr__(self): - return output(self, print_=False) + return output.output(self, print_=False) @staticmethod def dices_are_equal(d1: T_ifsr, d2: T_ifsr): - if isinstance(d1, BlankRV) or isinstance(d2, BlankRV): - return isinstance(d1, BlankRV) and isinstance(d2, BlankRV) + if isinstance(d1, blackrv.BlankRV) or isinstance(d2, blackrv.BlankRV): + return isinstance(d1, blackrv.BlankRV) and isinstance(d2, blackrv.BlankRV) if isinstance(d1, (int, float)) or isinstance(d1, Iterable): d1 = RV.from_seq([d1]) if isinstance(d2, (int, float)) or isinstance(d2, Iterable): @@ -427,580 +376,11 @@ def dices_are_equal(d1: T_ifsr, d2: T_ifsr): return d1.vals == d2.vals and d1.probs == d2.probs -class BlankRV: - def __init__(self, _special_null=False): - self._special_null = _special_null # makes it such that it's _special_null, in operations like (X**2 + 1) still is blank (X). see https://anydice.com/program/395da - - def mean(self): - return 0 - - def std(self): - return 0 - - def output(self, *args, **kwargs): - return output(self, *args, **kwargs) - - def __matmul__(self, other: T_ifs): - # ( self:RV @ other ) thus not allowed, - raise TypeError(f'A position selector must be either a number or a sequence, but you provided "{other}"') - - def __rmatmul__(self, other: T_is): - if self._special_null: - return 0 if other != 1 else self - return self - - def __add__(self, other: T_ifsr): - if self._special_null: - return self - return other - - def __radd__(self, other: T_ifsr): - if self._special_null: - return self - return other - - def __sub__(self, other: T_ifsr): - if self._special_null: - return self - if isinstance(other, Iterable): - other = Seq(*other).sum() - return (-other) - - def __rsub__(self, other: T_ifsr): - if self._special_null: - return self - return other - - def __mul__(self, other: T_ifsr): - return self - - def __rmul__(self, other: T_ifsr): - return self - - def __floordiv__(self, other: T_ifsr): - return self - - def __rfloordiv__(self, other: T_ifsr): - return self - - def __truediv__(self, other: T_ifsr): - return self - - def __rtruediv__(self, other: T_ifsr): - return self - - def __pow__(self, other: T_ifsr): - return self - - def __rpow__(self, other: T_ifsr): - return self - - def __mod__(self, other: T_ifsr): - return self - - def __rmod__(self, other: T_ifsr): - return self - - # comparison operators - def __eq__(self, other: T_ifsr): - if self._special_null: - return 1 - return self - - def __ne__(self, other: T_ifsr): - if self._special_null: - return 1 - return self - - def __lt__(self, other: T_ifsr): - if self._special_null: - return 1 - return self - - def __le__(self, other: T_ifsr): - if self._special_null: - return 1 - return self - - def __gt__(self, other: T_ifsr): - if self._special_null: - return 1 - return self - - def __ge__(self, other: T_ifsr): - if self._special_null: - return 1 - return self - - # boolean operators - def __or__(self, other: T_ifsr): - if self._special_null: - return 1 - return self if isinstance(other, BlankRV) else other - - def __ror__(self, other: T_ifsr): - if self._special_null: - return 1 - return self if isinstance(other, BlankRV) else other - - def __and__(self, other: T_ifsr): - if self._special_null: - return 1 - return self - - def __rand__(self, other: T_ifsr): - if self._special_null: - return 1 - return self - - def __bool__(self): - raise TypeError('Boolean values can only be numbers, but you provided RV') - - def __len__(self): - if self._special_null: - return 1 - return 0 - - def __pos__(self): - return self - - def __neg__(self): - return self - - def __invert__(self): - if self._special_null: - return 1 - return self - - def __abs__(self): - return self - - def __round__(self, n=0): - return self - - def __floor__(self): - return self - - def __ceil__(self): - return self - - def __trunc__(self): - return self - - def __str__(self): - if self._special_null: - return 'd{?}' - return 'd{}' - - def __repr__(self): - return output(self, print_=False) - - -class Seq(Iterable): - def __init__(self, *source: T_ifsr, _INTERNAL_SEQ_VALUE=None): - self._sum = None - self._one_indexed = 1 - if _INTERNAL_SEQ_VALUE is not None: # used for internal optimization only - self._seq: tuple[T_if, ...] = _INTERNAL_SEQ_VALUE # type: ignore - return - flat = tuple(utils.flatten(source)) - flat_rvs = [x for x in flat if isinstance(x, RV) and not isinstance(x, BlankRV)] # expand RVs - flat_rv_vals = [v for rv in flat_rvs for v in rv.vals] - flat_else: list[T_if] = [x for x in flat if not isinstance(x, (RV, BlankRV))] - assert all(isinstance(x, (int, float)) for x in flat_else), 'Seq must be made of numbers and RVs. Seq:' + str(flat_else) - self._seq = tuple(flat_else + flat_rv_vals) - - def sum(self): - if self._sum is None: - self._sum = sum(self._seq) - return self._sum - - def set_one_indexed(self, one_indexed: bool): - self._one_indexed = 1 if one_indexed else 0 - - def __str__(self): - return '{?}' - - def __repr__(self): - return f'Seq({repr(self._seq)})' - - def __iter__(self): - return iter(self._seq) - - def __len__(self): - return len(self._seq) - - def __invert__(self): - return 1 if self.sum() == 0 else 0 - - def __getitem__(self, i: int): - return self._seq[i - self._one_indexed] if 0 <= i - self._one_indexed < len(self._seq) else 0 - - def __matmul__(self, other: T_ifsr): - if isinstance(other, RV): # ( self:SEQ @ other:RV ) thus RV takes priority - return other.__rmatmul__(self) - # access at indices in other ( self @ other ) - if isinstance(other, (int, float)): - other = Seq([int(d) for d in str(other)]) # SEQ @ int thus convert int to sequence using base 10 - if not isinstance(other, Seq): - other = Seq(other) - assert all(isinstance(i, int) for i in self._seq), 'indices must be integers' - return sum(other[int(i)] for i in self._seq) - - def __rmatmul__(self, other: T_ifs): - if isinstance(other, RV): # ( other:RV @ self:SEQ ) thus not allowed, - raise TypeError(f'A position selector must be either a number or a sequence, but you provided "{other}"') - # access in my indices ( other @ self ) - if isinstance(other, (int, float)): - return self[int(other)] - if not isinstance(other, Seq): - other = Seq(other) - assert all(isinstance(i, int) for i in other._seq), 'indices must be integers' - return sum(self[int(i)] for i in other._seq) - - # operators - def __add__(self, other: T_ifs): - return operator.add(self.sum(), other) - - def __radd__(self, other: T_ifs): - return operator.add(other, self.sum()) - - def __sub__(self, other: T_ifs): - return operator.sub(self.sum(), other) - - def __rsub__(self, other: T_ifs): - return operator.sub(other, self.sum()) - - def __mul__(self, other: T_ifs): - return operator.mul(self.sum(), other) - - def __rmul__(self, other: T_ifs): - return operator.mul(other, self.sum()) - - def __floordiv__(self, other: T_ifs): - return operator.floordiv(self.sum(), other) - - def __rfloordiv__(self, other: T_ifs): - return operator.floordiv(other, self.sum()) - - def __truediv__(self, other: T_ifs): - return operator.truediv(self.sum(), other) - - def __rtruediv__(self, other: T_ifs): - return operator.truediv(other, self.sum()) - - def __pow__(self, other: T_ifs): - return operator.pow(self.sum(), other) - - def __rpow__(self, other: T_ifs): - return operator.pow(other, self.sum()) - - def __mod__(self, other: T_ifs): - return operator.mod(self.sum(), other) - - def __rmod__(self, other: T_ifs): - return operator.mod(other, self.sum()) - - # comparison operators - def __eq__(self, other: T_ifsr): - return self._compare_to(other, operator.eq) - - def __ne__(self, other: T_ifsr): - return self._compare_to(other, operator.ne) - - def __lt__(self, other: T_ifsr): - return self._compare_to(other, operator.lt) - - def __le__(self, other: T_ifsr): - return self._compare_to(other, operator.le) - - def __gt__(self, other: T_ifsr): - return self._compare_to(other, operator.gt) - - def __ge__(self, other: T_ifsr): - return self._compare_to(other, operator.ge) - - # boolean operators - def __or__(self, other: T_ifsr): - return int((self.sum() != 0) or (other != 0)) if isinstance(other, (int, float)) else operator.or_(self.sum(), other) - - def __ror__(self, other: T_ifsr): - return int((self.sum() != 0) or (other != 0)) if isinstance(other, (int, float)) else operator.or_(other, self.sum()) - - def __and__(self, other: T_ifsr): - return int((self.sum() != 0) and (other != 0)) if isinstance(other, (int, float)) else operator.and_(self.sum(), other) - - def __rand__(self, other: T_ifsr): - return int((self.sum() != 0) and (other != 0)) if isinstance(other, (int, float)) else operator.and_(other, self.sum()) - - def _compare_to(self, other: T_ifsr, operation: Callable[[float, T_ifr], bool]): - if isinstance(other, RV): - return operation(self.sum(), other) - if isinstance(other, Iterable): - if not isinstance(other, Seq): # convert to Seq if not already - other = Seq(*other) - if operation == operator.ne: # special case for NE, since it is ∃ as opposed to ∀ like the others - return not self._compare_to(other, operator.eq) - return all(operation(x, y) for x, y in zip_longest(self._seq, other._seq, fillvalue=float('-inf'))) - # if other is a number - return sum(1 for x in self._seq if operation(x, other)) - - @staticmethod - def seqs_are_equal(s1: T_ifs, s2: T_ifs): - assert not isinstance(s1, RV) and not isinstance(s2, RV), 'cannot compare Seq with RV' - if not isinstance(s1, Seq): - s1 = Seq(s1) - if not isinstance(s2, Seq): - s2 = Seq(s2) - return s1._seq == s2._seq - - -def anydice_casting(verbose=False): # noqa: C901 - # verbose = True - # in the documenation of the anydice language https://anydice.com/docs/functions - # it states that "The behavior of a function depends on what type of value it expects and what type of value it actually receives." - # Thus there are 9 scenarios for each parameters - # expect: int, actual: int = no change - # expect: int, actual: seq = seq.sum() - # expect: int, actual: rv = MUST CALL FUNCTION WITH EACH VALUE OF RV ("If a die is provided, then the function will be invoked for all numbers on the die – or the sums of a collection of dice – and the result will be a new die.") - # expect: seq, actual: int = [int] - # expect: seq, actual: seq = no change - # expect: seq, actual: rv = MUST CALL FUNCTION WITH SEQUENCE OF EVERY ROLL OF THE RV ("If Expecting a sequence and dice are provided, then the function will be invoked for all possible sequences that can be made by rolling those dice. In that case the result will be a new die.") # noqa: E501 - # expect: rv, actual: int = dice([int]) - # expect: rv, actual: seq = dice(seq) - # expect: rv, actual: rv = no change - def decorator(func): - def wrapper(*args, **kwargs): - args, kwargs = list(args), dict(kwargs) - fullspec = inspect.getfullargspec(func) - arg_names = fullspec.args # list of arg names for args (not kwargs) - param_annotations = fullspec.annotations # (arg_names): (arg_type) that have been annotated - - hard_params = {} # update parameters that are easy to update, keep the hard ones for later - combined_args = list(enumerate(args)) + list(kwargs.items()) - if verbose: - logger.debug(f'#args {len(combined_args)}') - for k, arg_val in combined_args: - arg_name = k if isinstance(k, str) else (arg_names[k] if k < len(arg_names) else None) # get the name of the parameter (args or kwargs) - if arg_name not in param_annotations: # only look for annotated parameters - if verbose: - logger.debug(f'no anot {k}') - continue - expected_type = param_annotations[arg_name] - actual_type = type(arg_val) - new_val = None - if expected_type not in (int, Seq, RV): - if verbose: - logger.debug(f'not int seq rv {k}') - continue - if isinstance(arg_val, BlankRV): # EDGE CASE abort calling if casting int/Seq to BlankRV (https://github.com/Ar-Kareem/PythonDice/issues/11) - if expected_type in (int, Seq): - if verbose: - logger.debug(f'abort calling func due to BlankRV! {k}') - return BlankRV(_special_null=True) - continue # casting BlankRV to RV means the function IS called and nothing changes - casted_iter_to_seq = False - if isinstance(arg_val, Iterable) and not isinstance(arg_val, Seq): # if val is iter then need to convert to Seq - arg_val = Seq(*arg_val) - new_val = arg_val - actual_type = Seq - casted_iter_to_seq = True - if (expected_type, actual_type) == (int, Seq): - new_val = arg_val.sum() - elif (expected_type, actual_type) == (int, RV): - hard_params[k] = (arg_val, expected_type) - continue - elif (expected_type, actual_type) == (Seq, int): - new_val = Seq([arg_val]) - elif (expected_type, actual_type) == (Seq, RV): - hard_params[k] = (arg_val, expected_type) - if verbose: - logger.debug(f'EXPL {k}') - continue - elif (expected_type, actual_type) == (RV, int): - new_val = RV.from_const(arg_val) # type: ignore - elif (expected_type, actual_type) == (RV, Seq): - new_val = RV.from_seq(arg_val) - elif not casted_iter_to_seq: # no cast made and one of the two types is not known, no casting needed - if verbose: - logger.debug(f'no cast, {k}, {expected_type}, {actual_type}') - continue - if isinstance(k, str): - kwargs[k] = new_val - else: - args[k] = new_val - if verbose: - logger.debug('cast {k}') - if verbose: - logger.debug(f'hard {[(k, v[1]) for k, v in hard_params.items()]}') - if not hard_params: - return func(*args, **kwargs) - - var_name = tuple(hard_params.keys()) - all_rolls_and_probs = [] - for k in var_name: - v, expected_type = hard_params[k] - assert isinstance(v, RV), 'expected type RV' - if expected_type == Seq: - r, p = v._get_expanded_possible_rolls() - elif expected_type == int: - r, p = v.vals, v.probs - else: - raise ValueError(f'casting RV to {expected_type} not supported') - all_rolls_and_probs.append(zip(r, p)) - # FINALLY take product of all possible rolls - all_rolls_and_probs = product(*all_rolls_and_probs) - - res_vals: list[Union[RV, BlankRV, Seq, int, float, None]] = [] - res_probs: list[int] = [] - for rolls_and_prob in all_rolls_and_probs: - rolls = tuple(r for r, _ in rolls_and_prob) - prob = math.prod(p for _, p in rolls_and_prob) - # will update args and kwargs with each possible roll using var_name - for k, v in zip(var_name, rolls): - if isinstance(k, str): - kwargs[k] = v - else: - args[k] = v - val: T_ifsr = func(*args, **kwargs) # single result of the function call - if isinstance(val, Iterable): - if not isinstance(val, Seq): - val = Seq(*val) - val = val.sum() - if verbose: - logger.debug(f'val {val} prob {prob}') - res_vals.append(val) - res_probs.append(prob) - return RV.from_rvs(rvs=res_vals, weights=res_probs) - return wrapper - return decorator - - -def max_func_depth(): - # decorator to limit the depth of the function calls - def decorator(func): - def wrapper(*args, **kwargs): - if SETTINGS['INTERNAL_CURR_DEPTH'] >= SETTINGS['maximum function depth']: - msg = 'The maximum function depth was exceeded, results are truncated.' - if not SETTINGS['INTERNAL_CURR_DEPTH_WARNING_PRINTED']: - logger.warning(msg) - print(msg) - SETTINGS['INTERNAL_CURR_DEPTH_WARNING_PRINTED'] = True - return BlankRV() - SETTINGS['INTERNAL_CURR_DEPTH'] += 1 - res = func(*args, **kwargs) - SETTINGS['INTERNAL_CURR_DEPTH'] -= 1 - return res if res is not None else BlankRV() - return wrapper - return decorator - - -@anydice_casting() -def _sum_at(orig: Seq, locs: Seq): +@decorators.anydice_casting() +def _sum_at(orig: seq.Seq, locs: seq.Seq): return sum(orig[int(i)] for i in locs) -def myrange(left, right): - if isinstance(left, RV): - raise TypeError(f'A sequence range must begin with a number, while you provided "{left}".') - if isinstance(right, RV): - raise TypeError(f'A sequence range must begin with a number, while you provided "{right}".') - return range(left, right + 1) - - -def roll(n: Union[T_isr, str], d: Union[T_isr, None] = None) -> Union[RV, BlankRV]: - """Roll n dice of d sides - - Args: - n (T_isr | str): number of dice to roll, if string then it must be 'ndm' where n and m are integers - d (T_isr, optional): number of sides of the dice (or the dice itself). Defaults to None which is equivalent to roll(1, n) - - Returns: - RV: RV of the result of rolling n dice of d sides - """ - if isinstance(n, str): # either rolL('ndm') or roll('dm') - assert d is None, 'if n is a string, then d must be None' - nm1, nm2 = n.split('d') - if nm1 == '': - nm1 = 1 - n, d = int(nm1), int(nm2) - - if d is None: # if only one argument, then roll it as a dice once - n, d = 1, n - - # make sure all iters are Seq - if isinstance(d, Iterable) and not isinstance(d, Seq): - d = Seq(*d) - if isinstance(n, Iterable) and not isinstance(n, Seq): - n = Seq(*n) - if isinstance(d, BlankRV): # SPECIAL CASE: XdY where Y is BlankRV => BlankRV - return BlankRV() - if isinstance(n, BlankRV): # SPECIAL CASE: XdY where X is BlankRV => Special BlankRV see https://anydice.com/program/395da - return BlankRV(_special_null=True) - if isinstance(d, Seq) and len(d) == 0: # SPECIAL CASE: Xd{} => BlankRV - return BlankRV() - # both arguments are now exactly int|Seq|RV - result = _roll(n, d) # ROLL! - assert not isinstance(result, BlankRV), 'should never happen!' - # below is only used for the __str__ method - _LHS = n if isinstance(n, int) else (n.sum() if isinstance(n, Seq) else 0) - if isinstance(d, int): - _RHS = d - elif isinstance(d, Seq): - _RHS = '{}' if len(d) == 0 else '{?}' - elif isinstance(d, RV): - _d_LHS, _d_RHS = d._str_LHS_RHS - _RHS = _d_RHS if _d_LHS == 1 and isinstance(_d_RHS, int) else '{?}' # so that 2d(1d2) and (2 d (1 d ( {1} d 2))) all evaluate to '2d2' - result._str_LHS_RHS = (_LHS, _RHS) - return result - - -def _roll(n: Union[int, Seq, RV], d: Union[int, Seq, RV]) -> Union[RV, BlankRV]: - if isinstance(d, int): - if d > 0: - d = RV.from_seq(range(1, d + 1)) - elif d == 0: - d = RV.from_const(0) - else: - d = RV.from_seq([range(d, 0)]) - elif isinstance(d, Seq): - d = RV.from_seq(d) - - if isinstance(n, Seq): - s = n.sum() - assert isinstance(s, int), 'cant roll non-int number of dice' - return roll(s, d) - if isinstance(n, RV): - assert all(isinstance(v, int) for v in n.vals), 'RV must have int values to roll other dice' - dies = tuple(roll(int(v), d) for v in n.vals) - result = RV.from_rvs(rvs=dies, weights=n.probs) - assert not isinstance(result, BlankRV), 'should never happen!' - result.set_source(1, d) - return result - return _roll_int_rv(n, d) - - -_MEMOIZED_ROLLS = {} - - -def _roll_int_rv(n: int, d: RV) -> RV: - if n < 0: - return -_roll_int_rv(-n, d) - if n == 0: - return RV.from_const(0) - if n == 1: - return d - if (n, d.vals, d.probs) in _MEMOIZED_ROLLS: - return _MEMOIZED_ROLLS[(n, d.vals, d.probs)] - half = _roll_int_rv(n // 2, d) - full = half + half - if n % 2 == 1: - full = full + d - full.set_source(n, d) - _MEMOIZED_ROLLS[(n, d.vals, d.probs)] = full - return full - - def _INTERNAL_PROB_LIMIT_VALS(rv: RV, sum_limit: float = 10e30): sum_ = rv._get_sum_probs() if sum_ <= sum_limit: @@ -1018,59 +398,3 @@ def _INTERNAL_PROB_LIMIT_VALS(rv: RV, sum_limit: float = 10e30): rv.probs = tuple(p // normalizing_const for p in rv.probs) rv._get_sum_probs(force=True) # force update sum return rv - - -def output(rv: Union[T_isr, None], named=None, show_pdf=True, blocks_width=None, print_=True, print_fn=None, cdf_cut=0): - if isinstance(rv, Seq) and len(rv) == 0: # empty sequence plotted as empty - rv = BlankRV() - if isinstance(rv, int) or isinstance(rv, Iterable) or isinstance(rv, bool): - rv = RV.from_seq([rv]) - if blocks_width is None: - blocks_width = SETTINGS['DEFAULT_OUTPUT_WIDTH'] - - result = '' - if named is not None: - result += named + ' ' - - if rv is None or isinstance(rv, BlankRV): - result += '\n' + '-' * (blocks_width + 8) - if print_: - if print_fn is None: - SETTINGS['DEFAULT_PRINT_FN'](result) - else: - print_fn(result) - return - else: - return result - assert isinstance(rv, RV), f'rv must be a RV {rv}' - - mean = rv.mean() - mean = round(mean, 2) if mean is not None else None - std = rv.std() - std = round(std, 2) if std is not None else None - result += f'{mean} ± {std}' - if show_pdf: - vp = rv.get_vals_probs(cdf_cut / 100) - max_val_len = max(len(str(v)) for v, _ in vp) - blocks = max(0, blocks_width - max_val_len) - for v, p in vp: - result += '\n' + f"{v:>{max_val_len}}: {100 * p:>5.2f} " + ('█' * round(p * blocks)) - result += '\n' + '-' * (blocks_width + 8) - if print_: - if print_fn is None: - SETTINGS['DEFAULT_PRINT_FN'](result) - else: - print_fn(result) - return - else: - return result - - -def roller(rv: T_isr, count: Union[int, None] = None): - if isinstance(rv, int) or isinstance(rv, Iterable) or isinstance(rv, bool): - rv = RV.from_seq([rv]) - assert isinstance(rv, RV), 'rv must be a RV' - # roll using random.choices - if count is None: - return random.choices(rv.vals, rv.probs)[0] - return tuple(random.choices(rv.vals, rv.probs)[0] for _ in range(count)) diff --git a/src/roller.py b/src/roller.py new file mode 100644 index 0000000..df67e47 --- /dev/null +++ b/src/roller.py @@ -0,0 +1,118 @@ +from typing import Union, Iterable +import random + +from .randvar import RV +from .seq import Seq +from . import blackrv +from .typings import T_isr + + +def roll(n: Union[T_isr, str], d: Union[T_isr, None] = None) -> Union[RV, blackrv.BlankRV]: + """Roll n dice of d sides + + Args: + n (T_isr | str): number of dice to roll, if string then it must be 'ndm' where n and m are integers + d (T_isr, optional): number of sides of the dice (or the dice itself). Defaults to None which is equivalent to roll(1, n) + + Returns: + RV: RV of the result of rolling n dice of d sides + """ + if isinstance(n, str): # either rolL('ndm') or roll('dm') + assert d is None, 'if n is a string, then d must be None' + nm1, nm2 = n.split('d') + if nm1 == '': + nm1 = 1 + n, d = int(nm1), int(nm2) + + if d is None: # if only one argument, then roll it as a dice once + n, d = 1, n + + # make sure all iters are Seq + if isinstance(d, Iterable) and not isinstance(d, Seq): + d = Seq(*d) + if isinstance(n, Iterable) and not isinstance(n, Seq): + n = Seq(*n) + if isinstance(d, blackrv.BlankRV): # SPECIAL CASE: XdY where Y is BlankRV => BlankRV + return blackrv.BlankRV() + if isinstance(n, blackrv.BlankRV): # SPECIAL CASE: XdY where X is BlankRV => Special BlankRV see https://anydice.com/program/395da + return blackrv.BlankRV(_special_null=True) + if isinstance(d, Seq) and len(d) == 0: # SPECIAL CASE: Xd{} => BlankRV + return blackrv.BlankRV() + # both arguments are now exactly int|Seq|RV + result = _roll(n, d) # ROLL! + assert not isinstance(result, blackrv.BlankRV), 'should never happen!' + # below is only used for the __str__ method + _LHS = n if isinstance(n, int) else (n.sum() if isinstance(n, Seq) else 0) + if isinstance(d, int): + _RHS = d + elif isinstance(d, Seq): + _RHS = '{}' if len(d) == 0 else '{?}' + elif isinstance(d, RV): + _d_LHS, _d_RHS = d._str_LHS_RHS + _RHS = _d_RHS if _d_LHS == 1 and isinstance(_d_RHS, int) else '{?}' # so that 2d(1d2) and (2 d (1 d ( {1} d 2))) all evaluate to '2d2' + result._str_LHS_RHS = (_LHS, _RHS) + return result + + +def _roll(n: Union[int, Seq, RV], d: Union[int, Seq, RV]) -> Union[RV, blackrv.BlankRV]: + if isinstance(d, int): + if d > 0: + d = RV.from_seq(range(1, d + 1)) + elif d == 0: + d = RV.from_const(0) + else: + d = RV.from_seq([range(d, 0)]) + elif isinstance(d, Seq): + d = RV.from_seq(d) + + if isinstance(n, Seq): + s = n.sum() + assert isinstance(s, int), 'cant roll non-int number of dice' + return roll(s, d) + if isinstance(n, RV): + assert all(isinstance(v, int) for v in n.vals), 'RV must have int values to roll other dice' + dies = tuple(roll(int(v), d) for v in n.vals) + result = RV.from_rvs(rvs=dies, weights=n.probs) + assert not isinstance(result, blackrv.BlankRV), 'should never happen!' + result.set_source(1, d) + return result + return _roll_int_rv(n, d) + + +_MEMOIZED_ROLLS = {} + + +def _roll_int_rv(n: int, d: RV) -> RV: + if n < 0: + return -_roll_int_rv(-n, d) + if n == 0: + return RV.from_const(0) + if n == 1: + return d + if (n, d.vals, d.probs) in _MEMOIZED_ROLLS: + return _MEMOIZED_ROLLS[(n, d.vals, d.probs)] + half = _roll_int_rv(n // 2, d) + full = half + half + if n % 2 == 1: + full = full + d + full.set_source(n, d) + _MEMOIZED_ROLLS[(n, d.vals, d.probs)] = full + return full + + +def roller(rv: T_isr, count: Union[int, None] = None): + if isinstance(rv, int) or isinstance(rv, Iterable) or isinstance(rv, bool): + rv = RV.from_seq([rv]) + assert isinstance(rv, RV), 'rv must be a RV' + # roll using random.choices + if count is None: + return random.choices(rv.vals, rv.probs)[0] + return tuple(random.choices(rv.vals, rv.probs)[0] for _ in range(count)) + + +def myrange(left, right): + if isinstance(left, RV): + raise TypeError(f'A sequence range must begin with a number, while you provided "{left}".') + if isinstance(right, RV): + raise TypeError(f'A sequence range must begin with a number, while you provided "{right}".') + return range(left, right + 1) diff --git a/src/seq.py b/src/seq.py new file mode 100644 index 0000000..67539f7 --- /dev/null +++ b/src/seq.py @@ -0,0 +1,167 @@ +from typing import Iterable, Callable +from itertools import zip_longest +import operator + +from .typings import T_if, T_ifs, T_ifsr, T_ifr +from . import randvar +from . import utils +from . import blackrv + + +class Seq(Iterable): + def __init__(self, *source: T_ifsr, _INTERNAL_SEQ_VALUE=None): + self._sum = None + self._one_indexed = 1 + if _INTERNAL_SEQ_VALUE is not None: # used for internal optimization only + self._seq: tuple[T_if, ...] = _INTERNAL_SEQ_VALUE # type: ignore + return + flat = tuple(utils.flatten(source)) + flat_rvs = [x for x in flat if isinstance(x, randvar.RV) and not isinstance(x, blackrv.BlankRV)] # expand RVs + flat_rv_vals = [v for rv in flat_rvs for v in rv.vals] + flat_else: list[T_if] = [x for x in flat if not isinstance(x, (randvar.RV, blackrv.BlankRV))] + assert all(isinstance(x, (int, float)) for x in flat_else), 'Seq must be made of numbers and RVs. Seq:' + str(flat_else) + self._seq = tuple(flat_else + flat_rv_vals) + + def sum(self): + if self._sum is None: + self._sum = sum(self._seq) + return self._sum + + def set_one_indexed(self, one_indexed: bool): + self._one_indexed = 1 if one_indexed else 0 + + def __str__(self): + return '{?}' + + def __repr__(self): + return f'Seq({repr(self._seq)})' + + def __iter__(self): + return iter(self._seq) + + def __len__(self): + return len(self._seq) + + def __invert__(self): + return 1 if self.sum() == 0 else 0 + + def __getitem__(self, i: int): + return self._seq[i - self._one_indexed] if 0 <= i - self._one_indexed < len(self._seq) else 0 + + def __matmul__(self, other: T_ifsr): + if isinstance(other, randvar.RV): # ( self:SEQ @ other:RV ) thus RV takes priority + return other.__rmatmul__(self) + # access at indices in other ( self @ other ) + if isinstance(other, (int, float)): + other = Seq([int(d) for d in str(other)]) # SEQ @ int thus convert int to sequence using base 10 + if not isinstance(other, Seq): + other = Seq(other) + assert all(isinstance(i, int) for i in self._seq), 'indices must be integers' + return sum(other[int(i)] for i in self._seq) + + def __rmatmul__(self, other: T_ifs): + if isinstance(other, randvar.RV): # ( other:RV @ self:SEQ ) thus not allowed, + raise TypeError(f'A position selector must be either a number or a sequence, but you provided "{other}"') + # access in my indices ( other @ self ) + if isinstance(other, (int, float)): + return self[int(other)] + if not isinstance(other, Seq): + other = Seq(other) + assert all(isinstance(i, int) for i in other._seq), 'indices must be integers' + return sum(self[int(i)] for i in other._seq) + + # operators + def __add__(self, other: T_ifs): + return operator.add(self.sum(), other) + + def __radd__(self, other: T_ifs): + return operator.add(other, self.sum()) + + def __sub__(self, other: T_ifs): + return operator.sub(self.sum(), other) + + def __rsub__(self, other: T_ifs): + return operator.sub(other, self.sum()) + + def __mul__(self, other: T_ifs): + return operator.mul(self.sum(), other) + + def __rmul__(self, other: T_ifs): + return operator.mul(other, self.sum()) + + def __floordiv__(self, other: T_ifs): + return operator.floordiv(self.sum(), other) + + def __rfloordiv__(self, other: T_ifs): + return operator.floordiv(other, self.sum()) + + def __truediv__(self, other: T_ifs): + return operator.truediv(self.sum(), other) + + def __rtruediv__(self, other: T_ifs): + return operator.truediv(other, self.sum()) + + def __pow__(self, other: T_ifs): + return operator.pow(self.sum(), other) + + def __rpow__(self, other: T_ifs): + return operator.pow(other, self.sum()) + + def __mod__(self, other: T_ifs): + return operator.mod(self.sum(), other) + + def __rmod__(self, other: T_ifs): + return operator.mod(other, self.sum()) + + # comparison operators + def __eq__(self, other: T_ifsr): + return self._compare_to(other, operator.eq) + + def __ne__(self, other: T_ifsr): + return self._compare_to(other, operator.ne) + + def __lt__(self, other: T_ifsr): + return self._compare_to(other, operator.lt) + + def __le__(self, other: T_ifsr): + return self._compare_to(other, operator.le) + + def __gt__(self, other: T_ifsr): + return self._compare_to(other, operator.gt) + + def __ge__(self, other: T_ifsr): + return self._compare_to(other, operator.ge) + + # boolean operators + def __or__(self, other: T_ifsr): + return int((self.sum() != 0) or (other != 0)) if isinstance(other, (int, float)) else operator.or_(self.sum(), other) + + def __ror__(self, other: T_ifsr): + return int((self.sum() != 0) or (other != 0)) if isinstance(other, (int, float)) else operator.or_(other, self.sum()) + + def __and__(self, other: T_ifsr): + return int((self.sum() != 0) and (other != 0)) if isinstance(other, (int, float)) else operator.and_(self.sum(), other) + + def __rand__(self, other: T_ifsr): + return int((self.sum() != 0) and (other != 0)) if isinstance(other, (int, float)) else operator.and_(other, self.sum()) + + def _compare_to(self, other: T_ifsr, operation: Callable[[float, T_ifr], bool]): + if isinstance(other, randvar.RV): + return operation(self.sum(), other) + if isinstance(other, Iterable): + if not isinstance(other, Seq): # convert to Seq if not already + other = Seq(*other) + if operation == operator.ne: # special case for NE, since it is ∃ as opposed to ∀ like the others + return not self._compare_to(other, operator.eq) + return all(operation(x, y) for x, y in zip_longest(self._seq, other._seq, fillvalue=float('-inf'))) + # if other is a number + return sum(1 for x in self._seq if operation(x, other)) + + @staticmethod + def seqs_are_equal(s1: T_ifs, s2: T_ifs): + assert not isinstance(s1, randvar.RV) and not isinstance(s2, randvar.RV), 'cannot compare Seq with RV' + if not isinstance(s1, Seq): + s1 = Seq(s1) + if not isinstance(s2, Seq): + s2 = Seq(s2) + return s1._seq == s2._seq diff --git a/src/settings.py b/src/settings.py new file mode 100644 index 0000000..43562be --- /dev/null +++ b/src/settings.py @@ -0,0 +1,42 @@ + + +# SETTINGS +DEFAULT_SETTINGS = { + 'RV_TRUNC': False, # if True, then RV will automatically truncate values to ints (replicate anydice behavior) + 'RV_IGNORE_ZERO_PROBS': False, # if True, then RV remove P=0 vals when creating RVs (False by default in anydice) + 'DEFAULT_OUTPUT_WIDTH': 180, # default width of output + 'DEFAULT_PRINT_FN': print, # default print function + 'INTERNAL_CURR_DEPTH': 0, # internal use only, for max_func_depth decorator + 'INTERNAL_CURR_DEPTH_WARNING_PRINTED': False, # used with the above + + 'position order': 'highest first', # 'highest first' or 'lowest first' + 'explode depth': 2, # can only be set to a positive integer (the default is 2) + 'maximum function depth': 10 # can only be set to a positive integer (the default is 10) +} +SETTINGS = DEFAULT_SETTINGS.copy() + + +def settings_set(name, value): + if name == "position order": + assert value in ("highest first", "lowest first"), 'position order must be "highest first" or "lowest first"' + elif name == "explode depth": + assert isinstance(value, int) and value > 0, '"explode depth" can only be set to a positive integer (the default is 2) got ' + str(value) + elif name == "maximum function depth": + assert isinstance(value, int) and value > 0, '"maximum function depth" can only be set to a positive integer (the default is 10) got ' + str(value) + elif name in ('RV_TRUNC', 'RV_IGNORE_ZERO_PROBS'): + if isinstance(value, str): + assert value.lower() in ('true', 'false'), 'value must be "True" or "False"' + value = value.lower() == 'true' + assert isinstance(value, bool), 'value must be a boolean' + elif name == 'DEFAULT_OUTPUT_WIDTH': + assert isinstance(value, int) and value > 0, 'DEFAULT_OUTPUT_WIDTH must be a positive integer' + elif name == 'DEFAULT_PRINT_FN': + assert callable(value), 'DEFAULT_PRINT_FN must be a callable' + else: + assert False, f'invalid setting name: {name}' + SETTINGS[name] = value + + +def settings_reset(): + SETTINGS.clear() + SETTINGS.update(DEFAULT_SETTINGS) diff --git a/src/typings.py b/src/typings.py new file mode 100644 index 0000000..111ae82 --- /dev/null +++ b/src/typings.py @@ -0,0 +1,17 @@ +from typing import Iterable, Union, TYPE_CHECKING + +if TYPE_CHECKING: # https://stackoverflow.com/questions/39740632/python-type-hinting-without-cyclic-imports + from .randvar import RV + from .blackrv import BlankRV + + +# TYPE DEFINITIONS +T_if = Union[int, float] +T_ifs = Union[T_if, Iterable['T_ifs']] # recursive type +T_is = Union[int, Iterable['T_is']] # recursive type + +T_isr = Union[T_is, 'RV', 'BlankRV'] +T_ifr = Union[T_if, 'RV', 'BlankRV'] +T_ifsr = Union[T_ifs, 'RV', 'BlankRV'] + +T_s = Iterable['T_ifs'] # same as T_ifs but excludes int and float (not iterable) diff --git a/test/casting_test.py b/test/casting_test.py index 0988b60..ffd060c 100644 --- a/test/casting_test.py +++ b/test/casting_test.py @@ -1,14 +1,12 @@ from typing import Union import pytest -import dice_calc.randvar as randvar -from dice_calc.randvar import RV, Seq, roll, anydice_casting - +from dice_calc import anydice_casting, roll, RV, Seq, settings_reset @pytest.fixture(autouse=True) -def settings_reset(): - randvar.settings_reset() +def settings_reset_fixture(): + settings_reset() @@ -135,7 +133,7 @@ def b(S:Seq): assert (a.vals, a.probs) == (vals, probs) # type: ignore def test_cast_then_matmul(): - @randvar.anydice_casting() + @anydice_casting() def count(V, SEQUENCE:Seq, *args): return V@SEQUENCE assert RV.dices_are_equal(count(1, roll(2, 4)), RV((1, 2, 3, 4), (1, 3, 5, 7))), 'NUM @ DICED SEQ' # type: ignore @@ -160,7 +158,7 @@ def f(A:int) -> Union[int, None]: @pytest.mark.run(order=-1) @pytest.mark.timeout(2) def test_time(): - @randvar.anydice_casting() + @anydice_casting() def a(n:int): return 0 a(roll(1_000)) # type: ignore diff --git a/test/funclib_test.py b/test/funclib_test.py index 58eb6c6..e81fc9f 100644 --- a/test/funclib_test.py +++ b/test/funclib_test.py @@ -1,13 +1,12 @@ import pytest -import dice_calc.randvar as randvar -from dice_calc.randvar import RV, Seq, roll, anydice_casting +from dice_calc import anydice_casting, roll, RV, Seq, settings_reset @pytest.fixture(autouse=True) -def settings_reset(): - randvar.settings_reset() +def settings_reset_fixture(): + settings_reset() from dice_calc.funclib import absolute diff --git a/test/parse_exec_test.py b/test/parse_exec_test.py index 82f21ba..6966aca 100644 --- a/test/parse_exec_test.py +++ b/test/parse_exec_test.py @@ -1,6 +1,6 @@ import pytest -from dice_calc.randvar import RV, settings_set +from dice_calc import RV, settings_set from dice_calc.parser import parse_and_exec import logging diff --git a/test/randvar_seq_test.py b/test/randvar_seq_test.py index dc6b851..d146775 100644 --- a/test/randvar_seq_test.py +++ b/test/randvar_seq_test.py @@ -1,13 +1,12 @@ from typing import Iterable import pytest -from dice_calc import randvar -from dice_calc.randvar import Seq, RV, roll +from dice_calc import RV, Seq, roll, settings_reset @pytest.fixture(autouse=True) -def settings_reset(): - randvar.settings_reset() +def settings_reset_fixture(): + settings_reset() def test_at_num(): diff --git a/test/randvar_test.py b/test/randvar_test.py index d810861..c8e79d0 100644 --- a/test/randvar_test.py +++ b/test/randvar_test.py @@ -1,13 +1,12 @@ import pytest import math -from dice_calc import randvar -from dice_calc.randvar import RV, BlankRV, Seq, roll, output +from dice_calc import roll, output, RV, Seq, BlankRV, settings_reset, settings_set @pytest.fixture(autouse=True) -def settings_reset(): - randvar.settings_reset() +def settings_reset_fixture(): + settings_reset() @pytest.mark.parametrize("vals,probs", [ @@ -50,10 +49,10 @@ def test_RV_init_empty(): assert isinstance(f, BlankRV) def test_probability_zero_RV(): - randvar.settings_set('RV_IGNORE_ZERO_PROBS', 'true') + settings_set('RV_IGNORE_ZERO_PROBS', 'true') a = RV((1, 2, 777), (1, 1, 0)) assert (a.vals, a.probs) == ((1, 2), (1, 1)) - randvar.settings_set('RV_IGNORE_ZERO_PROBS', 'false') + settings_set('RV_IGNORE_ZERO_PROBS', 'false') a = RV((1, 2, 777), (1, 1, 0)) assert (a.vals, a.probs) == ((1, 2, 777), (1, 1, 0)) @@ -83,7 +82,7 @@ def test_RV_equality(v, p, gv, gp): ('false', (3, 1, 3, 1, 5), (2, 6, 2, 4, 0), (1, 3, 5), (5, 2, 0)), ]) def test_RV_equality_SET_ZERO_PROB(setting, v, p, gv, gp): - randvar.settings_set('RV_IGNORE_ZERO_PROBS', setting) + settings_set('RV_IGNORE_ZERO_PROBS', setting) a = RV(v, p) assert (a.vals, a.probs) == (gv, gp) assert RV.dices_are_equal(a, RV(gv, gp)) @@ -382,12 +381,12 @@ def test_FAIL_die_matmul(rhs): def test_truncate(): - randvar.settings_set('RV_TRUNC', 'false') + settings_set('RV_TRUNC', 'false') a = roll(6) / roll(6) assert isinstance(a, RV) b = RV(a.vals, a.probs, truncate=False) c = RV(a.vals, a.probs, truncate=True) - randvar.settings_set('RV_TRUNC', 'true') + settings_set('RV_TRUNC', 'true') d = roll(6) / roll(6) assert RV.dices_are_equal(a, b) assert RV.dices_are_equal(c, d) diff --git a/test/test_fetch.py b/test/test_fetch.py index 62e61b8..4f99f7a 100644 --- a/test/test_fetch.py +++ b/test/test_fetch.py @@ -6,7 +6,7 @@ import copy import dice_calc.randvar -from dice_calc.randvar import RV, Seq, settings_reset, BlankRV +from dice_calc import RV, Seq, settings_reset, BlankRV from dice_calc.parser import parse_and_exec @@ -114,7 +114,7 @@ def check(inp: Union[RV, Seq, int], expected, i): @pytest.fixture(autouse=True) def fixture_settings_reset(): settings_reset() - dice_calc.randvar._MEMOIZED_ROLLS = {} + dice_calc.roller._MEMOIZED_ROLLS = {} @pytest.mark.parametrize("i", range(len(code_resp_pairs)))