| """ |
| Help functions to evaluate our visualization system |
| """ |
|
|
| import numpy as np |
| from pynndescent import NNDescent |
| from sklearn.neighbors import NearestNeighbors |
| from sklearn.manifold import trustworthiness |
| from scipy.stats import kendalltau, spearmanr, pearsonr, rankdata |
|
|
|
|
def evaluate_isAlign(embeddingLeft, embeddingRight, align_metric=1):
    """
    Find indices whose two embedding positions are aligned (close together).

    :param embeddingLeft: ndarray (n, d), first set of low dimensional representations
    :param embeddingRight: ndarray (n, d), second set of low dimensional representations
    :param align_metric: float, Euclidean distance threshold below which a pair counts as aligned
    :return: list of int, indices i with ||embeddingLeft[i] - embeddingRight[i]|| < align_metric
    """
    left = np.asarray(embeddingLeft)
    right = np.asarray(embeddingRight)
    # vectorized per-row Euclidean distance instead of a Python loop
    dists = np.linalg.norm(left - right, axis=1)
    return np.where(dists < align_metric)[0].tolist()
|
|
|
|
def evaluate_isAlign_single(embeddingLeft, embeddingRight, selected_left, selected_right, align_metric=1):
    """
    For a selected point in one embedding, find the aligned points in the other.

    :param embeddingLeft: ndarray (n, d), first set of low dimensional representations
    :param embeddingRight: ndarray (n, d), second set of low dimensional representations
    :param selected_left: int, index selected in the left embedding, or -1 for no selection
    :param selected_right: int, index selected in the right embedding, or -1 for no selection
    :param align_metric: float, Euclidean distance threshold below which a pair counts as aligned
    :return: (align_indices_left, align_indices_right) — left indices close to the
        selected right point, and right indices close to the selected left point
    """
    lens = len(embeddingLeft)
    # only the first `lens` rows of each side are compared, matching the
    # original loop bound (assumes both embeddings have equal length)
    left = np.asarray(embeddingLeft)[:lens]
    right = np.asarray(embeddingRight)[:lens]

    align_indices_left = []
    align_indices_right = []

    if selected_left != -1:
        # vectorized distances from the selected left point to every right point
        dists = np.linalg.norm(right - left[selected_left], axis=1)
        align_indices_right = np.where(dists < align_metric)[0].tolist()
    if selected_right != -1:
        # vectorized distances from every left point to the selected right point
        dists = np.linalg.norm(left - right[selected_right], axis=1)
        align_indices_left = np.where(dists < align_metric)[0].tolist()

    return align_indices_left, align_indices_right
| |
def evaluate_isNearestNeighbour(embeddingLeft, embeddingRight, n_neighbors=15, metric="euclidean"):
    """
    Find indices where none of the nearest neighbors in embeddingLeft are preserved in embeddingRight.
    :param embeddingLeft: ndarray, first set of low dimensional representations
    :param embeddingRight: ndarray, second set of low dimensional representations
    :param n_neighbors: int, number of nearest neighbors to consider
    :param metric: str, metric for nearest neighbor calculation, default "euclidean"
    :return: list of indices where none of the neighbors are preserved
    """
    # NNDescent parameters scaled from the left embedding's size (both sides
    # reuse the same values, as the original did)
    n_trees = 5 + int(round((embeddingLeft.shape[0]) ** 0.5 / 20.0))
    n_iters = max(5, int(round(np.log2(embeddingLeft.shape[0]))))

    def _neighbor_graph(points):
        # approximate k-NN index lists for every row of `points`
        index = NNDescent(
            points,
            n_neighbors=n_neighbors,
            metric=metric,
            n_trees=n_trees,
            n_iters=n_iters,
            max_candidates=60,
            verbose=True
        )
        neighbors, _ = index.neighbor_graph
        return neighbors

    left_neighbors = _neighbor_graph(embeddingLeft)
    right_neighbors = _neighbor_graph(embeddingRight)

    # a point is "non-preserved" when its two neighbor lists are disjoint
    return [
        i
        for i in range(len(embeddingLeft))
        if len(np.intersect1d(left_neighbors[i], right_neighbors[i])) == 0
    ]
