import warnings
from dataclasses import dataclass, field
from typing import Dict, List, Tuple, Union
import numba as nb
import numpy as np
import pandas as pd
from alphabase.constants._const import PEAK_INTENSITY_DTYPE, PEAK_MZ_DTYPE
from alphabase.constants.atom import (
MASS_PROTON,
calc_mass_from_formula,
)
from alphabase.constants.modification import ModificationKeys, calc_modloss_mass
from alphabase.peptide.mass_calc import calc_b_y_and_peptide_masses_for_same_len_seqs
from alphabase.peptide.precursor import (
is_precursor_refined,
refine_precursor_df,
)
from alphabase.utils import DeprecatedDict
[docs]
class Direction:
"""String constants defining fragment directions."""
FORWARD = "forward"
REVERSE = "reverse"
DIRECTION_MAPPING = {Direction.FORWARD: 1, Direction.REVERSE: -1}
DIRECTION_MAPPING_INV = {v: k for k, v in DIRECTION_MAPPING.items()}
[docs]
class Loss:
"""String constants defining fragment losses."""
MODLOSS = "modloss"
H2O = "H2O"
NH3 = "NH3"
LOSSH = "lossH"
ADDH = "addH"
NONE = ""
LOSS_MAPPING = {
# we use 98 because it is similar to the molecular weight of a phosphate group
Loss.MODLOSS: 98,
# we use 18 because it is similar to the molecular weight of a water molecule
Loss.H2O: 18,
# we use 17 because it is similar to the molecular weight of an ammonia molecule
Loss.NH3: 17,
# we use 1 because it is similar to the molecular weight of a hydrogen atom
Loss.LOSSH: 1,
# there is no -1 so we use 2
Loss.ADDH: 2,
Loss.NONE: 0,
}
LOSS_MAPPING_INV = {v: k for k, v in LOSS_MAPPING.items()}
[docs]
class Series:
"""String constants defining fragment series types."""
A = "a"
B = "b"
C = "c"
X = "x"
Y = "y"
Z = "z"
SERIES_MAPPING = {
# ascii value for a, b, c, x, y, z
Series.A: 97,
Series.B: 98,
Series.C: 99,
Series.X: 120,
Series.Y: 121,
Series.Z: 122,
}
SERIES_MAPPING_INV = {v: k for k, v in SERIES_MAPPING.items()}
UNANNOTATED_TYPE = 255
[docs]
@dataclass(frozen=True)
class FragmentType:
"""
Class which represents a constant fragment type.
Parameters
----------
name : str
Name of the fragment type
ref_ion : str
Reference ion of the fragment type
delta_formula : str
Chemical formula representing the mass difference from the reference ion
delta_mass : float
Mass difference calculated from delta_formula, set during initialization
modloss : bool
Whether the fragment type has a modification neutral loss
series_id : int
Series ID of the fragment type (e.g., 97=a, 98=b, 99=c, 120=x, 121=y, 122=z), from SERIES_MAPPING
loss_id : int
Loss type ID of the fragment (e.g., 0=none, 1=lossH, 2=addH, 17=NH3, 18=H2O, 98=modloss), from LOSS_MAPPING
direction_id : int
Direction ID of the fragment type (forward=1, reverse=-1), from DIRECTION_MAPPING
Attributes
----------
delta_mass : float
Mass difference calculated from delta_formula, set during initialization
"""
name: str
ref_ion: str
delta_formula: str
delta_mass: float = field(init=False)
modloss: bool
series_id: int
loss_id: int
direction_id: int
def __post_init__(self):
"""Set delta_mass after initialization using delta_formula"""
object.__setattr__(
self, "delta_mass", calc_mass_from_formula(self.delta_formula)
)
# constant which contains all valid fragment types
FRAGMENT_TYPES = {
"a": FragmentType(
name="a",
ref_ion="b",
delta_formula="C(-1)O(-1)",
modloss=False,
series_id=SERIES_MAPPING[Series.A],
loss_id=LOSS_MAPPING[Loss.NONE],
direction_id=DIRECTION_MAPPING[Direction.FORWARD],
),
"b": FragmentType(
name="b",
ref_ion="b",
delta_formula="",
modloss=False,
series_id=SERIES_MAPPING[Series.B],
loss_id=LOSS_MAPPING[Loss.NONE],
direction_id=DIRECTION_MAPPING[Direction.FORWARD],
),
"c": FragmentType(
name="c",
ref_ion="b",
delta_formula="N(1)H(3)",
modloss=False,
series_id=SERIES_MAPPING[Series.C],
loss_id=LOSS_MAPPING[Loss.NONE],
direction_id=DIRECTION_MAPPING[Direction.FORWARD],
),
"x": FragmentType(
name="x",
ref_ion="y",
delta_formula="C(1)O(1)H(-2)",
modloss=False,
series_id=SERIES_MAPPING[Series.X],
loss_id=LOSS_MAPPING[Loss.NONE],
direction_id=DIRECTION_MAPPING[Direction.REVERSE],
),
"y": FragmentType(
name="y",
ref_ion="y",
delta_formula="",
modloss=False,
series_id=SERIES_MAPPING[Series.Y],
loss_id=LOSS_MAPPING[Loss.NONE],
direction_id=DIRECTION_MAPPING[Direction.REVERSE],
),
"z": FragmentType(
name="z",
ref_ion="y",
delta_formula="N(-1)H(-2)",
modloss=False,
series_id=SERIES_MAPPING[Series.Z],
loss_id=LOSS_MAPPING[Loss.NONE],
direction_id=DIRECTION_MAPPING[Direction.REVERSE],
),
"b_modloss": FragmentType(
name="b_modloss",
ref_ion="b",
delta_formula="N(1)H(3)",
modloss=True,
series_id=SERIES_MAPPING[Series.B],
loss_id=LOSS_MAPPING[Loss.MODLOSS],
direction_id=DIRECTION_MAPPING[Direction.FORWARD],
),
"b_H2O": FragmentType(
name="b_H2O",
ref_ion="b",
delta_formula="H(-2)O(-1)",
modloss=False,
series_id=SERIES_MAPPING[Series.B],
loss_id=LOSS_MAPPING[Loss.H2O],
direction_id=DIRECTION_MAPPING[Direction.FORWARD],
),
"b_NH3": FragmentType(
name="b_NH3",
ref_ion="b",
delta_formula="N(-1)H(-3)",
modloss=False,
series_id=SERIES_MAPPING[Series.B],
loss_id=LOSS_MAPPING[Loss.NH3],
direction_id=DIRECTION_MAPPING[Direction.FORWARD],
),
"c_lossH": FragmentType(
name="c_lossH",
ref_ion="b",
delta_formula="N(1)H(2)",
modloss=False,
series_id=SERIES_MAPPING[Series.C],
loss_id=LOSS_MAPPING[Loss.LOSSH],
direction_id=DIRECTION_MAPPING[Direction.FORWARD],
),
"y_modloss": FragmentType(
name="y_modloss",
ref_ion="y",
delta_formula="N(-1)H(-2)",
modloss=True,
series_id=SERIES_MAPPING[Series.Y],
loss_id=LOSS_MAPPING[Loss.MODLOSS],
direction_id=DIRECTION_MAPPING[Direction.REVERSE],
),
"y_H2O": FragmentType(
name="y_H2O",
ref_ion="y",
delta_formula="H(-2)O(-1)",
modloss=False,
series_id=SERIES_MAPPING[Series.Y],
loss_id=LOSS_MAPPING[Loss.H2O],
direction_id=DIRECTION_MAPPING[Direction.REVERSE],
),
"y_NH3": FragmentType(
name="y_NH3",
ref_ion="y",
delta_formula="N(-1)H(-3)",
modloss=False,
series_id=SERIES_MAPPING[Series.Y],
loss_id=LOSS_MAPPING[Loss.NH3],
direction_id=DIRECTION_MAPPING[Direction.REVERSE],
),
"z_addH": FragmentType(
name="z_addH",
ref_ion="y",
delta_formula="N(-1)H(-1)",
modloss=False,
series_id=SERIES_MAPPING[Series.Z],
loss_id=LOSS_MAPPING[Loss.ADDH],
direction_id=DIRECTION_MAPPING[Direction.REVERSE],
),
}
FRAGMENT_CHARGE_SEPARATOR = "_z"
# TODO: remove this dictionary
frag_type_representation_dict = DeprecatedDict(
{
"c": "b+N(1)H(3)",
"z": "y+N(-1)H(-2)",
"a": "b+C(-1)O(-1)",
"x": "y+C(1)O(1)H(-2)",
"b_H2O": "b+H(-2)O(-1)",
"y_H2O": "y+H(-2)O(-1)",
"b_NH3": "b+N(-1)H(-3)",
"y_NH3": "y+N(-1)H(-3)",
"c_lossH": "b+N(1)H(2)",
"z_addH": "y+N(-1)H(-1)",
},
warning_message="frag_type_representation_dict is deprecated and will be removed in the future version",
)
# TODO: remove this dictionary
frag_mass_from_ref_ion_dict = DeprecatedDict(
{},
warning_message="frag_mass_from_ref_ion_dict is deprecated and will be removed in a future version",
)
# TODO: remove this function
[docs]
def add_new_frag_type(frag_type: str, representation: str):
"""Add new modifications into :data:`frag_type_representation_dict`
and update :data:`frag_mass_from_ref_ion_dict`.
Parameters
----------
frag_type : str
New fragment type
representation : str
The representation similar to :data:`frag_type_representation_dict`
"""
frag_type_representation_dict[frag_type] = representation
ref_ion, formula = representation.split("+")
frag_mass_from_ref_ion_dict[frag_type] = dict(
ref_ion=ref_ion, add_mass=calc_mass_from_formula(formula)
)
# TODO: remove this function
[docs]
def parse_all_frag_type_representation():
for frag, representation in frag_type_representation_dict.items():
add_new_frag_type(frag, representation)
parse_all_frag_type_representation()
[docs]
def sort_charged_frag_types(charged_frag_types: List[str]) -> List[str]:
"""charged frag types are sorted by (no-loss, loss) and then alphabetically"""
has_loss = [
f.replace(FRAGMENT_CHARGE_SEPARATOR, "").count("_") > 0
for f in charged_frag_types
]
no_loss = [f for f, hl in zip(charged_frag_types, has_loss) if not hl]
loss = [f for f, hl in zip(charged_frag_types, has_loss) if hl]
return sorted(no_loss) + sorted(loss)
[docs]
def get_charged_frag_types(
frag_types: List[str], max_frag_charge: int = 2
) -> List[str]:
"""
Calculate the combination of fragment types and charge states.
Returns a sorted list of charged fragment types.
Parameters
----------
frag_types : List[str]
e.g. ['b','y','b_modloss','y_modloss']
max_frag_charge : int
max fragment charge. (default: 2)
Returns
-------
List[str]
charged fragment types
Examples
--------
>>> frag_types=['b','y','b_modloss','y_modloss']
>>> get_charged_frag_types(frag_types, 2)
['b_z1','b_z2','y_z1','y_z2','b_modloss_z1','b_modloss_z2','y_modloss_z1','y_modloss_z2']
"""
charged_frag_types = []
for frag_type in frag_types:
if frag_type in FRAGMENT_TYPES:
for charge in range(1, max_frag_charge + 1):
charged_frag_types.append(f"{frag_type}_z{charge}")
else:
raise ValueError(f"Fragment type {frag_type} is currently not supported")
return sort_charged_frag_types(charged_frag_types)
[docs]
def filter_valid_charged_frag_types(
charged_frag_types: List[str],
) -> List[str]:
"""
Filters a list of charged fragment types and returns only the valid ones.
A valid charged fragment type must:
1. Follow the format: {fragment_type}_z{charge} (e.g. 'b_z1', 'y_modloss_z2')
2. Use a fragment type that exists in FRAGMENT_TYPES
3. Have a strictly positive integer charge
Parameters
----------
charged_frag_types : List[str]
List of charged fragment types to filter (e.g. ['b_z1', 'y_z2', 'invalid_z1', 'b_modloss_z2'])
Returns
-------
List[str]
List containing only the valid charged fragment types, (e.g. ['b', 'y', 'b_modloss'])
"""
valid_types = []
for charged_frag_type in charged_frag_types:
try:
_ = parse_charged_frag_type(charged_frag_type)
valid_types.append(charged_frag_type)
except ValueError as e:
warnings.warn(str(e))
continue
return valid_types
[docs]
def parse_charged_frag_type(charged_frag_type: str) -> Tuple[str, int]:
"""
Oppsite to `get_charged_frag_types`.
Parameters
----------
charged_frag_type : str
e.g. 'y_z1', 'b_modloss_z1'
Returns
-------
tuple
str. Fragment type, e.g. 'b','y'
int. Charge state
Raises
------
ValueError
If charge state is not given or not a strictly positive integer or if fragment type is not supported
"""
if charged_frag_type.count(FRAGMENT_CHARGE_SEPARATOR) != 1:
raise ValueError(
"Only charged fragment types are supported. Please add charge state to the fragment type, "
f"using {FRAGMENT_CHARGE_SEPARATOR} as separator. e.g. 'b{FRAGMENT_CHARGE_SEPARATOR}1'"
)
fragment_type, charge = charged_frag_type.split(FRAGMENT_CHARGE_SEPARATOR)
# Check if charge is a valid integer string (no decimals)
if not charge.isdigit() or not (charge_int := int(charge)) > 0:
raise ValueError(
f"Charge state must be a positive integer, got '{charge}' from fragment type '{charged_frag_type}'"
)
if fragment_type not in FRAGMENT_TYPES:
raise ValueError(f"Fragment type {fragment_type} is currently not supported")
return fragment_type, charge_int
[docs]
def init_zero_fragment_dataframe(
peplen_array: np.ndarray, charged_frag_types: List[str], dtype=PEAK_MZ_DTYPE
) -> Tuple[pd.DataFrame, np.ndarray, np.ndarray]:
"""Initialize a zero dataframe based on peptide length
(nAA) array (peplen_array) and charge_frag_types (column number).
The row number of returned dataframe is np.sum(peplen_array-1).
Parameters
----------
peplen_array : np.ndarray
peptide lengths for the fragment dataframe
charged_frag_types : List[str]
`['b_z1','b_z2','y_z1','y_z2','b_modloss_z1','y_H2O_z1'...]`
Returns
-------
tuple
pd.DataFrame, `fragment_df` with zero values
np.ndarray (int64), the start indices point to the `fragment_df` for each peptide
np.ndarray (int64), the end indices point to the `fragment_df` for each peptide
"""
indices = np.zeros(len(peplen_array) + 1, dtype=np.int64)
indices[1:] = peplen_array - 1
indices = np.cumsum(indices)
fragment_df = pd.DataFrame(
np.zeros((indices[-1], len(charged_frag_types)), dtype=dtype),
columns=charged_frag_types,
)
return fragment_df, indices[:-1], indices[1:]
[docs]
def init_fragment_dataframe_from_other(
reference_fragment_df: pd.DataFrame, dtype=PEAK_MZ_DTYPE
):
"""
Init zero fragment dataframe from the `reference_fragment_df` (same rows and same columns)
"""
return pd.DataFrame(
np.zeros_like(reference_fragment_df.values, dtype=dtype),
columns=reference_fragment_df.columns,
)
[docs]
def init_fragment_by_precursor_dataframe(
precursor_df,
charged_frag_types: List[str],
*,
reference_fragment_df: pd.DataFrame = None,
dtype: np.dtype = PEAK_MZ_DTYPE,
inplace_in_reference: bool = False,
):
"""
Init zero fragment dataframe for the `precursor_df`. If
the `reference_fragment_df` is provided, the result dataframe's
length will be the same as reference_fragment_df. Otherwise it
generates the dataframe from scratch.
Parameters
----------
precursor_df : pd.DataFrame
precursors to generate fragment masses,
if `precursor_df` contains the 'frag_start_idx' column,
it is better to provide `reference_fragment_df` as
`precursor_df.frag_start_idx` and `precursor.frag_stop_idx`
point to the indices in `reference_fragment_df`
charged_frag_types : List
`['b_z1','b_z2','y_z1','y_z2','b_modloss_z1','y_H2O_z1'...]`
reference_fragment_df : pd.DataFrame
init zero fragment_mz_df based
on this reference. If None, fragment_mz_df will be
initialized by :func:`alphabase.peptide.fragment.init_zero_fragment_dataframe`.
Defaults to None.
dtype: np.dtype
dtype of fragment mz values, Defaults to :data:`PEAK_MZ_DTYPE`.
inplace_in_reference : bool, optional
if calculate the fragment mz
inplace in the reference_fragment_df (default: False)
Returns
-------
pd.DataFrame
zero `fragment_df` with given `charged_frag_types` columns
"""
if "frag_start_idx" not in precursor_df.columns:
(fragment_df, start_indices, end_indices) = init_zero_fragment_dataframe(
precursor_df.nAA.values, charged_frag_types, dtype=dtype
)
precursor_df["frag_start_idx"] = start_indices
precursor_df["frag_stop_idx"] = end_indices
else:
if reference_fragment_df is None:
# raise ValueError(
# "`precursor_df` contains 'frag_start_idx' column, "\
# "please provide `reference_fragment_df` argument"
# )
fragment_df = pd.DataFrame(
np.zeros(
(precursor_df.frag_stop_idx.max(), len(charged_frag_types)),
dtype=dtype,
),
columns=charged_frag_types,
)
else:
if inplace_in_reference:
fragment_df = reference_fragment_df[
[
_fr
for _fr in charged_frag_types
if _fr in reference_fragment_df.columns
]
]
else:
fragment_df = pd.DataFrame(
np.zeros(
(len(reference_fragment_df), len(charged_frag_types)),
dtype=dtype,
),
columns=charged_frag_types,
)
return fragment_df
[docs]
def update_sliced_fragment_dataframe(
fragment_df: pd.DataFrame,
fragment_df_vals: np.ndarray,
values: np.ndarray,
frag_start_end_list: List[Tuple[int, int]],
charged_frag_types: List[str] = None,
):
"""
Set the values of the slices `frag_start_end_list=[(start,end),(start,end),...]`
of fragment_df.
Parameters
----------
fragment_df : pd.DataFrame
fragment dataframe to set the values
fragment_df_vals : np.ndarray
The `fragment_df.to_numpy(copy=True)`, to prevent readonly assignment.
values : np.ndarray
values to set
frag_start_end_list : List[Tuple[int,int]]
e.g. `[(start,end),(start,end),...]`
charged_frag_types : List[str], optional
e.g. `['b_z1','b_z2','y_z1','y_z2']`.
If None, the columns of values should be the same as fragment_df's columns.
It is much faster if charged_frag_types is None as we use numpy slicing,
otherwise we use pd.loc (much slower).
Defaults to None.
"""
frag_slice_list = [slice(start, end) for start, end in frag_start_end_list]
frag_slices = np.r_[tuple(frag_slice_list)]
if charged_frag_types is None or len(charged_frag_types) == 0:
fragment_df_vals[frag_slices, :] = values.astype(fragment_df_vals.dtype)
else:
charged_frag_idxes = [
fragment_df.columns.get_loc(c) for c in charged_frag_types
]
fragment_df.iloc[frag_slices, charged_frag_idxes] = values.astype(
fragment_df_vals.dtype
)
fragment_df_vals[frag_slices] = fragment_df.values[frag_slices]
[docs]
def get_sliced_fragment_dataframe(
fragment_df: pd.DataFrame,
frag_start_end_list: Union[List, np.ndarray],
charged_frag_types: List = None,
) -> pd.DataFrame:
"""
Get the sliced fragment_df from `frag_start_end_list=[(start,end),(start,end),...]`.
Parameters
----------
fragment_df : pd.DataFrame
fragment dataframe to get values
frag_start_end_list : Union
List[Tuple[int,int]], e.g. `[(start,end),(start,end),...]` or np.ndarray
charged_frag_types : List[str]
e.g. `['b_z1','b_z2','y_z1','y_z2']`.
if None, all columns will be considered
Returns
-------
pd.DataFrame
sliced fragment_df. If `charged_frag_types` is None,
return fragment_df with all columns
"""
frag_slice_list = [slice(start, end) for start, end in frag_start_end_list]
frag_slices = np.r_[tuple(frag_slice_list)]
if charged_frag_types is None or len(charged_frag_types) == 0:
charged_frag_idxes = slice(None)
else:
charged_frag_idxes = [
fragment_df.columns.get_loc(c) for c in charged_frag_types
]
return fragment_df.iloc[frag_slices, charged_frag_idxes]
[docs]
def concat_precursor_fragment_dataframes(
precursor_df_list: List[pd.DataFrame],
fragment_df_list: List[pd.DataFrame],
*other_fragment_df_lists,
) -> Tuple[pd.DataFrame, ...]:
"""
Since fragment_df is indexed by precursor_df, when we concatenate multiple
fragment_df, the indexed positions will change for in precursor_dfs,
this function keeps the correct indexed positions of precursor_df when
concatenating multiple fragment_df dataframes.
Parameters
----------
precursor_df_list : List[pd.DataFrame]
precursor dataframe list to concatenate
fragment_df_list : List[pd.DataFrame]
fragment dataframe list to concatenate
other_fragment_df_lists
arbitray other fragment dataframe list to concatenate,
e.g. fragment_mass_df, fragment_inten_df, ...
Returns
-------
Tuple[pd.DataFrame,...]
concatenated precursor_df, fragment_df, other_fragment_dfs ...
"""
fragment_df_lens = [len(fragment_df) for fragment_df in fragment_df_list]
precursor_df_list = [precursor_df.copy() for precursor_df in precursor_df_list]
cum_frag_df_lens = np.cumsum(fragment_df_lens)
for i, precursor_df in enumerate(precursor_df_list[1:]):
precursor_df[["frag_start_idx", "frag_stop_idx"]] += cum_frag_df_lens[i]
return (
pd.concat(precursor_df_list, ignore_index=True),
pd.concat(fragment_df_list, ignore_index=True),
*[
pd.concat(other_list, ignore_index=True)
for other_list in other_fragment_df_lists
],
)
[docs]
def calc_fragment_mz_values_for_same_nAA(
df_group: pd.DataFrame, nAA: int, charged_frag_types: list
):
mod_list = (
df_group.mods.str.split(ModificationKeys.SEPARATOR)
.apply(lambda x: [m for m in x if len(m) > 0])
.values
)
site_list = (
df_group.mod_sites.str.split(ModificationKeys.SEPARATOR)
.apply(lambda x: [int(s) for s in x if len(s) > 0])
.values
)
if "aa_mass_diffs" in df_group.columns:
mod_diff_list = (
df_group.aa_mass_diffs.str.split(ModificationKeys.SEPARATOR)
.apply(lambda x: [float(m) for m in x if len(m) > 0])
.values
)
mod_diff_site_list = (
df_group.aa_mass_diff_sites.str.split(ModificationKeys.SEPARATOR)
.apply(lambda x: [int(s) for s in x if len(s) > 0])
.values
)
else:
mod_diff_list = None
mod_diff_site_list = None
(b_mass, y_mass, pepmass) = calc_b_y_and_peptide_masses_for_same_len_seqs(
df_group.sequence.values.astype("U"),
mod_list,
site_list,
mod_diff_list,
mod_diff_site_list,
)
b_mass = b_mass.reshape(-1)
y_mass = y_mass.reshape(-1)
for charged_frag_type in charged_frag_types:
if charged_frag_type.startswith("b_modloss"):
b_modloss = np.concatenate(
[
calc_modloss_mass(nAA, mods, sites, True)
for mods, sites in zip(mod_list, site_list)
]
)
break
for charged_frag_type in charged_frag_types:
if charged_frag_type.startswith("y_modloss"):
y_modloss = np.concatenate(
[
calc_modloss_mass(nAA, mods, sites, False)
for mods, sites in zip(mod_list, site_list)
]
)
break
mz_values = []
for charged_frag_type in charged_frag_types:
frag_type, charge = parse_charged_frag_type(charged_frag_type)
if frag_type == "b":
_mass = b_mass / charge + MASS_PROTON
elif frag_type == "y":
_mass = y_mass / charge + MASS_PROTON
elif frag_type == "b_modloss":
_mass = (b_mass - b_modloss) / charge + MASS_PROTON
_mass[b_modloss == 0] = 0
elif frag_type == "y_modloss":
_mass = (y_mass - y_modloss) / charge + MASS_PROTON
_mass[y_modloss == 0] = 0
elif frag_type in FRAGMENT_TYPES:
ref_ion = FRAGMENT_TYPES[frag_type].ref_ion
delta_mass = FRAGMENT_TYPES[frag_type].delta_mass
if ref_ion == "b":
_mass = (b_mass + delta_mass) / charge + MASS_PROTON
elif ref_ion == "y":
_mass = (y_mass + delta_mass) / charge + MASS_PROTON
else:
raise KeyError(
f"ref_ion only allows `b` and `y`, but {ref_ion} is given"
)
else:
raise KeyError(f'Fragment type "{frag_type}" is not supported')
mz_values.append(_mass)
return np.array(mz_values).T
[docs]
def mask_fragments_for_charge_greater_than_precursor_charge(
fragment_df: pd.DataFrame,
precursor_charge_array: np.ndarray,
nAA_array: np.ndarray,
*,
candidate_fragment_charges: list = [2, 3, 4],
):
"""Mask the fragment dataframe when
the fragment charge is larger than the precursor charge"""
precursor_charge_array = np.repeat(precursor_charge_array, nAA_array - 1)
for col in fragment_df.columns:
for charge in candidate_fragment_charges:
if col.endswith(f"z{charge}"):
fragment_df.loc[precursor_charge_array < charge, col] = 0
return fragment_df
[docs]
@nb.njit(parallel=True)
def fill_in_indices(
frag_start_idxes: np.ndarray,
frag_stop_idxes: np.ndarray,
indices: np.ndarray,
max_indices: np.ndarray,
excluded_indices: np.ndarray,
top_k: int,
flattened_intensity: np.ndarray,
number_of_fragment_types: int,
max_frag_per_peptide: int = 300,
) -> None:
"""
Fill in indices, max indices and excluded indices for each peptide.
indices: index of fragment per peptide (from 0 to max_index-1)
max_indices: max index of fragments per peptide (number of fragments per peptide)
excluded_indices: not top k excluded indices per peptide
Parameters
----------
frag_start_idxes : np.ndarray
start indices of fragments for each peptide
frag_stop_idxes : np.ndarray
stop indices of fragments for each peptide
indices : np.ndarray
index of fragment per peptide (from 0 to max_index-1) it will be filled in this function
max_indices : np.ndarray
max index of fragments per peptide (number of fragments per peptide) it will be filled in this function
excluded_indices : np.ndarray
not top k excluded indices per peptide it will be filled in this function
top_k : int
top k highest peaks to keep
flattened_intensity : np.ndarray
Flattened fragment intensities
number_of_fragment_types : int
number of types of fragments (e.g. b,y,b_modloss,y_modloss, ...) equals to the number of columns in fragment mz dataframe
max_frag_per_peptide : int, optional
maximum number of fragments per peptide, Defaults to 300
"""
array = np.arange(0, max_frag_per_peptide).reshape(-1, 1)
ones = np.ones(max_frag_per_peptide).reshape(-1, 1)
length = len(frag_start_idxes)
for i in nb.prange(length):
frag_start = frag_start_idxes[i]
frag_end = frag_stop_idxes[i]
max_index = frag_end - frag_start
indices[frag_start:frag_end] = array[:max_index]
max_indices[frag_start:frag_end] = ones[:max_index] * max_index
if flattened_intensity is None or top_k >= max_index * number_of_fragment_types:
continue
idxes = np.argsort(
flattened_intensity[
frag_start * number_of_fragment_types : frag_end
* number_of_fragment_types
]
)
_excl = np.ones_like(idxes, dtype=np.bool_)
_excl[idxes[-top_k:]] = False
excluded_indices[
frag_start * number_of_fragment_types : frag_end * number_of_fragment_types
] = _excl
@nb.vectorize([nb.uint32(nb.int8, nb.uint32, nb.uint32, nb.uint32)], target="parallel")
def calculate_fragment_numbers(
frag_direction: np.int8,
frag_number: np.uint32,
index: np.uint32,
max_index: np.uint32,
):
"""
Calculate fragment numbers for each fragment based on the fragment direction.
Parameters
----------
frag_direction : np.int8
directions of fragments for each peptide
frag_number : np.uint32
fragment numbers for each peptide
index : np.uint32
index of fragment per peptide (from 0 to max_index-1)
max_index : np.uint32
max index of fragments per peptide (number of fragments per peptide)
"""
if frag_direction == 1:
frag_number = index + 1
elif frag_direction == -1:
frag_number = max_index - index
return frag_number
[docs]
def parse_fragment(
frag_directions: np.ndarray,
frag_start_idxes: np.ndarray,
frag_stop_idxes: np.ndarray,
top_k: int,
intensities: np.ndarray,
number_of_fragment_types: int,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Parse fragments to get fragment numbers, fragment positions and not top k excluded indices in one hit
faster than doing each operation individually, and makes the most of the operations that are done in parallel.
Parameters
----------
frag_directions : np.ndarray
directions of fragments for each peptide
frag_start_idxes : np.ndarray
start indices of fragments for each peptide
frag_stop_idxes : np.ndarray
stop indices of fragments for each peptide
top_k : int
top k highest peaks to keep
intensities : np.ndarray
Flattened fragment intensities
number_of_fragment_types : int
number of types of fragments (e.g. b,y,b_modloss,y_modloss, ...) equals to the number of columns in fragment mz dataframe
Returns
-------
Tuple[np.ndarray, np.ndarray, np.ndarray]
Tuple of fragment numbers, fragment positions and not top k excluded indices
"""
# Allocate memory for fragment numbers, indices, max indices and excluded indices
frag_numbers = np.empty_like(frag_directions, dtype=np.uint32)
indices = np.empty_like(frag_directions, dtype=np.uint32)
max_indices = np.empty_like(frag_directions, dtype=np.uint32)
excluded_indices = np.zeros(
frag_directions.shape[0] * frag_directions.shape[1], dtype=np.bool_
)
# Fill in indices, max indices and excluded indices
fill_in_indices(
frag_start_idxes,
frag_stop_idxes,
indices,
max_indices,
excluded_indices,
top_k,
intensities,
number_of_fragment_types,
)
# Calculate fragment numbers
frag_numbers = calculate_fragment_numbers(
frag_directions, frag_numbers, indices, max_indices
)
return frag_numbers, indices, excluded_indices
[docs]
def flatten_fragments(
precursor_df: pd.DataFrame,
fragment_mz_df: pd.DataFrame,
fragment_intensity_df: pd.DataFrame,
min_fragment_intensity: float = -1,
keep_top_k_fragments: int = 1000,
custom_columns: list = ["type", "number", "position", "charge", "loss_type"],
custom_df: Dict[str, pd.DataFrame] = {},
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""
Converts the tabular fragment format consisting of
the `fragment_mz_df` and the `fragment_intensity_df`
into a linear fragment format.
The linear fragment format will only retain fragments
above a given intensity treshold with `mz > 0`.
It consists of columns: `mz`, `intensity`,
`type`, `number`, `charge` and `loss_type`,
where each column refers to:
- mz: :data:`PEAK_MZ_DTYPE`, fragment mz value
- intensity: :data:`PEAK_INTENSITY_DTYPE`, fragment intensity value
- type: uint8, ASCII code of the ion series. Must be a part of the `SERIES_MAPPING`.
- number: uint32, fragment series number
- position: uint32, fragment position in sequence (from left to right, starts with 0)
- charge: uint8, fragment charge
- loss_type: int16, fragment loss type. Must be a part of the `LOSS_MAPPING`.
The fragment pointers `frag_start_idx` and `frag_stop_idx`
will be reannotated to the new fragment format.
For ASCII code `type`, we can convert it into byte-str by using `frag_df.type.values.view('S1')`.
Parameters
----------
precursor_df : pd.DataFrame
input precursor dataframe which contains the frag_start_idx and frag_stop_idx columns
fragment_mz_df : pd.DataFrame
input fragment mz dataframe of shape (N, T) which contains N * T fragment mzs.
Fragments with mz==0 will be excluded.
fragment_intensity_df : pd.DataFrame
input fragment intensity dataframe of shape (N, T) which contains N * T fragment mzs.
Could be empty (len==0) to exclude intensity values.
min_fragment_intensity : float, optional
minimum intensity which should be retained. Defaults to -1
custom_columns : list, optional
'mz' and 'intensity' columns are required. Others could be customized.
Defaults to ['type','number','position','charge','loss_type']
custom_df : Dict[str, pd.DataFrame], optional
Append custom columns by providing additional dataframes of the same shape as fragment_mz_df and fragment_intensity_df. Defaults to {}.
Returns
-------
pd.DataFrame
precursor dataframe with added `flat_frag_start_idx` and `flat_frag_stop_idx` columns
pd.DataFrame
fragment dataframe with columns: `mz`, `intensity`, `type`, `number`,
`charge` and `loss_type`.
"""
if len(precursor_df) == 0:
return precursor_df, pd.DataFrame()
# new dataframes for fragments and precursors are created
frag_df = {}
frag_df["mz"] = fragment_mz_df.values.reshape(-1)
if len(fragment_intensity_df) > 0:
frag_df["intensity"] = fragment_intensity_df.values.astype(
PEAK_INTENSITY_DTYPE
).reshape(-1)
use_intensity = True
else:
use_intensity = False
# add additional columns to the fragment dataframe
# each column in the flat fragment dataframe is a whole pandas dataframe in the dense representation
for col_name, df in custom_df.items():
frag_df[col_name] = df.values.reshape(-1)
frag_types = []
frag_loss_types = []
frag_charges = []
frag_directions = [] # 'abc': direction=1, 'xyz': direction=-1, otherwise 0
for charged_frag_type in fragment_mz_df.columns.values:
frag_type, charge = parse_charged_frag_type(charged_frag_type)
frag_charges.append(charge)
frag_types.append(FRAGMENT_TYPES[frag_type].series_id)
frag_loss_types.append(FRAGMENT_TYPES[frag_type].loss_id)
frag_directions.append(FRAGMENT_TYPES[frag_type].direction_id)
if "type" in custom_columns:
frag_df["type"] = np.array(
np.tile(frag_types, len(fragment_mz_df)), dtype=np.int8
)
if "loss_type" in custom_columns:
frag_df["loss_type"] = np.array(
np.tile(frag_loss_types, len(fragment_mz_df)), dtype=np.int16
)
if "charge" in custom_columns:
frag_df["charge"] = np.array(
np.tile(frag_charges, len(fragment_mz_df)), dtype=np.int8
)
frag_directions = np.array(
np.tile(frag_directions, (len(fragment_mz_df), 1)), dtype=np.int8
)
numbers, positions, excluded_indices = parse_fragment(
frag_directions,
precursor_df.frag_start_idx.values,
precursor_df.frag_stop_idx.values,
keep_top_k_fragments,
frag_df["intensity"] if use_intensity else None,
len(fragment_mz_df.columns),
)
if "number" in custom_columns:
frag_df["number"] = numbers.reshape(-1)
if "position" in custom_columns:
frag_df["position"] = positions.reshape(-1)
precursor_df["flat_frag_start_idx"] = precursor_df.frag_start_idx
precursor_df["flat_frag_stop_idx"] = precursor_df.frag_stop_idx
precursor_df[["flat_frag_start_idx", "flat_frag_stop_idx"]] *= len(
fragment_mz_df.columns
)
if use_intensity:
frag_df["intensity"][frag_df["mz"] == 0.0] = 0.0
excluded = (
frag_df["mz"] == 0
if not use_intensity
else (frag_df["intensity"] < min_fragment_intensity)
| (frag_df["mz"] == 0)
| (excluded_indices)
)
frag_df = pd.DataFrame(frag_df)
frag_df = frag_df[~excluded]
frag_df = frag_df.reset_index(drop=True)
# cumulative sum counts the number of fragments before the given fragment which were removed.
# This sum does not include the fragment at the index position and has therefore len N +1
cum_sum_tresh = np.zeros(shape=len(excluded) + 1, dtype=np.int64)
cum_sum_tresh[1:] = np.cumsum(excluded)
precursor_df["flat_frag_start_idx"] -= cum_sum_tresh[
precursor_df.flat_frag_start_idx.values
]
precursor_df["flat_frag_stop_idx"] -= cum_sum_tresh[
precursor_df.flat_frag_stop_idx.values
]
return precursor_df, frag_df
[docs]
@nb.njit()
def compress_fragment_indices(frag_idx):
"""
recalculates fragment indices to remove unused fragments. Can be used to compress a fragment library.
Expects fragment indices to be ordered by increasing values (!!!).
It should be O(N) runtime with N being the number of fragment rows.
>>> frag_idx = [[6, 10],
[12, 14],
[20, 22]]
>>> frag_idx = [[0, 4],
[4, 6],
[6, 8]]
>>> fragment_pointer = [6,7,8,9,12,13,20,21]
"""
frag_idx_len = frag_idx[:, 1] - frag_idx[:, 0]
# This sum does not include the fragment at the index position and has therefore len N +1
frag_idx_cumsum = np.zeros(shape=len(frag_idx_len) + 1, dtype="int64")
frag_idx_cumsum[1:] = np.cumsum(frag_idx_len)
fragment_pointer = np.zeros(np.sum(frag_idx_len), dtype="int64")
for i in range(len(frag_idx)):
start_index = frag_idx_cumsum[i]
for j, k in enumerate(range(frag_idx[i, 0], frag_idx[i, 1])):
fragment_pointer[start_index + j] = k
new_frag_idx = np.column_stack((frag_idx_cumsum[:-1], frag_idx_cumsum[1:]))
return new_frag_idx, fragment_pointer
[docs]
def remove_unused_fragments(
precursor_df: pd.DataFrame,
fragment_df_list: Tuple[pd.DataFrame, ...],
frag_start_col: str = "frag_start_idx",
frag_stop_col: str = "frag_stop_idx",
) -> Tuple[pd.DataFrame, Tuple[pd.DataFrame, ...]]:
"""Removes unused fragments of removed precursors,
reannotates the `frag_start_col` and `frag_stop_col`
Parameters
----------
precursor_df : pd.DataFrame
Precursor dataframe which contains frag_start_idx and frag_stop_idx columns
fragment_df_list : List[pd.DataFrame]
A list of fragment dataframes which should be compressed by removing unused fragments.
Multiple fragment dataframes can be provided which will all be sliced in the same way.
This allows to slice both the fragment_mz_df and fragment_intensity_df.
At least one fragment dataframe needs to be provided.
frag_start_col : str, optional
Fragment start idx column in `precursor_df`, such as "frag_start_idx" and "peak_start_idx".
Defaults to "frag_start_idx".
frag_stop_col : str, optional
Fragment stop idx column in `precursor_df`, such as "frag_stop_idx" and "peak_stop_idx".
Defaults to "frag_stop_idx".
Returns
-------
pd.DataFrame, List[pd.DataFrame]
returns the reindexed precursor DataFrame and the sliced fragment DataFrames
"""
precursor_df = precursor_df.sort_values([frag_start_col], ascending=True)
frag_idx = precursor_df[[frag_start_col, frag_stop_col]].values
new_frag_idx, fragment_pointer = compress_fragment_indices(frag_idx)
precursor_df[[frag_start_col, frag_stop_col]] = new_frag_idx
precursor_df = precursor_df.sort_index()
output_tuple = []
for i in range(len(fragment_df_list)):
output_tuple.append(
fragment_df_list[i].iloc[fragment_pointer].copy().reset_index(drop=True)
)
return precursor_df, tuple(output_tuple)
[docs]
def create_fragment_mz_dataframe_by_sort_precursor(
precursor_df: pd.DataFrame,
charged_frag_types: List,
batch_size: int = 500000,
dtype: np.dtype = PEAK_MZ_DTYPE,
) -> pd.DataFrame:
"""Sort nAA in precursor_df for faster fragment mz dataframe creation.
Because the fragment mz values are continous in memory, so it is faster
when setting values in pandas.
Note that this function will change the order and index of precursor_df
Parameters
----------
precursor_df : pd.DataFrame
precursor dataframe
charged_frag_types : List
fragment types list
batch_size : int, optional
Calculate fragment mz values in batch.
Defaults to 500000.
"""
if "frag_start_idx" in precursor_df.columns:
precursor_df.drop(columns=["frag_start_idx", "frag_stop_idx"], inplace=True)
refine_precursor_df(precursor_df)
fragment_mz_df = init_fragment_by_precursor_dataframe(
precursor_df,
charged_frag_types,
dtype=dtype,
)
_grouped = precursor_df.groupby("nAA")
for nAA, big_df_group in _grouped:
for i in range(0, len(big_df_group), batch_size):
batch_end = i + batch_size
df_group = big_df_group.iloc[i:batch_end, :]
mz_values = calc_fragment_mz_values_for_same_nAA(
df_group, nAA, charged_frag_types
)
fragment_mz_df.iloc[
df_group.frag_start_idx.values[0] : df_group.frag_stop_idx.values[-1], :
] = mz_values.astype(PEAK_MZ_DTYPE)
return mask_fragments_for_charge_greater_than_precursor_charge(
fragment_mz_df,
precursor_df.charge.values,
precursor_df.nAA.values,
)
[docs]
def create_fragment_mz_dataframe(
precursor_df: pd.DataFrame,
charged_frag_types: List,
*,
reference_fragment_df: pd.DataFrame = None,
inplace_in_reference: bool = False,
batch_size: int = 500000,
dtype: np.dtype = PEAK_MZ_DTYPE,
) -> pd.DataFrame:
"""
Generate fragment mass dataframe for the precursor_df. If
the `reference_fragment_df` is provided and precursor_df contains `frag_start_idx`,
it will generate the mz dataframe based on the reference. Otherwise it
generates the mz dataframe from scratch.
Parameters
----------
precursor_df : pd.DataFrame
precursors to generate fragment masses,
if `precursor_df` contains the 'frag_start_idx' column,
`reference_fragment_df` must be provided
charged_frag_types : List
`['b_z1','b_z2','y_z1','y_z2','b_modloss_1','y_H2O_z1'...]`
reference_fragment_df : pd.DataFrame
kwargs only. Generate fragment_mz_df based on this reference,
as `precursor_df.frag_start_idx` and
`precursor.frag_stop_idx` point to the indices in
`reference_fragment_df`.
Defaults to None
inplace_in_reference : bool
kwargs only. Change values in place in the `reference_fragment_df`.
Defaults to False
batch_size: int
Number of peptides for each batch, to save RAM.
Returns
-------
pd.DataFrame
`fragment_mz_df` with given `charged_frag_types`
"""
if reference_fragment_df is None and "frag_start_idx" in precursor_df.columns:
# raise ValueError(
# "`precursor_df` contains 'frag_start_idx' column, "\
# "please provide `reference_fragment_df` argument"
# )
fragment_mz_df = init_fragment_by_precursor_dataframe(
precursor_df,
charged_frag_types,
dtype=dtype,
)
return create_fragment_mz_dataframe(
precursor_df=precursor_df,
charged_frag_types=charged_frag_types,
reference_fragment_df=fragment_mz_df,
inplace_in_reference=True,
batch_size=batch_size,
dtype=dtype,
)
if "nAA" not in precursor_df.columns:
# fast
return create_fragment_mz_dataframe_by_sort_precursor(
precursor_df,
charged_frag_types,
batch_size,
dtype=dtype,
)
if is_precursor_refined(precursor_df) and reference_fragment_df is None:
# fast
return create_fragment_mz_dataframe_by_sort_precursor(
precursor_df, charged_frag_types, batch_size, dtype=dtype
)
else:
# slow
if reference_fragment_df is not None:
if inplace_in_reference:
fragment_mz_df = reference_fragment_df.loc[
:,
[
_fr
for _fr in charged_frag_types
if _fr in reference_fragment_df.columns
],
]
else:
fragment_mz_df = pd.DataFrame(
np.zeros(
(len(reference_fragment_df), len(charged_frag_types)),
dtype=dtype,
),
columns=charged_frag_types,
)
else:
fragment_mz_df = init_fragment_by_precursor_dataframe(
precursor_df,
charged_frag_types,
dtype=dtype,
)
frag_mz_values = fragment_mz_df.to_numpy(copy=True)
_grouped = precursor_df.groupby("nAA")
for nAA, big_df_group in _grouped:
for i in range(0, len(big_df_group), batch_size):
batch_end = i + batch_size
df_group = big_df_group.iloc[i:batch_end, :]
mz_values = calc_fragment_mz_values_for_same_nAA(
df_group, nAA, fragment_mz_df.columns
)
update_sliced_fragment_dataframe(
fragment_mz_df,
frag_mz_values,
mz_values,
df_group[["frag_start_idx", "frag_stop_idx"]].values,
)
fragment_mz_df.iloc[:] = frag_mz_values
return mask_fragments_for_charge_greater_than_precursor_charge(
fragment_mz_df,
precursor_df.charge.values,
precursor_df.nAA.values,
)
[docs]
@nb.njit(nogil=True)
def join_left(left: np.ndarray, right: np.ndarray):
"""joins all values in the left array to the values in the right array.
The index to the element in the right array is returned.
If the value wasn't found, -1 is returned. If the element appears more than once, the last appearance is used.
Parameters
----------
left: numpy.ndarray
left array which should be matched
right: numpy.ndarray
right array which should be matched to
Returns
-------
numpy.ndarray, dtype = int64
array with length of the left array which indices pointing to the right array
-1 is returned if values could not be found in the right array
"""
left_indices = np.argsort(left)
left_sorted = left[left_indices]
right_indices = np.argsort(right)
right_sorted = right[right_indices]
joined_index = -np.ones(len(left), dtype="int64")
# from hereon sorted arrays are expected
lower_right = 0
for i in range(len(joined_index)):
for k in range(lower_right, len(right)):
if left_sorted[i] >= right_sorted[k]:
if left_sorted[i] == right_sorted[k]:
joined_index[i] = k
lower_right = k
else:
break
# the joined_index_sorted connects indices from the sorted left array with the sorted right array
# to get the original indices, the order of both sides needs to be restored
# First, the indices pointing to the right side are restored by masking the array for hits and looking up the right side
joined_index[joined_index >= 0] = right_indices[joined_index[joined_index >= 0]]
# Next, the left side is restored by arranging the items
joined_index[left_indices] = joined_index
return joined_index
[docs]
def calc_fragment_count(
precursor_df: pd.DataFrame, fragment_intensity_df: pd.DataFrame
):
"""
Calculates the number of fragments for each precursor.
Parameters
----------
precursor_df : pd.DataFrame
precursor dataframe which contains the frag_start_idx and frag_stop_idx columns
fragment_intensity_df : pd.DataFrame
fragment intensity dataframe which contains the fragment intensities
Returns
-------
numpy.ndarray
array with the number of fragments for each precursor
"""
if not set(["frag_start_idx", "frag_stop_idx"]).issubset(precursor_df.columns):
raise KeyError("frag_start_idx and frag_stop_idx not in dataframe")
n_fragments = []
for start, stop in zip(
precursor_df["frag_start_idx"].values, precursor_df["frag_stop_idx"].values
):
n_fragments += [np.sum(fragment_intensity_df.iloc[start:stop].values > 0)]
return np.array(n_fragments)
[docs]
def filter_fragment_number(
precursor_df: pd.DataFrame,
fragment_intensity_df: pd.DataFrame,
n_fragments_allowed_column_name: str = "n_fragments_allowed",
n_allowed: int = 999,
):
"""
Filters the number of fragments for each precursor.
Parameters
----------
precursor_df : pd.DataFrame
precursor dataframe which contains the frag_start_idx and frag_stop_idx columns
fragment_intensity_df : pd.DataFrame
fragment intensity dataframe which contains the fragment intensities
n_fragments_allowed_column_name : str, default = 'n_fragments_allowed'
column name in precursor_df which contains the number of allowed fragments
n_allowed : int, default = 999
number of fragments which should be allowed
Returns
-------
None
"""
if not set(["frag_start_idx", "frag_stop_idx"]).issubset(precursor_df.columns):
raise KeyError("frag_start_idx and frag_stop_idx not in dataframe")
for start_idx, stop_idx, n_allowed_lib in zip(
precursor_df["frag_start_idx"].values,
precursor_df["frag_stop_idx"].values,
precursor_df[n_fragments_allowed_column_name].values,
):
_allowed = min(n_allowed_lib, n_allowed)
intensies = fragment_intensity_df.iloc[start_idx:stop_idx].values
flat_intensities = np.sort(intensies.flatten())[::-1]
intensies[intensies <= flat_intensities[_allowed]] = 0
fragment_intensity_df.iloc[start_idx:stop_idx] = intensies
[docs]
def calc_fragment_cardinality(
precursor_df,
fragment_mz_df,
group_column="elution_group_idx",
split_target_decoy=True,
):
"""
Calculate the cardinality for a given fragment across a group of precursors.
The cardinality is the number of precursors that have a given fragment at a given position.
All precursors within a group are expected to have the same number of fragments.
The precursor dataframe.
fragment_mz_df : pd.DataFrame
The fragment mz dataframe.
group_column : str
The column to group the precursors by. Integer column is expected.
split_target_decoy : bool
If True, the cardinality is calculated for the target and decoy precursors separately.
"""
if len(precursor_df) == 0:
raise ValueError("Precursor dataframe is empty.")
if len(fragment_mz_df) == 0:
raise ValueError("Fragment dataframe is empty.")
if group_column not in precursor_df.columns:
raise KeyError("Group column not in precursor dataframe.")
if ("frag_start_idx" not in precursor_df.columns) or (
"frag_stop_idx" not in precursor_df.columns
):
raise KeyError("Precursor dataframe does not contain fragment indices.")
precursor_df = precursor_df.sort_values(group_column)
fragment_mz = fragment_mz_df.values
fragment_cardinality = np.ones(fragment_mz.shape, dtype=np.uint8)
@nb.njit
def _calc_fragment_cardinality(
elution_group_idx,
start_idx,
stop_idx,
fragment_mz,
fragment_cardinality,
):
if len(elution_group_idx) == 0:
return
elution_group_idx[0] # noqa TODO check for potential bug
elution_group_start = 0
for i in range(len(elution_group_idx)):
if (
i == len(elution_group_idx) - 1
or elution_group_idx[i] != elution_group_idx[i + 1]
):
elution_group_stop = i + 1
# check if whole elution group is covered
n_precursor = elution_group_stop - elution_group_start
# Check that all precursors within a group have the same number of fragments.
nAA = (
stop_idx[elution_group_start:elution_group_stop]
- start_idx[elution_group_start:elution_group_stop]
)
if not np.all(nAA[0] == nAA):
raise ValueError(
"All precursors within a group must have the same number of fragments."
)
# within a group, check for each precursor if it has the same fragment as another precursor
for i in range(n_precursor):
precursor_start_idx = start_idx[elution_group_start + i]
precursor_stop_idx = stop_idx[elution_group_start + i]
precursor_fragment_mz = fragment_mz[
precursor_start_idx:precursor_stop_idx
]
for j in range(n_precursor):
if i == j:
continue
other_precursor_start_idx = start_idx[elution_group_start + j]
other_precursor_stop_idx = stop_idx[elution_group_start + j]
other_precursor_fragment_mz = fragment_mz[
other_precursor_start_idx:other_precursor_stop_idx
]
binary_mask = (
np.abs(precursor_fragment_mz - other_precursor_fragment_mz)
< 0.00001
)
fragment_cardinality[precursor_start_idx:precursor_stop_idx] += (
binary_mask.astype(np.uint8)
)
elution_group_start = elution_group_stop
if ("decoy" in precursor_df.columns) and (split_target_decoy):
decoy_classes = precursor_df["decoy"].unique()
for decoy_class in decoy_classes:
df = precursor_df[precursor_df["decoy"] == decoy_class]
_calc_fragment_cardinality(
df[group_column].values,
df["frag_start_idx"].values,
df["frag_stop_idx"].values,
fragment_mz,
fragment_cardinality,
)
else:
_calc_fragment_cardinality(
precursor_df[group_column].values,
precursor_df["frag_start_idx"].values,
precursor_df["frag_stop_idx"].values,
fragment_mz,
fragment_cardinality,
)
return pd.DataFrame(fragment_cardinality, columns=fragment_mz_df.columns)
def _calc_column_indices(
fragment_df: pd.DataFrame,
charged_frag_types: list,
) -> np.ndarray:
"""
Calculate the column indices for a dense fragment matrix.
Columns are sorted according to `fragment.sort_charged_frag_types`
Parameters
----------
fragment_df : pd.DataFrame
Flat fragment dataframe with columns 'type', 'loss_type', 'charge'
charged_frag_types : list
List of charged fragment types as generated by `fragment.get_charged_frag_types`
Returns
-------
np.ndarray
Column indices with shape (n_fragments,)
"""
# features.LOSS_INVERSE but with separator '_' for non-empty values
_loss_inverse_separator = {
key: ("_" + value if value != "" else value)
for key, value in LOSS_MAPPING_INV.items()
}
sorted_charged_frag_types = sort_charged_frag_types(charged_frag_types)
# mapping of charged fragment types to indices
inverse_frag_type_mapping = dict(
zip(sorted_charged_frag_types, range(len(sorted_charged_frag_types)))
)
# mapping of fragment type, loss type, charge to a dense column name
frag_type_list = (
fragment_df["type"].map(SERIES_MAPPING_INV)
+ fragment_df["loss_type"].map(_loss_inverse_separator)
+ FRAGMENT_CHARGE_SEPARATOR
+ fragment_df["charge"].astype(str)
)
# Convert to integer array, using -1 for any unmapped values
return (
frag_type_list.map(inverse_frag_type_mapping)
.fillna(-1)
.astype(np.int32)
.to_numpy()
)
def _calc_row_indices(
precursor_naa: np.ndarray,
fragment_position: np.ndarray,
precursor_df_idx: np.ndarray,
fragment_df_idx: np.ndarray,
frag_start_idx: Union[None, np.ndarray] = None,
frag_stop_idx: Union[None, np.ndarray] = None,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Calculate new start and stop index mapping for flat fragments.
Returns the vector of row indices for the dense fragment matrix, shape (n_fragments,)
and the new start and stop indices for the flat fragments, shape (n_precursors,)
Parameters
----------
precursor_naa : np.ndarray
Array of precursor nAA values
fragment_position : np.ndarray
Array of fragment positions
precursor_df_idx : np.ndarray
Array of precursor indices
fragment_df_idx : np.ndarray
Array of fragment indices
Returns
-------
tuple[np.ndarray, np.ndarray, np.ndarray]
(row_indices, frag_start_idx, frag_stop_idx)
"""
if len(fragment_position) != len(fragment_df_idx):
raise ValueError(
"fragment_position and fragment_df_idx must have the same length"
)
if len(precursor_naa) != len(precursor_df_idx):
raise ValueError("precursor_naa and precursor_df_idx must have the same length")
build_index = (frag_start_idx is None) or (frag_stop_idx is None)
if build_index:
frag_stop_idx = (precursor_naa - 1).cumsum()
# Start indices for each precursor is the accumlated nAA of the previous precursor and for the first precursor is 0
frag_start_idx = np.zeros_like(frag_stop_idx)
frag_start_idx[1:] = frag_stop_idx[
:-1
] # shift values right by 1, first element remains 0
else:
if (frag_start_idx is None) or (frag_stop_idx is None):
raise ValueError(
"frag_start_idx and frag_stop_idx must both be provided if one is provided"
)
elif len(frag_start_idx) != len(frag_stop_idx):
raise ValueError(
"frag_start_idx and frag_stop_idx must have the same length"
)
# Row indices of a fragment being the accumlated nAA of the precursor + fragment position -1
precursor_idx_to_accumulated_naa = dict(zip(precursor_df_idx, frag_start_idx))
# Convert numpy array to pandas Series for mapping
# This massively speeds up the mapping
row_indices = (
pd.Series(fragment_df_idx).map(
precursor_idx_to_accumulated_naa, na_action="ignore"
)
).to_numpy() + fragment_position
# fill nan with -1 and cast to int32
row_indices[np.isnan(row_indices)] = -1
row_indices = row_indices.astype(np.int32)
return row_indices, frag_start_idx, frag_stop_idx
def _start_stop_to_idx(precursor_df, fragment_df, index_column="precursor_idx"):
"""
Convert start/stop indices to precursor and fragment indices.
Parameters
----------
precursor_df : pd.DataFrame
DataFrame containing flat_frag_start_idx and flat_frag_stop_idx columns
fragment_df : pd.DataFrame
DataFrame containing fragment information
index_column : str, optional
Name of the index column to use, by default "precursor_idx"
Returns
-------
tuple
(precursor_df_idx, fragment_df_idx) - numpy arrays containing indices
"""
# Handle empty DataFrames
if precursor_df.empty or fragment_df.empty:
return np.array([], dtype=np.int64), np.array([], dtype=np.int64)
# Sort precursor_df by 'flat_frag_start_idx'
precursor_df_sorted = (
precursor_df[["flat_frag_start_idx", "flat_frag_stop_idx"]]
.copy()
.reset_index(drop=True)
.sort_values("flat_frag_start_idx")
)
# Add precursor_idx to precursor_df as 0,1,2,3...
precursor_df_sorted[index_column] = np.arange(precursor_df_sorted.shape[0])
# Add precursor_idx to fragment_df
fragment_df_idx = np.repeat(
precursor_df_sorted[index_column].to_numpy(),
precursor_df_sorted["flat_frag_stop_idx"].to_numpy()
- precursor_df_sorted["flat_frag_start_idx"].to_numpy(),
)
if len(fragment_df_idx) != fragment_df.shape[0]:
raise ValueError(
f"Number of fragments {len(fragment_df_idx)} is not equal to the number of rows in fragment_df {fragment_df.shape[0]}"
)
# Restore original order of precursor_df
precursor_df_resorted = precursor_df_sorted.sort_index()
precursor_df_idx = precursor_df_resorted[index_column].to_numpy()
return precursor_df_idx, fragment_df_idx
[docs]
def create_dense_matrices(
precursor_df: pd.DataFrame,
fragment_df: pd.DataFrame,
charged_frag_types: list,
flat_columns: Union[list, None] = None,
) -> tuple[dict, np.ndarray, np.ndarray]:
"""
Convert the flat library to a new SpecLibBase object with dense fragment matrices.
Creates a new SpecLibBase containing fragment_mz_df (using calculated m/z values).
Flat columns like 'intensity' are transformed into dense matrices as fragment_intensity_df.
For all columns specified in flat_columns, a corresponding _fragment_<column>_df matrix is created and assigned to the new SpecLibBase object.
Warning
-------
If the column 'mz' is added to flat_columns, it will override the calculated m/z values in fragment_mz_df.
To mitigate this behavior and get observed as calculated m/z values, rename the flat mz column to 'mz_observed' before calling to_speclib_base.
Fragment types can be specified explicitly or inherited from self.charged_frag_types.
Only fragments matching these types will be included in the dense matrices. Each fragment
type (e.g., 'b_z1', 'y_z2') becomes a column in the resulting dense matrices.
The precursor_df is copied and updated with new dense fragment indices, removing any
flat-specific columns (flat_frag_start_idx, flat_frag_stop_idx).
Parameters
----------
precursor_df : pd.DataFrame
Precursor dataframe
fragment_df : pd.DataFrame
Fragment dataframe in flat format
charged_frag_types : list
List of charged fragment types (e.g., ['b_z1', 'y_z1'])
flat_columns : Union[list, None], optional
Fragment columns from the flat representation to convert to dense format, defaults to ['intensity']
Returns
-------
dict
Dictionary mapping column names to dense matrices as DataFrames
Always includes 'mz', plus any specified flat_columns
np.ndarray
Start indices for fragments in the dense representation
np.ndarray
Stop indices for fragments in the dense representation
"""
if flat_columns is None:
flat_columns = ["intensity"]
optional_columns = [
col
for col in ["precursor_idx", "flat_frag_start_idx", "flat_frag_stop_idx"]
if col in precursor_df.columns
]
precursor_df_ = precursor_df[
["sequence", "mods", "mod_sites", "charge", "nAA"] + optional_columns
].copy()
fragment_mz_df = create_fragment_mz_dataframe(
precursor_df_,
charged_frag_types,
)
if ("precursor_idx" in precursor_df_.columns) and (
"precursor_idx" in fragment_df.columns
):
precursor_df_idx = precursor_df_["precursor_idx"]
fragment_df_idx = fragment_df["precursor_idx"]
elif ("flat_frag_start_idx" in precursor_df_.columns) and (
"flat_frag_stop_idx" in precursor_df_.columns
):
precursor_df_idx, fragment_df_idx = _start_stop_to_idx(
precursor_df_, fragment_df
)
else:
raise ValueError(
"Mapping of fragment indices to precursor indices failed, no 'precursor_idx' or 'flat_frag_start_idx' and 'flat_frag_stop_idx' columns found."
)
column_indices = _calc_column_indices(fragment_df, charged_frag_types)
row_indices, frag_start_idx, frag_stop_idx = _calc_row_indices(
precursor_df_["nAA"].to_numpy(),
fragment_df["position"].to_numpy(),
precursor_df_idx,
fragment_df_idx,
precursor_df_["frag_start_idx"].to_numpy(),
precursor_df_["frag_stop_idx"].to_numpy(),
)
# remove all fragments that could not be mapped to a column
match_mask = (column_indices != -1) & (row_indices != -1)
column_indices = column_indices[match_mask]
row_indices = row_indices[match_mask]
# create a dictionary with the mz matrix and the flat columns
df_collection = {"mz": fragment_mz_df}
# df_collection["mz"] might be overridden by flat_columns["mz"]
if "mz" in flat_columns:
warnings.warn(
"flat_columns contains 'mz', this will override the calculated m/z values in fragment_mz_df. If this is not intended, rename the flat mz column to 'mz_observed' before calling to_speclib_base.",
UserWarning,
)
for column_name in flat_columns:
matrix = np.zeros_like(fragment_mz_df.values, dtype=PEAK_INTENSITY_DTYPE)
matrix[row_indices, column_indices] = fragment_df[column_name].values[
match_mask
]
df_collection[column_name] = pd.DataFrame(matrix, columns=charged_frag_types)
return df_collection, frag_start_idx, frag_stop_idx