Source code for pkpd.protocols.protocol_pkpd_stats_mahalanobis

# **************************************************************************
# *
# * Authors:     Carlos Oscar Sorzano (info@kinestat.com)
# *
# * Kinestat Pharma
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 2 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307  USA
# *
# *  All comments concerning this program package may be sent to the
# *  e-mail address 'info@kinestat.com'
# *
# **************************************************************************

import numpy as np
from scipy import stats
from scipy.interpolate import InterpolatedUnivariateSpline
from scipy.spatial import distance


import pyworkflow.protocol.params as params
from .protocol_pkpd import ProtPKPD
from pkpd.utils import upper_tri_masking, uniqueFloatValues

# Tested in test_workflow_levyplot.py

[docs]class ProtPKPDStatsMahalanobis(ProtPKPD): """ Experiment 1 defines the mean and covariance for the Mahalanobis distance. Then, the Mahalanobis distance of all elements in Experiment 1 with respect to the mean is calculated\n If a second experiment is given, then all distances from the second to the mean of the first experiment\n are also calculated.\n Protocol created by http://www.kinestatpharma.com\n """ _label = 'Mahalanobis' #--------------------------- DEFINE param functions -------------------------------------------- def _defineParams(self, form): form.addSection('Input') form.addParam('inputExperiment1', params.PointerParam, label="Experiment 1", pointerClass='PKPDExperiment', help='Select an experiment with samples') form.addParam('labels', params.StringParam, label="Labels", default="", help='Name of the labels to construct the multivariate vectors, e.g. Cl V\n' 'The set of labels is either a set of labels or a single measurement.\n' 'If you use a measurement, make sure that it does not have a measurement\n' 'that is always the same, typically (0,0)') form.addParam('expression1', params.StringParam, label="Subgroup 1 (optional)", default="", help='For example, $(weight)<100 and $(sex)=="male"') form.addParam('inputExperiment2', params.PointerParam, label="Experiment 2 (optional)", pointerClass='PKPDExperiment', allowsNull=True, help='Select an experiment with samples') form.addParam('expression2', params.StringParam, label="Subgroup 2 (optional)", default="", help='For example, $(weight)>=100 and $(sex)=="male". If it is empty, the same Expression 1 will be used for grouping in Experiment 2') form.addParam('resampleT', params.FloatParam, label="Resample profiles (time step)", default=-1, help='Resample the input profiles at this time step (make sure it is in the same units as the input). ' 'Leave it to -1 for no resampling. This is only valid when the label to compare is a measurement.') #--------------------------- INSERT steps functions -------------------------------------------- def _insertAllSteps(self): self._insertFunctionStep('runCompare',self.inputExperiment1.get().getObjId(), self.labels.get(), self.expression1.get(), self.expression2.get()) #--------------------------- STEPS functions --------------------------------------------
[docs] def getLabels(self): labels=[] for token in self.labels.get().split(): labels.append(token.strip()) return labels
[docs] def analysisType(self, experiment): Nlabels=0 Nmeasurements=0 N=len(self.getLabels()) for label in self.getLabels(): if label in experiment.variables: if experiment.variables[label].isLabel(): Nlabels+=1 elif experiment.variables[label].isMeasurement(): Nmeasurements+=1 else: raise Exception("Cannot find the label %s in one of the experiments"%label) if Nlabels==N: return 0 if Nmeasurements==1 and N==1: return 1 raise Exception("The set of labels is not correctly formed. It should be all labels or one measurement")
[docs] def getValues(self,experiment,expression): allX=[] if self.analysisType(experiment)==0: # All labels for label in self.getLabels(): x1 = [float(x) for x in experiment.getSubGroupLabels(expression, label)] allX.append(x1) X = np.asarray(allX, dtype=np.double) return np.transpose(X) else: # A measurement Ylabel=self.getLabels()[0] Xlabel=experiment.getTimeVariable() temp=[] minX=1e38 maxX=-1e38 for sampleName, sample in experiment.samples.items(): x,y=sample.getXYValues(Xlabel,Ylabel) minX=min(minX,np.min(x)) maxX=max(maxX,np.max(x)) temp.append((x[0],y[0])) for x,y in temp: if self.resampleT.get()>0: x,y = uniqueFloatValues(x, y) B = InterpolatedUnivariateSpline(x, y, k=1) xp = np.arange(minX, maxX + self.resampleT.get(), self.resampleT.get()) y = B(xp) allX.append(y) X = np.asarray(allX, dtype=np.double) v = np.var(X,0) X=X[:,v>0] # Remove columns with no variance return X
[docs] def printStats(self,allF,Fstr,explanation): allF=[f for f in allF if not np.isnan(f)] mu=np.mean(allF) sigma = np.std(allF) alpha=1-95/100.0 percentiles = np.percentile(allF,[0, alpha/2*100, 25, 50, 75, (1-alpha/2)*100, 100]) retval="" retval +="%s (%s)\n"%(Fstr,explanation) retval +="%s mean+-std: %f+-%f\n"%(Fstr,mu,sigma) retval +="%s minimum,maximum: [%f,%f]\n"%(Fstr,percentiles[0],percentiles[6]) retval +="%s percentile [%f,%f]%%: [%f,%f]\n"%(Fstr,alpha/2*100,(1-alpha/2)*100,percentiles[1],percentiles[5]) retval +="%s percentile [25,75]%%: [%f,%f]\n"%(Fstr,percentiles[2],percentiles[4]) retval +="%s percentile 50%%: %f\n"%(Fstr,percentiles[3]) return retval
[docs] def runCompare(self, objId1, labels, expression1, expression2): fh = open(self._getPath("report.txt"),'w') self.experiment1 = self.readExperiment(self.inputExperiment1.get().fnPKPD, False) X1 = self.getValues(self.experiment1,self.expression1.get()) X2 = None if self.inputExperiment2.get() is not None: self.experiment2 = self.readExperiment(self.inputExperiment2.get().fnPKPD, False) expression2ToUse = self.expression1.get() if self.expression2.get()=="" else self.expression2.get() X2 = self.getValues(self.experiment2, expression2ToUse) print("Values in SubGroup 1:\n%s"%np.array2string(X1)) if X2 is not None: print("Values in SubGroup 2:\n%s" % np.array2string(X2)) try: self.printSection("Results") C1 = np.cov(np.transpose(X1)) if np.abs(np.linalg.det(C1))<1e-10: distanceStr='Euclidean' self.doublePrint(fh,"The covariance matrix is singular (either there is a column of the data that is always the same or not enough data to define the covariance)") self.doublePrint(fh,"Using Euclidean distance instead") D11 = upper_tri_masking(distance.cdist(X1, X1, 'euclidean')) D11/= X1.shape[1] else: distanceStr='Mahalanobis' C1inv = np.linalg.inv(C1) D11 = upper_tri_masking(distance.cdist(X1,X1,'mahalanobis',VI=C1inv)) np.savetxt(self._getExtraPath("D11.txt"),D11) str11 = self.printStats(D11, "D11", "%s distance Set 1 vs Set1"%distanceStr) self.doublePrint(fh, str11) if X2 is not None: self.doublePrint(fh, "---------------------------") if distanceStr=='Mahalanobis': D12 = distance.cdist(X1,X2,'mahalanobis',VI=C1inv).flatten() D12 /= X1.shape[1] else: D12 = distance.cdist(X1, X2, 'euclidean').flatten() np.savetxt(self._getExtraPath("D12.txt"),D12) str12 = self.printStats(D12, "D12", "%s distance Set 1 vs Set2"%distanceStr) self.doublePrint(fh, str12) [D,pval] = stats.ks_2samp(D11, D12) self.doublePrint(fh, "---------------------------") self.doublePrint(fh,"Kolmogorov-Smirnov test for the compatibility of D11 and D12: D-statistic=%f p-value=%f"%(D,pval)) fh.close() except Exception as e: print(e) fh.close()
#--------------------------- INFO functions -------------------------------------------- def _summary(self): expression2ToUse = self.expression1.get() if self.expression2.get()=="" else self.expression2.get() msg=["Comparison between %s in Subgroup1 (%s) and %s in Subgroup2 (%s), independent samples"%(self.labels.get(),self.expression1.get(),self.labels.get(),expression2ToUse)] msg.append(' ') self.addFileContentToMessage(msg,self._getPath("report.txt")) return msg def _validate(self): msg=[] # self.experiment1 = self.readExperiment(self.inputExperiment1.get().fnPKPD,False) # if self.inputExperiment2.get() is not None: # self.experiment2 = self.readExperiment(self.inputExperiment2.get().fnPKPD) # for label in self.getLabels(): # if not label in self.experiment1.variables: # msg.append("Cannot find %s amongst the Experiment 1 variables"%label) # if self.inputExperiment2.get() is not None: # if not label in self.experiment2.variables: # msg.append("Cannot find %s amongst the Experiment 2 variables" % label) return msg