Source code for pkpd.protocols.protocol_pkpd_bootstrap_simulate2

# **************************************************************************
# *
# * Authors:     Carlos Oscar Sorzano (info@kinestat.com)
# *
# * Kinestat Pharma
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 2 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307  USA
# *
# *  All comments concerning this program package may be sent to the
# *  e-mail address 'info@kinestat.com'
# *
# **************************************************************************

import copy
import numpy as np
import math
import random

import pyworkflow.protocol.params as params
from pkpd.objects import PKPDExperiment, PKPDDose, PKPDSample, PKPDVariable
from pyworkflow.protocol.constants import LEVEL_ADVANCED
from pkpd.pkpd_units import createUnit, multiplyUnits, strUnit, PKPDUnit
from .protocol_pkpd_ode_base import ProtPKPDODEBase

class ProtPKPDODESimulate2(ProtPKPDODEBase):
    """ Simulate a complex sequence of doses. Each dose has its own PK profile, and the overall profile is the
        addition of all the individual profiles. Only the parameters from the first fitting of each model are
        considered.

        The first dose goes with the first model, the second dose with the second model, ...

        A single profile is simulated that is the addition of the different simulations for each one of the input
        models."""

    _label = 'PK simulate (complex)'

    #--------------------------- DEFINE param functions --------------------------------------------
    def _defineParams(self, form):
        """Declare the form: input ODE runs, one dose description per model and the time grid."""
        form.addSection('Input')
        form.addParam('inputODEs', params.MultiPointerParam, label="Input ODE models",
                      pointerClass=('ProtPKPDMonoCompartment, ProtPKPDTwoCompartments, ProtPKPDMonoCompartmentPD, '
                                    'ProtPKPDTwoCompartmentsBothPD, '
                                    'ProtPKPDODERefine, ProtPKPDTwoCompartmentsClint, '
                                    'ProtPKPDTwoCompartmentsClintCl'),
                      help='Select a run of an ODE model')
        # The parser in runSimulate needs at least five ';'-separated fields per dose,
        # i.e. four semicolons, and pairs the i-th dose line with the i-th input model.
        form.addParam('doses', params.TextParam, label="Doses", height=5, width=50,
                      default="",
                      help=("Structure: [Dose Name] ; [Via] ; [Dose type] ; [time] ; [dose] \n"
                            "The dose name should have no space or special character\n"
                            "The via should be one present in the input experiment to the ODE model.\n"
                            "Valid units are: h, mg, ug, ...\n"
                            "The description is either a bolus or an infusion as shown in the examples\n"
                            "\nIt is important that there are four semicolons (five fields).\n"
                            "Write one dose per line, in the same order as the input ODE models.\n"
                            "Examples:\n"
                            "Infusion0; via=Intravenous; infusion; t=0.500000:0.750000 h; d=60 mg/h\n"
                            "Bolus0; via=Oral; bolus; t=0.000000 min; d=60 mg\n"
                            "RepeatedBolus; via=Oral; repeated_bolus; t=0:24:120 h; d=60 mg"))
        form.addParam('t0', params.FloatParam, label="Initial time (see help)", default=0,
                      help="Same units as input experiment")
        form.addParam('tF', params.FloatParam, label="Final time (see help)", default=24*7,
                      help="Same units as input experiment")
        form.addParam('deltaT', params.FloatParam, label="Time step (see help)", default=0.5,
                      expertLevel=LEVEL_ADVANCED,
                      help="Same units as input experiment")

    #--------------------------- INSERT steps functions --------------------------------------------
    def _insertAllSteps(self):
        """Register the two steps of the protocol: simulation and output registration."""
        self._insertFunctionStep('runSimulate')
        self._insertFunctionStep('createOutputStep')

    #--------------------------- STEPS functions --------------------------------------------
    def addSample(self, sampleName, doseList, simulationsX, y):
        """Store a simulated profile as a new sample of self.outputExperiment.

        :param sampleName: key under which the sample is stored in the output experiment.
        :param doseList: names of the doses (keys of outputExperiment.doses) given to the sample.
        :param simulationsX: time values, written to the "t" column.
        :param y: measurements. A single column when self.varNameY is one variable name;
                  one column per variable (y[j]) when self.varNameY is a list.
        """
        newSample = PKPDSample()
        newSample.sampleName = sampleName
        newSample.variableDictPtr = self.outputExperiment.variables
        newSample.doseDictPtr = self.outputExperiment.doses
        newSample.descriptors = {}
        newSample.doseList = doseList

        # Treat the single-variable case as a one-element list so both cases share one path
        if isinstance(self.varNameY, list):
            varNames = self.varNameY
            columns = y
        else:
            varNames = [self.varNameY]
            columns = [y]

        for varName in varNames:
            newSample.addMeasurementPattern([varName])
        newSample.addMeasurementColumn("t", simulationsX)
        for j, varName in enumerate(varNames):
            newSample.addMeasurementColumn(varName, columns[j])

        self.outputExperiment.samples[sampleName] = newSample

    def _loadExperimentAndFitting(self, protODE):
        """Read the experiment and the fitting produced by protODE into self.experiment and self.fitting.

        Both the plain attribute name and the one with the "1" suffix are accepted.
        :raises Exception: if protODE has no output experiment or no output fitting.
        """
        if hasattr(protODE, "outputExperiment"):
            fnExperiment = protODE.outputExperiment.fnPKPD
        elif hasattr(protODE, "outputExperiment1"):
            fnExperiment = protODE.outputExperiment1.fnPKPD
        else:
            raise Exception("Cannot find an outputExperiment in the input ODE")

        if hasattr(protODE, "outputFitting"):
            fnFitting = protODE.outputFitting.fnFitting
        elif hasattr(protODE, "outputFitting1"):
            fnFitting = protODE.outputFitting1.fnFitting
        else:
            # Without this check self.fitting would stay undefined, or silently keep
            # the fitting of the previous model.
            raise Exception("Cannot find an outputFitting in the input ODE")

        self.experiment = self.readExperiment(fnExperiment, show=False)
        self.fitting = self.readFitting(fnFitting, show=False)

    def _getDoseLines(self):
        """Return the non-blank dose descriptions typed by the user, in order.

        Doses are separated by a newline or by ';;'. Blank entries (e.g. a trailing newline)
        are dropped so that they do not take the slot of a model.
        """
        text = self.doses.get() or ""
        return [line for line in text.replace('\n', ';;').split(';;') if line.strip()]

    def runSimulate(self):
        """Simulate every input model with its own dose and add up all the responses.

        The i-th dose line is applied to the i-th input ODE model, using the parameters of the
        first sample fit of that model. The summed profile is stored as the sample "Simulation"
        of self.outputExperiment, which is written to experiment.pkpd.

        :raises IndexError: if there are fewer dose lines than input models.
        :raises Exception: if an input model lacks an experiment or a fitting, or if no dose
                           could be simulated.
        """
        # Take first experiment: it defines the variable names and the time units of the output
        self.protODE = self.inputODEs[0].get()
        self._loadExperimentAndFitting(self.protODE)

        self.varNameX = self.fitting.predictor.varName
        multipleResponses = isinstance(self.fitting.predicted, list)
        if multipleResponses:
            self.varNameY = [var.varName for var in self.fitting.predicted]
        else:
            self.varNameY = self.fitting.predicted.varName

        t0 = self.t0.get()
        tF = self.tF.get()
        deltaT = self.deltaT.get()

        # Create output object
        self.outputExperiment = PKPDExperiment()
        tvar = PKPDVariable()
        tvar.varName = "t"
        tvar.varType = PKPDVariable.TYPE_NUMERIC
        tvar.role = PKPDVariable.ROLE_TIME
        tvar.units = createUnit(self.experiment.getTimeUnits().unit)
        Nsamples = int(math.ceil((tF - t0) / deltaT)) + 1
        # NOTE(review): tvar.units is whatever createUnit() returns while UNIT_TIME_MIN is a
        # class constant; confirm that this comparison can be true and that multiplying the
        # number of samples by 60 is the intended handling of minutes.
        if tvar.units == PKPDUnit.UNIT_TIME_MIN:
            Nsamples *= 60

        self.outputExperiment.variables[self.varNameX] = tvar
        if multipleResponses:
            for varName in self.varNameY:
                self.outputExperiment.variables[varName] = self.experiment.variables[varName]
        else:
            self.outputExperiment.variables[self.varNameY] = self.experiment.variables[self.varNameY]
        self.outputExperiment.general["title"] = "Simulated ODE response"
        self.outputExperiment.general["comment"] = "Simulated ODE response"
        self.outputExperiment.vias = self.experiment.vias

        # Read the doses
        doseLines = self._getDoseLines()

        simulationsX = None
        simulationsY = None
        doseList = []
        # enumerate keeps dose i paired with model i even when a dose is skipped
        for doseIdx, protODEPtr in enumerate(self.inputODEs):
            self.protODE = protODEPtr.get()
            self._loadExperimentAndFitting(self.protODE)

            for viaName in self.experiment.vias:
                if viaName not in self.outputExperiment.vias:
                    self.outputExperiment.vias[viaName] = copy.copy(self.experiment.vias[viaName])

            # Create drug source
            self.clearGroupParameters()
            self.createDrugSource()

            # Setup model
            self.model = self.protODE.createModel()
            self.model.setExperiment(self.outputExperiment)
            self.model.deltaT = deltaT
            self.model.setXVar(self.varNameX)
            self.model.setYVar(self.varNameY)
            self.model.x = [t0 + i*self.model.deltaT for i in range(0, Nsamples)]
            # NOTE(review): self.modelList is not created in this class; presumably it is
            # initialised by the base class or by clearGroupParameters() -- confirm.
            self.modelList.append(self.model)

            if doseIdx >= len(doseLines):
                raise IndexError("There are fewer doses (%d) than input ODE models" % len(doseLines))
            doseLine = doseLines[doseIdx]
            tokens = doseLine.split(';')
            if len(tokens) < 5:
                print("Skipping dose: ", doseLine)
                continue
            dosename = tokens[0].strip()
            self.outputExperiment.doses[dosename] = PKPDDose()
            self.outputExperiment.doses[dosename].parseTokens(tokens, self.outputExperiment.vias)
            doseList.append(dosename)

            # Auxiliary sample used only to interpret this single dose
            auxSample = PKPDSample()
            auxSample.descriptors = {}
            auxSample.doseDictPtr = self.outputExperiment.doses
            auxSample.variableDictPtr = self.outputExperiment.variables
            auxSample.doseList = [dosename]
            auxSample.interpretDose()
            self.drugSource.setDoses(auxSample.parsedDoseList, t0-10, tF+10)
            self.model.drugSource = self.drugSource

            # Simulate the different responses
            simulationsX = self.model.x
            self.setTimeRange(None)

            parameters = self.fitting.sampleFits[0].parameters
            print("Input model: %s"%self.protODE.getObjLabel())
            print("Sample name: %s"%self.fitting.sampleFits[0].sampleName)
            print("Parameters: ",parameters)
            print("Dose: %s"%doseLine)
            print(" ")

            # Prepare source and this object
            self.protODE.configureSource(self.drugSource)
            # Called for its side effect only: necessary to count the number of source and PK parameters
            self.getParameterNames()

            # Prepare the model
            y = self.forwardModel(parameters, [simulationsX]*self.getResponseDimension())

            # Keep results
            if simulationsY is None:
                # Deep copy so that the in-place additions below never modify
                # the arrays returned for the first model
                simulationsY = copy.deepcopy(y)
            else:
                for j in range(self.getResponseDimension()):
                    simulationsY[j] += y[j]

        if simulationsY is None:
            raise Exception("No dose could be simulated, check the format of the doses")

        # addSample expects one column per variable when there are several responses
        ySample = simulationsY if isinstance(self.varNameY, list) else simulationsY[0]
        self.addSample("Simulation", doseList, simulationsX, ySample)

        self.outputExperiment.write(self._getPath("experiment.pkpd"))

    def createOutputStep(self):
        """Register the simulated experiment as output and relate it to every input ODE run."""
        self._defineOutputs(outputExperiment=self.outputExperiment)
        for protODEPtr in self.inputODEs:
            self._defineSourceRelation(protODEPtr.get(), self.outputExperiment)

    #--------------------------- INFO functions --------------------------------------------
    def _summary(self):
        """Return the summary lines of the run (none for this protocol)."""
        msg = []
        return msg

    def _validate(self):
        """Return the list of errors that prevent the protocol from running.

        Checks that the time grid is well defined and that there is one dose per input model.
        """
        msg = []
        deltaT = self.deltaT.get()
        if deltaT is None or deltaT <= 0:
            msg.append("The time step must be positive")
        t0 = self.t0.get()
        tF = self.tF.get()
        if t0 is None or tF is None or tF <= t0:
            msg.append("The final time must be larger than the initial time")
        Nmodels = sum(1 for _ in self.inputODEs)
        Ndoses = len(self._getDoseLines())
        if Ndoses < Nmodels:
            msg.append("There are %d input ODE models but only %d doses" % (Nmodels, Ndoses))
        return msg