Source code for alphabase.psm_reader.msfragger_reader

"""MSFragger reader."""

import warnings
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
from pyteomics import pepxml

from alphabase.constants.aa import AA_ASCII_MASS
from alphabase.constants.atom import MASS_H, MASS_O
from alphabase.constants.modification import MOD_MASS, ModificationKeys
from alphabase.psm_reader.keys import MsFraggerTokens, PsmDfCols
from alphabase.psm_reader.psm_reader import (
    PSMReaderBase,
    psm_reader_provider,
    psm_reader_yaml,
)


def _is_all_fragger_decoy(proteins: List[str]) -> bool:
    """Check if all proteins are MSFragger decoy entries.

    Parameters
    ----------
    proteins : List[str]
        List of protein identifiers

    Returns
    -------
    bool
        True if all proteins start with 'rev_' (case-insensitive)

    """
    return all(
        prot.lower().startswith(MsFraggerTokens.DECOY_PREFIX) for prot in proteins
    )


def _extract_position(entry: str) -> Tuple[int, str]:
    """Extract leading position digits from modification entry.

    Parameters
    ----------
    entry : str
        Modification entry like '5S(79.9663)'

    Returns
    -------
    tuple
        (position, remainder) e.g. (5, 'S(79.9663)')

    Raises
    ------
    ValueError
        If entry has no leading position digits

    """
    position = ""
    for char in entry:
        if char.isdigit():
            position += char
        else:
            break

    if not position:
        raise ValueError(
            f"Invalid modification entry '{entry}': expected format "
            f"'<position><AA>(<mass>)' (e.g., '5S(79.9663)'), "
            f"'N-term(<mass>)', or 'C-term(<mass>)'."
        )

    return int(position), entry[len(position) :]


def _extract_mass_shift(entry: str) -> float:
    """Extract mass shift from entry like 'N-term(304.2071)' or 'S(79.9663)'."""
    return float(
        entry.split(MsFraggerTokens.MOD_START)[1].rstrip(MsFraggerTokens.MOD_STOP)
    )


def _parse_lookup_key(lookup_key: str, entry: str) -> Tuple[str, float]:
    """Parse lookup key into amino acid and mass shift.

    Parameters
    ----------
    lookup_key : str
        Lookup key like 'S(79.9663)'
    entry : str
        Original entry for error messages

    Returns
    -------
    tuple
        (amino_acid, mass_shift)

    """
    if MsFraggerTokens.MOD_START not in lookup_key:
        raise ValueError(
            f"Invalid modification entry '{entry}': "
            f"could not parse amino acid and mass."
        )
    amino_acid = lookup_key.split(MsFraggerTokens.MOD_START)[0]
    mass_shift = _extract_mass_shift(lookup_key)
    return amino_acid, mass_shift


