From 7351c3543bbf5d981175f4e195be21a571b62e81 Mon Sep 17 00:00:00 2001
From: Apostolos Chalkis
Date: Thu, 5 Dec 2024 15:33:14 +0200
Subject: [PATCH] implement score-constrained portfolio features

---
 .../run_score_constrained_portfolios_sim.py | 116 +++++++++++
 src/helper_functions.py                     |  74 +++++++
 src/optimization.py                         | 182 +++++++++++++++++-
 3 files changed, 371 insertions(+), 1 deletion(-)
 create mode 100644 example/run_score_constrained_portfolios_sim.py

diff --git a/example/run_score_constrained_portfolios_sim.py b/example/run_score_constrained_portfolios_sim.py
new file mode 100644
index 0000000..cdbd14c
--- /dev/null
+++ b/example/run_score_constrained_portfolios_sim.py
@@ -0,0 +1,116 @@
+# Third party imports
+import numpy as np
+import pandas as pd
+import matplotlib.pyplot as plt
+
+# Add the project root directory to the Python path
+import os
+import sys
+project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+src_path = os.path.join(project_root, 'src')
+sys.path.append(project_root)
+sys.path.append(src_path)
+
+# Local application imports
+from backtest import (
+    Backtest,
+    BacktestService,
+    append_custom
+)
+from mean_estimation import MeanEstimator
+from optimization import (
+    PercentilePortfolios,
+    ScoreConstrainedPortfolios,
+)
+from builders import (
+    SelectionItemBuilder,
+    OptimizationItemBuilder,
+    bibfn_selection_data,
+    bibfn_return_series,
+    bibfn_bm_series,
+    bibfn_budget_constraint,
+    bibfn_box_constraints,
+)
+from data_loader import (
+    load_data_msci,
+)
+from helper_functions import output_to_strategies
+
+path_to_data = '/home/quantagonia/PorQua/data/'
+
+# Prepare data
+data = load_data_msci(path = path_to_data, n = 24)
+
+# Define rebalancing dates
+n_days = 21 * 3
+start_date = '2010-01-01'
+dates = data['return_series'].index
+rebdates = dates[dates > start_date][::n_days].strftime('%Y-%m-%d').tolist()
+
+selection_item_builders = {
+    'data': SelectionItemBuilder(bibfn = bibfn_selection_data),
+    # 'ltr': SelectionItemBuilder(bibfn = bibfn_selection_ltr),
+    # 'volume': SelectionItemBuilder(bibfn = bibfn_selection_volume, sector_stdz = True),
+}
+
+optimization_item_builders = {
+    'return_series': OptimizationItemBuilder(bibfn = bibfn_return_series, width = 365 * 3),
+    'bm_series': OptimizationItemBuilder(bibfn = bibfn_bm_series, width = 365 * 3),
+    'budget_constraint': OptimizationItemBuilder(bibfn = bibfn_budget_constraint, budget = 1),
+    'box_constraints': OptimizationItemBuilder(bibfn = bibfn_box_constraints, upper = 0.1),
+}
+
+bs = BacktestService(
+    data = data,
+    selection_item_builders = selection_item_builders,
+    optimization_item_builders = optimization_item_builders,
+    rebdates = rebdates,
+    append_fun = append_custom,
+    append_fun_args = ['w_dict']
+)
+
+# Define the mean estimator
+mean_estimator = MeanEstimator(
+    method = 'geometric',
+    scalefactor = 1,
+    n_mom = 252,
+    n_rev = 21,
+)
+
+# Define the portfolio optimization object and run the backtest
+# bs.optimization = PercentilePortfolios(
+#     n_percentiles = 5,
+#     estimator = mean_estimator,
+# )
+equivolume_distanced_mode = False
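+# Note on the two modes (see ScoreConstrainedPortfolios in src/optimization.py):
+# with equivolume_distanced = False, the N score levels are spaced equally
+# between the minimum and maximum attainable portfolio scores (found via two
+# linear programs); with True, the levels are empirical quantiles of the scores
+# of points sampled uniformly from the constrained simplex, so consecutive
+# levels enclose slabs of approximately equal volume of the feasible region.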
+bs.optimization = ScoreConstrainedPortfolios(
+    estimator = mean_estimator,
+    n_score_levels = 5,
+    equivolume_distanced = equivolume_distanced_mode,
+)
+bt_qp = Backtest()
+bt_qp.run(bs = bs)
+
+
+# Convert the backtest output into strategies
+strat_dict = output_to_strategies(bt_qp.output)
+
+# Simulate
+fixed_costs = 0
+variable_costs = 0
+sim_dict = {
+    key: value.simulate(return_series = bs.data['return_series'], fc = fixed_costs, vc = variable_costs)
+    for key, value in strat_dict.items()
+}
+
+sim_qp = pd.concat(sim_dict, axis = 1).dropna()
+
+# Plot cumulative log returns
+# np.log(1 + sim_qp).cumsum().plot(figsize = (10, 6))
+plt.figure(figsize = (10, 6))
+for col in sim_qp.columns:
+    plt.plot(np.log(1 + sim_qp)[col].cumsum(), label = col)
+plt.grid(True)
+plt.legend()
+plt.show()
diff --git a/src/helper_functions.py b/src/helper_functions.py
index 868deab..9ad1fa8 100644
--- a/src/helper_functions.py
+++ b/src/helper_functions.py
@@ -127,3 +127,76 @@ def show_result(predictions, y_test, y_actual, method = None):
     plt.legend(["True values", "Prediction"])
     plt.title(method)
     plt.show()
+
+
+# ------------------- Sampling long-only portfolios helpers -------------------
+
+def sample_constrained_simplex(
+    n: int,
+    nsamples: int,
+    lb: np.ndarray | list = None,
+    ub: np.ndarray | list = None
+) -> tuple[np.ndarray, bool]:
+    """
+    Sample points uniformly from the canonical simplex subject to
+    coordinate-wise lower and upper bounds, via rejection sampling.
+
+    Args:
+        n: Dimension of the simplex
+        nsamples: Number of desired samples to keep
+        lb: Vector of lower bounds for each coordinate (default: zeros)
+        ub: Vector of upper bounds for each coordinate (default: ones)
+
+    Returns:
+        tuple: (samples, success), where samples is an array of shape
+        (nsamples, n) if success is True, and success indicates whether
+        nsamples valid samples were found within the iteration limit
+
+    Raises:
+        ValueError: If bounds dimensions don't match n or are inconsistent
+    """
+    # Convert inputs to numpy arrays and validate
+    if ub is None:
+        ub = np.ones(n)
+    else:
+        ub = np.asarray(ub)
+    if lb is None:
+        lb = np.zeros(n)
+    else:
+        lb = np.asarray(lb)
+
+    # Input validation
+    if len(ub) != n or len(lb) != n:
+        raise ValueError(f"Bounds must have length {n}")
+    if not np.all(ub > lb):
+        raise ValueError("Upper bounds must be greater than lower bounds")
+    if not np.all(lb >= 0):
+        raise ValueError("Lower bounds must be non-negative")
+    if np.sum(lb) > 1 or np.sum(ub) < 1:
+        raise ValueError("Bounds are incompatible with the simplex constraint")
+
+    valid_samples = []
+    batch_size = max(1000, nsamples * 10)  # adjust the batch size to the target
+    max_iter = 10000
+    n_iter = 0
+    success = False
+    while len(valid_samples) < nsamples and n_iter < max_iter:
+        # Generate exponential random variables
+        Y = np.random.exponential(scale=1.0, size=(batch_size, n))
+
+        # Normalize by row sums to get uniform points on the simplex
+        row_sums = Y.sum(axis=1, keepdims=True)
+        E = Y / row_sums
+
+        # Keep only the points whose coordinates lie within the bounds
+        valid_mask = np.all(E <= ub, axis=1) & np.all(E >= lb, axis=1)
+        valid_points = E[valid_mask]
+
+        valid_samples.extend(valid_points)
+
+        # Trim to the exact size once we have enough samples
+        if len(valid_samples) >= nsamples:
+            valid_samples = valid_samples[:nsamples]
+            success = True
+            break
+        n_iter += 1
+    return np.array(valid_samples), success
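+
+
+# Minimal usage sketch of the sampler above (illustrative numbers only):
+#
+#     samples, ok = sample_constrained_simplex(
+#         n = 4, nsamples = 100, lb = np.zeros(4), ub = np.full(4, 0.4)
+#     )
+#     # if ok is True, samples has shape (100, 4); each row sums to 1 and
+#     # satisfies 0 <= samples[i, j] <= 0.4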
diff --git a/src/optimization.py b/src/optimization.py
index e87ba19..3b37024 100644
--- a/src/optimization.py
+++ b/src/optimization.py
@@ -20,7 +20,7 @@
 import numpy as np
 import pandas as pd
-
+from scipy.optimize import linprog
-from helper_functions import to_numpy
+from helper_functions import to_numpy, sample_constrained_simplex
 from covariance import Covariance
@@ -415,3 +415,183 @@ def solve(self) -> bool:
         self.results = {'weights': weights.to_dict(), 'w_dict': w_dict}
 
         return True
+
+
+class ScoreConstrainedPortfolios(Optimization):
+    """
+    This class computes score-constrained portfolios.
+
+    It defines n score levels and computes the n corresponding percentile
+    portfolios. For each percentile portfolio it then computes the
+    score-constrained portfolio that is closest to it in the L1 norm.
+    """
+
+    def __init__(self,
+                 field: Optional[str] = None,
+                 estimator: Optional[MeanEstimator] = None,
+                 n_score_levels: int = 5,
+                 equivolume_distanced: bool = False,
+                 **kwargs):
+        super().__init__(**kwargs)
+        self.params = {'solver_name': 'ScoreConstrained',
+                       'n_score_levels': n_score_levels,
+                       'equivolume_distanced': equivolume_distanced,
+                       'field': field}
+        self.percentile_portfolio_solver = PercentilePortfolios(
+            n_percentiles = n_score_levels,
+            estimator = estimator,
+        )
+        self.sampling_done = False
+        self.samples = None
+        self.num_samples_factor = 10000
+
+    def set_objective(self, optimization_data: OptimizationData) -> None:
+        self.percentile_portfolio_solver.set_objective(optimization_data)
+
+    def solve(self) -> bool:
+        if not self.sampling_done and self.params.get('equivolume_distanced'):
+            success = self.compute_samples()
+            if not success:
+                self.params['equivolume_distanced'] = False
+                print("Equi-volume distanced score levels failed. "
+                      "Falling back to equi-distanced score levels computed via linear programming.")
+            else:
+                self.sampling_done = True
+
+        self.percentile_portfolio_solver.solve()
+        scores = self.percentile_portfolio_solver.objective['scores']
+        N = self.params['n_score_levels']
+
+        constraints = self.constraints
+        GhAb = constraints.to_GhAb()
+
+        if not self.params.get('equivolume_distanced'):
+            min_score, max_score = self.get_min_max_score(scores, GhAb)
+
+            # create N + 2 equally spaced values between the min and max score
+            score_vec = np.linspace(min_score, max_score, N + 2)
+            # keep only the N interior values
+            score_vec = score_vec[1:-1]
+        else:
+            score_vec = self.samples @ scores
+
+            # sort the sampled scores
+            sorted_scores = np.sort(score_vec)
+
+            # indices of N evenly spaced empirical quantiles
+            n_samples = len(sorted_scores)
+            indices = [int(i * n_samples / (N + 1)) for i in range(1, N + 1)]
+
+            # the score values at those indices become the score levels
+            score_vec = sorted_scores[indices]
+
+        n = len(scores)
+
+        # represent the portfolio domain in the form P = {x | Gx <= h, Ax = b}
+        lb = constraints.box['lower'] if constraints.box['box_type'] != 'NA' else None
+        ub = constraints.box['upper'] if constraints.box['box_type'] != 'NA' else None
+
+        A = GhAb['A']
+        b = GhAb['b']
+
+        if GhAb['G'] is None:
+            G = np.vstack([np.eye(n), -np.eye(n)])
+        else:
+            G = np.vstack([GhAb['G'], np.eye(n), -np.eye(n)])
+        if GhAb['h'] is None:
+            if ub is None or lb is None:
+                raise ValueError("Portfolio domain is unbounded.")
+            h = np.concatenate([ub, -np.asarray(lb)])
+        else:
+            h = np.concatenate([GhAb['h'], ub, -np.asarray(lb)])
+        # add the slack-variable coefficients to G before the loop,
+        # because they are identical in every iteration
+        G_iter = np.block([
+            [G, np.zeros((G.shape[0], n))],
+            [np.eye(n), -np.eye(n)],
+            [-np.eye(n), -np.eye(n)],
+            [np.zeros((n, n)), -np.eye(n)]
+        ])
+        bounds = list(zip(np.full(2 * n, -np.inf), np.full(2 * n, np.inf)))
+        # the objective is the sum of the slack variables, sum_i y_i
+        c = np.concatenate([np.zeros(n), np.ones(n)])
+
+        w_dict = {}
+        counter = 0
+        for score in score_vec:
+            w = scores * 0
+            w_perc = self.percentile_portfolio_solver.results['w_dict'][counter + 1]
+            w[list(w_perc.keys())] = 1 / len(w_perc)
+
+            # add the score constraint
+            A_iter = np.vstack((A, scores.to_numpy()))
+            b_iter = np.append(b, score)
+            # shift the origin to the percentile portfolio w
+            b_iter = b_iter - A_iter @ w
+            h_iter = h - G @ w
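+            # Reformulation note: minimizing ||x - w||_1 over {Gx <= h, Ax = b}
+            # becomes a linear program by substituting x' = x - w (the origin
+            # shift above) and introducing slacks y with x' <= y, -x' <= y and
+            # y >= 0; the objective is then sum_i y_i (the vector c above). The
+            # three extra row blocks of G_iter encode exactly these slack
+            # inequalities.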
+            # add the coefficients of the slack variables
+            A_iter = np.hstack((A_iter, np.zeros((A_iter.shape[0], n))))
+            h_iter = np.append(h_iter, np.zeros(3 * n))
+            # solve the linear program that minimizes the L1 distance from w
+            result = linprog(c = c, A_ub = G_iter, b_ub = h_iter, A_eq = A_iter, b_eq = b_iter, bounds = bounds)
+
+            if not result.success:
+                raise ValueError("L1 minimization failed.")  # stop execution
+
+            weights = scores * 0
+            # keep the first n coordinates of the solution; the rest are slack variables
+            weights[scores.keys()] = result.x[:n]
+            w_dict[counter + 1] = weights + w  # shift back by w
+            counter += 1
+
+        weights = w_dict[1] - w_dict[N]
+        self.results = {
+            'weights': weights.to_dict(),
+            'w_dict': w_dict
+        }
+        return True
+
+    def get_min_max_score(self, scores, GhAb):
+        n = len(scores)
+        lb = self.constraints.box['lower'] if self.constraints.box['box_type'] != 'NA' else [None] * n
+        ub = self.constraints.box['upper'] if self.constraints.box['box_type'] != 'NA' else [None] * n
+        bounds = list(zip(lb, ub))
+
+        port_min_score = linprog(c = scores, A_ub = GhAb['G'], b_ub = GhAb['h'], A_eq = GhAb['A'], b_eq = GhAb['b'], bounds = bounds)
+        port_max_score = linprog(c = -scores, A_ub = GhAb['G'], b_ub = GhAb['h'], A_eq = GhAb['A'], b_eq = GhAb['b'], bounds = bounds)
+        return port_min_score.fun, -port_max_score.fun
+
+    def compute_samples(self):
+        GhAb = self.constraints.to_GhAb()
+        n_score_levels = self.params['n_score_levels']
+        lb = self.constraints.box['lower'] if self.constraints.box['box_type'] != 'NA' else None
+        ub = self.constraints.box['upper'] if self.constraints.box['box_type'] != 'NA' else None
+        # check whether the portfolio domain is a subset of the canonical simplex
+        if GhAb['A'] is None:
+            return False
+        n = GhAb['A'].shape[1]
+        if not (GhAb['A'].shape[0] == 1 and np.all(GhAb['A'][0] == 1)):
+            return False
+        if not (GhAb['b'] is not None and np.asarray(GhAb['b']).size == 1 and np.asarray(GhAb['b']).item() == 1):
+            return False
+        if GhAb['G'] is not None or GhAb['h'] is not None:
+            return False
+        if not (isinstance(lb, pd.Series) and len(lb) == n and np.all(lb >= 0)):
+            return False
+        if not (isinstance(ub, pd.Series) and len(ub) == n and np.all(ub <= 1) and np.all(ub > lb)):
+            return False
+
+        samples, success = sample_constrained_simplex(n = n, nsamples = (n_score_levels + 1) * self.num_samples_factor, lb = lb, ub = ub)
+        if not success:
+            return False
+        self.samples = samples
+        return True
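+
+
+# Usage: see example/run_score_constrained_portfolios_sim.py, e.g.
+#
+#     bs.optimization = ScoreConstrainedPortfolios(
+#         estimator = mean_estimator,
+#         n_score_levels = 5,
+#         equivolume_distanced = False,
+#     )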