import re
from collections import defaultdict
from functools import partial
from typing import Callable, List, Dict, Union, Optional
import numpy as np
import pandas as pd
import anndata as ad
from scipy import sparse
from proteopy.utils.anndata import check_proteodata, is_proteodata
def _aggregate_var_value(series):
"""Collapse a var-annotation series into a single value.
Unique non-NaN values are joined with ``';'``.
"""
uniq = sorted(set(series.dropna().astype(str)))
if len(uniq) == 0:
return np.nan
if len(uniq) == 1:
return uniq[0]
return ";".join(uniq)
def _rebuild_adata(adata, X_new, var_new, inplace):
"""Create or reinitialise an AnnData from aggregated data."""
if inplace:
adata._init_as_actual(
X=X_new,
obs=adata.obs.copy(),
var=var_new,
uns=adata.uns,
obsm=adata.obsm,
varm={},
varp={},
layers={},
obsp=(
adata.obsp
if hasattr(adata, "obsp") else None
),
)
adata.var_names = var_new.index
return None
out = ad.AnnData(
X=X_new,
obs=adata.obs.copy(),
var=var_new.copy(),
uns=adata.uns.copy(),
obsm=adata.obsm.copy(),
obsp=(
adata.obsp.copy()
if hasattr(adata, "obsp") else None
),
)
out.var_names = var_new.index
return out
def _apply_grouped_func(grouped, func):
"""Dispatch aggregation for *func* (str or callable)."""
if isinstance(func, str):
if func == "sum":
return grouped.sum(min_count=1)
if func == "mean":
return grouped.mean()
if func == "median":
return grouped.median()
if func == "max":
return grouped.max()
raise ValueError(
"Unsupported func. Choose from "
"'sum', 'mean', 'median', or 'max'."
)
if callable(func):
result = grouped.aggregate(func)
if isinstance(result, pd.Series):
result = result.to_frame().T
if not isinstance(result, pd.DataFrame):
raise TypeError(
"Callable `func` must return a "
"DataFrame or Series per group."
)
return result
raise TypeError(
"`func` must be either a string "
"identifier or a callable."
)
def _find_root(parent, x):
"""Find root of *x* with path compression (iterative)."""
while parent[x] != x:
parent[x] = parent[parent[x]]
x = parent[x]
return x
def _union_find_groups(peptides):
"""Return ``{representative: [members]}`` via union-find
on substring containment."""
parent = {p: p for p in peptides}
rank = {p: 0 for p in peptides}
peps_by_len = sorted(peptides, key=len, reverse=True)
for i, longp in enumerate(peps_by_len):
for shortp in peps_by_len[i + 1:]:
if shortp in longp:
ra = _find_root(parent, longp)
rb = _find_root(parent, shortp)
if ra != rb:
if rank[ra] < rank[rb]:
ra, rb = rb, ra
parent[rb] = ra
if rank[ra] == rank[rb]:
rank[ra] += 1
buckets = defaultdict(list)
for p in peptides:
buckets[_find_root(parent, p)].append(p)
groups = {}
for _, members in buckets.items():
rep = max(members, key=len)
groups[rep] = sorted(
members, key=lambda s: (-len(s), s),
)
return groups
def _group_peptides(peptides: List[str]) -> Dict[str, List[str]]:
"""
Group peptides so that any peptide fully contained
in another belongs to the same group.
Returns {representative_longest: [members...]} with
members sorted by (-len, lexicographically).
"""
peptides = [
p for p in
pd.Series(peptides).dropna().astype(str).tolist()
]
peptides = list(dict.fromkeys(peptides))
return _union_find_groups(peptides)
[docs]
def summarize_overlapping_peptides(
adata: ad.AnnData,
peptide_col: str = "peptide_id",
group_by: str = "peptide_group",
layer: str | None = None,
func: Union[str, Callable] = "sum",
inplace: bool = True,
):
"""
Aggregate intensities across peptides sharing the
same ``group_col``.
- Sums intensities in ``adata.X`` within each group
(NaN-aware).
- Uses the longest peptide_id in each group as the
new var_name.
- Keeps both the representative peptide_id as a column
and index.
- Retains the grouping key as ``'peptide_group'``.
- Concatenates differing ``.var`` annotations using
``';'``.
Parameters
----------
adata : AnnData
Input AnnData with quantitative data and
annotations.
peptide_col : str
Column in ``adata.var`` containing peptide
identifiers (or will be created from var_names).
group_by : str
Column in ``adata.var`` specifying grouping
(e.g. ``'peptide_group'`` or ``'proteoform_id'``).
layer : str, optional
Key in ``adata.layers`` specifying which matrix
to aggregate; defaults to ``.X``.
func : {'sum', 'mean', 'median', 'max'} or Callable
Aggregation applied across peptides in each group.
inplace : bool
If True, modifies adata in place. Otherwise returns a new AnnData.
Returns
-------
AnnData | None
Aggregated AnnData if inplace=False, else modifies in place.
"""
# --- safety checks
if group_by not in adata.var.columns:
raise KeyError(f"'{group_by}' not found in adata.var")
if peptide_col not in adata.var.columns:
# fallback: use var_names as peptide identifiers
adata.var[peptide_col] = adata.var_names.astype(str)
# --- matrix as DataFrame (obs × vars)
vals = pd.DataFrame(
adata.layers[layer] if layer is not None else adata.X,
index=adata.obs_names,
columns=adata.var_names,
)
# --- group columns and aggregate
group_keys = adata.var[group_by].astype(str)
grouped = vals.T.groupby(
group_keys, sort=True, observed=True,
)
agg_vals = _apply_grouped_func(grouped, func).T
# --- build new var table (aggregate annotations)
groups = adata.var.groupby(
group_by, sort=True, observed=True,
)
records, group_to_peptide = [], {}
for gkey, df_g in groups:
# pick longest peptide_id (tie-break lexicographic)
longest_pep = sorted(
df_g[peptide_col].astype(str),
key=lambda s: (-len(s), s),
)[0]
group_to_peptide[str(gkey)] = longest_pep
rec = {
group_by: str(gkey),
peptide_col: longest_pep,
"n_grouped": len(df_g),
}
# aggregate all other var columns
for col in adata.var.columns:
if col in (group_by, peptide_col):
continue
rec[col] = _aggregate_var_value(df_g[col])
records.append(rec)
var_new = pd.DataFrame.from_records(records).set_index(peptide_col)
# --- rename aggregated matrix columns from group key → longest peptide
agg_vals.columns = [group_to_peptide[k] for k in agg_vals.columns]
var_new = var_new.loc[agg_vals.columns] # ensure same order
# --- ensure 'peptide_id' column always matches index
var_new[peptide_col] = var_new.index
# --- rebuild AnnData
return _rebuild_adata(
adata, agg_vals.values, var_new, inplace,
)
def quantify_by_category(
adata: ad.AnnData,
group_by: str = None,
layer=None,
func: Union[str, Callable] = "sum",
inplace: bool = True,
) -> Optional[ad.AnnData]:
"""
Aggregate intensities in ``adata.X`` (or selected
layer) by ``.var[group_col]``, aggregate annotations
in ``adata.var`` by concatenating unique values with
``';'``, and set ``group_col`` as the new index
(``var_names``).
Parameters
----------
adata : AnnData
Input AnnData with .X (obs x vars) and .var annotations.
group_by : str
Column in adata.var to group by (e.g. 'protein_id').
layer : str, optional
Key in ``adata.layers``; when set, quantification
uses that layer instead of ``adata.X``.
func : {'sum', 'median', 'max'} | Callable
Aggregation to apply across grouped variables.
inplace : bool
If True, modify `adata` in place; else return a new AnnData.
Returns
-------
AnnData | None
Aggregated AnnData if inplace=False; otherwise None.
"""
if group_by is None or group_by not in adata.var.columns:
raise KeyError(f"'{group_by}' not found in adata.var")
# --- Matrix as DataFrame (obs × vars)
vals = pd.DataFrame(
adata.layers[layer] if layer is not None else adata.X,
index=adata.obs_names,
columns=adata.var_names,
)
# --- Group columns and aggregate
group_keys = adata.var[group_by].astype(str)
grouped = vals.T.groupby(
group_keys, sort=True, observed=True,
)
agg_vals = _apply_grouped_func(grouped, func).T
# --- Build new var table (aggregate annotations per group)
records = []
for gkey, df_g in adata.var.groupby(
group_by, sort=True, observed=True,
):
rec = {group_by: str(gkey)}
for col in adata.var.columns:
if col == group_by:
continue
rec[col] = _aggregate_var_value(df_g[col])
records.append(rec)
var_new = pd.DataFrame.from_records(records).set_index(group_by)
# align var rows to aggregated matrix columns
var_new = var_new.loc[agg_vals.columns]
var_new[group_by] = var_new.index
var_new.index.name = None
# --- Rebuild AnnData
return _rebuild_adata(
adata, agg_vals.values, var_new, inplace,
)
quantify_proteins = partial(
quantify_by_category,
group_by='protein_id',
)
quantify_proteoforms = partial(
quantify_by_category,
group_by='proteoform_id',
)
def _validate_summarize_mods_input(
adata, method, zero_to_na, fill_na, keep_var_cols,
):
"""Validate arguments for ``summarize_modifications``."""
_, level = is_proteodata(adata)
if level != "peptide":
raise ValueError(
"summarize_modifications requires "
"peptide-level proteodata (adata.var "
"must contain 'peptide_id')."
)
allowed = {"sum", "mean", "median", "max"}
if method not in allowed:
raise ValueError(
f"method must be one of {allowed!r}, "
f"got '{method}'."
)
if zero_to_na and fill_na is not None:
raise ValueError(
"Cannot set both zero_to_na and fill_na."
)
if keep_var_cols is not None:
missing = [
c for c in keep_var_cols
if c not in adata.var.columns
]
if missing:
raise KeyError(
f"keep_var_cols entries not found in "
f"adata.var: {missing}"
)
_reserved = {
"peptide_id", "protein_id",
"n_peptidoforms", "n_modifications",
}
overlap = [
c for c in keep_var_cols
if c in _reserved
]
if overlap:
raise ValueError(
f"keep_var_cols must not include "
f"reserved columns: {overlap}"
)
def _count_modifications(peptide_ids, pattern):
"""Count unique (position, text) modification sites."""
all_mods = set()
for pid in peptide_ids:
removed = 0
for m in pattern.finditer(pid):
pos = m.start() - removed
all_mods.add((pos, m.group()))
removed += len(m.group())
return len(all_mods)
def _build_var_table_mods(
var_src, stripped, sort, pattern, keep_var_cols,
):
"""Build the new ``.var`` table for
``summarize_modifications``."""
records = []
for gkey, df_g in var_src.groupby(
stripped, sort=sort, observed=True,
):
rec = {"peptide_id": gkey}
pids = df_g["protein_id"].unique()
if len(pids) > 1:
raise ValueError(
f"Stripped sequence '{gkey}' maps to "
f"multiple protein_ids: "
f"{pids.tolist()}. Cannot summarize "
f"modifications across different "
f"proteins."
)
rec["protein_id"] = pids[0]
rec["n_peptidoforms"] = len(df_g)
rec["n_modifications"] = _count_modifications(
df_g.index, pattern,
)
for col in (keep_var_cols or []):
rec[col] = _aggregate_var_value(df_g[col])
records.append(rec)
var_new = pd.DataFrame.from_records(records)
var_new = var_new.set_index("peptide_id")
var_new.index.name = None
var_new["peptide_id"] = var_new.index
return var_new
def _apply_str_method(grouped, method):
"""Dispatch named aggregation *method* (string only)."""
if method == "sum":
return grouped.sum(min_count=1)
if method == "mean":
return grouped.mean()
if method == "median":
return grouped.median()
return grouped.max()
[docs]
def summarize_modifications(
adata: ad.AnnData,
mod_regex: str = r"\s*\(.*?\)",
method: str = "sum",
layer: str | None = None,
zero_to_na: bool = False,
fill_na: float | int | None = None,
skip_na: bool = True,
sort: bool = True,
keep_var_cols: list[str] | None = None,
inplace: bool = True,
verbose: bool = False,
) -> ad.AnnData | None:
"""
Aggregate modified peptides by their stripped sequence.
Removes modification annotations from peptide identifiers
using a regular expression, groups peptides sharing the
same stripped sequence, and summarizes intensities with the
chosen aggregation method.
Parameters
----------
adata : AnnData
:class:`~anndata.AnnData` with peptide-level data.
mod_regex : str, optional
Regular expression matching modification annotations
to strip from peptide identifiers.
method : {'sum', 'mean', 'median', 'max'}, optional
Aggregation applied to each group of modified
peptides.
layer : str, optional
Key in ``adata.layers`` to use instead of
``adata.X``.
zero_to_na : bool, optional
If True, replace zeros with ``np.nan`` before
aggregation.
fill_na : float or int, optional
Replace ``np.nan`` values with this constant before
aggregation.
skip_na : bool, optional
If True, ignore ``np.nan`` during aggregation. If
False, any ``np.nan`` in a group produces ``np.nan``
in the result.
sort : bool, optional
If True, sort the output variables alphabetically
by stripped sequence. If False, preserve the order
of first appearance.
keep_var_cols : list of str, optional
Additional ``.var`` columns to carry over to the
output. Values are aggregated per group (unique
values joined by ``';'``). ``'peptide_id'`` and
``'protein_id'`` are always included.
inplace : bool, optional
If True, modify ``adata`` in place. Otherwise return
a new AnnData.
verbose : bool, optional
Print status messages.
Returns
-------
AnnData or None
Aggregated AnnData when ``inplace=False``; otherwise
None. Two columns are added to ``.var``:
``n_peptidoforms`` (total variants per stripped
sequence) and ``n_modifications`` (unique
modification sites, identified by their position
in the stripped sequence and the matched
annotation text, collected across all
peptidoforms in the group).
Examples
--------
**UniMod notation** (default ``mod_regex``).
>>> import proteopy as pr
>>> import numpy as np
>>> import pandas as pd
>>> from anndata import AnnData
>>> pids = [
... "AAADLM(UniMod:35)AYC(UniMod:4)EAHAKEDPLLTPVPASENPFR",
... "AAADLMAYC(UniMod:4)EAHAKEDPLLTPVPASENPFR",
... "AAADLMAYCEAHAKEDPLLTPVPASENPFR",
... "VVDLMR",
... ]
>>> var = pd.DataFrame(
... {
... "peptide_id": pids,
... "protein_id": ["P1", "P1", "P1", "P2"],
... },
... index=pids,
... )
>>> obs = pd.DataFrame(
... {"sample_id": ["s1", "s2"]},
... index=["s1", "s2"],
... )
>>> X = np.array([
... [100.0, 200.0, np.nan, 50.0],
... [150.0, 100.0, 300.0, 75.0],
... ])
>>> adata = AnnData(X=X, obs=obs, var=var)
>>> result = pr.pp.summarize_modifications(
... adata,
... method="sum",
... inplace=False,
... )
>>> result.var_names.tolist()
['AAADLMAYCEAHAKEDPLLTPVPASENPFR', 'VVDLMR']
>>> result.X
array([[300., 50.],
[550., 75.]])
>>> result.var["n_peptidoforms"].tolist()
[3, 1]
>>> result.var["n_modifications"].tolist()
[2, 0]
With ``skip_na=False``, any NaN in a group propagates:
>>> result = pr.pp.summarize_modifications(
... adata,
... method="sum",
... skip_na=False,
... inplace=False,
... )
>>> result.X
array([[ nan, 50.],
[550., 75.]])
With ``method="max"``, the maximum intensity per
group is retained:
>>> result = pr.pp.summarize_modifications(
... adata,
... method="max",
... inplace=False,
... )
>>> result.X
array([[200., 50.],
[300., 75.]])
**Mass-shift notation** (e.g. ``M[+16]``) requires a
custom ``mod_regex``:
>>> pids_ms = [
... "AAADLM[+16]AYC[+57]R",
... "AAADLMAYC[+57]R",
... "AAADLMAYCR",
... ]
>>> var_ms = pd.DataFrame(
... {
... "peptide_id": pids_ms,
... "protein_id": ["P1", "P1", "P1"],
... },
... index=pids_ms,
... )
>>> obs_ms = pd.DataFrame(
... {"sample_id": ["s1"]},
... index=["s1"],
... )
>>> X_ms = np.array([[10.0, 20.0, 30.0]])
>>> adata_ms = AnnData(
... X=X_ms,
... obs=obs_ms,
... var=var_ms,
... )
>>> result = pr.pp.summarize_modifications(
... adata_ms,
... mod_regex="\\[.*?\\]",
... method="sum",
... inplace=False,
... )
>>> result.var_names.tolist()
['AAADLMAYCR']
>>> result.X
array([[60.]])
>>> result.var["n_peptidoforms"].tolist()
[3]
>>> result.var["n_modifications"].tolist()
[2]
"""
# --- validate input
check_proteodata(adata, layers=layer)
_validate_summarize_mods_input(
adata, method, zero_to_na, fill_na, keep_var_cols,
)
# --- extract matrix
Xsrc = (
adata.layers[layer] if layer is not None
else adata.X
)
was_sparse = sparse.issparse(Xsrc)
X = Xsrc.toarray() if was_sparse else np.asarray(Xsrc)
X = X.astype(float, copy=True)
if zero_to_na:
X[X == 0] = np.nan
if fill_na is not None:
X[np.isnan(X)] = fill_na
# --- strip modifications from peptide identifiers
peptide_ids = adata.var["peptide_id"].astype(str).values
try:
pattern = re.compile(mod_regex)
except re.error as exc:
raise ValueError(
f"Invalid mod_regex '{mod_regex}': {exc}"
) from exc
stripped = np.array([
pattern.sub("", pid) for pid in peptide_ids
])
if verbose:
n_unique = len(np.unique(stripped))
print(
f"Stripping modifications: "
f"{len(peptide_ids)} peptides -> "
f"{n_unique} unique stripped sequences "
f"(method='{method}')."
)
# --- aggregate matrix by stripped sequence
df = pd.DataFrame(
X,
index=adata.obs_names,
columns=peptide_ids,
)
grouped = df.T.groupby(stripped, sort=sort)
agg_vals = _apply_str_method(grouped, method).T
if not skip_na:
has_nan = df.isna().T.groupby(
stripped, sort=sort,
).any().T
agg_vals[has_nan] = np.nan
# --- build new .var table
var_new = _build_var_table_mods(
adata.var.copy(), stripped, sort,
pattern, keep_var_cols,
)
# --- result matrix
X_new = agg_vals.values
if was_sparse:
X_new = sparse.csr_matrix(X_new)
# --- rebuild AnnData
result = _rebuild_adata(
adata, X_new, var_new, inplace,
)
if inplace:
check_proteodata(adata)
else:
check_proteodata(result)
return result