r"""
High-quality estimators for the first two moments (mean vector ``\mu`` and
covariance matrix ``\Sigma``) of asset returns, including classical and robust
shrinkage techniques. The implementations follow well-established statistical
and quantitative-finance references and are engineered to preserve pandas
indices/columns when supplied.
Overview
--------
The module provides:
* Weighted sample statistics (``estimate_sample_moments``).
* Bayes-Stein :cite:p:`jorion1986bayes` and James-Stein shrinkage estimators for the
mean vector.
* Linear shrinkage for covariance matrices, including Ledoit-Wolf
:cite:p:`ledoit2004well` and Oracle Approximating Shrinkage (OAS)
:cite:p:`chen2010shrinkage`.
* Analytical nonlinear shrinkage (QuEST) :cite:p:`ledoit2020analytical`.
* POET factor+sparse covariance estimator :cite:p:`fan2013large`.
* Tyler's M-estimator with shrinkage toward a target for elliptical robustness
:cite:p:`tyler1987statistical,chen2010shrinkage`.
* Sparse precision matrices via Graphical Lasso :cite:p:`friedman2008sparse`.
* Convenience adapters to the Black-Litterman
:cite:p:`black1992global,he2002intuition` and Normal-Inverse-Wishart workflows.
* A factory (``estimate_moments``) to combine any of the above seamlessly.
Usage Examples
--------------
Simple pipeline
^^^^^^^^^^^^^^^
.. code-block:: python
import pandas as pd
from pyvallocation.moments import estimate_moments
returns = pd.read_csv("returns.csv", index_col=0, parse_dates=True)
mu_js, sigma_oas = estimate_moments(
returns,
mean_estimator="james_stein",
cov_estimator="oas",
)
# Objects retain pandas labels, making downstream optimisation convenient.
mu_js.name = "Expected Excess Return"
sigma_oas.index.name = sigma_oas.columns.name = "Asset"
Interfacing with portfolio wrappers
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. code-block:: python
from pyvallocation.portfolioapi import AssetsDistribution, PortfolioWrapper
wrapper = PortfolioWrapper.from_moments(mu_js, sigma_oas)
frontier = wrapper.variance_frontier(num_portfolios=25)
tangency_weights, exp_ret, exp_vol = frontier.tangency(risk_free_rate=0.02)
Robust combinations
^^^^^^^^^^^^^^^^^^^
.. code-block:: python
from pyvallocation.moments import (
shrink_covariance_nls,
robust_covariance_tyler,
robust_mean_huber,
posterior_moments_black_litterman,
)
from pyvallocation.views import BlackLittermanProcessor
# Nonlinear shrinkage + Tyler blend
sigma_nls = shrink_covariance_nls(returns)
sigma_tyler = robust_covariance_tyler(returns, shrinkage=0.1)
mu_huber = robust_mean_huber(returns)
# Blend sample and robust covariances (50/50)
sigma_blend = 0.5 * (sigma_nls + sigma_tyler)
# Extract Black-Litterman posterior moments using existing views infrastructure
mu_bl, sigma_bl = posterior_moments_black_litterman(
prior_cov=sigma_blend,
prior_mean=mu_huber,
mean_views={("AssetA", "AssetB"): 0.015},
view_confidences={"AssetA,AssetB": 0.6},
)
# These outputs remain pandas Series/DataFrames if the inputs were pandas objects.
Sparse precision workflow
^^^^^^^^^^^^^^^^^^^^^^^^^
.. code-block:: python
from pyvallocation.moments import sparse_precision_glasso
sigma_sparse, theta_sparse = sparse_precision_glasso(
returns,
alpha=0.02,
return_precision=True,
)
# theta_sparse (precision matrix) and sigma_sparse (covariance) are aligned with returns.columns.
Testing & Reliability
---------------------
Each estimator is covered by dedicated unit tests exercising numerical
stability, positive semi-definiteness, and pandas round-tripping (see
``tests/test_moments_shrinkage.py``). The full repository test suite is executed
under multiple shrinkage configurations to ensure production readiness.
"""
from __future__ import annotations
import importlib
import logging
import warnings
from typing import Any, Dict, Optional, Sequence, Tuple, Union
import numpy as np
from numpy.linalg import LinAlgError, inv
from pyvallocation.utils.validation import (
check_non_negativity,
check_weights_sum_to_one,
ensure_psd_matrix,
)
import pandas as pd
ArrayLike = Union[np.ndarray, "pd.Series", "pd.DataFrame"]
logger = logging.getLogger(__name__)
def _labels(*objs: ArrayLike) -> Optional[Sequence[str]]:
"""Return asset labels when pandas objects are present.
Raises ValueError if multiple pandas inputs have different label orderings
for the *same* axis length (i.e. both represent asset-dimension labels).
Objects whose label length differs from a previously seen asset dimension
are silently skipped (e.g. a probability Series with T entries vs. an
N-column DataFrame).
"""
found: Optional[Sequence[str]] = None
for obj in objs:
candidate = None
if isinstance(obj, pd.DataFrame):
candidate = obj.columns.to_list()
elif isinstance(obj, pd.Series):
candidate = obj.index.to_list()
if candidate is not None:
if found is None:
found = candidate
elif len(candidate) == len(found) and candidate != found:
raise ValueError(
f"Input label orderings differ: {found} vs {candidate}. "
"Reindex inputs to a consistent ordering before calling."
)
return found
def _wrap(x: np.ndarray, labels: Optional[Sequence[str]], vector: bool) -> ArrayLike:
"""Wrap arrays into pandas containers when labels are available.
Args:
x: Array data to wrap.
labels: Asset labels (index/columns) or ``None``.
vector: ``True`` for a 1-D vector, ``False`` for a square matrix.
Returns:
ArrayLike: pandas Series/DataFrame when labels are provided, otherwise ndarray.
"""
if labels is None:
return x
if vector:
return pd.Series(x, index=labels, name="mu")
return pd.DataFrame(x, index=labels, columns=labels)
[docs]
def estimate_sample_moments(R: ArrayLike, p: ArrayLike) -> Tuple[ArrayLike, ArrayLike]:
"""
Estimates the weighted mean vector and covariance matrix from scenarios.
This function computes the first two statistical moments (mean and covariance)
of asset returns, given a set of scenarios and their associated probabilities.
The scenarios `R` represent different possible outcomes for asset returns,
and `p` represents the probability of each scenario.
Args:
R (ArrayLike): A 2D array-like object (e.g., :class:`numpy.ndarray`,
:class:`pandas.DataFrame`) of shape (T, N), where T is the number of
scenarios/observations and N is the number of assets. Each row
represents a scenario of asset returns.
p (ArrayLike): A 1D array-like object (e.g., :class:`numpy.ndarray`,
:class:`pandas.Series`) of shape (T,), representing the probabilities
associated with each scenario in `R`. These probabilities must be
non-negative and sum to one.
Returns:
Tuple[ArrayLike, ArrayLike]: A tuple containing:
- **mu** (ArrayLike): The weighted mean vector of asset returns.
If `R` or `p` were pandas objects, `mu` will be a :class:`pandas.Series`.
- **S** (ArrayLike): The weighted covariance matrix of asset returns.
If `R` or `p` were pandas objects, `S` will be a :class:`pandas.DataFrame`.
Raises:
ValueError: If `p` has a length mismatch with `R`, or if `p` contains
negative values or does not sum to one.
"""
R_arr = np.asarray(R, dtype=float)
p_arr = np.asarray(p, dtype=float).reshape(-1)
T, N = R_arr.shape
if p_arr.shape[0] != T:
logger.error(
"Weight length mismatch: weights length %d, returns length %d",
p_arr.shape[0],
T,
)
raise ValueError("Weight length mismatch.")
if not (check_non_negativity(p_arr) and check_weights_sum_to_one(p_arr)):
logger.error("Weights must be non-negative and sum to one.")
raise ValueError("Weights must be non-negative and sum to one.")
mu = R_arr.T @ p_arr
X = R_arr - mu
S = np.einsum('ti,tj,t->ij', X, X, p_arr, optimize=True)
S = (S + S.T) / 2
labels = _labels(R, p)
logger.debug("Estimated weighted mean and covariance matrix.")
return _wrap(mu, labels, True), _wrap(S, labels, False)
[docs]
def shrink_mean_jorion(mu: ArrayLike, S: ArrayLike, T: int) -> ArrayLike:
"""
Applies Bayes-Stein shrinkage to the mean vector as in Jorion :cite:p:`jorion1986bayes`.
This shrinkage estimator aims to improve the out-of-sample performance of
mean estimates, especially when the number of assets (N) is large relative
to the number of observations (T). It shrinks the sample mean towards a
common mean (e.g., the global minimum variance portfolio mean).
Args:
mu (ArrayLike): The sample mean vector (1D array-like, length N).
Can be a :class:`numpy.ndarray` or :class:`pandas.Series`.
S (ArrayLike): The sample covariance matrix (2D array-like, NxN).
Can be a :class:`numpy.ndarray` or :class:`pandas.DataFrame`.
T (int): The number of observations (scenarios) used to estimate `mu` and `S`.
Returns:
ArrayLike: The Bayes-Stein shrunk mean vector. If `mu` was a
:class:`pandas.Series`, the output will also be a :class:`pandas.Series`.
Raises:
ValueError: If input dimensions are invalid (e.g., T <= 0, N <= 2,
or `S` shape mismatch), or if the covariance matrix `S` is singular.
Notes:
A small jitter (1e-8 * identity matrix) is added to `S` before inversion
to handle potential singularity issues. The shrinkage intensity `v` is
clipped between 0 and 1 to ensure a valid shrinkage factor.
"""
mu_arr, S_arr = np.asarray(mu), np.asarray(S)
N = mu_arr.size
if T <= 0 or N <= 2 or S_arr.shape != (N, N):
logger.error(
"Invalid dimensions for Jorion shrinkage: T=%d, N=%d, S shape=%s",
T,
N,
S_arr.shape,
)
raise ValueError("Invalid dimensions for Jorion shrinkage.")
S_arr = (S_arr + S_arr.T) / 2
try:
S_inv = inv(S_arr + 1e-8 * np.eye(N))
except LinAlgError as e:
logger.error("Covariance matrix singular during inversion.")
raise ValueError("Covariance matrix singular.") from e
ones = np.ones(N)
mu_gmv = (ones @ S_inv @ mu_arr) / (ones @ S_inv @ ones)
diff = mu_arr - mu_gmv
v = (N + 2) / ((N + 2) + T * (diff @ S_inv @ diff))
v_clipped = np.clip(v, 0.0, 1.0)
mu_bs = mu_arr - v_clipped * diff
logger.debug("Applied Bayes-Stein shrinkage to mean vector.")
return _wrap(mu_bs, _labels(mu, S), True)
[docs]
def shrink_covariance_ledoit_wolf(
R: ArrayLike,
S_hat: ArrayLike,
target: str = "identity",
) -> ArrayLike:
"""
Applies the Ledoit-Wolf shrinkage estimator for the covariance matrix :cite:p:`ledoit2004well`.
This estimator provides a well-conditioned covariance matrix, especially useful
when the number of observations is small relative to the number of assets,
or when the sample covariance matrix is ill-conditioned. It shrinks the
sample covariance matrix towards a structured target matrix.
Args:
R (ArrayLike): A 2D array-like object (e.g., :class:`numpy.ndarray`,
:class:`pandas.DataFrame`) of shape (T, N), where T is the number of
observations and N is the number of assets. These are the returns data.
S_hat (ArrayLike): The sample covariance matrix (2D array-like, NxN).
Can be a :class:`numpy.ndarray` or :class:`pandas.DataFrame`.
target (str, optional): The shrinkage target.
- ``"identity"``: Shrinks towards a scaled identity matrix.
- ``"constant_correlation"``: Shrinks towards a constant correlation matrix.
Defaults to ``"identity"``.
Returns:
ArrayLike: The shrunk covariance matrix. If `R` or `S_hat` were pandas
objects, the output will be a :class:`pandas.DataFrame`.
Raises:
ValueError: If input dimensions are invalid (e.g., T = 0, or `S_hat`
shape mismatch), or if an unsupported `target` is specified.
Notes:
The function calculates various components of the Ledoit-Wolf formula:
* `F`: The target matrix.
* `pi_mat`, `pi_hat`, `diag_pi`, `off_pi`, `rho_hat`: Components related
to the estimation of the optimal shrinkage intensity.
* `gamma_hat`: The squared Frobenius norm of the difference between
the sample covariance and the target matrix.
* `kappa`: Intermediate value for shrinkage intensity.
* `delta`: The optimal shrinkage intensity, clipped between 0 and 1.
The final shrunk covariance matrix is ensured to be positive semi-definite
using `ensure_psd_matrix`.
"""
R_arr, S_arr = np.asarray(R), np.asarray(S_hat)
T, N = R_arr.shape
if T == 0 or S_arr.shape != (N, N):
logger.error(
"Shape mismatch in inputs: R shape %s, S_hat shape %s",
R_arr.shape,
S_arr.shape,
)
raise ValueError("Shape mismatch in inputs.")
S_arr = (S_arr + S_arr.T) / 2
X = R_arr - R_arr.mean(0)
if target == "identity":
F = np.eye(N) * np.trace(S_arr) / N
elif target == "constant_correlation":
std = np.sqrt(np.diag(S_arr))
corr = S_arr / np.outer(std, std)
r_bar = (corr.sum() - N) / (N * (N - 1))
F = r_bar * np.outer(std, std)
np.fill_diagonal(F, np.diag(S_arr))
else:
logger.error("Unsupported shrinkage target: %s", target)
raise ValueError("Unsupported target: " + target)
M = X[:, :, None] * X[:, None, :]
pi_mat = np.mean((M - S_arr) ** 2, axis=0)
pi_hat = np.mean(pi_mat)
diag_pi = np.trace(pi_mat)
off_pi = pi_hat - diag_pi
if target == "identity":
rho_hat = diag_pi
else:
rho_hat = diag_pi + ((F - np.diag(np.diag(F))).sum() / (N * (N - 1))) * off_pi
gamma_hat = np.linalg.norm(S_arr - F, "fro") ** 2
kappa = (pi_hat - rho_hat) / gamma_hat
# For identity target: delta = kappa (Ledoit-Wolf 2004, Eq. 2).
# For constant_correlation target: delta = kappa / T (variant that accounts
# for the larger number of estimated parameters in the structured target).
delta = float(np.clip(kappa if target == "identity" else kappa / T, 0.0, 1.0))
Sigma = ensure_psd_matrix(delta * F + (1 - delta) * S_arr)
Sigma = (Sigma + Sigma.T) / 2
logger.debug("Applied Ledoit-Wolf shrinkage to covariance matrix.")
return _wrap(Sigma, _labels(R, S_hat), False)
[docs]
def shrink_covariance_oas(R: ArrayLike, assume_centered: bool = True) -> ArrayLike:
"""Return the Oracle Approximating Shrinkage (OAS) covariance estimator.
Args:
R: Scenario matrix with shape ``(T, N)``.
assume_centered: If ``True`` treat data as centered. Defaults to ``True``.
Returns:
ArrayLike: Shrunk covariance matrix (pandas DataFrame when labels are available).
References:
:cite:p:`chen2010shrinkage`
"""
X = np.asarray(R, dtype=float)
if X.ndim != 2:
raise ValueError("Input `R` must be two-dimensional with shape (T, N).")
T, N = X.shape
if T == 0 or N == 0:
raise ValueError("Input `R` must contain at least one observation and one asset.")
if not assume_centered:
X = X - X.mean(axis=0, keepdims=True)
with np.errstate(over="ignore", under="ignore", divide="ignore", invalid="ignore"):
emp_cov = (X.T @ X) / float(T)
emp_cov = np.nan_to_num(emp_cov, copy=False)
if N == 1:
shrunk = emp_cov.reshape(1, 1)
return _wrap(shrunk, _labels(R), False)
alpha = np.mean(emp_cov**2)
mu = np.trace(emp_cov) / N
mu_sq = mu * mu
# Chen et al. (2010) OAS formula with (1 - 2/p) correction
denom = (T + 1.0 - 2.0 / N) * (alpha - mu_sq / N)
shrinkage = 1.0 if denom <= 0 else min(((1.0 - 2.0 / N) * alpha + mu_sq) / denom, 1.0)
shrunk_cov = (1.0 - shrinkage) * emp_cov
diag_idx = np.diag_indices(N)
shrunk_cov[diag_idx] += shrinkage * mu
shrunk_cov = ensure_psd_matrix(shrunk_cov)
return _wrap(shrunk_cov, _labels(R), False)
[docs]
def shrink_covariance_nls(
R_or_S: ArrayLike,
*,
input_is_cov: bool = False,
dof_correction: int = 0,
) -> ArrayLike:
"""Return Ledoit-Wolf analytical nonlinear shrinkage (QuEST) of covariance.
Args:
R_or_S: Scenario matrix with shape ``(T, N)`` (raw returns).
input_is_cov: Reserved for compatibility; must remain ``False``. Defaults to ``False``.
dof_correction: Degrees-of-freedom correction applied to the sample covariance.
Returns:
ArrayLike: Shrunk covariance matrix.
References:
:cite:p:`ledoit2020analytical`
"""
if input_is_cov:
raise ValueError(
"`shrink_covariance_nls` expects raw returns. "
"Pass `input_is_cov=False` and supply `R` with shape (T, N)."
)
try:
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
category=PendingDeprecationWarning,
message="Importing from numpy\\.matlib is deprecated",
)
nonlinshrink = importlib.import_module("nonlinshrink")
except ImportError as exc:
raise ImportError(
"`shrink_covariance_nls` requires the `non-linear-shrinkage` package. "
"Install it via `pip install non-linear-shrinkage`."
) from exc
X = np.asarray(R_or_S, dtype=float)
if X.ndim != 2:
raise ValueError("Input must be two-dimensional with shape (T, N).")
k_arg: Optional[int] = None if dof_correction == 0 else int(dof_correction)
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
category=PendingDeprecationWarning,
message="Importing from numpy\\.matlib is deprecated",
)
Sigma = nonlinshrink.shrink_cov(X, k=k_arg)
Sigma = ensure_psd_matrix(Sigma)
return _wrap(Sigma, _labels(R_or_S), False)
[docs]
def factor_covariance_poet(
R: ArrayLike,
k: int | str = "auto",
thresh: float | str = "auto",
standardize: bool = True,
return_decomp: bool = False,
) -> ArrayLike | Tuple[ArrayLike, ArrayLike, ArrayLike]:
"""Return POET low-rank plus sparse covariance estimator.
Args:
R: Scenario matrix with shape ``(T, N)``.
k: Number of factors or ``"auto"`` to pick via eigen-gap.
thresh: Threshold for the sparse residual (``"auto"`` uses a heuristic).
standardize: Whether to standardize returns before decomposition.
return_decomp: If ``True`` return factor loadings and factor scores.
Returns:
ArrayLike or tuple: Covariance estimate, and optionally factor loadings/scores.
References:
:cite:p:`fan2013large`
"""
X = np.asarray(R, dtype=float)
if X.ndim != 2:
raise ValueError("Input `R` must be two-dimensional with shape (T, N).")
T, N = X.shape
labels = _labels(R)
idx = getattr(R, "index", None) if isinstance(R, pd.DataFrame) else None
X_centered = X - X.mean(axis=0, keepdims=True)
if standardize:
scale = X_centered.std(axis=0, ddof=1)
scale[scale <= 1e-12] = 1.0
else:
scale = np.ones(N, dtype=float)
X_scaled = X_centered / scale
S_scaled = np.cov(X_scaled, rowvar=False, ddof=1)
eigvals, eigvecs = np.linalg.eigh(S_scaled)
order = np.argsort(eigvals)[::-1]
eigvals = eigvals[order]
eigvecs = eigvecs[:, order]
if isinstance(k, str):
if eigvals.size <= 1:
k_sel = 1
elif eigvals.size == 2:
k_sel = 1
else:
diffs = np.diff(eigvals)
accel = diffs[:-1] - diffs[1:]
k_sel = int(np.argmax(accel) + 1)
else:
if k <= 0:
raise ValueError("`k` must be positive when provided as an integer.")
k_sel = int(k)
if k_sel >= N:
logger.warning(
"Requested k=%d clipped to N=%d; POET provides no sparsity benefit at full rank.",
k_sel, N,
)
k_sel = max(1, min(k_sel, N))
loadings = eigvecs[:, :k_sel]
sqrt_eigs = np.sqrt(np.maximum(eigvals[:k_sel], 0.0))
B_scaled = loadings * sqrt_eigs
S_factor = B_scaled @ B_scaled.T
residual = S_scaled - S_factor
residual = 0.5 * (residual + residual.T)
if isinstance(thresh, str):
off_diag = residual[~np.eye(N, dtype=bool)]
if off_diag.size == 0:
tau = 0.0
else:
tau = np.median(np.abs(off_diag)) * np.sqrt(np.log(max(N, 2)) / max(T, 1))
else:
if thresh < 0:
raise ValueError("`thresh` must be non-negative.")
tau = float(thresh)
sparse = residual.copy()
mask = ~np.eye(N, dtype=bool)
sparse[mask] = np.sign(sparse[mask]) * np.maximum(np.abs(sparse[mask]) - tau, 0.0)
Sigma_scaled = S_factor + sparse
Sigma_scaled = 0.5 * (Sigma_scaled + Sigma_scaled.T)
scale_diag = np.diag(scale)
Sigma = scale_diag @ Sigma_scaled @ scale_diag
Sigma = 0.5 * (Sigma + Sigma.T)
B_out = scale_diag @ B_scaled
F_out = X_scaled @ loadings
Sigma = ensure_psd_matrix(Sigma)
Sigma_wrapped = _wrap(Sigma, labels, False)
if not return_decomp:
return Sigma_wrapped
def _wrap_matrix(matrix: Optional[np.ndarray], *, index: Optional[pd.Index]) -> ArrayLike:
"""Wrap factor matrices in a DataFrame when index labels are provided.
Args:
matrix: Factor matrix or ``None``.
index: Optional index labels for rows.
Returns:
ArrayLike: Wrapped matrix.
"""
if matrix is None:
return None # type: ignore[return-value]
if index is not None:
cols = [f"factor_{i+1}" for i in range(matrix.shape[1])]
return pd.DataFrame(matrix, index=index, columns=cols)
return matrix
def _wrap_loadings(loadings_matrix: Optional[np.ndarray], *, columns: Sequence[str]) -> ArrayLike:
"""Wrap loadings in a DataFrame when column labels are provided.
Args:
loadings_matrix: Loadings matrix or ``None``.
columns: Column labels for assets.
Returns:
ArrayLike: Wrapped loadings matrix.
"""
if loadings_matrix is None:
return None # type: ignore[return-value]
if columns:
cols = [f"factor_{i+1}" for i in range(loadings_matrix.shape[1])]
return pd.DataFrame(loadings_matrix, index=columns, columns=cols)
return loadings_matrix
cols = list(labels) if labels is not None else []
B_wrapped = _wrap_loadings(B_out, columns=cols)
F_wrapped = _wrap_matrix(F_out, index=idx)
return Sigma_wrapped, B_wrapped, F_wrapped
[docs]
def robust_covariance_tyler(
R: ArrayLike,
*,
shrinkage: float = 0.0,
target: str | np.ndarray | pd.DataFrame = "identity",
tol: float = 1e-6,
max_iter: int = 200,
ensure_psd: bool = True,
) -> ArrayLike:
"""Return regularised Tyler's M-estimator for heavy-tailed covariance.
Args:
R: Scenario matrix with shape ``(T, N)``.
shrinkage: Shrinkage intensity toward ``target`` in ``[0, 1]``.
target: Target covariance matrix or ``"identity"``.
tol: Relative convergence tolerance for the fixed-point iteration.
max_iter: Maximum number of iterations.
ensure_psd: Whether to project the result to PSD.
Returns:
ArrayLike: Robust covariance matrix.
References:
:cite:p:`tyler1987statistical`
"""
X = np.asarray(R, dtype=float)
if X.ndim != 2:
raise ValueError("Input `R` must be two-dimensional with shape (T, N).")
T, N = X.shape
if T == 0 or N == 0:
raise ValueError("Input `R` must contain at least one observation and one asset.")
if not 0.0 <= shrinkage <= 1.0:
raise ValueError("`shrinkage` must lie in [0, 1].")
X = X - X.mean(axis=0, keepdims=True)
sample_cov = np.cov(X, rowvar=False, ddof=1)
sample_cov = ensure_psd_matrix(sample_cov)
trace_target = np.trace(sample_cov) / max(N, 1)
if trace_target <= 0.0:
trace_target = 1.0
if isinstance(target, str):
if target != "identity":
raise ValueError("`target` must be 'identity' or an array-like.")
target_matrix = np.eye(N)
else:
target_matrix = np.asarray(target, dtype=float)
if target_matrix.shape != (N, N):
raise ValueError("Custom target must have shape (N, N).")
target_matrix = 0.5 * (target_matrix + target_matrix.T)
target_matrix = ensure_psd_matrix(target_matrix)
target_matrix *= N / np.trace(target_matrix)
covariance = sample_cov / trace_target * N
for _ in range(max_iter):
try:
inv_cov = np.linalg.pinv(covariance)
except np.linalg.LinAlgError as exc:
raise ValueError("Covariance became singular during iterations.") from exc
quad = np.einsum("ti,ij,tj->t", X, inv_cov, X)
quad = np.clip(quad, 1e-6, None)
inv_quad = np.divide(
N,
quad,
out=np.full_like(quad, 1e6),
where=quad > 0,
)
inv_quad = np.clip(inv_quad, 0.0, 1e6)
weights = np.sqrt(inv_quad)[:, None]
with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
updated = (X * weights).T @ (X * weights) / T
updated = np.nan_to_num(updated, copy=False)
updated = 0.5 * (updated + updated.T)
updated *= N / np.trace(updated)
if shrinkage > 0.0:
updated = (1.0 - shrinkage) * updated + shrinkage * target_matrix
delta = np.linalg.norm(updated - covariance, ord="fro")
denom = max(np.linalg.norm(covariance, ord="fro"), 1e-12)
covariance = updated
if delta / denom <= tol:
break
else:
logger.warning(
"Tyler M-estimator did not converge in %d iterations "
"(relative delta=%.2e, tol=%.2e).",
max_iter, delta / denom, tol,
)
covariance *= trace_target / N
covariance = 0.5 * (covariance + covariance.T)
if ensure_psd:
covariance = ensure_psd_matrix(covariance)
return _wrap(covariance, _labels(R), False)
def covariance_ewma(
R: ArrayLike,
*,
span: int = 252,
min_periods: int = 1,
) -> ArrayLike:
"""Exponentially Weighted Moving Average (EWMA) covariance estimator.
Implements the RiskMetrics (1996) exponential smoother. The decay
factor is ``lambda = 1 - 2 / (span + 1)``.
The recursion is:
.. math::
\\Sigma_t = \\lambda\\,\\Sigma_{t-1} + (1-\\lambda)\\,r_t r_t^\\top.
Args:
R: Scenario matrix ``(T, N)`` of returns (most recent row last).
span: Decay span in observations. Defaults to 252 (≈ 1 year of daily data).
min_periods: Minimum number of observations before producing a result.
Returns:
ArrayLike: EWMA covariance matrix (pandas DataFrame when labels available).
References:
J.P. Morgan / Reuters (1996), *RiskMetrics Technical Document*.
"""
labels = _labels(R)
X = np.asarray(R, dtype=float)
if X.ndim != 2:
raise ValueError("Input `R` must be two-dimensional with shape (T, N).")
T, N = X.shape
if T < min_periods:
raise ValueError(f"Need at least {min_periods} observations, got {T}.")
lam = 1.0 - 2.0 / (span + 1)
# Initialize with first observation outer product
cov = np.outer(X[0], X[0])
for t in range(1, T):
cov = lam * cov + (1.0 - lam) * np.outer(X[t], X[t])
cov = 0.5 * (cov + cov.T) # enforce symmetry
return _wrap(cov, labels, False)
def _soft_threshold(matrix: np.ndarray, threshold: float) -> np.ndarray:
"""Apply soft-thresholding to off-diagonal elements.
Args:
matrix: Input matrix.
threshold: Non-negative threshold value.
Returns:
np.ndarray: Thresholded matrix.
"""
result = matrix.copy()
mask = ~np.eye(matrix.shape[0], dtype=bool)
off = result[mask]
result[mask] = np.sign(off) * np.maximum(np.abs(off) - threshold, 0.0)
return result
def _graphical_lasso_admm(
emp_cov: np.ndarray,
alpha: float,
*,
rho: float = 1.0,
max_iter: int = 200,
tol: float = 1e-4,
) -> Tuple[np.ndarray, np.ndarray, int]:
"""Solve the graphical lasso via ADMM.
Args:
emp_cov: Empirical covariance matrix.
alpha: L1 penalty parameter.
rho: ADMM penalty parameter.
max_iter: Maximum number of iterations.
tol: Convergence tolerance.
Returns:
Tuple[np.ndarray, np.ndarray, int]: Covariance, precision, and iteration count.
"""
p = emp_cov.shape[0]
Theta = np.linalg.inv(emp_cov + alpha * np.eye(p))
Z = Theta.copy()
U = np.zeros_like(emp_cov)
for iteration in range(1, max_iter + 1):
K = rho * (Z - U) - emp_cov
eigvals, eigvecs = np.linalg.eigh(K)
denom = 2.0 * rho
sqrt_term = np.sqrt(np.maximum(eigvals**2 + 4.0 * rho, 0.0))
diag_vals = (eigvals + sqrt_term) / denom
with np.errstate(over="ignore", under="ignore", divide="ignore", invalid="ignore"):
Theta = (eigvecs * diag_vals) @ eigvecs.T
Theta = np.nan_to_num(Theta, copy=False)
Theta = 0.5 * (Theta + Theta.T)
X = Theta + U
threshold = alpha / rho
Z_prev = Z
Z = _soft_threshold(X, threshold)
Z[np.diag_indices(p)] = X[np.diag_indices(p)]
U = U + Theta - Z
r_norm = np.linalg.norm(Theta - Z, ord="fro")
s_norm = np.linalg.norm(rho * (Z - Z_prev), ord="fro")
eps_pri = tol * np.sqrt(p) + tol * max(np.linalg.norm(Theta, ord="fro"), np.linalg.norm(Z, ord="fro"))
eps_dual = tol * np.sqrt(p) + tol * np.linalg.norm(rho * U, ord="fro")
if r_norm <= eps_pri and s_norm <= eps_dual:
break
else:
raise RuntimeError("Graphical lasso did not converge within max_iter.")
precision = Theta
try:
covariance = np.linalg.inv(precision)
except np.linalg.LinAlgError:
covariance = np.linalg.pinv(precision)
covariance = ensure_psd_matrix(covariance)
return covariance, precision, iteration
[docs]
def sparse_precision_glasso(
R: ArrayLike,
*,
alpha: float | str = "auto",
assume_centered: bool = True,
return_precision: bool = False,
) -> ArrayLike | Tuple[ArrayLike, ArrayLike]:
"""Estimate covariance via sparse inverse covariance (Graphical Lasso).
Args:
R: Scenario matrix with shape ``(T, N)``.
alpha: Penalty parameter or ``"auto"`` to cross-validate.
assume_centered: If ``False`` center the data before estimation.
return_precision: If ``True`` also return the precision matrix.
Returns:
ArrayLike or tuple: Covariance estimate (and precision if requested).
References:
:cite:p:`friedman2008sparse`
"""
X = np.asarray(R, dtype=float)
if X.ndim != 2:
raise ValueError("Input `R` must be two-dimensional with shape (T, N).")
if X.shape[0] < 2:
raise ValueError("Graphical lasso requires at least two observations.")
if not assume_centered:
X = X - X.mean(axis=0, keepdims=True)
T = X.shape[0]
with np.errstate(over="ignore", under="ignore", divide="ignore", invalid="ignore"):
emp_cov = (X.T @ X) / float(T)
emp_cov = np.nan_to_num(emp_cov, copy=False)
labels = _labels(R)
def _fit(alpha_value: float) -> Tuple[np.ndarray, np.ndarray]:
"""Fit graphical lasso for a single penalty value.
Args:
alpha_value: Penalty parameter.
Returns:
Tuple[np.ndarray, np.ndarray]: Covariance and precision matrices.
"""
cov, precision, _ = _graphical_lasso_admm(emp_cov, alpha_value)
return cov, precision
if alpha == "auto":
S = emp_cov.copy()
max_offdiag = np.max(np.abs(S - np.diag(np.diag(S))))
alpha_max = max(max_offdiag, 1e-6)
alphas = np.geomspace(alpha_max, alpha_max / 100.0, num=5)
best_alpha = alphas[-1]
best_score = -np.inf
K = min(5, T)
indices = np.arange(T)
for alpha_candidate in alphas:
scores = []
for fold in range(K):
mask = indices % K != fold
X_train = X[mask]
X_valid = X[~mask]
if X_train.size == 0 or X_valid.size == 0:
continue
with np.errstate(over="ignore", under="ignore", divide="ignore", invalid="ignore"):
S_train = (X_train.T @ X_train) / float(X_train.shape[0])
S_train = np.nan_to_num(S_train, copy=False)
_, precision_cv, _ = _graphical_lasso_admm(S_train, alpha_candidate)
with np.errstate(over="ignore", under="ignore", divide="ignore", invalid="ignore"):
S_valid = (X_valid.T @ X_valid) / float(X_valid.shape[0])
S_valid = np.nan_to_num(S_valid, copy=False)
logdet = np.linalg.slogdet(precision_cv)[1]
score = logdet - np.trace(S_valid @ precision_cv)
scores.append(score)
if scores:
avg_score = float(np.mean(scores))
if avg_score > best_score + 1e-8:
best_alpha = alpha_candidate
best_score = avg_score
chosen_alpha = float(best_alpha)
cov, precision, _ = _graphical_lasso_admm(emp_cov, chosen_alpha)
else:
chosen_alpha = float(alpha)
if chosen_alpha <= 0.0:
raise ValueError("`alpha` must be positive.")
cov, precision, _ = _graphical_lasso_admm(emp_cov, chosen_alpha)
cov_wrapped = _wrap(cov, labels, False)
if not return_precision:
return cov_wrapped
precision_wrapped = _wrap(precision, labels, False)
return cov_wrapped, precision_wrapped
[docs]
def shrink_mean_james_stein(
mu_hat: ArrayLike,
S: ArrayLike,
T: int,
target: str | np.ndarray | pd.Series = "grand_mean",
) -> ArrayLike:
"""Return James-Stein shrinkage estimate for the mean vector.
Args:
mu_hat: Sample mean vector.
S: Sample covariance matrix.
T: Number of observations.
target: Shrinkage target (``"grand_mean"`` or custom vector).
Returns:
ArrayLike: Shrunk mean estimate.
References:
:cite:p:`jorion1986bayes`
"""
mu_arr = np.asarray(mu_hat, dtype=float).reshape(-1)
Sigma = np.asarray(S, dtype=float)
if Sigma.ndim != 2 or Sigma.shape[0] != Sigma.shape[1]:
raise ValueError("`S` must be a square covariance matrix.")
N = mu_arr.size
if Sigma.shape[0] != N:
raise ValueError("Shape mismatch between `mu_hat` and `S`.")
if T <= 0:
raise ValueError("`T` must be positive.")
if isinstance(target, str):
if target != "grand_mean":
raise ValueError("`target` must be 'grand_mean' or an array-like.")
mu_target = np.full_like(mu_arr, mu_arr.mean())
else:
mu_target = np.asarray(target, dtype=float).reshape(-1)
if mu_target.size != N:
raise ValueError("Custom target must have length N.")
Sigma = 0.5 * (Sigma + Sigma.T)
Sigma = ensure_psd_matrix(Sigma)
Sigma_inv = np.linalg.pinv(Sigma, rcond=1e-12)
diff = mu_arr - mu_target
quad = float(diff.T @ (T * Sigma_inv) @ diff)
if quad <= 0.0:
shrink = 0.0
else:
shrink = max(0.0, 1.0 - (N - 2.0) / quad)
mu_js = mu_target + shrink * diff
return _wrap(mu_js, _labels(mu_hat, S), True)
[docs]
def robust_mean_huber(
R: ArrayLike,
*,
allow_vectorized: bool = True,
tol: float = 1e-6,
max_iter: int = 200,
) -> ArrayLike:
"""Return adaptive Huber mean estimator (per asset) for heavy-tailed data.
Args:
R: Scenario matrix with shape ``(T, N)``.
allow_vectorized: Must be ``True`` (scalar mode not implemented).
tol: Relative convergence tolerance.
max_iter: Maximum number of iterations per asset.
Returns:
ArrayLike: Robust mean vector.
References:
:cite:p:`huber1964robust`
"""
X = np.asarray(R, dtype=float)
if X.ndim != 2:
raise ValueError("Input `R` must be two-dimensional with shape (T, N).")
if not allow_vectorized:
raise ValueError("Set `allow_vectorized=True`; scalar mode is not implemented.")
T, N = X.shape
estimates = np.empty(N, dtype=float)
for j in range(N):
x = X[:, j]
mu = float(np.median(x))
scale = 1.4826 * np.median(np.abs(x - mu)) + 1e-12
tau = scale * np.sqrt(T / max(np.log(max(T, 3)), 2.0))
for _ in range(max_iter):
residuals = x - mu
denom = np.maximum(np.abs(residuals), 1e-12)
weights = np.minimum(1.0, tau / denom)
mu_new = np.sum(weights * x) / np.sum(weights)
if abs(mu_new - mu) <= tol * (abs(mu) + 1.0):
mu = float(mu_new)
break
mu = float(mu_new)
estimates[j] = mu
return _wrap(estimates, _labels(R), True)
[docs]
def posterior_moments_black_litterman(
*,
prior_cov: ArrayLike,
prior_mean: Optional[ArrayLike] = None,
market_weights: Optional[ArrayLike] = None,
risk_aversion: float = 1.0,
tau: float = 0.05,
mean_views: Any = None,
view_confidences: Any = None,
omega: Any = "idzorek",
**kwargs: Any,
) -> Tuple[ArrayLike, ArrayLike]:
"""Return posterior (mu, Sigma) from :class:`BlackLittermanProcessor`.
Args:
prior_cov: Prior covariance matrix.
prior_mean: Optional prior mean vector.
market_weights: Optional market-cap weights for implied equilibrium mean.
risk_aversion: Risk-aversion coefficient (defaults to ``1.0``).
tau: Prior covariance shrinkage parameter (defaults to ``0.05``).
mean_views: Mean views (absolute or relative).
view_confidences: Confidence levels for views (Idzorek).
omega: View covariance (``"idzorek"`` or array-like).
**kwargs: Additional arguments forwarded to ``BlackLittermanProcessor``.
Returns:
Tuple[ArrayLike, ArrayLike]: Posterior mean and covariance.
References:
:cite:p:`black1992global`
"""
from pyvallocation.views import BlackLittermanProcessor
processor = BlackLittermanProcessor(
prior_cov=prior_cov,
prior_mean=prior_mean,
market_weights=market_weights,
risk_aversion=risk_aversion,
tau=tau,
mean_views=mean_views,
view_confidences=view_confidences,
omega=omega,
**kwargs,
)
return processor.get_posterior()
[docs]
def posterior_moments_niw(
*,
prior_mu: ArrayLike,
prior_sigma: ArrayLike,
t0: int,
nu0: int,
sample_mu: ArrayLike,
sample_sigma: ArrayLike,
n_obs: int,
) -> Tuple[ArrayLike, ArrayLike]:
"""Return NIW posterior classical-equivalent (mu, Sigma).
Args:
prior_mu: Prior mean vector.
prior_sigma: Prior covariance matrix.
t0: Prior strength (pseudo-observations for mean).
nu0: Prior degrees of freedom for covariance.
sample_mu: Sample mean vector.
sample_sigma: Sample covariance matrix.
n_obs: Number of observations.
Returns:
Tuple[ArrayLike, ArrayLike]: Posterior mean and covariance.
"""
from pyvallocation.bayesian import NIWPosterior
niw = NIWPosterior(
prior_mu=prior_mu,
prior_sigma=prior_sigma,
t0=int(t0),
nu0=int(nu0),
)
niw.update(
sample_mu=sample_mu,
sample_sigma=sample_sigma,
n_obs=int(n_obs),
)
return niw.get_mu_ce(), niw.get_sigma_ce()
[docs]
def posterior_moments_niw_with_uncertainty(
*,
prior_mu: ArrayLike,
prior_sigma: ArrayLike,
t0: int,
nu0: int,
sample_mu: ArrayLike,
sample_sigma: ArrayLike,
n_obs: int,
) -> Tuple[ArrayLike, ArrayLike, ArrayLike]:
"""Return NIW posterior moments plus mean-uncertainty covariance.
The returned ``S_mu`` corresponds to the NIW mean uncertainty
:cite:p:`meucci2005robust`:
.. math::
S_\\mu = \\frac{\\nu_1}{T_1 (\\nu_1 - 2)} \\Sigma_1.
"""
from pyvallocation.bayesian import NIWPosterior
niw = NIWPosterior(
prior_mu=prior_mu,
prior_sigma=prior_sigma,
t0=int(t0),
nu0=int(nu0),
)
niw.update(
sample_mu=sample_mu,
sample_sigma=sample_sigma,
n_obs=int(n_obs),
)
return niw.get_mu_ce(), niw.get_sigma_ce(), niw.get_S_mu()
[docs]
def estimate_moments(
R: ArrayLike,
p: Optional[ArrayLike] = None,
*,
mean_estimator: str = "sample",
cov_estimator: str = "sample",
mean_kwargs: Optional[Dict[str, Any]] = None,
cov_kwargs: Optional[Dict[str, Any]] = None,
) -> Tuple[ArrayLike, ArrayLike]:
"""Return (mu, Sigma) using configurable mean and covariance estimators.
Args:
R: Scenario matrix with shape ``(T, N)``.
p: Optional scenario probabilities aligned with ``R``.
mean_estimator: Mean estimator key (default ``"sample"``).
cov_estimator: Covariance estimator key (default ``"sample"``).
mean_kwargs: Optional keyword arguments for the mean estimator.
cov_kwargs: Optional keyword arguments for the covariance estimator.
Returns:
Tuple[ArrayLike, ArrayLike]: Estimated mean and covariance.
"""
mean_kwargs = dict(mean_kwargs or {})
cov_kwargs = dict(cov_kwargs or {})
X = np.asarray(R, dtype=float)
if X.ndim != 2:
raise ValueError("Input `R` must be two-dimensional with shape (T, N).")
T, _ = X.shape
labels = _labels(R)
if p is None:
mu_sample = X.mean(axis=0)
# Use ddof=0 (MLE) for consistency with estimate_sample_moments
S_sample = np.cov(X, rowvar=False, ddof=0)
mu_sample_wrapped = _wrap(mu_sample, labels, True)
S_sample_wrapped = _wrap(S_sample, labels, False)
else:
mu_sample_wrapped, S_sample_wrapped = estimate_sample_moments(R, p)
mu_sample = np.asarray(mu_sample_wrapped, dtype=float)
S_sample = np.asarray(S_sample_wrapped, dtype=float)
mean_choice = mean_estimator.lower()
if mean_choice in {"sample", "simple"}:
mu = mu_sample_wrapped
elif mean_choice in {"jorion", "bayes_stein"}:
mu = shrink_mean_jorion(mu_sample_wrapped, S_sample_wrapped, T=T, **mean_kwargs)
elif mean_choice in {"james_stein", "james-stein", "js"}:
mu = shrink_mean_james_stein(
mu_sample_wrapped,
S_sample_wrapped,
T=T,
**mean_kwargs,
)
elif mean_choice in {"huber", "robust_huber"}:
mu = robust_mean_huber(R, **mean_kwargs)
elif mean_choice in {"median_of_means", "mom"}:
mu = robust_mean_median_of_means(R, **mean_kwargs)
else:
raise ValueError(
f"Unsupported mean estimator '{mean_estimator}'. "
f"Valid options: {sorted({'sample', 'jorion', 'james_stein', 'huber', 'median_of_means'})}."
)
cov_choice = cov_estimator.lower()
if cov_choice in {"sample", "empirical"}:
Sigma = S_sample_wrapped
elif cov_choice in {"ledoit_wolf", "lw"}:
Sigma = shrink_covariance_ledoit_wolf(R, S_sample_wrapped, **cov_kwargs)
elif cov_choice == "oas":
Sigma = shrink_covariance_oas(R, **cov_kwargs)
elif cov_choice in {"nls", "nonlinear", "nonlinear_shrinkage"}:
Sigma = shrink_covariance_nls(R, **cov_kwargs)
elif cov_choice == "poet":
cov_kwargs.setdefault("return_decomp", False)
Sigma = factor_covariance_poet(R, **cov_kwargs)
if isinstance(Sigma, tuple):
Sigma = Sigma[0]
elif cov_choice in {"tyler", "tyler_shrinkage"}:
Sigma = robust_covariance_tyler(R, **cov_kwargs)
elif cov_choice in {"glasso", "graphical_lasso"}:
cov_kwargs["return_precision"] = False
Sigma = sparse_precision_glasso(R, **cov_kwargs)
elif cov_choice == "ewma":
Sigma = covariance_ewma(R, **cov_kwargs)
else:
raise ValueError(
f"Unsupported covariance estimator '{cov_estimator}'. "
f"Valid options: {sorted({'sample', 'ledoit_wolf', 'oas', 'nls', 'poet', 'tyler', 'glasso', 'ewma'})}."
)
return mu, Sigma