Source code for pyvallocation.optimization

r"""
Portfolio-Optimisation Toolbox
==============================

This package groups three **single-period convex allocation models** and
exposes an identical, numerically stable API for each of them.  All back-end
calls rely exclusively on **CVXOPT 1.3+**; hence global optimality is
guaranteed provided the solver returns a status flag *"optimal"*.

.. list-table::
   :header-rows: 1
   :widths: 18 40 18 18

   * - Acronym
     - Risk measure / Objective functional :math:`f(w)`
     - Cone class
     - CVXOPT routine
   * - ``MV``
     - :math:`\tfrac12\,w^{\top}\Sigma w`
     - PSD
     - - :py:func:`cvxopt.solvers.qp`
   * - ``CVaR``\ :sub:`\alpha`
     - :math:`\operatorname{CVaR}_{\alpha}\bigl(-R\,w\bigr)`
     - LP
     - :py:func:`cvxopt.solvers.conelp`
   * - ``RB``
     - :math:`\displaystyle
       \max_{\mu\in\mathcal U}\bigl[-w^{\top}\mu + \lambda\|S^{1/2}w\|_2\bigr]`
     - SOC
     - :py:func:`cvxopt.solvers.conelp`

Global symbols
--------------
* :math:`N`  -- number of risky assets.
* :math:`T`  -- number of Monte-Carlo or historical scenarios.
* :math:`R\in\mathbb R^{T\times N}` -- scenario matrix of *excess* returns.
* :math:`p\in\Delta^{T}` -- probability vector, :math:`\mathbf1^{\top}p=1`.
* :math:`\mu:=R^{\top}p`,   :math:`\Sigma:=(R-\mu^{\top})^{\top}\!\mathrm{diag}(p)(R-\mu^{\top})`.
* :math:`w\in\mathbb R^{N}` -- portfolio after re-balancing,  
  :math:`\mathbf1^{\top}w=1`.

Optional affine trading rules
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. math::
   G\,w\;\le\;h, \qquad A\,w\;=\;b,

encode leverage caps, tracking constraints, minimum/maximum holdings, *etc.*

Transaction-cost primitives
---------------------------
* *Quadratic impact* : :math:`\Lambda=\operatorname{diag}(\lambda)`   (QP only)

  .. math:: (w-w_0)^{\top}\Lambda\,(w-w_0).

* *Proportional turnover* : :math:`c^{+},c^{-}\ge0`   (LP/SOCP)

  .. math::
     \sum_{i=1}^{N}\bigl(c^{+}_iu^{+}_i+c^{-}_iu^{-}_i\bigr),
     \qquad
     w = w_0 + u^{+}-u^{-},\;u^{+},u^{-}\ge0.

Primary references
------------------
Markowitz (1952); Rockafellar & Uryasev (2000); Meucci (2005);
Lobo *et al.* (2007).

Credits 
------------------
MeanVariance and MeanCVaR classes are adapted from fortitudo-tech package (https://github.com/fortitudo-tech/fortitudo.tech)

"""

from __future__ import annotations

import logging
from abc import ABC
from dataclasses import dataclass
from typing import Optional, Sequence, Tuple, Final

import numpy as np
import numpy.typing as npt
from cvxopt import matrix, solvers

from .bayesian import _cholesky_pd


