Source code for zoom.pls_tool

# -*- coding: utf-8 -*-
"""
Functionality for performing cross-validation PLS-R and
traditional imaging-transcriptomics paradigm.
"""

import numpy as np
import pandas as pd
import scipy.stats as stats
from scipy.stats import norm
import sklearn
from tqdm import tqdm
from sklearn import model_selection, linear_model
from sklearn.cross_decomposition import PLSRegression
from sklearn.metrics import mean_squared_error
from joblib import Parallel, delayed
from sklearn import model_selection, linear_model
import multiprocessing
from typing import Union, Tuple

[docs] def optimal_component_eval( expression: pd.DataFrame, SBP: pd.DataFrame, ncomps: Union[np.ndarray, list], cv: int, seed: int ) -> Tuple[list, list, pd.DataFrame]: """ Evaluate optimal component for PLS-R through cross-validation strategy. Parameters ---------- expression : pd.DataFrame AHBA gene expression matrix. SBP : pd.DataFrame One-dimensional data frame of spatial brain phenotype. ncomps: np.ndarray or list Optimal component number candidates. cv : int Number of folds in cross-validation. seed : int Random seed to control the dataset split. Returns ------- best_comps & r : list Local optimal component numbers during the current dataset splition and the corresponding Pearson's correlation between the predicted SBP and actual SBP. preds : pd.DataFrame Predictions of cross-validation PLS-R. References ---------- Wang, Y. et al. Spatio-molecular profiles shape the human cerebellar hierarchy along the sensorimotor-association axis. Cell Rep. 43, 113770 (2024). """ # Initialization cv_inner = cv best_comps = [] r = [] preds = pd.DataFrame(np.zeros((len(SBP)))) preds.index = expression.index # Split samples with KFold split strategy sel = model_selection.KFold(n_splits=cv, shuffle=True, random_state=seed) sel_inner = model_selection.KFold(n_splits=cv_inner, shuffle=True, random_state=seed) for tr_ind, te_ind in sel.split(expression): mse = [] expression_cur = expression.iloc[tr_ind,:] for tr_i, te_i in sel_inner.split(expression_cur): mse.append([]) # Inner traing sets tr_rows = expression.index[tr_ind][tr_i] tr_set = pd.DataFrame(expression.loc[tr_rows,:], copy=True) tr_y = pd.DataFrame(SBP).loc[tr_rows,:] # Inner test sets te_rows = expression.index[tr_ind][te_i] te_set = pd.DataFrame(expression.loc[te_rows,:], copy=True) te_y = pd.DataFrame(SBP).loc[te_rows,:] # For all optimal component candidate, evaluate their performance for nc in ncomps: clf = PLSRegression(n_components=nc) pred_y = clf.fit(tr_set,tr_y).predict(te_set).flatten() mse[-1].append(mean_squared_error(te_y,pred_y)) # Pick current best component number mse = np.sum(mse,axis=0) nc = ncomps[np.argmin(mse)] best_comps.append(nc) #refit full model clf = PLSRegression(n_components=nc) # Training sets tr_rows = expression.index[tr_ind] tr_set = pd.DataFrame(expression.loc[tr_rows,:], copy=True) tr_y = pd.DataFrame(SBP).iloc[tr_ind,:] # Test sets te_rows = expression.index[te_ind] te_set = pd.DataFrame(expression.loc[te_rows,:], copy=True) te_y = pd.DataFrame(SBP).iloc[te_ind,:] mod = clf.fit(tr_set,tr_y) preds.loc[te_rows,:] = mod.predict(te_set) r.append(stats.pearsonr(mod.predict(te_set).flatten(), te_y.values.flatten())[0]) return best_comps,r,preds
def wrapper(args): return optimal_component_eval(*args)
[docs] def run_component_eval( expression: pd.DataFrame, SBP: pd.DataFrame, ncomps: Union[np.ndarray, list], cv: int, seed: int, repeats: int ) -> int: """ Evaluate optimal component for PLS-R through cross-validation strategy. Parameters ---------- expression : pd.DataFrame AHBA gene expression matrix. SBP : pd.DataFrame One-dimensional data frame of spatial brain phenotype. ncomps: np.ndarray or list Optimal component number candidates. cv : int Number of folds in cross-validation. seed : int Random seed to control the dataset split. repeats : int How many times should optimal component evaluation be run? Returns ------- best_comp : int The optimal component number for current PLS-R model. References ---------- Wang, Y. et al. Spatio-molecular profiles shape the human cerebellar hierarchy along the sensorimotor-association axis. Cell Rep. 43, 113770 (2024). """ # Initialization cores = multiprocessing.cpu_count() #pool = multiprocessing.Pool(processes=cores) tasks = [] results = [] # Evaluate optimal component number with parallel computation for x in range(1, repeats+1): current_seed = seed*x task = (expression, SBP, ncomps, cv, current_seed) tasks.append(task) #ans = pool.starmap(optimal_component_eval,tasks) with multiprocessing.Pool(processes=cores) as pool: ans = list(tqdm(pool.imap_unordered(wrapper, tasks), total=len(tasks))) for n in range(len(ans)): results.append(pd.DataFrame({ 'Best_comp':ans[n][0], 'Correlation':ans[n][1]})) # Pick optimal component number comp_stat = pd.concat(results, ignore_index=True) best_comp = comp_stat['Best_comp'].value_counts().index[0] return best_comp
[docs] def model_eval( expression: pd.DataFrame, SBP: pd.DataFrame, best_comp: int, cv: int, repeats: int, seed: int, n_jobs: int ) -> Tuple[pd.DataFrame, list]: """ Evaluate the final model performance for the PLS-R model. Parameters ---------- expression : pd.DataFrame AHBA gene expression matrix. SBP : pd.DataFrame One-dimensional data frame of spatial brain phenotype. best_comp : int The optimal component number for current PLS-R model. cv : int Number of folds. repeats : int How many times should model performance evaluation be run? seed : int Random seed to control the dataset split. n_jobs : int Number of cores used for parallel computation. Returns ------- preds_rep : pd.DataFrame The predicted SBP values across repeats. scores : list Model performances across repeats, as measured by Pearson's correlation. References ---------- Wang, Y. et al. Spatio-molecular profiles shape the human cerebellar hierarchy along the sensorimotor-association axis. Cell Rep. 43, 113770 (2024). """ clf = PLSRegression(best_comp) # Help function to compute model performance for each repeat def model_eval_i(i, expression, SBP, clf, cv, seed): # Initialization preds = np.zeros((len(SBP),)) sel = model_selection.KFold(n_splits=cv, shuffle=True, random_state=(seed*(i+1))) for fold, (tr_ind, te_ind) in enumerate(sel.split(expression)): # Training sets tr_set = expression.iloc[tr_ind] tr_y = SBP.iloc[tr_ind] # Test sets te_set = expression.iloc[te_ind] te_y = SBP.iloc[te_ind] predictions = clf.fit(tr_set, tr_y).predict(te_set).flatten() preds[te_ind] = predictions return preds # Running the parallel processing results = Parallel(n_jobs=n_jobs)( delayed(model_eval_i)(i, expression, SBP, clf, cv, seed) for i in range(repeats) ) # Extract predictions from results preds_rep = pd.DataFrame([result for result in results]).T preds_rep.columns = range(repeats) preds_rep.index = expression.index scores = [] for i in range(repeats): scores.append(stats.pearsonr(preds_rep.iloc[:,i].values, SBP.values[:,0])[0]) return preds_rep, scores
[docs] def pls_perm( expression: pd.DataFrame, SBP: pd.DataFrame, SBP_perm: pd.DataFrame, best_comp: int, scores: list, cv: int, seed: int, n_jobs: int ) -> Tuple[float, pd.DataFrame]: """ Perform spatial permutation test for PLS-R. Parameters ---------- expression : pd.DataFrame AHBA gene expression matrix. SBP : pd.DataFrame One-dimensional data frame of spatial brain phenotype. SBP_perm : pd.DataFrame Spatial autocorrelation-preserved null model for SBP. best_comp : int The optimal component number for current PLS-R model. scores : list Model performances across repeats, as measured by Pearson's correlation. cv : int Number of folds. seed : int Random seed to control the dataset split. n_jobs : int Number of cores used for parallel computation. Returns ------- psa : float P-value accounting for spatial autocorrelation. scores_perm : list Model performances across permutation tests, as measured by Pearson's correlation. References ---------- Wang, Y. et al. Spatio-molecular profiles shape the human cerebellar hierarchy along the sensorimotor-association axis. Cell Rep. 43, 113770 (2024). """ clf = PLSRegression(best_comp) # Help function to perform permutation test for each permutation def pls_perm_p(p, SBP_perm, expression, SBP, cv, clf, seed): # Initialization y = SBP_perm.iloc[:, p] preds_p = pd.DataFrame(np.zeros((len(y),1)), index=expression.index) sel = model_selection.KFold(n_splits=cv, shuffle=True, random_state=seed) for tr_ind, te_ind in sel.split(expression): # Training sets tr_set, tr_y = expression.iloc[tr_ind], y.iloc[tr_ind] # Test sets te_set, te_y = expression.iloc[te_ind], y.iloc[te_ind] mod = clf.fit(tr_set, tr_y) preds_te = mod.predict(te_set) preds_p.iloc[te_ind, 0] = preds_te.ravel() score_p = stats.pearsonr(preds_p.iloc[:,0], SBP.values[:,0])[0] return score_p results = Parallel(n_jobs=n_jobs)( delayed(pls_perm_p)(p, SBP_perm, expression, SBP, cv, clf, seed) for p in range(SBP_perm.shape[1]) ) scores_perm = results median_score = np.median(scores) p_perm = (scores_perm>median_score).sum()/len(scores_perm) return p_perm, scores_perm
[docs] def get_R2( mod: sklearn.cross_decomposition._pls.PLSRegression, y: np.ndarray ) -> np.ndarray: """ Compute variance explained for each PLS-R component. Parameters ---------- mod : sklearn.cross_decomposition._pls.PLSRegression PLS-R model fitted to X and y. y : np.ndarray Dependent variable of PLS-R model. Returns ------- R2 : np.ndarray with shape (component,) Variance explained for each PLS-R component. """ component = mod.n_components R2 = np.zeros(component) T = mod.x_scores_ for h in range(component): th = np.atleast_2d(T[:,h]).T R2h = (th.T@y@y.T@th)/(y.T@y@th.T@th) R2[h] = R2h[0][0] return R2
[docs] def get_vip2( mod: sklearn.cross_decomposition._pls.PLSRegression, X: np.ndarray, y: np.ndarray ) -> np.ndarray: """ Compute variance explained for each PLS-R component. Parameters ---------- mod : sklearn.cross_decomposition._pls.PLSRegression PLS-R model fitted to X and y. X & y : np.ndarray Independent and dependent variables of PLS-R model. Returns ------- vip : np.ndarray with shape (X.shape[1],) Variable importance in projection index for independent variables. References ---------- Mahieu, B., Qannari, E. M. & Jaillais, B. Extension and significance testing of Variable Importance in Projection (VIP) indices in Partial Least Squares regression and Principal Components Analysis. Chemom. Intell. Lab. Syst. 242, 104986 (2023). """ R2 = get_R2(mod, y) W = mod.x_weights_ norm = X.shape[1]/sum(R2) vip = norm*np.dot(W**2, R2) return vip
[docs] def vip_perm( expression: pd.DataFrame, SBP: pd.DataFrame, SBP_perm: pd.DataFrame, best_comp: int, one_sided: bool, n_jobs: int ) -> Tuple[pd.DataFrame,pd.DataFrame,pd.DataFrame]: """ Compute gene-level statistics and corresponding permutation p-values. Parameters ---------- expression : pd.DataFrame AHBA gene expression matrix. SBP : pd.DataFrame One-dimensional data frame of spatial brain phenotype. SBP_perm : pd.DataFrame Spatial autocorrelation-preserved null model for SBP. best_comp : int The optimal component number for current PLS-R model. one_sided : bool If True, infer statistical significance via one-sided p-values. Else, use two-sided p-values. n_jobs : int Number of cores used for parallel computation. Returns ------- gene_rep : pd.DataFrame Gene-level PLS statistics and permutation p-values. RC_perm : pd.DataFrame Gene-level regression coefficients (RCs) in permutation tests. VIP_perm : pd.DataFrame Gene-level Variable importance in projection (VIP) in permutation tests. References ---------- [1] Wang, Y. et al. Spatio-molecular profiles shape the human cerebellar hierarchy along the sensorimotor-association axis. Cell Rep. 43, 113770 (2024). [2] Mahieu, B., Qannari, E. M. & Jaillais, B. Extension and significance testing of Variable Importance in Projection (VIP) indices in Partial Least Squares regression and Principal Components Analysis. Chemom. Intell. Lab. Syst. 242, 104986 (2023). """ n_perm = SBP_perm.shape[1] # Fit the PLS-R model X = expression.values y = SBP.values clf = PLSRegression(n_components=best_comp) mod = clf.fit(X, y) # Get `RC` and `VIP` f_rc = mod.coef_.flatten() vip2 = get_vip2(mod, X, y) # Construct result DataFrame gene_rep = pd.DataFrame({'RC': f_rc, 'VIP': vip2}, index=expression.columns) # Help function to perform permutation test for each permutation def vip_perm_p(p, X, SBP_perm, best_comp): # Initialize PLS-R model clf_p = PLSRegression(n_components=best_comp) y_p = np.atleast_2d(SBP_perm.iloc[:,p].values).T # Fit the permutation null model mod_p = clf_p.fit(X, y_p) RC_p = mod_p.coef_.flatten() VIP2_p = get_vip2(mod_p, X, y_p) return RC_p, VIP2_p # Permutated beta and vip results = Parallel(n_jobs=n_jobs)( delayed(vip_perm_p)(p, X, SBP_perm, best_comp) for p in range(n_perm) ) # Unzip parallel permutation results RC_perm = pd.concat([pd.Series(res[0]) for res in results], axis=1) RC_perm.index = expression.columns VIP_perm = pd.concat([pd.Series(res[1]) for res in results], axis=1) VIP_perm.index = expression.columns # Compute permutation p-values if one_sided: for g in gene_rep.index: RC_g = gene_rep.loc[g,'RC'] VIP_g = gene_rep.loc[g,'VIP'] RC_perm_g = RC_perm.loc[g,:] VIP_perm_g = VIP_perm.loc[g,:] if RC_g > 0: condition = RC_perm_g>0 RC_perm_g = RC_perm_g[condition] VIP_perm_g = VIP_perm_g[condition] count_rc = sum(RC_perm_g>RC_g) count_vip = sum(VIP_perm_g>VIP_g) elif RC_g < 0: condition = RC_perm_g<0 RC_perm_g = RC_perm_g[condition] VIP_perm_g = VIP_perm_g[condition] count_rc = sum(RC_perm_g<RC_g) count_vip = sum(VIP_perm_g>VIP_g) gene_rep.loc[g, 'p_perm_rc'] = count_rc/n_perm gene_rep.loc[g, 'p_perm_vip'] = count_vip/n_perm else: gene_rep['p_perm_rc'] = [abs(RC_perm.loc[g,:]).ge(abs(gene_rep.loc[g,'RC'])).sum()/n_perm for g in gene_rep.index] gene_rep['p_perm_vip'] = [abs(VIP_perm.loc[g,:]).ge(abs(gene_rep.loc[g,'VIP'])).sum()/n_perm for g in gene_rep.index] return gene_rep, RC_perm, VIP_perm
[docs] def get_Q2( SBP: pd.DataFrame, preds_rep: pd.DataFrame, ) -> float: """ Compute cross-validation R2(Q2). Parameters ---------- SBP : pd.DataFrame One-dimensional data frame of spatial brain phenotype. preds_rep : pd.DataFrame The predicted SBP values across repeats. Returns ------- Q2 : float The cross-validation R2(Q2). References ---------- [1] Wang, Y. et al. Spatio-molecular profiles shape the human cerebellar hierarchy along the sensorimotor-association axis. Cell Rep. 43, 113770 (2024). [2] Mahieu, B., Qannari, E. M. & Jaillais, B. Extension and significance testing of Variable Importance in Projection (VIP) indices in Partial Least Squares regression and Principal Components Analysis. Chemom. Intell. Lab. Syst. 242, 104986 (2023). """ # Compute total sum of squares and residuals tss = np.sum((SBP.values-np.mean(SBP))**2) residual = preds_rep-SBP.values # Compute Q2 across repeats repeats = preds_rep.shape[1] q2_list = [] for i in range(repeats): press = np.sum((residual[i])**2) q2 = 1-press/tss q2_list.append(q2) Q2 = np.median(q2_list) return Q2
[docs] def boot_pls1( expression: pd.DataFrame, SBP: pd.DataFrame, best_comp: int, n_boot: int, seed: int, n_jobs: int ): """ Estimate Z-scored PLS1 gene weights with bootstrap strategy. Parameters ---------- expression : pd.DataFrame AHBA gene expression matrix. SBP : pd.DataFrame One-dimensional data frame of spatial brain phenotype. best_comp : int The optimal component number for current PLS-R model. n_boot : int Number of bootstrap iteration. seed : int Random seed to control the resampling. n_jobs : int Number of cores used for parallel computation. Returns pls1_zscore : np.ndarray Z-scored PLS1 gene weights estimated through bootstrap. References ---------- Whitaker, K. J., Vértes, P. E., Romero-Garcia, R & Bullmore, E. T. Adolescence is associated with genomically patterned consolidation of the hubs of the human brain connectome. Proc. Natl Acad. Sci. USA 113, 9105-9110 (2016). """ n_samples = expression.shape[0] # Fit the PLS-R model X = expression.values y = SBP.values clf = PLSRegression(n_components=best_comp) mod = clf.fit(X, y) # Extract the original weights orig_weights = mod.x_weights_[:,0] # Help function to perform bootstrap PLS1 weights estimation def boot_pls1_b(b, X, y, best_comp, n_samples, orig_weights, seed): rng = np.random.RandomState(seed*b) # sample indices with replacement idx = rng.randint(0, n_samples, size=n_samples) X_b = X[idx] y_b = y[idx] # Fit PLS on bootstrap sample with same n_components clf_b = PLSRegression(n_components=best_comp) mod_b = clf_b.fit(X_b, y_b) boot_weights_b = mod_b.x_weights_[:,0] # The sign of PLS components is arbitrary, make sure this aligns between runs corr, _ = stats.pearsonr(orig_weights,boot_weights_b) if corr < 0: boot_weights_b *= -1 return boot_weights_b results = Parallel(n_jobs=n_jobs)( delayed(boot_pls1_b)(b,X,y,best_comp,n_samples,orig_weights,seed) for b in range(n_boot) ) boot_weights = np.vstack(results) # Bootstrap standard error se = boot_weights.std(axis=0, ddof=1) pls1_zscore = np.divide(orig_weights, se, out=np.full_like(orig_weights,np.nan), where=se!=0) return pls1_zscore
[docs] def pls1_perm( expression: pd.DataFrame, SBP: pd.DataFrame, SBP_perm: pd.DataFrame, best_comp: int, n_boot: int, one_sided: bool, seed: int, n_jobs: int ) -> Tuple[pd.DataFrame,pd.DataFrame,pd.DataFrame]: """ Compute gene-level statistics and corresponding permutation p-values. Parameters ---------- expression : pd.DataFrame AHBA gene expression matrix. SBP : pd.DataFrame One-dimensional data frame of spatial brain phenotype. SBP_perm : pd.DataFrame Spatial autocorrelation-preserved null model for SBP. best_comp : int The optimal component number for current PLS-R model. n_boot : int Number of bootstrap iteration. one_sided : bool If True, infer statistical significance via one-sided p-values. Else, use two-sided p-values. seed : int Random seed to control the resampling. n_jobs : int Number of cores used for parallel computation. Returns ------- gene_rep : pd.DataFrame Gene-level Z-scored PLS1 weights and permutation p-values. PLS1_perm : pd.DataFrame Gene-level Z-scored PLS1 weights in permutation tests. """ # Initialization n_gene = expression.shape[1] n_perm = SBP_perm.shape[1] PLS1_perm = np.zeros((n_gene, n_perm)) # Calculate the PLS1 weights for given SBP pls1 = boot_pls1(expression, SBP, best_comp, n_boot, seed, n_jobs) gene_rep = pd.DataFrame(pls1, index=expression.columns, columns=["PLS1"]) # Calculate the PLS1 weights for each permutation for p in tqdm(range(n_perm)): y_p = np.atleast_2d(SBP_perm.iloc[:,p].values).T y_p = pd.DataFrame(y_p, index=SBP_perm.index) pls1_perm_p = boot_pls1(expression, y_p, best_comp, n_boot, seed, n_jobs) PLS1_perm[:,p] = pls1_perm_p PLS1_perm = pd.DataFrame(PLS1_perm, index=expression.columns) if one_sided: for g in gene_rep.index: PLS1_g = gene_rep.loc[g,'PLS1'] PLS1_perm_g = PLS1_perm.loc[g,:] if PLS1_g > 0: condition = PLS1_perm_g>0 PLS1_perm_g = PLS1_perm_g[condition] count_pls1 = sum(PLS1_perm_g>PLS1_g) elif PLS1_g < 0: condition = PLS1_perm_g<0 PLS1_perm_g = PLS1_perm_g[condition] count_pls1 = sum(PLS1_perm_g<PLS1_g) gene_rep.loc[g, 'p_perm'] = count_pls1/n_perm else: gene_rep['p_perm'] = [abs(PLS1_perm.loc[g,:]).ge(abs(gene_rep.loc[g,'PLS1'])).sum()/n_perm for g in gene_rep.index] return gene_rep, PLS1_perm