|
|
def evaluate_isNearestNeighbour_single(embeddingLeft, embeddingRight, selected_left, selected_right, n_neighbors=15, metric="euclidean"):
    """
    Fetch the approximate k-NN lists of a selected point in both embeddings.

    :param embeddingLeft: ndarray, first set of low dimensional representations
    :param embeddingRight: ndarray, second set of low dimensional representations
    :param selected_left: int, index selected in the left embedding, or -1 for none
    :param selected_right: int, index selected in the right embedding, or -1 for none
    :param n_neighbors: int, number of nearest neighbors to consider
    :param metric: str, metric for nearest neighbor calculation, default "euclidean"
    :return: four lists — neighbors of the left selection in the left embedding,
        neighbors of the right selection in the left embedding, neighbors of the
        left selection in the right embedding, neighbors of the right selection
        in the right embedding (empty lists when the selection is -1)
    """
    # NNDescent parameters scaled from the left embedding's size (both sides
    # reuse the same values, as the original did)
    n_trees = 5 + int(round((embeddingLeft.shape[0]) ** 0.5 / 20.0))
    n_iters = max(5, int(round(np.log2(embeddingLeft.shape[0]))))

    def _neighbor_graph(points):
        # approximate k-NN index lists for every row of `points`
        index = NNDescent(
            points,
            n_neighbors=n_neighbors,
            metric=metric,
            n_trees=n_trees,
            n_iters=n_iters,
            max_candidates=60,
            verbose=True
        )
        neighbors, _ = index.neighbor_graph
        return neighbors

    left_neighbors = _neighbor_graph(embeddingLeft)
    right_neighbors = _neighbor_graph(embeddingRight)

    leftE_leftS, rightE_leftS = [], []
    leftE_rightS, rightE_rightS = [], []
    if selected_left != -1:
        leftE_leftS = left_neighbors[selected_left].tolist()
        rightE_leftS = right_neighbors[selected_left].tolist()
    if selected_right != -1:
        leftE_rightS = left_neighbors[selected_right].tolist()
        rightE_rightS = right_neighbors[selected_right].tolist()

    return leftE_leftS, leftE_rightS, rightE_leftS, rightE_rightS
|
|
def evaluate_proj_nn_perseverance_knn(data, embedding, n_neighbors, metric="euclidean", selected_indices=None):
    """
    evaluate projection function, nn preserving property using knn algorithm
    :param data: ndarray, high dimensional representations
    :param embedding: ndarray, low dimensional representations
    :param n_neighbors: int, the number of neighbors
    :param metric: str, by default "euclidean"
    :param selected_indices: optional sequence or ndarray of row indices; when
        given and non-empty, the score is averaged over these rows only
    :return nn property: float, mean number of shared neighbors per point
    """
    n_trees = 5 + int(round((data.shape[0]) ** 0.5 / 20.0))
    n_iters = max(5, int(round(np.log2(data.shape[0]))))

    nnd = NNDescent(
        data,
        n_neighbors=n_neighbors,
        metric=metric,
        n_trees=n_trees,
        n_iters=n_iters,
        max_candidates=60,
        verbose=True
    )
    high_ind, _ = nnd.neighbor_graph
    nnd = NNDescent(
        embedding,
        n_neighbors=n_neighbors,
        metric=metric,
        n_trees=n_trees,
        n_iters=n_iters,
        max_candidates=60,
        verbose=True
    )
    low_ind, _ = nnd.neighbor_graph
    lens_data = len(data)

    # BUG FIX: `if selected_indices:` raises "truth value of an array is
    # ambiguous" when a numpy index array is passed; test explicitly for a
    # non-empty selection instead (empty selections keep the full-data path,
    # matching the original behavior for []).
    if selected_indices is not None and len(selected_indices) > 0:
        lens_data = len(selected_indices)
        high_ind = high_ind[selected_indices]
        low_ind = low_ind[selected_indices]

    # per-point overlap between high-dim and low-dim neighbor lists
    border_pres = np.zeros(lens_data)
    for i in range(lens_data):
        border_pres[i] = len(np.intersect1d(high_ind[i], low_ind[i]))

    return border_pres.mean()
