|
|
|
|
| from dataclasses import dataclass |
| import numpy as np |
| import scipy.linalg as la |
| from scipy.signal import find_peaks |
| from math import ceil |
|
|
|
|
|
|
|
|
def thin_peaks(peak_list, dmin=10, voxel_size=(1,1,1), return_larger_peaks=False):
    """
    Remove peaks that lie within `dmin` of a brighter peak.

    Peaks are compared pairwise in list order. Whenever two peaks that are both
    still present are closer than `dmin`, the one with the lower intensity is
    removed (on equal intensity the second peak of the pair is removed). A peak
    that has already been removed is ignored in all later comparisons.

    Args:
    - peak_list (list of PeakData): Each element provides:
        - pos (sequence of float): 3D coordinates of the peak.
        - intensity (float): The intensity value of the peak.
        - key (tuple): A unique, hashable identifier for the peak (#trace, #peak).
    - dmin (float, optional): Minimum distance between peaks. Peaks closer than
      this threshold are thinned. Defaults to 10.
    - voxel_size (sequence of float, optional): Per-axis factors multiplied into
      the coordinate difference before the norm is taken. Defaults to (1, 1, 1).
    - return_larger_peaks (bool, optional): Also return, for each removed peak,
      the key of the brighter peak that caused its removal.

    Returns:
    - list of tuples: Keys of the removed peaks, in order of removal.
    if return_larger_peaks
    - list of tuples: Keys of the brighter peaks, aligned with the first list.

    Notes:
    - The distance is the L2 (Euclidean) norm of the voxel-scaled difference.
    """
    # Convert once instead of on every pair inside the nested loop.
    voxel = np.array(voxel_size)
    positions = [np.array(peak.pos) for peak in peak_list]

    removed_peaks = []
    removed_larger_peaks = []
    # Mirror of removed_peaks for O(1) membership tests; the list keeps the order.
    removed_keys = set()

    for i, peak_i in enumerate(peak_list):
        if peak_i.key in removed_keys:
            continue
        for j, peak_j in enumerate(peak_list):
            if i == j or peak_j.key in removed_keys:
                continue
            d = la.norm((positions[i] - positions[j]) * voxel)
            if d < dmin:
                if peak_i.intensity < peak_j.intensity:
                    # peak_i is screened by peak_j; it takes no further part.
                    removed_peaks.append(peak_i.key)
                    removed_larger_peaks.append(peak_j.key)
                    removed_keys.add(peak_i.key)
                    break
                else:
                    removed_peaks.append(peak_j.key)
                    removed_larger_peaks.append(peak_i.key)
                    removed_keys.add(peak_j.key)

    if return_larger_peaks:
        return removed_peaks, removed_larger_peaks
    return removed_peaks
|
|
|
|
@dataclass
class CellData:
    """Container for everything extracted from one cell.

    Attributes:
        pathdata_list (list): One PathData entry per path traced in the cell.
    """

    pathdata_list: list
|
|
@dataclass
class RemovedPeakData:
    """Record of a peak that was discarded in favour of a nearby larger peak.

    Attributes:
        idx (int): Index of the removed peak along its own path.
        screening_peak (tuple): (path_idx, position along path) locating the
            larger peak that screened this one.
    """

    idx: int
    screening_peak: tuple
|
|
@dataclass
class PathData:
    """Per-path record: detected peaks, path geometry and fluorescence trace.

    Attributes:
        peaks (list): Retained peaks, given as indices into `points` and
            `o_intensity`.
        removed_peaks (list): Peaks on this path that were removed because of a
            nearby larger peak.
        points (list): Points defining the path.
        o_intensity (list): Unnormalized fluorescence intensity values along
            the path.
        SC_length (float): Length of the path.
    """

    peaks: list
    removed_peaks: list
    points: list
    o_intensity: list
    SC_length: float
