116 changes: 116 additions & 0 deletions example/run_score_constrained_portfolios_sim.py
@@ -0,0 +1,116 @@
# Third party imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Add the project root directory to Python path
import os
import sys
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
src_path = os.path.join(project_root, 'src')
sys.path.append(project_root)
sys.path.append(src_path)

# Local application imports
from backtest import (
Backtest,
BacktestService,
append_custom
)
from mean_estimation import MeanEstimator
from optimization import (
PercentilePortfolios,
ScoreConstrainedPortfolios,
)
from builders import (
SelectionItemBuilder,
OptimizationItemBuilder,
bibfn_selection_data,
bibfn_return_series,
bibfn_bm_series,
bibfn_budget_constraint,
bibfn_box_constraints,
)
from data_loader import (
load_data_msci,
)
from helper_functions import output_to_strategies

path_to_data = '/home/quantagonia/PorQua/data/'

# Prepare data
data = load_data_msci(path = path_to_data, n = 24)

# Define rebalancing dates
n_days = 21 * 3
start_date = '2010-01-01'
dates = data['return_series'].index
rebdates = dates[dates > start_date][::n_days].strftime('%Y-%m-%d').tolist()

selection_item_builders = {
'data': SelectionItemBuilder(bibfn = bibfn_selection_data),
# 'ltr': SelectionItemBuilder(bibfn = bibfn_selection_ltr),
# 'volume': SelectionItemBuilder(bibfn = bibfn_selection_volume, sector_stdz = True),
}

optimization_item_builders = {
'return_series': OptimizationItemBuilder(bibfn = bibfn_return_series, width = 365 * 3),
'bm_series': OptimizationItemBuilder(bibfn = bibfn_bm_series, width = 365 * 3),
'budget_constraint': OptimizationItemBuilder(bibfn = bibfn_budget_constraint, budget = 1),
'box_constraints': OptimizationItemBuilder(bibfn = bibfn_box_constraints, upper = 0.1),
}

bs = BacktestService(
data = data,
selection_item_builders = selection_item_builders,
optimization_item_builders = optimization_item_builders,
rebdates = rebdates,
append_fun = append_custom,
append_fun_args = ['w_dict']
)

# Define the mean estimator
mean_estimator = MeanEstimator(
method = 'geometric',
scalefactor = 1,
n_mom = 252,
n_rev = 21,
)
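# (Going by the parameter names, n_mom and n_rev are presumably look-back
# windows in trading days: one year of momentum, one month of reversal.)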

# Define portfolio optimization object and run backtest
#bs.optimization = PercentilePortfolios(
#    n_percentiles = 5,
# estimator = mean_estimator,
#)
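# If True, score levels are placed at equal-volume percentiles of sampled
# feasible portfolios; the solver falls back to equally spaced levels when
# sampling is not possible (see ScoreConstrainedPortfolios.solve).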
equivolume_distanced_mode = False
bs.optimization = ScoreConstrainedPortfolios(
estimator = mean_estimator,
n_score_levels = 5,
equivolume_distanced = equivolume_distanced_mode,
)
bt_qp = Backtest()
bt_qp.run(bs = bs)


# Simulate the strategies
strat_dict = output_to_strategies(bt_qp.output)

# Simulate
fixed_costs = 0
variable_costs = 0
sim_dict = {
key: value.simulate(return_series = bs.data['return_series'], fc = fixed_costs, vc = variable_costs)
for key, value in strat_dict.items()
}

sim_qp = pd.concat(sim_dict, axis = 1).dropna()
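
# Optional summary (illustrative; assumes daily simple returns in sim_qp):
# ann_ret = (1 + sim_qp).prod() ** (252 / len(sim_qp)) - 1
# ann_vol = sim_qp.std() * np.sqrt(252)
# print(pd.DataFrame({'ann_return': ann_ret, 'ann_vol': ann_vol}))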

# Plot
#np.log((1 + sim_qp)).cumsum().plot(figsize = (10, 6))
plt.figure(figsize=(10, 6))
cum_log = np.log(1 + sim_qp).cumsum()  # cumulative log returns
for col in cum_log.columns:
    plt.plot(cum_log[col], label=col)
plt.grid(True)
plt.legend()
plt.show()
74 changes: 74 additions & 0 deletions src/helper_functions.py
@@ -20,6 +20,7 @@
import numpy as np
import pickle
import matplotlib.pyplot as plt

from portfolio import Portfolio, Strategy

@@ -127,3 +128,76 @@ def show_result(predictions, y_test, y_actual, method = None):
plt.legend(["True values", "Prediction"])
plt.title(method)
plt.show()




#------------------- Sampling long-only portfolios helpers -------------------

def sample_constrained_simplex(
    n: int,
    nsamples: int,
    lb: np.ndarray | list | None = None,
    ub: np.ndarray | list | None = None
) -> tuple[np.ndarray, bool]:
    """
    Sample points from the canonical simplex subject to coordinate-wise lower and upper bounds.

    Args:
        n: Dimension of the simplex
        nsamples: Number of desired samples to keep
        lb: Vector of lower bounds for each coordinate (default: zeros)
        ub: Vector of upper bounds for each coordinate (default: ones)

    Returns:
        tuple[np.ndarray, bool]: An array of shape (m, n) of valid samples
        (m == nsamples on success) and a flag indicating whether nsamples
        valid samples were found within the iteration limit

    Raises:
        ValueError: If bounds dimensions don't match n or are inconsistent
    """
# Convert inputs to numpy arrays and validate
if ub is None:
ub = np.ones(n)
else:
ub = np.asarray(ub)
if lb is None:
lb = np.zeros(n)
else:
lb = np.asarray(lb)

# Input validation
if len(ub) != n or len(lb) != n:
raise ValueError(f"Bounds must have length {n}")
if not np.all(ub > lb):
raise ValueError("Upper bounds must be greater than lower bounds")
if not np.all(lb >= 0):
raise ValueError("Lower bounds must be non-negative")
    if np.sum(lb) > 1 or np.sum(ub) < 1:
        raise ValueError("Bounds are incompatible with simplex constraints")

    valid_samples = []
    batch_size = max(1000, nsamples * 10)  # scale the batch with the target count
    max_iter = 10000
    n_iter = 0
    success = False
    while len(valid_samples) < nsamples and n_iter < max_iter:
# Generate exponential random variables
Y = np.random.exponential(scale=1.0, size=(batch_size, n))

# Normalize by row sums to get points on simplex
row_sums = Y.sum(axis=1, keepdims=True)
E = Y / row_sums
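        # Normalized iid exponentials are Dirichlet(1, ..., 1), i.e. uniformly
        # distributed on the simplex.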

# Filter points where coordinates are within bounds
valid_mask = np.all(E <= ub, axis=1) & np.all(E >= lb, axis=1)
valid_points = E[valid_mask]

valid_samples.extend(valid_points)

# Trim to exact size if we have enough samples
if len(valid_samples) >= nsamples:
valid_samples = valid_samples[:nsamples]
success = True
break
        n_iter += 1
return np.array(valid_samples), success
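

# Usage sketch (illustrative; bounds and sizes are arbitrary):
#   samples, ok = sample_constrained_simplex(n=4, nsamples=1000, ub=np.full(4, 0.4))
#   if ok:
#       assert np.allclose(samples.sum(axis=1), 1.0)
#       assert np.all(samples <= 0.4)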
182 changes: 181 additions & 1 deletion src/optimization.py
@@ -20,7 +20,7 @@

import numpy as np
import pandas as pd

from scipy.optimize import linprog

from helper_functions import to_numpy
from covariance import Covariance
@@ -415,3 +415,183 @@ def solve(self) -> bool:
self.results = {'weights': weights.to_dict(),
'w_dict': w_dict}
return True



class ScoreConstrainedPortfolios(Optimization):
    """
    This class computes score-constrained portfolios.

    It sets n score levels and computes the n corresponding percentile
    portfolios. For each percentile portfolio it then finds the feasible
    portfolio that attains the given score level and is closest to the
    percentile portfolio in the L1 norm.
    """

    def __init__(self,
                 field: Optional[str] = None,
                 estimator: Optional[MeanEstimator] = None,
                 n_score_levels: int = 5,
                 equivolume_distanced: bool = False,
                 **kwargs):
super().__init__(**kwargs)
self.params = {'solver_name': 'ScoreConstrained',
'n_score_levels': n_score_levels,
'equivolume_distanced': equivolume_distanced,
'field': field}
self.percentile_portfolio_solver = PercentilePortfolios(
n_percentiles = n_score_levels,
estimator = estimator,
)
self.sampling_done = False
self.samples = None
self.num_samples_factor = 10000

def set_objective(self, optimization_data: OptimizationData) -> None:
self.percentile_portfolio_solver.set_objective(optimization_data)

    def solve(self) -> bool:
if not self.sampling_done and self.params.get('equivolume_distanced'):
success = self.compute_samples()
if not success:
self.params['equivolume_distanced'] = False
print("Equi-volume distanced score levels failed. Using linear programming instead.")
else:
self.sampling_done = True

self.percentile_portfolio_solver.solve()
scores = self.percentile_portfolio_solver.objective['scores']
N = self.params['n_score_levels']

constraints = self.constraints
GhAb = constraints.to_GhAb()

if not self.params.get('equivolume_distanced'):
min_score, max_score = self.get_max_min_score(scores, GhAb)

            # set N+2 equally spaced values from the min to the max score,
            # then keep only the N interior values
            score_vec = np.linspace(min_score, max_score, N + 2)
            score_vec = score_vec[1:-1]
else:
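            # Scores of uniformly sampled feasible portfolios; their empirical
            # percentiles split the feasible set into score bands of roughly
            # equal volume.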
score_vec = self.samples @ scores

# Sort the score vector
sorted_scores = np.sort(score_vec)

# Calculate indices for N evenly spaced percentiles
n_samples = len(sorted_scores)
indices = [int(i * n_samples/(N+1)) for i in range(1, N+1)]

# Get the score values at those indices
score_vec = sorted_scores[indices]

n = len(scores)

# we represent the portfolio domain with the form P = {x | Gx <= h, Ax = b}
lb = constraints.box['lower'] if constraints.box['box_type'] != 'NA' else None
ub = constraints.box['upper'] if constraints.box['box_type'] != 'NA' else None

A = GhAb['A']
b = GhAb['b']

G = None
h = None
        # Stack the box-constraint rows (x <= ub, -x <= -lb) below any existing
        # inequality rows.
        if GhAb['G'] is None:
            G = np.vstack([np.eye(n), -np.eye(n)])
        else:
            G = np.vstack([GhAb['G'], np.eye(n), -np.eye(n)])
        if ub is None or lb is None:
            raise ValueError("Portfolio domain is unbounded.")
        if GhAb['h'] is None:
            h = np.append(np.asarray(ub), -np.asarray(lb))
        else:
            h = np.concatenate([GhAb['h'], np.asarray(ub), -np.asarray(lb)])
        # Build the inequality block with the slack-variable columns once, before
        # the loop, since it is identical across iterations.
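        # Rows of G_iter, with z = x - w the shifted weights and y the slacks:
        #   [ G,  0]  : G z <= h - G w   (original inequalities)
        #   [ I, -I]  : z - y <= 0
        #   [-I, -I]  : -z - y <= 0
        #   [ 0, -I]  : -y <= 0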
G_iter = np.block([
[G, np.zeros((G.shape[0], n))],
[np.eye(n), -np.eye(n)],
[-np.eye(n), -np.eye(n)],
[np.zeros((n, n)), -np.eye(n)]
])
        # All 2n variables are free; y >= 0 is enforced by the last block of G_iter.
        bounds = list(zip(np.full(2*n, -np.inf), np.full(2*n, np.inf)))
        # Objective: minimize sum_i y_i, the L1 distance from w.
        c = np.concatenate([np.zeros(n), np.ones(n)])

w_dict = {}
counter = 0
        for score in score_vec:
            # Equally weighted percentile portfolio used as the L1 anchor.
            w = scores * 0
            w_perc = self.percentile_portfolio_solver.results['w_dict'][counter + 1]
            w[list(w_perc.keys())] = 1 / len(w_perc)

# add the score constraint
A_iter = np.vstack((A, scores.to_numpy()))
b_iter = np.append(b, score)
# shift the origin to the percentile portfolio w
b_iter = b_iter - A_iter @ w
h_iter = h - G @ w
# add the coefficients of the slack variables
A_iter = np.hstack((A_iter, np.zeros((A_iter.shape[0], n))))
h_iter = np.append(h_iter, np.zeros(3*n))
# solve the linear program that minimizes the L1 distance from w
result = linprog(c=c, A_ub=G_iter, b_ub=h_iter, A_eq=A_iter, b_eq=b_iter, bounds = bounds)

            if not result.success:
                raise ValueError("L1 minimization failed.")  # stop execution

weights = scores * 0
# take the first n coordinates of the solution and ignore the slack variables
weights[scores.keys()] = result.x[:n]
w_dict[counter+1] = weights + w # shift back by w
counter += 1

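        # Headline weights: spread between the lowest (w_dict[1]) and highest
        # (w_dict[N]) score-level portfolios.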
weights = w_dict[1] - w_dict[N]
self.results = {
'weights': weights.to_dict(),
'w_dict': w_dict
}
return True

def get_max_min_score(self, scores, GhAb):
n = len(scores)
lb = self.constraints.box['lower'] if self.constraints.box['box_type'] != 'NA' else [None] * n
ub = self.constraints.box['upper'] if self.constraints.box['box_type'] != 'NA' else [None] * n
bounds = list(zip(lb, ub))

        port_min_score = linprog(c = scores, A_ub = GhAb['G'], b_ub = GhAb['h'], A_eq = GhAb['A'], b_eq = GhAb['b'], bounds = bounds)
        port_max_score = linprog(c = -scores, A_ub = GhAb['G'], b_ub = GhAb['h'], A_eq = GhAb['A'], b_eq = GhAb['b'], bounds = bounds)
        if not (port_min_score.success and port_max_score.success):
            raise ValueError("Could not compute the attainable score range.")
        return port_min_score.fun, -port_max_score.fun

def compute_samples(self):
        GhAb = self.constraints.to_GhAb()
        # The sampler only supports domains that are a box-constrained canonical
        # simplex: {x | sum(x) = 1, lb <= x <= ub}.
        if GhAb['A'] is None:
            return False
        n = GhAb['A'].shape[1]
        n_score_levels = self.params['n_score_levels']
        lb = self.constraints.box['lower'] if self.constraints.box['box_type'] != 'NA' else None
        ub = self.constraints.box['upper'] if self.constraints.box['box_type'] != 'NA' else None
        if not (GhAb['A'].shape[0] == 1 and np.all(GhAb['A'][0] == 1)):
            return False
if not (GhAb['b'] is not None and np.asarray(GhAb['b']).size == 1 and np.asarray(GhAb['b']).item() == 1):
return False
if GhAb['G'] is not None or GhAb['h'] is not None:
return False
if not (isinstance(lb, pd.Series) and len(lb) == n and np.all(lb >= 0)):
return False
if not (isinstance(ub, pd.Series) and len(ub) == n and np.all(ub <= 1) and np.all(ub > lb)):
return False

from helper_functions import sample_constrained_simplex

samples, success = sample_constrained_simplex(n=n, nsamples=(n_score_levels+1)*self.num_samples_factor, lb=lb, ub=ub)
if not success:
return False
self.samples = samples
return True