Source code for pyvallocation.utils.constraints

from __future__ import annotations

import logging
import numbers
import warnings
from dataclasses import dataclass
from typing import (
    Any,
    Dict,
    List,
    Optional,
    Sequence,
    Tuple,
    Union,
)

import numpy as np

logger = logging.getLogger(__name__)

Number = Union[int, float]
BoundsLike = Union[
    Tuple[Number, Number],
    Sequence[Tuple[Number, Number]],
    Dict[int, Tuple[Number, Number]],
]
RelBound = Tuple[int, int, Number]
EqRow = Tuple[Sequence[Number], Number]


[docs] @dataclass(frozen=True) class Constraints: """Typed, immutable constraint specification for portfolio optimisers. All fields have sensible defaults so that ``Constraints()`` produces a long-only, fully-invested constraint set. Examples: >>> c = Constraints() # long-only, sum=1 >>> c = Constraints(bounds=(0.0, 0.3)) # per-asset cap at 30% >>> c = Constraints(group_constraints={ ... "Equity": ([0, 1, 2], 0.2, 0.6), # 20-60% in equities ... }) >>> c = Constraints.from_dict({"long_only": True, "total_weight": 1.0}) """ long_only: bool = True total_weight: Optional[Number] = 1.0 bounds: Optional[BoundsLike] = None relative_bounds: Optional[Sequence[RelBound]] = None group_constraints: Optional[Dict[str, Tuple[Sequence[int], Number, Number]]] = None additional_G_h: Optional[Sequence[Tuple[Sequence[Number], Number]]] = None additional_A_b: Optional[Sequence[EqRow]] = None
[docs] @classmethod def from_dict(cls, d: Dict[str, Any]) -> "Constraints": """Create a Constraints instance from a dictionary. Unrecognised keys are silently ignored for backward compatibility. """ import dataclasses valid_keys = {f.name for f in dataclasses.fields(cls)} filtered = {k: v for k, v in d.items() if k in valid_keys} return cls(**filtered)
[docs] def to_matrices(self, n_assets: int) -> Tuple[ Optional[np.ndarray], Optional[np.ndarray], Optional[np.ndarray], Optional[np.ndarray], ]: """Build CVXOPT-compatible (G, h, A, b) constraint matrices. Args: n_assets: Number of portfolio assets. Returns: Tuple of (G, h, A, b) arrays or None. """ return build_G_h_A_b( n_assets, total_weight=self.total_weight, long_only=self.long_only, bounds=self.bounds, relative_bounds=self.relative_bounds, group_constraints=self.group_constraints, additional_G_h=self.additional_G_h, additional_A_b=self.additional_A_b, )
def _check_number(x: Number, name: str) -> None: """Validate that ``x`` is a finite real number. Args: x: Candidate numeric value. name: Parameter name for error messaging. Raises: TypeError: If ``x`` is not a real number. ValueError: If ``x`` is not finite. """ if not isinstance(x, numbers.Real): logger.error("%s must be a real number, got %s", name, type(x)) raise TypeError(f"{name} must be a real number, got {type(x)}") if not np.isfinite(x): logger.error("%s must be finite, got %s", name, x) raise ValueError(f"{name} must be finite, got {x}")
[docs] def build_G_h_A_b( n_assets: int, *, total_weight: Optional[Number] = 1.0, long_only: bool = True, bounds: Optional[BoundsLike] = None, relative_bounds: Optional[Sequence[RelBound]] = None, group_constraints: Optional[Dict[str, Tuple[Sequence[int], Number, Number]]] = None, additional_G_h: Optional[Sequence[Tuple[Sequence[Number], Number]]] = None, additional_A_b: Optional[Sequence[EqRow]] = None, return_none_if_empty: bool = True, ) -> Tuple[ Optional[np.ndarray], Optional[np.ndarray], Optional[np.ndarray], Optional[np.ndarray], ]: """ Assemble inequality and equality constraints for portfolio optimisers. The helper converts high-level constraint specifications (long-only, box bounds, pairwise relative bounds, and custom rows) into the ``G``, ``h``, ``A`` and ``b`` matrices expected by CVXOPT-based solvers. Args: n_assets: Number of portfolio weights to constrain. total_weight: Target sum of weights. ``None`` disables the equality row. long_only: When ``True`` append ``-I w <= 0`` so weights remain non-negative. bounds: Either a single ``(lower, upper)`` tuple applied to every asset, a sequence of per-asset tuples, or a mapping ``asset -> (lower, upper)``. relative_bounds: Sequence of triples ``(i, j, bound)`` implementing ``w_i - w_j <= bound`` style constraints. group_constraints: Mapping of group name to ``(indices, lower, upper)`` tuples. Each entry constrains the sum of weights in the group to lie between ``lower`` and ``upper``. additional_G_h: Extra inequality rows supplied as ``(row, rhs)`` pairs. additional_A_b: Extra equality rows supplied as ``(row, rhs)`` pairs. return_none_if_empty: When ``True`` return ``None`` instead of empty arrays. Returns: Tuple ``(G, h, A, b)`` suitable for CVXOPT/QP front-end functions. Each entry is ``None`` when no constraint of that type is required. """ if not isinstance(n_assets, int) or n_assets <= 0: logger.error("n_assets must be a positive integer, got %s", n_assets) raise ValueError("n_assets must be a positive integer") if total_weight is not None: _check_number(total_weight, "total_weight") if total_weight == 0: logger.warning("total_weight=0 creates a degenerate all-zero portfolio.") G_rows: List[np.ndarray] = [] h_vals: List[Number] = [] A_rows: List[np.ndarray] = [] b_vals: List[Number] = [] if long_only: G_rows.append(-np.eye(n_assets)) h_vals.extend([0.0] * n_assets) if bounds is not None: if isinstance(bounds, tuple): if len(bounds) != 2: logger.error( "bounds tuple must be (lower, upper), got length %d", len(bounds) ) raise ValueError("bounds tuple must be (lower, upper)") lower, upper = bounds if lower is not None: _check_number(lower, "lower bound") if upper is not None: _check_number(upper, "upper bound") if lower is not None and upper is not None and lower > upper: logger.error("lower bound %s greater than upper bound %s", lower, upper) raise ValueError("lower bound greater than upper bound") if lower is not None: G_rows.append(-np.eye(n_assets)) h_vals.extend([-lower] * n_assets) if upper is not None: G_rows.append(np.eye(n_assets)) h_vals.extend([upper] * n_assets) elif isinstance(bounds, dict): for idx, lu in bounds.items(): if not (0 <= idx < n_assets): logger.error( "asset index %d out of range (0..%d)", idx, n_assets - 1 ) raise IndexError( f"asset index {idx} out of range (0..{n_assets-1})" ) if len(lu) != 2: logger.error( "each bounds value must be (lower, upper), got length %d", len(lu), ) raise ValueError("each bounds value must be (lower, upper)") lower, upper = lu if lower is not None: _check_number(lower, f"lower bound for asset {idx}") if upper is not None: _check_number(upper, f"upper bound for asset {idx}") if lower is not None and upper is not None and lower > upper: logger.error( "asset %d: lower bound %s > upper bound %s", idx, lower, upper ) raise ValueError(f"asset {idx}: lower bound > upper bound") if lower is not None: row = np.zeros(n_assets) row[idx] = -1 G_rows.append(row) h_vals.append(-lower) if upper is not None: row = np.zeros(n_assets) row[idx] = 1 G_rows.append(row) h_vals.append(upper) else: bounds_seq = list(bounds) # type: ignore[arg-type] if len(bounds_seq) != n_assets: logger.error( "bounds list length %d must equal n_assets %d", len(bounds_seq), n_assets, ) raise ValueError("bounds list length must equal n_assets") for idx, (lower, upper) in enumerate(bounds_seq): if lower is not None: _check_number(lower, f"lower bound for asset {idx}") if upper is not None: _check_number(upper, f"upper bound for asset {idx}") if lower is not None and upper is not None and lower > upper: logger.error( "asset %d: lower bound %s > upper bound %s", idx, lower, upper ) raise ValueError(f"asset {idx}: lower bound > upper bound") if lower is not None: row = np.zeros(n_assets) row[idx] = -1 G_rows.append(row) h_vals.append(-lower) if upper is not None: row = np.zeros(n_assets) row[idx] = 1 G_rows.append(row) h_vals.append(upper) if relative_bounds is not None: for triple in relative_bounds: if len(triple) != 3: logger.error( "each relative_bounds entry must be (i, j, k), got length %d", len(triple), ) raise ValueError("each relative_bounds entry must be (i, j, k)") i, j, k = triple if not (0 <= i < n_assets and 0 <= j < n_assets): logger.error("relative_bounds indices %d,%d out of range", i, j) raise IndexError(f"relative_bounds indices {i},{j} out of range") if i == j: logger.error("relative_bounds: i and j must differ, got i=j=%d", i) raise ValueError(f"relative_bounds: i and j must differ (got i=j={i})") _check_number(k, "bound in relative_bounds") row = np.zeros(n_assets) row[i] = 1 row[j] = -1 G_rows.append(row) h_vals.append(float(k)) if group_constraints is not None: for name, (indices, lo, hi) in group_constraints.items(): idx = list(indices) if not all(0 <= i < n_assets for i in idx): raise IndexError(f"group '{name}' contains out-of-range asset indices.") _check_number(lo, f"lower bound for group '{name}'") _check_number(hi, f"upper bound for group '{name}'") if lo > hi: raise ValueError(f"group '{name}': lower bound {lo} > upper bound {hi}.") # sum(w_i for i in group) <= hi row_upper = np.zeros(n_assets) row_upper[idx] = 1.0 G_rows.append(row_upper) h_vals.append(float(hi)) # sum(w_i for i in group) >= lo → -sum(w_i) <= -lo row_lower = np.zeros(n_assets) row_lower[idx] = -1.0 G_rows.append(row_lower) h_vals.append(float(-lo)) if additional_G_h is not None: for row, rhs in additional_G_h: row_arr = np.asarray(row, dtype=float) if row_arr.size != n_assets: logger.error("additional_G_h row length %d mismatch", row_arr.size) raise ValueError("additional_G_h row length mismatch") _check_number(rhs, "rhs in additional_G_h") G_rows.append(row_arr) h_vals.append(rhs) if total_weight is not None: A_rows.append(np.ones(n_assets)) b_vals.append(total_weight) if additional_A_b is not None: for row, rhs in additional_A_b: row_arr = np.asarray(row, dtype=float) if row_arr.size != n_assets: logger.error("additional_A_b row length %d mismatch", row_arr.size) raise ValueError("additional_A_b row length mismatch") _check_number(rhs, "rhs in additional_A_b") A_rows.append(row_arr) b_vals.append(rhs) if not long_only and bounds is None and not relative_bounds and not additional_G_h: warnings.warn( "No position bounds given and long_only=False - feasible set may be " "unbounded -> optimisation can fail.", UserWarning, stacklevel=2, ) logger.warning( "No position bounds given and long_only=False - feasible set may be unbounded." ) if ( long_only and bounds is not None and isinstance(bounds, tuple) and bounds[0] is not None and bounds[0] == 0 ): warnings.warn( "long_only=True already enforces w >= 0; supplying lower bound = 0 " "duplicates that constraint.", UserWarning, stacklevel=2, ) def _stack(rows: List[np.ndarray]) -> Optional[np.ndarray]: """Stack constraint rows into a 2D array or return ``None`` if empty. Args: rows: List of row arrays. Returns: Optional[np.ndarray]: Stacked 2D array or ``None``. """ if rows: return np.vstack(rows) return None if return_none_if_empty else np.zeros((0, n_assets)) G = _stack(G_rows) h = ( np.asarray(h_vals) if h_vals else (None if return_none_if_empty else np.zeros(0)) ) A = _stack(A_rows) b = ( np.asarray(b_vals, float) if b_vals else (None if return_none_if_empty else np.zeros(0)) ) logger.debug("Built constraint matrices G, h, A, b.") return G, h, A, b