Source code for proteopy.pp.imputation

import numpy as np
from scipy import sparse

from proteopy.utils.anndata import check_proteodata
from proteopy.utils.array import is_log_transformed


[docs] def impute_downshift( adata, downshift: float = 1.8, width: float = 0.3, zero_to_nan: bool = True, inplace: bool = True, force: bool = False, random_state: int | None = 42, ): """ Left-censored imputation in log space with downshifted normal sampling. Parameters ---------- adata : AnnData Input proteomics AnnData. width : float, optional Standard deviation scaling factor for the normal sampler. downshift : float, optional Number of standard deviations to shift the mean downward. zero_to_nan : bool, optional Treat zeros as missing values before imputation. inplace : bool, optional Modify ``adata`` in place. If False, return a copied AnnData. force : bool, optional If False (default), raise a ValueError when data are detected as non-log. Set True to impute even when data appear non-log. random_state : int or None, optional Seed for the random number generator. Returns ------- AnnData Modified AnnData (or a copy if ``inplace=False``). """ check_proteodata(adata) if width <= 0: raise ValueError("`width` must be positive.") Xsrc = adata.X was_sparse = sparse.issparse(Xsrc) X = Xsrc.toarray() if was_sparse else np.asarray(Xsrc) X = X.astype(float, copy=True) is_log, stats = is_log_transformed(adata) if not is_log and not force: raise ValueError( "Imputation expects log-transformed intensities. " "Set force=True to proceed nevertheless." ) # Build working matrix Y (NaN = missing) and capture missing mask Y = X.copy() if zero_to_nan: Y[Y == 0] = np.nan Y[~np.isfinite(Y)] = np.nan miss_mask = ~np.isfinite(Y) n_missing = int(miss_mask.sum()) n_samples, _ = Y.shape rng = np.random.default_rng(random_state) # Global fallback stats y_finite = Y[np.isfinite(Y)] if y_finite.size < 3: raise ValueError("Not enough finite values to estimate imputation parameters.") g_mean = float(np.mean(y_finite)) g_sd = float(np.std(y_finite)) if not np.isfinite(g_sd) or g_sd <= 0: g_sd = 1.0 # Per-sample imputation Y_imp = Y.copy() for i in range(n_samples): row = Y[i, :] miss = miss_mask[i, :] if not miss.any(): continue obs = row[np.isfinite(row)] if obs.size >= 3: r_mean = float(np.mean(obs)) r_sd = float(np.std(obs)) if not np.isfinite(r_sd) or r_sd <= 0: r_mean, r_sd = g_mean, g_sd else: r_mean, r_sd = g_mean, g_sd mu = r_mean - downshift * r_sd sd = max(width * r_sd, 1e-6) Y_imp[i, miss] = rng.normal(loc=mu, scale=sd, size=int(miss.sum())) Z_out = sparse.csr_matrix(Y_imp) if was_sparse else Y_imp def _book_keeping(target): target.layers["imputation_mask_X"] = miss_mask.astype(bool) target.uns.setdefault("imputation", {}) target.uns["imputation"].update(dict( method="downshift_normal", width=float(width), downshift=float(downshift), random_state=(None if random_state is None else int(random_state)), n_imputed=int(n_missing), pct_imputed=float(n_missing / (miss_mask.size) * 100.0), )) if not inplace: adata_out = adata.copy() adata_out.X = Z_out _book_keeping(adata_out) check_proteodata(adata_out) return adata_out else: adata.X = Z_out _book_keeping(adata) check_proteodata(adata) return None