[docs] class MSFraggerModificationTranslator: """Translate MSFragger PSM.TSV modifications to alphabase format."""
[docs] def __init__( self, mass_mapped_mods: List[str], mod_mass_tol: float, rev_mod_mapping: Dict[str, str], ): """Initialize MSFragger modification translator. Parameters ---------- mass_mapped_mods : List[str] List of modification names to match against (e.g., ['Phospho@S', 'Oxidation@M']) mod_mass_tol : float Mass tolerance for matching modifications in Daltons. rev_mod_mapping : Dict[str, str] Reverse mapping from MSFragger format to alphabase format. Keys use MSFragger's native format: 'AA(mass)' or 'N-term(mass)'. Values use alphabase format: 'Mod@AA'. """ self._mass_mapped_mods = mass_mapped_mods self._mod_mass_tol = mod_mass_tol self._rev_mod_mapping = rev_mod_mapping
[docs] def translate(self, psm_df: pd.DataFrame) -> pd.DataFrame: """Translate modifications from MSFragger assigned modifications. Parameters ---------- psm_df : pd.DataFrame DataFrame with PsmDfCols.TMP_MODS column containing raw assigned modifications strings Returns ------- pd.DataFrame The input DataFrame with 'mods' and 'mod_sites' columns added """ mods_list = [] sites_list = [] for _, row in psm_df.iterrows(): assigned_mods = row.get(PsmDfCols.TMP_MODS, "") mods, sites = self._parse_assigned_modifications(assigned_mods) mods_list.append(mods) sites_list.append(sites) psm_df[PsmDfCols.MODS] = mods_list psm_df[PsmDfCols.MOD_SITES] = sites_list return psm_df
def _parse_assigned_modifications(self, assigned_mods: str) -> Tuple[str, str]: """Parse MSFragger Assigned Modifications string. Parameters ---------- assigned_mods : str MSFragger format: "5S(79.9663), 7S(79.9663), N-term(304.2071)" Returns ------- tuple (mods_str, sites_str) where mods and sites are semicolon-separated. Example: ("Phospho@S;Phospho@S;TMT@Any_N-term", "5;7;0") """ if not assigned_mods: return "", "" mod_entries = [ m.strip() for m in assigned_mods.split(MsFraggerTokens.MOD_SEPARATOR) ] mods = [] sites = [] for entry in mod_entries: if not entry: continue mod_name, site = self._parse_single_modification(entry) mods.append(mod_name) sites.append(site) return ModificationKeys.SEPARATOR.join(mods), ModificationKeys.SEPARATOR.join( sites ) def _parse_single_modification(self, entry: str) -> Tuple[str, str]: """Parse a single modification entry.""" if entry.startswith(MsFraggerTokens.N_TERM): return self._resolve_terminal_mod(entry, "Any_N-term"), "0" if entry.startswith(MsFraggerTokens.C_TERM): return self._resolve_terminal_mod(entry, "Any_C-term"), "-1" position, lookup_key = _extract_position(entry) mod_name = self._resolve_positional_mod(lookup_key, entry, position) return mod_name, str(position) def _resolve_terminal_mod(self, entry: str, aa_or_term: str) -> str: """Resolve terminal modification name, checking rev_mod_mapping first.""" if entry in self._rev_mod_mapping: return self._rev_mod_mapping[entry] return self._match_mod_by_mass(_extract_mass_shift(entry), aa_or_term) def _resolve_positional_mod( self, lookup_key: str, entry: str, position: int ) -> str: """Resolve positional modification name from lookup key like 'S(79.9663)'.""" if lookup_key in self._rev_mod_mapping: return self._rev_mod_mapping[lookup_key] amino_acid, mass_shift = _parse_lookup_key(lookup_key, entry) return self._match_mod_by_mass(mass_shift, amino_acid, position) def _match_mod_by_mass( self, mass_shift: float, aa_or_term: str, position: Optional[int] = None ) -> str: """Match mass shift to modification name by finding the closest mass match. Parameters ---------- mass_shift : float Mass shift in Daltons (e.g., 79.9663 for phosphorylation) aa_or_term : str Amino acid single letter code or terminal (Any_N-term, Any_C-term) position : Optional[int] Position in peptide (1-indexed). Used to match AA^Any_N-term mods at position 1. Returns ------- str Modification name in alphabase format (e.g., 'Phospho@S') Raises ------ ValueError If no matching modification is found """ is_terminal = aa_or_term in [ ModificationKeys.ANY_N_TERM, ModificationKeys.ANY_C_TERM, ] best_match = None best_mass_diff = float("inf") for mod_name in self._mass_mapped_mods: if mod_name not in MOD_MASS: continue mass_diff = abs(mass_shift - MOD_MASS[mod_name]) if mass_diff >= self._mod_mass_tol or mass_diff >= best_mass_diff: continue mod_site = mod_name.split(ModificationKeys.SITE_SEPARATOR)[1] is_exact_match = mod_site == aa_or_term is_term_match = is_terminal and mod_site.endswith(aa_or_term) # Match AA^Any_N-term mods (e.g., Ammonia-loss@C^Any_N-term) at position 1 is_nterm_aa_match = ( position == 1 and mod_site.startswith(aa_or_term) and mod_site.endswith(ModificationKeys.ANY_N_TERM) ) if is_exact_match or is_term_match or is_nterm_aa_match: best_match = mod_name best_mass_diff = mass_diff if best_match is not None: return best_match raise ValueError( f"Unknown modification: mass_shift={mass_shift:.4f} at {aa_or_term}. " f"Add the modification to 'mass_mapped_mods' in psm_reader.yaml or extend " f"the reader's _mass_mapped_mods list before importing." )
def _get_mods_from_masses( # noqa: PLR0912, C901 too many branches, too complex TODO: refactor sequence: str, msf_aa_mods: List[str], mass_mapped_mods: List[str], mod_mass_tol: float, ) -> Tuple[str, str, str, str]: mods = [] mod_sites = [] aa_mass_diffs = [] aa_mass_diff_sites = [] for mod in msf_aa_mods: _mass_str, site_str = mod.split(ModificationKeys.SITE_SEPARATOR) mod_mass = float(_mass_str) site = int(site_str) cterm_position = len(sequence) + 1 if site > 0: if site < cterm_position: mod_mass = mod_mass - AA_ASCII_MASS[ord(sequence[site - 1])] else: mod_mass -= 2 * MASS_H + MASS_O else: mod_mass -= MASS_H mod_translated = False for mod_name in mass_mapped_mods: if abs(mod_mass - MOD_MASS[mod_name]) < mod_mass_tol: if site == 0: _mod = ( mod_name.split(ModificationKeys.SITE_SEPARATOR)[0] + ModificationKeys.SITE_SEPARATOR + ModificationKeys.ANY_N_TERM ) elif site == 1: if mod_name.endswith("^Any_N-term"): _mod = mod_name site_str = "0" else: _mod = ( mod_name.split(ModificationKeys.SITE_SEPARATOR)[0] + ModificationKeys.SITE_SEPARATOR + sequence[0] ) elif site == cterm_position: if mod_name.endswith("C-term"): _mod = mod_name else: _mod = ( mod_name.split(ModificationKeys.SITE_SEPARATOR)[0] + ModificationKeys.SITE_SEPARATOR + ModificationKeys.ANY_C_TERM ) # what if only Protein C-term is listed? site_str = "-1" else: _mod = ( mod_name.split(ModificationKeys.SITE_SEPARATOR)[0] + ModificationKeys.SITE_SEPARATOR + sequence[site - 1] ) if _mod in MOD_MASS: mods.append(_mod) mod_sites.append(site_str) mod_translated = True break if not mod_translated: aa_mass_diffs.append(f"{mod_mass:.5f}") aa_mass_diff_sites.append(site_str) return ( ModificationKeys.SEPARATOR.join(mods), ModificationKeys.SEPARATOR.join(mod_sites), ModificationKeys.SEPARATOR.join(aa_mass_diffs), ModificationKeys.SEPARATOR.join(aa_mass_diff_sites), )
[docs] class MSFraggerPsmTsvReader(PSMReaderBase): """Reader for MSFragger's psm.tsv file.""" _reader_type = "msfragger_psm_tsv"
[docs] def __init__( self, *, column_mapping: Optional[dict] = None, modification_mapping: Optional[dict] = None, fdr: float = 0.01, keep_decoy: bool = False, rt_unit: Optional[str] = None, **kwargs, ): """Initialize MSFragger PSM TSV reader. Parameters ---------- column_mapping : Optional[dict] Custom column name mapping. modification_mapping : Optional[dict] Custom modification mapping from alphabase format to MSFragger format. Keys use alphabase format: 'Mod@AA'. Values use MSFragger's native format: 'AA(mass)' or 'N-term(mass)' or 'C-term(mass)'. Example: {'Phospho@S': 'S(79.9663)', 'TMTpro@Any_N-term': 'N-term(304.2071)'} fdr : float False discovery rate threshold. Default: 0.01 keep_decoy : bool Whether to keep decoy hits. Default: False rt_unit : Optional[str] Retention time unit. **kwargs Additional arguments passed to PSMReaderBase. """ super().__init__( column_mapping=column_mapping, modification_mapping=modification_mapping, fdr=fdr, keep_decoy=keep_decoy, rt_unit=rt_unit, **kwargs, ) self._mass_mapped_mods = psm_reader_yaml.get(self._reader_type, {}).get( "mass_mapped_mods", [] ) self._mod_mass_tol = psm_reader_yaml.get(self._reader_type, {}).get( "mod_mass_tol", 0.1 )
def _translate_modifications(self) -> None: """No-op: modification translation is handled in _load_modifications.""" def _load_file(self, filename: str) -> pd.DataFrame: """Load MSFragger PSM TSV file.""" return pd.read_csv(filename, sep="\t", keep_default_na=False) def _pre_process(self, df: pd.DataFrame) -> pd.DataFrame: """MSFragger PSM TSV preprocessing.""" df.fillna("", inplace=True) df[[PsmDfCols.RAW_NAME, PsmDfCols.SCAN_NUM]] = ( df["Spectrum"].str.split(".").apply(lambda x: pd.Series([x[0], int(x[1])])) ) return df def _translate_decoy(self) -> None: # if the decoy column deosnt exist we assume all entries are target self._psm_df[PsmDfCols.DECOY] = ( 0 if PsmDfCols.DECOY not in self._psm_df.columns else (self._psm_df[PsmDfCols.DECOY] == "true").astype(np.int8) ) def _translate_score(self) -> None: """MSFragger Hyperscore is already in correct format (larger = better).""" def _load_modifications(self, origin_df: pd.DataFrame) -> None: # noqa: ARG002 """Parse modifications from PsmDfCols.TMP_MODS column (mapped from 'Assigned Modifications').""" modification_translator = MSFraggerModificationTranslator( mass_mapped_mods=self._mass_mapped_mods, mod_mass_tol=self._mod_mass_tol, rev_mod_mapping=self._modification_mapper.rev_mod_mapping or {}, ) self._psm_df = modification_translator.translate(self._psm_df)
[docs] class MSFraggerPepXMLReader(PSMReaderBase): """Reader for MSFragger's pep.xml file.""" _reader_type = "msfragger_pepxml"
[docs] def __init__( # noqa: PLR0913, D417 # too many arguments in function definition, missing argument descriptions self, *, column_mapping: Optional[dict] = None, modification_mapping: Optional[dict] = None, # mod_seq_columns: Optional[List[str]] = None,# TODO: not needed here? fdr: float = 0.001, # refers to E-value in the PepXML keep_decoy: bool = False, rt_unit: Optional[str] = None, # MSFragger reader-specific: keep_unknown_aa_mass_diffs: bool = False, **kwargs, ): """Initialize the MSFraggerreader. See documentation of `PSMReaderBase` for more information. MSFragger is not fully supported as we can only access the pepxml file. Parameters ---------- keep_unknown_aa_mass_diffs: whether to keep PSMs with unknown amino acid mass differences, default: False See documentation of `PSMReaderBase` for the rest of parameters. """ super().__init__( column_mapping=column_mapping, modification_mapping=modification_mapping, fdr=fdr, keep_decoy=keep_decoy, rt_unit=rt_unit, **kwargs, ) self._keep_unknown_aa_mass_diffs = keep_unknown_aa_mass_diffs # TODO: should those be set via API, too? self._mass_mapped_mods = psm_reader_yaml["msfragger_pepxml"]["mass_mapped_mods"] self._mod_mass_tol = psm_reader_yaml["msfragger_pepxml"]["mod_mass_tol"]
def _translate_modifications(self) -> None: """No-op: modification translation is handled in _load_modifications.""" def _load_file(self, filename: str) -> pd.DataFrame: """Load a MsFragger output file to a DataFrame.""" return pepxml.DataFrame(filename) def _pre_process(self, df: pd.DataFrame) -> pd.DataFrame: """MsFragger-specific preprocessing of output data.""" df.fillna("", inplace=True) if "ion_mobility" in df.columns: df["ion_mobility"] = df["ion_mobility"].astype(float) df[PsmDfCols.RAW_NAME] = df["spectrum"].str.split(".").apply(lambda x: x[0]) df["to_remove"] = 0 # TODO: revisit self.column_mapping[PsmDfCols.TO_REMOVE] = "to_remove" return df def _translate_decoy(self) -> None: self._psm_df[PsmDfCols.DECOY] = ( self._psm_df[PsmDfCols.PROTEINS] .apply(_is_all_fragger_decoy) .astype(np.int8) ) self._psm_df[PsmDfCols.PROTEINS] = self._psm_df[PsmDfCols.PROTEINS].apply( lambda x: ModificationKeys.SEPARATOR.join(x) ) if not self._keep_decoy: self._psm_df[PsmDfCols.TO_REMOVE] += self._psm_df[PsmDfCols.DECOY] > 0 def _translate_score(self) -> None: """Translate MSFragger evalue to AlphaBase score: the larger the better.""" self._psm_df[PsmDfCols.SCORE] = -np.log(self._psm_df[PsmDfCols.SCORE] + 1e-100) def _load_modifications(self, origin_df: pd.DataFrame) -> None: ( self._psm_df[PsmDfCols.MODS], self._psm_df[PsmDfCols.MOD_SITES], self._psm_df[PsmDfCols.AA_MASS_DIFFS], self._psm_df[PsmDfCols.AA_MASS_DIFF_SITES], ) = zip( *origin_df[["peptide", "modifications"]].apply( lambda x: _get_mods_from_masses( *x, mass_mapped_mods=self._mass_mapped_mods, mod_mass_tol=self._mod_mass_tol, ), axis=1, ) ) if not self._keep_unknown_aa_mass_diffs: self._psm_df[PsmDfCols.TO_REMOVE] += ( self._psm_df[PsmDfCols.AA_MASS_DIFFS] != "" ) self._psm_df.drop( columns=[PsmDfCols.AA_MASS_DIFFS, PsmDfCols.AA_MASS_DIFF_SITES], inplace=True, ) def _post_process(self, origin_df: pd.DataFrame) -> None: self._psm_df = ( self._psm_df.query(f"{PsmDfCols.TO_REMOVE}==0") .drop(columns=PsmDfCols.TO_REMOVE) .reset_index(drop=True) ) super()._post_process(origin_df)
[docs] class MSFraggerPepXML(MSFraggerPepXMLReader): """Deprecated."""
[docs] def __init__(self, *args, **kwargs): """Deprecated.""" warnings.warn( "MSFraggerPepXML is deprecated and will ne removed in alphabase>1.5.0.", "Please use the equivalent MSFraggerPepXMLReader instead.", FutureWarning, ) super().__init__(*args, **kwargs)
[docs] def register_readers() -> None: """Register MSFragger readers.""" psm_reader_provider.register_reader("msfragger_psm_tsv", MSFraggerPsmTsvReader) psm_reader_provider.register_reader("msfragger", MSFraggerPsmTsvReader) psm_reader_provider.register_reader("msfragger_pepxml", MSFraggerPepXMLReader)