"""
This module provides functions for estimating and shrinking statistical moments
(mean and covariance matrix) of asset returns.
It includes:
- `estimate_sample_moments`: For computing weighted sample mean and covariance.
- `shrink_mean_jorion`: Implements the Bayes-Stein shrinkage estimator for the mean vector.
- `shrink_covariance_ledoit_wolf`: Implements the Ledoit-Wolf shrinkage estimator for the covariance matrix.
"""
from __future__ import annotations
import logging
from typing import Optional, Sequence, Tuple, Union
import numpy as np
from numpy.linalg import LinAlgError, inv
from pyvallocation.utils.validation import (
check_non_negativity,
check_weights_sum_to_one,
ensure_psd_matrix,
)
import pandas as pd
ArrayLike = Union[np.ndarray, "pd.Series", "pd.DataFrame"]
logger = logging.getLogger(__name__)
def _labels(*objs: ArrayLike) -> Optional[Sequence[str]]:
for obj in objs:
if isinstance(obj, pd.DataFrame):
return obj.columns.to_list()
if isinstance(obj, pd.Series):
return obj.index.to_list()
return None
def _wrap(x: np.ndarray, labels: Optional[Sequence[str]], vector: bool) -> ArrayLike:
if labels is None:
return x
if vector:
return pd.Series(x, index=labels, name="mu")
return pd.DataFrame(x, index=labels, columns=labels)
[docs]
def estimate_sample_moments(R: ArrayLike, p: ArrayLike) -> Tuple[ArrayLike, ArrayLike]:
"""
Estimates the weighted mean vector and covariance matrix from scenarios.
This function computes the first two statistical moments (mean and covariance)
of asset returns, given a set of scenarios and their associated probabilities.
The scenarios `R` represent different possible outcomes for asset returns,
and `p` represents the probability of each scenario.
Args:
R (ArrayLike): A 2D array-like object (e.g., :class:`numpy.ndarray`,
:class:`pandas.DataFrame`) of shape (T, N), where T is the number of
scenarios/observations and N is the number of assets. Each row
represents a scenario of asset returns.
p (ArrayLike): A 1D array-like object (e.g., :class:`numpy.ndarray`,
:class:`pandas.Series`) of shape (T,), representing the probabilities
associated with each scenario in `R`. These probabilities must be
non-negative and sum to one.
Returns:
Tuple[ArrayLike, ArrayLike]: A tuple containing:
- **mu** (ArrayLike): The weighted mean vector of asset returns.
If `R` or `p` were pandas objects, `mu` will be a :class:`pandas.Series`.
- **S** (ArrayLike): The weighted covariance matrix of asset returns.
If `R` or `p` were pandas objects, `S` will be a :class:`pandas.DataFrame`.
Raises:
ValueError: If `p` has a length mismatch with `R`, or if `p` contains
negative values or does not sum to one.
"""
R_arr, p_arr = np.asarray(R), np.asarray(p)
T, N = R_arr.shape
if p_arr.shape[0] != T:
logger.error(
"Weight length mismatch: weights length %d, returns length %d",
p_arr.shape[0],
T,
)
raise ValueError("Weight length mismatch.")
if not (check_non_negativity(p_arr) and check_weights_sum_to_one(p_arr)):
logger.error("Weights must be non-negative and sum to one.")
raise ValueError("Weights must be non-negative and sum to one.")
mu = R_arr.T @ p_arr
X = R_arr - mu
S = (X.T * p_arr) @ X
S = (S + S.T) / 2
labels = _labels(R, p)
logger.debug("Estimated weighted mean and covariance matrix.")
return _wrap(mu, labels, True), _wrap(S, labels, False)
[docs]
def shrink_mean_jorion(mu: ArrayLike, S: ArrayLike, T: int) -> ArrayLike:
"""
Applies Bayes–Stein shrinkage to the mean vector as in Jorion :cite:p:`jorion1986bayes`.
This shrinkage estimator aims to improve the out-of-sample performance of
mean estimates, especially when the number of assets (N) is large relative
to the number of observations (T). It shrinks the sample mean towards a
common mean (e.g., the global minimum variance portfolio mean).
Args:
mu (ArrayLike): The sample mean vector (1D array-like, length N).
Can be a :class:`numpy.ndarray` or :class:`pandas.Series`.
S (ArrayLike): The sample covariance matrix (2D array-like, N×N).
Can be a :class:`numpy.ndarray` or :class:`pandas.DataFrame`.
T (int): The number of observations (scenarios) used to estimate `mu` and `S`.
Returns:
ArrayLike: The Bayes-Stein shrunk mean vector. If `mu` was a
:class:`pandas.Series`, the output will also be a :class:`pandas.Series`.
Raises:
ValueError: If input dimensions are invalid (e.g., T <= 0, N <= 2,
or `S` shape mismatch), or if the covariance matrix `S` is singular.
Notes:
A small jitter (1e-8 * identity matrix) is added to `S` before inversion
to handle potential singularity issues. The shrinkage intensity `v` is
clipped between 0 and 1 to ensure a valid shrinkage factor.
"""
mu_arr, S_arr = np.asarray(mu), np.asarray(S)
N = mu_arr.size
if T <= 0 or N <= 2 or S_arr.shape != (N, N):
logger.error(
"Invalid dimensions for Jorion shrinkage: T=%d, N=%d, S shape=%s",
T,
N,
S_arr.shape,
)
raise ValueError("Invalid dimensions for Jorion shrinkage.")
S_arr = (S_arr + S_arr.T) / 2
try:
S_inv = inv(S_arr + 1e-8 * np.eye(N))
except LinAlgError as e:
logger.error("Covariance matrix singular during inversion.")
raise ValueError("Covariance matrix singular.") from e
ones = np.ones(N)
mu_gmv = (ones @ S_inv @ mu_arr) / (ones @ S_inv @ ones)
diff = mu_arr - mu_gmv
v = (N + 2) / ((N + 2) + T * (diff @ S_inv @ diff))
v_clipped = np.clip(v, 0.0, 1.0)
mu_bs = mu_arr - v_clipped * diff
logger.debug("Applied Bayes-Stein shrinkage to mean vector.")
return _wrap(mu_bs, _labels(mu, S), True)
[docs]
def shrink_covariance_ledoit_wolf(
R: ArrayLike,
S_hat: ArrayLike,
target: str = "identity",
) -> ArrayLike:
"""
Applies the Ledoit–Wolf shrinkage estimator for the covariance matrix :cite:p:`ledoit2004well`.
This estimator provides a well-conditioned covariance matrix, especially useful
when the number of observations is small relative to the number of assets,
or when the sample covariance matrix is ill-conditioned. It shrinks the
sample covariance matrix towards a structured target matrix.
Args:
R (ArrayLike): A 2D array-like object (e.g., :class:`numpy.ndarray`,
:class:`pandas.DataFrame`) of shape (T, N), where T is the number of
observations and N is the number of assets. These are the returns data.
S_hat (ArrayLike): The sample covariance matrix (2D array-like, N×N).
Can be a :class:`numpy.ndarray` or :class:`pandas.DataFrame`.
target (str, optional): The shrinkage target.
- ``"identity"``: Shrinks towards a scaled identity matrix.
- ``"constant_correlation"``: Shrinks towards a constant correlation matrix.
Defaults to ``"identity"``.
Returns:
ArrayLike: The shrunk covariance matrix. If `R` or `S_hat` were pandas
objects, the output will be a :class:`pandas.DataFrame`.
Raises:
ValueError: If input dimensions are invalid (e.g., T = 0, or `S_hat`
shape mismatch), or if an unsupported `target` is specified.
Notes:
The function calculates various components of the Ledoit-Wolf formula:
* `F`: The target matrix.
* `pi_mat`, `pi_hat`, `diag_pi`, `off_pi`, `rho_hat`: Components related
to the estimation of the optimal shrinkage intensity.
* `gamma_hat`: The squared Frobenius norm of the difference between
the sample covariance and the target matrix.
* `kappa`: Intermediate value for shrinkage intensity.
* `delta`: The optimal shrinkage intensity, clipped between 0 and 1.
The final shrunk covariance matrix is ensured to be positive semi-definite
using `ensure_psd_matrix`.
"""
R_arr, S_arr = np.asarray(R), np.asarray(S_hat)
T, N = R_arr.shape
if T == 0 or S_arr.shape != (N, N):
logger.error(
"Shape mismatch in inputs: R shape %s, S_hat shape %s",
R_arr.shape,
S_arr.shape,
)
raise ValueError("Shape mismatch in inputs.")
S_arr = (S_arr + S_arr.T) / 2
X = R_arr - R_arr.mean(0)
if target == "identity":
F = np.eye(N) * np.trace(S_arr) / N
elif target == "constant_correlation":
std = np.sqrt(np.diag(S_arr))
corr = S_arr / np.outer(std, std)
r_bar = (corr.sum() - N) / (N * (N - 1))
F = r_bar * np.outer(std, std)
np.fill_diagonal(F, np.diag(S_arr))
else:
logger.error("Unsupported shrinkage target: %s", target)
raise ValueError("Unsupported target: " + target)
M = X[:, :, None] * X[:, None, :]
pi_mat = np.mean((M - S_arr) ** 2, axis=0)
pi_hat = np.mean(pi_mat)
diag_pi = np.trace(pi_mat)
off_pi = pi_hat - diag_pi
if target == "identity":
rho_hat = diag_pi
else:
rho_hat = diag_pi + ((F - np.diag(np.diag(F))).sum() / (N * (N - 1))) * off_pi
gamma_hat = np.linalg.norm(S_arr - F, "fro") ** 2
kappa = (pi_hat - rho_hat) / gamma_hat
delta = float(np.clip(kappa if target == "identity" else kappa / T, 0.0, 1.0))
Sigma = ensure_psd_matrix(delta * F + (1 - delta) * S_arr)
Sigma = (Sigma + Sigma.T) / 2
logger.debug("Applied Ledoit-Wolf shrinkage to covariance matrix.")
return _wrap(Sigma, _labels(R, S_hat), False)