Source code for alphabase.protein.fasta

import copy
import itertools
import os
import warnings
from typing import Union

import ahocorasick
import numba
import numpy as np
import pandas as pd
import regex as re
from Bio import SeqIO
from tqdm import tqdm

from alphabase.constants._const import CONST_FILE_FOLDER
from alphabase.constants.modification import ModificationKeys
from alphabase.io.hdf import HDF_File
from alphabase.spectral_library.base import SpecLibBase
from alphabase.utils import explode_multiple_columns
from alphabase.yaml_utils import load_yaml


[docs] def get_uniprot_gene_name(description: str): idx = description.find(" GN=") if idx == -1: return "" else: idx += 4 return description[idx : description.find(" ", idx)]
[docs] def read_fasta_file(fasta_filename: str = ""): """ Read a FASTA file line by line Parameters ---------- fasta_filename : str fasta. Yields ------ dict protein information, {protein_id:str, full_name:str, gene_name:str, description:str, sequence:str} """ with open(fasta_filename, encoding="utf-8") as handle: iterator = SeqIO.parse(handle, "fasta") while iterator: try: record = next(iterator) parts = record.id.split("|") # pipe char if len(parts) > 2: id = parts[1] gene_org = parts[2] else: id = record.name gene_org = record.name sequence = str(record.seq) entry = { "protein_id": id, "full_name": record.name, "gene_name": get_uniprot_gene_name(record.description), "gene_org": gene_org, "description": record.description, "sequence": sequence, } yield entry except StopIteration: break
[docs] def load_all_proteins(fasta_file_list: list): protein_dict = {} for fasta in fasta_file_list: for protein in read_fasta_file(fasta): protein_dict[protein["full_name"]] = protein return protein_dict
[docs] def load_fasta_list_as_protein_df(fasta_list: list): protein_dict = load_all_proteins(fasta_list) return pd.DataFrame().from_dict(protein_dict, orient="index").reset_index(drop=True)
[docs] def concat_proteins(protein_dict: dict, sep="$") -> str: """Concatenate all protein sequences into a single sequence, seperated by `sep ($ by default)`. Parameters ---------- protein_dict : dict protein_dict by read_fasta_file() Returns ------- str concatenated sequence seperated by `sep`. """ seq_list = [""] seq_count = 1 for key in protein_dict: protein_dict[key]["offset"] = seq_count seq_list.append(protein_dict[key]["sequence"]) seq_count += protein_dict[key]["sequence"] + 1 seq_list.append("") return "$".join(seq_list)
protease_dict = load_yaml(os.path.join(CONST_FILE_FOLDER, "protease.yaml")) """ Pre-built protease dict with regular expression. """
[docs] @numba.njit def cleave_sequence_with_cut_pos( sequence: str, cut_pos: np.ndarray, n_missed_cleavages: int = 2, pep_length_min: int = 6, pep_length_max: int = 45, ) -> tuple: """ Cleave a sequence with cut postions (cut_pos). Filters to have a minimum and maximum length. Parameters ---------- sequence : str protein sequence cut_pos : np.ndarray cut postions determined by a given protease. n_missed_cleavages : int the number of max missed cleavages. pep_length_min : int min peptide length. pep_length_max :int max peptide length. Returns ------- tuple: List[str]. Cleaved peptide sequences with missed cleavages. List[int]. Number of miss cleavage of each peptide. List[bool]. If N-term peptide List[bool]. If C-term pepetide """ seq_list = [] miss_list = [] nterm_list = [] cterm_list = [] for i, start_pos in enumerate(cut_pos): for n_miss, end_pos in enumerate(cut_pos[i + 1 : i + 2 + n_missed_cleavages]): if end_pos > start_pos + pep_length_max: break elif end_pos < start_pos + pep_length_min: continue else: seq_list.append(sequence[start_pos:end_pos]) miss_list.append(n_miss) if start_pos == 0: nterm_list.append(True) else: nterm_list.append(False) if end_pos == len(sequence): cterm_list.append(True) else: cterm_list.append(False) return seq_list, miss_list, nterm_list, cterm_list
[docs] class Digest:
[docs] def __init__( self, protease: str = "trypsin", max_missed_cleavages: int = 2, peptide_length_min: int = 6, peptide_length_max: int = 45, ): """Digest a protein sequence Parameters ---------- protease : str, optional protease name, could be pre-defined name defined in `protease_dict` or a regular expression. By default 'trypsin/P' max_missed_cleavages : int, optional Max number of misses cleavage sites. By default 2 peptide_length_min : int, optional Minimal cleaved peptide length, by default 6 peptide_length_max : int, optional Maximal cleaved peptide length, by default 45 """ self.n_miss_cleave = max_missed_cleavages self.peptide_length_min = peptide_length_min self.peptide_length_max = peptide_length_max if protease.lower() in protease_dict: self.regex_pattern = re.compile(protease_dict[protease.lower()]) else: self.regex_pattern = re.compile(protease)
[docs] def get_cut_positions(self, sequence: str) -> np.ndarray: """Get cut positions for a given sequence.""" cut_pos = [0] positions = [m.start() + 1 for m in self.regex_pattern.finditer(sequence)] cut_pos.extend([p for p in positions if p <= len(sequence)]) if cut_pos[-1] != len(sequence): cut_pos.append(len(sequence)) return np.array(cut_pos, dtype=np.int64)
[docs] def cleave_sequence( self, sequence: str, ) -> tuple: """ Cleave a sequence. Parameters ---------- sequence : str the given (protein) sequence. Returns ------- tuple[list] list[str]: cleaved peptide sequences with missed cleavages list[int]: miss cleavage list list[bool]: is protein N-term list[bool]: is protein C-term """ cut_pos = self.get_cut_positions(sequence) (seq_list, miss_list, nterm_list, cterm_list) = cleave_sequence_with_cut_pos( sequence, cut_pos, self.n_miss_cleave, self.peptide_length_min, self.peptide_length_max, ) # Consider M loss at protein N-term if sequence.startswith("M"): for seq, miss, cterm in zip(seq_list, miss_list, cterm_list): if sequence.startswith(seq) and len(seq) > self.peptide_length_min: seq_list.append(seq[1:]) miss_list.append(miss) nterm_list.append(True) cterm_list.append(cterm) return seq_list, miss_list, nterm_list, cterm_list
[docs] def get_fix_mods(sequence: str, fix_mod_aas: str, fix_mod_dict: dict) -> tuple: """ Generate fix modifications for the sequence """ mods = [] mod_sites = [] for i, aa in enumerate(sequence): if aa in fix_mod_aas: mod_sites.append(i + 1) mods.append(fix_mod_dict[aa]) return ModificationKeys.SEPARATOR.join(mods), ModificationKeys.SEPARATOR.join( str(i) for i in mod_sites )
[docs] def get_candidate_sites(sequence: str, target_mod_aas: str) -> list: """get candidate modification sites Parameters ---------- sequence : str peptide sequence target_mod_aas : str AAs that may have modifications Returns ------- list candiadte mod sites in alphabase format (0: N-term, -1: C-term, 1-n:others) """ candidate_sites = [] for i, aa in enumerate(sequence): if aa in target_mod_aas: candidate_sites.append(i + 1) # alphabase mod sites return candidate_sites
[docs] def get_var_mod_sites( sequence: str, target_mod_aas: str, min_var_mod: int, max_var_mod: int, max_combs: int, ) -> list: """get all combinations of variable modification sites Parameters ---------- sequence : str peptide sequence target_mod_aas : str AAs that may have modifications min_var_mod : int max number of mods in a sequence max_var_mod : int max number of mods in a sequence max_combs : int max number of combinations for a sequence Returns ------- list list of combinations (tuple) of modification sites """ candidate_sites = get_candidate_sites(sequence, target_mod_aas) if min_var_mod <= 1 and max_var_mod >= 1: mod_sites = [(s,) for s in candidate_sites] else: mod_sites = [] for n_var_mod in range(max(2, min_var_mod), max_var_mod + 1): if len(mod_sites) >= max_combs: break mod_sites.extend( itertools.islice( itertools.combinations(candidate_sites, n_var_mod), max_combs - len(mod_sites), ) ) return mod_sites
[docs] def get_var_mods_per_sites_multi_mods_on_aa( sequence: str, mod_sites: tuple, var_mod_dict: dict ) -> list: """ Used only when the var mod list contains more than one mods on the same AA, for example: Mod1@A, Mod2@A ... """ mods_str_list = [""] for i, site in enumerate(mod_sites): if len(var_mod_dict[sequence[site - 1]]) == 1: for i in range(len(mods_str_list)): mods_str_list[i] += ( var_mod_dict[sequence[site - 1]][0] + ModificationKeys.SEPARATOR ) else: _new_list = [] for mod in var_mod_dict[sequence[site - 1]]: _lst = copy.deepcopy(mods_str_list) for i in range(len(_lst)): _lst[i] += mod + ModificationKeys.SEPARATOR _new_list.extend(_lst) mods_str_list = _new_list return [mod[:-1] for mod in mods_str_list]
[docs] def get_var_mods_per_sites_single_mod_on_aa( sequence: str, mod_sites: tuple, var_mod_dict: dict ) -> list: """ Used when the var mod list contains only one mods on the each AA, for example: Mod1@A, Mod2@D ... """ mod_str = "" for site in mod_sites: mod_str += var_mod_dict[sequence[site - 1]] + ModificationKeys.SEPARATOR return [mod_str[:-1]]
get_var_mods_per_sites = get_var_mods_per_sites_single_mod_on_aa
[docs] def get_var_mods( sequence: str, var_mod_aas: str, mod_dict: dict, min_var_mod: int, max_var_mod: int, max_combs: int, ) -> tuple: """ Generate all modification combinations and associated sites for the sequence. """ mod_sites_list = get_var_mod_sites( sequence, var_mod_aas, min_var_mod, max_var_mod, max_combs ) ret_mods = [] ret_sites_list = [] for mod_sites in mod_sites_list: _mods = get_var_mods_per_sites(sequence, mod_sites, mod_dict) mod_sites_str = ModificationKeys.SEPARATOR.join([str(i) for i in mod_sites]) ret_mods.extend(_mods) ret_sites_list.extend([mod_sites_str] * len(_mods)) if min_var_mod == 0: ret_mods.append("") ret_sites_list.append("") return ret_mods, ret_sites_list
[docs] def parse_term_mod(term_mod_name: str): _mod, term = term_mod_name.split(ModificationKeys.SITE_SEPARATOR) if ModificationKeys.TERM_SEPARATOR in term: return tuple(term.split(ModificationKeys.TERM_SEPARATOR)) else: return "", term
[docs] def add_single_peptide_labeling( seq: str, mods: str, mod_sites: str, label_aas: str, label_mod_dict: dict, nterm_label_mod: str, cterm_label_mod: str, ): add_nterm_label = bool(nterm_label_mod) add_cterm_label = bool(cterm_label_mod) if mod_sites: _sites = mod_sites.split(ModificationKeys.SEPARATOR) if "0" in _sites: add_nterm_label = False if "-1" in _sites: add_cterm_label = False mod_list = [mods] mod_site_list = [mod_sites] else: mod_list = [] mod_site_list = [] if add_nterm_label: mod_list.append(nterm_label_mod) mod_site_list.append("0") if add_cterm_label: mod_list.append(cterm_label_mod) mod_site_list.append("-1") aa_labels, aa_label_sites = get_fix_mods(seq, label_aas, label_mod_dict) if aa_labels: mod_list.append(aa_labels) mod_site_list.append(aa_label_sites) return ModificationKeys.SEPARATOR.join(mod_list), ModificationKeys.SEPARATOR.join( mod_site_list )
[docs] def parse_labels(labels: list): label_aas = "" label_mod_dict = {} nterm_label_mod = "" cterm_label_mod = "" for label in labels: _, aa = label.split(ModificationKeys.SITE_SEPARATOR) if len(aa) == 1: label_aas += aa label_mod_dict[aa] = label elif aa == ModificationKeys.ANY_N_TERM: nterm_label_mod = label elif aa == ModificationKeys.ANY_C_TERM: cterm_label_mod = label return label_aas, label_mod_dict, nterm_label_mod, cterm_label_mod
[docs] def create_labeling_peptide_df( peptide_df: pd.DataFrame, labels: list, inplace: bool = False ): if len(peptide_df) == 0: return peptide_df df = peptide_df if inplace else peptide_df.copy() (label_aas, label_mod_dict, nterm_label_mod, cterm_label_mod) = parse_labels(labels) (df["mods"], df["mod_sites"]) = zip( *df[["sequence", "mods", "mod_sites"]].apply( lambda x: add_single_peptide_labeling( *x, label_aas, label_mod_dict, nterm_label_mod, cterm_label_mod ), axis=1, ) ) return df
[docs] def protein_idxes_to_names(protein_idxes: str, protein_names: list): if len(protein_idxes) == 0: return "" proteins = [ protein_names[int(i)] for i in protein_idxes.split(ModificationKeys.SEPARATOR) ] proteins = [protein for protein in proteins if protein] return ModificationKeys.SEPARATOR.join(proteins)
[docs] def append_special_modifications( df: pd.DataFrame, var_mods: list = ["Phospho@S", "Phospho@T", "Phospho@Y"], min_mod_num: int = 0, max_mod_num: int = 1, max_peptidoform_num: int = 100, cannot_modify_pep_nterm_aa: bool = False, cannot_modify_pep_cterm_aa: bool = False, ) -> pd.DataFrame: """ Append special (not N/C-term) variable modifications to the exsiting modifications of each sequence in `df`. Parameters ---------- df : pd.DataFrame Precursor dataframe var_mods : list, optional Considered varialbe modification list. Defaults to ['Phospho@S','Phospho@T','Phospho@Y']. min_mod_num : int, optional Minimal modification number for each sequence of the `var_mods`. Defaults to 0. max_mod_num : int, optional Maximal modification number for each sequence of the `var_mods`. Defaults to 1. max_peptidoform_num : int, optional One sequence is only allowed to explode to `max_peptidoform_num` number of modified peptides. Defaults to 100. cannot_modify_pep_nterm_aa : bool, optional Similar to `cannot_modify_pep_cterm_aa`, by default False cannot_modify_pep_cterm_aa : bool, optional If the modified AA is at C-term, then the modification cannot modified it. For example GlyGly@K, for a peptide `ACDKEFGK`, if GlyGly is at the C-term, trypsin cannot cleave the C-term K, hence there will be no such a modified peptide ACDKEFGK(GlyGly). by default False Returns ------- pd.DataFrame The precursor_df with new modification added. """ if len(var_mods) == 0 or len(df) == 0: return df if cannot_modify_pep_nterm_aa: df["sequence"] = df["sequence"].apply(lambda seq: seq[0].lower() + seq[1:]) if cannot_modify_pep_cterm_aa: df["sequence"] = df["sequence"].apply(lambda seq: seq[:-1] + seq[-1].lower()) global get_var_mods_per_sites var_mod_aas = "" mod_dict = {} if _check_if_multi_mods_on_aa(var_mods): for mod in var_mods: if mod.find(ModificationKeys.SITE_SEPARATOR) + 2 == len(mod): # if mod[-1] in self.fix_mod_dict: continue if mod[-1] in mod_dict: mod_dict[mod[-1]].append(mod) else: var_mod_aas += mod[-1] mod_dict[mod[-1]] = [mod] get_var_mods_per_sites = get_var_mods_per_sites_multi_mods_on_aa else: for mod in var_mods: if mod.find(ModificationKeys.SITE_SEPARATOR) + 2 == len(mod): # if mod[-1] in self.fix_mod_dict: continue var_mod_aas += mod[-1] mod_dict[mod[-1]] = mod get_var_mods_per_sites = get_var_mods_per_sites_single_mod_on_aa (df["mods_app"], df["mod_sites_app"]) = zip( *df.sequence.apply( get_var_mods, var_mod_aas=var_mod_aas, mod_dict=mod_dict, min_var_mod=min_mod_num, max_var_mod=max_mod_num, max_combs=max_peptidoform_num, ) ) if cannot_modify_pep_nterm_aa: df["sequence"] = df["sequence"].apply(lambda seq: seq[0].upper() + seq[1:]) if cannot_modify_pep_cterm_aa: df["sequence"] = df["sequence"].apply(lambda seq: seq[:-1] + seq[-1].upper()) if min_mod_num == 0: df = explode_multiple_columns(df, ["mods_app", "mod_sites_app"]) df.fillna("", inplace=True) else: df.drop(df[df.mods_app.apply(lambda x: len(x) == 0)].index, inplace=True) df = explode_multiple_columns(df, ["mods_app", "mod_sites_app"]) df["mods"] = df[["mods", "mods_app"]].apply( lambda x: ModificationKeys.SEPARATOR.join(i for i in x if i), axis=1 ) df["mod_sites"] = df[["mod_sites", "mod_sites_app"]].apply( lambda x: ModificationKeys.SEPARATOR.join(i for i in x if i), axis=1 ) df.drop(columns=["mods_app", "mod_sites_app"], inplace=True) df.reset_index(drop=True, inplace=True) return df
[docs] class SpecLibFasta(SpecLibBase): """ This is the main entry of AlphaBase when generating spectral libraries from fasta files It includes functionalities to: - Load protein sequences - Digest protein sequences - Append decoy peptides - Add fixed, variable or labeling modifications to the peptide sequences - Add charge states - Save libraries into hdf file Attributes ---------- max_peptidoform_num : int, 100 by default For some modifications such as Phospho, there may be thousands of peptidoforms generated for some peptides, so we use this attribute to control the overall number of peptidoforms of a peptide. protein_df : pd.DataFrame Protein dataframe with columns 'protein_id', 'sequence', 'description', 'gene_name', etc. """
[docs] def __init__( self, charged_frag_types: list = ["b_z1", "b_z2", "y_z1", "y_z2"], *, protease: str = "trypsin", max_missed_cleavages: int = 2, peptide_length_min: int = 7, peptide_length_max: int = 35, precursor_charge_min: int = 2, precursor_charge_max: int = 4, precursor_mz_min: float = 400.0, precursor_mz_max: float = 2000.0, var_mods: list = ["Acetyl@Protein_N-term", "Oxidation@M"], min_var_mod_num: int = 0, max_var_mod_num: int = 2, fix_mods: list = ["Carbamidomethyl@C"], labeling_channels: dict = None, special_mods: list = [], min_special_mod_num: int = 0, max_special_mod_num: int = 1, special_mods_cannot_modify_pep_n_term: bool = False, special_mods_cannot_modify_pep_c_term: bool = False, decoy: str = None, include_contaminants: bool = False, I_to_L: bool = False, ): """ Parameters ---------- charged_frag_types : list, optional Fragment types with charge, by default [ 'b_z1','b_z2','y_z1', 'y_z2' ] protease : str, optional Could be pre-defined protease name defined in :data:`protease_dict`, or a regular expression. By default 'trypsin' max_missed_cleavages : int, optional Maximal missed cleavages, by default 2 peptide_length_min : int, optional Minimal cleaved peptide length, by default 7 peptide_length_max : int, optional Maximal cleaved peptide length, by default 35 precursor_charge_min : int, optional Minimal precursor charge, by default 2 precursor_charge_max : int, optional Maximal precursor charge, by default 4 precursor_mz_min : float, optional Minimal precursor mz, by default 200.0 precursor_mz_max : float, optional Maximal precursor mz, by default 2000.0 var_mods : list, optional list of variable modifications, by default ['Acetyl@Protein_N-term','Oxidation@M'] max_var_mod_num : int, optional Minimal number of variable modifications on a peptide sequence, by default 0 max_var_mod_num : int, optional Maximal number of variable modifications on a peptide sequence, by default 2 fix_mods : list, optional list of fixed modifications, by default ['Carbamidomethyl@C'] labeling_channels : dict, optional Add isotope labeling with different channels, see :meth:`add_peptide_labeling()`. Defaults to None special_mods : list, optional Modifications with special occurance per peptide. It is useful for modificaitons like Phospho which may largely explode the number of candidate modified peptides. The number of special_mods per peptide is controlled by `max_append_mod_num`. Defaults to []. min_special_mod_num : int, optional Control the min number of special_mods per peptide, by default 0. max_special_mod_num : int, optional Control the max number of special_mods per peptide, by default 1. special_mods_cannot_modify_pep_c_term : bool, optional Some modifications cannot modify the peptide C-term, this will be useful for GlyGly@K as if C-term is di-Glyed, it cannot be cleaved/digested. Defaults to False. special_mods_cannot_modify_pep_n_term : bool, optional Similar to `special_mods_cannot_modify_pep_c_term`, but at N-term. Defaults to False. decoy : str, optional Decoy type (see :meth:`alphabase.spectral_library.base.append_decoy_sequence()`) # - `protein_reverse`: Reverse on target protein sequences - `pseudo_reverse`: Pseudo-reverse on target peptide sequences - `diann`: DiaNN-like decoy - None: no decoy by default None include_contaminants : bool, optional If include contaminants.fasta, by default False """ super().__init__( charged_frag_types=charged_frag_types, precursor_mz_min=precursor_mz_min, precursor_mz_max=precursor_mz_max, decoy=decoy, ) self.protein_df: pd.DataFrame = pd.DataFrame() self.I_to_L = I_to_L self.include_contaminants = include_contaminants self.max_peptidoform_num = 100 self._digest = Digest( protease, max_missed_cleavages, peptide_length_min, peptide_length_max ) self.min_precursor_charge = precursor_charge_min self.max_precursor_charge = precursor_charge_max self.var_mods = var_mods self.fix_mods = fix_mods self.min_var_mod_num = min_var_mod_num self.max_var_mod_num = max_var_mod_num self.labeling_channels = labeling_channels self.special_mods = special_mods self.min_special_mod_num = min_special_mod_num self.max_special_mod_num = max_special_mod_num self.special_mods_cannot_modify_pep_n_term = ( special_mods_cannot_modify_pep_n_term ) self.special_mods_cannot_modify_pep_c_term = ( special_mods_cannot_modify_pep_c_term ) self._parse_fix_and_var_mods()
def _parse_fix_and_var_mods(self): # self.fix_mod_aas = '' # self.fix_mod_prot_nterm_dict = {} # self.fix_mod_prot_cterm_dict = {} # self.fix_mod_pep_nterm_dict = {} # self.fix_mod_pep_cterm_dict = {} # self.fix_mod_dict = {} def _set_term_mod( term_mod, prot_nterm, prot_cterm, pep_nterm, pep_cterm, allow_conflicts ): def _set_dict(term_dict, site, mod, allow_conflicts): if allow_conflicts: if site in term_dict: term_dict[site].append(term_mod) else: term_dict[site] = [term_mod] else: term_dict[site] = term_mod site, term = parse_term_mod(term_mod) if term == ModificationKeys.ANY_N_TERM: _set_dict(pep_nterm, site, term_mod, allow_conflicts) elif term == ModificationKeys.PROTEIN_N_TERM: _set_dict(prot_nterm, site, term_mod, allow_conflicts) elif term == ModificationKeys.ANY_C_TERM: _set_dict(pep_cterm, site, term_mod, allow_conflicts) elif term == ModificationKeys.PROTEIN_C_TERM: _set_dict(prot_cterm, site, term_mod, allow_conflicts) # for mod in self.fix_mods: # if mod.find('@')+2 == len(mod): # self.fix_mod_aas += mod[-1] # self.fix_mod_dict[mod[-1]] = mod # else: # _set_term_mod( # mod, # self.fix_mod_prot_nterm_dict, # self.fix_mod_prot_cterm_dict, # self.fix_mod_pep_nterm_dict, # self.fix_mod_pep_cterm_dict, # allow_conflicts=False # ) self.var_mod_aas = "" self.var_mod_prot_nterm_dict = {} self.var_mod_prot_cterm_dict = {} self.var_mod_pep_nterm_dict = {} self.var_mod_pep_cterm_dict = {} self.var_mod_dict = {} global get_var_mods_per_sites if _check_if_multi_mods_on_aa(self.var_mods): for mod in self.var_mods: if mod.find(ModificationKeys.SITE_SEPARATOR) + 2 == len(mod): # if mod[-1] in self.fix_mod_dict: continue if mod[-1] in self.var_mod_dict: self.var_mod_dict[mod[-1]].append(mod) else: self.var_mod_aas += mod[-1] self.var_mod_dict[mod[-1]] = [mod] get_var_mods_per_sites = get_var_mods_per_sites_multi_mods_on_aa else: for mod in self.var_mods: if mod.find(ModificationKeys.SITE_SEPARATOR) + 2 == len(mod): # if mod[-1] in self.fix_mod_dict: continue self.var_mod_aas += mod[-1] self.var_mod_dict[mod[-1]] = mod get_var_mods_per_sites = get_var_mods_per_sites_single_mod_on_aa for mod in self.var_mods: if mod.find(ModificationKeys.SITE_SEPARATOR) + 2 < len(mod): _set_term_mod( mod, self.var_mod_prot_nterm_dict, self.var_mod_prot_cterm_dict, self.var_mod_pep_nterm_dict, self.var_mod_pep_cterm_dict, allow_conflicts=True, )
[docs] def import_and_process_fasta( self, fasta_files: list, ): """Import and process a fasta file or a list of fasta files. It includes 3 steps: 1. Digest and get peptide sequences, it uses `self.get_peptides_from_...()` 2. Process the peptides including add modifications, it uses :meth:`process_from_naked_peptide_seqs()`. Parameters ---------- fasta_files : list A fasta file or a list of fasta files """ if self.include_contaminants: fasta_files.append(os.path.join(CONST_FILE_FOLDER, "contaminants.fasta")) protein_dict = load_all_proteins(fasta_files) self.import_and_process_protein_dict(protein_dict)
[docs] def import_and_process_protein_dict(self, protein_dict: dict): """ Import and process the protein_dict. The processing step is in :meth:`process_from_naked_peptide_seqs()`. ``` protein_dict = load_all_proteins(fasta_files) ``` Parameters ---------- protein_dict : dict Format: { 'prot_id1': {'protein_id': 'prot_id1', 'sequence': string, 'gene_name': string, 'description': string 'prot_id2': {...} ... } """ self.get_peptides_from_protein_dict(protein_dict) self.process_from_naked_peptide_seqs()
[docs] def import_and_process_protein_df(self, protein_df: pd.DataFrame): """ Import and process the protein_dict. The processing step is in :meth:`process_from_naked_peptide_seqs()`. ``` protein_dict = load_all_proteins(fasta_files) ``` Parameters ---------- protein_df : pd.DataFrame DataFrame with columns 'protein_id', 'sequence', 'gene_name', 'description' """ self.get_peptides_from_protein_df(protein_df) self.process_from_naked_peptide_seqs()
[docs] def import_and_process_peptide_sequences( self, pep_seq_list: list, protein_list: list = None, ): """ Importing and process peptide sequences instead of proteins. The processing step is in :meth:`process_from_naked_peptide_seqs()`. Parameters ---------- pep_seq_list : list Peptide sequence list protein_list : list, optional Protein id list which maps to pep_seq_list one-by-one, by default None """ self.get_peptides_from_peptide_sequence_list(pep_seq_list, protein_list) self.process_from_naked_peptide_seqs()
[docs] def process_from_naked_peptide_seqs(self): """ The peptide processing step which is called by `import_and_process_...` methods. """ self.append_decoy_sequence() self.add_modifications() self.add_special_modifications() self.add_peptide_labeling() self.add_charge() self.calc_and_clip_precursor_mz()
[docs] def get_peptides_from_fasta(self, fasta_file: Union[str, list]): """Load peptide sequences from fasta files. Parameters ---------- fasta_file : Union[str,list] Could be a fasta file (str) or a list of fasta files (list[str]) """ if isinstance(fasta_file, str): self.get_peptides_from_fasta_list([fasta_file]) else: self.get_peptides_from_fasta_list(fasta_file)
[docs] def get_peptides_from_fasta_list(self, fasta_files: list): """Load peptide sequences from fasta file list Parameters ---------- fasta_files : list fasta file list """ if self.include_contaminants: fasta_files.append(os.path.join(CONST_FILE_FOLDER, "contaminants.fasta")) protein_dict = load_all_proteins(fasta_files) self.get_peptides_from_protein_dict(protein_dict)
[docs] def get_peptides_from_protein_df(self, protein_df: pd.DataFrame): self.protein_df = protein_df if self.I_to_L: self.protein_df["sequence_I2L"] = self.protein_df.sequence.str.replace( "I", "L" ) digest_seq = "sequence_I2L" else: digest_seq = "sequence" self._cleave_to_peptides(self.protein_df, protein_seq_column=digest_seq)
[docs] def get_peptides_from_protein_dict(self, protein_dict: dict): """Cleave the protein sequences in protein_dict. Parameters ---------- protein_dict : dict Format: ``` { 'prot_id1': {'protein_id': 'prot_id1', 'sequence': string, 'gene_name': string, 'description': string 'prot_id2': {...} ... } ``` """ protein_df = pd.DataFrame.from_dict(protein_dict, orient="index").reset_index( drop=True ) self.get_peptides_from_protein_df(protein_df)
def _cleave_to_peptides( self, protein_df: pd.DataFrame, protein_seq_column: str = "sequence" ): """Cleave protein sequences in protein_df Parameters ---------- protein_df : pd.DataFrame Protein DataFrame containing `protein_seq_column` protein_seq_column : str, optional Target column containing protein sequences, by default 'sequence' """ pep_dict = {} for i, prot_seq in enumerate(protein_df[protein_seq_column].values): (seq_list, miss_list, nterm_list, cterm_list) = ( self._digest.cleave_sequence(prot_seq) ) for seq, miss, nterm, cterm in zip( seq_list, miss_list, nterm_list, cterm_list ): prot_id = str(i) if seq in pep_dict: if not pep_dict[seq][0].endswith(prot_id): pep_dict[seq][0] += ModificationKeys.SEPARATOR + prot_id if nterm: pep_dict[seq][2] = nterm if cterm: pep_dict[seq][3] = cterm else: pep_dict[seq] = [prot_id, miss, nterm, cterm] self._precursor_df = pd.DataFrame().from_dict( pep_dict, orient="index", columns=[ "protein_idxes", "miss_cleavage", "is_prot_nterm", "is_prot_cterm", ], ) self._precursor_df.reset_index(drop=False, inplace=True) self._precursor_df.rename(columns={"index": "sequence"}, inplace=True) self._precursor_df["mods"] = "" self._precursor_df["mod_sites"] = "" self.refine_df()
[docs] def append_protein_name(self): if ( "protein_id" not in self.protein_df or "protein_idxes" not in self._precursor_df ): return self._precursor_df["proteins"] = self._precursor_df["protein_idxes"].apply( protein_idxes_to_names, protein_names=self.protein_df["protein_id"].values ) if "gene_name" in self.protein_df.columns: self._precursor_df["genes"] = self._precursor_df["protein_idxes"].apply( protein_idxes_to_names, protein_names=self.protein_df["gene_name"].values, )
[docs] def get_peptides_from_peptide_sequence_list( self, pep_seq_list: list, protein_list: list = None ): self._precursor_df = pd.DataFrame() self._precursor_df["sequence"] = pep_seq_list if protein_list is not None: self._precursor_df["protein_name"] = protein_list self._precursor_df["is_prot_nterm"] = False self._precursor_df["is_prot_cterm"] = False self.refine_df()
[docs] def add_mods_for_one_seq( self, sequence: str, is_prot_nterm, is_prot_cterm ) -> tuple: """Add fixed and variable modifications to a sequence Parameters ---------- sequence : str Peptide sequence is_prot_nterm : bool if protein N-term is_prot_cterm : bool if protein C-term Returns ------- tuple list[str]: list of modification names list[str]: list of modification sites """ # fix_mods, fix_mod_sites = get_fix_mods( # sequence, self.fix_mod_aas, self.fix_mod_dict # ) # #TODO add prot and pep C-term fix mods # #TODO add prot and pep N-term fix mods # if len(fix_mods) == 0: # fix_mods = [''] # fix_mod_sites = [''] # else: # fix_mods = [fix_mods] # fix_mod_sites = [fix_mod_sites] var_mods_list, var_mod_sites_list = get_var_mods( sequence, self.var_mod_aas, self.var_mod_dict, self.min_var_mod_num, self.max_var_mod_num, self.max_peptidoform_num - 1, # 1 for unmodified ) nterm_var_mods = [""] nterm_var_mod_sites = [""] if is_prot_nterm and len(self.var_mod_prot_nterm_dict) > 0: if "" in self.var_mod_prot_nterm_dict: nterm_var_mods.extend(self.var_mod_prot_nterm_dict[""]) if sequence[0] in self.var_mod_prot_nterm_dict: nterm_var_mods.extend(self.var_mod_prot_nterm_dict[sequence[0]]) if len(self.var_mod_pep_nterm_dict) > 0: if "" in self.var_mod_pep_nterm_dict: nterm_var_mods.extend(self.var_mod_pep_nterm_dict[""]) if sequence[0] in self.var_mod_pep_nterm_dict: nterm_var_mods.extend(self.var_mod_pep_nterm_dict[sequence[0]]) nterm_var_mod_sites.extend(["0"] * (len(nterm_var_mods) - 1)) # TODO add prot and pep C-term var mods return ( list( ModificationKeys.SEPARATOR.join([i for i in items if i]) for items in itertools.product(nterm_var_mods, var_mods_list) ), list( ModificationKeys.SEPARATOR.join([i for i in items if i]) for items in itertools.product(nterm_var_mod_sites, var_mod_sites_list) ), )
[docs] def add_modifications(self): """Add fixed and variable modifications to all peptide sequences in `self.precursor_df`""" if "is_prot_nterm" not in self._precursor_df.columns: self._precursor_df["is_prot_nterm"] = False if "is_prot_cterm" not in self._precursor_df.columns: self._precursor_df["is_prot_cterm"] = False if len(self._precursor_df) == 0: self._precursor_df["mods"] = "" self._precursor_df["mod_sites"] = "" return (self._precursor_df["mods"], self._precursor_df["mod_sites"]) = zip( *self._precursor_df[["sequence", "is_prot_nterm", "is_prot_cterm"]].apply( lambda x: self.add_mods_for_one_seq(*x), axis=1 ) ) self._precursor_df = explode_multiple_columns( self._precursor_df, ["mods", "mod_sites"] ) self._precursor_df.dropna(subset=["mods"], inplace=True) self._precursor_df = create_labeling_peptide_df( self._precursor_df, self.fix_mods, inplace=True ) self._precursor_df.reset_index(drop=True, inplace=True)
[docs] def add_special_modifications(self): """ Add external defined variable modifications to all peptide sequences in `self._precursor_df`. See :meth:`append_special_modifications()` for details. """ if len(self.special_mods) == 0: return self._precursor_df = append_special_modifications( self._precursor_df, self.special_mods, self.min_special_mod_num, self.max_special_mod_num, self.max_peptidoform_num, cannot_modify_pep_nterm_aa=self.special_mods_cannot_modify_pep_n_term, cannot_modify_pep_cterm_aa=self.special_mods_cannot_modify_pep_c_term, )
[docs] def add_peptide_labeling(self, labeling_channel_dict: dict = None): """ Add labeling onto peptides inplace of self._precursor_df Parameters ---------- labeling_channel_dict : dict, optional For example: ``` { -1: [], # not labeled 0: ['Dimethyl@Any_N-term','Dimethyl@K'], 4: ['Dimethyl:2H(4)@Any_N-term','Dimethyl:2H(4)@K'], 8: ['Dimethyl:2H(6)13C(2)@Any_N-term','Dimethyl:2H(6)13C(2)@K'], } ```. The key name could be int (highly recommended or must be in the future) or str, and the value must be a list of modification names (str) in alphabase format. It is set to `self.labeling_channels` if None. Defaults to None """ if labeling_channel_dict is None: labeling_channel_dict = self.labeling_channels if labeling_channel_dict is None or len(labeling_channel_dict) == 0: return df_list = [] for channel, labels in labeling_channel_dict.items(): df = create_labeling_peptide_df(self._precursor_df, labels) df["labeling_channel"] = channel df_list.append(df) self._precursor_df = pd.concat(df_list, ignore_index=True) try: self._precursor_df["labeling_channel"] = ( self._precursor_df.labeling_channel.astype(np.int32) ) if "labeling_channel" not in self.key_numeric_columns: self.key_numeric_columns.append("labeling_channel") except Exception: if "labeling_channel" in self.key_numeric_columns: self.key_numeric_columns.remove("labeling_channel")
[docs] def add_charge(self): """Add charge states""" if "charge" in self._precursor_df.columns: return self._precursor_df["charge"] = [ np.arange(self.min_precursor_charge, self.max_precursor_charge + 1) ] * len(self._precursor_df) self._precursor_df = self._precursor_df.explode("charge") self._precursor_df["charge"] = self._precursor_df.charge.astype(np.int8) self._precursor_df.reset_index(drop=True, inplace=True)
[docs] def save_hdf(self, hdf_file: str): """Save the contents into hdf file (attribute -> hdf_file): - self.precursor_df -> library/precursor_df - self.protein_df -> library/protein_df - self.fragment_mz_df -> library/fragment_mz_df - self.fragment_intensity_df -> library/fragment_intensity_df Parameters ---------- hdf_file : str The hdf file path """ super().save_hdf(hdf_file) _hdf = HDF_File(hdf_file, read_only=False, truncate=True, delete_existing=False) _hdf.library.protein_df = self.protein_df
[docs] def load_hdf(self, hdf_file: str, load_mod_seq: bool = False): """Load contents from hdf file: - self.precursor_df <- library/precursor_df - self.precursor_df <- library/mod_seq_df if load_mod_seq is True - self.protein_df <- library/protein_df - self.fragment_mz_df <- library/fragment_mz_df - self.fragment_intensity_df <- library/fragment_intensity_df Parameters ---------- hdf_file : str hdf file path load_mod_seq : bool, optional After library is generated with hash values (int64) for sequences (str) and modifications (str), we don't need sequence information for searching. So we can skip loading sequences to make the loading much faster. By default False """ super().load_hdf(hdf_file, load_mod_seq=load_mod_seq) try: _hdf = HDF_File( hdf_file, ) self.protein_df = _hdf.library.protein_df.values except (AttributeError, KeyError, ValueError, TypeError): print(f"No protein_df in {hdf_file}")
[docs] def annotate_precursor_df( precursor_df: pd.DataFrame, protein_df: pd.DataFrame, ): """Annotate a list of peptides with genes and proteins by using an ahocorasick automaton. Parameters ---------- precursor_df : pd.DataFrame A dataframe containing a sequence column. protein_df : pd.DataFrame protein dataframe containing `sequence` column. Returns ------- pd.DataFrame updated precursor_df with `genes`, `proteins` and `cardinality` columns. """ if len(precursor_df) == 0: return precursor_df if len(protein_df) == 0: return precursor_df if "sequence" not in precursor_df.columns: raise SystemError("precursor_df must contain a sequence column") peptide_df = pd.DataFrame({"sequence": precursor_df["sequence"].unique()}) # ahocorasick automaton will be used to index the protein_df automaton = ahocorasick.Automaton() for i, peptide_sequence in enumerate(peptide_df["sequence"]): automaton.add_word(peptide_sequence, i) automaton.make_automaton() genes = [[] for _ in range(len(peptide_df))] proteins = [[] for _ in range(len(peptide_df))] # iter as dictionary for protein_entry in tqdm(protein_df.to_dict("records")): idx = [idx for _, idx in automaton.iter(protein_entry["sequence"])] idx = np.unique(idx) if len(idx) > 0: for i in idx: genes[i].append(protein_entry["gene_org"]) proteins[i].append(protein_entry["protein_id"]) peptide_df["genes"] = [ModificationKeys.SEPARATOR.join(g) for g in genes] peptide_df["proteins"] = [ModificationKeys.SEPARATOR.join(g) for g in proteins] peptide_df["cardinality"] = [len(g) for g in genes] if "genes" in precursor_df.columns: precursor_df.drop(columns=["genes"], inplace=True) if "proteins" in precursor_df.columns: precursor_df.drop(columns=["proteins"], inplace=True) if "proteotypic" in precursor_df.columns: precursor_df.drop(columns=["proteotypic"], inplace=True) if "cardinality" in precursor_df.columns: precursor_df.drop(columns=["cardinality"], inplace=True) failed_annotation = np.sum(peptide_df["genes"] == "") if failed_annotation > 0: warnings.warn(f"{failed_annotation} peptides could not be annotated") return precursor_df.merge(peptide_df, on="sequence", how="left")
def _check_if_multi_mods_on_aa(var_mods): mod_set = set() for mod in var_mods: if mod.find(ModificationKeys.SITE_SEPARATOR) + 2 == len(mod): if mod[-1] in mod_set: return True mod_set.add(mod[-1]) return False