import json
import logging
import os

import numpy as np
import pandas as pd
import scanpy as sc
from PIL import Image
| |
|
| |
|
| |
|
def generate_gene_df(ad, house_keeping_genes, todense=True, top_n=50):
    """
    Build a DataFrame with the top ``top_n`` genes for each observation.

    Genes whose symbols contain '.' or '-' are dropped, as are any genes
    listed under the 'genesymbol' column of ``house_keeping_genes``.

    :param ad: An AnnData object containing gene expression data.
    :type ad: anndata.AnnData
    :param house_keeping_genes: DataFrame or Series with a 'genesymbol' column
        listing housekeeping genes to exclude.
    :type house_keeping_genes: pandas.DataFrame or pandas.Series
    :param todense: Whether to convert the sparse matrix (ad.X) to a dense
        matrix before creating a DataFrame.
    :type todense: bool
    :param top_n: Number of top-expressed genes to keep per observation
        (default 50, matching the previous hard-coded value).
    :type top_n: int
    :return: A DataFrame with a 'label' column. Each row in 'label' is a
        space-separated string of the top ``top_n`` gene names for that
        observation.
    :rtype: pandas.DataFrame
    """
    # Drop genes whose symbols contain '.' or '-' (literal match, not regex).
    ad = ad[:, ~ad.var.index.str.contains('.', regex=False)]
    ad = ad[:, ~ad.var.index.str.contains('-', regex=False)]
    # Drop housekeeping genes listed in the exclusion table.
    ad = ad[:, ~ad.var.index.isin(house_keeping_genes['genesymbol'])]

    # Densify only when requested; a dense ad.X can be wrapped directly.
    matrix = ad.X.todense() if todense else ad.X
    expr = pd.DataFrame(matrix, index=ad.obs.index, columns=ad.var.index)

    # For each observation (row), take the names of the top_n largest values.
    top_k_genes = expr.apply(lambda s, n: pd.Series(s.nlargest(n).index),
                             axis=1, n=top_n)

    # Join gene names into one space-separated label string per observation.
    top_k_genes_str = pd.DataFrame()
    top_k_genes_str['label'] = top_k_genes.astype(str).apply(' '.join, axis=1)

    return top_k_genes_str
| |
|
| |
|
| |
|
def segment_patches(img_array, coord, patch_dir, height=20, width=20):
    """
    Crop fixed-size patches around spot centers and save each one as a PNG.

    :param img_array: Full-resolution image as a NumPy array, shape (H, W[, C]).
    :type img_array: numpy.ndarray
    :param coord: DataFrame indexed by spot ID with "pixel_x" and "pixel_y"
        columns holding patch-center coordinates.
    :type coord: pandas.DataFrame
    :param patch_dir: Directory where the patch images are written; created
        if it does not exist.
    :type patch_dir: str
    :param height: Patch height in pixels (y-direction extent).
    :type height: int
    :param width: Patch width in pixels (x-direction extent).
    :type width: int
    :return: None. Patches are saved to ``patch_dir`` as ``<spot>_hires.png``.
    """
    os.makedirs(patch_dir, exist_ok=True)

    img_h, img_w = img_array.shape[:2]

    for spot_id in coord.index:
        # NOTE(review): "pixel_x" feeds the y-center and "pixel_y" the
        # x-center. This mirrors the swapped mapping of obsm['spatial']
        # used elsewhere in this module — confirm before "fixing".
        y_c, x_c = coord.loc[spot_id, ["pixel_x", "pixel_y"]]

        left = round(x_c - width / 2)
        top = round(y_c - height / 2)
        right = left + width
        bottom = top + height

        # Skip any patch that would extend beyond the image bounds.
        if left < 0 or top < 0 or right > img_w or bottom > img_h:
            print(f"Patch {spot_id} is out of range and will be skipped.")
            continue

        crop = img_array[top:bottom, left:right].astype(np.uint8)
        Image.fromarray(crop).save(os.path.join(patch_dir, f"{spot_id}_hires.png"))
