# Source code for proteopy.download.contaminants

"""
Utilities for downloading contaminant FASTA files.
"""

from pathlib import Path
from urllib.request import urlretrieve
from datetime import date
from typing import Callable
import re
import tempfile
import warnings



def _is_uniprot_accession(accession: str) -> bool:
    pattern=r"[OPQ][0-9][A-Z0-9]{3}[0-9](-[0-9]{1,2})?|[A-NR-Z][0-9]([A-Z][A-Z0-9]{2}[0-9]){1,2}(-[0-9]{1,2})?"
    return bool(re.fullmatch(pattern, accession))

def check_uniprot_accession_nr(accession: str) -> None:
    """
    Raise if ``accession`` does not look like a UniProt accession.

    Parameters
    ----------
    accession
        Candidate accession string.

    Raises
    ------
    ValueError
        If ``_is_uniprot_accession`` rejects ``accession``.
    """
    if _is_uniprot_accession(accession):
        return

    message = f"Accession '{accession}' is not a valid UniProt accession."
    raise ValueError(message)

def _format_frankenfield_header(header: str) -> str:
    """
    Validate Frankenfield2022 headers; enforce three pipe-separated fields and
    UniProt-style accession.
    """
    parts = header.split(maxsplit=1)
    id_part = parts[0]
    desc = parts[1] if len(parts) > 1 else ""

    segments = id_part.split("|")
    if len(segments) != 3:
        raise ValueError(
            f"Header '{header}' must have exactly three pipe-separated fields.",
        )

    database, accession_number, protein_id = segments
    if accession_number.startswith("Cont_"):
        accession_number = accession_number[len("Cont_"):]
    _FRANKENFIELD_MANUAL_IDS = {"AAAA1", "AAAA2"}
    if accession_number not in _FRANKENFIELD_MANUAL_IDS:
        check_uniprot_accession_nr(accession_number)

    new_id = f"{database}|{accession_number}|{protein_id}"
    return f"{new_id} {desc}".strip()


def _format_fasta(
    source_path: Path,
    destination_path: Path,
    formatter: Callable[[str], str],
) -> None:
    """
    Rewrite FASTA headers using a formatter callable.
    """
    with open(source_path, "r", encoding="utf-8") as src, open(
        destination_path,
        "w",
        encoding="utf-8",
    ) as dest:
        for line in src:
            if line.startswith(">"):
                header = line[1:].strip()
                formatted = formatter(header)
                dest.write(f">{formatted}\n")
            else:
                dest.write(line)



# Download locations, named separately so the registry below stays compact.
_GPM_CRAP_URL = "ftp://ftp.thegpm.org/fasta/cRAP/crap.fasta"
_FRANKENFIELD2022_URL = (
    "https://raw.githubusercontent.com/HaoGroup-ProtContLib/"
    "Protein-Contaminant-Libraries-for-DDA-and-DIA-Proteomics/"
    "refs/heads/main/Universal%20protein%20contaminant%20FASTA/"
    "0602_Universal%20Contaminants.fasta"
)

# Registry of supported contaminant sources, keyed by the ``source`` argument
# of ``contaminants``. Each entry provides:
#   "url"          - location handed to ``urlretrieve``;
#   "default_path" - destination used when the caller passes no ``path``;
#   "formatter"    - optional header rewriter applied via ``_format_fasta``.
_SOURCE_MAP = {
    "gpm_crap": {
        "url": _GPM_CRAP_URL,
        "default_path": "data/contaminants_gpm-crap.fasta",
    },
    "frankenfield2022": {
        "url": _FRANKENFIELD2022_URL,
        "default_path": "data/contaminants_frankenfield2022.fasta",
        "formatter": _format_frankenfield_header,
    },
}


[docs] def contaminants( source: str = "frankenfield2022", path: str | Path | None = None, force: bool = False, ) -> Path: """ Download contaminant FASTA files from putative sources. - ``frankenfield2022``: Frankenfield et al., 2022 (doi:10.1021/acs.jproteome.2c00145). - ``gpm_crap``: The Global Proteome Machine (GPM) common Repository of Adventitious Proteins (cRAP). Parameters ---------- source Contaminant FASTA source. Supported: ``"frankenfield2022"``, ``"gpm_crap"``. path Destination file path for the downloaded FASTA. If ``None``, a default path is chosen based on the ``source``; URL downloads append the current date (YYYY-MM-DD) to the filename. force If ``True``, overwrite an existing file at ``path``. Returns ------- Path Path to the downloaded FASTA file. """ if source is None: raise ValueError("Missing 'source' parameter.") if source not in _SOURCE_MAP: raise ValueError(f"Unsupported source '{source}'.") meta = _SOURCE_MAP[source] if path is None: destination = Path(meta["default_path"]) today_suffix = date.today().strftime("%Y-%m-%d") destination = destination.with_name( f"{destination.stem}_{today_suffix}{destination.suffix}", ) else: destination = Path(path) destination.parent.mkdir(parents=True, exist_ok=True) if destination.exists() and not force: warnings.warn( f"File already exists at {destination}. Use force=True to overwrite.", ) return destination formatter = meta.get("formatter") if formatter is None: urlretrieve(meta["url"], destination) else: with tempfile.NamedTemporaryFile(delete=False) as tmp: tmp_path = Path(tmp.name) try: urlretrieve(meta["url"], tmp_path) _format_fasta(tmp_path, destination, formatter) finally: if tmp_path.exists(): tmp_path.unlink() return destination