Source code for proteopy.pp.normalization
import numpy as np
import pandas as pd
from scipy import sparse
from proteopy.utils.anndata import check_proteodata
from proteopy.utils.array import is_log_transformed
[docs]
def normalize_median(
adata,
method: str,
log_space: bool,
fill_na: float | None = None,
zeros_to_na: bool = False,
batch_id: str | None = None,
inplace: bool = True,
force: bool = False,
):
"""
Median normalization of intensities.
Parameters
----------
adata : AnnData
Input AnnData.
method : {'max_ref', 'median_ref'}
How to choose the reference across sample medians. ``'max_ref'`` uses
the maximum sample median, ``'median_ref'`` uses the median of sample
medians.
log_space : bool
Whether the input intensities are log-transformed. Mismatches with
automatic detection raise unless ``force=True``.
fill_na : float, optional
Temporarily replace non-finite entries with this value for the median
computation only; original values are restored afterward.
zeros_to_na : bool, default False
Treat zeros as missing for the median computation only; original zeros
are restored afterward.
batch_id : str, optional
Column in ``adata.obs`` to perform normalization within batches.
inplace : bool, default True
Modify ``adata`` in place. If False, return a copy.
force : bool, default False
Proceed even if ``log_space`` disagrees with automatic log detection.
Notes
-----
Median normalization:
``log_space=True``
``X + ref - sample_median``
``log_space=False``
``X * ref / sample_median``
``'max_ref'``
reference = max of sample medians (within batch if per_batch)
``'median_ref'``
reference = median of sample medians (within batch if per_batch)
Returns
-------
AnnData or None
Normalized AnnData when ``inplace`` is False; otherwise None.
pandas.DataFrame, optional
Per-sample factors when ``inplace`` is False.
"""
check_proteodata(adata)
per_batch = batch_id
method = method.lower()
allowed_methods = {"max_ref", "median_ref"}
if method not in allowed_methods:
raise ValueError(f"method must be one of {allowed_methods!r}")
if fill_na is not None and zeros_to_na:
raise ValueError('Cannot set both zeros_to_na and fill_na to True.')
Xsrc = adata.X
was_sparse = sparse.issparse(Xsrc)
X = Xsrc.toarray() if was_sparse else np.asarray(Xsrc)
X = X.astype(float, copy=True)
is_log, _ = is_log_transformed(adata)
mismatch = (log_space != is_log)
if mismatch and not force:
if log_space:
raise ValueError(
"You passed log_space=True but the data do not look log-transformed. "
"Set force=True to override the automatic detection."
)
else:
raise ValueError(
"You passed log_space=False but the data look log-transformed. "
"Set force=True to override the automatic detection."
)
n_samples, _ = X.shape
X_new = X.copy() # X with replaces values as per user parameters
# Track original missingness/zeros to restore later
na_mask = ~np.isfinite(X)
zero_mask = (X == 0)
if zeros_to_na:
X_new[zero_mask] = np.nan
else:
if fill_na is not None:
X_new = np.where(~np.isfinite(X_new), fill_na, X_new)
def _normalize_samples(
X_work,
method,
log_space,
):
"""Normalize a subset of samples and return normalized values and factors."""
with np.errstate(invalid='ignore'):
sample_medians = np.nanmedian(X_work, axis=1)
if method == 'median_ref':
ref = float(np.nanmedian(sample_medians))
elif method == 'max_ref':
ref = float(np.nanmax(sample_medians))
else:
raise ValueError("method must be one of {'median_ref','max_ref'}")
if log_space:
factors = (ref - sample_medians)[:, None]
sub_norm = X_work + factors
else:
with np.errstate(divide='ignore', invalid='ignore'):
factors = (ref / sample_medians)[:, None]
sub_norm = X_work * factors
return sub_norm, np.squeeze(factors)
all_norm = np.empty_like(X, dtype=float)
all_factors = np.empty((n_samples,), dtype=float)
if per_batch is None:
idx = np.arange(n_samples)
X_work = X_new[idx, :]
sub_norm, sub_fac = _normalize_samples(X_work, method, log_space)
all_norm[idx, :] = sub_norm
all_factors[idx] = sub_fac if log_space else np.squeeze(sub_fac)
else:
if per_batch not in adata.obs.columns:
raise KeyError(f"per_batch='{per_batch}' not found in adata.obs columns.")
batches = adata.obs[per_batch].astype('category')
for b in batches.cat.categories:
idx = np.where(batches.values == b)[0]
if idx.size == 0:
continue
X_work = X_new[idx, :]
sub_norm, sub_fac = _normalize_samples(X_work, method, log_space)
all_norm[idx, :] = sub_norm
all_factors[idx] = sub_fac if log_space else np.squeeze(sub_fac)
# Restore original NaNs and zeros in the output
all_norm[na_mask] = np.nan
if zeros_to_na:
all_norm[zero_mask] = 0.0
if log_space:
factor_name = "shift_log"
else:
factor_name = "scale_linear"
factors_df = pd.DataFrame({
"sample_index": np.arange(n_samples),
factor_name: all_factors,
})
if per_batch is not None:
factors_df[per_batch] = adata.obs[per_batch].values
# Surface problematic medians via warnings
if np.isnan(all_factors).any():
bad = np.where(np.isnan(all_factors))[0]
print(f"Warning: {bad.size} sample(s) had undefined median; factors are NaN for indices {bad.tolist()}.")
if np.isinf(all_factors).any():
bad = np.where(np.isinf(all_factors))[0]
print(f"Warning: {bad.size} sample(s) had zero median; factors are inf for indices {bad.tolist()}.")
out = sparse.csr_matrix(all_norm) if was_sparse else all_norm
if inplace:
adata.X = out
adata.uns["normalization_factors"] = factors_df
check_proteodata(adata)
else:
adata_out = adata.copy()
adata_out.X = out
adata_out.uns["normalization_factors"] = factors_df
check_proteodata(adata_out)
return adata_out, factors_df