[docs] class InfeasibleOptimizationError(RuntimeError): """Raised when a portfolio optimisation problem has no feasible solution.""" pass
# ------------------------------------------------------------------ # # Solver settings & logging # ------------------------------------------------------------------ # solvers.options.update({"glpk": {"msg_lev": "GLP_MSG_OFF"}, "show_progress": False}) _LOGGER: Final = logging.getLogger(__name__) # ------------------------------------------------------------------ # # Helpers # ------------------------------------------------------------------ # def _check_shapes(**arrays: npt.NDArray[np.floating]) -> None: """Raise ``ValueError`` if any supplied arrays have mismatched shapes. Args: **arrays: Named arrays to compare. Raises: ValueError: If shapes are not identical. """ shapes = {k: v.shape for k, v in arrays.items()} if len(set(shapes.values())) > 1: raise ValueError(f"Shape mismatch: {shapes}") def _quadratic_turnover( P: np.ndarray, q: np.ndarray, w0: npt.NDArray[np.floating], lambdas: npt.NDArray[np.floating], ) -> tuple[np.ndarray, np.ndarray]: r""" Inject the quadratic impact term :math:`(w-w_0)^{\top}\Lambda(w-w_0)` into the Hessian/gradient pair expected by *CVXOPT*. The factor two stems from the *1/2* convention adopted internally by :py:func:`cvxopt.solvers.qp`. """ _check_shapes(w0=w0, lambdas=lambdas) lambda_diag = np.diag(lambdas) return P + 2 * lambda_diag, q - 2 * lambda_diag @ w0 def _linear_turnover_blocks( N: int, T: int, w0: npt.NDArray[np.floating], costs: npt.NDArray[np.floating], ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: r""" Construct the *LP* blocks that realise proportional turnover costs. Returns ------- c_cost : np.ndarray Objective coefficients for :math:`u^{+},u^{-}` (concatenated ``[c,c]``). A_trade : np.ndarray Coefficient matrix that enforces the inventory identity :math:`w = w_0 + u^{+}-u^{-}`. b_trade : np.ndarray Right-hand side (simply ``w0``). """ _check_shapes(w0=w0, costs=costs) c_cost = np.hstack((costs, costs)) A_trade = np.concatenate( (np.eye(N), np.zeros((N, 1 + T)), -np.eye(N), np.eye(N)), axis=1 ) return c_cost, A_trade, w0 # ------------------------------------------------------------------ # # Result container # ------------------------------------------------------------------ #
[docs] @dataclass(frozen=True) class OptimizationResult: r""" Immutable result object returned by :class:`RobustOptimizer`. It stores the optimal **post-trade weights**, the associated point-estimate return and a model-specific **risk proxy** (\sigma* for mean-variance, CVaR* for mean-CVaR, or the robust radius *t**). """ weights: npt.NDArray[np.floating] nominal_return: float risk: float
[docs] @dataclass(frozen=True) class RelaxedRiskParityResult: r""" Result container for the relaxed risk parity SOCP. Attributes ---------- weights : Optimal long-only allocations :math:`x^{\\star}` (shape ``(n,)``). marginal_risk : Gradient vector :math:`\\zeta^{\\star} = \\Sigma x^{\\star}` (variance-scale, not Roncalli's volatility-normalised marginal risk). psi : Optimal average-risk proxy :math:`\\psi^{\\star}`. gamma : Optimal ARC floor :math:`\\gamma^{\\star}`; enforces the lower risk contribution bound :math:`x_i \\zeta_i \\ge \\gamma^2`. rho : Regulator slack variable :math:`\\rho^{\\star}` that upper bounds the diagonal risk penalty :math:`\\lambda\\,x^{\\top}\\Theta x`. objective : Optimal value of :math:`\\psi - \\gamma`, i.e. the gap between the average-risk ceiling and the ARC floor. target_return : Effective return constraint :math:`\\mu^{\\top}x \\ge R` imposed at optimality. May differ from the requested target when clipping was required. max_return : Feasible bound on the return target under the supplied constraints. ``None`` if no target was requested. """ weights: npt.NDArray[np.floating] marginal_risk: npt.NDArray[np.floating] psi: float gamma: float rho: float objective: float target_return: float | None max_return: float | None
# ------------------------------------------------------------------ # # Base class # ------------------------------------------------------------------ # class _BaseOptimization(ABC): r""" Abstract helper that stores **data & affine constraints** common to all models. Implementations must 1. call :py:func:`_init_constraints` early in ``__init__``; 2. finish with :py:func:`_finalise_expected_row` (passing ``extra_dims``). The latter pre-computes a padded row :math:`[-\mu^{\top},\,0,\dots,0]` used by all efficient-frontier routines. """ _I: int _mean: np.ndarray _G: matrix | None _h: matrix | None _A: matrix | None _b: matrix | None _expected_row: matrix # ---- init helpers --------------------------------------------- # def _init_constraints( self, mean: npt.ArrayLike, G: Optional[npt.ArrayLike], h: Optional[npt.ArrayLike], A: Optional[npt.ArrayLike], b: Optional[npt.ArrayLike], ) -> None: """Store the mean vector and affine constraints as CVXOPT matrices. Args: mean: Expected returns vector. G, h: Inequality constraint matrices such that ``G w <= h``. A, b: Equality constraint matrices such that ``A w = b``. """ self._mean = np.asarray(mean, float) self._I = self._mean.size self._G = matrix(G) if G is not None else None self._h = matrix(h) if h is not None else None self._A = matrix(A) if A is not None else None self._b = matrix(b) if b is not None else None def _finalise_expected_row(self, extra: int) -> None: r"""Pad ``-\mu`` with *extra* zeros for later frontier sweeps. Args: extra: Number of zeros to append. """ self._expected_row = -matrix( np.hstack((self._mean, np.zeros(extra, float))) ).T # ---- utilities ------------------------------------------------ # @staticmethod def _assert_optimal(sol: dict, kind: str) -> None: """Raise when a CVXOPT solve did not terminate optimally. Args: sol: Solver output dictionary. kind: Solver label used in error messages. """ if sol["status"] != "optimal": raise RuntimeError(f"{kind} solver failed (status='{sol['status']}').") def _max_expected_return(self) -> float: r""" Solve a single auxiliary LP to *maximise* :math:`\mu^{\top}w` subject to the current constraint set. Used to anchor the **right end** of every efficient frontier. """ sol = solvers.lp( self._expected_row.T, self._G, self._h, self._A, self._b, solver="glpk" ) self._assert_optimal(sol, "LP") return -sol["primal objective"] # ------------------------------------------------------------------ # # Frontier mix-in # ------------------------------------------------------------------ # class _FrontierMixin: r""" Provides :pymeth:`_frontier` - a generic **linear interpolation** between the minimum-risk portfolio and the *return-maximiser* obtained from :py:func:`_BaseOptimization._max_expected_return`. """ def _frontier( self, first: npt.NDArray[np.floating], fn: callable, num: int, mean: np.ndarray, max_ret: float, ) -> np.ndarray: """Interpolate a frontier between minimum-risk and max-return portfolios. Args: first: Minimum-risk portfolio weights. fn: Callable returning weights for a target return. num: Number of portfolios to generate. mean: Mean return vector. max_ret: Maximum attainable return. Returns: np.ndarray: Weight matrix with shape ``(N, num)``. """ min_ret = float(mean @ first) if num < 2 or np.isclose(min_ret, max_ret): _LOGGER.warning("Frontier collapses to a single point.") return first[:, None] grid = np.linspace(min_ret, max_ret, num) return np.column_stack([first] + [fn(t) for t in grid[1:]]) # =================================================================== # # 1. MEAN-VARIANCE # =================================================================== #
[docs] class MeanVariance(_FrontierMixin, _BaseOptimization): r""" **Classic mean-variance programme** a la Markowitz (1952). Problem statement ----------------- .. math:: \begin{aligned} \min_{w}\;&\tfrac12\,w^{\top}\Sigma w + \tfrac12\,(w-w_0)^{\top}\Lambda(w-w_0) \\[3pt] \text{s.t. }& \mu^{\top}w \;\ge\; \tau, \quad \mathbf1^{\top}w = 1, \quad G w \le h,\; A w = b. \end{aligned} * ``\tau`` is supplied on the fly via :py:meth:`efficient_portfolio`. * ``\Lambda`` (quadratic impact) is optional; if omitted the model degenerates to the textbook QP with *no* trading costs. Notes ----- The QP is **strictly convex** whenever ``\Sigma`` > 0 or at least one positive \lambda_i is present, hence the solution is unique. Args: mean, covariance: Mean vector :math:`\\mu` and covariance matrix :math:`\\Sigma`. G, h, A, b: Optional affine constraints as defined in the module docstring. initial_weights, market_impact_costs: ``w0`` and diag-elements of :math:`\\Lambda` – must be passed together. """ def __init__( self, mean: npt.ArrayLike, covariance: npt.ArrayLike, G: Optional[npt.ArrayLike] = None, h: Optional[npt.ArrayLike] = None, A: Optional[npt.ArrayLike] = None, b: Optional[npt.ArrayLike] = None, *, initial_weights: Optional[npt.NDArray[np.floating]] = None, market_impact_costs: Optional[npt.NDArray[np.floating]] = None, ): """Create a mean-variance optimizer instance. Args: mean: Expected return vector. covariance: Covariance matrix. G, h: Inequality constraints ``G w <= h``. A, b: Equality constraints ``A w = b``. initial_weights: Optional initial portfolio for quadratic costs. market_impact_costs: Optional diagonal costs (requires ``initial_weights``). """ self._cov = np.asarray(covariance, float) super()._init_constraints(mean, G, h, A, b) P, q = 2 * self._cov, np.zeros(self._I) if initial_weights is not None and market_impact_costs is not None: P, q = _quadratic_turnover(P, q, initial_weights, market_impact_costs) self._P, self._q = matrix(P), matrix(q) self._finalise_expected_row(0) # ---------------------------------------------------------------- # # core solver # ---------------------------------------------------------------- # def _solve_target(self, return_target: float | None = None) -> np.ndarray: r""" Solve the QP **once** for a given target :math:`\tau`. Passing ``None`` yields the *minimum-variance* solution, i.e. the left-most point on the efficient frontier. """ G, h = self._G, self._h if return_target is not None: G = self._expected_row if G is None else matrix([G, self._expected_row]) h = matrix([-return_target]) if h is None else matrix( np.append(np.asarray(h).ravel(), -return_target) ) sol = solvers.qp(self._P, self._q, G, h, self._A, self._b) if sol["status"] != "optimal": # scale-adaptive ridge regularisation fallback P_np = np.asarray(self._P) ridge = max(1e-8, 1e-8 * np.trace(P_np) / max(self._I, 1)) P_reg = matrix(P_np + np.eye(self._I) * ridge) _LOGGER.debug("QP retry with ridge=%.2e", ridge) sol = solvers.qp(P_reg, self._q, G, h, self._A, self._b) self._assert_optimal(sol, "QP") return np.asarray(sol["x"]).ravel() efficient_portfolio = _solve_target
[docs] def efficient_frontier(self, num_portfolios: int) -> np.ndarray: """ Return an ``(N, num_portfolios)`` array whose columns trace the Markowitz efficient set between the variance minimiser and the return maximiser. Args: num_portfolios: Number of portfolios on the frontier. Returns: np.ndarray: Weight matrix with shape ``(N, num_portfolios)``. """ first = self._solve_target(None) max_ret = self._max_expected_return() return self._frontier(first, self._solve_target, num_portfolios, self._mean, max_ret)
[docs] def max_sharpe(self, risk_free_rate: float = 0.0) -> OptimizationResult: r"""Solve directly for the maximum Sharpe ratio portfolio. Uses the Cornuejols-Tütüncü (2007) reformulation: .. math:: \min_{y}\; y^\top \Sigma\, y \quad\text{s.t.}\quad (\mu - r_f)^\top y = 1,\; G\,y \le 0,\; A\,y = 0 The optimal weights are ``w = y / \mathbf{1}^\top y``. Args: risk_free_rate: Risk-free rate for Sharpe computation. Returns: OptimizationResult: Optimal weights, return, and volatility. """ mu_excess = np.asarray(self._mean).ravel() - risk_free_rate N = self._I # Build the auxiliary problem: min y'Σy s.t. (μ-rf)'y = 1 P = matrix(2.0 * np.asarray(self._cov, dtype=float)) q = matrix(np.zeros(N)) # Equality: (μ-rf)'y = 1 + any user equalities scaled to 0 A_rows = [mu_excess.reshape(1, -1)] b_vals = [1.0] if self._A is not None: # User equalities become homogeneous: A y = 0 (since we normalize later). # Non-homogeneous constraints (b != 0) cannot be enforced in this # reformulation (Cornuejols-Tutuncu) and are skipped with a warning. A_np = np.asarray(self._A) b_np = np.asarray(self._b).ravel() for row_idx in range(A_np.shape[0]): if not np.isclose(b_np[row_idx], 0.0, atol=1e-12): _LOGGER.warning( "max_sharpe: equality b[%d]=%.4g is non-homogeneous; " "skipping (cannot be enforced in Cornuejols-Tutuncu " "reformulation).", row_idx, b_np[row_idx], ) continue A_rows.append(A_np[row_idx:row_idx+1]) b_vals.append(0.0) A_mat = matrix(np.vstack(A_rows)) b_vec = matrix(np.array(b_vals, dtype=float)) # Inequalities: G y <= 0 (homogeneous version of G w <= h) if self._G is not None: G_mat = matrix(np.asarray(self._G, dtype=float)) h_vec = matrix(np.zeros(np.asarray(self._G).shape[0])) else: G_mat = None h_vec = None sol = solvers.qp(P, q, G_mat, h_vec, A_mat, b_vec) if sol["status"] != "optimal": raise InfeasibleOptimizationError("Max Sharpe portfolio is infeasible.") y = np.asarray(sol["x"]).ravel() y_sum = y.sum() if abs(y_sum) < 1e-12: raise InfeasibleOptimizationError("Max Sharpe portfolio has zero weight sum (degenerate).") w = y / y_sum ret = float(np.dot(np.asarray(self._mean).ravel(), w)) risk = float(np.sqrt(w @ np.asarray(self._cov, dtype=float) @ w)) return OptimizationResult(weights=w, nominal_return=ret, risk=risk)
# =================================================================== # # 2. MEAN-CVaR # =================================================================== #
[docs] class MeanCVaR(_FrontierMixin, _BaseOptimization): r""" **Rockafellar-Uryasev CVaR optimisation** with optional proportional costs. Formulation ----------- For level :math:`\alpha\in(0,1)` define the *conditional value-at-risk* .. math:: \operatorname{CVaR}_{\alpha}(Z)= \min_{c\in\mathbb R}\; c+\frac1\alpha\,\mathbb E[(Z-c)_{+}]. Substituting :math:`Z=-R\,w` and applying the *sample average* approximation produces the *LP* .. math:: \begin{aligned} \min_{w,c,\xi}\quad & c + \tfrac1\alpha\,p^{\top}\xi + c^{\top}(u^{+}+u^{-}) \\[4pt] \text{s.t.}\quad & \xi \;\ge\; -R\,w - c\mathbf1, \\[2pt] & \xi \;\ge\; 0, \\ & w = w_0 + u^{+}-u^{-},\;u^{+},u^{-}\ge 0, \\ & \mathbf1^{\top}w = 1,\; G w \le h,\; A w = b. \end{aligned} Args: R, p: Scenario matrix and probabilities. alpha: Tail probability :math:`\\alpha` (e.g. ``0.05`` = 95% CVaR). initial_weights, proportional_costs: Activate linear turnover frictions only when both are provided. """ def __init__( self, R: npt.ArrayLike, p: npt.ArrayLike, alpha: float, G: Optional[npt.ArrayLike] = None, h: Optional[npt.ArrayLike] = None, A: Optional[npt.ArrayLike] = None, b: Optional[npt.ArrayLike] = None, *, initial_weights: Optional[npt.NDArray[np.floating]] = None, proportional_costs: Optional[npt.NDArray[np.floating]] = None, ): """Create a mean-CVaR optimizer instance. Args: R: Scenario matrix with shape ``(T, N)``. p: Scenario probabilities with shape ``(T,)``. alpha: CVaR tail probability (e.g., ``0.05``). G, h: Inequality constraints ``G w <= h``. A, b: Equality constraints ``A w = b``. initial_weights: Optional initial portfolio for proportional costs. proportional_costs: Optional linear cost vector (requires ``initial_weights``). """ R, p = np.asarray(R, float), np.asarray(p, float) T, N = R.shape self._alpha = float(alpha) self._has_costs = initial_weights is not None and proportional_costs is not None super()._init_constraints(p @ R, G, h, A, b) base_len = N + 1 + T extra_len = 2 * N if self._has_costs else 0 # objective turnover_cost = ( _linear_turnover_blocks(N, T, initial_weights, proportional_costs)[0] if self._has_costs else [] ) self._c = matrix(np.hstack((np.zeros(N), 1.0, p / alpha, turnover_cost))) # --- inequality blocks -------------------------------------- # G_blocks, h_blocks = [], [] # 1) \xi >= -R w - c G_blocks += [ np.concatenate( (-R, -np.ones((T, 1)), -np.eye(T), np.zeros((T, extra_len))), axis=1 ), # 2) \xi >= 0 np.concatenate( (np.zeros((T, N + 1)), -np.eye(T), np.zeros((T, extra_len))), axis=1 ), ] h_blocks += [np.zeros(T), np.zeros(T)] # 3) turnover cost cone u^+,u^- >= 0 if self._has_costs: G_blocks.append( np.concatenate((np.zeros((2 * N, base_len)), -np.eye(2 * N)), axis=1) ) h_blocks.append(np.zeros(2 * N)) # 4) user-supplied G w <= h if G is not None: G_blocks.append( np.concatenate((G, np.zeros((G.shape[0], 1 + T + extra_len))), axis=1) ) h_blocks.append(h) self._G, self._h = matrix(np.vstack(G_blocks)), matrix(np.hstack(h_blocks)) # --- equalities --------------------------------------------- # if self._has_costs: c_cost, A_trade, b_trade = _linear_turnover_blocks( N, T, initial_weights, proportional_costs ) if A is not None: self._A = matrix( np.vstack( [ np.concatenate((A, np.zeros((A.shape[0], 1 + T + extra_len))), axis=1), A_trade, ] ) ) self._b = matrix(np.hstack([b, b_trade])) else: self._A, self._b = matrix(A_trade), matrix(b_trade) elif A is not None: self._A = matrix( np.concatenate((A, np.zeros((A.shape[0], 1 + T))), axis=1) ) self._finalise_expected_row(1 + T + extra_len) # ---------------------------------------------------------------- # # core solver # ---------------------------------------------------------------- # def _solve_target(self, return_target: float | None = None) -> np.ndarray: r""" Solve the CVaR *LP* for a given target return ``\tau`` (or *min-CVaR* portfolio if ``\tau is None``). """ G, h = self._G, self._h if return_target is not None: G = self._expected_row if G is None else matrix([G, self._expected_row]) h = matrix([-return_target]) if h is None else matrix( np.append(np.asarray(h).ravel(), -return_target) ) sol = solvers.lp(self._c, G, h, self._A, self._b, solver="glpk") self._assert_optimal(sol, "LP") return np.asarray(sol["x"]).ravel()[: self._I] efficient_portfolio = _solve_target
[docs] def efficient_frontier(self, num_portfolios: int) -> np.ndarray: """ Return the CVaR efficient frontier with ``num_portfolios`` vertices. Args: num_portfolios: Number of portfolios on the frontier. Returns: np.ndarray: Weight matrix with shape ``(N, num_portfolios)``. """ first = self._solve_target(None) max_ret = self._max_expected_return() return self._frontier(first, self._solve_target, num_portfolios, self._mean, max_ret)
# =================================================================== # # 3. ROBUST MEAN-VARIANCE (SOCP) # =================================================================== #
[docs] class RobustOptimizer(_BaseOptimization): r""" RobustOptimizer =============== Single-period **mean-variance allocator** that immunises the portfolio against estimation error in the *expected-return vector* while leaving the covariance matrix untouched. The model is the direct implementation of the ellipsoidal framework put forward in * **Goldfarb & Iyengar** - "Robust Portfolio Selection Problems", *Math. Oper. Res.* 28 (1), 1-38 (2003) * **Meucci** - *Risk & Asset Allocation*, Ch. 9 (Springer, 2005) and the robust-Bayesian extension in SSRN 681553 (2011) ------------------------------------------------------------------------ 1 Ellipsoidal ambiguity set ------------------------------------------------------------------------ We assume the unknown mean :math:`\mu` lies in .. math:: \mathcal U(\hat\mu,S,q) := \bigl\{\mu\in\mathbb R^{N}\;|\; \lVert S^{-1/2}(\mu-\hat\mu)\rVert_2 \le q\bigr\}, where * :math:`\hat\mu` -- point estimate (MLE, posterior mean, ...) * :math:`S\succ0` -- mean-uncertainty scatter matrix :math:`S_\mu`. For NIW posteriors this is :math:`S_\mu = \frac{1}{T_1}\frac{\nu_1}{\nu_1-2}\Sigma_1` (Meucci Eq. 9.151), **not** the posterior covariance :math:`\Sigma_1`. * :math:`q` -- radius, usually :math:`\sqrt{\chi^2_N(1-\alpha)}` for a :math:`100(1-\alpha)\%` credible set. ------------------------------------------------------------------------ 2 SOCP reformulation ------------------------------------------------------------------------ Goldfarb-Iyengar (Thm 3.1) show .. math:: \min_{\mu\in\mathcal U}w^{\top}\mu = \hat\mu^{\top}w-q\,\lVert S^{1/2}w\rVert_2, so the worst-case mean is a *linear* term minus a 2-norm penalty. Introducing an epigraph variable :math:`t` gives the cone programme .. math:: \min_{w,t}\;t+\lambda\lVert S^{1/2}w\rVert_2 \;\;\text{s.t.}\;\;t\ge-\hat\mu^{\top}w,\;w\!\in\!C, which CVXOPT solves as a **second-order cone programme** (type `"q"`). ------------------------------------------------------------------------ 3. Two parameterisations ------------------------------------------------------------------------ ``solve_lambda_variant(lam)`` Direct penalty :math:`\lambda`. Higher \lambda => stronger shrinkage toward the minimum-uncertainty portfolio. ``solve_gamma_variant(gamma_mu, gamma_sigma_sq)`` Uses ``gamma_mu`` as the penalty weight and optionally caps the uncertainty radius via ``t^2 \le \gamma_{\sigma}^{2}`` (implemented as ``t <= sqrt(gamma_sigma_sq)``). Both wrappers feed the same private routine :py:meth:`_solve_socp`. ------------------------------------------------------------------------ 4. Optional proportional turnover ------------------------------------------------------------------------ Passing *both* ``initial_weights`` *and* ``proportional_costs`` activates the linear-cost mechanism of Lobo et al. (2007). The decision vector becomes .. math:: (w,\;t,\;u^{+},u^{-})\in\mathbb R^{\,3N+1}, with inventory balance :math:`w=w_0+u^{+}-u^{-},\;u^{+},u^{-}\ge0`. ------------------------------------------------------------------------ 5. Limitations ------------------------------------------------------------------------ * Only mean uncertainty is modelled; covariance risk would require an SDP. * The ellipsoid is static; time-varying radii must be supplied upstream. * Extremely ill-conditioned ``uncertainty_cov`` can trigger numerical warnings in CVXOPT. """ def __init__( self, expected_return: npt.NDArray[np.floating], uncertainty_cov: npt.NDArray[np.floating], G: Optional[npt.ArrayLike] = None, h: Optional[npt.ArrayLike] = None, A: Optional[npt.ArrayLike] = None, b: Optional[npt.ArrayLike] = None, *, initial_weights: Optional[npt.NDArray[np.floating]] = None, proportional_costs: Optional[npt.NDArray[np.floating]] = None, ): """Create a robust mean optimizer instance. Args: expected_return: Nominal mean vector :math:`\\hat\\mu` (e.g. posterior mean :math:`\\mu_1`). uncertainty_cov: Mean-uncertainty scatter matrix :math:`S_\\mu`. For NIW posteriors use ``RobustBayesPosterior.s_mu``, **not** the posterior covariance ``sigma`` or ``sigma1``. G, h: Inequality constraints ``G w <= h``. A, b: Equality constraints ``A w = b``. initial_weights: Optional initial portfolio for proportional costs. proportional_costs: Optional linear cost vector (requires ``initial_weights``). """ super()._init_constraints(expected_return, G, h, A, b) self._s_sqrt = _cholesky_pd(np.asarray(uncertainty_cov, float)) self._w0, self._costs = initial_weights, proportional_costs self._has_costs = initial_weights is not None and proportional_costs is not None if self._has_costs: _check_shapes(initial_weights=initial_weights, proportional_costs=proportional_costs) # ---------------------------------------------------------------- # # public wrappers # ---------------------------------------------------------------- #
[docs] def solve_lambda_variant(self, lam: float) -> OptimizationResult: r""" Solve the *\lambda-variant*: .. math:: \min_{w,t}\;t + \lambda\|S^{1/2}w\|_2 \quad\text{s.t.}\; t \ge -\hat\mu^{\top}w,\;\ldots """ if lam < 0: raise ValueError(r"\lambda must be non-negative.") return self._solve_socp(lam=lam)
[docs] def solve_gamma_variant(self, gamma_mu: float, gamma_sigma_sq: float) -> OptimizationResult: r""" Solve the \gamma-variant with a penalty weight and an optional radius cap. ``gamma_mu`` acts as the penalty weight on the uncertainty radius and ``gamma_sigma_sq`` caps :math:`t^2` (implemented as ``t <= sqrt(gamma_sigma_sq)``). """ if gamma_mu < 0 or gamma_sigma_sq < 0: raise ValueError(r"\gamma must be non-negative.") return self._solve_socp(gamma_mu=gamma_mu, gamma_sigma_sq=gamma_sigma_sq)
[docs] def efficient_frontier( self, lambdas: Sequence[float] ) -> Tuple[list[float], list[float], npt.NDArray[np.floating]]: """ Sweep a list of ``lambdas`` and return * nominal returns, * robust radii (``t*``), * and a weight matrix ``(N, len(lambdas))``. Args: lambdas: Sequence of penalty parameters. Returns: tuple: Returns list, risk radius list, and weight matrix. """ res = [self.solve_lambda_variant(l) for l in lambdas] return ( [r.nominal_return for r in res], [r.risk for r in res], np.column_stack([r.weights for r in res]), )
# ---------------------------------------------------------------- # # core SOCP # ---------------------------------------------------------------- # def _solve_socp(self, **kw) -> OptimizationResult: r""" Internal driver - constructs and solves the *conic* form shared by both \lambda- and \gamma-variants. Never call directly. """ n = self._I extra = 2 * n if self._has_costs else 0 n_vars = n + 1 + extra # w | t | (u^+,u^-) penalty = kw.get("lam", kw.get("gamma_mu")) c = np.hstack( (-self._mean, penalty, (self._costs, self._costs) if self._has_costs else []) ) # --- SOC: ||S^{1/2}w||_2 <= t ------------------------------- # G_soc = np.zeros((n + 1, n_vars)) G_soc[0, n] = -1 G_soc[1:, :n] = -self._s_sqrt h_soc = np.zeros(n + 1) # --- linear inequalities ----------------------------------- # if self._has_costs: G_user = ( np.concatenate((np.asarray(self._G), np.zeros((self._G.size[0], 1 + extra))), axis=1) if self._G is not None else np.empty((0, n_vars)) ) G_lin = np.vstack( [G_user, np.concatenate((np.zeros((extra, n + 1)), -np.eye(extra)), axis=1)] ) h_lin = ( np.hstack([np.asarray(self._h).ravel(), np.zeros(extra)]) if self._h is not None else np.zeros(G_lin.shape[0]) ) else: G_lin = ( np.concatenate((np.asarray(self._G), np.zeros((self._G.size[0], 1))), axis=1) if self._G is not None else np.empty((0, n_vars)) ) h_lin = np.asarray(self._h).ravel() if self._h is not None else np.zeros(G_lin.shape[0]) # optional *gamma_sigma_sq* translates to an upper bound on *t* if "gamma_sigma_sq" in kw: G_lin = np.vstack([G_lin, np.eye(1, n_vars, n)]) h_lin = np.hstack([h_lin, np.sqrt(kw["gamma_sigma_sq"])]) # --- equalities -------------------------------------------- # if self._has_costs: A_trade = np.concatenate( (np.eye(n), np.zeros((n, 1)), -np.eye(n), np.eye(n)), axis=1 ) b_trade = self._w0 if self._A is not None: A_mat = np.vstack( [ np.concatenate( (np.asarray(self._A), np.zeros((self._A.size[0], 1 + extra))), axis=1 ), A_trade, ] ) b_vec = np.hstack([np.asarray(self._b).ravel(), b_trade]) else: A_mat, b_vec = A_trade, b_trade else: A_mat, b_vec = ( np.concatenate((np.asarray(self._A), np.zeros((self._A.size[0], 1))), axis=1), np.asarray(self._b).ravel(), ) if self._A is not None else (None, None) sol = solvers.conelp( matrix(c), matrix([matrix(G_lin), matrix(G_soc)]), matrix([matrix(h_lin), matrix(h_soc)]), dims={"l": int(G_lin.shape[0]), "q": [n + 1], "s": []}, A=matrix(A_mat) if A_mat is not None else None, b=matrix(b_vec) if b_vec is not None else None, ) self._assert_optimal(sol, "SOCP") x = np.asarray(sol["x"]).ravel() w, t = x[:n], x[n] return OptimizationResult(w, float(self._mean @ w), float(t))
# =================================================================== # # 4. RELAXED RISK PARITY (SOCP) # =================================================================== #
[docs] class RelaxedRiskParity(_BaseOptimization): r""" Implement the relaxed risk parity model of Gambeta & Kwon (2020). The decision vector is ordered as .. code-block:: text [ x (n) | \zeta (n) | \psi | \gamma | \rho | q ], with ``q`` acting as the auxiliary variable that nests the average-risk constraint into standard SOC blocks. Long-only holdings, non-negative marginal risks, and the regulator variable are enforced explicitly. The formulation minimises :math:`\\psi - \\gamma` subject to: * marginal risk consistency :math:`\\zeta = \\Sigma x`; * budget constraint :math:`\\mathbf{1}^{\\top}x = 1`; * ARC floor :math:`x_i\\zeta_i \\ge \\gamma^2` (realised via rotated second-order cones); * regulated average risk :math:`\\|Lx\\|_2 \\le q`, :math:`\\|(q,\\sqrt{n}\\rho)\\|_2 \\le \\sqrt{n}\\psi`; * diagonal penalty :math:`\\sqrt{\\lambda}\\,\\|D x\\|_2 \\le \\rho` whenever :math:`\\lambda > 0`; * optional return target :math:`\\mu^{\\top}x \\ge R`. All conic blocks are expressed in a solver-ready format compatible with ``cvxopt.solvers.conelp``. """ def __init__( self, mean: npt.ArrayLike, covariance: npt.ArrayLike, G: Optional[npt.ArrayLike] = None, h: Optional[npt.ArrayLike] = None, A: Optional[npt.ArrayLike] = None, b: Optional[npt.ArrayLike] = None, *, risk_budgets: Optional[npt.ArrayLike] = None, ): """Create a relaxed risk parity optimizer instance. Args: mean: Expected return vector. covariance: Covariance matrix. G, h: Inequality constraints ``G w <= h``. A, b: Equality constraints ``A w = b``. risk_budgets: Optional target risk budgets :math:`b_i > 0` with :math:`\\sum b_i = 1`. When provided, the ARC floor becomes :math:`x_i \\zeta_i \\ge b_i \\gamma^2` (risk budgeting). When ``None`` (default), equal budgets :math:`b_i = 1/N` are used, recovering the standard ERC portfolio. """ cov = np.asarray(covariance, dtype=float) if cov.ndim != 2 or cov.shape[0] != cov.shape[1]: raise ValueError("`covariance` must be a square matrix.") cov = 0.5 * (cov + cov.T) super()._init_constraints(mean, G, h, A, b) if cov.shape[0] != self._I: raise ValueError("`mean` and `covariance` dimensions are inconsistent.") self._cov = cov self._chol = _cholesky_pd(cov) diag = np.clip(np.diag(cov), a_min=0.0, a_max=None) self._theta_sqrt = np.sqrt(diag, dtype=float) self._finalise_expected_row(self._I + 4) n = self._I if risk_budgets is not None: rb = np.asarray(risk_budgets, dtype=float).ravel() if rb.shape[0] != n: raise ValueError(f"`risk_budgets` must have length {n}, got {rb.shape[0]}.") if np.any(rb <= 0): raise ValueError("`risk_budgets` must be strictly positive.") if not np.isclose(rb.sum(), 1.0): rb = rb / rb.sum() self._risk_budgets = rb else: self._risk_budgets = np.full(n, 1.0 / n) # ------------------------------------------------------------------ # # core SOCP # ------------------------------------------------------------------ #
[docs] def solve( self, *, lambda_reg: float = 0.0, return_target: float | None = None, min_target: float | None = None, ) -> RelaxedRiskParityResult: r""" Solve the relaxed risk parity SOCP for a given ``lambda_reg`` and optional target return ``return_target``. """ if lambda_reg < 0: raise ValueError("`lambda_reg` must be non-negative.") if return_target is not None and (not np.isfinite(return_target)): raise ValueError("`return_target` must be a finite scalar.") n = self._I x_slice = slice(0, n) zeta_slice = slice(n, 2 * n) psi_idx = 2 * n gamma_idx = 2 * n + 1 rho_idx = 2 * n + 2 q_idx = 2 * n + 3 n_vars = 2 * n + 4 G_user, h_user, A_user, b_user = self._extract_user_constraints() target_used = return_target max_return_value: float | None = None if return_target is not None: max_return_value = self._max_linear_return(G_user, h_user, A_user, b_user) tolerance = max(1e-8, 1e-3 * abs(max_return_value)) if target_used >= max_return_value - tolerance: target_used = max_return_value - tolerance if min_target is not None and target_used is not None: target_used = max(target_used, min_target) c = np.zeros(n_vars, dtype=float) c[psi_idx] = 1.0 c[gamma_idx] = -1.0 # --- equalities ------------------------------------------------ # A_rows: list[np.ndarray] = [] b_rows: list[np.ndarray] = [] A_rows.append( np.hstack( (-self._cov, np.eye(n, dtype=float), np.zeros((n, 4), dtype=float)) ) ) b_rows.append(np.zeros(n, dtype=float)) add_budget_row = True if A_user is not None: ones = np.ones(n, dtype=float) for row, rhs in zip(A_user, b_user): if np.allclose(row, ones, atol=1e-8, rtol=1e-8): add_budget_row = False break if add_budget_row: budget_row = np.zeros((1, n_vars), dtype=float) budget_row[0, x_slice] = 1.0 A_rows.append(budget_row) b_rows.append(np.array([1.0], dtype=float)) if A_user is not None and A_user.size: pad = np.zeros((A_user.shape[0], n + 4), dtype=float) A_rows.append(np.hstack((A_user, pad))) b_rows.append(b_user) A_mat = np.vstack(A_rows) b_vec = np.hstack(b_rows) # --- linear inequalities -------------------------------------- # G_blocks: list[np.ndarray] = [] h_blocks: list[np.ndarray] = [] G_blocks.append( np.hstack( (-np.eye(n, dtype=float), np.zeros((n, n + 4), dtype=float)) ) ) h_blocks.append(np.zeros(n, dtype=float)) G_blocks.append( np.hstack( (np.zeros((n, n), dtype=float), -np.eye(n, dtype=float), np.zeros((n, 4), dtype=float)) ) ) h_blocks.append(np.zeros(n, dtype=float)) for idx in (psi_idx, gamma_idx, rho_idx): row = np.zeros((1, n_vars), dtype=float) row[0, idx] = -1.0 G_blocks.append(row) h_blocks.append(np.zeros(1, dtype=float)) if G_user is not None and G_user.size: pad = np.zeros((G_user.shape[0], n + 4), dtype=float) G_blocks.append(np.hstack((G_user, pad))) h_blocks.append(h_user) if target_used is not None: G_ret = np.zeros((1, n_vars), dtype=float) G_ret[0, x_slice] = -self._mean G_blocks.append(G_ret) h_blocks.append(np.array([-target_used], dtype=float)) if G_blocks: G_lin = np.vstack(G_blocks) h_lin = np.hstack(h_blocks) else: G_lin = np.empty((0, n_vars), dtype=float) h_lin = np.empty(0, dtype=float) # --- SOC blocks ----------------------------------------------- # soc_dims: list[int] = [] G_soc_all: list[np.ndarray] = [] h_soc_all: list[np.ndarray] = [] G1 = np.zeros((n + 1, n_vars), dtype=float) G1[0, q_idx] = -1.0 G1[1:, x_slice] = -self._chol G_soc_all.append(G1) h_soc_all.append(np.zeros(n + 1, dtype=float)) soc_dims.append(n + 1) sqrt_n = float(np.sqrt(n)) G2 = np.zeros((3, n_vars), dtype=float) G2[0, psi_idx] = -sqrt_n G2[1, q_idx] = -1.0 G2[2, rho_idx] = -sqrt_n G_soc_all.append(G2) h_soc_all.append(np.zeros(3, dtype=float)) soc_dims.append(3) if lambda_reg > 0.0: G3 = np.zeros((n + 1, n_vars), dtype=float) G3[0, rho_idx] = -1.0 coeffs = np.diag(np.sqrt(lambda_reg) * self._theta_sqrt, k=0) G3[1:, x_slice] = -coeffs G_soc_all.append(G3) h_soc_all.append(np.zeros(n + 1, dtype=float)) soc_dims.append(n + 1) # ARC floor: x_i * zeta_i >= b_i * gamma^2 # Encoded via rotated SOC: sqrt(4*b_i*gamma^2 + (x_i - zeta_i)^2) <= x_i + zeta_i for i in range(n): block = np.zeros((3, n_vars), dtype=float) block[0, x_slice.start + i] = -1.0 block[0, zeta_slice.start + i] = -1.0 block[1, gamma_idx] = -2.0 * np.sqrt(self._risk_budgets[i]) block[2, x_slice.start + i] = -1.0 block[2, zeta_slice.start + i] = 1.0 G_soc_all.append(block) h_soc_all.append(np.zeros(3, dtype=float)) soc_dims.append(3) dims = {"l": int(G_lin.shape[0]), "q": soc_dims, "s": []} G_total = ( np.vstack([G_lin, *G_soc_all]) if G_lin.size else np.vstack(G_soc_all) ) h_total = ( np.hstack([h_lin, *h_soc_all]) if h_lin.size else np.hstack(h_soc_all) ) sol = solvers.conelp( matrix(c), matrix(G_total), matrix(h_total), dims=dims, A=matrix(A_mat), b=matrix(b_vec), ) self._assert_optimal(sol, "SOCP") x_opt = np.asarray(sol["x"], dtype=float).ravel() weights = x_opt[x_slice] marginal_risk = x_opt[zeta_slice] psi = float(x_opt[psi_idx]) gamma = float(x_opt[gamma_idx]) rho = float(x_opt[rho_idx]) return RelaxedRiskParityResult( weights=weights, marginal_risk=marginal_risk, psi=psi, gamma=gamma, rho=rho, objective=psi - gamma, target_return=target_used, max_return=max_return_value, )
def _extract_user_constraints( self, ) -> tuple[ Optional[np.ndarray], Optional[np.ndarray], Optional[np.ndarray], Optional[np.ndarray], ]: """Return user-supplied linear constraint matrices as NumPy arrays. Returns: tuple: ``(G, h, A, b)`` arrays or ``None`` when missing. """ n = self._I G_user: Optional[np.ndarray] = None h_user: Optional[np.ndarray] = None if self._G is not None: G_user = np.asarray(self._G, dtype=float) if G_user.ndim != 2: raise ValueError("`G` must be a 2D array.") if G_user.shape[1] != n: raise ValueError("`G` must have `n` columns.") h_user = np.asarray(self._h, dtype=float).ravel() if h_user.size != G_user.shape[0]: raise ValueError("`h` dimension mismatch.") A_user: Optional[np.ndarray] = None b_user: Optional[np.ndarray] = None if self._A is not None: A_user = np.asarray(self._A, dtype=float) if A_user.ndim != 2: raise ValueError("`A` must be a 2D array.") if A_user.shape[1] != n: raise ValueError("`A` must have `n` columns.") b_user = np.asarray(self._b, dtype=float).ravel() if b_user.size != A_user.shape[0]: raise ValueError("`b` dimension mismatch.") return G_user, h_user, A_user, b_user def _max_linear_return( self, G_user: Optional[np.ndarray], h_user: Optional[np.ndarray], A_user: Optional[np.ndarray], b_user: Optional[np.ndarray], ) -> float: """Solve a linear program to compute the maximum attainable return. Args: G_user, h_user: Optional inequality constraints. A_user, b_user: Optional equality constraints. Returns: float: Maximum feasible expected return. """ n = self._I G_lp_rows: list[np.ndarray] = [] h_lp_vals: list[np.ndarray] = [] if G_user is not None and G_user.size: G_lp_rows.append(G_user) h_lp_vals.append(h_user) G_lp_rows.append(-np.eye(n, dtype=float)) h_lp_vals.append(np.zeros(n, dtype=float)) G_lp = np.vstack(G_lp_rows) h_lp = np.hstack(h_lp_vals) if A_user is None: A_lp = np.ones((1, n), dtype=float) b_lp = np.array([1.0], dtype=float) else: A_lp, b_lp = A_user, b_user sol_lp = solvers.lp( matrix(-self._mean), matrix(G_lp), matrix(h_lp), matrix(A_lp), matrix(b_lp), solver="glpk", ) self._assert_optimal(sol_lp, "LP") return -sol_lp["primal objective"]