|
|
| from sklearn.metrics import pairwise_distances |
| import numpy as np |
|
|
def subset_trustworthiness(X, X_subset, X_embedded, X_subset_embedded, n_neighbors=5):
    """
    Trustworthiness score restricted to the rows of X that appear in X_subset.

    For each subset point, each of its n_neighbors nearest neighbors in the
    embedding is looked up in the high-dimensional neighbor ordering; neighbors
    ranked worse than n_neighbors in high-dimensional space contribute a
    penalty of (rank - n_neighbors + 1).

    :param X: ndarray (N, d), high dimensional data
    :param X_subset: ndarray (m, d), rows of X forming the subset of interest
    :param X_embedded: ndarray (N, e), low dimensional embedding of X
    :param X_subset_embedded: unused, kept for interface compatibility
    :param n_neighbors: int, neighborhood size
    :return: float, 1 - (2/n_neighbors) * normalized penalty
        (NOTE(review): normalization differs from the standard trustworthiness
        denominator n*k*(2N-3k-1); preserved from the original intent)
    """
    X = np.asarray(X)
    X_subset = np.asarray(X_subset)
    X_embedded = np.asarray(X_embedded)

    # pairwise Euclidean distances in pure numpy (same result as sklearn's
    # pairwise_distances with the default metric)
    dist_X = np.linalg.norm(X[:, None, :] - X[None, :, :], axis=-1)
    dist_X_embedded = np.linalg.norm(X_embedded[:, None, :] - X_embedded[None, :, :], axis=-1)

    # BUG FIX: np.isin(X, X_subset) is elementwise and returned duplicated row
    # indices for any single matching element; match whole rows instead.
    row_match = (X[:, None, :] == X_subset[None, :, :]).all(axis=-1).any(axis=-1)
    subset_indices = np.where(row_match)[0]

    # full neighbor orderings (column 0 is the point itself, dropped).
    # BUG FIX: the original kept only the top n_neighbors columns, so looking
    # up the rank of a violated neighbor raised IndexError and the
    # `rank >= n_neighbors` penalty branch was unreachable.
    order_X = np.argsort(dist_X)[:, 1:]
    order_X_embedded = np.argsort(dist_X_embedded)[:, 1:]

    trustworthiness_val = 0
    for subset_index in subset_indices:
        # position of every point in this row's high-dim neighbor ordering
        rank_of = {j: r for r, j in enumerate(order_X[subset_index])}
        for neighbor in order_X_embedded[subset_index][:n_neighbors]:
            rank_in_X = rank_of[neighbor]
            if rank_in_X >= n_neighbors:
                trustworthiness_val += rank_in_X - n_neighbors + 1

    trustworthiness_val = trustworthiness_val / (X_subset.shape[0] * n_neighbors)
    trustworthiness_val = 1 - (2. / n_neighbors) * trustworthiness_val

    return trustworthiness_val
|
|
|
|
|
|
|
|
|
|
|
|
def evaluate_proj_nn_perseverance_trustworthiness(data, embedding, n_neighbors, metric="euclidean"):
    """
    Score the projection's neighborhood preservation with sklearn's
    trustworthiness measure.

    :param data: ndarray, high dimensional representations
    :param embedding: ndarray, low dimensional representations
    :param n_neighbors: int, the number of neighbors
    :param metric: str, by default "euclidean"
    :return: float, trustworthiness score
    """
    return trustworthiness(data, embedding, n_neighbors=n_neighbors, metric=metric)
|
|
|
|
def evaluate_proj_boundary_perseverance_knn(data, embedding, high_centers, low_centers, n_neighbors):
    """
    Measure how well boundary structure is preserved by the projection: for
    every sample, compare its nearest border points in high- and low-dim space.

    :param data: ndarray, high dimensional representations
    :param embedding: ndarray, low dimensional representations
    :param high_centers: ndarray, border points high dimensional representations
    :param low_centers: ndarray, border points low dimensional representations
    :param n_neighbors: int, the number of neighbors
    :return: float, mean overlap of the two nearest-border-point sets
    """
    high_nn = NearestNeighbors(n_neighbors=n_neighbors, radius=0.4).fit(high_centers)
    low_nn = NearestNeighbors(n_neighbors=n_neighbors, radius=0.4).fit(low_centers)

    high_ind = high_nn.kneighbors(data, n_neighbors=n_neighbors, return_distance=False)
    low_ind = low_nn.kneighbors(embedding, n_neighbors=n_neighbors, return_distance=False)

    # per-sample count of border points shared between the two neighbor sets
    overlap = np.array(
        [len(np.intersect1d(h, l)) for h, l in zip(high_ind, low_ind)],
        dtype=float,
    )
    return overlap.mean()
