| import os |
| import numpy as np |
| import pandas as pd |
| import torch |
| import torch.nn.functional as F |
| from PIL import Image |
| import json |
| import cv2 |
| from sklearn.decomposition import PCA |
| from open_clip import create_model_from_pretrained, get_tokenizer |
|
|
|
|
|
|
| import os |
| from typing import List, Tuple, Union |
|
|
| import torch |
| import torch.nn.functional as F |
| import numpy as np |
| from PIL import Image |
| import pandas as pd |
|
|
| |
|
|
def load_model(
    model_path: str,
    device: Union[str, torch.device]
) -> Tuple[torch.nn.Module, callable, callable]:
    """
    Build the ``coca_ViT-L-14`` open_clip model together with its helpers.

    :param model_path: Value forwarded as ``pretrained`` to
        ``create_model_from_pretrained`` (the checkpoint to load).
    :param device: Device the model is created on and moved to.
    :return: ``(model, preprocess, tokenizer)`` where ``model`` has been put
        in evaluation mode, ``preprocess`` is the image transform returned
        alongside the model, and ``tokenizer`` is the tokenizer registered
        for the same architecture name.
    """
    architecture = "coca_ViT-L-14"

    model, preprocess = create_model_from_pretrained(
        architecture,
        device=device,
        pretrained=model_path,
    )
    tokenizer = get_tokenizer(architecture)

    # Inference only: place the module on the requested device and switch
    # off training-time behaviour.
    model.to(device)
    model.eval()

    return model, preprocess, tokenizer
|
|
| |
|
|
def encode_images(
    model: torch.nn.Module,
    preprocess: callable,
    image_paths: List[str],
    device: Union[str, torch.device]
) -> torch.Tensor:
    """
    Batch-encode a list of image file paths into L2-normalized embeddings.

    Each image is opened, passed through ``preprocess`` and closed again
    before the next one is opened, so no file handles are left dangling
    while the batch is assembled.

    :param model: Module exposing ``encode_image(batch)``.
    :param preprocess: Callable turning a PIL image into a tensor; all
        results must share one shape so they can be stacked.
    :param image_paths: Paths of the images to encode. Must be non-empty,
        because ``torch.stack`` rejects an empty list.
    :param device: Device the stacked batch is moved to before encoding.
    :return: Tensor of embeddings, one row per image, normalized to unit
        L2 norm along the last dimension.
    """
    tensors = []
    for path in image_paths:
        # The context manager releases the underlying file as soon as the
        # image has been converted to a tensor.
        with Image.open(path) as img:
            tensors.append(preprocess(img))

    batch = torch.stack(tensors, dim=0).to(device)

    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        feats = model.encode_image(batch)
    return F.normalize(feats, p=2, dim=-1)
|
|
|
|
| |
| |
| |
| |
| |
|
|
| |
| |
|
|
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
|
|
| |
|
|
|
|
| |
|
|
def encode_texts(
    model: torch.nn.Module,
    tokenizer: callable,
    texts: List[str],
    device: Union[str, torch.device]
) -> torch.Tensor:
    """
    Batch-encode a list of strings into L2-normalized embeddings.

    :param model: Module exposing ``encode_text(tokens)``.
    :param tokenizer: Callable mapping a list of strings to a token tensor.
    :param texts: Strings to encode.
    :param device: Device the token tensor is moved to before encoding; it
        should be the device the model lives on.
    :return: Tensor of embeddings, one row per input string, normalized to
        unit L2 norm along the last dimension.
    """
    # The tokenizer builds its tensor on the CPU; move it to the requested
    # device so it matches a model that lives on an accelerator.
    text_inputs = tokenizer(texts).to(device)

    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        feats = model.encode_text(text_inputs)
    return F.normalize(feats, p=2, dim=-1)
|
|
|
|
def encode_text_df(
    model: torch.nn.Module,
    tokenizer: callable,
    df: pd.DataFrame,
    col_name: str,
    device: Union[str, torch.device]
) -> torch.Tensor:
    """
    Encode every entry of one DataFrame column with ``encode_texts``.

    Values are cast to ``str`` first, so non-string cells are encoded by
    their string representation.

    :param model: Text-encoding model, forwarded unchanged.
    :param tokenizer: Tokenizer callable, forwarded unchanged.
    :param df: DataFrame holding the text column.
    :param col_name: Name of the column to encode.
    :param device: Device argument, forwarded unchanged.
    :return: Whatever ``encode_texts`` returns for the column's values, in
        row order.
    """
    column_as_str = df[col_name].astype(str)
    return encode_texts(model, tokenizer, column_as_str.tolist(), device)
