"""Hierarchical clustering tools for proteomics data."""
import hashlib
import warnings
import numpy as np
import pandas as pd
import anndata as ad
from scipy import sparse
from scipy.cluster.hierarchy import fcluster, linkage
from scipy.spatial.distance import pdist
from proteopy.utils.anndata import check_proteodata
from proteopy.utils.parsers import (
_is_standard_hclustv_key,
_parse_hclustv_key_components,
_resolve_hclustv_keys,
_resolve_hclustv_cluster_key,
)
def _validate_linkage_and_values(
Z: np.ndarray,
values_df: pd.DataFrame,
) -> None:
"""
Validate linkage matrix and values DataFrame for clustering operations.
Parameters
----------
Z : np.ndarray
Linkage matrix from hierarchical clustering.
values_df : pd.DataFrame
DataFrame with variables as columns used to obtain the linkage matrix.
Raises
------
TypeError
If linkage matrix is not a numpy array or values is not a DataFrame.
ValueError
If linkage matrix has invalid shape or dimension mismatch with
values DataFrame.
"""
if not isinstance(Z, np.ndarray):
raise TypeError(
f"Expected linkage matrix to be numpy array, "
f"got {type(Z).__name__}."
)
if Z.ndim != 2 or Z.shape[1] != 4:
raise ValueError(
f"Invalid linkage matrix shape {Z.shape}. "
f"Expected (n-1, 4) for n observations."
)
if not isinstance(values_df, pd.DataFrame):
raise TypeError(
f"Expected values to be DataFrame, "
f"got {type(values_df).__name__}."
)
n_vars = values_df.shape[1]
expected_vars = Z.shape[0] + 1
if n_vars != expected_vars:
raise ValueError(
f"Dimension mismatch: linkage matrix has {expected_vars} leaves "
f"but values DataFrame has {n_vars} columns."
)
[docs]
def hclustv_tree(
adata: ad.AnnData,
selected_vars: list[str] | None = None,
group_by: str | None = None,
summary_method: str = "median",
linkage_method: str = "average",
distance_metric: str = "euclidean",
layer: str | None = None,
zero_to_na: bool = False,
fill_na: float | int | None = None,
z_transform: bool = True,
inplace: bool = True,
key_added: str | None = None,
verbose: bool = True,
) -> ad.AnnData | None:
"""
Perform hierarchical clustering on variables (peptides or proteins).
Computes a linkage matrix from variable profiles across samples or groups,
storing the result in ``adata.uns`` for downstream visualization or analysis.
Parameters
----------
adata : AnnData
:class:`~anndata.AnnData` with proteomics annotations.
selected_vars : list[str] | None
Explicit list of variables to include. When ``None``, all variables
are used.
group_by : str | None
Column in ``adata.obs`` used to group observations. When provided,
computes a summary statistic for each group rather than using
individual samples. Grouping can resolve NaN values through
aggregation (e.g., median of [1, NaN, 3] = 2).
summary_method : str
Method for computing group summaries when ``group_by`` is specified.
One of ``"median"`` or ``"mean"`` (alias ``"average"``).
linkage_method : str
Linkage criterion passed to :func:`scipy.cluster.hierarchy.linkage`.
Common options include ``"average"``, ``"complete"``, ``"single"``,
and ``"ward"``.
distance_metric : str
Distance metric for clustering. One of ``"euclidean"``, ``"manhattan"``,
or ``"cosine"``.
layer : str | None
Optional ``adata.layers`` key to draw quantification values from.
When ``None`` the primary matrix ``adata.X`` is used.
zero_to_na : bool
Replace zeros with ``NaN`` before computing profiles.
fill_na : float | int | None
Replace ``NaN`` values with the specified constant before summary
computation.
z_transform : bool
Standardize values to mean 0 and variance 1 per variable before
clustering. Variables with zero variance will be set to 0 (the mean)
after transformation.
inplace : bool
If ``True``, store results in ``adata.uns`` and return ``None``.
If ``False``, return a modified copy of ``adata``.
key_added : str | None
Custom key prefix for storing results in ``adata.uns``. When ``None``,
uses the default format ``'hclustv_linkage;<group_by>;<var_hash>;<layer>'``.
verbose : bool
Print storage location keys after computation.
Returns
-------
AnnData | None
If ``inplace=True``, returns ``None``.
If ``inplace=False``, returns a copy of ``adata`` with clustering
results stored in ``.uns``.
Notes
-----
The linkage matrix is stored at
``adata.uns['hclustv_linkage;<group_by>;<var_hash>;<layer>']``.
The profile values DataFrame (after all transformations) is stored at
``adata.uns['hclustv_values;<group_by>;<var_hash>;<layer>']``.
The ``var_hash`` is the first 8 characters of the MD5 hash of the sorted,
semicolon-joined variable names used for clustering.
When ``group_by`` is ``None``, the field is left empty in the key.
When ``layer`` is ``None``, ``'X'`` is used in the key.
Examples
--------
>>> import proteopy as pp
>>> adata = pp.datasets.example_peptide_data()
>>> pp.tl.hclustv_tree(adata, group_by="condition")
>>> # Linkage matrix stored in adata.uns['hclustv_linkage;condition;a1b2c3d4;X']
"""
check_proteodata(adata)
# Validate summary_method
summary_method = summary_method.lower()
if summary_method == "average":
summary_method = "mean"
if summary_method not in ("median", "mean"):
raise ValueError(
f"summary_method must be 'median' or 'mean', got '{summary_method}'."
)
# Validate distance_metric
distance_metric = distance_metric.lower()
if distance_metric not in ("euclidean", "manhattan", "cosine"):
raise ValueError(
f"distance_metric must be 'euclidean', 'manhattan', or 'cosine', "
f"got '{distance_metric}'."
)
# Map metric names to scipy pdist names
metric_map = {
"euclidean": "euclidean",
"manhattan": "cityblock",
"cosine": "cosine",
}
scipy_metric = metric_map[distance_metric]
# Validate ward linkage compatibility
if linkage_method == "ward" and distance_metric != "euclidean":
raise ValueError(
"linkage_method='ward' requires distance_metric='euclidean'. "
f"Got distance_metric='{distance_metric}'."
)
# Extract matrix
matrix = adata.layers[layer] if layer else adata.X
if matrix is None:
raise ValueError("Selected matrix is empty.")
if sparse.issparse(matrix):
matrix = matrix.toarray()
else:
matrix = np.asarray(matrix)
df = pd.DataFrame(
matrix,
index=adata.obs_names,
columns=adata.var_names,
)
# Filter variables if specified
if selected_vars is not None:
seen = set()
duplicates = [v for v in selected_vars if v in seen or seen.add(v)]
if duplicates:
raise ValueError(
f"Duplicate variables in selected_vars: {list(set(duplicates))}"
)
missing_vars = [v for v in selected_vars if v not in df.columns]
if missing_vars:
raise KeyError(
f"Variables not found in adata.var_names: {missing_vars}"
)
df = df[selected_vars]
if zero_to_na:
df = df.replace(0, np.nan)
if fill_na is not None:
df = df.fillna(fill_na)
# Group by if specified
if group_by is not None:
if group_by not in adata.obs.columns:
raise KeyError(f"Column '{group_by}' not found in adata.obs.")
groups = adata.obs[group_by]
df["__group__"] = groups.values
# Compute group summaries
if summary_method == "median":
profile_df = df.groupby("__group__", observed=True).apply(
lambda x: x.median(skipna=True),
include_groups=False,
)
else:
profile_df = df.groupby("__group__", observed=True).apply(
lambda x: x.mean(skipna=True),
include_groups=False,
)
else:
profile_df = df
# Validate sufficient rows (observations/groups) for clustering
if profile_df.shape[0] < 2:
raise ValueError(
"At least 2 observations or groups are required for hierarchical "
f"clustering. Got {profile_df.shape[0]} after grouping."
)
# Drop variables with all NaN (operates on columns since vars are columns)
profile_df = profile_df.dropna(axis=1, how="all")
if profile_df.empty or profile_df.shape[1] < 2:
raise ValueError(
"At least 2 variables are required for hierarchical clustering."
)
# Check for remaining NaN values - clustering does not support NaN
nan_count = profile_df.isna().sum().sum()
if nan_count > 0:
nan_vars = profile_df.columns[profile_df.isna().any()].tolist()
suffix = "..." if len(nan_vars) > 5 else ""
raise ValueError(
f"Clustering does not support NaN values. Found {nan_count} NaN "
f"values in {len(nan_vars)} variables: {nan_vars[:5]}{suffix}. "
"Use fill_na parameter or preprocess data to handle missing values."
)
# Optionally compute z-scores per variable (column)
if z_transform:
col_means = profile_df.mean(axis=0, skipna=True)
col_stds = profile_df.std(axis=0, skipna=True, ddof=1)
zero_var_cols = col_stds[col_stds == 0].index.tolist()
if zero_var_cols:
suffix = "..." if len(zero_var_cols) > 5 else ""
warnings.warn(
f"{len(zero_var_cols)} variables have zero variance and will "
f"be set to 0 after z-transform: {zero_var_cols[:5]}{suffix}"
)
col_stds = col_stds.replace(0, np.nan) # avoid division by zero
profile_df = (profile_df - col_means) / col_stds
# Fill NaN from zero-variance columns with 0 (the mean)
profile_df = profile_df.fillna(0)
profile_values_df = profile_df.copy()
# Transpose for clustering: pdist computes distances between rows
# We want distances between variables, so vars must be rows
clustering_matrix = profile_df.T
# Compute pairwise distances and linkage
dist_matrix = pdist(clustering_matrix.values, metric=scipy_metric)
Z = linkage(dist_matrix, method=linkage_method)
# Compute var_hash from the actual clustered variables
clustered_vars = list(profile_values_df.columns)
var_hash = hashlib.md5(
";".join(sorted(clustered_vars)).encode()
).hexdigest()[:8]
# Build storage keys
group_by_str = group_by if group_by is not None else ""
layer_str = layer if layer is not None else "X"
if key_added is not None:
linkage_key = key_added
values_key = f"{key_added}_values"
else:
linkage_key = f"hclustv_linkage;{group_by_str};{var_hash};{layer_str}"
values_key = f"hclustv_values;{group_by_str};{var_hash};{layer_str}"
# Store results
if inplace:
adata.uns[linkage_key] = Z
adata.uns[values_key] = profile_values_df
check_proteodata(adata)
else:
adata_out = adata.copy()
adata_out.uns[linkage_key] = Z
adata_out.uns[values_key] = profile_values_df
check_proteodata(adata_out)
if verbose:
print(
f"Linkage matrix stored in adata.uns['{linkage_key}']\n"
f"Profile values stored in adata.uns['{values_key}']"
)
if inplace:
return None
else:
return adata_out
[docs]
def hclustv_cluster_ann(
adata: ad.AnnData,
k: int,
linkage_key: str = 'auto',
values_key: str = 'auto',
inplace: bool = True,
key_added: str | None = None,
verbose: bool = True,
) -> ad.AnnData | None:
"""
Annotate variables with cluster assignments from hierarchical clustering.
Uses :func:`scipy.cluster.hierarchy.fcluster` to cut the dendrogram
at ``k`` clusters and stores cluster assignments in ``.var``.
Parameters
----------
adata : AnnData
:class:`~anndata.AnnData` with hierarchical clustering results stored
in ``.uns`` (from :func:`~proteopy.tl.hclustv_tree`).
k : int
Number of clusters to generate (required).
linkage_key : str
Key in ``adata.uns`` containing the linkage matrix. When ``'auto'``,
auto-detects the linkage key if exactly one
``'hclustv_linkage;...'`` key exists. When multiple keys are present,
must be specified explicitly.
values_key : str
Key in ``adata.uns`` containing the values DataFrame. When ``'auto'``,
auto-detects the values key if exactly one
``'hclustv_values;...'`` key exists. When multiple keys are present,
must be specified explicitly.
inplace : bool
If ``True``, store results in ``adata.var`` and return ``None``.
If ``False``, return a modified copy of ``adata``.
key_added : str | None
Custom key for storing results in ``adata.var``. When ``None``,
uses the default format
``'hclustv_cluster;<group_by>;<var_hash>;<layer>'`` derived from
the linkage key components.
verbose : bool
Print storage location key after computation.
Returns
-------
AnnData | None
If ``inplace=True``, returns ``None``.
If ``inplace=False``, returns a copy of ``adata`` with cluster
annotations stored in ``.var``.
Raises
------
ValueError
If no hierarchical clustering results are found in ``adata.uns``.
If multiple clustering results exist and ``linkage_key`` is not
specified.
If linkage matrix has invalid shape.
If ``k < 2`` (single cluster is semantically meaningless).
If auto-generated storage key cannot be derived from a custom
linkage key.
TypeError
If linkage matrix is not a numpy array.
KeyError
If specified ``linkage_key`` is not found in ``adata.uns``.
Notes
-----
Cluster assignments are stored at
``adata.var['hclustv_cluster;<group_by>;<var_hash>;<layer>']``
Variables not included in the clustering (e.g., filtered out due to
NaN values) will have ``NaN`` in this column.
Examples
--------
>>> import proteopy as pr
>>> adata = pr.datasets.karayel_2020()
>>> pr.tl.hclustv_tree(
... adata, group_by="condition", selected_vars=adata.vars[0:1000]
... )
>>> pr.tl.hclustv_cluster_ann(adata, 5)
Access cluster assignments:
>>> adata.var['hclustv_cluster;condition;a1b2c3d4;X']
"""
check_proteodata(adata)
# Resolve linkage and values keys
linkage_key, values_key = _resolve_hclustv_keys(
adata, linkage_key, values_key, verbose
)
Z = adata.uns[linkage_key]
values_df = adata.uns[values_key]
_validate_linkage_and_values(Z, values_df)
n_vars = values_df.shape[1]
if k < 2:
raise ValueError(
"k must be at least 2. A single cluster is semantically "
"meaningless for cluster assignments."
)
if k > n_vars:
if verbose:
print(
f"k={k} exceeds number of variables ({n_vars}). "
f"Limiting to k={n_vars}."
)
k = n_vars
labels = fcluster(Z, t=k, criterion="maxclust")
var_names = values_df.columns.tolist()
cluster_map = dict(zip(var_names, labels))
# Zero-pad cluster numbers for correct alphanumeric sorting
n_digits = len(str(k))
# Build cluster annotation series for adata.var
cluster_annotations = pd.Series(
index=adata.var_names,
dtype="object",
)
for var_name, cluster_id in cluster_map.items():
cluster_annotations[var_name] = f"{cluster_id:0{n_digits}d}"
# Build storage key
if key_added is not None:
cluster_key = key_added
elif _is_standard_hclustv_key(linkage_key, "linkage"):
components = _parse_hclustv_key_components(linkage_key)
group_by_str, var_hash, layer_str = components
cluster_key = f"hclustv_cluster;{group_by_str};{var_hash};{layer_str}"
else:
raise ValueError(
f"Cannot auto-generate storage key from custom linkage_key "
f"'{linkage_key}'. Please provide key_added explicitly."
)
if inplace:
adata.var[cluster_key] = cluster_annotations
check_proteodata(adata)
if verbose:
print(f"Cluster annotations stored in adata.var['{cluster_key}']")
return None
else:
adata_out = adata.copy()
adata_out.var[cluster_key] = cluster_annotations
check_proteodata(adata_out)
if verbose:
print(f"Cluster annotations stored in adata.var['{cluster_key}']")
return adata_out
[docs]
def hclustv_profiles(
adata: ad.AnnData,
cluster_key: str = 'auto',
layer: str | None = None,
group_by: str | None = None,
method: str = "median",
zero_to_na: bool = False,
fill_na: float | int | None = None,
skip_na: bool = True,
inplace: bool = True,
key_added: str | None = None,
verbose: bool = True,
) -> ad.AnnData | None:
"""
Compute cluster profiles from cluster annotations.
Summarizes variables within each cluster using mean or median to create
cluster profile intensities across all observations.
Parameters
----------
adata : AnnData
:class:`~anndata.AnnData` with cluster annotations in ``.var``
(from :func:`~proteopy.tl.hclustv_cluster_ann`).
cluster_key : str
Column in ``adata.var`` containing cluster assignments. When
``'auto'``, auto-detects from available ``'hclustv_cluster;...'``
columns. When multiple columns exist, must be specified explicitly.
layer : str | None
Layer to use for computing profiles. When ``None``, uses ``adata.X``.
group_by : str | None
Column in ``adata.obs`` to group observations by before computing
cluster profiles. When specified, observations are first summarized
by this column using ``method``, then cluster profiles are computed
on the grouped data.
method : str
Summarization method for computing cluster profiles. One of ``"mean"``
or ``"median"``. Also used for grouping observations when ``group_by``
is specified.
zero_to_na : bool
If ``True``, convert zeros in the data matrix to ``np.nan`` before
any computation.
fill_na : float | int | None
If specified, replace ``np.nan`` values with this constant before
computing profiles. Applied after ``zero_to_na``.
skip_na : bool
If ``True``, exclude ``np.nan`` values when computing summaries.
If ``False``, return ``np.nan`` if any value in the group is ``np.nan``.
inplace : bool
If ``True``, store results in ``adata.uns`` and return ``None``.
If ``False``, return a modified copy of ``adata``.
key_added : str | None
Custom key for storing results in ``adata.uns``. When ``None``,
uses the default format
``'hclustv_profiles;<group_by>;<var_hash>;<layer>'`` derived from
the cluster key components.
verbose : bool
Print storage location key after computation.
Returns
-------
AnnData | None
If ``inplace=True``, returns ``None``.
If ``inplace=False``, returns a copy of ``adata`` with cluster
profiles stored in ``.uns``.
Raises
------
ValueError
If no cluster annotations are found in ``adata.var``.
If multiple cluster columns exist and ``cluster_key`` is not
specified.
If ``method`` is not ``"mean"`` or ``"median"``.
If auto-generated storage key cannot be derived.
KeyError
If specified ``cluster_key`` is not found in ``adata.var``.
If specified ``layer`` is not found in ``adata.layers``.
If specified ``group_by`` column is not found in ``adata.obs``.
Notes
-----
The cluster profiles DataFrame is stored at
``adata.uns['hclustv_profiles;<group_by>;<var_hash>;<layer>']``.
Examples
--------
>>> import proteopy as pr
>>> adata = pr.datasets.karayel_2020()
>>> pr.tl.hclustv_tree(adata, group_by="condition")
>>> pr.tl.hclustv_cluster_ann(adata, 5)
>>> pr.tl.hclustv_profiles(adata)
"""
check_proteodata(adata)
method = method.lower()
if method not in ("mean", "median"):
raise ValueError(
f"method must be 'mean' or 'median', got '{method}'."
)
resolved_key = _resolve_hclustv_cluster_key(
adata,
cluster_key=cluster_key,
verbose=verbose,
)
cluster_col = adata.var[resolved_key]
# Get data matrix
if layer is not None:
if layer not in adata.layers:
raise KeyError(f"Layer '{layer}' not found in adata.layers.")
matrix = adata.layers[layer]
else:
matrix = adata.X
if sparse.issparse(matrix):
matrix = matrix.toarray()
else:
matrix = np.asarray(matrix)
# Create DataFrame with obs as rows, vars as columns
df = pd.DataFrame(
matrix,
index=adata.obs_names,
columns=adata.var_names,
)
if zero_to_na:
df = df.replace(0, np.nan)
if fill_na is not None:
df = df.fillna(fill_na)
# Group by obs column if specified
if group_by is not None:
if group_by not in adata.obs.columns:
raise KeyError(
f"group_by column '{group_by}' not found in adata.obs."
)
groups = adata.obs[group_by]
if method == "mean":
df = df.groupby(groups).apply(
lambda x: x.mean(skipna=skip_na), include_groups=False
)
elif method == "median":
df = df.groupby(groups).apply(
lambda x: x.median(skipna=skip_na), include_groups=False
)
# Get unique clusters (excluding NaN)
clusters = cluster_col.dropna().unique()
clusters = sorted(clusters)
if len(clusters) == 0:
raise ValueError("No cluster assignments found in the specified column.")
# Compute profiles for each cluster
cluster_profiles = {}
for cluster_id in clusters:
cluster_vars = cluster_col[cluster_col == cluster_id].index.tolist()
if not cluster_vars:
continue
cluster_data = df[cluster_vars]
if method == "mean":
cluster_profiles[cluster_id] = cluster_data.mean(
axis=1, skipna=skip_na
)
elif method == "median":
cluster_profiles[cluster_id] = cluster_data.median(
axis=1, skipna=skip_na
)
profiles_df = pd.DataFrame(cluster_profiles)
# Build storage key
if key_added is not None:
profiles_key = key_added
elif resolved_key.startswith("hclustv_cluster;"):
# Extract components from cluster key
parts = resolved_key.split(";")
if len(parts) == 4:
group_by_str, var_hash = parts[1], parts[2]
# Use actual layer if specified, otherwise "X" for adata.X
if layer is not None:
layer_str = layer
else:
layer_str = "X"
profiles_key = (
f"hclustv_profiles;{group_by_str};{var_hash};{layer_str}"
)
else:
raise ValueError(
f"Cannot auto-generate storage key from cluster key "
f"'{resolved_key}'. Please provide key_added explicitly."
)
else:
raise ValueError(
f"Cannot auto-generate storage key from custom cluster_key "
f"'{resolved_key}'. Please provide key_added explicitly."
)
if inplace:
adata.uns[profiles_key] = profiles_df
check_proteodata(adata)
if verbose:
print(f"Cluster profiles stored in adata.uns['{profiles_key}']")
return None
else:
adata_out = adata.copy()
adata_out.uns[profiles_key] = profiles_df
check_proteodata(adata_out)
if verbose:
print(f"Cluster profiles stored in adata.uns['{profiles_key}']")
return adata_out