Source code for pyvallocation.utils.projection

import logging
from typing import Union

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)


[docs] def project_mean_covariance( mu: Union[np.ndarray, pd.Series], cov: Union[np.ndarray, pd.DataFrame], annualization_factor: float, ) -> tuple[Union[np.ndarray, pd.Series], Union[np.ndarray, pd.DataFrame]]: """Scale mean and covariance by ``annualization_factor``. Args: mu: Mean vector. cov: Covariance matrix. annualization_factor: Scaling factor (e.g., 12 for monthly to annual). Returns: tuple: Scaled mean vector and covariance matrix. """ return mu * annualization_factor, cov * annualization_factor
[docs] def convert_scenarios_compound_to_simple(scenarios: np.ndarray) -> np.ndarray: """Convert compound returns to simple returns. Args: scenarios: Log/compound return scenarios. Returns: np.ndarray: Simple return scenarios. """ return np.exp(scenarios) - 1
[docs] def convert_scenarios_simple_to_compound(scenarios: np.ndarray) -> np.ndarray: """Convert simple returns to compound (log) returns. Args: scenarios: Simple return scenarios. All values must be strictly greater than -1 (i.e., ``1 + r > 0``). Returns: np.ndarray: Log/compound return scenarios. Raises: ValueError: If any scenario has a return <= -1. """ arr = np.asarray(scenarios, dtype=float) if np.any(arr <= -1.0): raise ValueError( "Simple returns must be > -1 for log transformation " f"(min value: {float(np.min(arr)):.6f})." ) return np.log(1 + arr)
def _to_numpy(x): """Return the underlying ndarray (no copy for ndarray). Args: x: NumPy array or pandas object. Returns: np.ndarray: Dense array view/copy. """ return x.to_numpy() if isinstance(x, (pd.Series, pd.DataFrame)) else np.asarray(x) def _wrap_vector(x_np, template): """Wrap 1-D ndarray in the same type as `template` (Series or ndarray). Args: x_np: Vector to wrap. template: Template object (Series or ndarray). Returns: Union[np.ndarray, pd.Series]: Wrapped vector. """ return ( pd.Series(x_np, index=template.index, name=template.name) if isinstance(template, pd.Series) else x_np ) def _wrap_matrix(x_np, template): """Wrap 2-D ndarray in the same type as `template` (DataFrame or ndarray). Args: x_np: Matrix to wrap. template: Template object (DataFrame or ndarray). Returns: Union[np.ndarray, pd.DataFrame]: Wrapped matrix. """ return ( pd.DataFrame(x_np, index=template.index, columns=template.columns) if isinstance(template, pd.DataFrame) else x_np )
[docs] def log2simple(mu_g, cov_g): r"""\mu,\Sigma of log-returns -> \mu,\Sigma of simple returns (vectorised, pandas-aware). Args: mu_g: Mean of log-returns. cov_g: Covariance of log-returns. Returns: Tuple[ArrayLike, ArrayLike]: Mean and covariance in simple-return units. """ mu_g_np = _to_numpy(mu_g) cov_g_np = _to_numpy(cov_g) d = np.diag(cov_g_np) exp_mu = np.exp(mu_g_np + 0.5 * d) mu_r_np = exp_mu - 1 cov_r_np = ( np.exp(mu_g_np[:, None] + mu_g_np + 0.5 * (d[:, None] + d + 2 * cov_g_np)) - exp_mu[:, None] * exp_mu ) return (_wrap_vector(mu_r_np, mu_g), _wrap_matrix(cov_r_np, cov_g))
[docs] def simple2log(mu_r, cov_r): r"""\mu,\Sigma of simple returns -> \mu,\Sigma of log-returns (log-normal assumption). Args: mu_r: Mean of simple returns. cov_r: Covariance of simple returns. Returns: Tuple[ArrayLike, ArrayLike]: Mean and covariance in log-return units. """ mu_r_np = _to_numpy(mu_r) cov_r_np = _to_numpy(cov_r) m = mu_r_np + 1.0 if np.any(m <= 0): raise ValueError( "Mean simple returns must be > -1 for log transformation " f"(min 1+mu: {float(np.min(m)):.6f})." ) var_g = np.log1p(np.diag(cov_r_np) / m**2) mu_g_np = np.log(m) - 0.5 * var_g cov_g_np = np.log1p(cov_r_np / np.outer(m, m)) np.fill_diagonal(cov_g_np, var_g) # keep exact variances return (_wrap_vector(mu_g_np, mu_r), _wrap_matrix(cov_g_np, cov_r))
[docs] def project_scenarios(R, investment_horizon=2, p=None, n_simulations=1000, reprice=None, seed=None): """ Simulate horizon sums by sampling invariants with replacement. Implements **P3 (Projection)** of Meucci's Prayer framework: invariants are bootstrapped over ``investment_horizon`` steps and summed (random walk). If a ``reprice`` callable is supplied, it is applied to the projected risk drivers to obtain P&L scenarios (**P4 Pricing**). Args: R: Historical or simulated invariants (e.g. log-returns, yield changes). One-dimensional inputs represent single-instrument (length ``T``). Two-dimensional inputs represent ``T`` scenarios across ``N`` risk drivers. investment_horizon: Number of draws (with replacement) per simulated path. Defaults to ``2``. p: Scenario probabilities. When omitted, draws are uniform. Length must match the number of rows in ``R``. n_simulations: Number of simulated paths to generate. Defaults to ``1000``. reprice: Optional callable ``f(projected_risk_drivers) -> pnl_scenarios`` that converts projected risk-driver changes into P&L or simple-return scenarios. Built-in options: :func:`reprice_exp` (stocks: ``exp(Δy) - 1``), :func:`reprice_taylor` (greeks/duration: ``θτ + δΔy + ½γΔy²``). seed: Random seed for reproducibility. Defaults to ``None``. Returns: numpy.ndarray or pandas.Series or pandas.DataFrame: Simulated sums whose structure mirrors the input type: * 1-D inputs yield length-``n_simulations`` vectors. * 2-D inputs yield ``(n_simulations, n_assets)`` matrices. Examples: >>> import numpy as np >>> project_scenarios( ... np.array([0.01, -0.02, 0.03]), ... investment_horizon=2, ... n_simulations=4, ... ).shape (4,) >>> import pandas as pd >>> df = pd.DataFrame({"a": [0.01, -0.02], "b": [0.0, 0.02]}) >>> project_scenarios(df, investment_horizon=2, n_simulations=3).shape (3, 2) """ if not isinstance(investment_horizon, (int, np.integer)): investment_horizon = int(investment_horizon) logger.warning("investment_horizon cast to int: %d", investment_horizon) if investment_horizon <= 0: raise ValueError("`investment_horizon` must be a positive integer.") if n_simulations <= 0: raise ValueError("`n_simulations` must be a positive integer.") is_series = isinstance(R, pd.Series) is_dataframe = isinstance(R, pd.DataFrame) R_np = _to_numpy(R) if R_np.ndim not in (1, 2): raise ValueError("`R` must be a 1D or 2D array-like of scenarios.") num_rows = R_np.shape[0] weights = np.asarray(p, dtype=float).reshape(-1) if p is not None else None if weights is None: weights = np.full(num_rows, 1.0 / num_rows, dtype=float) else: if weights.shape[0] != num_rows: raise ValueError("Probability vector length must match the number of scenarios.") if np.any(weights < 0): raise ValueError("Scenario probabilities must be non-negative.") weight_sum = weights.sum() if not np.isfinite(weight_sum) or weight_sum <= 0: raise ValueError("Scenario probabilities must sum to a positive finite value.") if not np.isclose(weight_sum, 1.0): weights = weights / weight_sum rng = np.random.default_rng(seed) idx = rng.choice(num_rows, size=(n_simulations, investment_horizon), p=weights) scenario_sums = R_np[idx].sum(axis=1) if reprice is not None: scenario_sums = reprice(scenario_sums) if is_series: template_ser = pd.Series(dtype=float, index=range(n_simulations), name=R.name) return _wrap_vector(scenario_sums, template_ser) if is_dataframe: template_df = pd.DataFrame(index=range(n_simulations), columns=R.columns) return _wrap_matrix(scenario_sums, template_df) return scenario_sums
# Alias emphasising that inputs can be generic risk drivers, not only returns. project_risk_drivers = project_scenarios
[docs] def simulate_paths( R, horizon: int = 2, n_paths: int = 1000, p=None, reprice=None, seed=None, ): """Simulate full trajectory paths by bootstrapping invariants. Unlike :func:`project_scenarios` which returns only the terminal sum, this function returns the cumulative risk-driver change at **every** intermediate step, enabling drawdown analysis and fan charts. Args: R: Invariant scenarios ``(T, N)`` or ``(T,)`` (e.g. log-returns). horizon: Number of bootstrap steps per path. n_paths: Number of simulated paths. p: Optional scenario probabilities. reprice: Optional callable applied to cumulative risk-driver changes at each step (e.g. ``reprice_exp`` for cumulative simple returns). seed: Random seed for reproducibility. Returns: np.ndarray: Shape ``(n_paths, horizon, N)`` — cumulative risk-driver changes (or repriced values) at each time step. """ R_np = _to_numpy(R) if R_np.ndim == 1: R_np = R_np.reshape(-1, 1) if R_np.ndim != 2: raise ValueError("`R` must be 1D or 2D.") T, N = R_np.shape weights = np.asarray(p, dtype=float).ravel() if p is not None else np.full(T, 1.0 / T) weights = weights / weights.sum() rng = np.random.default_rng(seed) idx = rng.choice(T, size=(n_paths, horizon), p=weights) sampled = R_np[idx] # (n_paths, horizon, N) cumulative = np.cumsum(sampled, axis=1) # cumulative sum along time if reprice is not None: # Apply repricing at each time step out = np.empty_like(cumulative) for t in range(horizon): out[:, t, :] = reprice(cumulative[:, t, :]) return out return cumulative
# ------------------------------------------------------------------ # # Built-in repricing functions (Prayer Step P4) # ------------------------------------------------------------------ #
[docs] def reprice_exp(delta_y: np.ndarray) -> np.ndarray: """Reprice via exponentiation (stocks, equity indices). Maps projected log-return invariants to simple returns: ``P&L / V_0 = exp(Δy) - 1``. This is the **exact** repricing for instruments whose risk driver is the log-price (Meucci Prayer P4, Eq. 17 for stocks). Args: delta_y: Projected risk-driver changes ``Y_{T+τ} - Y_T`` (log-returns). Returns: np.ndarray: Simple-return scenarios. """ return np.exp(delta_y) - 1.0
[docs] def reprice_taylor( delta_y: np.ndarray, *, theta: Union[np.ndarray, float, None] = None, delta: Union[np.ndarray, float, None] = None, gamma: Union[np.ndarray, float, None] = None, tau: float = 0.0, ) -> np.ndarray: r"""Reprice via Taylor / Greek approximation (options, bonds). Approximates the P&L using a second-order expansion around the current risk-driver values (Meucci Prayer P4, Eq. 18): .. math:: \text{P\&L} \approx \theta\,\tau + \delta\,\Delta y + \tfrac12\,\gamma\,(\Delta y)^2. The coefficients are instrument-specific sensitivities: * **Equities**: ``delta=1``, others zero (reduces to linear return). * **Options**: ``theta`` (time decay), ``delta`` (option delta), ``gamma`` (option gamma). For multi-factor options, ``delta`` and ``gamma`` can be vectors/matrices matching risk-driver columns. * **Bonds**: ``delta = -duration * price``, ``gamma = convexity * price``. Args: delta_y: Projected risk-driver changes (``n_sim × n_drivers``). theta: Time-decay coefficient(s). Scalar or per-instrument array. delta: First-order sensitivity (delta, -duration, etc.). gamma: Second-order sensitivity (gamma, convexity, etc.). tau: Time step (e.g. ``1/252`` for daily, ``1/12`` for monthly). Returns: np.ndarray: Approximate P&L scenarios, same shape as ``delta_y``. """ pnl = np.zeros_like(delta_y, dtype=float) if theta is not None: pnl = pnl + np.asarray(theta, dtype=float) * tau if delta is not None: pnl = pnl + np.asarray(delta, dtype=float) * delta_y if gamma is not None: pnl = pnl + 0.5 * np.asarray(gamma, dtype=float) * delta_y ** 2 return pnl
[docs] def make_repricing_fn(pricing_fn, current_drivers: np.ndarray): """Build a repricing callable from an arbitrary pricing function. For **full repricing** (Prayer P4), the user supplies a function ``pricing_fn(Y)`` that maps risk-driver levels to instrument prices. The returned callable computes: ``P&L = pricing_fn(Y_T + Δy) - pricing_fn(Y_T)`` Args: pricing_fn: Callable ``f(Y) -> prices``, where ``Y`` has the same columns as the risk-driver scenarios. Must be vectorised over the first (scenario) axis. current_drivers: Current risk-driver levels ``Y_T`` (1-D array of length ``n_drivers``). Returns: Callable: Repricing function suitable for the ``reprice`` parameter of :func:`project_scenarios`. Examples: >>> # Bond: price = face * exp(-yield * maturity) >>> import numpy as np >>> face, maturity = 100, 5 >>> pricing_fn = lambda Y: face * np.exp(-Y * maturity) >>> current_yield = np.array([0.03]) >>> repricer = make_repricing_fn(pricing_fn, current_yield) >>> # repricer(delta_y) returns bond P&L scenarios """ y0 = np.asarray(current_drivers, dtype=float) p0 = pricing_fn(y0) def _reprice(delta_y: np.ndarray) -> np.ndarray: y_horizon = y0 + delta_y return pricing_fn(y_horizon) - p0 return _reprice
[docs] def compose_repricers(instruments, invariant_columns): """Build a repricing function mapping K invariant columns to N instrument P&Ls. For mixed portfolios where each instrument may depend on one or more risk drivers (Prayer P4). Three specification formats are supported: * **callable** -- 1-to-1: uses the invariant column with the same name as the instrument. * **("driver", callable)** or **(["driver"], callable)** -- single named driver whose name differs from the instrument. * **(["d1", "d2"], callable)** -- multi-driver: the callable receives an ``(n_sim, n_drivers)`` array. Args: instruments: Dict mapping **instrument name** to repricing spec. invariant_columns: Ordered column names of the invariant scenario matrix (length K). Returns: Tuple[Callable, List[str]]: ``(repricing_fn, instrument_names)`` where *repricing_fn* maps ``(n_sim, K) -> (n_sim, N)`` and *instrument_names* is the ordered list of output column labels. Raises: KeyError: If a required driver column is not in *invariant_columns*. TypeError: If a spec is neither a callable nor a ``(drivers, callable)`` tuple. Examples: >>> fn, names = compose_repricers( ... {"Stock": reprice_exp, ... "Bond": (["yield_10y"], lambda dy: reprice_taylor(dy, delta=-7, gamma=50)), ... "Call": (["stock_logret", "iv_chg"], my_option_fn)}, ... invariant_columns=["stock_logret", "yield_10y", "iv_chg"], ... ) >>> names ['Stock', 'Bond', 'Call'] """ inv_list = list(invariant_columns) inv_idx = {name: i for i, name in enumerate(inv_list)} instrument_names = list(instruments.keys()) parsed = [] # list of (col_indices, fn) for inst, spec in instruments.items(): if callable(spec) and not isinstance(spec, tuple): # Plain callable: 1-to-1, instrument name must match an invariant column if inst not in inv_idx: raise KeyError( f"Instrument '{inst}' has no matching invariant column. " f"Use (drivers, callable) syntax to specify which column(s) to use." ) parsed.append(([inv_idx[inst]], spec)) elif isinstance(spec, tuple) and len(spec) == 2: drivers, fn = spec if not callable(fn): raise TypeError(f"Second element of spec for '{inst}' must be callable.") if isinstance(drivers, str): drivers = [drivers] col_indices = [] for d in drivers: if d not in inv_idx: raise KeyError(f"Driver '{d}' for instrument '{inst}' not in invariant_columns {inv_list}.") col_indices.append(inv_idx[d]) parsed.append((col_indices, fn)) else: raise TypeError( f"Repricing spec for '{inst}' must be a callable or (drivers, callable) tuple, " f"got {type(spec).__name__}." ) def _combined(delta_y): n_sim = delta_y.shape[0] if delta_y.ndim == 2 else len(delta_y) pnl = np.empty((n_sim, len(parsed)), dtype=float) for j, (col_idx, fn) in enumerate(parsed): if len(col_idx) == 1: chunk = delta_y[:, col_idx[0]] if delta_y.ndim == 2 else delta_y else: chunk = delta_y[:, col_idx] pnl[:, j] = np.asarray(fn(chunk), dtype=float).ravel() return pnl return _combined, instrument_names