|
|
|
|
|
|
|
|
def get_pca_by_fit(tar_features, src_features):
    """
    Project target and source features into one 3-component PCA space.

    The PCA model is fitted on the transposed target matrix only; the same
    fitted model then transforms the transposed target and the transposed
    source matrices, and the two projections are stacked target-first.

    NOTE(review): both inputs are transposed before they reach scikit-learn,
    which treats rows as observations. The arrays are therefore presumably
    laid out features-by-samples - confirm against the callers.

    :param tar_features: 2-D numpy array of target features.
    :param src_features: 2-D numpy array of source features; after
        transposition it must have the same number of columns as the
        transposed target.
    :return:
        - pca_comb_features: numpy array with the projected target rows
          followed by the projected source rows (3 columns).
        - pca_comb_features_batch: numpy array of labels, 0 for every target
          row and 1 for every source row, in the same order.
    """
    target_t = tar_features.T
    source_t = src_features.T

    # Fit on the target only, then reuse that basis for both datasets.
    fitted = PCA(n_components=3).fit(target_t)
    projected_target = fitted.transform(target_t)
    projected_source = fitted.transform(source_t)

    pca_comb_features = np.concatenate((projected_target, projected_source))

    n_target = len(projected_target)
    n_source = len(projected_source)
    pca_comb_features_batch = np.array([0] * n_target + [1] * n_source)

    return pca_comb_features, pca_comb_features_batch
|
|
|
|
|
|
def cap_quantile(weight, cap_max=None, cap_min=None):
    """
    Cap the values of ``weight`` at quantile-derived thresholds.

    Both thresholds are computed from the uncapped input. A bound whose
    quantile is ``None`` is skipped, so the function can cap only the top,
    only the bottom, both, or nothing.

    :param weight: Numpy array (or array-like) of weights to be capped.
    :param cap_max: Quantile in [0, 1] used as the upper cap. Values above
        that quantile are replaced by it. If None, no upper cap is applied.
    :param cap_min: Quantile in [0, 1] used as the lower cap. Values below
        that quantile are replaced by it. If None, no lower cap is applied.
    :return: Numpy array with the values capped at the requested quantiles.
        When neither cap is given, ``weight`` is returned unchanged.
    """
    # Resolve both thresholds before modifying anything so the lower
    # quantile is not influenced by the upper capping.
    upper = None if cap_max is None else np.quantile(weight, cap_max)
    lower = None if cap_min is None else np.quantile(weight, cap_min)

    # np.minimum / np.maximum cannot take None, so each bound is applied
    # only when it was actually requested.
    if upper is not None:
        weight = np.minimum(weight, upper)
    if lower is not None:
        weight = np.maximum(weight, lower)

    return weight
|
|
|
|
|
|
def read_polygons(file_path, slide_id):
    """
    Read the polygon annotations of one slide from a JSON configuration file.

    The file is expected to hold a JSON object keyed by slide ID, each value
    being a list of objects with ``coords``, ``color`` and ``thickness``
    entries.

    :param file_path: Path to the JSON file containing polygon configurations.
    :param slide_id: Key of the slide whose polygons are extracted.
    :return:
        - polygons: A list of numpy arrays, one per polygon, built from its
          ``coords`` entry.
        - polygon_colors: A list with the ``color`` entry of each polygon.
        - polygon_thickness: A list with the ``thickness`` entry of each
          polygon.
        All three are None when ``slide_id`` is not a key of the file.
    """
    # JSON is UTF-8 by specification; do not depend on the platform's
    # default text encoding.
    with open(file_path, 'r', encoding='utf-8') as f:
        polygons_configs = json.load(f)

    if slide_id not in polygons_configs:
        return None, None, None

    # Look the slide up once and derive the three parallel lists from it.
    slide_polygons = polygons_configs[slide_id]
    polygons = [np.array(poly['coords']) for poly in slide_polygons]
    polygon_colors = [poly['color'] for poly in slide_polygons]
    polygon_thickness = [poly['thickness'] for poly in slide_polygons]

    return polygons, polygon_colors, polygon_thickness
|
|
|
|
|
|