# Source code for pkpd.protocols.protocol_pkpd_ode_refine

# **************************************************************************
# *
# * Authors:     Carlos Oscar Sorzano (info@kinestat.com)
# *
# * Kinestat Pharma
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 2 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307  USA
# *
# *  All comments concerning this program package may be sent to the
# *  e-mail address 'info@kinestat.com'
# *
# **************************************************************************

try:
    from itertools import izip
except ImportError:
    izip = zip

import pyworkflow.protocol.params as params
from pkpd.objects import PKPDFitting, PKPDSampleFit, PKPDLSOptimizer
from pyworkflow.protocol.constants import LEVEL_ADVANCED
from .protocol_pkpd_ode_base import ProtPKPDODEBase

class ProtPKPDODERefine(ProtPKPDODEBase):
    """ Refinement of an ODE protocol. The parameters are reestimated with a finer sampling rate.
    """
    # Most model-related hooks below delegate to the protocol selected in
    # ``inputODE``; this class re-runs the least-squares fit using the
    # integration step given by ``deltaT``.

    _label = 'ODE refinement'

    #--------------------------- DEFINE param functions --------------------------------------------
    def _defineParams(self, form):
        """Declare the form parameters: inputODE, deltaT, fitType and bounds."""
        form.addSection('Input')
        form.addParam('inputODE', params.PointerParam, label="Input ODE model",
                      pointerClass=('ProtPKPDMonoCompartment, ProtPKPDMonoCompartmentUrine, ProtPKPDTwoCompartments, '
                                    'ProtPKPDTwoCompartmentsAutoinduction, ProtPKPDTwoCompartmentsClint, '
                                    'ProtPKPDTwoCompartmentsClintCl, '
                                    'ProtPKPDTwoCompartmentsClintMetabolite, ProtPKPDTwoCompartmentsUrine, '
                                    'ProtPKPDODERefine'),
                      help='Select a run of an ODE model')
        form.addParam('deltaT', params.FloatParam, default=0.5, label='Step (min)',
                      expertLevel=LEVEL_ADVANCED)
        # Choice index 3 means "reuse the fit mode of the input protocol" (see runFit).
        form.addParam('fitType', params.EnumParam,
                      choices=["Linear", "Logarithmic", "Relative", "Same as previous protocol"],
                      label="Fit mode", default=3,
                      help=('Linear: sum (Cobserved-Cpredicted)^2\nLogarithmic: sum(log10(Cobserved)-log10(Cpredicted))^2\n'
                            "Relative: sum ((Cobserved-Cpredicted)/Cobserved)^2"))
        form.addParam('bounds', params.StringParam, label="Parameter bounds", default="",
                      help="If empty, same bounds as for the previous protocol")

    #--------------------------- INSERT steps functions --------------------------------------------
    def _insertAllSteps(self):
        """Register the two steps of the protocol: runFit and createOutputStep."""
        self._insertFunctionStep('runFit', self.inputODE.get().getObjId(), self.deltaT.get())
        self._insertFunctionStep('createOutputStep')

    #--------------------------- STEPS functions --------------------------------------------
    def parseBounds(self, boundsString):
        """Parse a bounds string into ``self.boundsList``.

        The string is split on ';' into one token per parameter; each token is
        split on ',' and the first character of the first piece and the last
        character of the second piece (the enclosing brackets) are dropped
        before converting both pieces to float.

        If ``boundsString`` is empty or None nothing is done and
        ``self.boundsList`` is left untouched.

        :raises Exception: if the number of tokens differs from
            ``self.getNumberOfParameters()``.
        """
        if boundsString != "" and boundsString is not None:
            tokens = boundsString.split(';')
            if len(tokens) != self.getNumberOfParameters():
                raise Exception("The number of bound intervals does not match the number of parameters")
            self.boundsList = []
            for token in tokens:
                values = token.strip().split(',')
                lower = float(values[0][1:])    # drop the opening bracket
                upper = float(values[1][:-1])   # drop the closing bracket
                self.boundsList.append((lower, upper))

    def setBounds(self, sample):
        """Set the bounds from this protocol's ``bounds`` parameter, or from the
        input protocol's one when this one is empty.

        ``sample`` is not used by this implementation.
        """
        ownBounds = self.bounds.get()
        self.parseBounds(self.protODE.bounds.get() if ownBounds == "" else ownBounds)
        self.setBoundsFromBoundsList()

    def setBoundsFromBoundsList(self):
        """Split ``self.boundsList`` into drug-source bounds and model bounds.

        The first ``Nsource`` entries go to ``self.boundsSource``; the rest go
        to ``self.boundsPK`` and are also assigned to ``self.model.bounds``.

        :raises Exception: if the number of bounds differs from the number of
            source plus model parameters.
        """
        Nbounds = len(self.boundsList)
        Nsource = self.drugSource.getNumberOfParameters()
        Nmodel = self.model.getNumberOfParameters()
        if Nbounds != Nsource + Nmodel:
            # A bare string cannot be raised (TypeError); wrap it in an
            # Exception so that the intended message reaches the user.
            raise Exception("The number of parameters (%d) and bounds (%d) are different"
                            % (Nsource + Nmodel, Nbounds))
        self.boundsSource = self.boundsList[0:Nsource]
        self.boundsPK = self.boundsList[Nsource:]
        self.model.bounds = self.boundsPK

    def getBounds(self):
        """Return the list of (lower, upper) tuples built by parseBounds."""
        return self.boundsList

    def createModel(self):
        """Return the model created by the input ODE protocol."""
        return self.inputODE.get().createModel()

    def setTimeRange(self, sample):
        """Delegate the time range setup for ``sample`` to the input ODE protocol."""
        self.inputODE.get().setTimeRange(sample)

    def setVarNames(self, varNameX, varNameY):
        """Set the X/Y variable names on this protocol and on the input ODE protocol."""
        ProtPKPDODEBase.setVarNames(self, varNameX, varNameY)
        self.inputODE.get().setVarNames(varNameX, varNameY)

    def setModel(self, model):
        """Set ``model`` on this protocol and on the input ODE protocol."""
        ProtPKPDODEBase.setModel(self, model)
        self.inputODE.get().setModel(model)

    def setupModel(self):
        """Run the base setupModel and hand the resulting ``self.model`` to the
        input ODE protocol."""
        ProtPKPDODEBase.setupModel(self)
        self.inputODE.get().setModel(self.model)

    def clearGroupParameters(self):
        """Clear the group parameters of this protocol and of the input ODE protocol."""
        ProtPKPDODEBase.clearGroupParameters(self)
        self.inputODE.get().clearGroupParameters()

    def getConfidenceInterval(self):
        """Return the confidence interval reported by the input ODE protocol."""
        return self.inputODE.get().getConfidenceInterval()

    def runFit(self, objId, deltaT):
        """Refit every group of the input experiment with the new step.

        The experiment and fitting produced by the input ODE protocol are read,
        the previously fitted parameters stored in each sample's descriptors are
        used as the initial solution, and a PKPDLSOptimizer is run once per
        group. Results are written to ``fitting.pkpd`` and ``experiment.pkpd``
        in the protocol path.

        :param objId: not read by the body (the protocol is taken from
            ``self.inputODE``); presumably passed so that the step depends on
            the selected input -- TODO confirm.
        :param deltaT: not read by the body (``self.deltaT.get()`` is used
            instead); presumably passed for the same reason -- TODO confirm.
        """
        # NOTE(review): the loop nesting below (set up all samples of a group,
        # then optimise once per group, then write once at the end) was
        # reconstructed from a flattened listing -- confirm against upstream.
        self.protODE = self.inputODE.get()
        self.experiment = self.readExperiment(self.protODE.outputExperiment.fnPKPD)
        self.fitting = self.readFitting(self.protODE.outputFitting.fnFitting)

        # Get the X and Y variable names
        self.varNameX = self.fitting.predictor.varName
        if isinstance(self.fitting.predicted, list):
            self.varNameY = [v.varName for v in self.fitting.predicted]
        else:
            self.varNameY = self.fitting.predicted.varName
        self.protODE.experiment = self.experiment
        self.protODE.setVarNames(self.varNameX, self.varNameY)

        # Create output object (replaces the fitting read above, which was only
        # needed to recover the variable names)
        self.fitting = PKPDFitting()
        self.fitting.fnExperiment.set(self.experiment.fnPKPD.get())
        self.fitting.predictor = self.experiment.variables[self.varNameX]
        self.fitting.predicted = self.experiment.variables[self.varNameY]
        self.fitting.modelParameterUnits = None

        # Actual fitting
        # Choice 3 ("Same as previous protocol") defers to the input protocol.
        fitTypeN = self.protODE.fitType.get() if self.fitType.get() == 3 else self.fitType.get()
        # NOTE(review): if fitTypeN is still 3 here (e.g. the input is itself a
        # refinement left at its default), fitType stays unbound and the
        # optimizer construction below fails -- confirm the intended behavior.
        if fitTypeN == 0:
            fitType = "linear"
        elif fitTypeN == 1:
            fitType = "log"
        elif fitTypeN == 2:
            fitType = "relative"

        parameterNames = None
        for groupName, group in self.experiment.groups.items():
            self.printSection("Fitting "+groupName)
            self.protODE.clearGroupParameters()
            self.clearGroupParameters()

            for sampleName in group.sampleList:
                print(" Sample "+sampleName)
                sample = self.experiment.samples[sampleName]

                self.protODE.createDrugSource()
                self.protODE.setupModel()

                # Setup self model: share the objects just built by the input protocol
                self.drugSource = self.protODE.drugSource
                self.drugSourceList = self.protODE.drugSourceList
                self.model = self.protODE.model
                self.modelList = self.protODE.modelList
                self.model.deltaT = self.deltaT.get()
                self.model.setXVar(self.varNameX)
                self.model.setYVar(self.varNameY)

                # Get the values to fit
                x, y = sample.getXYValues(self.varNameX, self.varNameY)
                print("X= "+str(x))
                print("Y= "+str(y))

                # Interpret the dose
                self.protODE.varNameX = self.varNameX
                self.protODE.varNameY = self.varNameY
                self.protODE.model = self.model
                self.protODE.setTimeRange(sample)
                sample.interpretDose()

                self.drugSource.setDoses(sample.parsedDoseList, self.model.t0, self.model.tF)
                self.protODE.configureSource(self.drugSource)
                self.model.drugSource = self.drugSource

                # Prepare the model
                self.model.setSample(sample)
                self.calculateParameterUnits(sample)
                if self.fitting.modelParameterUnits is None:
                    self.fitting.modelParameterUnits = self.parameterUnits

                # Get the initial parameters: the values stored in the sample
                # descriptors are used as the starting point
                if parameterNames is None:
                    parameterNames = self.getParameterNames()
                parameters0 = [float(sample.descriptors[parameterName])
                               for parameterName in parameterNames]
                print("Initial solution: %s"%str(parameters0))
                print(" ")

                # Set bounds
                self.setBounds(sample)
                self.setXYValues(x, y)
                self.parameters = parameters0

            self.printSetup()
            self.x = self.mergeLists(self.XList)
            self.y = self.mergeLists(self.YList)

            optimizer2 = PKPDLSOptimizer(self, fitType)
            optimizer2.optimize()
            optimizer2.setConfidenceInterval(self.protODE.getConfidenceInterval())
            self.setParameters(optimizer2.optimum)

            for sampleName in group.sampleList:
                sample = self.experiment.samples[sampleName]

                # Keep this result
                sampleFit = PKPDSampleFit()
                sampleFit.sampleName = sample.sampleName
                # NOTE(review): x and y are the values left over from the last
                # sample of the setup loop above, not per-sample values --
                # confirm this is intended for groups with several samples.
                sampleFit.x = x
                sampleFit.y = y
                sampleFit.yp = self.yPredicted
                sampleFit.yl = self.yPredictedLower
                sampleFit.yu = self.yPredictedUpper
                sampleFit.parameters = self.parameters
                sampleFit.modelEquation = self.getEquation()
                sampleFit.copyFromOptimizer(optimizer2)
                self.fitting.sampleFits.append(sampleFit)

                # Add the parameters to the sample and experiment
                for varName, varUnits, description, varValue in izip(self.getParameterNames(),
                                                                     self.parameterUnits,
                                                                     self.getParameterDescriptions(),
                                                                     self.parameters):
                    self.experiment.addParameterToSample(sampleName, varName, varUnits,
                                                         description, varValue, rewrite=True)

        self.fitting.modelParameters = self.getParameterNames()
        self.fitting.modelDescription = self.getDescription()
        self.fitting.write(self._getPath("fitting.pkpd"))
        self.experiment.write(self._getPath("experiment.pkpd"))

    def createOutputStep(self):
        """Register the fitting and the experiment as outputs and relate both
        to the input ODE protocol."""
        self._defineOutputs(outputFitting=self.fitting)
        self._defineOutputs(outputExperiment=self.experiment)
        self._defineSourceRelation(self.inputODE.get(), self.fitting)
        self._defineSourceRelation(self.inputODE.get(), self.experiment)

    #--------------------------- INFO functions --------------------------------------------
    def _summary(self):
        """Return a one-line summary with the sampling step in use."""
        return ["New sampling rate: %f"%self.deltaT.get()]

    def _validate(self):
        """Return the list of validation errors (always empty here)."""
        return []