import multiprocessing as mp
import typing
import numba
import numpy as np
import pandas as pd
import tqdm
from alphabase.constants.modification import MOD_DF, ModificationKeys
from alphabase.spectral_library.base import SpecLibBase
from alphabase.utils import explode_multiple_columns
# @numba.njit #(cannot use numba for pd.Series)
[docs]
def create_modified_sequence(
seq_mods_sites: typing.Tuple, # must be ('sequence','mods','mod_sites')
translate_mod_dict: dict = None,
mod_sep="[]",
nterm="_",
cterm="_",
):
"""
Translate `(sequence, mods, mod_sites)` into a modified sequence. Used by `df.apply()`.
For example, `('ABCDEFG','Mod1@A;Mod2@E','1;5')`->`_A[Mod1@A]BCDE[Mod2@E]FG_`.
Parameters
----------
seq_mods_sites : List
must be `(sequence, mods, mod_sites)`
translate_mod_dict : dict
A dict to map AlphaX modification names to other software,
use unimod name if None.
Defaults to None.
mod_sep : str
'[]' or '()', default '[]'
"""
mod_seq, mods, mod_sites = seq_mods_sites
if mods:
mods = mods.split(ModificationKeys.SEPARATOR)
mod_sites = [int(i) for i in mod_sites.split(ModificationKeys.SEPARATOR)]
rev_order = np.argsort(mod_sites)[::-1]
mod_sites = [mod_sites[rev_order[i]] for i in range(len(mod_sites))]
mods = [mods[rev_order[i]] for i in range(len(mods))]
if translate_mod_dict is None:
mods = [mod[: mod.find(ModificationKeys.SITE_SEPARATOR)] for mod in mods]
else:
mods = [translate_mod_dict[mod] for mod in mods]
for _site, mod in zip(mod_sites, mods):
if _site > 0:
mod_seq = (
mod_seq[:_site] + mod_sep[0] + mod + mod_sep[1] + mod_seq[_site:]
)
elif _site == -1:
cterm += mod_sep[0] + mod + mod_sep[1]
elif _site == 0:
nterm += mod_sep[0] + mod + mod_sep[1]
else:
mod_seq = (
mod_seq[:_site] + mod_sep[0] + mod + mod_sep[1] + mod_seq[_site:]
)
return nterm + mod_seq + cterm
@numba.njit
def _get_frag_info_from_column_name(column: str):
"""
Only used when converting alphabase libraries into other libraries
"""
idx = column.rfind("_")
frag_type = column[:idx]
charge = column[idx + 2 :]
if len(frag_type) == 1:
loss_type = "noloss"
else:
idx = frag_type.find("_")
loss_type = frag_type[idx + 1 :]
frag_type = frag_type[0]
return frag_type, loss_type, charge
def _get_frag_num(columns, rows, frag_len):
frag_nums = []
for r, c in zip(rows, columns):
if is_nterm_frag(c):
frag_nums.append(r + 1)
else:
frag_nums.append(frag_len - r)
return frag_nums
[docs]
def merge_precursor_fragment_df(
precursor_df: pd.DataFrame,
fragment_mz_df: pd.DataFrame,
fragment_inten_df: pd.DataFrame,
top_n_inten: int,
frag_type_head: str = "FragmentType",
frag_mass_head: str = "FragmentMz",
frag_inten_head: str = "RelativeIntensity",
frag_charge_head: str = "FragmentCharge",
frag_series_head: str = "FragmentNumber",
frag_loss_head: str = "FragmentLossType",
verbose=True,
):
"""
Convert alphabase library into a single dataframe.
This method is not important, as it will be only
used by DiaNN, or spectronaut, or others
"""
df = precursor_df.copy()
frag_columns = fragment_mz_df.columns.values.astype("U")
frag_type_list = []
frag_loss_list = []
frag_charge_list = []
frag_mass_list = []
frag_inten_list = []
frag_num_list = []
iters = enumerate(df[["frag_start_idx", "frag_stop_idx"]].values)
if verbose:
iters = tqdm.tqdm(iters)
for _i, (start, end) in iters:
intens = fragment_inten_df.iloc[start:end, :].to_numpy(
copy=True
) # is loc[start:end-1,:] faster?
max_inten = np.amax(intens)
if max_inten > 0:
intens /= max_inten
masses = fragment_mz_df.iloc[start:end, :].values
sorted_idx = np.argsort(intens.reshape(-1))[-top_n_inten:][::-1]
idx_in_df = np.unravel_index(sorted_idx, masses.shape)
frag_len = end - start
rows = np.arange(frag_len, dtype=np.int32)[idx_in_df[0]]
columns = frag_columns[idx_in_df[1]]
frag_types, loss_types, charges = zip(
*[_get_frag_info_from_column_name(_) for _ in columns]
)
frag_nums = _get_frag_num(columns, rows, frag_len)
frag_type_list.append(frag_types)
frag_loss_list.append(loss_types)
frag_charge_list.append(charges)
frag_mass_list.append(masses[idx_in_df])
frag_inten_list.append(intens[idx_in_df])
frag_num_list.append(frag_nums)
df[frag_type_head] = frag_type_list
df[frag_mass_head] = frag_mass_list
df[frag_inten_head] = frag_inten_list
df[frag_charge_head] = frag_charge_list
df[frag_series_head] = frag_num_list
df[frag_loss_head] = frag_loss_list
return explode_multiple_columns(
df,
[
frag_type_head,
frag_mass_head,
frag_inten_head,
frag_charge_head,
frag_series_head,
frag_loss_head,
],
)
# try:
# return df.explode([
# frag_type_head,
# frag_mass_head,
# frag_inten_head,
# frag_charge_head,
# frag_loss_head,
# frag_num_head
# ])
# except ValueError:
# # df.explode does not allow mulitple columns before pandas version 1.x.x.
# df = df.explode(frag_type_head)
# df[frag_mass_head] = _flatten(frag_mass_list)
# df[frag_inten_head] = _flatten(frag_inten_list)
# df[frag_charge_head] = _flatten(frag_charge_list)
# df[frag_loss_head] = _flatten(frag_loss_list)
# df[frag_num_head] = _flatten(frag_num_list)
# return df
mod_to_unimod_dict = {}
for mod_name, unimod_id in MOD_DF[["mod_name", "unimod_id"]].values:
if unimod_id == -1 or unimod_id == "-1":
continue
mod_to_unimod_dict[mod_name] = f"UniMod:{unimod_id}"
[docs]
def is_nterm_frag(frag_type: str):
return frag_type[0] in "abc"
[docs]
def mask_fragment_intensity_by_mz_(
fragment_mz_df: pd.DataFrame,
fragment_intensity_df: pd.DataFrame,
min_frag_mz,
max_frag_mz,
):
fragment_intensity_df.mask(
(fragment_mz_df > max_frag_mz) | (fragment_mz_df < min_frag_mz), 0, inplace=True
)
[docs]
def mask_fragment_intensity_by_frag_nAA(
fragment_intensity_df: pd.DataFrame, precursor_df: pd.DataFrame, max_mask_frag_nAA
):
if max_mask_frag_nAA <= 0:
return
b_mask = np.zeros(len(fragment_intensity_df), dtype=np.bool_)
y_mask = b_mask.copy()
for i_frag in range(max_mask_frag_nAA):
b_mask[precursor_df.frag_start_idx.values + i_frag] = True
y_mask[precursor_df.frag_stop_idx.values - i_frag - 1] = True
masks = np.zeros(
(len(fragment_intensity_df), len(fragment_intensity_df.columns)), dtype=np.bool_
)
for i, col in enumerate(fragment_intensity_df.columns.values):
if is_nterm_frag(col):
masks[:, i] = b_mask
else:
masks[:, i] = y_mask
fragment_intensity_df.mask(masks, 0, inplace=True)
[docs]
def speclib_to_single_df(
speclib: SpecLibBase,
*,
translate_mod_dict: dict = None,
keep_k_highest_fragments: int = 12,
min_frag_mz=200,
max_frag_mz=2000,
min_frag_intensity=0.01,
min_frag_nAA=0,
modloss: str = "H3PO4",
frag_type_head: str = "FragmentType",
frag_mass_head: str = "FragmentMz",
frag_inten_head: str = "RelativeIntensity",
frag_charge_head: str = "FragmentCharge",
frag_loss_head: str = "FragmentLossType",
frag_series_head: str = "FragmentNumber",
verbose=True,
) -> pd.DataFrame:
"""
Convert alphabase library to diann (or Spectronaut) library dataframe
This method is not important, as it will be only
used by DiaNN, or spectronaut, or others
Parameters
----------
translate_mod_dict : dict
A dict to map AlphaX modification names to other software,
use unimod name if None.
Defaults to None.
keep_k_highest_peaks : int
only keep highest fragments for each precursor. Default: 12
Returns
-------
pd.DataFrame
a single dataframe in the SWATH-like format
"""
df = pd.DataFrame()
df["ModifiedPeptide"] = speclib._precursor_df[
["sequence", "mods", "mod_sites"]
].apply(
create_modified_sequence,
axis=1,
translate_mod_dict=translate_mod_dict,
mod_sep="[]",
)
df["frag_start_idx"] = speclib._precursor_df["frag_start_idx"]
df["frag_stop_idx"] = speclib._precursor_df["frag_stop_idx"]
df["PrecursorCharge"] = speclib._precursor_df["charge"]
for rt_col in ["irt_pred", "rt_pred", "rt", "irt", "rt_norm"]:
if rt_col in speclib.precursor_df.columns:
df["RT"] = speclib.precursor_df[rt_col]
break
if "RT" not in df.columns:
raise ValueError("precursor_df must contain the RT columns")
for im_col in ["mobility_pred", "mobility"]:
if im_col in speclib.precursor_df.columns:
df["IonMobility"] = speclib.precursor_df[im_col]
break
for ccs_col in ["ccs_pred", "ccs"]:
if ccs_col in speclib.precursor_df.columns:
df["CCS"] = speclib.precursor_df[ccs_col]
break
# df['LabelModifiedSequence'] = df['ModifiedPeptide']
df["StrippedPeptide"] = speclib.precursor_df["sequence"]
if "precursor_mz" not in speclib._precursor_df.columns:
speclib.calc_precursor_mz()
df["PrecursorMz"] = speclib._precursor_df["precursor_mz"]
for prot_col in ["uniprot_ids", "proteins"]:
if prot_col in speclib.precursor_df.columns:
df["ProteinID"] = speclib.precursor_df[prot_col]
break
if "genes" in speclib._precursor_df.columns:
df["Genes"] = speclib._precursor_df["genes"]
if "decoy" in speclib._precursor_df.columns:
df["Decoy"] = speclib._precursor_df["decoy"]
# if 'protein_group' in speclib._precursor_df.columns:
# df['ProteinGroups'] = speclib._precursor_df['protein_group']
if min_frag_mz > 0 or max_frag_mz > 0:
mask_fragment_intensity_by_mz_(
speclib._fragment_mz_df,
speclib._fragment_intensity_df,
min_frag_mz,
max_frag_mz,
)
if min_frag_nAA > 0:
mask_fragment_intensity_by_frag_nAA(
speclib._fragment_intensity_df,
speclib._precursor_df,
max_mask_frag_nAA=min_frag_nAA - 1,
)
df = merge_precursor_fragment_df(
df,
speclib._fragment_mz_df,
speclib._fragment_intensity_df,
top_n_inten=keep_k_highest_fragments,
frag_type_head=frag_type_head,
frag_mass_head=frag_mass_head,
frag_inten_head=frag_inten_head,
frag_charge_head=frag_charge_head,
frag_loss_head=frag_loss_head,
frag_series_head=frag_series_head,
verbose=verbose,
)
df = df[df["RelativeIntensity"] > min_frag_intensity]
df.loc[df[frag_loss_head] == "modloss", frag_loss_head] = modloss
return df.drop(["frag_start_idx", "frag_stop_idx"], axis=1)
[docs]
def speclib_to_swath_df(
speclib: SpecLibBase,
*,
keep_k_highest_fragments: int = 12,
min_frag_mz=200,
max_frag_mz=2000,
min_frag_intensity=0.01,
) -> pd.DataFrame:
speclib_to_single_df(
speclib,
translate_mod_dict=None,
keep_k_highest_fragments=keep_k_highest_fragments,
min_frag_mz=min_frag_mz,
max_frag_mz=max_frag_mz,
min_frag_intensity=min_frag_intensity,
)
[docs]
class WritingProcess(mp.Process):
[docs]
def __init__(self, task_queue, tsv, *args, **kwargs):
self.task_queue: mp.Queue = task_queue
self.tsv = tsv
super().__init__(*args, **kwargs)
[docs]
def run(self):
while True:
df, batch = self.task_queue.get()
if df is None:
break
if tuple([int(i) for i in pd.__version__.split(".")[:2]]) >= (1, 5):
newline = dict(lineterminator="\n")
else:
newline = dict(line_terminator="\n")
df.to_csv(
self.tsv,
header=(batch == 0),
sep="\t",
mode="a",
index=False,
**newline,
)
[docs]
def translate_to_tsv(
speclib: SpecLibBase,
tsv: str,
*,
keep_k_highest_fragments: int = 12,
min_frag_mz: float = 200,
max_frag_mz: float = 2000,
min_frag_intensity: float = 0.01,
min_frag_nAA: int = 0,
batch_size: int = 100000,
translate_mod_dict: dict = None,
multiprocessing: bool = True,
):
if multiprocessing:
queue_size = 1000000 // batch_size
if queue_size < 2:
queue_size = 2
elif queue_size > 10:
queue_size = 10
df_head_queue = mp.Queue(maxsize=queue_size)
writing_process = WritingProcess(df_head_queue, tsv)
writing_process.start()
mask_fragment_intensity_by_mz_(
speclib._fragment_mz_df,
speclib._fragment_intensity_df,
min_frag_mz,
max_frag_mz,
)
if min_frag_nAA > 0:
mask_fragment_intensity_by_frag_nAA(
speclib._fragment_intensity_df,
speclib._precursor_df,
max_mask_frag_nAA=min_frag_nAA - 1,
)
if isinstance(tsv, str):
with open(tsv, "w"):
pass
_speclib = SpecLibBase()
_speclib._fragment_intensity_df = speclib._fragment_intensity_df
_speclib._fragment_mz_df = speclib._fragment_mz_df
precursor_df = speclib._precursor_df
for i in tqdm.tqdm(range(0, len(precursor_df), batch_size)):
_speclib._precursor_df = precursor_df.iloc[i : i + batch_size]
df = speclib_to_single_df(
_speclib,
translate_mod_dict=translate_mod_dict,
keep_k_highest_fragments=keep_k_highest_fragments,
min_frag_mz=0,
max_frag_mz=0,
min_frag_intensity=min_frag_intensity,
min_frag_nAA=0,
verbose=False,
)
if multiprocessing:
df_head_queue.put((df, i))
else:
if tuple([int(i) for i in pd.__version__.split(".")[:2]]) >= (1, 5):
newline = dict(lineterminator="\n")
else:
newline = dict(line_terminator="\n")
df.to_csv(tsv, header=(i == 0), sep="\t", mode="a", index=False, **newline)
if multiprocessing:
df_head_queue.put((None, None))
print(
"Translation finished, it will take several minutes to export the rest precursors to the tsv file..."
)
writing_process.join()