|
|
@dataclass
class PeakData:
    """A single candidate peak as consumed by `thin_peaks`.

    Attributes:
        pos (tuple): Coordinates of the peak.
        intensity (float): Intensity value of the peak.
        key (tuple): Identifier of the peak, e.g. (#trace, #peak).
    """

    pos: tuple
    intensity: float
    key: tuple
|
|
|
|
def find_peaks2(v, distance=5, prominence=0.5):
    """
    Find peaks in a 1D array, including peaks that sit on the array boundary.

    The input is padded at both ends with its own minimum value so that a
    maximum at the first or last sample has a lower neighbour and can be
    detected. Peaks are located in the padded array with
    scipy.signal.find_peaks and their indices are shifted back to the input.

    Args:
    - v (array-like): 1D input in which to find peaks.
    - distance (float, optional): Minimum number of array elements that
      separate two peaks. Defaults to 5.
    - prominence (float, optional): Minimum prominence required for a peak to
      be identified. Defaults to 0.5.

    Returns:
    - list of int: Indices of the identified peaks in the original input.
    - dict: Peak properties exactly as returned by scipy.signal.find_peaks for
      the padded array; any index-valued entries therefore refer to positions
      in the padded array, not in `v`. Empty for empty input.

    Raises:
    - RuntimeError: If a peak is reported inside the padding (sanity check).
    """
    v = np.asarray(v)
    if v.size == 0:
        # Nothing to search, and np.min would raise on an empty array.
        return [], {}

    pad = int(ceil(distance)) + 1
    # Constant padding keeps the dtype of v and gives boundary maxima a
    # lower neighbour on the outside.
    v_ext = np.pad(v, pad, mode="constant", constant_values=np.min(v))

    peaks, properties = find_peaks(v_ext, distance=distance, prominence=prominence)
    peaks = peaks - pad

    if np.any((peaks < 0) | (peaks >= len(v))):
        raise RuntimeError("find_peaks2: peak detected inside the padding region")

    return peaks.tolist(), properties
| |
|
|
def process_cell_traces(all_paths, path_lengths, measured_trace_fluorescence, dmin=10):
    """
    Process traces of a cell to extract peak information and organize the data.

    For every path the fluorescence trace is normalized (zero mean, unit
    standard deviation) and searched for peaks. The peaks of all paths are then
    thinned together: a peak closer than `dmin` to a brighter peak (on any
    path) is removed and recorded together with the peak that screened it.

    Args:
        all_paths (list of list of tuples): A list containing paths, where each
            path is represented as a list of 3D coordinate tuples.
        path_lengths (list of float): Path lengths corresponding to the paths.
        measured_trace_fluorescence (list of list of float): Fluorescence data
            corresponding to each path point.
        dmin (float): Distance below which brighter peaks screen less bright ones.

    Returns:
        CellData: One PathData per entry of `all_paths`, holding the retained
        peak indices, the removed peaks (RemovedPeakData), the path points, the
        raw trace and the path length.

    Note:
        - Each path and its corresponding length and fluorescence data are
          assumed to sit at the same index in their respective lists.
        - A trace that is empty or has no variation cannot be normalized and is
          treated as having no peaks.
    """
    # Stage 1: detect candidate peaks independently on every trace.
    cell_peaks = []
    for points, o_intensity in zip(all_paths, measured_trace_fluorescence):
        trace = np.asarray(o_intensity)
        sigma = np.std(trace) if trace.size else 0.0

        if sigma > 0:
            intensity_normalized = (trace - np.mean(trace)) / sigma
            p, _ = find_peaks2(intensity_normalized, distance=5, prominence=0.5*np.std(intensity_normalized))
        else:
            # Flat, empty or NaN trace: normalization would divide by zero.
            p = []
        peaks = np.array(p, dtype=np.int32)

        # Raw (unnormalized) heights so peaks from different paths are comparable.
        peak_heights = [o_intensity[u] for u in peaks]
        peak_points = [points[u] for u in peaks]
        cell_peaks.append((peaks, peak_points, peak_heights))

    # Stage 2: thin all peaks of the cell together; keys are (path, peak number).
    to_thin = []
    for k, (_, peak_points, peak_heights) in enumerate(cell_peaks):
        for u, (pos, height) in enumerate(zip(peak_points, peak_heights)):
            to_thin.append(PeakData(pos=pos, intensity=height, key=(k, u)))

    removed_keys, larger_keys = thin_peaks(to_thin, return_larger_peaks=True, dmin=dmin)

    # Removed key -> key of the brighter peak that screened it.
    screened_by = {}
    for key, larger_key in zip(removed_keys, larger_keys):
        screened_by.setdefault(key, larger_key)

    # Stage 3: split every path's peaks into retained and removed.
    retained_per_path = []
    removed_per_path = []
    for path_idx, (path_peaks, _, _) in enumerate(cell_peaks):
        path_retained_peaks = []
        path_removed_peaks = []
        for peak_idx, peak_pos in enumerate(path_peaks):
            larger_key = screened_by.get((path_idx, peak_idx))
            if larger_key is None:
                path_retained_peaks.append(peak_pos)
            else:
                larger_path, larger_idx = larger_key
                # Store the screening peak as (path, index along that path).
                path_removed_peaks.append(RemovedPeakData(idx=peak_pos, screening_peak=(larger_path, cell_peaks[larger_path][0][larger_idx])))
        retained_per_path.append(path_retained_peaks)
        removed_per_path.append(path_removed_peaks)

    # Stage 4: assemble the per-path records.
    pd_list = []
    for k, points in enumerate(all_paths):
        pd_list.append(PathData(peaks=retained_per_path[k],
                                removed_peaks=removed_per_path[k],
                                points=points,
                                o_intensity=measured_trace_fluorescence[k],
                                SC_length=path_lengths[k]))

    return CellData(pathdata_list=pd_list)
