Source code for maxfuse.utils

"""
General utility functions
"""

import warnings
from collections import defaultdict, Counter
import numpy as np
import anndata as ad
import scanpy as sc
from scipy.sparse.linalg import svds
from sklearn.cross_decomposition import CCA
from sklearn.utils.extmath import randomized_svd


[docs]def center_scale(arr): """ Column-wise center and scale by standard deviation. Parameters ---------- arr: np.ndarray of shape (n_samples, n_features) Returns ------- Center and scaled version of arr. """ return (arr - arr.mean(axis=0)) / arr.std(axis=0)
[docs]def sort_dict(dict_): """ Return a copy of dict_ with both keys and values sorted. Parameters ---------- dict_: dict Returns ------- A sorted copy of dict_ """ res = defaultdict(list) for key in sorted(dict_.keys()): # sort according to decreasing order in scores res[key] = sorted(dict_[key], key=lambda x: -x[1]) return res
[docs]def dict_to_list(dict_): """ Convert dict_ into a list. Parameters ---------- dict_: dict A dict representing a matching. The structure is {row: [(col1, val1), (col2, val2), ...]} Returns ------- res A list of length three, say (rows, cols, vals). Each element is further a list and each matched pair is (rows[i], cols[i]), and their similarity score is vals[i]. """ res = [[], [], []] for curr_idx1, indices2_and_scores in dict_.items(): for curr_idx2, curr_score in indices2_and_scores: res[0].append(curr_idx1) res[1].append(curr_idx2) res[2].append(curr_score) return res
[docs]def list_to_dict(list_): """ Convert list_ to a dict. Parameters ---------- list_: list of length three rows, cols, vals = list_, each element is further a list and each matched pair is (rows[i], cols[i]), and their similarity score is vals[i]. Returns ------- A dict representing a matching. The structure is {row: [(col1, val1), (col2, val2), ...]}. """ res = defaultdict(list) for idx1, idx2, score in zip(list_[0], list_[1], list_[2]): res[idx1].append((idx2, score)) return res
[docs]def summarize_clustering(clustering, true_labels): """ Compute the majority cell type for each cluster. Parameters ---------- clustering: np.array of shape (n_samples,) Clustering labels, coded from 0, 1, ..., n_clusters. true_labels: np.array of shape (n_samples,) Groundtruth labels. Returns ------- np.array of shape (n_clusters,) The majority voting results. """ res = [] cluster_label_to_indices = defaultdict(list) for i, l in enumerate(clustering): cluster_label_to_indices[l].append(i) for i in range(len(cluster_label_to_indices)): curr_indices = cluster_label_to_indices[i] curr_true_labels = true_labels[curr_indices] # majority voting # randomly break the ties curr_true_labels = np.random.permutation(curr_true_labels) counter = Counter(curr_true_labels) most_common_element, _ = counter.most_common(1)[0] res.append(most_common_element) return res
[docs]def drop_zero_variability_columns(arr_lst: list, tol=1e-8): """ Drop columns for which its standard deviation is zero in any one of the arrays in arr_list. Parameters ---------- arr_lst: list of np.array List of arrays tol: float, default=1e-8 Any number less than tol is considered as zero Returns ------- List of np.array where no column has zero standard deviation """ assert all([arr_lst[i].shape[1] == arr_lst[0].shape[1] for i in range(len(arr_lst))]) bad_columns = set() for arr in arr_lst: curr_std = np.std(arr, axis=0) for col in np.nonzero(np.abs(curr_std) < tol)[0]: bad_columns.add(col) good_columns = [i for i in range(arr_lst[0].shape[1]) if i not in bad_columns] return [arr[:, good_columns] for arr in arr_lst]
[docs]def recode(labels): """ Recode the cluster labels to 0, 1, ..., num_clusters-1 Parameters ---------- labels: np.array of shape (n_samples,) Cluster labels, each element must be hashable Returns ------- new_labels: np.array of shape (n_samples,) Recoded cluster labels new_to_old: dict Dictionary of {new_label: old_label} """ unique_labels = np.unique(labels) old_to_new = {old: new for new, old in enumerate(unique_labels)} new_to_old = {new: old for new, old in enumerate(unique_labels)} new_labels = [] for l in labels: new_labels.append(old_to_new[l]) return np.array(new_labels), new_to_old
[docs]def shrink_towards_centroids(arr, labels, wt): """ For each row of arr, shrink it towards its cluster centroid by taking wt*raw_data + (1-wt)*centroid Parameters ---------- arr: np.array of shape (n_samples, n_features) Data matrix labels: np.array of shape (n_samples,) Cluster labels wt: float Weight for shrinkage Returns ------- denoised_arr: np.array of shape (n_samples, n_features) Original array after centroid shrinkage """ labels, _ = recode(labels) centroids = get_centroids(arr, labels) centroids = centroids[labels, :] return wt * arr + (1 - wt) * centroids
[docs]def graph_smoothing(arr, edges, wt): """ For each row of arr, shrink it towards the average of its neighborhood by taking wt*raw_data + (1-wt)*nhbd_avg Parameters ---------- arr: np.array of shape (n_samples, n_features) Data matrix edges: list of length two or three Each edge of the graph is (edges[0][i], edges[1][i]) and the weight is edges[2][i] (if exists) wt: float Weight for shrinkage Returns ------- denoised_arr: np.array of shape (n_samples, n_features) Original array after graph_smoothing """ n = arr.shape[0] adj_list = [[] for _ in range(n)] wt_list = [[] for _ in range(n)] for i in range(len(edges[0])): adj_list[edges[0][i]].append(edges[1][i]) if len(edges) > 2: wt_list[edges[0][i]].append(edges[2][i]) # for i in range(len(adj.indptr)-1): # col_indices = adj.indices[adj.indptr[i]:adj.indptr[i+1]] # if len(col_indices) == 0: # adj_list.append(np.array([i])) # else: # adj_list.append(adj.indices[adj.indptr[i]:adj.indptr[i+1]]) centroids = [] for i in range(n): if len(edges) == 2: centroids.append(np.mean(arr[adj_list[i], :], axis=0)) else: centroids.append(np.average(arr[adj_list[i], :], axis=0, weights=wt_list[i])) centroids = np.array(centroids) return wt * arr + (1-wt) * centroids
[docs]def get_centroids(arr, labels): """ Compute the centroids (cluster mean) of arr. Parameters ---------- arr: np.array of shape (n_samples, n_features) Data matrix labels: np.array of shape (n_samples,) Cluster labels of each sample, coded from 0, ..., num_clusters-1 Returns ------- centroids: np.array of shape (n_centroids, n_features) Matrix of cluster centroids, the i-th column is the centroid of the i-th cluster """ cluster_label_to_indices = defaultdict(list) for i, l in enumerate(labels): cluster_label_to_indices[l].append(i) unique_labels = sorted(cluster_label_to_indices.keys()) if not all(i == l for i, l in enumerate(unique_labels)): raise ValueError('labels must be coded in integers from 0, ..., n_clusters-1.') centroids = np.empty((len(unique_labels), arr.shape[1])) for curr_label, indices in cluster_label_to_indices.items(): centroids[curr_label, :] = arr[indices, :].mean(axis=0) return centroids
[docs]def robust_svd(arr, n_components, randomized=False, n_runs=1): """ Do deterministic or randomized SVD on arr. Parameters ---------- arr: np.array The array to do SVD on n_components: int Number of SVD components randomized: bool, default=False Whether to run randomized SVD n_runs: int, default=1 Run multiple times and take the realization with the lowest Frobenious reconstruction error Returns ------- u, s, vh: np.array u @ np.diag(s) @ vh is the reconstruction of the original arr """ if randomized: best_err = float('inf') u, s, vh = None, None, None for _ in range(n_runs): curr_u, curr_s, curr_vh = randomized_svd(arr, n_components=n_components, random_state=None) curr_err = np.sum((arr - curr_u @ np.diag(curr_s) @ curr_vh) ** 2) if curr_err < best_err: best_err = curr_err u, s, vh = curr_u, curr_s, curr_vh assert u is not None and s is not None and vh is not None else: if n_runs > 1: warnings.warn("Doing deterministic SVD, n_runs reset to one.") u, s, vh = svds(arr*1.0, k=n_components) # svds can not handle integer values return u, s, vh
[docs]def svd_denoise(arr, n_components=20, randomized=False, n_runs=1): """ Compute best rank-n_components approximation of arr by SVD. Parameters ---------- arr: np.array of shape (n_samples, n_features) Data matrix n_components: int, default=20 Number of components to keep randomized: bool, default=False Whether to use randomized SVD n_runs: int, default=1 Run multiple times and take the realization with the lowest Frobenious reconstruction error Returns ------- arr: array_like of shape (n_samples, n_features) Rank-n_comopnents approximation of the input arr. """ if n_components is None: return arr u, s, vh = robust_svd(arr, n_components=n_components, randomized=randomized, n_runs=n_runs) return u @ np.diag(s) @ vh
[docs]def svd_embedding(arr, n_components=20, randomized=False, n_runs=1): """ Compute rank-n_components SVD embeddings of arr. Parameters ---------- arr: np.array of shape (n_samples, n_features) Data matrix n_components: int, default=20 Number of components to keep randomized: bool, default=False Whether to use randomized SVD n_runs: int, default=1 Run multiple times and take the realization with the lowest Frobenious reconstruction error Returns ------- embeddings: array_like of shape (n_samples, n_components) Rank-n_comopnents SVD embedding of arr. """ if n_components is None: return arr u, s, vh = robust_svd(arr, n_components=n_components, randomized=randomized, n_runs=n_runs) return u @ np.diag(s)
[docs]def cdist_correlation(arr1, arr2): """Calculate pair-wise 1 - Pearson correlation between X and Y. Parameters ---------- arr1: np.array of shape (n_samples1, n_features) First dataset. arr2: np.array of shape (n_samples2, n_features) Second dataset. Returns ------- array-like of shape (n_samples1, n_samples2) The (i, j)-th entry is 1 - Pearson correlation between i-th row of arr1 and j-th row of arr2. """ n, p = arr1.shape m, p2 = arr2.shape assert p2 == p arr1 = (arr1.T - np.mean(arr1, axis=1)).T arr2 = (arr2.T - np.mean(arr2, axis=1)).T arr1 = (arr1.T / np.sqrt(1e-6 + np.sum(arr1 ** 2, axis=1))).T arr2 = (arr2.T / np.sqrt(1e-6 + np.sum(arr2 ** 2, axis=1))).T return 1 - arr1 @ arr2.T
[docs]def pearson_correlation(arr1, arr2): """Calculate the vector of pearson correlations between each row of arr1 and arr2. Parameters ---------- arr1: np.array of shape (n_samples, n_features) First dataset. arr2: np.array of shape (n_samples, n_features) Second dataset. Returns ------- np.array of shape (n_samples,), the i-th entry is the pearson correlation between arr1[i, :] and arr2[i, :]. """ n, p = arr1.shape m, p2 = arr2.shape assert n == m and p2 == p arr1 = (arr1.T - np.mean(arr1, axis=1)).T arr2 = (arr2.T - np.mean(arr2, axis=1)).T arr1 = (arr1.T / np.sqrt(1e-6 + np.sum(arr1 ** 2, axis=1))).T arr2 = (arr2.T / np.sqrt(1e-6 + np.sum(arr2 ** 2, axis=1))).T return np.sum(arr1 * arr2, axis=1)
[docs]def filter_bad_matches(matching, filter_prop=0.1): """ Filter bad matches according to the distances of matched pairs. Parameters ---------- matching: list rows, cols, vals = init_matching, where each matched pair is (rows[i], cols[i]), and their distance is vals[i] filter_prop: float Matched pairs with distance in top filter_prop are discarded Returns ------- rows, cols, vals: list Each matched pair of rows[i], cols[i], their distance is vals[i] """ init_rows, init_cols, init_vals = matching thresh = np.quantile(init_vals, 1 - filter_prop) rows = [] cols = [] vals = [] for i, j, val in zip(init_rows, init_cols, init_vals): if val < thresh: rows.append(i) cols.append(j) vals.append(val) return np.array(rows, dtype=np.int32), np.array(cols, dtype=np.int32), np.array(vals, dtype=np.float32)
[docs]def cca_embedding(arr1, arr2, init_matching, filter_prop, n_components, max_iter=2000): """ Filter bad matched pairs, align arr1 and arr2 using init_matching, fit CCA, and get CCA embeddings of arr1 and arr2. Parameters ---------- arr1: np.ndarray of shape (n_samples1, n_features1) The first data matrix arr2: np.ndarray of shape (n_samples2, n_features2) The second data matrix init_matching: list rows, cols, vals = init_matching, where each matched pair is (rows[i], cols[i]), and their distance is vals[i] filter_prop: float Matched pairs with distance in top filter_prop are discarded when fitting CCA n_components: int Number of components to keep when fitting CCA max_iter: int, default=2000 Maximum number of iterations for CCA Returns ------- arr1_cca: np.array of shape (n_samples1, n_components) arr2_cca: np.array of shape (n_samples2, n_components) canonical_correlations: np.array of shape (n_components,) """ # filter bad matched pairs arr1_indices, arr2_indices, _ = filter_bad_matches(init_matching, filter_prop) # align arr1_aligned = arr1[arr1_indices, :] arr2_aligned = arr2[arr2_indices, :] # cca cca = CCA(n_components=n_components, max_iter=max_iter) cca.fit(arr1_aligned, arr2_aligned) arr1_aligned_cca, arr2_aligned_cca = cca.transform(arr1_aligned, arr2_aligned) arr1_aligned_cca = center_scale(arr1_aligned_cca) arr2_aligned_cca = center_scale(arr2_aligned_cca) canonical_correlations = np.corrcoef( arr1_aligned_cca, arr2_aligned_cca, rowvar=False).diagonal(offset=n_components) arr1_cca, arr2_cca = cca.transform(arr1, arr2) arr1_cca = center_scale(arr1_cca) arr2_cca = center_scale(arr2_cca) return arr1_cca, arr2_cca, canonical_correlations
[docs]def process_count_data(arr, target_sum=1e4, min_mean=0.0125, max_mean=3, min_disp=0.5, max_value=10): """ Process count data according to scanpy pipeline. Parameters ---------- arr: np.array of shape (n_samples, n_features) Data matrix target_sum: float, default=1e4 Parameter in sc.pp.normalize_total min_mean: float, default=0.0125 Parameter in sc.pp.higly_variable_genes max_mean: float, default=3 Parameter in sc.pp.higly_variable_genes min_disp: float, default=0.5 Parameter in sc.pp.higly_variable_genes max_value: float, default=10 Parameter in sc.pp.scale Returns ------- An np.array representing the processed version of arr """ adata = ad.AnnData(arr, dtype=np.float32) sc.pp.normalize_total(adata, target_sum=target_sum) sc.pp.log1p(adata) sc.pp.highly_variable_genes(adata, min_mean=min_mean, max_mean=max_mean, min_disp=min_disp) adata = adata[:, adata.var.highly_variable].copy() sc.pp.scale(adata, max_value=max_value) return adata.X