"""Plotting utilities for efficient frontiers and portfolio summaries."""
from __future__ import annotations
from collections import OrderedDict
from typing import Callable, Iterable, List, Mapping, Optional, Sequence, Tuple, Union, TYPE_CHECKING
import numpy as np
from .portfolioapi import PortfolioFrontier
if TYPE_CHECKING: # pragma: no cover - typing helper
import pandas as pd
def _require_matplotlib():
"""
Import :mod:`matplotlib.pyplot` and fall back to the Agg backend when GUI
toolkits are unavailable (e.g. headless CI environments).
"""
try:
import matplotlib.pyplot as plt
except ImportError as exc: # pragma: no cover - guarded by tests
raise ImportError(
"`plot_frontiers` requires `matplotlib`. Install it via `pip install matplotlib`."
) from exc
try:
fig = plt.figure()
except Exception: # pragma: no cover - backend fallback
try:
plt.switch_backend("Agg")
except Exception:
import matplotlib
matplotlib.use("Agg", force=True)
import matplotlib.pyplot as plt # type: ignore[no-redef]
fig = plt.figure()
plt.close(fig)
return plt
def _normalize_frontiers(
frontiers: Union[PortfolioFrontier, Sequence[PortfolioFrontier], Mapping[str, PortfolioFrontier]],
labels: Optional[Sequence[str]],
) -> List[Tuple[Optional[str], PortfolioFrontier]]:
"""Normalize frontier inputs into a list of ``(label, frontier)`` pairs.
Args:
frontiers: Single frontier, sequence, or mapping of label -> frontier.
labels: Optional sequence of labels aligned to the frontier sequence.
Returns:
list[tuple[str | None, PortfolioFrontier]]: Normalized list of frontiers.
"""
if isinstance(frontiers, Mapping):
result: List[Tuple[Optional[str], PortfolioFrontier]] = []
for name, frontier in frontiers.items():
if not isinstance(frontier, PortfolioFrontier):
raise TypeError("`frontiers` must contain `PortfolioFrontier` instances.")
result.append((str(name), frontier))
return result
if isinstance(frontiers, PortfolioFrontier):
frontier_list: Sequence[PortfolioFrontier] = [frontiers]
else:
frontier_list = list(frontiers)
if labels is not None and len(labels) != len(frontier_list):
raise ValueError("`labels` length must match the number of frontiers supplied.")
result: List[Tuple[Optional[str], PortfolioFrontier]] = []
for idx, frontier in enumerate(frontier_list):
if not isinstance(frontier, PortfolioFrontier):
raise TypeError("`frontiers` must contain `PortfolioFrontier` instances.")
label = labels[idx] if labels is not None else None
result.append((label, frontier))
return result
def _project_to_3d(
mean: np.ndarray,
cov: np.ndarray,
scenarios: Optional[np.ndarray] = None,
*,
components: Optional[np.ndarray] = None,
) -> Tuple[np.ndarray, np.ndarray, Optional[np.ndarray], np.ndarray]:
"""Project mean/cov (and optionally scenarios) into 3D using PCA directions.
Args:
mean: Mean vector of length ``n_assets``.
cov: Covariance matrix of shape ``(n_assets, n_assets)``.
scenarios: Optional scenario matrix shaped ``(n_scenarios, n_assets)``.
components: Optional projection matrix with 3 columns. When ``None``,
the top-3 eigenvectors of ``cov`` are used.
Returns:
Tuple containing the projected mean, covariance, projected scenarios (or ``None``),
and the projection matrix used.
"""
mean = np.asarray(mean, dtype=float).reshape(-1)
cov = np.asarray(cov, dtype=float)
if cov.ndim != 2 or cov.shape[0] != cov.shape[1]:
raise ValueError("`cov` must be a square matrix.")
if cov.shape[0] != mean.size:
raise ValueError("`mean` length must match `cov` dimensions.")
cov = 0.5 * (cov + cov.T)
n_assets = mean.size
if n_assets >= 3:
if components is None:
eigvals, eigvecs = np.linalg.eigh(cov)
order = np.argsort(eigvals)[::-1]
components = eigvecs[:, order[:3]]
mean_3d = components.T @ mean
cov_3d = components.T @ cov @ components
if scenarios is not None:
with np.errstate(divide="ignore", invalid="ignore", over="ignore"):
scen_3d = scenarios @ components
else:
scen_3d = None
return mean_3d, cov_3d, scen_3d, components
mean_3d = np.zeros(3)
mean_3d[:n_assets] = mean
cov_3d = np.zeros((3, 3))
cov_3d[:n_assets, :n_assets] = cov
scen_3d = None
if scenarios is not None:
scen_3d = np.zeros((scenarios.shape[0], 3))
scen_3d[:, :n_assets] = scenarios
components = np.eye(n_assets, 3)
return mean_3d, cov_3d, scen_3d, components
def _ellipsoid_mesh(
cov: np.ndarray,
*,
n_std: float = 1.0,
n_points: int = 32,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Generate an ellipsoid mesh representing covariance uncertainty.
Args:
cov: 3x3 covariance matrix.
n_std: Ellipsoid radius in standard deviations. Defaults to ``1.0``.
n_points: Mesh resolution along each angular dimension.
Returns:
Tuple[np.ndarray, np.ndarray, np.ndarray]: ``(x, y, z)`` mesh arrays.
"""
cov = np.asarray(cov, dtype=float)
cov = 0.5 * (cov + cov.T)
eigvals, eigvecs = np.linalg.eigh(cov)
eigvals = np.clip(eigvals, 0.0, None)
radii = n_std * np.sqrt(eigvals + 1e-16)
u = np.linspace(0.0, 2.0 * np.pi, n_points)
v = np.linspace(0.0, np.pi, n_points)
x = radii[0] * np.outer(np.cos(u), np.sin(v))
y = radii[1] * np.outer(np.sin(u), np.sin(v))
z = radii[2] * np.outer(np.ones_like(u), np.cos(v))
coords = np.stack([x, y, z], axis=0).reshape(3, -1)
with np.errstate(divide="ignore", invalid="ignore", over="ignore"):
coords = eigvecs @ coords
x, y, z = coords.reshape(3, n_points, n_points)
return x, y, z
def _resolve_highlight(
frontier: PortfolioFrontier,
marker: str,
*,
risk_vector: np.ndarray,
risk_free_rate: Optional[float],
) -> Optional[Tuple[str, float, float, "pd.Series", int]]:
"""Resolve a highlighted portfolio point for plotting.
Args:
frontier: Frontier instance.
marker: Highlight key (``min_risk``, ``max_return``, ``tangency``).
risk_vector: Risk values associated with the frontier.
risk_free_rate: Risk-free rate used for tangency selection.
Returns:
Optional[tuple]: Highlight label, risk, return, weights, and column index.
"""
if marker == "min_risk":
idx = int(np.argmin(risk_vector))
target_risk = float(risk_vector[idx])
target_return = float(frontier.returns[idx])
weights = frontier._to_pandas(frontier.weights[:, idx], "Min Risk Portfolio")
name = "Min Risk"
elif marker == "max_return":
idx = int(np.argmax(frontier.returns))
target_risk = float(risk_vector[idx])
target_return = float(frontier.returns[idx])
weights = frontier._to_pandas(frontier.weights[:, idx], "Max Return Portfolio")
name = "Max Return"
elif marker == "tangency":
if risk_free_rate is None:
raise ValueError("Highlighting the tangency portfolio requires `risk_free_rate`.")
if np.all(np.isclose(risk_vector, 0)):
return None
with np.errstate(divide="ignore", invalid="ignore"):
sharpe_ratios = (frontier.returns - risk_free_rate) / risk_vector
sharpe_ratios[~np.isfinite(sharpe_ratios)] = -np.inf
idx = int(np.argmax(sharpe_ratios))
if not np.isfinite(frontier.returns[idx]) or not np.isfinite(risk_vector[idx]):
return None
target_risk = float(risk_vector[idx])
target_return = float(frontier.returns[idx])
weights = frontier._to_pandas(
frontier.weights[:, idx],
f"Tangency Portfolio (rf={risk_free_rate:.2%})",
)
name = f"Tangency (rf={risk_free_rate:.2%})"
else:
raise ValueError(f"Unknown highlight '{marker}'.")
if not (np.isfinite(target_return) and np.isfinite(target_risk)):
return None
return name, target_risk, target_return, weights, idx
[docs]
def plot_frontiers(
frontiers: Union[PortfolioFrontier, Sequence[PortfolioFrontier], Mapping[str, PortfolioFrontier]],
*,
ax=None,
labels: Optional[Sequence[str]] = None,
highlight: Iterable[str] = ("min_risk", "max_return"),
risk_free_rate: Optional[float] = None,
legend: bool = True,
line_kwargs: Optional[Mapping[str, object]] = None,
marker_kwargs: Optional[Mapping[str, Mapping[str, object]]] = None,
scatter_kwargs: Optional[Mapping[str, object]] = None,
show_points: bool = False,
percent_axes: bool = False,
overlay_risk_labels: Optional[Sequence[str]] = None,
risk_label: Optional[str] = None,
return_label: str = "Expected Return",
highlight_metadata_keys: Optional[Sequence[str]] = None,
metadata_value_formatter: Optional[Callable[[str, object], str]] = None,
):
"""Plot one or more efficient frontiers.
Args:
frontiers: Frontier instance(s) to be visualised.
ax: Optional matplotlib axis. If omitted, a new figure and axis are created.
labels: Optional labels associated with each frontier.
highlight: Iterable of portfolio markers to emphasise. Valid entries are
``"min_risk"``, ``"max_return"`` and ``"tangency"``.
risk_free_rate: Required when highlighting the tangency portfolio.
legend: Whether to render a legend.
line_kwargs: Keyword arguments passed to ``Axes.plot`` for the frontier lines.
marker_kwargs: Mapping from highlight name to keyword arguments for the
corresponding scatter points.
scatter_kwargs: Global keyword arguments applied to all highlight markers.
show_points: When ``True``, scatter all frontier nodes (useful for reports).
percent_axes: Format both axes as percentages (assumes decimal inputs).
overlay_risk_labels: Optional risk labels to overlay as dashed diagnostic lines
(e.g., raw CVaR before convexification).
risk_label: Axis label for the risk dimension. If supplied, the plotting
uses the corresponding risk grid (primary or ``alternate_risks``) from
each frontier. When omitted, the primary risk is used.
return_label: Axis label for expected returns.
highlight_metadata_keys: Optional iterable of metadata field names to append
to highlight labels when the underlying :class:`PortfolioFrontier`
exposes ``metadata``.
metadata_value_formatter: Optional callable used to render metadata values
in highlight labels. Receives ``(key, value)`` and must return a string.
Returns:
The matplotlib ``Axes`` containing the plot. The axis is also populated
with a ``_frontier_highlights`` attribute containing the highlighted points.
"""
plt = _require_matplotlib()
highlight = tuple(highlight) if highlight else tuple()
normalized = _normalize_frontiers(frontiers, labels)
if ax is None:
_, ax = plt.subplots()
line_kwargs = dict(line_kwargs or {})
scatter_kwargs = dict(scatter_kwargs or {})
marker_kwargs = marker_kwargs or {}
highlight_records = []
metadata_keys = tuple(highlight_metadata_keys) if highlight_metadata_keys else tuple()
if metadata_value_formatter is None:
def _default_metadata_formatter(key: str, value: object) -> str:
"""Format metadata key/value pairs for highlight labels.
Args:
key: Metadata key.
value: Metadata value.
Returns:
str: Formatted key/value token.
"""
if isinstance(value, (float, np.floating)):
return f"{key}={value:.4f}"
return f"{key}={value}"
else:
_default_metadata_formatter = metadata_value_formatter
overlay_labels = tuple(overlay_risk_labels or ())
for supplied_label, frontier in normalized:
risk_vector = frontier._risk_vector(risk_label)
label = supplied_label or (risk_label or frontier.risk_measure)
line, = ax.plot(risk_vector, frontier.returns, label=label, **line_kwargs)
colour = line.get_color()
if show_points:
ax.scatter(risk_vector, frontier.returns, color=colour, s=12, alpha=0.6, zorder=line.get_zorder() + 1)
for overlay in overlay_labels:
try:
overlay_risk = frontier._risk_vector(overlay)
except KeyError:
continue
ax.plot(
overlay_risk,
frontier.returns,
linestyle="--",
color=colour,
alpha=0.6,
label=f"{label} ({overlay})",
)
for marker in highlight:
resolved = _resolve_highlight(
frontier,
marker,
risk_vector=risk_vector,
risk_free_rate=risk_free_rate,
)
if resolved is None:
continue
display_name, risk_value, return_value, weights, idx = resolved
metadata_entry = (
frontier.metadata[idx] if frontier.metadata and idx < len(frontier.metadata) else None
)
style = {
"color": colour,
"s": 60,
"zorder": line.get_zorder() + 1,
}
style.update(scatter_kwargs)
style.update(marker_kwargs.get(marker, {}))
highlight_label = f"{label} - {display_name}"
if metadata_entry and metadata_keys:
tokens: List[str] = []
for key in metadata_keys:
if key not in metadata_entry:
continue
value = metadata_entry[key]
if value is None:
continue
formatted = _default_metadata_formatter(key, value)
if formatted:
tokens.append(formatted)
if tokens:
highlight_label = f"{highlight_label} ({', '.join(tokens)})"
ax.scatter(risk_value, return_value, label=highlight_label, **style)
highlight_records.append(
{
"frontier": label,
"type": display_name,
"risk": risk_value,
"return": return_value,
"risk_measure": risk_label or frontier.risk_measure,
"weights": weights,
"index": idx,
"metadata": metadata_entry,
}
)
if not risk_label:
unique_measures = {frontier.risk_measure for _, frontier in normalized}
risk_label = unique_measures.pop() if len(unique_measures) == 1 else "Risk"
ax.set_xlabel(risk_label)
ax.set_ylabel(return_label)
if percent_axes:
try:
from matplotlib.ticker import FuncFormatter
fmt = FuncFormatter(lambda x, _pos: f"{x:.1%}")
ax.xaxis.set_major_formatter(fmt)
ax.yaxis.set_major_formatter(fmt)
except Exception: # pragma: no cover - optional formatting
pass
ax.grid(True, alpha=0.3)
if legend:
ax.legend()
ax._frontier_highlights = highlight_records # type: ignore[attr-defined]
return ax
[docs]
def plot_frontiers_grid(
frontiers: Union[PortfolioFrontier, Sequence[PortfolioFrontier], Mapping[str, PortfolioFrontier]],
*,
by: Optional[Callable[[Optional[str], PortfolioFrontier], str]] = None,
labels: Optional[Sequence[str]] = None,
cols: Optional[int] = None,
figsize: Tuple[float, float] = (12.0, 4.5),
sharex: bool = False,
sharey: bool = False,
highlight: Iterable[str] = ("min_risk", "max_return"),
risk_free_rate: Optional[float] = None,
legend: bool = True,
legend_policy: str = "auto",
**kwargs,
):
"""Plot groups of efficient frontiers on a grid.
Args:
frontiers: Single frontier, sequence, or mapping of frontiers.
by: Optional grouping key function ``(label, frontier) -> group``.
labels: Optional labels aligned to the frontier sequence.
cols: Number of columns in the grid. Defaults to ``len(groups)``.
figsize: Figure size in inches.
sharex: Whether to share x-axes across subplots.
sharey: Whether to share y-axes across subplots.
highlight: Iterable of highlight markers (e.g. ``min_risk``).
risk_free_rate: Risk-free rate for tangency highlights.
legend: Whether to show legends.
legend_policy: ``auto`` (default), ``all``, or ``none``.
**kwargs: Forwarded to :func:`plot_frontiers`.
Returns:
matplotlib.figure.Figure: Figure containing the grid of plots.
"""
plt = _require_matplotlib()
normalized = _normalize_frontiers(frontiers, labels)
grouper = by or (lambda supplied, fr: fr.risk_measure or "Risk")
grouped: "OrderedDict[str, OrderedDict[str, PortfolioFrontier]]" = OrderedDict()
for supplied_label, frontier in normalized:
key = str(grouper(supplied_label, frontier))
bucket = grouped.setdefault(key, OrderedDict())
bucket[supplied_label or frontier.risk_measure] = frontier
if not grouped:
raise ValueError("No frontiers supplied.")
cols = cols or len(grouped)
rows = (len(grouped) + cols - 1) // cols
fig, axes = plt.subplots(rows, cols, figsize=figsize, sharex=sharex, sharey=sharey)
axes_flat = np.atleast_1d(axes).ravel()
for ax, (group_name, mapping) in zip(axes_flat, grouped.items()):
show_legend = legend
if legend_policy == "auto":
show_legend = legend and len(grouped) == 1
elif legend_policy == "none":
show_legend = False
elif legend_policy == "each":
show_legend = legend
else:
raise ValueError("`legend_policy` must be 'auto', 'each', or 'none'.")
plot_frontiers(
mapping,
ax=ax,
highlight=highlight,
risk_free_rate=risk_free_rate,
legend=show_legend,
**kwargs,
)
ax.set_title(group_name)
for ax in axes_flat[len(grouped) :]:
ax.axis("off")
fig.tight_layout()
return fig, axes_flat
[docs]
def plot_weights(
weights,
*,
ax=None,
top_n: Optional[int] = None,
title: Optional[str] = None,
kind: str = "barh",
stacked: bool = False,
percent_axes: bool = True,
legend: bool = True,
):
"""Plot portfolio weights (Series or DataFrame) with minimal setup.
Args:
weights: Weight Series or DataFrame.
ax: Optional matplotlib axis to draw on.
top_n: If set, plot only the largest ``top_n`` weights.
title: Optional plot title.
kind: Plot kind (e.g., ``barh`` or ``bar``).
stacked: Whether to stack DataFrame columns.
percent_axes: If ``True`` format axes as percentages.
legend: Whether to show legend for DataFrame inputs.
Returns:
matplotlib.axes.Axes: Axis with the plot.
"""
plt = _require_matplotlib()
if ax is None:
_, ax = plt.subplots()
import pandas as pd # local import keeps plotting optional
if isinstance(weights, pd.Series):
series = weights.sort_values(ascending=True) if kind == "barh" else weights.sort_values(ascending=False)
if top_n is not None:
series = series.tail(top_n) if kind == "barh" else series.head(top_n)
series.plot(kind=kind, ax=ax, legend=False)
elif isinstance(weights, pd.DataFrame):
data = weights.copy()
data.plot(kind=kind, ax=ax, stacked=stacked, legend=legend)
else:
raise TypeError("`weights` must be a pandas Series or DataFrame.")
if percent_axes:
try:
from matplotlib.ticker import FuncFormatter
fmt = FuncFormatter(lambda x, _pos: f"{x:.1%}")
ax.xaxis.set_major_formatter(fmt)
if kind == "bar":
ax.yaxis.set_major_formatter(fmt)
except Exception: # pragma: no cover
pass
if title:
ax.set_title(title)
ax.grid(True, axis="x", alpha=0.3)
return ax
[docs]
def plot_robust_path(
frontier: PortfolioFrontier,
*,
param: str = "lambda",
risk_label: Optional[str] = None,
overlay_risk_label: Optional[str] = None,
ax=None,
cmap: str = "viridis",
percent_axes: bool = False,
show_points: bool = True,
):
"""Plot robust frontier with colour indicating parameter impact.
Args:
frontier: Robust frontier instance with metadata.
param: Metadata key to color by (default ``"lambda"``).
risk_label: Risk label for the x-axis.
overlay_risk_label: Optional risk label to overlay as a dashed line.
ax: Optional matplotlib axis to draw on.
cmap: Colormap name for parameter coloring.
percent_axes: Whether to format axes as percentages.
show_points: Whether to show individual frontier nodes.
Returns:
matplotlib.axes.Axes: Axis with the plot.
"""
plt = _require_matplotlib()
if ax is None:
_, ax = plt.subplots()
risk_vec = frontier._risk_vector(risk_label)
values = None
if frontier.metadata:
values = np.array([meta.get(param) for meta in frontier.metadata], dtype=float)
if values is None or not np.all(np.isfinite(values)):
values = np.linspace(0.0, 1.0, len(risk_vec))
sc = ax.scatter(risk_vec, frontier.returns, c=values, cmap=cmap, s=30, zorder=3)
if show_points:
ax.plot(risk_vec, frontier.returns, color="black", alpha=0.3, zorder=2)
if overlay_risk_label is not None:
try:
overlay = frontier._risk_vector(overlay_risk_label)
ax.plot(overlay, frontier.returns, linestyle="--", alpha=0.6, color="tab:gray")
except KeyError:
pass
ax.set_xlabel(risk_label or frontier.risk_measure)
ax.set_ylabel("Expected Return")
ax.grid(True, alpha=0.3)
cbar = plt.colorbar(sc, ax=ax)
cbar.set_label(param)
if percent_axes:
try:
from matplotlib.ticker import FuncFormatter
fmt = FuncFormatter(lambda x, _pos: f"{x:.1%}")
ax.xaxis.set_major_formatter(fmt)
ax.yaxis.set_major_formatter(fmt)
except Exception: # pragma: no cover
pass
return ax
[docs]
def plot_param_impact(
frontier: PortfolioFrontier,
*,
param: str = "lambda",
risk_label: Optional[str] = None,
percent_axes: bool = False,
ax=None,
):
"""Plot parameter vs return/risk in a two-panel figure.
Args:
frontier: Frontier with metadata containing parameter values.
param: Metadata key to plot along the x-axis.
risk_label: Risk label for the risk panel.
percent_axes: Whether to format y-axes as percentages.
ax: Optional tuple of axes or a figure axis to reuse.
Returns:
Tuple[matplotlib.figure.Figure, tuple]: Figure and axes tuple.
"""
plt = _require_matplotlib()
if frontier.metadata is None:
raise ValueError("Frontier metadata missing parameter values.")
values = np.array([meta.get(param) for meta in frontier.metadata], dtype=float)
if not np.all(np.isfinite(values)):
raise ValueError(f"Metadata does not contain finite '{param}' values.")
order = np.argsort(values)
values = values[order]
returns = np.asarray(frontier.returns, dtype=float)[order]
risks = frontier._risk_vector(risk_label)[order]
if ax is None:
fig, axes = plt.subplots(1, 2, figsize=(8, 3))
else:
fig = ax.figure
axes = ax
ax_return, ax_risk = axes
ax_return.plot(values, returns, marker="o")
ax_return.set_xlabel(param)
ax_return.set_ylabel("Expected Return")
ax_return.grid(True, alpha=0.3)
ax_risk.plot(values, risks, marker="o")
ax_risk.set_xlabel(param)
ax_risk.set_ylabel(risk_label or frontier.risk_measure)
ax_risk.grid(True, alpha=0.3)
if percent_axes:
try:
from matplotlib.ticker import FuncFormatter
fmt = FuncFormatter(lambda x, _pos: f"{x:.1%}")
ax_return.yaxis.set_major_formatter(fmt)
ax_risk.yaxis.set_major_formatter(fmt)
except Exception: # pragma: no cover
pass
fig.tight_layout()
return fig, axes
[docs]
def plot_robust_surface(
frontier: PortfolioFrontier,
*,
param: str = "lambda",
risk_label: Optional[str] = None,
ax=None,
):
"""3D scatter of parameter vs risk vs return for robust frontiers.
Args:
frontier: Frontier with metadata containing parameter values.
param: Metadata key to use for the x-axis and color.
risk_label: Risk label for the y-axis.
ax: Optional matplotlib 3D axis to reuse.
Returns:
matplotlib.axes.Axes: 3D axis with the scatter plot.
"""
plt = _require_matplotlib()
if frontier.metadata is None:
raise ValueError("Frontier metadata missing parameter values.")
values = np.array([meta.get(param) for meta in frontier.metadata], dtype=float)
if not np.all(np.isfinite(values)):
raise ValueError(f"Metadata does not contain finite '{param}' values.")
if ax is None:
fig = plt.figure()
ax = fig.add_subplot(111, projection="3d")
ax.scatter(values, frontier._risk_vector(risk_label), frontier.returns, c=values, cmap="viridis")
ax.set_xlabel(param)
ax.set_ylabel(risk_label or frontier.risk_measure)
ax.set_zlabel("Expected Return")
return ax
[docs]
def plot_assumptions_3d(
mean: np.ndarray,
cov: np.ndarray,
*,
scenarios: Optional[np.ndarray] = None,
uncertainty_cov: Optional[np.ndarray] = None,
n_std: float = 1.0,
uncertainty_std: float = 1.0,
max_points: int = 2500,
titles: Optional[Tuple[str, str]] = None,
):
"""Plot mean/covariance assumptions (and optional mean uncertainty) in 3D.
The function projects high-dimensional inputs onto the first three
principal components implied by ``cov`` and renders:
- a covariance ellipsoid around the mean,
- optional scenario clouds,
- and, when provided, a second panel for the mean-uncertainty ellipsoid.
"""
plt = _require_matplotlib()
scenarios_arr = None
if scenarios is not None:
scenarios_arr = np.asarray(scenarios, dtype=float)
if scenarios_arr.ndim != 2:
raise ValueError("`scenarios` must be a 2D array.")
if scenarios_arr.shape[1] != np.asarray(mean, dtype=float).size:
raise ValueError("`scenarios` columns must match `mean` length.")
if scenarios_arr.shape[0] > max_points:
idx = np.linspace(0, scenarios_arr.shape[0] - 1, max_points).astype(int)
scenarios_arr = scenarios_arr[idx]
cov_arr = np.asarray(cov, dtype=float)
mean_3d, cov_3d, scen_3d, components = _project_to_3d(mean, cov_arr, scenarios_arr)
if uncertainty_cov is None:
fig = plt.figure(figsize=(6.5, 5.0))
ax_main = fig.add_subplot(111, projection="3d")
axes = (ax_main,)
else:
fig = plt.figure(figsize=(12.0, 5.0))
ax_main = fig.add_subplot(121, projection="3d")
ax_unc = fig.add_subplot(122, projection="3d")
axes = (ax_main, ax_unc)
def _style_axis(ax, title: str, labels: Tuple[str, str, str]):
"""Apply consistent formatting to 3D axes.
Args:
ax: Matplotlib 3D axis.
title: Axis title.
labels: Tuple of axis labels.
"""
ax.set_title(title)
ax.set_xlabel(labels[0])
ax.set_ylabel(labels[1])
ax.set_zlabel(labels[2])
ax.grid(True, alpha=0.2)
ax.view_init(elev=22, azim=35)
try: # matplotlib >= 3.3
ax.set_box_aspect((1, 1, 1))
except Exception: # pragma: no cover
pass
def _plot_principal_axes(ax, mean_vec, cov_mat, *, color: str, alpha: float):
"""Draw principal axis lines for a covariance ellipsoid.
Args:
ax: Matplotlib 3D axis.
mean_vec: Mean vector in 3D.
cov_mat: 3x3 covariance matrix.
color: Line color.
alpha: Line opacity.
"""
cov_mat = 0.5 * (cov_mat + cov_mat.T)
eigvals, eigvecs = np.linalg.eigh(cov_mat)
order = np.argsort(eigvals)[::-1]
eigvals = np.clip(eigvals[order], 0.0, None)
eigvecs = eigvecs[:, order]
lengths = np.sqrt(eigvals + 1e-16)
for idx in range(3):
vec = eigvecs[:, idx] * lengths[idx]
xs = [mean_vec[0] - vec[0], mean_vec[0] + vec[0]]
ys = [mean_vec[1] - vec[1], mean_vec[1] + vec[1]]
zs = [mean_vec[2] - vec[2], mean_vec[2] + vec[2]]
ax.plot(xs, ys, zs, color=color, alpha=alpha, linewidth=1.4)
def _plot_distribution(ax, mean_vec, cov_mat, scen_vec, title, *, cov_color, cov_alpha, labels):
"""Plot mean/covariance ellipsoid with optional scenarios.
Args:
ax: Matplotlib 3D axis.
mean_vec: Mean vector in 3D.
cov_mat: 3x3 covariance matrix.
scen_vec: Optional scenario projections.
title: Subplot title.
cov_color: Ellipsoid color.
cov_alpha: Ellipsoid opacity.
labels: Axis labels.
"""
x, y, z = _ellipsoid_mesh(cov_mat, n_std=n_std, n_points=32)
ax.plot_surface(
x + mean_vec[0],
y + mean_vec[1],
z + mean_vec[2],
color=cov_color,
alpha=cov_alpha,
linewidth=0.0,
shade=False,
)
if scen_vec is not None:
ax.scatter(
scen_vec[:, 0],
scen_vec[:, 1],
scen_vec[:, 2],
s=8,
alpha=0.15,
color="0.35",
)
ax.scatter(mean_vec[0], mean_vec[1], mean_vec[2], color="crimson", s=50)
_plot_principal_axes(ax, mean_vec, cov_mat, color=cov_color, alpha=0.6)
_style_axis(ax, title, labels)
try:
from matplotlib.lines import Line2D
from matplotlib.patches import Patch
handles = [
Line2D([0], [0], marker="o", color="w", markerfacecolor="crimson", markersize=6, label="Mean"),
Patch(facecolor=cov_color, alpha=cov_alpha, label="Covariance (1σ)"),
]
if scen_vec is not None:
handles.insert(
0,
Line2D([0], [0], marker=".", color="0.35", linestyle="", label="Scenarios"),
)
ax.legend(handles=handles, loc="upper left", frameon=True)
except Exception: # pragma: no cover
pass
titles = titles or ("Return distribution", "Mean uncertainty")
eigvals = np.linalg.eigvalsh(cov_arr)
eigvals = np.clip(eigvals, 0.0, None)
order = np.argsort(eigvals)[::-1]
explained = eigvals[order]
total = float(explained.sum()) if explained.sum() > 0 else 1.0
explained = explained[:3] / total
if explained.size < 3:
explained = np.pad(explained, (0, 3 - explained.size), constant_values=0.0)
labels = (
f"PC1 ({explained[0]:.0%})",
f"PC2 ({explained[1]:.0%})",
f"PC3 ({explained[2]:.0%})",
)
_plot_distribution(
ax_main,
mean_3d,
cov_3d,
scen_3d,
titles[0],
cov_color="#377eb8",
cov_alpha=0.22,
labels=labels,
)
if uncertainty_cov is not None:
mean_u, cov_u, _, _ = _project_to_3d(mean, uncertainty_cov, None, components=components)
x_u, y_u, z_u = _ellipsoid_mesh(cov_u, n_std=uncertainty_std, n_points=32)
ax_unc.plot_surface(
x_u + mean_u[0],
y_u + mean_u[1],
z_u + mean_u[2],
color="#ff7f00",
alpha=0.35,
linewidth=0.0,
shade=False,
)
ax_unc.scatter(mean_u[0], mean_u[1], mean_u[2], color="crimson", s=50)
_plot_principal_axes(ax_unc, mean_u, cov_u, color="#ff7f00", alpha=0.7)
_style_axis(ax_unc, titles[1], labels)
try:
from matplotlib.lines import Line2D
from matplotlib.patches import Patch
handles = [
Line2D([0], [0], marker="o", color="w", markerfacecolor="crimson", markersize=6, label="Mean"),
Patch(facecolor="#ff7f00", alpha=0.35, label="Mean uncertainty (1σ)"),
]
ax_unc.legend(handles=handles, loc="upper left", frameon=True)
except Exception: # pragma: no cover
pass
fig.tight_layout()
return fig, axes
[docs]
def plot_frontier_report(
frontier: PortfolioFrontier,
*,
selection: str = "min_risk",
selection_kwargs: Optional[Mapping[str, object]] = None,
risk_label: Optional[str] = None,
overlay_risk_labels: Optional[Sequence[str]] = None,
show_points: bool = True,
percent_axes: bool = True,
weights: Optional["pd.Series"] = None,
weights_title: str = "Weights",
figsize: Tuple[float, float] = (10.0, 4.0),
):
"""Compact report plot: frontier plus selected weights bar.
Args:
frontier: Frontier to plot.
selection: Selector key (e.g. ``min_risk``, ``tangency``).
selection_kwargs: Keyword arguments for the selector.
risk_label: Optional risk label for the frontier x-axis.
overlay_risk_labels: Optional list of risk labels to overlay.
show_points: Whether to draw frontier nodes.
percent_axes: Whether to format axes as percentages.
weights: Optional pre-selected weights (skip selector).
weights_title: Title for the weights subplot.
figsize: Figure size in inches.
Returns:
matplotlib.figure.Figure: Figure containing the report plot.
"""
plt = _require_matplotlib()
fig, axes = plt.subplots(1, 2, figsize=figsize, gridspec_kw={"width_ratios": [1.4, 1.0]})
ax_frontier, ax_weights = axes
selection_key = selection.lower()
selection_kwargs = dict(selection_kwargs or {})
selected_weights = weights
selected_risk = None
selected_return = None
if selected_weights is None:
if selection_key in {"min_risk", "minimum_risk"}:
selected_weights, selected_return, selected_risk = frontier.min_risk(
risk_label=risk_label
)
elif selection_key in {"max_return", "maximum_return"}:
selected_weights, selected_return, selected_risk = frontier.max_return()
elif selection_key in {"tangency", "max_sharpe", "sharpe"}:
rf = float(selection_kwargs.pop("risk_free_rate", 0.0))
selected_weights, selected_return, selected_risk = frontier.tangency(rf)
elif selection_key in {"risk_target", "max_return_subject_to_risk"}:
max_risk = float(selection_kwargs.pop("max_risk"))
selected_weights, selected_return, selected_risk = frontier.at_risk(
max_risk=max_risk, risk_label=risk_label
)
elif selection_key in {"return_target", "min_risk_subject_to_return"}:
min_return = float(selection_kwargs.pop("min_return"))
selected_weights, selected_return, selected_risk = frontier.at_return(
min_return=min_return, risk_label=risk_label
)
elif selection_key in {"risk_percentile", "risk_pct", "percentile"}:
pct = float(selection_kwargs.pop("percentile"))
selected_weights, selected_return, selected_risk = frontier.at_percentile(
pct, risk_label=risk_label
)
elif selection_key in {"risk_match", "risk_nearest"}:
target_risk = float(selection_kwargs.pop("target_risk"))
selected_weights, selected_return, selected_risk = frontier.closest_risk(
target_risk, risk_label=risk_label
)
elif selection_key in {"index", "column"}:
idx = int(selection_kwargs.pop("index"))
selected_weights = frontier._to_pandas(frontier.weights[:, idx], "Selected")
selected_return = float(frontier.returns[idx])
selected_risk = float(frontier._risk_vector(risk_label)[idx])
else:
raise ValueError(f"Unknown selection '{selection}'.")
highlight = (selection_key,) if selection_key in {"min_risk", "max_return", "tangency"} else ()
plot_frontiers(
frontier,
ax=ax_frontier,
highlight=highlight,
risk_free_rate=selection_kwargs.get("risk_free_rate"),
show_points=show_points,
percent_axes=percent_axes,
risk_label=risk_label,
overlay_risk_labels=overlay_risk_labels,
)
if selected_risk is not None and selected_return is not None and not highlight:
ax_frontier.scatter(selected_risk, selected_return, color="black", s=60, zorder=5)
plot_weights(
selected_weights,
ax=ax_weights,
title=weights_title,
kind="barh",
percent_axes=percent_axes,
)
fig.tight_layout()
return fig, axes
__all__ = [
"plot_frontiers",
"plot_frontiers_grid",
"plot_weights",
"plot_frontier_report",
"plot_robust_path",
"plot_param_impact",
"plot_robust_surface",
"plot_assumptions_3d",
]