| |
|
| |
|
| |
|
def read_gct(file_path):
    """
    Read a GCT (Gene Cluster Text) expression file into a pandas DataFrame.

    GCT layout: line 1 is a version string (e.g. "#1.2"), line 2 holds the
    row/column counts, line 3 is the header, and the rest are data rows.

    :param file_path: The path to the GCT file to be read.
    :return: A pandas DataFrame where the first two columns hold gene names
        and descriptions and the remaining columns hold expression data.
    """
    with open(file_path, 'r') as file:
        # Skip the version line.
        file.readline()

        # The second line gives the data dimensions; only the row count is
        # needed (the unused column count was previously parsed and dropped).
        num_rows = int(file.readline().strip().split()[0])

        # The handle now points at the header row; read exactly num_rows
        # data rows so any trailing content is ignored.
        data = pd.read_csv(file, sep='\t', header=0, nrows=num_rows)

    return data
| |
|
| |
|
| |
|
def get_library_id(adata):
    """
    Return the first library ID found under ``adata.uns['spatial']``.

    :param adata: AnnData object containing spatial information in
        ``adata.uns['spatial']``.
    :return: The first library ID, or None when the spatial dict is empty
        (an error is logged in that case).
    :raises AssertionError: If 'spatial' is not present in ``adata.uns``.
    """
    assert 'spatial' in adata.uns, "spatial not present in adata.uns"

    # next() with a default avoids building a list just to take element 0.
    library_id = next(iter(adata.uns['spatial']), None)
    if library_id is None:
        # The original code referenced an undefined `logger`, which raised
        # NameError on this path; use the module logger instead.
        logging.getLogger(__name__).error('No library_id found in adata')
    return library_id
| |
|
| |
|
| |
|
def get_scalefactors(adata, library_id=None):
    """
    Return the scalefactors dict for a given library in ``adata.uns['spatial']``.

    :param adata: AnnData object containing spatial data and scalefactors in
        ``adata.uns['spatial']``.
    :param library_id: The library ID to look up; defaults to the first
        available ID when not provided.
    :return: The scalefactors dict for the library, or None when it is
        missing (an error is logged in that case).
    """
    if library_id is None:
        library_id = get_library_id(adata)

    try:
        return adata.uns['spatial'][library_id]['scalefactors']
    except KeyError:
        # The original code referenced an undefined `logger`, which raised
        # NameError on this path; use the module logger instead.
        logging.getLogger(__name__).error('scalefactors not found in adata')
| |
|
| |
|
| |
|
def get_spot_diameter_in_pixels(adata, library_id=None):
    """
    Return the spot diameter in full-resolution pixels from the library's
    scalefactors.

    :param adata: AnnData object containing spatial data and scalefactors in
        ``adata.uns['spatial']``.
    :param library_id: The library ID to look up; defaults to the first
        available ID when not provided.
    :return: The spot diameter in full-resolution pixels, or None if the
        scalefactors or the 'spot_diameter_fullres' key are missing.
    """
    scalef = get_scalefactors(adata, library_id=library_id)

    # get_scalefactors returns None when scalefactors are absent; the
    # original code reached the same outcome by silently catching the
    # TypeError raised when subscripting None.
    if scalef is None:
        return None

    try:
        return scalef['spot_diameter_fullres']
    except KeyError:
        # The original code referenced an undefined `logger`, which raised
        # NameError on this path; use the module logger instead.
        logging.getLogger(__name__).error('spot_diameter_fullres not found in adata')