|
|
|
|
# Default fraction of the maximum value used as the cut-off in focus_criterion.
alpha_max = 0.4


def focus_criterion(pos, v, alpha=alpha_max):
    """
    Select the positions whose value reaches a fraction of the maximum.

    The cut-off is `alpha` times the maximum of `v`; a position is kept when
    its value is at least the cut-off.

    Args:
    - pos (numpy.ndarray): Array of positions.
    - v (numpy.ndarray): 1D array of values, e.g., intensities, aligned with `pos`.
    - alpha (float, optional): Scaling factor for the cut-off. Defaults to `alpha_max`.

    Returns:
    - numpy.ndarray: Positions whose value in `v` is at least the cut-off; an
      empty int32 array when `v` is empty.
    """
    if len(v) == 0:
        return np.array([], dtype=np.int32)

    cutoff = alpha * np.max(v)
    keep = v >= cutoff
    return np.array(pos[keep])
|
|
def _screened_foci(path_data, threshold):
    """Return the removed peaks of `path_data` whose raw intensity exceeds `threshold`."""
    removed_peaks = path_data.removed_peaks
    if not len(removed_peaks):
        return []
    removed_idx = np.array([u.idx for u in removed_peaks], dtype=np.int32)
    removed_heights = np.array(path_data.o_intensity)[removed_idx]
    return [removed_peaks[i] for i in np.where(removed_heights > threshold)[0]]


def _max_centred_intensity(pathdata_list):
    """Return the largest mean-subtracted intensity over all traces (-inf if there are none)."""
    largest = float("-inf")
    for path_data in pathdata_list:
        h = np.array(path_data.o_intensity)
        largest = max(largest, np.max(h - np.mean(h)))
    return largest