|
|
|
|
def evaluate_proj_temporal_perseverance_corr(alpha, delta_x):
    """
    Evaluate the temporal preserving property: correlate each point's neighbor
    preserving rate with its moved distance in low-dim across time.

    :param alpha: ndarray, neighbor preserving rate; transposed so each row is
        one point's time series (docstring shapes in the original looked
        transposed — NOTE(review): confirm against callers)
    :param delta_x: ndarray, moved distance in low dim, same layout as alpha
    :return: (mean, std) of the per-point Pearson correlations; a nan
        correlation (e.g. constant series) counts as 0
    """
    alpha_rows = alpha.T
    delta_rows = delta_x.T

    per_point = np.zeros(alpha_rows.shape[0])
    for idx, (a_row, d_row) in enumerate(zip(alpha_rows, delta_rows)):
        r, _ = pearsonr(a_row, d_row)
        # constant inputs yield nan; treat as "no correlation"
        per_point[idx] = 0.0 if np.isnan(r) else r

    return per_point.mean(), per_point.std()
|
|
|
|
def evaluate_inv_distance(data, inv_data):
    """
    Mean Euclidean distance between each sample and its reconstruction.

    :param data: ndarray, high dimensional data
    :param inv_data: ndarray, reconstruction data
    :return: float, mean per-row reconstruction distance
    """
    per_point_err = np.linalg.norm(data - inv_data, axis=1)
    return per_point_err.mean()
|
|
|
|
def evaluate_inv_accu(labels, pred):
    """
    Prediction accuracy of reconstruction data.

    :param labels: ndarray, shape(N,), label for each point
    :param pred: ndarray, shape(N,), prediction for each point
    :return: float, fraction of matching predictions
    """
    matches = labels == pred
    return np.sum(matches) / len(labels)
|
|
|
|
def evaluate_inv_conf(labels, ori_pred, new_pred):
    """
    Mean confidence gap between original and reconstructed predictions,
    measured at each point's labeled class.

    :param labels: ndarray, shape(N,), class index per point
    :param ori_pred: ndarray, shape(N, C), class scores for original data
    :param new_pred: ndarray, shape(N, C), class scores for reconstruction
    :return: float, mean absolute confidence difference
    """
    rows = np.arange(len(labels))
    # fancy indexing picks each row's score at its labeled class
    old_conf = ori_pred[rows, labels]
    new_conf = new_pred[rows, labels]
    return np.abs(old_conf - new_conf).mean()
|
|
|
|
def evaluate_proj_temporal_perseverance_entropy(alpha, delta_x):
    """
    (discard)
    calculate the temporal preserving property
    based on the correlation between the entropy of moved distance and neighbor preserving rate(alpha)
    :param alpha: ndarray, shape(N,), neighbor preserving rate for each point
    :param delta_x: ndarray, shape(N,), the moved distance in low dim for each point
    :return corr: float, the mean of all correlation
    """
    # NOTE(review): after .T each row is treated as one point's time series,
    # so the docstring shapes look transposed — confirm against callers.
    alpha = alpha.T
    delta_x = delta_x.T
    shape = alpha.shape
    data_num = shape[0]

    # normalize moved distances to [0, 1] by the global maximum
    delta_x_norm = delta_x.max()
    delta_x_norm = delta_x / delta_x_norm

    # discretize both signals into 11 integer bins (0..10)
    alpha = np.floor(alpha*10)
    delta_x_norm = np.floor(delta_x_norm*10)

    corr = np.zeros(len(alpha))
    for i in range(len(alpha)):
        # for each alpha bin j, compute the entropy of this point's distance
        # histogram, then correlate entropy against the bin index
        index = list()
        entropy = list()
        for j in range(11):
            # distances observed at times when alpha falls in bin j
            dx = delta_x_norm[i][np.where(alpha[i] == j)]
            entropy_x = np.zeros(11)
            for k in range(11):
                entropy_x[k] = np.sum(dx == k)
            if np.sum(entropy_x) == 0:
                # bin j never occurs for this point; skip it
                continue
            else:
                # Shannon entropy of the distance histogram; 10e-8 (=1e-7)
                # guards against log(0) and division by zero
                entropy_x = entropy_x / np.sum(entropy_x+10e-8)
                entropy_x = np.sum(entropy_x*np.log(entropy_x+10e-8))
                entropy.append(-entropy_x)
                index.append(j)
        if len(index) < 2:
            # fewer than two occupied bins: correlation is undefined, use 0
            print("no enough data to form a correlation, setting correlation to be 0")
            corr[i] = 0
        else:
            correlation, _ = pearsonr(index, entropy)
            corr[i] = correlation

    return corr.mean()
