Source code for zoom.sc_tool

# -*- coding: utf-8 -*-
"""
Functionality for preprocessing scRNA-seq data.
"""

import scanpy as sc
import anndata as ad
import numpy as np
import pandas as pd
import gseapy as gp
import os
from scipy.stats import ttest_1samp
from statsmodels.stats.multitest import multipletests
from scipy.stats import gmean, rankdata
from multiprocessing import Pool
from tqdm import tqdm
import scipy.sparse as sp
from joblib import Parallel, delayed
import subprocess
from typing import (Tuple, Union, Optional, Dict, Any)
from data_loader import load_sc, load_df

[docs] def preprocess( adata: ad.AnnData, QC: bool, min_genes: int, min_cells: int, d: int, expression: pd.DataFrame, DS: Union[os.PathLike, pd.DataFrame], ) -> ad.AnnData: """ Calculates the gene expression rank for each cell. Parameters ---------- adata : ad.AnnData The AnnData object containing expression data. - `adata.X` should contain the raw or normalized expression data. QC : bool If True, filter low-quality genes and cells through scanpy - sc.pp.filter_cells(adata, min_genes=min_genes) - sc.pp.filter_genes(adata, min_cells=min_cells) d : int Number of nearest neighbors for each cell. expression : pd.DataFrame The AHBA gene expression matrix. DS : pd.DataFrame Gene-level differential stability(DS). Returns ------- adata : ad.AnnData The AnnData object containing common genes only and gene-level differential stability information. expression : pd.DataFrame The AHBA gene expression containing common genes only. """ # Load scRNA-seq data # adata = load_sc(adata,flag_sparse=True) # Filter low-quality genes and cells if QC: sc.pp.filter_cells(adata, min_genes=min_genes) sc.pp.filter_genes(adata, min_cells=min_cells) # Find nearest neighbors for each cell sc.pp.neighbors(adata, n_pcs=30, n_neighbors=d) # Keep commnon genes between scRNA-seq data and AHBA sc_genes = list(adata.var_names) ahba_genes = list(expression.column) common_genes = [g for g in ahba_genes if g in set(sc_genes)] expression = expression.loc[:, common_genes] DS = load_df(DS) DS = DS.loc[common_genes] adata = adata[:, common_genes].copy() # Add DS on the AnnData object adata.uns['GENE_STATS'] = DS return expression, adata
def _rank_cell(data): """Help function for ranking single-cell""" return rankdata(data, method="average").astype(np.float32)
[docs] def rank_expression(adata: ad.AnnData) -> ad.AnnData: """ Calculates the gene expression rank for each cell. Parameters ---------- adata : ad.AnnData The AnnData object containing expression data. Returns ------- adata : ad.AnnData The AnnData object with a new layer `adata.layers['rank']` containing the gene expression rank matrix. """ expr_mtx = adata.X.toarray() n_cells = adata.n_obs # Parallel computation with Pool() as pool: results = list(tqdm(pool.imap(_rank_cell, expr_mtx), total=n_cells, desc="Computing ranks per cell")) adata.layers['rank'] = np.vstack(results) return adata
[docs] def compute_gss( adata: ad.AnnData, n_jobs: int ) -> ad.AnnData: """ Calculates the gene expression rank for each cell. Parameters ---------- adata : ad.AnnData The AnnData object containing expression data. gss_limit : float Allowed maximum GSS value to avoid over-representation. n_jobs : int Number of cores for parallel computation. Returns ------- adata : ad.AnnData The AnnData object with a new layer `adata.layers['gss']` containing the gene specificity scores. References ---------- Song, L., Chen, W., Hou, J., Guo, M. & Yang, J. Spatially resolved mapping of cells associated with human complex traits. Nature 641 932-941 (2025). """ # Compute geometric mean of gene ranks rank_mtx = adata.layers["rank"].astype(np.float32) gm_all = gmean(rank_mtx, axis=0).reshape(-1,1) # Compute the proportion of gene expression in cells adata_X_bool = adata.X.astype(bool) frac_all = (np.asarray(adata_X_bool.sum(axis=0)).flatten()).reshape(-1,1)/adata_X_bool.shape[0] # Find nearest neighbors for each cell D = adata.obsp["distances"] indptr = D.indptr.astype(np.int64) indices = D.indices.astype(np.int64) # Help function for computing gene specificity scores def compute_gss_i(i, rank_mtx, adata_X_bool, indptr, indices, gm_all, frac_all): # Find nearest neighbors start = indptr[i] end = indptr[i+1] d = end-start-1 nn_idx = indices[start:end] # Compute mean ranks and expression fraction on microdomain gm_sub = gmean(rank_mtx[nn_idx,:], axis=0).reshape(-1,1) frac_sub = (np.asarray(adata_X_bool[nn_idx].sum(axis=0)).flatten()).reshape(-1,1)/(d+1) # Compute gene specificity score spcy = gm_sub/gm_all # Enforce sparsity and filter outlier spcy[frac_sub<frac_all] = 0 spcy[spcy<1] = 0 # Exponential power scale spcy = np.exp(np.square(spcy))-1 return spcy.ravel() # Parallel computation results = Parallel(n_jobs=n_jobs)( delayed(compute_gss_i)(i,rank_mtx,adata_X_bool,indptr,indices,gm_all,frac_all) for i in tqdm(range(adata.shape[0]), desc="Computing gene specificity scores (GSS)") ) gss_mtx = np.vstack(results) gss_mtx = sp.csr_matrix(gss_mtx) adata.layers["gss"] = gss_mtx # Add GSS statistics to AnnData object gss_max = adata.layers["gss"].max(axis=0) gss_max = gss_max.toarray().ravel() adata.uns['GENE_STATS']["gss_max"] = gss_max del adata.layers['rank'] return adata
[docs] def select_ctrl( weight_perm: pd.DataFrame, sign_perm: pd.DataFrame, gene_rep: pd.DataFrame, weight_key: str, gene_stats: pd.DataFrame, qbin_counts: dict, direction: bool, n_jobs: int ) -> Tuple[dict, dict]: """ Calculates the gene expression rank for each cell. Parameters ---------- weight_perm : pd.DataFrame Gene weights in permutation tests. sign_perm : pd.DataFrame Signed gene weights in permutation tests. gene_rep : pd.DataFrame Results for PLS-R, must contain column `weight_key`. weight_key: str Column name indicating the original gene weights for scoring. gene_stat : pd.DataFrame Gene statistics of scRNA-seq dataset. qbin_counts : dict Dictory of gene counts in each bin. direction : bool If True, find gene set relevant to the positive direction of given SBP else negative direction. n_jobs : int Number of cores used for parallel computation. Returns ------- dic_ctrl_list : dict Control gene sets selected based on spatial permutation tests. dic_ctrl_weight : dict Corresponding gene weights of control gene sets. References ---------- Fulcher, B. D., Arnatkeviciute, A. & Fornito, A. Overcoming false-positive gene-category enrichment in the analysis of spatially resolved transcriptomic brain atlas data. Nat. Commun. 12, 2669 (2021). """ # Extract neccessary information n_perm = weight_perm.shape[1] weight_perm_vals = weight_perm.values sign_perm_vals = sign_perm.values cond_pos = sign_perm_vals > 0 cond_neg = sign_perm_vals < 0 gene_idx = weight_perm.index # Help function for parallel computation def select_ctrl_p(p): weight_p = weight_perm_vals[:, p] sign_p = sign_perm_vals[:, p] # Substitute the pth column with non-permutated weight weight_perm_vals_p = weight_perm_vals.copy() weight_perm_vals_p[:, p] = gene_rep[weight_key].values ge_mtx = weight_perm_vals_p > weight_p[:, None] # Compute p-values pos_mask = sign_p > 0 neg_mask = sign_p < 0 counts = np.zeros(weight_p.shape[0], dtype=int) if pos_mask.any(): counts[pos_mask] = np.sum(ge_mtx[pos_mask,:]&cond_pos[pos_mask,:],axis=1) if neg_mask.any(): counts[neg_mask] = np.sum(ge_mtx[neg_mask,:]&cond_neg[neg_mask,:],axis=1) p_perm_p = counts/float(n_perm) # Construct data frame for convenience gene_rep_p = pd.DataFrame({ 'Sign': sign_p, weight_key: weight_p, 'p_perm_p': p_perm_p }, index=gene_idx) # Filter and rank genes gene_rep_p = gene_rep_p[gene_rep_p['Sign']>0] if direction else gene_rep_p[gene_rep_p['Sign']<0] gene_rep_p = gene_rep_p.sort_values(by=['p_perm_p', weight_key], ascending=[True, False]) ctrl_list = [] for qbin in qbin_counts.keys(): qbin_genes = list(gene_stats[gene_stats['qbin']==qbin].index) gene_rep_p_ = gene_rep_p[gene_rep_p.index.isin(qbin_genes)] qbin_ctrl = list(gene_rep_p_.head(qbin_counts[qbin]).index) ctrl_list = ctrl_list + qbin_ctrl ctrl_weight = list(gene_rep_p.loc[ctrl_list][weight_key]) return p, ctrl_list, ctrl_weight # Parallel computation results = Parallel(n_jobs=n_jobs)( delayed(select_ctrl_p)(p) for p in range(n_perm) ) dic_ctrl_list = {} dic_ctrl_weight = {} for p, genes, weights in results: dic_ctrl_list[p] = genes dic_ctrl_weight[p] = weights return dic_ctrl_list, dic_ctrl_weight
def _compute_raw_score( adata: ad.AnnData, gene_list: list, gene_weight: list, weight_opt: str ) -> Tuple[np.ndarray, np.ndarray]: """ Compute raw enrichment scores. And this function is totally adopted from scDRS. Parameters ---------- adata : ad.AnnData The AnnData object containing expression data. - `adata.X` should have underwent gene specificity score normalization. - `adata.uns['GENE_STATS']` should be present. gene_list : list SBP-relevant gene list. gene_weight : list Gene weights for genes in the gene_list. weight_opt : str Gene statistics for re-weighting genes, must be present in `adata.uns ['GENE_STATS']`. In our study, we use differential stability to down-weight genes with instable spatial expression pattern. Returns ------- v_raw_score : np.ndarray Raw scores of shape (n_cell,). v_score_weight : np.ndarray Gene weights of shape (gene_size,). References ---------- Zhang, M. J. et al. Polygenic enrichment distinguishes disease associations of individual cells in single-cell RNA-seq data. Nat. Genet. 54, 1572-1580 (2022). """ # Re-weight and normalize gene weights gene_stats = adata.uns["GENE_STATS"] v_score_weight = gene_stats.loc[gene_list, weight_opt].values v_score_weight = v_score_weight * np.array(gene_weight) v_score_weight = v_score_weight/v_score_weight.sum() # Compute raw enrichment scores gene_idx = adata.var_names.get_indexer(gene_list) gene_idx = gene_idx[gene_idx >= 0] v_raw_score = adata.X[:,gene_idx].dot(v_score_weight).reshape([-1]) return v_raw_score, v_score_weight def _correct_background( v_raw_score: np.ndarray, mat_ctrl_raw_score: np.ndarray, v_var_ratio_c2t: np.ndarray )-> Tuple[np.ndarray, np.ndarray]: """ Cell-wise and gene-wise background correction. And this function is totally adopted from scDRS. Parameters ---- v_raw_score : np.ndarray Raw enrichment scores of shape (n_cell,). mat_ctrl_raw_score : np.ndarray Raw control enrichment scores of shape (n_cell,n_perm). v_var_ratio_c2t : np.ndarray Ratio of independent variance between control scores and SBP-relevant score, of shape (n_perm). Returns ------- v_norm_score : np.ndarray Normalized disease score of shape (n_cell,) mat_ctrl_norm_score : np.ndarray Normalized control scores of shape (n_cell,n_ctrl). References ---------- Zhang, M. J. et al. Polygenic enrichment distinguishes disease associations of individual cells in single-cell RNA-seq data. Nat. Genet. 54, 1572-1580 (2022). """ # Zero-values are assigned the smallest values at the end ind_zero_score = v_raw_score == 0 ind_zero_ctrl_score = mat_ctrl_raw_score == 0 # First gene set alignment: mean 0 and same independent variance v_raw_score = v_raw_score - v_raw_score.mean() mat_ctrl_raw_score = mat_ctrl_raw_score - mat_ctrl_raw_score.mean(axis=0) mat_ctrl_raw_score = mat_ctrl_raw_score / np.sqrt(v_var_ratio_c2t) # Cell-wise standardization v_mean = mat_ctrl_raw_score.mean(axis=1) v_std = mat_ctrl_raw_score.std(axis=1) v_norm_score = v_raw_score.copy() v_norm_score = (v_norm_score - v_mean) / v_std mat_ctrl_norm_score = ((mat_ctrl_raw_score.T - v_mean) / v_std).T # Second gene set alignment: mean 0 v_norm_score = v_norm_score - v_norm_score.mean() mat_ctrl_norm_score = mat_ctrl_norm_score - mat_ctrl_norm_score.mean(axis=0) # Set cells with raw_score=0 to the minimum norm_score value norm_score_min = min(v_norm_score.min(), mat_ctrl_norm_score.min()) v_norm_score[ind_zero_score] = norm_score_min - 1e-3 mat_ctrl_norm_score[ind_zero_ctrl_score] = norm_score_min return v_norm_score, mat_ctrl_norm_score def _get_p_from_empi_null( v_t: np.ndarray, v_t_null: np.ndarray ) -> np.ndarray: """ Compute p-value from empirical null. Parameters ---- v_t : np.ndarray Observed score of shape (M,). v_t_null : np.ndarray Null scores of shape (N,). Returns ------- v_p: : np.ndarray P-value for each element in v_t of shape (M,). References ---------- Zhang, M. J. et al. Polygenic enrichment distinguishes disease associations of individual cells in single-cell RNA-seq data. Nat. Genet. 54, 1572-1580 (2022). """ v_t = np.array(v_t) v_t_null = np.array(v_t_null) v_t_null = np.sort(v_t_null) v_pos = np.searchsorted(v_t_null, v_t, side="left") v_p = (v_t_null.shape[0] - v_pos + 1) / (v_t_null.shape[0] + 1) return v_p
[docs] def score_cell_zoom( adata: ad.AnnData, gene_rep: pd.DataFrame, weight_key: str, sign_key: str, p_perm_key: str, weight_perm: pd.DataFrame, sign_perm: pd.DataFrame, direction: bool, gene_size: int, ctrl_match_key: str, weight_opt: str, n_genebin: int, return_ctrl_raw_score: bool, return_ctrl_norm_score: bool, n_jobs: int ) -> pd.DataFrame: """ Calculates single-cell SBP-relevant enrichment scores. Parameters ---------- adata : ad.AnnData The AnnData object containing expression data. - `adata.X` should have underwent gene specificity score normalization. - `adata.uns['GENE_STATS']` should be present. gene_rep : pd.DataFrame Results for PLS-R, must contain following columns: - weight_key: Column name indicating the original gene weights for scoring. - sign_key: Column name indicating the direction of genes. - p_perm_key: Column name indicating the permutation p-values. weight_perm : pd.DataFrame Gene weights in permutation tests. sign_perm : pd.DataFrame Signed gene weights in permutation tests. direction : bool If True, find gene set relevant to the positive direction of given SBP else negative direction. gene_size : int Number of genes used for scoring. ctrl_match_key : str Gene-level statistic used for matching control and SBP-relevant genes, must be present in `adata.uns['GENE_STATS']`. weight_opt : str Gene-level statistic used for re-weighting SBP-relevant genes, must be present in `adata.uns['GENE_STATS']`. n_genebin : int Number of bins for dividing genes by `ctrl_match_key`. return_ctrl_raw_score : bool If True, return raw scores for control gene sets. return_ctrl_norm_score : bool If True, return normalized scores for control gene sets. n_jobs : int Number of cores used for parallel computation. Returns ------- df_res : pd.DataFrame Results of single-cell SBP-relevant enrichment scores and other statistics. - raw_socre: Raw enrichment scores. - norm_score: Normalized enrichment scores. - p_perm: Single-cell level p-values estimated on spatial permutation tests of the same cell. - pval: Pooled p-value estimated on all permutation tests, all cells References ---------- .. [1] Zhang, M. J. et al. Polygenic enrichment distinguishes disease associations of individual cells in single-cell RNA-seq data. Nat. Genet. 54, 1572-1580 (2022). .. [2] Fulcher, B. D., Arnatkeviciute, A. & Fornito, A. Overcoming false-positive gene-category enrichment in the analysis of spatially resolved transcriptomic brain atlas data. Nat. Commun. 12, 2669 (2021). """ n_cell, n_gene = adata.shape # Select genes most relevant to SBP of interest gene_rep_ = gene_rep.copy() gene_rep_ = gene_rep_.sort_values(by=[p_perm_key,weight_key], ascending=[True, False]) gene_rep_ = gene_rep_[gene_rep_[sign_key]>0] if direction else gene_rep_[gene_rep_[sign_key]<0] gene_rep_ = gene_rep_.head(gene_size) gene_list = list(gene_rep_.index) gene_weight = gene_rep_[weight_key].to_numpy() # Check preprocessing information assert ( "GENE_STATS" in adata.uns ), "adata.uns['GENE_STATS'] not found, run `preprocess` first" gene_stats_set_expect = {"DS", "gss_max"} gene_stats_set = set(adata.uns["GENE_STATS"]) assert ( len(gene_stats_set_expect - gene_stats_set) == 0 ), "One of 'DS', 'gss_max' not found in adata.uns['GENE_STATS'], run `preprocess` and `compute_gss` first" # Check if ctrl_match_key and weight_opt is in GENE_STATS assert ctrl_match_key in adata.uns["GENE_STATS"], ( "ctrl_match_key=%s not found in adata.uns['GENE_STATS']" % ctrl_match_key ) assert weight_opt in adata.uns["GENE_STATS"], ( "ctrl_match_key=%s not found in adata.uns['GENE_STATS']" % weight_opt ) # Divide genes into bins according to `ctrl_match_key` gene_stats = adata.uns["GENE_STATS"].copy() gene_stats["qbin"] = pd.qcut( gene_stats[ctrl_match_key], q=n_genebin, labels=False, duplicates="drop" ) gene_stats_SBP = gene_stats.loc[gene_list] qbin_counts = dict(gene_stats_SBP["qbin"].value_counts()) if gene_weight is not None: gene_weight = list(gene_weight) else: gene_weight = [1]*len(gene_list) # Select control gene sets dic_ctrl_list, dic_ctrl_weight = select_ctrl( weight_perm, sign_perm, gene_rep, weight_key, gene_stats, qbin_counts, direction, n_jobs ) # Compute raw scores v_raw_score, v_score_weight = _compute_raw_score( adata, gene_list, gene_weight, weight_opt ) # Compute control scores with parallel computation n_perm = weight_perm.shape[1] mat_ctrl_raw_score = np.zeros([n_cell, n_perm]) mat_ctrl_weight = np.zeros([len(gene_list), n_perm]) results = Parallel(n_jobs=n_jobs)( delayed(_compute_raw_score)(adata, dic_ctrl_list[p], dic_ctrl_weight[p], weight_opt) for p in tqdm(range(n_perm),desc="Computing control scores") ) # Extract results for p, (v_ctrl_raw_score, v_ctrl_weight) in enumerate(results): mat_ctrl_raw_score[:, p] = v_ctrl_raw_score mat_ctrl_weight[:, p] = v_ctrl_weight # Compute ratio of independent variance v_var_ratio_c2t = np.ones(n_perm) for p in range(n_perm): v_var_ratio_c2t[p] = (mat_ctrl_weight[:,p]**2).sum() v_var_ratio_c2t /= (v_score_weight**2).sum() # Compute normalized scores v_norm_score, mat_ctrl_norm_score = _correct_background( v_raw_score, mat_ctrl_raw_score, v_var_ratio_c2t, ) # Get p-values p_perm = (1+(mat_ctrl_norm_score.T>=v_norm_score).sum(axis=0))/(1+n_perm) pooled_p = _get_p_from_empi_null(v_norm_score,mat_ctrl_norm_score.flatten()) # Return result dic_res = { "raw_score": v_raw_score, "norm_score": v_norm_score, "p_perm": p_perm, "pval": pooled_p } if return_ctrl_raw_score: for i in range(n_perm): dic_res["ctrl_raw_score_%d" % i] = mat_ctrl_raw_score[:,i] if return_ctrl_norm_score: for i in range(n_perm): dic_res["ctrl_norm_score_%d" % i] = mat_ctrl_norm_score[:,i] df_res = pd.DataFrame(index=adata.obs.index, data=dic_res, dtype=np.float32) return df_res
[docs] def group_bh( adata: ad.AnnData, df_res: pd.DataFrame, pval: str, group: str, alpha: float ) -> pd.DataFrame: """ Perform group Benjamini–Hochberg FDR correction. Parameters ---------- adata : ad.AnnData The AnnData object containing expression data. df_res : pd.DataFrame Results of single-cell SBP-relevant enrichment scores and other statistics. pval : str Column name indicating the cell-level p-values, must be present in `df_res`. group : str Column name indicating the cell groups, based on which p-values are adjusted, must be present in `adata.obs`. alpha : float Significance level for group Benjamini–Hochberg FDR correction. Returns ------- df_res: pd.DataFrame Results with adjusted p-values, stored as `p_adj`. References ---------- Hu, J. X., Zhao, H. & Zhou, H. H. False discovery rate control with groups. J. Am. Stat. Assoc. 105, 1215-1227 (2010). """ # Initialization N = df_res.shape[0] df_res[group] = adata.obs[group] pi_0 = sum(df_res[pval]>alpha)/N # Get cell counts for each group group_counts = dict(df_res[group].value_counts()) group_counts_sig = dict(df_res[df_res[pval]<alpha][group].value_counts()) # Compute weighted p-values for each group result_pw = pd.Series(index=df_res.index, dtype=float, name='p_w') for g, df_grp in df_res.groupby(group, observed=False): g_count = group_counts[g] g_count_sig = group_counts_sig[g] if g_count_sig==0: w_g = 10000 else: w_g = (g_count - g_count_sig)/g_count_sig p_wg = (df_grp[pval].values)*w_g result_pw.loc[df_grp.index] = p_wg # Group Benjamini–Hochberg FDR correction df_res['p_w'] = result_pw df_res = df_res.sort_values(by='p_w') df_res['p_rank'] = range(1,N+1) df_res['p_adj'] = (df_res['p_w']*N*(1-pi_0))/df_res['p_rank'] df_res["p_adj"] = df_res["p_adj"][::-1].cummin()[::-1] df_res[f'p_fdr{alpha}'] = np.where(df_res['p_adj']<alpha, 'True', 'False') df_res = df_res.reindex(adata.obs.index) return df_res
[docs] def downstream_DEG( adata: ad.AnnData, df_res: pd.DataFrame, alpha: float, min_score: float, group: str, rank_method: str, max_iter: int ) -> ad.AnnData: """ Calculates differentially expressed genes (DEG) for cell groups containing cells significantly relevant to SBP. Parameters ---------- adata : ad.AnnData The AnnData object containing expression data. - `adata.X` should have underwent gene specificity score normalization. df_res : pd.DataFrame Results of single-cell SBP-relevant enrichment scores and other statistics. alpha : float Significance level indicating significant cell subpopulations. min_score : float Minimum enrichment score for significant cell subpopulations. group : str Column name indicating the cell groups, based on which significant cells and non-significant cells are compared, must be present in `adata.obs`. rank_method : {'t-test','t-test_overestim_var','wilcoxon','logreg'} Methods how scanpy calculate differentially expressed genes. max_iter : int Maximum iteration for DEG calculation, only used for `logreg`. Returns ------- adata : ad.AnnData The AnnData object with `adata.uns['DEG']`, where the DEG results for each cell group are stored. """ # If no significant cell is found, skip this function if sum(df_res[f'p_fdr{alpha}']=='True')==0: print("No significant cell is found.") else: adata.uns['DEG'] = {} # Find pre-defined cell groups containing significant cells adata.obs['norm_score'] = df_res['norm_score'] adata.obs[f'p_fdr{alpha}'] = df_res[f'p_fdr{alpha}'] adata.obs["Sig"] = np.where( (adata.obs["norm_score"]>min_score)&(adata.obs[f"p_fdr{alpha}"]=="True"), "Significant", "Non-significant" ) sig_groups = list(adata.obs[adata.obs['Sig']=='Significant'][group].unique()) # Compute differentially expressed genes for each selected group for g in sig_groups: adata_sub = adata[adata.obs[group]==g].copy() if sum(adata_sub.obs["Sig"]=="Significant") < 2: continue if rank_method == 'logreg': sc.tl.rank_genes_groups( adata_sub, groupby='Sig', method=rank_method, max_iter=max_iter ) else: sc.tl.rank_genes_groups( adata_sub, groupby='Sig', method=rank_method ) gene_rnk = adata_sub.uns["rank_genes_groups"]["names"] gene_rnk = np.array([t[0] for t in gene_rnk]) gene_rnk = pd.DataFrame(adata_sub.uns["rank_genes_groups"]["scores"], index=gene_rnk) adata.uns['DEG'][g] = gene_rnk return adata
[docs] def downstream_region_enrich( adata: ad.AnnData, df_res: pd.DataFrame, alpha: float, min_score: float, group: int, region_col: str, batch_col: str, dataset: list, indvd_col: Optional[str]=None, ) -> ad.AnnData: """ Computes region enrichment for cell groups containing significant cells based on predefined statistical results. Parameters ---------- adata : ad.AnnData The AnnData object containing expression data. - `adata.X` should have underwent gene specificity score normalization. df_res : pd.DataFrame Results of single-cell SBP-relevant enrichment scores and other statistics. alpha : float Significance level indicating significant cell subpopulations. min_score : float Minimum enrichment score for significant cell subpopulations. group : str Column name indicating the cell groups, based on which significant cells and non-significant cells are compared, must be present in `adata.obs`. region_col : str Column name indicating the brain region the cell came from. must be present in `adata.obs`. batch_col : str Column name indicating the biological batch the cell came from. must be present in `adata.obs`. dataset : list List of subset of dataset to perform region enrichment analysis on. indvd_col : {None, str}, optional Column name indicating biological replicates, must be present in `adata.obs`. If provided, region enrichment score will be presented as mean and standard error across biological replicates. Returns ------- adata : ad.AnnData The AnnData object with `adata.uns['Region Enrichment']`, where the region enrichment analysis results for each cell group are stored. References ---------- Yang, L. et al. Projection-TAGs enable multiplex projection tracing and multi-modal profiling of projection neurons. Nat. Commun. 16, 5557 (2025). """ # If no significant cell is found, skip this function if sum(df_res[f'p_fdr{alpha}']=='True')==0: print("No significant cell is found.") else: adata.uns['Region Enrichment'] = {} # Find pre-defined cell groups containing significant cells adata.obs['norm_score'] = df_res['norm_score'] adata.obs[f'p_fdr{alpha}'] = df_res[f'p_fdr{alpha}'] adata.obs["Sig"] = np.where( (adata.obs["norm_score"]>min_score)&(adata.obs[f"p_fdr{alpha}"]=="True"), "Significant", "Non-significant" ) sig_groups = list(adata.obs[adata.obs['Sig']=='Significant'][group].unique()) # Compute region enrichment for each selected group for g in sig_groups: region_enrich_g = {} df_g = adata.obs[(adata.obs[group]==g) & (adata.obs[batch_col].isin(dataset))] if indvd_col is None: counts_all = dict(df_g[region_col].value_counts()) prop_all = {k:v/sum(counts_all.values()) for k,v in counts_all.items()} counts_sig = dict(df_g[df_g['Sig']=='Significant'][region_col].value_counts()) prop_sig = {k:v/sum(counts_sig.values()) for k,v in counts_sig.items()} for r in counts_all.keys(): if prop_all[r] != 0: enrich = prop_sig[r]/prop_all[r] region_enrich_g[r] = enrich region_enrich_g = pd.DataFrame( region_enrich_g.values(), region_enrich_g.keys(), columns=['Region Enrichment'] ) else: region_enrich_g = {} indvds = list(df_g[indvd_col].unique()) # Compute region enrichment for all individuals, respectively for i in indvds: region_enrich_g_i = {} df_g_i = df_g[df_g[indvd_col]==i] counts_all = dict(df_g_i[region_col].value_counts()) prop_all = {k:v/sum(counts_all.values()) for k,v in counts_all.items()} counts_sig = dict(df_g_i[df_g_i['Sig']=='Significant'][region_col].value_counts()) prop_sig = {k:v/sum(counts_sig.values()) for k,v in counts_sig.items()} for r in counts_all.keys(): if prop_all[r] != 0: enrich = prop_sig[r]/prop_all[r] region_enrich_g_i[r] = enrich region_enrich_g_i = pd.DataFrame( region_enrich_g_i.values(), region_enrich_g_i.keys(), columns=['Region Enrichment'] ) region_enrich_g[i] = region_enrich_g_i # Concatenate results from different individuals common_idx = set.intersection(*[set(df.index) for df in region_enrich_g.values()]) common_idx = sorted(common_idx) dfs = [df.loc[common_idx] for df in region_enrich_g.values()] region_enrich_g = pd.concat(dfs, axis=1) # Compute mean and standard error across individuals region_enrich_g["Mean"] = region_enrich_g.mean(axis=1) region_enrich_g["Std"] = region_enrich_g.std(axis=1) n = region_enrich_g.index[region_enrich_g.eq("Mean").any(axis=1)][0] region_enrich_g.iloc[:n, :].columns = [f"Indvd{i+1}" for i in range(n)] # Apply one-sample T-test def row_ttest(row): values = row[[f"Indvd{i+1}" for i in range(n)]].values values = np.log(values) + 1e-6 t_stat, p_two_tailed = ttest_1samp(values, popmean=0) if t_stat > 0: p_one_tailed = p_two_tailed/2 else: p_one_tailed = 1 - p_two_tailed/2 return p_one_tailed region_enrich_g["p-value"] = region_enrich_g.apply(row_ttest, axis=1) _, region_enrich_g["p-fdr"], _, _ = multipletests(region_enrich_g["p-value"], method="fdr_bh") region_enrich_g = region_enrich_g.reindex(order) adata.uns['Region Enrichment'][g] = region_enrich_g return adata
def run_hdWGCNA_py( R: str, pwd: os.PathLike, seurat_rds: os.PathLike, celltype_col: str, batch_col: str, reduction: str, default: bool, PARAMS: Optional[Dict[str, Any]]=None ) -> None: """ Run hdWGCNA and module preservation pipeline. Parameters ---------- R : str R version, load as `module load {R}` on terminal. pwd : os.PathLike File path containing the Seurat object of scRNA-seq data. seurat_rds : os.PathLike File name of the Seurat object of scRNA-seq data, must end with `.rds`. celltype_col : str Pre-defined cell type column in seurat_obj@meta.data. batch_col : str Pre-defined batch effect column in seurat_obj@meta.data. reduction : str The dimensionality reduction to perform KNN on. default : bool If True, run this pipeline with default setting, `PARAMS` must be provided else. PARAMS : dict Specify customed arguments, must contain following arguments: - k: Number of nearest neighbors to aggregate (default=50). - max_shared: The maximum number of cells to be shared across two meta-cells (default=15). - min_cells: The minimum number of cells in a particular group to construct meta-cells (default=100). - min_metacell: Minimum number of meta-cells a group must include (default=100). - deepSplit: The sensitivity of the dynamic tree cut algorithm in hierarchical clustering (default=4). - pamStage: Whether the Partitioning Around Medoids (PAM) refinement step is applied (default=FALSE). - detectCutHeight: The cut height threshold used during the module detection (default=0.995). - minModuleSize: The minimum number of genes required for a cluster to be considered a module (default=50). - mergeCutHeight: The threshold for merging modules based on the dissimilarity of their eigengenes (default=0.2). - n_permutations: Number of permutations for the module preservation test (default=500). - seed: Random seed for reproducibility (default=0). Returns ------- None. Results files will be saved during running process. References ---------- [1] Morabito, S., Reese, F., Rahimzadeh, N., Miyoshi, E. & Swarup, V. hdWGCNA identifies co-expression networks in high-dimensional transcriptomics data. Cell Rep. Methods 3, 100498 (2023). [2] Langfelder, P., Luo, R., Oldham, M. C. & Horvath, S. Is my network module preserved and reproducible? PloS Comput. Biol. 7, e1001057 (2011). Notes ----- To enable users to perform all operations exclusively through the Python interface when conditions permit, we introduce this function. We also acknowledge that running R code from Python provides no practical advantage; if further parameter configuration is required, we recommend using the R code for this part directly. """ # Fetch R code files (development stage) R_pwd = "/slurm/home/yrd/liaolab/nieshuyang/ZOOM/R" Metacell_R = f"{R_pwd}/Metacell.R" run_hdWGCNA_R = f"{R_pwd}/run_hdWGCNA.R" ModulePreservation_R = f"{R_pwd}/ModulePreservation.R" # Start up a shell to run command lines shell = subprocess.Popen(["/bin/bash"], stdin=subprocess.PIPE, text=True) # Run hdWGCNA pipeline and module preservation statistics on R with default setting. if default: shell.stdin.write(f""" module load {R} Rscript {Metacell_R} {pwd} {seurat_rds} {celltype_col} {batch_col} {reduction} \n Rscript {run_hdWGCNA_R} {pwd} {seurat_rds} {celltype_col} {batch_col} \n Rscript {ModulePreservation_R} {pwd} {seurat_rds} {celltype_col} {batch_col} \n """) shell.stdin.flush() else: k = PARAMS['k'] max_shared = PARAMS['max_shared'] min_cells = PARAMS['min_cells'] min_metacell = PARAMS['min_metacell'] deepSplit = PARAMS['deepSplit'] pamStage = PARAMS['pamStage'] detectCutHeight = PARAMS['detectCutHeight'] minModuleSize = PARAMS['minModuleSize'] mergeCutHeight = PARAMS['mergeCutHeight'] n_permutations = PARAMS['n_permutations'] seed = PARAMS['seed'] shell.stdin.write(f""" module load {R} Rscript {Metacell_R} {pwd} {seurat_rds} {celltype_col} {batch_col} {reduction} {k} {max_shared} {min_cells} {seed} \n Rscript {run_hdWGCNA_R} {pwd} {seurat_rds} {celltype_col} {batch_col} {min_metacell} {deepSplit} {pamStage} {detectCutHeight} {minModuleSize} {mergeCutHeight} {seed} \n Rscript {ModulePreservation_R} {pwd} {seurat_rds} {celltype_col} {batch_col} {n_permutations} {seed} \n """) shell.stdin.flush() # Run the shell shell.stdin.close() shell.wait() return None
[docs] def gsea_perm( gene_rep: pd.DataFrame, weight_key: str, sign_key: str, weight_perm: pd.DataFrame, sign_perm: pd.DataFrame, gene_sets: dict, min_size: int, max_size: int, one_sided: bool, n_jobs: int ) -> pd.DataFrame: """ Permform GSEA based on spatial permutation test. Parameters ---------- gene_rep : pd.DataFrame Results for PLS-R, must contain following columns: - weight_key: Column name indicating the original gene weights for scoring. - sign_key: Column name indicating the direction of genes. weight_perm : pd.DataFrame Gene weights in permutation tests. sign_perm : pd.DataFrame Signed gene weights in permutation tests. direction : bool If True, find gene set relevant to the positive direction of given SBP else negative direction. gene_sets : dict Gene sets for enrichment analysis, must be organized as {'Term1': [Gene1, Gene2,...],...} min_size & max_size : int, Minimum and maximum size of target gene set to be included in GSEA analysis one_sided: bool: If True, infer statistical significance via one-sided p-values. Else, use two-sided p-values. n_jobs : int Number of cores used for parallel computation. Returns ------- gsea_res : pd.DataFrame Results of spatial permutation test based GSEA. - index: Gene terms in `gene_sets`. - ES: Raw GSEA enrichment scores. - NES: Normalized GSEA enrichment scores. - p_perm: P-values inferred from spatial permutation test. References ---------- [1] Fulcher, B. D., Arnatkeviciute, A. & Fornito, A. Overcoming false-positive gene-category enrichment in the analysis of spatially resolved transcriptomic brain atlas data. Nat. Commun. 12, 2669 (2021). [2] Martins, D. et al. Imaging transcriptomics: convergent cellular, transcriptomic, and molecular neuroimaging signatures in the healthy adult human brain. Cell Rep. 37, 110173 (2021). [3] Fang, Z., Liu, X. & Peltz, G. GSEApy: a comprehensive package for performing gene set enrichment analysis in Python. Bioinformatics 39, btac757 (2023). Notes ----- Though we developed this function to link SBP-derived gene signatures to cell type- specific gene co-expression modules, this function can be equally applied to other biologically meaningful gene sets like GO, KEGG pathways, and so on. """ # Rank genes by `Weight` for GSEA df = pd.DataFrame({'Sign': gene_rep[sign_key], 'Weight': gene_rep[weight_key]}, index=gene_rep.index) df.loc[df['Sign']<0, 'Weight'] = df.loc[df['Sign']<0, 'Weight']*-1 df = df.sort_values(by = 'Weight', ascending=False) df_rnk = pd.DataFrame(df['Weight']) # GSEA analysis on original gene ranking gsea_res = gp.prerank( rnk=df_rnk, gene_sets=gene_sets, threads=-1, min_size=min_size, max_size=max_size, permutation_num=0, outdir=None, verbose=False ) # Dataframe format of results gsea_res = gsea_res.res2d gsea_res = gsea_res.set_index('Term') # Function for parallel permutation test def gsea_perm_p(p, sign_perm, weight_perm, gene_sets, min_size, max_size): df_p = pd.DataFrame({'Sign': sign_perm.iloc[:,p], 'Weight': weight_perm.iloc[:,p]}, index=gene_rep.index) df_p.loc[df_p['Sign']<0, 'Weight'] = df_p.loc[df_p['Sign']<0, 'Weight']*-1 df_p = df_p.sort_values(ascending=False, by = 'Weight') df_rnk_p = pd.DataFrame(df_p['Weight']) gsea_res_p = gp.prerank( rnk=df_rnk_p, gene_sets=gene_sets, threads=-1, min_size=min_size, max_size=max_size, permutation_num=0, outdir=None, verbose=False ) gsea_res_p = gsea_res_p.res2d gsea_res_p = gsea_res_p.set_index('Term') gsea_res_p = gsea_res_p.reindex(gsea_res.index) return gsea_res_p['ES'].values # Parallel computation n_perm = weight_perm.shape[1] results = Parallel(n_jobs=n_jobs)( delayed(gsea_perm_p)(p, sign_perm, weight_perm, gene_sets, min_size, max_size) for p in range(n_perm) ) GSEA_perm = pd.concat([pd.Series(res) for res in results], axis=1) GSEA_perm.index = gsea_res.index # Compute normalized enrichment score and p-value for term in gsea_res.index: ES = gsea_res.loc[term]['ES'] ES_perm = GSEA_perm.loc[term] if ES > 0: if one_sided: count = sum(ES_perm>ES) else: count = sum(abs(ES_perm)>abs(ES)) NES = ES/np.mean(ES_perm[ES_perm>0]) elif ES < 0: if one_sided: count = sum(ES_perm<ES) else: count = sum(abs(ES_perm)>abs(ES)) NES = -1*(ES/np.mean(ES_perm[ES_perm<0])) gsea_res.loc[term, 'NES'] = NES gsea_res.loc[term, 'p_perm'] = count/n_perm gsea_res = gsea_res.sort_values(by='p_perm') return gsea_res