def analyse_celldata(cell_data, config):
    """
    Analyse the provided cell data to extract focus-related information.

    Args:
        cell_data (CellData): Object whose `pathdata_list` holds one PathData per path.
        config (dictionary): Configuration dictionary containing
            'peak_threshold' (float) - threshold for calling peaks as foci
            'threshold_type' (str) - 'per-trace' or 'per-cell'

            'per-trace': a peak is a focus if
                intensity - trace_mean >= peak_threshold * (trace_max_peak_intensity - trace_mean)
            'per-cell': a peak is a focus if
                intensity - trace_mean > peak_threshold * max(intensity - trace_mean),
                the maximum being taken over every trace of the cell.

    Returns:
        tuple: A tuple containing one entry per path in each list:
            - foci_abs_intensity (list): Raw intensities at the detected foci.
            - foci_pos (list): Absolute positions of the detected foci along the path.
            - foci_pos_index (list): Indices of the detected foci.
            - screened_foci_data (list): RemovedPeakData of removed peaks whose
              intensity exceeds the trace threshold.
            - trace_median_intensities (list): Per-trace median intensity.
            - trace_thresholds (list): Per-trace absolute threshold for calling
              peaks as foci (None for a 'per-trace' path without peaks).

    Raises:
        NotImplementedError: If 'threshold_type' is not one of the supported values.
    """
    foci_abs_intensity = []
    foci_pos = []
    foci_pos_index = []
    screened_foci_data = []
    trace_median_intensities = []
    trace_thresholds = []

    peak_threshold = config['peak_threshold']
    threshold_type = config['threshold_type']

    if threshold_type not in ('per-trace', 'per-cell'):
        raise NotImplementedError(f"unsupported threshold_type: {threshold_type!r}")

    per_cell = threshold_type == 'per-cell'
    if per_cell:
        # Cell-wide reference: brightest mean-subtracted sample of any trace.
        max_cell_intensity = _max_centred_intensity(cell_data.pathdata_list)

    for path_data in cell_data.pathdata_list:
        intensity = np.array(path_data.o_intensity)
        trace_mean = np.mean(intensity)
        centred = intensity - trace_mean
        peaks = np.array(path_data.peaks, dtype=np.int32)

        if per_cell:
            foci_idx = peaks[centred[peaks] > peak_threshold*max_cell_intensity]
            reported_threshold = trace_mean + peak_threshold*max_cell_intensity
            screening_threshold = reported_threshold
        else:
            h = centred/np.std(centred)
            foci_idx = focus_criterion(peaks, h[peaks], peak_threshold)
            if len(peaks):
                # The criterion on the normalized trace, expressed in raw intensity.
                reported_threshold = (1-peak_threshold)*trace_mean + peak_threshold*np.max(intensity[peaks])
                screening_threshold = reported_threshold
            else:
                # No retained peak defines a threshold: report none, and let
                # every removed peak on this path pass the screening test.
                reported_threshold = None
                screening_threshold = float('-inf')

        trace_thresholds.append(reported_threshold)
        screened_foci_data.append(_screened_foci(path_data, screening_threshold))

        # Convert index along the path into distance along the path.
        pos_abs = (foci_idx/len(path_data.points))*path_data.SC_length
        foci_pos.append(pos_abs)
        foci_abs_intensity.append(intensity[foci_idx])
        foci_pos_index.append(foci_idx)
        trace_median_intensities.append(np.median(path_data.o_intensity))

    return foci_abs_intensity, foci_pos, foci_pos_index, screened_foci_data, trace_median_intensities, trace_thresholds
|
|
def analyse_traces(all_paths, path_lengths, measured_trace_fluorescence, config):
    """
    Run peak extraction and foci analysis on the traces of one cell.

    Args:
        all_paths (list): Paths of the cell, passed on to `process_cell_traces`.
        path_lengths (list): Path lengths, passed on to `process_cell_traces`.
        measured_trace_fluorescence (list): Fluorescence traces, passed on to
            `process_cell_traces`.
        config (dictionary): Must contain 'screening_distance' (used as `dmin`
            for `process_cell_traces`); the whole dictionary is also passed to
            `analyse_celldata`.

    Returns:
        The result of `analyse_celldata` for the processed cell.
    """
    screening_distance = config['screening_distance']
    cell_data = process_cell_traces(
        all_paths,
        path_lengths,
        measured_trace_fluorescence,
        dmin=screening_distance,
    )
    return analyse_celldata(cell_data, config)
|
|
| |
|
|
|
|
|
|