|
|
|
|
def evaluate_proj_temporal_global_corr(high_rank, low_rank):
    """
    Spearman rank correlation between paired high- and low-dimensional
    rankings, computed row by row.

    :param high_rank: sequence of rank arrays (high dimensional)
    :param low_rank: sequence of rank arrays (low dimensional), same length
    :return: (tau_l, p_l) — ndarrays of correlations and p-values per row
    """
    n = len(high_rank)
    stats = [spearmanr(high_rank[i], low_rank[i]) for i in range(n)]
    tau_l = np.fromiter((s[0] for s in stats), dtype=float, count=n)
    p_l = np.fromiter((s[1] for s in stats), dtype=float, count=n)
    return tau_l, p_l
|
|
|
|
def _wcov(x, y, w, ms):
    # weighted covariance of x and y about the supplied means ms = [mx, my]
    return (w * (x - ms[0]) * (y - ms[1])).sum()


def _wpearson(x, y, w):
    # weighted Pearson correlation of x and y under weights w
    means = [np.sum(v * w) / np.sum(w) for v in (x, y)]
    cov_xy = _wcov(x, y, w, means)
    var_x = _wcov(x, x, w, [means[0], means[0]])
    var_y = _wcov(y, y, w, [means[1], means[1]])
    return cov_xy / np.sqrt(var_x * var_y)


def evaluate_proj_temporal_weighted_global_corr(high_rank, low_rank):
    """
    Weighted rank correlation between high- and low-dimensional rankings,
    with weights 1/i that emphasize the top-ranked entries.

    :param high_rank: ndarray, shape(k,), high dimensional ranking
    :param low_rank: ndarray, shape(k,), low dimensional ranking
    :return: float, weighted Pearson correlation of the reordered rankings
    """
    k = len(high_rank)
    # reorder both rankings by the (0-based) ranks of high_rank before weighting
    order = rankdata(high_rank).astype("int") - 1
    weights = 1 / np.arange(1, k + 1)
    return _wpearson(high_rank[order], low_rank[order], weights)
|
|
|
|
def evaluate_keep_B(low_B, grid_view, decision_view, threshold=0.8):
    """
    evaluate whether high dimensional boundary points still lying on Boundary in low-dimensional space or not
    find the nearest grid point of boundary points, and check whether the color of corresponding grid point is white or not

    :param low_B: ndarray, (n, 2), low dimension position of boundary points
    :param grid_view: ndarray, (resolution^2, 2), the position array of grid points
    :param decision_view: ndarray, (resolution^2, 3), the RGB color of grid points
    :param threshold: float, per-channel minimum for a grid cell to count as white
    :return: float, fraction of boundary points landing on a white grid cell
    """
    # BUG FIX: test None before len() — the original evaluated len(low_B)
    # first and raised TypeError when low_B was None.
    if low_B is None or len(low_B) == 0:
        return .0

    grid_view = grid_view.reshape(-1, 2)
    decision_view = decision_view.reshape(-1, 3)

    # nearest grid point for every boundary point
    nbs = NearestNeighbors(n_neighbors=1, algorithm="ball_tree").fit(grid_view)
    _, indices = nbs.kneighbors(low_B)
    # BUG FIX: atleast_1d keeps a single boundary point from squeezing to a
    # 0-d index, which broke the original's indices.shape[0] usage.
    indices = np.atleast_1d(indices.squeeze())
    sample_colors = decision_view[indices]

    # BUG FIX: np.bool was removed in NumPy 1.24; the comparisons below
    # already yield boolean arrays, so no dtype alias is needed.
    # A cell counts as "white" only when all three channels exceed threshold.
    c1 = sample_colors[:, 0] > threshold
    c2 = sample_colors[:, 1] > threshold
    c3 = sample_colors[:, 2] > threshold
    c = np.logical_and(c1, c2)
    c = np.logical_and(c, c3)

    return np.sum(c) / len(c)