Source code for pyvallocation.utils.constraints

from __future__ import annotations

import logging
import numbers
import warnings
from typing import (
    Dict,
    List,
    Optional,
    Sequence,
    Tuple,
    Union,
)

import numpy as np

logger = logging.getLogger(__name__)

Number = Union[int, float]
BoundsLike = Union[
    Tuple[Number, Number],
    Sequence[Tuple[Number, Number]],
    Dict[int, Tuple[Number, Number]],
]
RelBound = Tuple[int, int, Number]
EqRow = Tuple[Sequence[Number], Number]

def _check_number(x: Number, name: str) -> None:
    if not isinstance(x, numbers.Real):
        logger.error("%s must be a real number, got %s", name, type(x))
        raise TypeError(f"{name} must be a real number, got {type(x)}")
    if not np.isfinite(x):
        logger.error("%s must be finite, got %s", name, x)
        raise ValueError(f"{name} must be finite, got {x}")


[docs] def build_G_h_A_b( n_assets: int, *, total_weight: Optional[Number] = 1.0, long_only: bool = True, bounds: Optional[BoundsLike] = None, relative_bounds: Optional[Sequence[RelBound]] = None, additional_G_h: Optional[Sequence[Tuple[Sequence[Number], Number]]] = None, additional_A_b: Optional[Sequence[EqRow]] = None, return_none_if_empty: bool = True, ) -> Tuple[ Optional[np.ndarray], Optional[np.ndarray], Optional[np.ndarray], Optional[np.ndarray], ]: if not isinstance(n_assets, int) or n_assets <= 0: logger.error("n_assets must be a positive integer, got %s", n_assets) raise ValueError("n_assets must be a positive integer") if total_weight is not None: _check_number(total_weight, "total_weight") G_rows: List[np.ndarray] = [] h_vals: List[Number] = [] A_rows: List[np.ndarray] = [] b_vals: List[Number] = [] if long_only: G_rows.append(-np.eye(n_assets)) h_vals.extend([0.0] * n_assets) if bounds is not None: if isinstance(bounds, tuple): if len(bounds) != 2: logger.error( "bounds tuple must be (lower, upper), got length %d", len(bounds) ) raise ValueError("bounds tuple must be (lower, upper)") lower, upper = bounds if lower is not None: _check_number(lower, "lower bound") if upper is not None: _check_number(upper, "upper bound") if lower is not None and upper is not None and lower > upper: logger.error("lower bound %s greater than upper bound %s", lower, upper) raise ValueError("lower bound greater than upper bound") if lower is not None: G_rows.append(-np.eye(n_assets)) h_vals.extend([-lower] * n_assets) if upper is not None: G_rows.append(np.eye(n_assets)) h_vals.extend([upper] * n_assets) elif isinstance(bounds, dict): for idx, lu in bounds.items(): if not (0 <= idx < n_assets): logger.error( "asset index %d out of range (0..%d)", idx, n_assets - 1 ) raise IndexError( f"asset index {idx} out of range (0..{n_assets-1})" ) if len(lu) != 2: logger.error( "each bounds value must be (lower, upper), got length %d", len(lu), ) raise ValueError("each bounds value must be (lower, upper)") lower, upper = lu if lower is not None: _check_number(lower, f"lower bound for asset {idx}") if upper is not None: _check_number(upper, f"upper bound for asset {idx}") if lower is not None and upper is not None and lower > upper: logger.error( "asset %d: lower bound %s > upper bound %s", idx, lower, upper ) raise ValueError(f"asset {idx}: lower bound > upper bound") if lower is not None: row = np.zeros(n_assets) row[idx] = -1 G_rows.append(row) h_vals.append(-lower) if upper is not None: row = np.zeros(n_assets) row[idx] = 1 G_rows.append(row) h_vals.append(upper) else: bounds_seq = list(bounds) # type: ignore[arg-type] if len(bounds_seq) != n_assets: logger.error( "bounds list length %d must equal n_assets %d", len(bounds_seq), n_assets, ) raise ValueError("bounds list length must equal n_assets") for idx, (lower, upper) in enumerate(bounds_seq): if lower is not None: _check_number(lower, f"lower bound for asset {idx}") if upper is not None: _check_number(upper, f"upper bound for asset {idx}") if lower is not None and upper is not None and lower > upper: logger.error( "asset %d: lower bound %s > upper bound %s", idx, lower, upper ) raise ValueError(f"asset {idx}: lower bound > upper bound") if lower is not None: row = np.zeros(n_assets) row[idx] = -1 G_rows.append(row) h_vals.append(-lower) if upper is not None: row = np.zeros(n_assets) row[idx] = 1 G_rows.append(row) h_vals.append(upper) if relative_bounds is not None: for triple in relative_bounds: if len(triple) != 3: logger.error( "each relative_bounds entry must be (i, j, k), got length %d", len(triple), ) raise ValueError("each relative_bounds entry must be (i, j, k)") i, j, k = triple if not (0 <= i < n_assets and 0 <= j < n_assets): logger.error("relative_bounds indices %d,%d out of range", i, j) raise IndexError(f"relative_bounds indices {i},{j} out of range") _check_number(k, "k in relative bound") if k < 0: logger.error("k in relative bound must be non-negative, got %s", k) raise ValueError("k in relative bound must be non-negative") row = np.zeros(n_assets) row[i] = 1 row[j] = -k G_rows.append(row) h_vals.append(0.0) if additional_G_h is not None: for row, rhs in additional_G_h: row_arr = np.asarray(row, dtype=float) if row_arr.size != n_assets: logger.error("additional_G_h row length %d mismatch", row_arr.size) raise ValueError("additional_G_h row length mismatch") _check_number(rhs, "rhs in additional_G_h") G_rows.append(row_arr) h_vals.append(rhs) if total_weight is not None: A_rows.append(np.ones(n_assets)) b_vals.append(total_weight) if additional_A_b is not None: for row, rhs in additional_A_b: row_arr = np.asarray(row, dtype=float) if row_arr.size != n_assets: logger.error("additional_A_b row length %d mismatch", row_arr.size) raise ValueError("additional_A_b row length mismatch") _check_number(rhs, "rhs in additional_A_b") A_rows.append(row_arr) b_vals.append(rhs) if not long_only and bounds is None and not relative_bounds and not additional_G_h: warnings.warn( "No position bounds given and long_only=False – feasible set may be " "unbounded → optimisation can fail.", UserWarning, stacklevel=2, ) logger.warning( "No position bounds given and long_only=False – feasible set may be unbounded." ) if ( long_only and bounds is not None and isinstance(bounds, tuple) and bounds[0] is not None and bounds[0] >= 0 ): warnings.warn( "long_only=True already enforces w ≥ 0; supplying a non-negative lower " "bound duplicates that constraint.", UserWarning, stacklevel=2, ) logger.warning( "long_only=True already enforces w ≥ 0; supplying a non-negative lower bound duplicates that constraint." ) def _stack(rows: List[np.ndarray]) -> Optional[np.ndarray]: if rows: return np.vstack(rows) return None if return_none_if_empty else np.zeros((0, n_assets)) G = _stack(G_rows) h = ( np.asarray(h_vals) if h_vals else (None if return_none_if_empty else np.zeros(0)) ) A = _stack(A_rows) b = ( np.asarray(b_vals, float) if b_vals else (None if return_none_if_empty else np.zeros(0)) ) logger.debug("Built constraint matrices G, h, A, b.") return G, h, A, b