| |
|
| |
|
| |
|
def prepare_data_for_alignment(data_path, scale_type='tissue_hires_scalef'):
    """
    Prepare Visium data for alignment: load the AnnData file, scale the
    spatial coordinates, and fetch the high-resolution tissue image.

    :param data_path: The path to the AnnData (.h5ad) file containing the
        Visium data.
    :param scale_type: The scale factor key to use
        (``tissue_hires_scalef`` by default).
    :return:
        - ad: AnnData object containing the spatial transcriptomics data.
        - ad_coor: NumPy array of spatial coordinates scaled by the chosen
          scale factor.
        - img: High-resolution tissue image as 8-bit unsigned integers.
    :raises ValueError: If the scale factor, spatial coordinates, or image
        is missing.
    """
    ad = sc.read_h5ad(data_path)

    # Deduplicate gene names so downstream indexing is unambiguous.
    ad.var_names_make_unique()

    # get_scalefactors returns None when scalefactors are absent, in which
    # case the subscript raises TypeError rather than KeyError — catch both
    # so the intended ValueError is always raised.
    try:
        scalef = get_scalefactors(ad)[scale_type]
    except (KeyError, TypeError):
        raise ValueError(f"Scale factor '{scale_type}' not found in ad.uns['spatial']")

    try:
        ad_coor = np.array(ad.obsm['spatial']) * scalef
    except KeyError:
        raise ValueError("Spatial coordinates not found in ad.obsm['spatial']")

    try:
        img = ad.uns['spatial'][get_library_id(ad)]['images']['hires']
    except KeyError:
        raise ValueError("High-resolution image not found in ad.uns['spatial']")

    # Images stored as floats in [0, 1] are rescaled to 8-bit; the 1.1
    # threshold tolerates slight overshoot above 1.0.
    if img.max() < 1.1:
        img = (img * 255).astype('uint8')

    return ad, ad_coor, img
| |
|
| |
|
| |
|
def load_data_for_annotation(st_data_path, json_path, in_tissue=True):
    """
    Load spatial transcriptomics (ST) data from an .h5ad file and prepare it
    for annotation.

    :param st_data_path: Path to the .h5ad file containing the ST data.
    :param json_path: Path to the JSON file holding the ROI polygon.
    :param in_tissue: If True (default), keep only spots with
        ``obs['in_tissue'] == 1``.
    :return:
        - st_ad: AnnData object with pixel coordinates copied into
          ``obs[["pixel_y", "pixel_x"]]``.
        - library_id: The library ID associated with the spatial data.
        - roi_polygon: Region-of-interest polygon loaded from the JSON file.
    """
    st_ad = sc.read_h5ad(st_data_path)

    if in_tissue:
        st_ad = st_ad[st_ad.obs['in_tissue'] == 1]

    # NOTE(review): obsm['spatial'] column 0 is mapped to "pixel_y" and
    # column 1 to "pixel_x" — this matches the swapped convention used by
    # segment_patches in this module; confirm against the data producer
    # before changing. The None pre-assignment creates the columns first.
    st_ad.obs[["pixel_y", "pixel_x"]] = None
    st_ad.obs[["pixel_y", "pixel_x"]] = st_ad.obsm['spatial']

    library_id = get_library_id(st_ad)

    with open(json_path) as f:
        roi_polygon = json.load(f)

    return st_ad, library_id, roi_polygon
| |
|
| |
|
| |
|
def read_polygons(file_path, slide_id):
    """
    Load polygon annotations for one slide from a JSON configuration file.

    :param file_path: Path to the JSON file containing per-slide polygon
        configurations.
    :param slide_id: Key identifying the slide whose polygons are wanted.
    :return:
        - polygons: List of numpy arrays of polygon coordinates.
        - polygon_colors: List of color values, one per polygon.
        - polygon_thickness: List of border thickness values, one per polygon.
        Returns (None, None, None) when the slide is absent from the file.
    """
    with open(file_path, 'r') as handle:
        configs = json.load(handle)

    if slide_id not in configs:
        return None, None, None

    # Gather all three attributes in a single pass over the slide's entries.
    polygons, polygon_colors, polygon_thickness = [], [], []
    for entry in configs[slide_id]:
        polygons.append(np.array(entry['coords']))
        polygon_colors.append(entry['color'])
        polygon_thickness.append(entry['thickness'])

    return polygons, polygon_colors, polygon_thickness
| |
|
| |
|
| |
|