# **************************************************************************
# *
# * Authors: Carlos Oscar Sorzano (info@kinestat.com)
# *
# * Kinestat Pharma
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 2 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307 USA
# *
# * All comments concerning this program package may be sent to the
# * e-mail address 'info@kinestat.com'
# *
# **************************************************************************
import math
from collections import OrderedDict
try:
from itertools import izip
except ImportError:
izip = zip
import numpy as np
import pyworkflow.protocol.params as params
from .protocol_pkpd import ProtPKPD
from pkpd.objects import (PKPDDEOptimizer, PKPDLSOptimizer, PKPDFitting,
PKPDSampleFit)
from pkpd.utils import parseRange
[docs]class ProtPKPDFitBase(ProtPKPD):
""" Base fit protocol"""
#--------------------------- DEFINE param functions --------------------------------------------
def _defineParams1(self, form, defaultPredictor, defaultPredicted):
form.addSection('Input')
form.addParam('inputExperiment', params.PointerParam, label="Input experiment",
pointerClass='PKPDExperiment',
help='Select an experiment with samples')
form.addParam('predictor', params.StringParam, label="Predictor variable (X)", default=defaultPredictor,
help='Y is predicted as an exponential function of X, Y=f(X)')
form.addParam('predicted', params.StringParam, label="Predicted variable (Y)", default=defaultPredicted,
help='Y is predicted as an exponential function of X, Y=f(X)')
#--------------------------- INSERT steps functions --------------------------------------------
def _insertAllSteps(self):
self._insertFunctionStep('runFit',self.getInputExperiment().getObjId(),self.getListOfFormDependencies())
self._insertFunctionStep('createOutputStep')
#--------------------------- STEPS functions --------------------------------------------
[docs] def getXYvars(self):
if hasattr(self,"predictor"):
self.varNameX=self.predictor.get()
else:
self.varNameX=None
if hasattr(self,"predicted"):
self.varNameY=self.predicted.get()
else:
self.varNameY=None
[docs] def setExperiment(self, experiment):
self.experiment = experiment
if experiment!=None:
self.fnExperiment = experiment.fnPKPD
[docs] def setSample(self, sample):
self.sample = sample
self.model.setSample(sample)
[docs] def createModel(self):
pass
[docs] def parseBounds(self, boundsString):
self.boundsList = []
if boundsString!="" and boundsString!=None:
tokens = boundsString.split(';')
if len(tokens)!=self.getNumberOfParameters():
raise Exception("The number of bound intervals does not match the number of parameters")
for token in tokens:
values = token.strip().split(',')
self.boundsList.append((float(values[0][1:]),float(values[1][:-1])))
[docs] def getBounds(self):
return self.boundsList
[docs] def getParameterBounds(self):
""" Return a dictionary where the parameter name is the key
and the bounds are its values. """
boundsDict = OrderedDict()
self.parseBounds(self.bounds.get()) # after this we have boundsList
parameterNames = self.model.getParameterNames()
for paramName, bound in izip(parameterNames, self.getBounds()):
boundsDict[paramName] = bound
# Set None as bound for parameters not matched
for paramName in parameterNames:
if paramName not in boundsDict:
boundsDict[paramName] = None
return boundsDict
[docs] def setupModel(self):
# Setup model
self.model = self.createModel()
self.model.setExperiment(self.experiment)
self.setupFromFormParameters()
self.getXYvars()
self.model.setXVar(self.varNameX)
self.model.setYVar(self.varNameY)
[docs] def calculateParameterUnits(self,sample):
self.parameterUnits = self.model.calculateParameterUnits(sample)
[docs] def getParameterNames(self):
return self.model.getParameterNames()
[docs] def getNumberOfParameters(self):
return len(self.getParameterNames())
[docs] def setParameters(self,prm):
self.model.setParameters(prm)
[docs] def forwardModel(self,prm,xValues):
return self.model.forwardModel(prm,xValues)
[docs] def prepareForAnalysis(self):
pass
[docs] def prepareForSampleAnalysis(self, sampleName):
pass
[docs] def postSampleAnalysis(self, sampleName):
pass
[docs] def postAnalysis(self):
pass
[docs] def runFit(self, objId, otherDependencies):
self.getXYvars()
if hasattr(self,"reportX"):
reportX = parseRange(self.reportX.get())
else:
reportX = None
self.experiment = self.readExperiment(self.getInputExperiment().fnPKPD)
# Setup model
self.printSection("Model setup")
self.model = self.createModel()
self.model.setExperiment(self.experiment)
self.model.setXVar(self.varNameX)
self.model.setYVar(self.varNameY)
self.setupFromFormParameters()
self.model.printSetup()
# Create output object
self.fitting = PKPDFitting()
self.fitting.fnExperiment.set(self.getInputExperiment().fnPKPD.get())
self.fitting.predictor=self.experiment.variables[self.varNameX]
self.fitting.predicted=self.experiment.variables[self.varNameY]
self.fitting.modelDescription=self.model.getDescription()
self.fitting.modelParameters = self.model.getParameterNames()
self.fitting.modelParameterUnits = None
# Actual fitting
if self.fitType.get()==0:
fitType = "linear"
elif self.fitType.get()==1:
fitType = "log"
elif self.fitType.get()==2:
fitType = "relative"
R2List=[]
R2adjList=[]
AICList=[]
AICcList=[]
BICList=[]
self.prepareForAnalysis()
for sampleName, sample in self.experiment.samples.items():
self.printSection("Fitting "+sampleName)
x, y = sample.getXYValues(self.varNameX,self.varNameY)
print("X= "+str(x))
print("Y= "+str(y))
print(" ")
self.model.setBounds(self.bounds.get())
self.model.setXYValues(x, y)
self.prepareForSampleAnalysis(sampleName)
self.model.calculateParameterUnits(sample)
if self.fitting.modelParameterUnits==None:
self.fitting.modelParameterUnits = self.model.parameterUnits
self.model.prepare()
if self.model.bounds == None:
continue
print(" ")
optimizer1 = PKPDDEOptimizer(self.model,fitType)
optimizer1.optimize()
optimizer2 = PKPDLSOptimizer(self.model,fitType)
optimizer2.optimize()
optimizer2.setConfidenceInterval(self.confidenceInterval.get())
self.setParameters(optimizer2.optimum)
optimizer2.evaluateQuality()
# Keep this result
sampleFit = PKPDSampleFit()
sampleFit.sampleName = sample.sampleName
sampleFit.x = self.model.x
sampleFit.y = self.model.y
sampleFit.yp = self.model.yPredicted
sampleFit.yl = self.model.yPredictedLower
sampleFit.yu = self.model.yPredictedUpper
sampleFit.parameters = self.model.parameters
sampleFit.modelEquation = self.model.getEquation()
sampleFit.copyFromOptimizer(optimizer2)
self.fitting.sampleFits.append(sampleFit)
R2List.append(sampleFit.R2)
R2adjList.append(sampleFit.R2adj)
AICList.append(sampleFit.AIC)
AICcList.append(sampleFit.AICc)
BICList.append(sampleFit.BIC)
# Add the parameters to the sample and experiment
for varName, varUnits, description, varValue in izip(self.model.getParameterNames(), self.model.parameterUnits, self.model.getParameterDescriptions(), self.model.parameters):
self.experiment.addParameterToSample(sampleName, varName, varUnits, description, varValue)
self.postSampleAnalysis(sampleName)
if reportX!=None:
print("Evaluation of the model at specified time points")
yreportX = self.model.forwardModel(self.model.parameters, reportX)
print("==========================================")
print("X Ypredicted log10(Ypredicted)")
print("==========================================")
for n in range(0,reportX.shape[0]):
print("%f %f %f"%(reportX[n],yreportX[n],math.log10(yreportX[n])))
print(' ')
self.fitting.write(self._getPath("fitting.pkpd"))
self.experiment.write(self._getPath("experiment.pkpd"))
fnSummary = self._getPath("summary.txt")
fh=open(fnSummary,"w")
fh.write("R2 (Mean+-Std): (%f)+-(%f)\n"%(np.mean(R2List),np.std(R2List)))
fh.write("R2adj (Mean+-Std): (%f)+-(%f)\n"%(np.mean(R2adjList),np.std(R2adjList)))
fh.write("AIC (Mean+-Std): (%f)+-(%f)\n"%(np.mean(AICList),np.std(AICList)))
fh.write("AICc (Mean+-Std): (%f)+-(%f) Recommended\n"%(np.mean(AICcList),np.std(AICcList)))
fh.write("BIC (Mean+-Std): (%f)+-(%f)\n"%(np.mean(BICList),np.std(BICList)))
fh.close()
self.postAnalysis()
[docs] def createOutputStep(self):
self._defineOutputs(outputFitting=self.fitting)
self._defineOutputs(outputExperiment=self.experiment)
self._defineSourceRelation(self.getInputExperiment(), self.fitting)
self._defineSourceRelation(self.getInputExperiment(), self.experiment)
#--------------------------- INFO functions --------------------------------------------
def _summary(self):
self.getXYvars()
msg=['Predicting %s from %s'%(self.varNameX,self.varNameY)]
self.addFileContentToMessage(msg, self._getPath("summary.txt"))
return msg
def _validate(self):
self.getXYvars()
errors=[]
experiment = self.readExperiment(self.getInputExperiment().fnPKPD, False)
if not self.varNameX in experiment.variables:
errors.append("Cannot find %s as variable"%self.varNameX)
if not self.varNameY in experiment.variables:
errors.append("Cannot find %s as variable"%self.varNameY)
return errors
def _citations(self):
return ['Spiess2010']
[docs] def filterVarForWizard(self, v):
""" Define the type of variables required (used in wizard). """
return v.isNumeric() and (v.isMeasurement() or v.isTime())