Source code for alphabase.peptide.precursor

import multiprocessing as mp
import typing
from functools import partial

import numba
import numpy as np
import pandas as pd
from tqdm import tqdm
from xxhash import xxh64_intdigest

from alphabase.constants.aa import AA_Composition
from alphabase.constants.atom import MASS_ISOTOPE, MASS_PROTON
from alphabase.constants.isotope import IsotopeDistribution
from alphabase.constants.modification import MOD_Composition, ModificationKeys
from alphabase.peptide.mass_calc import calc_peptide_masses_for_same_len_seqs


[docs] def refine_precursor_df( df: pd.DataFrame, drop_frag_idx=False, ensure_data_validity=False, ) -> pd.DataFrame: """ Refine df inplace for faster precursor/fragment calculation. """ if ensure_data_validity: df.fillna("", inplace=True) if "charge" in df.columns and df.charge.dtype not in [ "int", "int8", "int64", "int32", # np.int64, np.int32, np.int8, ]: df["charge"] = df["charge"].astype(np.int8) if "mod_sites" in df.columns and df.mod_sites.dtype not in ["O", "U"]: df["mod_sites"] = df.mod_sites.astype("U") if "nAA" not in df.columns: df["nAA"] = df.sequence.str.len().astype(np.int32) if drop_frag_idx and "frag_start_idx" in df.columns: df.drop(columns=["frag_start_idx", "frag_stop_idx"], inplace=True) if not is_precursor_refined(df): df.sort_values("nAA", inplace=True) df.reset_index(drop=True, inplace=True) return df
reset_precursor_df = refine_precursor_df
[docs] def is_precursor_refined(precursor_df: pd.DataFrame): return (len(precursor_df) == 0) or ( (precursor_df.index.values[0] == 0) and precursor_df.nAA.is_monotonic_increasing and np.all(np.diff(precursor_df.index.values) == 1) )
[docs] def is_precursor_sorted(precursor_df: pd.DataFrame): import warnings warnings.warn( "`alphabase.peptide.precursor.is_precursor_sorted()` is deprecated, " "it will be removed in alphabse>=1.3.0. " "Please use `alphabase.peptide.precursor.is_precursor_refined()` instead.", FutureWarning, ) return is_precursor_refined(precursor_df)
[docs] def update_precursor_mz( precursor_df: pd.DataFrame, batch_size=500000, ) -> pd.DataFrame: """ Calculate precursor_mz inplace in the precursor_df Parameters ---------- precursor_df : pd.DataFrame precursor_df with the 'charge' column Returns ------- pd.DataFrame precursor_df with 'precursor_mz' """ if "nAA" not in precursor_df: reset_precursor_df(precursor_df) _calc_in_order = True elif is_precursor_refined(precursor_df): _calc_in_order = True else: _calc_in_order = False precursor_df["precursor_mz"] = 0.0 precursor_mzs = np.zeros(len(precursor_df)) _grouped = precursor_df.groupby("nAA") # precursor_mz_idx = precursor_df.columns.get_loc( # 'precursor_mz' # ) for _, big_df_group in _grouped: for i in range(0, len(big_df_group), batch_size): batch_end = i + batch_size df_group = big_df_group.iloc[i:batch_end, :] pep_mzs = ( calc_peptide_masses_for_same_len_seqs( df_group.sequence.values.astype("U"), df_group.mods.values, df_group.aa_mass_diffs.values if "aa_mass_diffs" in df_group.columns else None, ) / df_group.charge + MASS_PROTON ) if _calc_in_order: precursor_mzs[ df_group.index.values[0] : df_group.index.values[-1] + 1 ] = pep_mzs else: precursor_df.loc[df_group.index, "precursor_mz"] = pep_mzs if _calc_in_order: precursor_df["precursor_mz"] = precursor_mzs return precursor_df
[docs] def calc_precursor_mz(precursor_df: pd.DataFrame, batch_size: int = 500000): import warnings warnings.warn( "`alphabase.peptide.precursor.calc_precursor_mz()` is deprecated, " "it will be removed in alphabase>=2.0.0. " "Please use `alphabase.peptide.precursor.update_precursor_mz()` instead.", FutureWarning, ) return update_precursor_mz(precursor_df, batch_size)
[docs] def get_mod_seq_hash( sequence: str, mods: str, mod_sites: str, *, seed: int = 0 ) -> np.uint64: """Get hash code value for a peptide: (sequence, mods, mod_sites) Parameters ---------- sequence : str Amino acid sequence mods : str Modification names in AlphaBase format mod_sites : str Modification sites in AlphaBase format seed : int Seed for hashing. Optional, by default 0 Returns ------- np.uint64 64-bit hash code value """ return np.array( [ xxh64_intdigest(sequence, seed=seed), xxh64_intdigest(mods, seed=seed), xxh64_intdigest(mod_sites, seed=seed), ], dtype=np.uint64, ).sum() # use np.sum to prevent overflow
[docs] def get_mod_seq_charge_hash( sequence: str, mods: str, mod_sites: str, charge: int, *, seed=0 ): """Get hash code value for a precursor: (sequence, mods, mod_sites, charge) Parameters ---------- sequence : str Amino acid sequence mods : str Modification names in AlphaBase format mod_sites : str Modification sites in AlphaBase format charge : int Precursor charge state seed : int Seed for hashing. Optional, by default 0 Returns ------- np.uint64 64-bit hash code value """ return np.array( [ get_mod_seq_hash(sequence, mods, mod_sites, seed=seed), charge, ], dtype=np.uint64, ).sum() # use np.sum to prevent overflow
[docs] def hash_mod_seq_df(precursor_df: pd.DataFrame, *, seed=0): """Internal function""" hash_vals = precursor_df.sequence.apply( lambda x: xxh64_intdigest(x, seed=seed) ).to_numpy(copy=True, dtype=np.uint64) hash_vals += ( precursor_df.mods.apply(lambda x: xxh64_intdigest(x, seed=seed)) .astype(np.uint64) .values ) hash_vals += ( precursor_df.mod_sites.apply(lambda x: xxh64_intdigest(x, seed=seed)) .astype(np.uint64) .values ) precursor_df["mod_seq_hash"] = hash_vals return precursor_df
[docs] def hash_mod_seq_charge_df(precursor_df: pd.DataFrame, *, seed=0): """Internal function""" if "mod_seq_hash" not in precursor_df.columns: hash_mod_seq_df(precursor_df, seed=seed) if "charge" not in precursor_df.columns: return precursor_df precursor_df["mod_seq_charge_hash"] = precursor_df[ "mod_seq_hash" ].values + precursor_df["charge"].values.astype(np.uint64) return precursor_df
[docs] def hash_precursor_df(precursor_df: pd.DataFrame, *, seed: int = 0) -> pd.DataFrame: """Add columns 'mod_seq_hash' and 'mod_seq_charge_hash' into precursor_df (inplace). The 64-bit hash function is from xxhash (xxhash.xxh64). Parameters ---------- precursor_df : pd.DataFrame precursor_df Seed : int Seed for xxhash.xxh64. Optional, by default 0 Returns ------- pd.DataFrame DataFrame with columns 'mod_seq_hash' and 'mod_seq_charge_hash' """ hash_mod_seq_df(precursor_df, seed=seed) if "charge" in precursor_df.columns: hash_mod_seq_charge_df(precursor_df, seed=seed) return precursor_df
[docs] def get_mod_seq_formula(seq: str, mods: str) -> list: """ 'PEPTIDE','Acetyl@Any_N-term' --> [('C',n), ('H',m), ...] """ formula = {} for aa in seq: for chem, n in AA_Composition[aa].items(): if chem in formula: formula[chem] += n else: formula[chem] = n if len(mods) > 0: for mod in mods.split(ModificationKeys.SEPARATOR): for chem, n in MOD_Composition[mod].items(): if chem in formula: formula[chem] += n else: formula[chem] = n return list(formula.items())
[docs] @numba.njit def get_right_most_isotope_offset( intensities: np.ndarray, apex_idx: int, min_right_most_intensity: float, ) -> int: """Get right-most isotope index Parameters ---------- intensities : np.ndarray Isotope intensities apex_idx : int The index or position of apex peak min_right_most_intensity : float Minimal intensity to consider for right-most peak relative to apex Returns ------- int Index or position of the right-most peak """ apex_inten = intensities[apex_idx] for i in range(len(intensities) - 1, -1, -1): if intensities[i] >= apex_inten * min_right_most_intensity: return i return apex_idx
[docs] def get_mod_seq_isotope_distribution( seq_mods: tuple, isotope_dist: IsotopeDistribution, min_right_most_intensity: float = 0.2, ) -> tuple: """Get isotope abundance distribution by IsotopeDistribution. This function is designed for multiprocessing. Parameters ---------- seq_mods : tuple (sequence, mods) isotope_dist : IsotopeDistribution See `IsotopeDistribution` in `alphabase.constants.isotope` min_right_most_intensity : float The minimal intensity value of the right-most peak relative to apex peak. Optional, by default 0.2 Returns ------- tuple float - Abundance of mono+1 / mono float - Abundance of apex / mono int - Apex isotope position relative to mono, i.e. apex index - mono index and 0 refers to the position of mono itself float - Abundance of right-most peak which has at least `min_right_most_intensity` intensity relative to the apex peak int - Right-most position relative to mono, i.e. right-most index - mono index """ dist, mono = isotope_dist.calc_formula_distribution(get_mod_seq_formula(*seq_mods)) apex_idx = np.argmax(dist) # find right-most peak right_most_idx = get_right_most_isotope_offset( dist, apex_idx, min_right_most_intensity ) return ( dist[mono + 1] / dist[mono], dist[apex_idx] / dist[mono], apex_idx - mono, dist[right_most_idx] / dist[mono], right_most_idx - mono, )
[docs] def calc_precursor_isotope_info( precursor_df: pd.DataFrame, min_right_most_intensity: float = 0.2, ): """Calculate isotope mz values and relative (to M0) intensity values for precursor_df inplace. Parameters ---------- precursor_df : pd.DataFrame precursor_df to calculate min_right_most_intensity : float The minimal intensity value of the right-most peak relative to apex peak. Optional, by default 0.2 Returns ------- pd.DataFrame precursor_df with additional columns: - isotope_m1_intensity: relative intensity of M1 to mono peak - isotope_m1_mz: mz of M1 - isotope_apex_intensity: relative intensity of the apex peak - isotope_apex_mz: mz of the apex peak - isotope_apex_offset: position offset of the apex peak to mono peak - isotope_right_most_intensity: relative intensity of the right-most peak - isotope_right_most_mz: mz of the right-most peak - isotope_right_most_offset: position offset of the right-most peak """ if "precursor_mz" not in precursor_df.columns: update_precursor_mz(precursor_df) isotope_dist = IsotopeDistribution() ( precursor_df["isotope_m1_intensity"], precursor_df["isotope_apex_intensity"], precursor_df["isotope_apex_offset"], precursor_df["isotope_right_most_intensity"], precursor_df["isotope_right_most_offset"], ) = zip( *precursor_df[["sequence", "mods"]].apply( get_mod_seq_isotope_distribution, axis=1, isotope_dist=isotope_dist, min_right_most_intensity=min_right_most_intensity, ) ) precursor_df["isotope_m1_intensity"] = precursor_df["isotope_m1_intensity"].astype( np.float32 ) precursor_df["isotope_apex_intensity"] = precursor_df[ "isotope_apex_intensity" ].astype(np.float32) precursor_df["isotope_apex_offset"] = precursor_df["isotope_apex_offset"].astype( np.int8 ) precursor_df["isotope_right_most_intensity"] = precursor_df[ "isotope_right_most_intensity" ].astype(np.float32) precursor_df["isotope_right_most_offset"] = precursor_df[ "isotope_right_most_offset" ].astype(np.int8) precursor_df["isotope_m1_mz"] = ( precursor_df.precursor_mz + MASS_ISOTOPE / precursor_df.charge ) precursor_df["isotope_apex_mz"] = precursor_df.precursor_mz + ( MASS_ISOTOPE * precursor_df.isotope_apex_offset / precursor_df.charge ) precursor_df["isotope_right_most_mz"] = precursor_df.precursor_mz + ( MASS_ISOTOPE * precursor_df.isotope_right_most_offset / precursor_df.charge ) return precursor_df
def _batchify_df(df_group, mp_batch_size): """Internal funciton for multiprocessing""" for _, df in df_group: for i in range(0, len(df), mp_batch_size): yield df.iloc[i : i + mp_batch_size, :] def _count_batchify_df(df_group, mp_batch_size): """Internal funciton for multiprocessing""" count = 0 for _, df in df_group: for _ in range(0, len(df), mp_batch_size): count += 1 return count # `progress_bar` should be replaced by more advanced tqdm wrappers created by Sander # I will leave it to alphabase.utils
[docs] def calc_precursor_isotope_info_mp( precursor_df: pd.DataFrame, processes: int = 8, mp_batch_size: int = 10000, progress_bar=None, min_right_most_intensity: float = 0.2, min_precursor_num_to_run_mp: int = 10000, ) -> pd.DataFrame: """`calc_precursor_isotope` is not that fast for large dataframes, so here we use multiprocessing for faster isotope pattern calculation. The speed is acceptable with multiprocessing (3.8 min for 21M precursors, 8 processes). Parameters ---------- precursor_df : pd.DataFrame Precursor_df to calculate processes : int Process number. Optional, by default 8 mp_batch_size : int Multiprocessing batch size. Optional, by default 100000. progress_bar : Callable The tqdm-based callback function to check multiprocessing. Defaults to None. min_right_most_intensity : float The minimal intensity value of the right-most peak relative to apex peak. Optional, by default 0.2 Returns ------- pd.DataFrame DataFrame with `isotope_*` columns, see :meth:'calc_precursor_isotope()'. """ if len(precursor_df) < min_precursor_num_to_run_mp or processes <= 1: return calc_precursor_isotope_info( precursor_df=precursor_df, min_right_most_intensity=min_right_most_intensity, ) df_list = [] df_group = precursor_df.groupby("nAA") with mp.get_context("spawn").Pool(processes) as p: processing = p.imap( partial( calc_precursor_isotope_info, min_right_most_intensity=min_right_most_intensity, ), _batchify_df(df_group, mp_batch_size), ) if progress_bar: processing = progress_bar( processing, _count_batchify_df(df_group, mp_batch_size) ) for df in processing: df_list.append(df) return pd.concat(df_list)
[docs] def calc_precursor_isotope_intensity( precursor_df, max_isotope=6, min_right_most_intensity=0.001, normalize: typing.Literal["mono", "sum"] = "sum", ) -> pd.DataFrame: """Calculate isotope intensity values for precursor_df inplace. Parameters ---------- precursor_df : pd.DataFrame Precursor_df to calculate isotope intensity max_isotope : int Max isotope number to calculate. Optional, by default 6 min_right_most_intensity : float The minimal intensity value of the right-most peak relative to apex peak. Returns ------- pd.DataFrame precursor_df with additional columns: """ isotope_dist = IsotopeDistribution() col_names = [f"i_{i}" for i in range(max_isotope)] precursor_dist = np.zeros((len(precursor_df), max_isotope), dtype=np.float32) mono_idxes = np.zeros(len(precursor_df), dtype=np.int32) for i in range(len(precursor_df)): row = precursor_df.iloc[i] dist, mono = isotope_dist.calc_formula_distribution( get_mod_seq_formula(row["sequence"], row["mods"]) ) dist[dist <= min_right_most_intensity] = 0.0 # mono should be always included in the i_x list # after clipping max_isotope isotopes mono_left_half_isotope = max_isotope // 2 mono_right_half_isotope = ( mono_left_half_isotope if max_isotope % 2 == 0 else (mono_left_half_isotope + 1) ) if mono < mono_left_half_isotope: precursor_dist[i] = dist[:max_isotope] mono_idxes[i] = mono elif mono + mono_right_half_isotope >= len(dist): precursor_dist[i] = dist[-max_isotope:] mono_idxes[i] = max_isotope + mono - len(dist) + 1 else: precursor_dist[i] = dist[ mono - mono_left_half_isotope : mono + mono_right_half_isotope ] mono_idxes[i] = mono - mono_left_half_isotope if normalize == "sum": precursor_dist /= np.sum(precursor_dist, axis=1, keepdims=True) else: precursor_dist /= precursor_dist[ np.arange(len(precursor_dist)), mono_idxes ].reshape(-1, 1) precursor_df[col_names] = precursor_dist precursor_df["mono_isotope_idx"] = mono_idxes return precursor_df
[docs] def calc_precursor_isotope_intensity_mp( precursor_df, max_isotope=6, min_right_most_intensity=0.001, normalize: typing.Literal["mono", "sum"] = "sum", mp_batch_size=1000, mp_process_num=8, progress_bar=True, ) -> pd.DataFrame: """Calculate isotope intensity values for precursor_df using multiprocessing. Parameters ---------- precursor_df : pd.DataFrame Precursor_df to calculate isotope intensity max_isotope : int Max isotope number to calculate. Optional, by default 6 min_right_most_intensity : float The minimal intensity value of the right-most peak relative to apex peak. mp_batch_size : int Multiprocessing batch size. Optional, by default 1000. mp_process_num : int Process number. Optional, by default 8 progress_bar : bool Whether to show progress bar. Optional, by default True Returns ------- pd.DataFrame precursor_df with additional columns i_0, i_1, i_2, ... i_{max_isotope-1} """ if mp_process_num <= 1: return calc_precursor_isotope_intensity( precursor_df=precursor_df, max_isotope=max_isotope, min_right_most_intensity=min_right_most_intensity, normalize=normalize, ) df_list = [] df_group = precursor_df.groupby("nAA") with mp.get_context("spawn").Pool(mp_process_num) as p: processing = p.imap( partial( calc_precursor_isotope_intensity, max_isotope=max_isotope, min_right_most_intensity=min_right_most_intensity, normalize=normalize, ), _batchify_df(df_group, mp_batch_size), ) if progress_bar: df_list = list( tqdm(processing, total=_count_batchify_df(df_group, mp_batch_size)) ) else: df_list = list(processing) return pd.concat(df_list, ignore_index=True)
calc_precursor_isotope = calc_precursor_isotope_intensity calc_precursor_isotope_mp = calc_precursor_isotope_intensity_mp