Source code for pkpd.objects

# **************************************************************************
# *
# * Authors:     Carlos Oscar Sorzano (info@kinestat.com)
# *
# * Kinestat Pharma
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 2 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307  USA
# *
# *  All comments concerning this program package may be sent to the
# *  e-mail address 'info@kinestat.com'
# *
# **************************************************************************

import copy
import sys

from scipion.install.plugin_funcs import PluginInfo

try:
    from itertools import izip
except ImportError:
    izip = zip
import math
from os.path import join

import openpyxl
# from pyworkflow.plugin import PluginInfo
#
# from .pkpd_units import (PKPDUnit, convertUnits, changeRateToMinutes,
#                         changeRateToWeight)
from .pkpd_units import PKPDUnit, convertUnits, changeRateTo, changeRateToWeight
import pyworkflow as pw
import pyworkflow.utils as pwutils
from pwem.objects import *
from .utils import (writeMD5, verifyMD5, excelWriteRow, excelFillCells,
                    excelAdjustColumnWidths, computeXYmean)
from .biopharmaceutics import (PKPDDose, PKPDVia, DrugSource, createDeltaDose,
                               createVia)

[docs]class PKPDVariable: TYPE_NUMERIC = 1000 TYPE_TEXT = 1001 TYPE_NAMES = {TYPE_NUMERIC: 'numeric', TYPE_TEXT: 'text'} ROLE_TIME = 1010 ROLE_MEASUREMENT = 1011 ROLE_LABEL = 1012 ROLE_NAMES = {ROLE_TIME: 'time', ROLE_MEASUREMENT: 'measurement', ROLE_LABEL: 'label'} def __init__(self): self.varName = None self.varType = None self.displayString = "" self.role = None self.comment = "" self.units = None
[docs] def parseTokens(self, tokens): # t ; h ; numeric[%f] ; time ; strippedTokens=[] for token in tokens: strippedTokens.append(token.replace(';','')) tokens=strippedTokens # Get name self.varName = tokens[0].strip() # Get units unitString = tokens[1].strip() self.units = PKPDUnit(unitString) if not self.units.unit: raise Exception("Unrecognized unit: %s"%unitString) # Get type and display typeString = tokens[2].strip() self.displayString = "" leftBracket=typeString.find('[') rightBracket=typeString.find(']') if leftBracket!=-1 and rightBracket!=-1: self.displayString = typeString[leftBracket+1:rightBracket] typeString = typeString[0:leftBracket] typeString = typeString.lower() if typeString=="numeric": self.varType = PKPDVariable.TYPE_NUMERIC if self.displayString=="": self.displayString="%f" elif typeString=="text": self.varType = PKPDVariable.TYPE_TEXT if self.displayString=="": self.displayString="%s" else: raise Exception("Unrecognized type: %s"%typeString) # Get role roleString = tokens[3].strip().lower() if roleString=="time": self.role = PKPDVariable.ROLE_TIME elif roleString=="measurement": self.role = PKPDVariable.ROLE_MEASUREMENT elif roleString=="label": self.role = PKPDVariable.ROLE_LABEL else: raise Exception("Unrecognized role: %s"%roleString) # Get comment self.comment = tokens[4].strip()
def _printToStream(self,fh): displayString = self.displayString.replace("%%","%%%%") if displayString=="": if self.varType == PKPDVariable.TYPE_NUMERIC: displayString="%f" elif self.varType == PKPDVariable.TYPE_TEXT: displayString="%s" fh.write("%s ; %s ; %s[%s] ; %s ; %s\n" % (self.varName, self.getUnitsString(), self.getTypeString(), displayString, self.getRoleString(), self.comment)) def _printToExcel(self,wb,row, col=1): displayString = self.displayString.replace("%%","%%%%") if displayString=="": if self.varType == PKPDVariable.TYPE_NUMERIC: displayString="%f" elif self.varType == PKPDVariable.TYPE_TEXT: displayString="%s" excelWriteRow([self.varName, self.getUnitsString(), "%s[%s]"%(self.getTypeString(),displayString), self.getRoleString(), self.comment], wb, row, col) return row+1
[docs] def getTypeString(self): return self.TYPE_NAMES.get(self.varType, '')
[docs] def getRoleString(self): return self.ROLE_NAMES.get(self.role, '')
[docs] def getUnitsString(self): return self.units._toString()
[docs] def isNumeric(self): return self.varType == self.TYPE_NUMERIC
[docs] def isLabel(self): return self.role == self.ROLE_LABEL
[docs] def isMeasurement(self): return self.role == self.ROLE_MEASUREMENT
[docs] def isTime(self): return self.role == self.ROLE_TIME
[docs] def getLabel(self): return "%s [%s]" % (self.varName, self.getUnitsString())
[docs]class PKPDSample: def __init__(self): self.sampleName = "" self.variableDictPtr = None self.doseDictPtr = None self.doseList = [] self.groupList = [] self.descriptors = None self.measurementPattern = None
[docs] def parseTokens(self,tokens,variableDict,doseDict,groupDict): # FemaleRat1; dose=Dose1[,Dose2]; weight=207; [group=Group1,Group2] # Keep a pointer to variableDict and doseDict self.variableDictPtr = variableDict self.doseDictPtr = doseDict # Get name self.sampleName = tokens[0].strip() if len(tokens)>1: # Get rest of variables self.descriptors = {} for n in range(1,len(tokens)): if '=' in tokens[n]: varTokens = tokens[n].split('=') varName = varTokens[0].strip() varValue = varTokens[1].strip() if varName=="dose": for doseName in varValue.split(','): doseName=doseName.strip() if doseName in doseDict: self.doseList.append(doseName) else: raise Exception("Unrecognized dose %s"%doseName) elif varName=="group": for groupName in varValue.split(','): groupName=groupName.strip() if groupName in groupDict.keys(): self.groupList.append(groupName) else: raise Exception("Unrecognized group %s"%groupName) else: if varName in variableDict: varPtr = variableDict[varName] if varPtr.role != PKPDVariable.ROLE_LABEL: raise Exception("Samples can only use role variables") self.descriptors[varName] = varValue self.measurementPattern = []
[docs] def getSampleName(self): return self.sampleName
[docs] def getTimeVariable(self): for varName in self.variableDictPtr: if self.variableDictPtr[varName].isTime(): return varName
[docs] def getTimeUnits(self): timeName = self.getTimeVariable() return self.variableDictPtr[timeName].units
[docs] def interpretDose(self): self.parsedDoseList = [] firstUnit = None for doseName in sorted(self.doseList): dose = copy.copy(self.doseDictPtr[doseName]) dose.doseAmount = self.evaluateExpression(dose.doseAmount) dose.changeTimeUnitsTo(self.getTimeUnits().unit) dose.prepare() if dose.doseType == PKPDDose.TYPE_INFUSION: dose.doseAmount, dose.dunits.unit = changeRateTo(self.getTimeUnits().unit, dose.doseAmount, dose.dunits.unit) if firstUnit==None: firstUnit = dose.dunits.unit else: dose.doseAmount = convertUnits(dose.doseAmount, dose.dunits.unit, firstUnit) self.parsedDoseList.append(dose)
# if len(self.parsedDoseList)==0: # raise Exception("Cannot find any useful dose")
[docs] def isDoseABolus(self): if len(self.parsedDoseList)!=1: return False return self.parsedDoseList[0].isDoseABolus()
[docs] def getDoseAt(self,t0,dt=0.5): doseAmount = 0.0 for dose in self.parsedDoseList: doseAmount += dose.getDoseAt(t0,dt) return doseAmount
[docs] def getCumulatedDose(self,t0,tF): return self.getDoseAt(t0,tF-t0)
[docs] def getDoseUnits(self): if len(self.parsedDoseList)>0: if self.parsedDoseList[0].doseType==PKPDDose.TYPE_INFUSION: return changeRateToWeight(self.parsedDoseList[0].dunits.unit) else: return self.parsedDoseList[0].dunits.unit else: return PKPDUnit.UNIT_NONE
[docs] def addMeasurementPattern(self,tokens): self.measurementPattern = [] for n in range(1,len(tokens)): varName = tokens[n].strip() if varName in self.variableDictPtr: self.measurementPattern.append(varName) setattr(self,"measurement_%s"%varName,[]) else: raise Exception("Unrecognized variable %s"%varName)
[docs] def addMeasurement(self,line): line = line.strip() if line=="": return tokens = line.split() if len(tokens)<len(self.measurementPattern): raise Exception("Not enough values to fill measurement pattern") for n in range(0,len(tokens)): ok=True varName = self.measurementPattern[n] if tokens[n]=="NA" or tokens[n]=="ULOQ" or tokens[n]=="LLOQ" or "LLOQ" in tokens[n]: ok = (self.variableDictPtr[varName].role != PKPDVariable.ROLE_TIME) if ok: exec("self.measurement_%s.append('%s')"%(varName,tokens[n])) else: raise Exception("Time measurements cannot be NA")
[docs] def addMeasurementColumn(self,varName,values): if self.measurementPattern is None: self.measurementPattern = [] if not varName in self.measurementPattern: self.measurementPattern.append(varName) setattr(self, "measurement_%s"%varName, []) if type(values)==list: for value in values: exec("self.measurement_%s.append('%s')"%(varName,str(float(value)))) elif type(values)==np.ndarray: for i in range(values.size): exec("self.measurement_%s.append('%s')"%(varName,str(float(values[i]))))
[docs] def getNumberOfVariables(self): return len(self.measurementPattern)
[docs] def getNumberOfMeasurements(self): return len(getattr(self,"measurement_%s"%self.measurementPattern[0]))
def _printToStream(self,fh): fh.write("%s"%self.sampleName) if self.doseList: fh.write("; dose=%s"%(",".join(self.doseList))) if self.groupList: fh.write("; group=%s"%(",".join(self.groupList))) if self.descriptors: descriptorString = "" for key in sorted(self.descriptors.keys()): descriptorString +="; %s=%s"%(key,self.descriptors[key]) fh.write(" %s"%descriptorString) fh.write("\n") def _printToExcel(self,wb,row,col=1): toPrint=[self.sampleName] if self.doseList: toPrint.append("dose=%s"%(",".join(self.doseList))) if self.groupList: toPrint.append("group=%s"%(",".join(self.groupList))) if self.descriptors: for key in sorted(self.descriptors.keys()): toPrint.append("%s=%s"%(key,self.descriptors[key])) excelWriteRow(toPrint,wb,row,col) return row+1 def _printMeasurements(self,fh): patternString = "" for n in range(0,len(self.measurementPattern)): patternString += "; %s"%self.measurementPattern[n] fh.write("%s %s\n"%(self.sampleName,patternString)) if len(self.measurementPattern)>0: aux=getattr(self,"measurement_%s"%self.measurementPattern[0]) N = len(aux) for i in range(0,self.getNumberOfMeasurements()): lineString = "" for n in range(0,len(self.measurementPattern)): aux=getattr(self,"measurement_%s"%self.measurementPattern[n]) lineString += aux[i]+" " fh.write("%s\n"%lineString) fh.write("\n") def _printMeasurementsToExcel(self,wb,row): toPrint = [self.sampleName] for n in range(0,len(self.measurementPattern)): toPrint.append(self.measurementPattern[n]) excelWriteRow(toPrint,wb,row); row+=1 if len(self.measurementPattern)>0: aux=getattr(self,"measurement_%s"%self.measurementPattern[0]) N = len(aux) for i in range(0,self.getNumberOfMeasurements()): toPrint = [""] for n in range(0,len(self.measurementPattern)): aux=getattr(self,"measurement_%s"%self.measurementPattern[n]) try: toPrint.append(float(aux[i])) except: if isinstance(aux[i], str): toPrint.append(aux[i]) else: toPrint.append("") excelWriteRow(toPrint,wb,row); row+=1 return row+1
[docs] def getRange(self, varName): if varName not in self.measurementPattern: return [None, None] else: aux = getattr(self,"measurement_%s"%varName) aux = [x for x in aux if x != "NA" and (not x in "LLOQ") and x!="ULOQ" and x!="None" and x!="MS"] x = np.asarray(aux, dtype=np.double) return [x.min(),x.max()]
[docs] def getValues(self, varName): if type(varName)==list: retval=[] for vName in varName: if vName not in self.measurementPattern: retval.append(None) else: retval.append(getattr(self,"measurement_%s"%vName)) return retval else: if varName not in self.measurementPattern: return None else: return getattr(self,"measurement_%s"%varName)
[docs] def setValues(self, varName, varValues): setattr(self,"measurement_%s"%varName,varValues)
[docs] def getXYValues(self,varNameX,varNameY): xl = [] yl = [] xs = self.getValues(varNameX) if type(varNameY)==list: ys = self.getValues(varNameY) for ysi in ys: xPartial =[] yPartial = [] for x, y in izip(xs, ysi): if x != "NA" and (not "LLOQ" in x) and y!="ULOQ" and y != "NA" and ("LLOQ" not in y) and y!="ULOQ": xPartial.append(float(x)) yPartial.append(float(y)) xl.append(np.array(xPartial)) yl.append(np.array(yPartial)) else: ys = self.getValues(varNameY) xPartial =[] yPartial = [] for x, y in izip(xs, ys): if x != "NA" and x!="LLOQ" and y!="ULOQ" and y != "NA" and y!= "LLOQ" and y!="ULOQ" and \ x!= "None" and x!="MS" and y!="None" and x!="NS" and y!="NS" and \ (not "LLOQ" in x) and (not "LLOQ" in y) and y!="MS" and y!="NRE" and y!="NR": xPartial.append(float(x)) if "[" in y: y=y.replace("[","").replace("]","") yPartial.append(float(y)) xl.append(np.array(xPartial)) yl.append(np.array(yPartial)) return xl, yl
[docs] def getSampleMeasurements(self): return [PKPDSampleMeasurement(self,n) for n in range(0,self.getNumberOfMeasurements())]
[docs] def substituteValuesInExpression(self, expression, prefix=""): expressionPython = copy.copy(expression) if self.descriptors is not None: for key, variable in self.variableDictPtr.items(): if key in self.descriptors: value = self.descriptors[key] if value=="NA" or value=="LLOQ" or value=="ULOQ": expressionPython="None" break else: if variable.varType == PKPDVariable.TYPE_NUMERIC: expressionPython = expressionPython.replace("$%s(%s)"%(prefix,key),"%f"%float(value)) else: expressionPython = expressionPython.replace("$%s(%s)"%(prefix,key),"'%s'"%value) return expressionPython
[docs] def evaluateExpression(self, expression, prefix=""): expressionPython=self.substituteValuesInExpression(expression,prefix) return eval(expressionPython, {"__builtins__" : {"True": True, "False": False, "None": None} }, {})
[docs] def evaluateParsedExpression(self, parsedOperation, varList): ldict = {} for varName in varList: variable = self.variableDictPtr[varName] if variable.isLabel(): exec ("%s=self.getDescriptorValue('%s')" % (varName, varName), locals(), ldict) if variable.isNumeric(): exec ("%s=float(%s)" % (varName, varName), locals(), ldict) else: # Measurement or time exec ("%s=np.asarray(self.getValues('%s'),dtype=np.float)" % (varName, varName), dict(locals(), **globals()), ldict) aux = None exec ("aux=%s" % parsedOperation, dict(locals(), **globals()), ldict) aux=ldict['aux'] return aux
[docs] def getVariableValues(self, varList): varDict = {} for varName in varList: var = self.variableDictPtr[varName] if var.isLabel(): varDict[varName]=self.descriptors[varName] elif var.isMeasurement(): varDict[varName]=getattr(self,"measurement_%s"%varName) return varDict
[docs] def getDescriptorValue(self,descriptorName): if self.descriptors is None: return None if descriptorName in self.descriptors.keys(): return self.descriptors[descriptorName] else: return None
[docs] def setDescriptorValue(self, descriptorName, descriptorValue): if self.descriptors is None: self.descriptors = {} self.descriptors[descriptorName] = descriptorValue
[docs]class PKPDSampleMeasurement(): def __init__(self, sample, n): self.sample = sample self.n = n
[docs] def getValues(self): values = [] for i in range(0,self.sample.getNumberOfVariables()): aux=getattr(self.sample,"measurement_%s"%self.sample.measurementPattern[i]) values.append(aux[self.n]) return values
[docs]class PKPDGroup(): def __init__(self, groupName): self.groupName = groupName self.sampleList = []
[docs] def getSamplesString(self): return ",".join(self.sampleList)
[docs]class PKPDExperiment(EMObject): READING_GENERAL = 1 READING_VARIABLES = 2 READING_VIAS = 3 READING_DOSES = 4 READING_GROUPS = 5 READING_SAMPLES = 6 READING_MEASUREMENTS = 7 READING_A_MEASUREMENT = 8 def __init__(self, **args): EMObject.__init__(self, **args) self.fnPKPD = String() self.infoStr = String() self.general = {} self.variables = {} self.samples = {} self.doses = {} self.vias = {} self.groups = {} def __str__(self): if not self.infoStr.hasValue(): self.load(fullRead=False) self.infoStr.set("variables: %d, samples: %d" % (len(self.variables), len(self.samples))) return self.infoStr.get()
[docs] def load(self, fnExperiment="", verifyIntegrity=True, fullRead=True): if fnExperiment!="": self.fnPKPD.set(fnExperiment) if verifyIntegrity and not verifyMD5(self.fnPKPD.get()): raise Exception("The file %s has been modified since its creation"%self.fnPKPD.get()) if self.fnPKPD.get() is None: return fh=open(self.fnPKPD.get(),'r') if not fh: raise Exception("Cannot open the file "+self.fnPKPD) state=None for line in fh.readlines(): line=line.strip() if line=="": if state==PKPDExperiment.READING_A_MEASUREMENT: state=PKPDExperiment.READING_MEASUREMENTS continue if line[0]=='[': section = line.split('=')[0].strip().lower() if section=="[experiment]": state=PKPDExperiment.READING_GENERAL elif section=="[variables]": state=PKPDExperiment.READING_VARIABLES elif section=="[vias]": state=PKPDExperiment.READING_VIAS elif section=="[doses]": state=PKPDExperiment.READING_DOSES elif section=="[groups]": state=PKPDExperiment.READING_GROUPS elif section=="[samples]": state=PKPDExperiment.READING_SAMPLES elif section=="[measurements]": if fullRead: state=PKPDExperiment.READING_MEASUREMENTS else: break else: print("Skipping: ",line) elif state==PKPDExperiment.READING_GENERAL: tokens = line.split('=') lhs = tokens[0].strip().lower() rhs = tokens[1].strip() self.general[lhs]=rhs elif state==PKPDExperiment.READING_VARIABLES: tokens = line.split(';') if len(tokens)!=5: print("Skipping variable: ",line) continue varname = tokens[0].strip() self.variables[varname] = PKPDVariable() self.variables[varname].parseTokens(tokens) elif state==PKPDExperiment.READING_VIAS: if line!="": tokens = line.split(';') if len(tokens)<2: print("Skipping via: ",line) continue vianame = tokens[0].strip() self.vias[vianame] = PKPDVia(ptrExperiment=self) self.vias[vianame].parseTokens(tokens) elif state==PKPDExperiment.READING_DOSES: if line!="": tokens = line.split(';') if len(tokens)!=5: print("Skipping dose: ",line) continue dosename = tokens[0].strip() self.doses[dosename] = PKPDDose() self.doses[dosename].parseTokens(tokens,self.vias) elif state==PKPDExperiment.READING_GROUPS: if line!="": groupName = line.strip() self.groups[groupName] = PKPDGroup(groupName) elif state==PKPDExperiment.READING_SAMPLES: if line!="": tokens = line.split(';') samplename = tokens[0].strip() newSample = PKPDSample() newSample.parseTokens(tokens,self.variables, self.doses, self.groups) if not newSample.groupList: # If there is no group, create one for this sample groupName = "__"+newSample.sampleName self.groups[groupName]=PKPDGroup(groupName) newSample.groupList.append(groupName) for groupName in newSample.groupList: # A sample may belong to several groups self.groups[groupName].sampleList.append(newSample.sampleName) self.samples[samplename] = newSample elif state==PKPDExperiment.READING_MEASUREMENTS: tokens = line.split(';') if len(tokens)<3: print("Skipping measurement: ",line) continue samplename = tokens[0].strip() if samplename in self.samples: self.samples[samplename].addMeasurementPattern(tokens) state=PKPDExperiment.READING_A_MEASUREMENT else: print("Skipping measurement: %s"%line) elif state==PKPDExperiment.READING_A_MEASUREMENT: self.samples[samplename].addMeasurement(line) fh.close()
[docs] def write(self, fnExperiment, writeToExcel=True): fh=open(fnExperiment,'w') self._printToStream(fh) fh.close() self.fnPKPD.set(fnExperiment) writeMD5(fnExperiment) self.infoStr.set("variables: %d, samples: %d" % (len(self.variables), len(self.samples))) if writeToExcel: self.writeToExcel(os.path.splitext(fnExperiment)[0]+".xlsx")
def _printToStream(self,fh): fh.write("[EXPERIMENT] ===========================\n") for key, value in self.general.items(): fh.write("%s = %s\n"%(key,value)) fh.write("\n") fh.write("[VARIABLES] ============================\n") for key in sorted(self.variables.keys()): self.variables[key]._printToStream(fh) fh.write("\n") fh.write("[VIAS] ================================\n") for key in sorted(self.vias.keys()): self.vias[key]._printToStream(fh) fh.write("\n") fh.write("[DOSES] ================================\n") for key in sorted(self.doses.keys()): self.doses[key]._printToStream(fh) fh.write("\n") fh.write("[GROUPS] ================================\n") for groupName in sorted(self.groups.keys()): fh.write("%s\n"%groupName) fh.write("\n") fh.write("[SAMPLES] ================================\n") for key in sorted(self.samples.keys()): self.samples[key]._printToStream(fh) fh.write("\n") fh.write("[MEASUREMENTS] ===========================\n") for key in sorted(self.samples.keys()): self.samples[key]._printMeasurements(fh) fh.write("\n")
[docs] def writeToExcel(self, fnXls): wb = openpyxl.Workbook() wb.active.title = "Experiment" currentRow = 1 excelWriteRow("EXPERIMENT",wb,currentRow,bold=True); excelFillCells(wb,currentRow); currentRow+=1 for key, value in self.general.items(): excelWriteRow([key,value],wb,currentRow); currentRow+=1 currentRow+=1 excelWriteRow("VARIABLES",wb,currentRow,bold=True); excelFillCells(wb,currentRow); currentRow+=1 for key in sorted(self.variables.keys()): currentRow=self.variables[key]._printToExcel(wb,currentRow) currentRow+=1 excelWriteRow("VIAS",wb,currentRow,bold=True); excelFillCells(wb,currentRow); currentRow+=1 for key in sorted(self.vias.keys()): currentRow=self.vias[key]._printToExcel(wb,currentRow) currentRow+=1 excelWriteRow("DOSES",wb,currentRow,bold=True); excelFillCells(wb,currentRow); currentRow+=1 for key in sorted(self.doses.keys()): currentRow=self.doses[key]._printToExcel(wb,currentRow) currentRow+=1 excelWriteRow("GROUPS",wb,currentRow,bold=True); excelFillCells(wb,currentRow); currentRow+=1 for groupName in sorted(self.groups.keys()): excelWriteRow(groupName,wb, currentRow); currentRow+=1 currentRow+=1 excelWriteRow("SAMPLES",wb,currentRow,bold=True); excelFillCells(wb,currentRow); currentRow+=1 for key in sorted(self.samples.keys()): currentRow=self.samples[key]._printToExcel(wb,currentRow) currentRow+=1 excelWriteRow("MEASUREMENTS",wb,currentRow,bold=True); excelFillCells(wb,currentRow); currentRow+=1 for key in sorted(self.samples.keys()): currentRow=self.samples[key]._printMeasurementsToExcel(wb,currentRow) currentRow+=1 excelAdjustColumnWidths(wb) wb.save(fnXls)
[docs] def getRange(self,varName): vmin = None vmax = None for key, value in self.samples.items(): vmini, vmaxi = value.getRange(varName) if vmin==None or vmini<vmin: vmin = vmini if vmax==None or vmaxi>vmax: vmax = vmaxi return [vmin,vmax]
[docs] def sampleSummary(self): summary=[] for varName, var in self.variables.items(): if var.role == PKPDVariable.ROLE_LABEL: toAdd = varName+": " if var.varType==PKPDVariable.TYPE_NUMERIC: listOfValues=[] else: listOfValues={} for sampleName, sample in self.samples.items(): value = sample.descriptors[varName] if var.varType==PKPDVariable.TYPE_NUMERIC: listOfValues.append(float(value)) else: if value in listOfValues: listOfValues[value]+=1 else: listOfValues[value]=1 if var.varType==PKPDVariable.TYPE_NUMERIC: listOfValuesNp = np.array(listOfValues) toAdd += " mean=%f std=%f 5%%=%f 25%%=%f 50%%=%f 75%%=%f 95%%=%f"%\ (np.mean(listOfValuesNp),np.std(listOfValuesNp),np.percentile(listOfValuesNp,5),\ np.percentile(listOfValuesNp,25),np.percentile(listOfValuesNp,50),\ np.percentile(listOfValuesNp,75),np.percentile(listOfValuesNp,95)) else: for value in listOfValues: toAdd += value + "(" + str(listOfValues[value]) + ") " summary.append(toAdd) return summary
[docs] def getXYMeanValues(self,varNameX,varNameY): XYlist = [] for sampleName, sample in self.samples.items(): xValues, yValues = sample.getXYValues(varNameX,varNameY) XYlist.append((xValues,yValues)) return computeXYmean(XYlist)
[docs] def getVarUnits(self,varName): if varName in self.variables: return self.variables[varName].units.unit else: return PKPDUnit.UNIT_NONE
[docs] def getDoseUnits(self): if len(self.samples)==0: return PKPDUnit.UNIT_NONE listOfSamples = list(self.samples.values()) listOfSamples[0].interpretDose() return listOfSamples[0].getDoseUnits()
[docs] def addParameterToSample(self, sampleName, varName, varUnits, varDescr, varValue, rewrite=False): if not varName in self.variables: varX = PKPDVariable() varX.varName = varName varX.varType = PKPDVariable.TYPE_NUMERIC varX.displayString = "%f" varX.role = PKPDVariable.ROLE_LABEL varX.comment = varDescr varX.units = PKPDUnit() varX.units.unit = varUnits self.variables[varName] = varX else: varPresent = self.variables[varName] if varPresent.role!=PKPDVariable.ROLE_LABEL: raise Exception("Only labels can be reused (%s)"%varName) if rewrite: varPresent.comment = varDescr varPresent.units.unit = varUnits else: if varPresent.comment!=varDescr or varPresent.units.unit!=varUnits: raise Exception("%s is already a variable in the experiment with a different purpose"%varName) if sampleName in self.samples: sample = self.samples[sampleName] if sample.descriptors==None: sample.descriptors={} sample.descriptors[varName] = varValue
[docs] def addLabelToSample(self, sampleName, varName, varDescr, varValue, rewrite=False): if not varName in self.variables: varX = PKPDVariable() varX.varName = varName varX.varType = PKPDVariable.TYPE_TEXT varX.displayString = "%s" varX.role = PKPDVariable.ROLE_LABEL varX.comment = varDescr varX.units = PKPDUnit() varX.units.unit = PKPDUnit.UNIT_NONE self.variables[varName] = varX else: varPresent = self.variables[varName] if varPresent.role!=PKPDVariable.ROLE_LABEL: raise Exception("Only labels can be reused (%s)"%varName) if rewrite: varPresent.comment = varDescr varPresent.units.unit = PKPDUnit.UNIT_NONE else: if varPresent.comment!=varDescr: raise Exception("%s is already a variable in the experiment with a different purpose"%varName) if sampleName in self.samples: sample = self.samples[sampleName] if sample.descriptors==None: sample.descriptors={} sample.descriptors[varName] = varValue
[docs] def getSubGroup(self,condition): if condition=="": return self.samples samplesSubGroup = {} for sampleName, sample in self.samples.items(): if sample.evaluateExpression(condition): samplesSubGroup[sampleName] = sample return samplesSubGroup
[docs] def getSubGroupLabels(self,condition,labelName): subgroupLabels = [] for sampleName, sample in self.samples.items(): if condition!="" and sample.evaluateExpression(condition) or condition=="": subgroupLabels.append(sample.descriptors[labelName]) return subgroupLabels
[docs] def getNonBolusDoses(self): nonBolusList = [] for sampleName, sample in self.samples.items(): sample.interpretDose() if not sample.isDoseABolus(): nonBolusList.append(sampleName) return nonBolusList
[docs] def addSampleToGroup(self,groupName,sample): if not groupName in self.groups.keys(): self.groups[groupName] = PKPDGroup(groupName) self.groups[groupName].sampleList.append(sample.sampleName)
[docs] def addSample(self,sample): self.samples[sample.sampleName] = sample for groupName in sample.groupList: self.addSampleToGroup(groupName,sample)
[docs] def getTimeVariable(self): for varName in self.variables: if self.variables[varName].isTime(): return varName
[docs] def getTimeUnits(self): timeName = self.getTimeVariable() return self.variables[timeName].units
[docs] def getMeasurementVariables(self): retval=[] for varName in self.variables: if self.variables[varName].isMeasurement(): retval.append(varName) return retval
[docs] def getFirstSample(self): if len(self.samples)==0: return None else: return self.samples[next(iter(self.samples))]
[docs] def subset(self,listOfSampleNames): newExperiment = PKPDExperiment() # General for key, value in self.general.items(): if not (key in newExperiment.general): newExperiment.general[key] = copy.copy(value) # Variables for key, value in self.variables.items(): if not (key in newExperiment.variables): newExperiment.variables[key] = copy.copy(value) # Doses viasSubset = [] for key, value in self.doses.items(): dose = copy.copy(value) newExperiment.doses[dose.doseName] = dose viasSubset.append(dose.via.viaName) # Vias for via in viasSubset: newExperiment.vias[via] = self.vias[via] # Samples and groups newExperiment.groups = {} for sampleName in listOfSampleNames: sample = copy.copy(self.samples[sampleName]) for groupName, group in self.groups.items(): if sampleName in group.sampleList: if not groupName in newExperiment.groups.keys(): newExperiment.groups[groupName]=PKPDGroup(groupName) newExperiment.groups[groupName].sampleList.append(sampleName) newExperiment.samples[sample.sampleName] = sample return newExperiment
[docs] def gather(self, otherExperiment): # General for key, value in otherExperiment.general.items(): if not (key in self.general): self.general[key] = copy.copy(value) # Variables for key, value in otherExperiment.variables.items(): if not (key in self.variables): self.variables[key] = copy.copy(value) # Doses for key, value in otherExperiment.doses.items(): if not (key in self.doses): self.doses[key] = copy.copy(value) # Vias for key, value in otherExperiment.vias.items(): if key not in self.vias: self.vias[key] = copy.copy(value) # Samples for key, value in otherExperiment.samples.items(): if key not in self.samples: self.addSample(copy.copy(value))
[docs]class PKPDModelBase(object): def __init__(self): self.fnExperiment = None self.parameters = None self.parameterUnits = None self.xName = None self.yName = None self.experiment = None
[docs] def setExperiment(self, experiment): self.experiment = experiment if experiment!=None: self.fnExperiment = experiment.fnPKPD
[docs] def unsetExperiment(self): self.experiment = None self.fnExperiment = None
[docs] def setXVar(self, x): if not x in self.experiment.variables: raise Exception("Cannot find %s as a variable in the experiment"%x) self.xName = x self.xRange = self.experiment.getRange(x)
[docs] def setYVar(self, y): if type(y)==list: self.yName = [] self.yRange = [] for yi in y: if not yi in self.experiment.variables: raise Exception("Cannot find %s as a variable in the experiment"%yi) self.yName.append(yi) self.yRange.append(self.experiment.getRange(yi)) else: if not y in self.experiment.variables: raise Exception("Cannot find %s as a variable in the experiment"%y) self.yName = y self.yRange = self.experiment.getRange(y)
[docs] def setXYValues(self, x, y): self.x = [] self.y = [] self.ylog = [] for n in range(len(x)): idx = np.logical_and(np.isfinite(x[n]), np.isfinite(y[n])) xidx=x[n][idx] yidx=y[n][idx] self.x.append(xidx) self.y.append(yidx) self.ylog.append(np.array([math.log10(yidxi) if yidxi>0 else float("inf") for yidxi in yidx]))
[docs] def getNumberOfParameters(self): return len(self.getParameterNames())
[docs] def getDescription(self): pass
[docs] def getParameterNames(self): pass
[docs] def getParameterDescriptions(self): return ['Automatically fitted model of the form %s'%self.getModelEquation()]*self.getNumberOfParameters()
[docs] def calculateParameterUnits(self,sample): pass
[docs] def rearrange(self,parameters): return parameters
[docs] def setParameters(self, parameters): self.parameters = self.rearrange(parameters)
[docs]class PKPDModelBase2(PKPDModelBase): def __init__(self): PKPDModelBase.__init__(self) self.bounds = None
[docs] def forwardModel(self, parameters, x=None): pass
[docs] def printSetup(self): print("Model: %s"%self.getModelEquation()) print("Variables: "+str(self.getParameterNames())) print("Bounds: "+str(self.getBounds()))
[docs] def setSample(self, sample): self.sample = sample self.Dunits = sample.getDoseUnits()
[docs] def getEquation(self): pass
[docs] def getModelEquation(self): pass
[docs] def getParameterDescriptions(self): return ['Automatically fitted model of the form %s'%self.getModelEquation()]*self.getNumberOfParameters() pass
[docs] def areParametersSignificant(self, lowerBound, upperBound): """ :param lowerBound and upperBound: a numpy array of parameters :return: a list of string with "True", "False", "NA", "Suspicious" """ pass
[docs] def areParametersValid(self, p): pass
[docs] def setBounds(self, boundsString): self.bounds = None if boundsString!="" and boundsString!=None: tokens=boundsString.split(';') if len(tokens)!=self.getNumberOfParameters(): raise Exception("The number of bound intervals (%d) does not match the number of parameters (%d)"%\ (len(tokens),self.getNumberOfParameters())) self.bounds=[] for token in tokens: values = token.strip().split(',') self.bounds.append((float(values[0][1:]),float(values[1][:-1])))
[docs] def getBounds(self): return self.bounds
[docs] def setConfidenceInterval(self,lowerBound,upperBound): yPredictedBackup = copy.copy(self.yPredicted) self.yPredictedLower=[] self.yPredictedUpper=[] for j in range(len(self.yPredicted)): self.yPredictedLower.append(np.copy(self.yPredicted[j])) self.yPredictedUpper.append(np.copy(self.yPredicted[j])) for i in range(0,int(math.pow(2,self.getNumberOfParameters()))): pattern = ("{0:0%db}"%(self.getNumberOfParameters())).format(i) p = np.where(np.array(list(pattern))=="1",upperBound,lowerBound) p = p.astype(np.float) if not self.areParametersValid(p): continue y = self.forwardModel(p) for j in range(len(y)): yj=y[j] for n in range(len(yj)): if yj[n]<(self.yPredictedLower[j][n]): if yj[n]<0: self.yPredictedLower[j][n]=0 else: (self.yPredictedLower[j][n])=yj[n] if yj[n]>(self.yPredictedUpper[j][n]): (self.yPredictedUpper[j][n])=yj[n] self.yPredicted = yPredictedBackup
[docs] def setConfidenceIntervalNA(self): self.yPredictedUpper = [] self.yPredictedLower = [] for y in self.yPredicted: self.yPredictedUpper.append(["NA"]*y.shape[0]) self.yPredictedLower.append(["NA"]*y.shape[0])
[docs]class PKPDModel(PKPDModelBase2):
[docs] def prepare(self): pass
[docs]class PKPDODEModel(PKPDModelBase2): def __init__(self): PKPDModelBase2.__init__(self) self.t0 = None # (min) self.tF = None # (min) self.deltaT = 0.25 # (min) self.drugSource = None self.drugSourceImpulse = None self.tFImpulse = None self.thImpulse = None # self.show = False
[docs] def setXYValues(self, x, y): if type(x)!=list or (type(x) and type(x[0])!=np.ndarray): x = [np.array(x)]*self.getResponseDimension() if type(y)!=list or (type(y)==list and type(y[0])!=np.ndarray): y = [np.array(y)] self.x=[] self.y=[] self.ylog=[] for n in range(self.getResponseDimension()): idx = np.logical_and(np.isfinite(x[n]), np.isfinite(y[n])) xidx=x[n][idx] yidx=y[n][idx] self.x.append(xidx) self.y.append(yidx) self.ylog.append(np.array([math.log10(yidxi) if yidxi>0 else float("inf") for yidxi in yidx]))
[docs] def F(self, t, y): return 0
[docs] def G(self, t, dD): return 0
[docs] def imposeConstraints(self, yt): if type(yt)==np.ndarray: yt[yt<0]=0 elif type(yt)==np.float64: if yt<0: yt=0
[docs] def H(self, y): pass
[docs] def getResponseDimension(self): return None
[docs] def getStateDimension(self): return None
[docs] def forwardModel(self, parameters, x=None, drugSource=None): self.parameters = parameters if drugSource is None: drugSource=self.drugSource # Simulate the system response t = self.t0 Nsamples = int(math.ceil((self.tF-self.t0)/self.deltaT))+1 if self.getStateDimension()>1: yt = np.zeros(self.getStateDimension(),np.double) Yt = np.zeros((Nsamples,self.getStateDimension()),np.double) else: yt = 0.0 Yt = np.zeros(Nsamples) Xt = np.zeros(Yt.shape[0]) delta_2 = 0.5*self.deltaT K = self.deltaT/3 for i in range(0,Nsamples): t = self.t0 + i*self.deltaT # More accurate than t+= self.deltaT Xt[i]=t # Internal evolution # Runge Kutta's 4th order (http://lpsa.swarthmore.edu/NumInt/NumIntFourth.html) k1 = self.F(t,yt) dD1 = drugSource.getAmountReleasedAt(t,delta_2) dyD1 = self.G(t, dD1) y1 = yt+k1*delta_2+dyD1 # print("t=",t," y0=",yt," k1=",k1," dD1=",dD1," dyD1=",dyD1," y1=",y1) t_delta_2=t+delta_2 k2 = self.F(t_delta_2,y1) y2 = yt+k2*delta_2+dyD1 # print("k2=",k2," y2=",y2) dD = drugSource.getAmountReleasedAt(t,self.deltaT) dyD = self.G(t, dD) k3 = self.F(t_delta_2,y2) y3 = yt+k3*self.deltaT+dyD # print("k3=",k3," dD=",dD," dyD=",dyD," y3=",y3) k4 = self.F(t+self.deltaT,y3) # y4 = yt+k4*self.deltaT+dyD # print("k4=",k4," y4=",y4) # Update state yt += (0.5*(k1+k4)+k2+k3)*K+dyD # print("yt=",yt) # print(" ") # Make sure it makes sense self.imposeConstraints(yt) # Apply measurement transformation self.H(yt) # if self.show: # print("t=%f dD=%s dyD=%s dy=%s"%(t,str(dD),str(dyD),str((0.5*(k1+k4)+k2+k3)*K))) # Keep this result and go to next iteration if self.getStateDimension()>1: Yt[i,:]=yt else: Yt[i]=yt # Get the values at x if x is None: x = self.x self.yPredicted = [] for j in range(0,self.getResponseDimension()): if self.getStateDimension()==1: self.yPredicted.append(np.interp(x[j],Xt,Yt)) else: self.yPredicted.append(np.interp(x[j],Xt,Yt[:,j])) return self.yPredicted
[docs] def getImpulseResponse(self, parameters, tImpulse): if self.tFImpulse is None: self.tFImpulse = self.tF # Create unit dose if self.drugSourceImpulse is None: self.drugSourceImpulse = DrugSource() dose = createDeltaDose(1.0, via=createVia("Intravenous; iv",self.experiment), dunits=self.drugSource.getDoseUnits()) self.drugSourceImpulse.setDoses([dose], 0.0, self.tFImpulse) y=self.forwardModel(parameters, [tImpulse], self.drugSourceImpulse) return y[0]
[docs] def forwardModelByConvolution(self, parameters, x=None): self.parameters = parameters # Simulate the system response Nsamples = int(math.ceil(self.tF/self.deltaT))+1 Xt = np.zeros(Nsamples) # Get the drug input D = copy.copy(Xt) for i in range(0,Nsamples): t = i*self.deltaT # More accurate than t+= self.deltaT Xt[i]=t D[i]=self.drugSource.getAmountReleasedAt(t,self.deltaT) # Get the model impulse response if self.thImpulse is None: Nsamples = int(math.ceil(self.tFImpulse/self.deltaT))+1 self.thImpulse = np.zeros(Nsamples) for i in range(0,Nsamples): self.thImpulse[i] = i*self.deltaT # More accurate than t+= self.deltaT h = self.getImpulseResponse(parameters, self.thImpulse) Yt = np.convolve(D,h,'full')[0:len(D)] # Get the values at x if x is None: x = self.x self.yPredicted = [] for j in range(0,self.getResponseDimension()): self.yPredicted.append(np.interp(x[j],Xt,Yt)) return self.yPredicted
[docs] def printOtherParameterization(self): pass
[docs]class PKPDOptimizer: def __init__(self,model,fitType,goalFunction="RMSE"): self.model = model self.fitType = fitType self.Nevaluations = 0 self.bestRmse=1e38 self.yTarget = [np.array(yi, dtype=np.float32) for yi in model.y] self.yTargetLogs = [np.log10(yi) for yi in self.yTarget] if fitType=="linear": self.takeYLogs = False self.takeRelative = False elif fitType=="log": self.yTarget = self.yTargetLogs self.takeYLogs = True self.takeRelative = False elif fitType=="relative": self.takeYLogs = False self.takeRelative = True self.bounds = model.getBounds() if goalFunction=="RMSE": self.goalFunction = self.goalRMSE else: raise Exception("Unknown goal function") self.verbose = 1
[docs] def inBounds(self,parameters): if self.bounds==None or len(self.bounds)!=len(parameters): return True for n in range(0,len(parameters)): if parameters[n]<self.bounds[n][0] or parameters[n]>self.bounds[n][1]: return False return True
[docs] def hugeError(self): allDiffs = None for yTarget in self.yTarget: diff = 1e38*np.ones(yTarget.shape) if allDiffs is None: allDiffs = diff else: allDiffs = np.concatenate([allDiffs, diff]) return allDiffs
[docs] def getResiduals(self,parameters): if not self.inBounds(parameters): return self.hugeError() yPredicted = self.model.forwardModel(parameters) allDiffs = None for y, yTarget, yTargetLog in izip(yPredicted,self.yTarget,self.yTargetLogs): if self.takeYLogs: diff = np.full(yTarget.shape,np.nan) idx = np.logical_and(np.isfinite(y),y>=1e-20) if np.sum(idx)<0.8*diff.size: return self.hugeError() diff[idx] = yTargetLog[idx]-np.log10(y[idx]) else: diff = yTarget - y if self.takeRelative: diff = diff/yTarget if allDiffs is None: allDiffs = diff else: allDiffs = np.concatenate([allDiffs, diff]) idx = np.logical_not(np.isfinite(allDiffs)) allDiffs[idx]=np.nan e = allDiffs if e.size<parameters.size: return self.hugeError() rmse = math.sqrt(np.nanmean(np.power(e,2))) if rmse<self.bestRmse: print(" Best rmse so far=%f"%rmse) print(" at x=%s"%str(parameters)) print(" e=%s"%str(e)) # print(" yTarget=%s"%str(yTarget)) # print(" yTargetLog=%s"%str(yTargetLog)) # print(" y=%s"%str(y)) sys.stdout.flush() self.bestRmse=rmse elif self.Nevaluations%100==0: print(" Neval=%d RMSE=%f"%(self.Nevaluations,rmse)) sys.stdout.flush() self.Nevaluations+=1 return e
[docs] def goalRMSE(self,parameters): e = self.getResiduals(parameters) rmse = math.sqrt(np.nanmean(np.power(e,2))) return rmse
def _evaluateQuality(self, x, y, yp): # Spiess and Neumeyer, BMC Pharmacology 2010, 10:6 self.e = None yToUse = None for yi, ypi in izip(y,yp): diff = [] for yii, ypii in izip(yi,ypi): if np.isfinite(yii) and np.isfinite(ypii): diff.append(yii-ypii) diff = np.asarray(diff) if self.e is None: self.e = diff yToUse = np.asarray(yi) else: self.e = np.concatenate([self.e, diff]) yToUse = np.concatenate([yToUse, yi]) yToUse[np.logical_not(np.isfinite(yToUse))]=np.nan # Remove infinites self.R2 = (1-np.nanvar(diff)/np.nanvar(yToUse)) n=len(diff) # Number of samples p=self.model.getNumberOfParameters() if n-p>0: self.R2adj = 1-self.R2*(n-1)/(n-p)*(1-self.R2) else: self.R2adj = -1 d2=np.nansum(np.multiply(diff,diff)) if d2>0: logL = 0.5*(-n*(math.log(2*math.pi)+1-math.log(n)+math.log(d2))) else: logL=np.nan self.AIC = 2*p-2*logL if n-p-1>0: self.AICc = self.AIC+2*p*(p+1)/(n-p-1) else: self.AICc = -1 self.BIC = p*math.log(n)-2*logL def _printFitting(self, x, y, yp): self._evaluateQuality(x, y, yp) for j in range(len(x)): if len(x)>1: print("Series %d ---------"%j) xj=x[j] yj=y[j] ypj=yp[j] for n in range(0,xj.shape[0]): print("%f %f %f %f"%(xj[n],yj[n],ypj[n],yj[n]-ypj[n])) print("------------------------") print("Mean error = %f"%np.mean(self.e)) print("Std error = %f"%np.std(self.e)) print("R2 = %f"%self.R2) print("R2adj = %f"%self.R2adj) print("AIC = %f"%self.AIC) print("AICc(Recommended) = %f"%self.AICc) print("BIC = %f"%self.BIC) print("------------------------")
[docs] def evaluateQuality(self): yPredicted=self.model.forwardModel(self.model.parameters) x = copy.copy(self.model.x) y = copy.copy(self.model.y) if self.fitType=="linear" or self.fitType=="relative": yp = yPredicted elif self.fitType=="log": if type(self.model.y[0])!=list and type(self.model.y[0])!=np.ndarray: if type(yPredicted[0])==list or type(yPredicted[0])==np.ndarray: yPredicted=yPredicted[0] ylog = smartLog(self.model.y) yplog = smartLog(yPredicted) idx = np.array(np.where(np.logical_and(np.isfinite(ylog),np.isfinite(yplog)))) if type(idx[0])==np.ndarray: idx=idx[0] xToUse = self.model.x if type(self.model.x)==list: xToUse=np.asarray(xToUse) if type(ylog)==list: ylog=np.asarray(ylog) if type(yplog)==list: yplog=np.asarray(yplog) x= xToUse[idx].ravel() y= ylog[idx].ravel() yp=yplog[idx].ravel() else: y = [smartLog(yi) for yi in self.model.y] yp = [smartLog(yi) for yi in yPredicted] self._evaluateQuality(x,y,yp)
[docs] def printFitting(self): yPredicted = self.model.forwardModel(self.model.parameters) print("==========================================") print("X Y Ypredicted Error=Y-Ypredicted ") print("==========================================") self._printFitting(self.model.x, self.model.y, yPredicted) print("==================================================================") print("X log10(Y) log10(Ypredicted) Error=log10(Y)-log10(Ypredicted)") print("==================================================================") if type(self.model.y[0])!=list and type(self.model.y[0])!=np.ndarray: if type(yPredicted[0])==list or type(yPredicted[0])==np.ndarray: yPredicted=yPredicted[0] ylog = smartLog(self.model.y) yplog = smartLog(yPredicted) idx = np.array(np.where(np.logical_and(np.isfinite(ylog),np.isfinite(yplog)))) if type(idx[0])==np.ndarray: idx=idx[0] xToUse = self.model.x if type(self.model.x)==list: xToUse=np.asarray(xToUse) if type(ylog)==list: ylog=np.asarray(ylog) if type(yplog)==list: yplog=np.asarray(yplog) self._printFitting(xToUse[idx].ravel(), ylog[idx].ravel(), yplog[idx].ravel()) else: logY = [smartLog(y) for y in self.model.y] logYp = [smartLog(y) for y in yPredicted] self._printFitting(self.model.x, logY, logYp)
[docs]class PKPDDEOptimizer(PKPDOptimizer):
[docs] def optimize(self): from scipy.optimize import differential_evolution if self.verbose>0: print("Optimizing with Differential Evolution (DE), a global optimizer") self.optimum = differential_evolution(self.goalFunction, self.model.getBounds(), maxiter=30) if self.verbose>0: print("Best DE function value: "+str(self.optimum.fun)) print("Best DE parameters: "+str(self.optimum.x)) self.model.setParameters(self.optimum.x) if self.verbose>0: print(self.model.getEquation()) self.printFitting() print(" ") return self.optimum
[docs]class PKPDLSOptimizer(PKPDOptimizer):
[docs] def optimize(self, ftol=1.49012e-8, xtol=1.49012e-8): # Same values as in minpack.py from scipy.optimize import leastsq if self.verbose>0: print("Optimizing with Least Squares (LS), a local optimizer") print("Initial parameters: "+str(self.model.parameters)) self.optimum, J, self.info, mesg, _ = leastsq(self.getResiduals, self.model.parameters, full_output=True, ftol=ftol, xtol=xtol) # J is the jacobian C=MSE*inv(J'*J) if self.verbose>0: print("Best LS function value: "+str(self.goalFunction(self.optimum))) print("Best LS parameters: "+str(self.optimum)) print("Covariance matrix:") if J is not None: e = self.getResiduals(self.optimum) JtJ=np.matmul(np.transpose(J),J) if np.linalg.det(JtJ)>1e-6: self.cov_x = np.var(e)*np.linalg.inv(JtJ) print(np.array_str(self.cov_x,max_line_width=120)) else: self.cov_x = None print("Singular Jacobian, we cannot estimate the covariance") else: self.cov_x = None print("Singular Jacobian, we cannot estimate the covariance") self.model.setParameters(self.optimum) if self.verbose>0: print(self.model.getEquation()) self.printFitting() print(" ") return self.optimum
[docs] def setConfidenceInterval(self,confidenceInterval): if self.cov_x is not None: from scipy.stats import norm nstd = norm.ppf(1-(1-confidenceInterval/100)/2) perr = np.sqrt(np.clip(np.diag(self.cov_x),0.0,None)) self.lowerBound = self.optimum-nstd*perr self.upperBound = self.optimum+nstd*perr self.significance = self.model.areParametersSignificant(self.lowerBound,self.upperBound) parameterNames = self.model.getParameterNames() print("Confidence intervals %f%% --------------------------"%confidenceInterval) print("ParameterName ParameterValue ParameterConfidenceInterval IsStatisticallySignificant") for n in range(0,len(self.optimum)): print("%s %f [%f,%f] %s"%(parameterNames[n],self.optimum[n],self.lowerBound[n],self.upperBound[n],self.significance[n])) self.model.setConfidenceInterval(self.lowerBound,self.upperBound) else: self.lowerBound=["NA"]*len(self.optimum) self.upperBound=["NA"]*len(self.optimum) self.significance=["NA"]*len(self.optimum) self.model.setConfidenceIntervalNA()
[docs]class PKPDSampleFit: READING_SAMPLEFITTINGS_NAME = 0 READING_SAMPLEFITTINGS_MODELEQ = 1 READING_SAMPLEFITTINGS_R2 = 2 READING_SAMPLEFITTINGS_R2ADJ = 3 READING_SAMPLEFITTINGS_AIC = 4 READING_SAMPLEFITTINGS_AICc = 5 READING_SAMPLEFITTINGS_BIC = 6 READING_SAMPLEFITTINGS_PARAMETER_BOUNDS = 7 READING_SAMPLEFITTINGS_SAMPLE_VALUES = 8 def __init__(self): self.sampleName = "" # Lists with the sample fits values self.x = None self.y = None self.yp = None self.yl = None self.yu = None self.modelEquation = "" self.R2 = 0 self.R2adj = 0 self.AIC = 0 self.AICc = 0 self.BIC = 0 self.parameters = None self.lowerBound = None self.upperBound = None self.significance = None self.multiOutputSeries = False
[docs] def printForPopulation(self,fh,observations): outputStr = "" for parameter in self.parameters: outputStr += "%f "%parameter observations = np.vstack([observations, self.parameters]) outputStr += " # %f %f %f %f %f"%(self.R2,self.R2adj,self.AIC,self.AICc,self.BIC) fh.write(outputStr+"\n") return observations
[docs] def printForPopulationExcel(self,wb, row, observations): toPrint = [] for parameter in self.parameters: toPrint.append("%f "%parameter) observations = np.vstack([observations, self.parameters]) excelWriteRow(toPrint+[self.R2,self.R2adj,self.AIC,self.AICc,self.BIC], wb, row) return observations, row+1
[docs] def getBasicInfo(self): """ Return a string with some basic information of the fitting. """ info = "" info += "Sample name: %s\n" % self.sampleName info += "Model: %s\n" % self.modelEquation info += "R2: %f\n" % self.R2 info += "R2adj: %f\n" % self.R2adj info += "AIC: %f\n" % self.AIC info += "AICc(Recommended): %f\n" % self.AICc info += "BIC: %f\n" % self.BIC return info
def _printToStream(self, fh): if self.significance == None: self.significance = ["Undetermined"]*len(self.parameters) fh.write(self.getBasicInfo()) fh.write("Parameter lowerBound upperBound IsStatisticallySignificant -------\n") for parameter, lower, upper, significance in izip(self.parameters,self.lowerBound,self.upperBound,\ self.significance): fh.write("%f [%s,%s] %s\n"%(parameter,str(lower),str(upper),significance)) fh.write("X Y Ypredicted [Ylower,Yupper] -------\n") for j in range(len(self.x)): fh.write("Series %d -----\n"%j) xj = self.x[j] yj = self.y[j] ypj = self.yp[j] ylj = self.yl[j] yuj = self.yu[j] for x,y,yp,yl,yu in izip(xj,yj,ypj,ylj,yuj): fh.write("%f %s %s [%s,%s]\n"%(x,str(y),str(yp),str(yl),str(yu))) fh.write("\n") def _printToExcel(self, wb, row, parameterNames): if self.significance == None: self.significance = ["Undetermined"]*len(self.parameters) basicInfo = self.getBasicInfo() i=0 for line in basicInfo.split('\n'): if i == 0: excelWriteRow(line.split(':'), wb, row, bold=True) excelFillCells(wb, row) else: excelWriteRow(line.split(':'), wb, row) row += 1 i+=1 excelWriteRow(["Parameter","Value","lowerBound","upperBound","IsStatisticallySignificant"],wb,row); row+=1 for parameterName, parameter, lower, upper, significance in izip(parameterNames,self.parameters, self.lowerBound,self.upperBound,self.significance): excelWriteRow([parameterName, parameter,str(lower),str(upper),significance], wb, row); row+=1 row+=1 excelWriteRow(["X","Y","Ypredicted", "[Ylower","Yupper]"],wb,row); row+=1 for j in range(len(self.x)): excelWriteRow("Series %d"%j, wb, row); row+=1 xj = self.x[j] yj = self.y[j] ypj = self.yp[j] ylj = self.yl[j] yuj = self.yu[j] for x,y,yp,yl,yu in izip(xj,yj,ypj,ylj,yuj): try: ypToPrint=float(yp) except: ypToPrint=str(yp) try: ylToPrint=float(yl) except: ylToPrint=str(yl) try: yuToPrint=float(yu) except: yuToPrint=str(yu) excelWriteRow([float(x), float(y), ypToPrint, ylToPrint, yuToPrint], wb, row); row+=1 return row+1
[docs] def restartReadingState(self): self.state = PKPDSampleFit.READING_SAMPLEFITTINGS_NAME
[docs] def readFromLine(self, line): if self.state==PKPDSampleFit.READING_SAMPLEFITTINGS_NAME: tokens = line.split(':') self.sampleName = tokens[1].strip() self.state = PKPDSampleFit.READING_SAMPLEFITTINGS_MODELEQ elif self.state==PKPDSampleFit.READING_SAMPLEFITTINGS_MODELEQ: tokens = line.split(':') self.modelEquation = tokens[1].strip() self.state = PKPDSampleFit.READING_SAMPLEFITTINGS_R2 elif self.state==PKPDSampleFit.READING_SAMPLEFITTINGS_R2: tokens = line.split(':') self.R2 = float(tokens[1]) self.state = PKPDSampleFit.READING_SAMPLEFITTINGS_R2ADJ elif self.state==PKPDSampleFit.READING_SAMPLEFITTINGS_R2ADJ: tokens = line.split(':') self.R2adj = float(tokens[1]) self.state = PKPDSampleFit.READING_SAMPLEFITTINGS_AIC elif self.state==PKPDSampleFit.READING_SAMPLEFITTINGS_AIC: tokens = line.split(':') self.AIC = float(tokens[1]) self.state = PKPDSampleFit.READING_SAMPLEFITTINGS_AICc elif self.state==PKPDSampleFit.READING_SAMPLEFITTINGS_AICc: tokens = line.split(':') self.AICc = float(tokens[1]) self.state = PKPDSampleFit.READING_SAMPLEFITTINGS_BIC elif self.state==PKPDSampleFit.READING_SAMPLEFITTINGS_BIC: tokens = line.split(':') self.BIC = float(tokens[1]) self.state = PKPDSampleFit.READING_SAMPLEFITTINGS_PARAMETER_BOUNDS elif self.state==PKPDSampleFit.READING_SAMPLEFITTINGS_PARAMETER_BOUNDS: if line.startswith("Parameter lowerBound upperBound"): self.parameters=[] self.lowerBound=[] self.upperBound=[] self.significance=[] elif line.startswith("X Y Ypredicted"): self.state=PKPDSampleFit.READING_SAMPLEFITTINGS_SAMPLE_VALUES self.x=[] self.y=[] self.yp=[] self.yl=[] self.yu=[] else: tokens=line.split() self.parameters.append(float(tokens[0])) self.significance.append(tokens[2]) tokens=(tokens[1])[1:-1].split(',') self.lowerBound.append(tokens[0]) self.upperBound.append(tokens[1]) elif self.state==PKPDSampleFit.READING_SAMPLEFITTINGS_SAMPLE_VALUES: tokens=line.split() if tokens[0].strip()=="Series": self.x.append([]) self.y.append([]) self.yp.append([]) self.yl.append([]) self.yu.append([]) self.multiOutputSeries = True else: if self.multiOutputSeries: self.x[-1].append(float(tokens[0])) self.y[-1].append(float(tokens[1])) self.yp[-1].append(float(tokens[2])) tokens=(tokens[3])[1:-1].split(',') self.yl[-1].append(tokens[0]) self.yu[-1].append(tokens[1]) else: self.x.append(float(tokens[0])) self.y.append(float(tokens[1])) self.yp.append(float(tokens[2])) tokens=(tokens[3])[1:-1].split(',') self.yl.append(tokens[0]) self.yu.append(tokens[1])
[docs] def copyFromOptimizer(self,optimizer): self.R2 = optimizer.R2 self.R2adj = optimizer.R2adj self.AIC = optimizer.AIC self.AICc = optimizer.AICc self.BIC = optimizer.BIC self.significance = optimizer.significance self.lowerBound = optimizer.lowerBound self.upperBound = optimizer.upperBound
[docs]class PKPDSampleFitBootstrap: READING_SAMPLEFITTINGS_NAME = 0 READING_SAMPLEFITTINGS_XB = 1 READING_SAMPLEFITTINGS_YB = 2 READING_SAMPLEFITTINGS_PARAMETERS = 3 def __init__(self): self.sampleName = "" self.R2 = [] self.R2adj = [] self.AIC = [] self.AICc = [] self.BIC = [] self.parameters = None self.xB = [] self.yB = [] def _printSample(self,fh,n, n0=0): outputStr = "%d: "%(n+n0) for parameter in self.parameters[n,:]: outputStr += "%f "%parameter outputStr += " # %f %f %f %f %f"%(self.R2[n],self.R2adj[n],self.AIC[n],self.AICc[n],self.BIC[n]) fh.write(outputStr+"\n") def _printSampleExcel(self,wb, row, n, n0=0): toPrint = ["Sample %d"%(n+n0)] for parameter in self.parameters[n,:]: toPrint.append(parameter) excelWriteRow(toPrint+[self.R2[n],self.R2adj[n],self.AIC[n],self.AICc[n],self.BIC[n]],wb,row) return row+1
[docs] def printForPopulation(self,fh,observations): for n in range(0,self.parameters.shape[0]): self._printSample(fh,n,observations.shape[0]) observations = np.vstack([observations, self.parameters]) return observations
[docs] def printForPopulationExcel(self,wb,row,observations): for n in range(0,self.parameters.shape[0]): row=self._printSampleExcel(wb,row,n,observations.shape[0]) observations = np.vstack([observations, self.parameters]) return observations,row+1
def _printToStream(self,fh): fh.write("Sample name: %s\n"%self.sampleName) for n in range(0,self.parameters.shape[0]): fh.write("xB: %s\n"%self.xB[n]) fh.write("yB: %s\n"%self.yB[n]) outputStr = "" for parameter in self.parameters[n,:]: outputStr += "%f "%parameter outputStr += " # %f %f %f %f %f"%(self.R2[n],self.R2adj[n],self.AIC[n],self.AICc[n],self.BIC[n]) fh.write(outputStr+"\n") fh.write("\n") def _printToExcel(self,wb,row,parameterNames): excelWriteRow(["Sample name:",self.sampleName],wb,row,bold=True); row+=1 for n in range(0,self.parameters.shape[0]): excelWriteRow("Sample %d"%n, wb, row); row+=1 excelWriteRow(["xB:"]+self.xB[n][1:-1].split(), wb, row); row+=1 excelWriteRow(["yB:"]+self.yB[n][1:-1].split(), wb, row); row+=1 return row+1
[docs] def restartReadingState(self): self.state = PKPDSampleFitBootstrap.READING_SAMPLEFITTINGS_NAME
[docs] def readFromLine(self, line): if self.state==PKPDSampleFitBootstrap.READING_SAMPLEFITTINGS_NAME: tokens = line.split(':') self.sampleName = tokens[1].strip() self.strRead = "" self.state = PKPDSampleFitBootstrap.READING_SAMPLEFITTINGS_XB elif self.state==PKPDSampleFitBootstrap.READING_SAMPLEFITTINGS_XB: if ":" in line: tokens = line.split(':') self.strRead += tokens[1].strip() else: self.strRead += line if "]" in self.strRead: self.xB.append(self.strRead.strip()) self.strRead="" self.state = PKPDSampleFitBootstrap.READING_SAMPLEFITTINGS_YB elif self.state==PKPDSampleFitBootstrap.READING_SAMPLEFITTINGS_YB: if ":" in line: tokens = line.split(':') self.strRead += tokens[1].strip() else: self.strRead += line if "]" in self.strRead: self.yB.append(self.strRead.strip()) self.strRead="" self.state = PKPDSampleFitBootstrap.READING_SAMPLEFITTINGS_PARAMETERS elif self.state==PKPDSampleFitBootstrap.READING_SAMPLEFITTINGS_PARAMETERS: tokens = line.split('#') tokensParameters = tokens[0].strip().split(' ') tokensQuality = tokens[1].strip().split(' ') if self.parameters is None: self.parameters = np.empty((0,len(tokensParameters)),np.double) self.parameters = np.vstack([self.parameters, [float(prm) for prm in tokensParameters]]) self.R2.append(float(tokensQuality[0])) self.R2adj.append(float(tokensQuality[1])) self.AIC.append(float(tokensQuality[2])) self.AICc.append(float(tokensQuality[3])) self.BIC.append(float(tokensQuality[4])) self.state = PKPDSampleFitBootstrap.READING_SAMPLEFITTINGS_XB
[docs] def copyFromOptimizer(self,optimizer): self.R2.append(optimizer.R2) self.R2adj.append(optimizer.R2adj) self.AIC.append(optimizer.AIC) self.AICc.append(optimizer.AICc) self.BIC.append(optimizer.BIC)
[docs]class PKPDFitting(EMObject): READING_FITTING_EXPERIMENT = 1 READING_FITTING_PREDICTOR = 2 READING_FITTING_PREDICTED = 3 READING_FITTING_PREDICTED_LIST = 4 READING_FITTING_MODEL = 5 READING_POPULATION_HEADER = 6 READING_POPULATION = 7 READING_SAMPLEFITTINGS_BEGIN = 8 READING_SAMPLEFITTINGS_CONTINUE = 9 def __init__(self, cls="", **args): EMObject.__init__(self, **args) self.fnFitting = String() self.fnExperiment = String() self.predictor = None self.predicted = None self.modelDescription = "" self.modelParameters = [] self.modelParameterUnits = [] self.sampleFits = [] self.summaryLines = [] if cls=="": self.sampleFittingClass = "PKPDSampleFit" else: self.sampleFittingClass = cls
[docs] def isPopulation(self): if self.fnFitting.get() is None: return False return self.fnFitting.get().endswith("bootstrapPopulation.pkpd")
[docs] def write(self, fnFitting, writeToExcel=True): fh=open(fnFitting,'w') self._printToStream(fh) fh.close() self.fnFitting.set(fnFitting) writeMD5(fnFitting) if writeToExcel: self.writeToExcel(os.path.splitext(fnFitting)[0] + ".xlsx")
[docs] def getAllParameters(self): allParameters = np.empty((0,len(self.modelParameters)),np.double) for sampleFitting in self.sampleFits: allParameters = np.vstack([allParameters, sampleFitting.parameters]) return allParameters
[docs] def getStats(self, observations=None): if observations is None: observations = self.getAllParameters() mu=np.mean(observations,axis=0) C=np.cov(np.transpose(observations)) sigma = np.sqrt(np.diag(C)) R=np.corrcoef(np.transpose(observations)) percentiles = np.percentile(observations,[0, 2.5, 25, 50, 75, 97.5, 100],axis=0) return mu, sigma, R, percentiles
def _printToStream(self,fh): fh.write("[FITTING] ===========================\n") fh.write("Experiment: %s\n"%self.fnExperiment.get()) fh.write("Predictor (X): ") self.predictor._printToStream(fh) fh.write("Predicted (Y): ") if type(self.predicted)==list: fh.write("Predicted list=%d\n"%len(self.predicted)) for y in self.predicted: y._printToStream(fh) else: self.predicted._printToStream(fh) fh.write("Model: %s\n"%self.modelDescription) fh.write("\n") fh.write("[POPULATION PARAMETERS] =============\n") auxUnit = PKPDUnit() for paramName, paramUnits in izip(self.modelParameters, self.modelParameterUnits): auxUnit.unit = paramUnits fh.write("%s [%s] "%(paramName,auxUnit._toString())) fh.write(" # R2 R2adj AIC AICc BIC\n") observations = np.empty((0,len(self.modelParameters)),np.double) for sampleFitting in self.sampleFits: observations = sampleFitting.printForPopulation(fh,observations) fh.write("\n") mu=np.mean(observations,axis=0) if observations.shape[0]>2: C=np.cov(np.transpose(observations)) if not C.shape: R=np.asarray([1.0]) sigma = C else: R = np.corrcoef(np.transpose(observations)) sigma = np.sqrt(np.diag(C)) fh.write("Mean parameters = %s\n"%np.array_str(mu)) fh.write("Median parameters = %s\n"%np.array_str(np.median(observations,axis=0))) limits = np.percentile(observations,[2.5,97.5],axis=0) fh.write("Lower bound (2.5%%) = %s\n"%np.array_str(limits[0])) fh.write("Upper bound (97.5%%) = %s\n"%np.array_str(limits[1])) fh.write("Covariance matrix =\n%s\n"%np.array_str(C,max_line_width=120)) fh.write("Correlation matrix =\n%s\n"%np.array_str(R,max_line_width=120)) fh.write("\n") fh.write("[SAMPLE FITTINGS] ===================\n") for sampleFitting in self.sampleFits: sampleFitting._printToStream(fh)
[docs] def writeToExcel(self,fnXls): wb = openpyxl.Workbook() wb.active.title = "Experiment" currentRow = 1 excelWriteRow("FITTING",wb,currentRow,bold=True); excelFillCells(wb,currentRow); currentRow+=1 excelWriteRow(["Experiment:",self.fnExperiment.get()], wb, currentRow); currentRow+=1 excelWriteRow("Predictor (X): ", wb, currentRow); currentRow=self.predictor._printToExcel(wb,currentRow,2) toPrint=["Predicted (Y): "] if type(self.predicted)==list: excelWriteRow(toPrint+["Predicted list=%d"%len(self.predicted)], wb, currentRow); currentRow+=1 for y in self.predicted: currentRow=y._printToExcel(wb,currentRow) else: excelWriteRow(toPrint, wb, currentRow) currentRow=self.predicted._printToExcel(wb,currentRow,2); currentRow+=1 excelWriteRow(["Model:",self.modelDescription], wb, currentRow); currentRow+=1 currentRow+=1 excelWriteRow("POPULATION PARAMETERS",wb,currentRow,bold=True); excelFillCells(wb,currentRow); currentRow+=1 auxUnit = PKPDUnit() toPrint=[] for paramName, paramUnits in izip(self.modelParameters, self.modelParameterUnits): auxUnit.unit = paramUnits toPrint.append("%s [%s] "%(paramName,auxUnit._toString())) excelWriteRow(toPrint+["R2","R2adj","AIC","AICc","BIC"],wb,currentRow); currentRow+=1 observations = np.empty((0,len(self.modelParameters)),np.double) for sampleFitting in self.sampleFits: observations, currentRow = sampleFitting.printForPopulationExcel(wb,currentRow,observations) currentRow+=1 mu=np.mean(observations,axis=0) if observations.shape[0]>2: excelWriteRow(["Mean parameters =",np.array_str(mu)], wb, currentRow); currentRow+=1 excelWriteRow(["Median parameters =",np.array_str(np.median(observations,axis=0))], wb, currentRow); currentRow+=1 limits = np.percentile(observations,[2.5,97.5],axis=0) excelWriteRow(["Lower bound (2.5%%) =",np.array_str(limits[0])], wb, currentRow); currentRow+=1 excelWriteRow(["Upper bound (97.5%%) =",np.array_str(limits[1])], wb, currentRow); currentRow+=1 currentRow+=1 excelWriteRow("SAMPLE FITTINGS",wb,currentRow,bold=True); excelFillCells(wb,currentRow); currentRow+=1 for sampleFitting in self.sampleFits: currentRow=sampleFitting._printToExcel(wb,currentRow,self.modelParameters) excelAdjustColumnWidths(wb) wb.save(fnXls)
[docs] def load(self, fnFitting=None): fnFitting = str(fnFitting or self.fnFitting) fh = open(fnFitting) if not fh: raise Exception("Cannot open %s" % fnFitting) if not verifyMD5(fnFitting): raise Exception("The file %s has been modified since its creation" % fnFitting) self.fnFitting.set(fnFitting) auxUnit = PKPDUnit() for line in fh.readlines(): line=line.strip() if line=="": if state==PKPDFitting.READING_SAMPLEFITTINGS_CONTINUE: state=PKPDFitting.READING_SAMPLEFITTINGS_BEGIN continue if line.startswith('[') and line.endswith('='): section = line.split('=')[0].strip().lower() if section=="[fitting]": state=PKPDFitting.READING_FITTING_EXPERIMENT self.summaryLines.append(line) elif section=="[population parameters]": state=PKPDFitting.READING_POPULATION_HEADER self.summaryLines.append(line) elif section=="[sample fittings]": state=PKPDFitting.READING_SAMPLEFITTINGS_BEGIN else: print("Skipping: ",line) elif state==PKPDFitting.READING_FITTING_EXPERIMENT: tokens = line.split(':') self.fnExperiment.set(tokens[1].strip()) state = PKPDFitting.READING_FITTING_PREDICTOR self.summaryLines.append(line) elif state==PKPDFitting.READING_FITTING_PREDICTOR: tokens = line.split(':') self.predictor = PKPDVariable() self.predictor.parseTokens(tokens[1].split(';')) state = PKPDFitting.READING_FITTING_PREDICTED self.summaryLines.append(line) elif state==PKPDFitting.READING_FITTING_PREDICTED: tokens = line.split(':') if (tokens[1].strip().startswith("Predicted list")): state = PKPDFitting.READING_FITTING_PREDICTED_LIST self.remainingPredicted = int(tokens[1].split('=')[1]) self.predicted = [] self.summaryLines.append(line) else: self.predicted = PKPDVariable() self.predicted.parseTokens(tokens[1].split(';')) state = PKPDFitting.READING_FITTING_MODEL self.summaryLines.append(line) elif state==PKPDFitting.READING_FITTING_PREDICTED_LIST: self.summaryLines.append(line) newVar = PKPDVariable() newVar.parseTokens(line.split(';')) self.predicted.append(newVar) self.remainingPredicted -= 1 if self.remainingPredicted == 0: state = PKPDFitting.READING_FITTING_MODEL elif state==PKPDFitting.READING_FITTING_MODEL: tokens = line.split(':') self.modelDescription = tokens[1].strip() state = PKPDFitting.READING_POPULATION self.summaryLines.append(line) self.summaryLines.append("\n") elif state==PKPDFitting.READING_POPULATION_HEADER: lineParts = line.split('#') tokens = lineParts[0].strip().split(' ') for i in range(0,len(tokens),2): self.modelParameters.append(tokens[i]) self.modelParameterUnits.append(auxUnit._fromString(tokens[i+1][1:-1])) state = PKPDFitting.READING_POPULATION elif state==PKPDFitting.READING_POPULATION: self.summaryLines.append(line) elif state==PKPDFitting.READING_SAMPLEFITTINGS_BEGIN: newSampleFit = eval("%s()"%self.sampleFittingClass) self.sampleFits.append(newSampleFit) self.sampleFits[-1].restartReadingState() self.sampleFits[-1].readFromLine(line) state = PKPDFitting.READING_SAMPLEFITTINGS_CONTINUE elif state==PKPDFitting.READING_SAMPLEFITTINGS_CONTINUE: self.sampleFits[-1].readFromLine(line) fh.close()
[docs] def getSampleFit(self, sampleName): for sampleFit in self.sampleFits: if sampleFit.sampleName == sampleName: return sampleFit return None
[docs] def loadExperiment(self): experiment = PKPDExperiment() experiment.load(self.fnExperiment.get()) return experiment
[docs] def getTimeUnits(self): return self.loadExperiment().getTimeUnits()
[docs] def gather(self, otherFitting, experiment=None): if not self.predictor: self.predictor = copy.copy(otherFitting.predictor) if not self.predicted: self.predicted = copy.copy(otherFitting.predicted) if self.modelDescription == "": self.modelDescription = otherFitting.modelDescription if len(self.modelParameters)==0: self.modelParameters = copy.copy(otherFitting.modelParameters) if len(self.modelParameterUnits)==0: self.modelParameterUnits = copy.copy(otherFitting.modelParameterUnits) for fit in otherFitting.sampleFits: add = True if experiment is not None: if not fit.sampleName in experiment.samples: add=False if add: self.sampleFits.append(copy.copy(fit))
[docs]class PKPDSampleSignalAnalysis: def __init__(self): self.sampleName = "" self.x = None self.y = None self.analysisVariables = [] self.parameters = None self.lowerBound = None self.upperBound = None self.significance = None def _printToStream(self,fh): if self.significance == None: self.significance = ["Undetermined"]*len(self.parameters) fh.write("Sample name: %s\n"%self.sampleName) for varName, parameter in izip(self.analysisVariables,self.parameters): fh.write("%s = %f\n"%(varName,parameter)) fh.write("\n")
[docs]class PKPDSignalAnalysis(EMObject): READING_EXPERIMENT = 1 READING_PREDICTOR = 2 READING_PREDICTED = 3 READING_MODEL = 4 READING_ANALYSIS_VARIABLES = 5 READING_POPULATION_HEADER = 6 READING_POPULATION = 7 READING_SAMPLEANALYSIS_NAME = 8 READING_SAMPLEANALYSIS_PARAMETERS = 9 def __init__(self, **args): EMObject.__init__(self, **args) self.fnAnalysis = String() self.fnExperiment = String() self.predictor = None self.predicted = None self.analysisDescription = "" self.analysisParameters = [] self.sampleAnalyses = [] self.summaryLines = []
[docs] def write(self, fnAnalysis): fh=open(fnAnalysis,'w') self._printToStream(fh) fh.close() self.fnAnalysis.set(fnAnalysis) writeMD5(fnAnalysis)
def _printToStream(self,fh): fh.write("[SIGNAL ANALYSIS] ===========================\n") fh.write("Experiment: %s\n"%self.fnExperiment.get()) fh.write("Predictor (X): ") self.predictor._printToStream(fh) fh.write("Predicted (Y): ") self.predicted._printToStream(fh) fh.write("Analysis: %s\n"%self.analysisDescription) fh.write("\n") fh.write("[POPULATION PARAMETERS] =============\n") fh.write(' '.join(self.analysisParameters)+"\n") i=0 for sampleAnalysis in self.sampleAnalyses: outputStr = "" j=0 for parameter in sampleAnalysis.parameters: outputStr += "%f "%parameter if i==0 and j==0: observations = np.zeros([len(self.sampleAnalyses),len(sampleAnalysis.parameters)]) observations[i,j]=parameter j+=1 fh.write(outputStr+"\n") i+=1 fh.write("\n") mu=np.mean(observations,axis=0) C=np.cov(np.transpose(observations)) sigma = np.sqrt(np.diag(C)) R=np.corrcoef(np.transpose(observations)) fh.write("Mean parameters = %s\n"%np.array_str(mu)) fh.write("Median parameters = %s\n"%np.array_str(np.median(observations,axis=0))) fh.write("Lower bound (95%%, independent Gaussians) = %s\n"%np.array_str(mu-1.96*sigma)) fh.write("Upper bound (95%%, independent Gaussians) = %s\n"%np.array_str(mu+1.96*sigma)) fh.write("Covariance matrix =\n%s\n"%np.array_str(C,max_line_width=120)) fh.write("Correlation matrix =\n%s\n"%np.array_str(R,max_line_width=120)) fh.write("\n") fh.write("[SAMPLE ANALYSES] ===================\n") for sampleAnalysis in self.sampleAnalyses: sampleAnalysis._printToStream(fh)
[docs] def load(self,fnAnalysis): fh=open(fnAnalysis) if not fh: raise Exception("Cannot open %s"%fnAnalysis) if not verifyMD5(fnAnalysis): raise Exception("The file %s has been modified since its creation"%fnAnalysis) self.fnAnalysis.set(fnAnalysis) for line in fh.readlines(): line=line.strip() if line=="": if state==PKPDSignalAnalysis.READING_SAMPLEANALYSIS_PARAMETERS: state=PKPDSignalAnalysis.READING_SAMPLEANALYSIS_NAME continue if line.startswith('[') and line.endswith('='): section = line.split('=')[0].strip().lower() if section=="[signal analysis]": state=PKPDSignalAnalysis.READING_EXPERIMENT self.summaryLines.append(line) elif section=="[population parameters]": state=PKPDSignalAnalysis.READING_POPULATION_HEADER self.summaryLines.append(line) elif section=="[sample analyses]": state=PKPDSignalAnalysis.READING_SAMPLEANALYSIS_NAME else: print("Skipping: ",line) elif state==PKPDSignalAnalysis.READING_EXPERIMENT: tokens = line.split(':') self.fnExperiment.set(tokens[1].strip()) state = PKPDSignalAnalysis.READING_PREDICTOR self.summaryLines.append(line) elif state==PKPDSignalAnalysis.READING_PREDICTOR: tokens = line.split(':') self.predictor = PKPDVariable() self.predictor.parseTokens(tokens[1].split(';')) state = PKPDSignalAnalysis.READING_PREDICTED self.summaryLines.append(line) elif state==PKPDSignalAnalysis.READING_PREDICTED: tokens = line.split(':') self.predicted = PKPDVariable() self.predicted.parseTokens(tokens[1].split(';')) state = PKPDSignalAnalysis.READING_MODEL self.summaryLines.append(line) elif state==PKPDSignalAnalysis.READING_MODEL: tokens = line.split(':') self.analysisDescription = tokens[1].strip() state = PKPDSignalAnalysis.READING_POPULATION self.summaryLines.append(line) self.summaryLines.append("\n") elif state==PKPDSignalAnalysis.READING_POPULATION_HEADER: self.analysisParameters=line.split(' ') state = PKPDSignalAnalysis.READING_POPULATION elif state==PKPDSignalAnalysis.READING_POPULATION: self.summaryLines.append(line) elif state==PKPDSignalAnalysis.READING_SAMPLEANALYSIS_NAME: tokens = line.split(':') self.sampleAnalyses.append(PKPDSampleSignalAnalysis()) self.sampleAnalyses[-1].sampleName = tokens[1].strip() self.sampleAnalyses[-1].parameters=[] state = PKPDSignalAnalysis.READING_SAMPLEANALYSIS_PARAMETERS elif state==PKPDSignalAnalysis.READING_SAMPLEANALYSIS_PARAMETERS: tokens=line.split('=') self.sampleAnalyses[-1].analysisVariables.append(tokens[0].strip()) self.sampleAnalyses[-1].parameters.append(float(tokens[1].strip())) fh.close()
[docs] def getSampleAnalysis(self,sampleName): for sampleAnalysis in self.sampleAnalyses: if sampleAnalysis.sampleName == sampleName: return sampleAnalysis return None
[docs]class PKPDAllometricScale(EMObject): READING_PREDICTOR = 1 READING_MODELS = 2 READING_X = 3 READING_Y = 4 def __init__(self, **args): EMObject.__init__(self, **args) self.fnScale = String() self.predictor = "" self.predictorUnits = "" self.scaled_vars = [] self.averaged_vars = [] self.models = {} self.qualifiers = {} self.confidence = 0 self.X = None self.Y = None
[docs] def write(self, fnScale): fh=open(fnScale,'w') self._printToStream(fh) fh.close() self.fnScale.set(fnScale) writeMD5(fnScale)
def _printToStream(self,fh): fh.write("[ALLOMETRIC SCALING] ===========================\n") fh.write("Predictor (X): %s %s\n"%(self.predictor,self.predictorUnits)) for varName, varUnits in self.scaled_vars: model = self.models[varName] qualifiers = self.qualifiers[varName] # fh.write("Scale model: %s=(%f)*%s^(%f) %f%% Confidence interval=[%f,%f]"%\ # (varName,model[0],model[1],self.confidence,qualifiers[0],qualifiers[1])) fh.write("Scale model: %s=(%f)*%s^(%f); R2=%f; %f%% Confidence intervals (y=k*x^a) k=[%f,%f] a=[%f,%f]; %s\n" % \ (varName, model[0], self.predictor, model[1], qualifiers[0], self.confidence, qualifiers[1], qualifiers[2], qualifiers[3], qualifiers[4], varUnits)) for varName, varUnits in self.averaged_vars: model = self.models[varName] qualifiers = self.qualifiers[varName] fh.write("Average model: %s=%f Std=%f; %s\n"%(varName,model[0],qualifiers[0],varUnits)) fh.write("\n") fh.write("[DATA] =========================================\n") fh.write("%s= %s\n"%(self.predictor,str(self.X))) for varName, y in self.Y.items(): fh.write("%s= %s\n"%(varName,str(y)))
[docs] def load(self,fnScale): fh=open(fnScale) if not fh: raise Exception("Cannot open %s"%fnScale) if not verifyMD5(fnScale): raise Exception("The file %s has been modified since its creation"%fnScale) self.fnScale.set(fnScale) for line in fh.readlines(): line=line.strip() if line=="": continue if line.startswith('[') and line.endswith('='): section = line.split('=')[0].strip().lower() if section=="[allometric scaling]": state=PKPDAllometricScale.READING_PREDICTOR elif section=="[data]": state=PKPDAllometricScale.READING_X else: print("Skipping: ",line) elif state==PKPDAllometricScale.READING_PREDICTOR: tokens = line.split(':')[1] tokens = tokens.split() self.predictor = tokens[0].strip() self.predictorUnits = tokens[1].strip() state = PKPDAllometricScale.READING_MODELS elif state==PKPDAllometricScale.READING_MODELS: tokens = line.split(':') if tokens[0]=="Scale model": tokens = tokens[1].split(';') # "%s=(%f)*%s^(%f); R2=%f; %f%% Confidence intervals (y=k*x^a) k=[%f,%f] a=[%f,%f]" idxL=0 idxR=tokens[0].find('=') varName=tokens[0][idxL:idxR].strip() idxL=idxR+2 idxR=tokens[0].find(')',idxL) k=float(tokens[0][idxL:idxR]) idxL=tokens[0].find('(',idxR) idxR=tokens[0].find(')',idxL) a=float(tokens[0][idxL+1:idxR]) self.models[varName]=[k, a] idxL=tokens[1].find('=') R2=float(tokens[1][idxL+1:]) idxR=tokens[2].find('%') self.confidence=float(tokens[2][:idxR]) idxL=tokens[2].find('[',idxR+1) idxR=tokens[2].find(',',idxL+1) kl=float(tokens[2][idxL+1:idxR]) idxL=idxR idxR=tokens[2].find(']',idxL+1) ku=float(tokens[2][idxL+1:idxR]) idxL=tokens[2].find('[',idxR+1) idxR=tokens[2].find(',',idxL+1) al=float(tokens[2][idxL+1:idxR]) idxL=idxR idxR=tokens[2].find(']',idxL+1) au=float(tokens[2][idxL+1:idxR]) self.qualifiers[varName]=[R2,kl,ku,al,au] varUnits = tokens[3].strip() self.scaled_vars.append((varName,varUnits)) elif tokens[0]=="Average model": # Average model: %s=%f Std=%f idxR=tokens[1].find('=') varName=tokens[1][:idxR].strip() idxL=idxR idxR=tokens[1].find(' ',idxL) mean=float(tokens[1][idxL+1:idxR]) idxL=tokens[1].find('=',idxR+1) idxR=tokens[1].find(';',idxL+1) std=float(tokens[1][idxL+1:idxR]) varUnits = tokens[1][idxR+1:] self.models[varName]=[mean] self.qualifiers[varName]=[std] self.averaged_vars.append((varName,varUnits)) else: print("Skipping: ",line) elif state==PKPDAllometricScale.READING_X: tokens = line.split('=') self.X = [] for x in tokens[1].strip()[1:-1].split(','): self.X.append(float(x)) self.Y = {} state = PKPDAllometricScale.READING_Y elif state==PKPDAllometricScale.READING_Y: tokens = line.split('=') varName = tokens[0].strip() self.Y[varName] = [] for y in tokens[1].strip()[1:-1].split(','): self.Y[varName].append(float(y)) fh.close()
[docs]class PKPDDoseResponse(EMObject): def __init__(self, **args): EMObject.__init__(self, **args) self.doses=[] self.Nresponses=[] self.Npatients=[] self.responses=[]
[docs] def read(self,inputStr,isFn=False): if isFn: fh=open(inputStr) inputStr=fh.readlines() fh.close() else: inputStr=inputStr.split("\n") for line in inputStr: line=line.strip() if line!="": tokens=line.split(":") dose=float(tokens[0].strip()) self.doses.append(dose) self.responses.append(tokens[1].strip()) k=0 n=0 for token in tokens[1].split(): n+=1 if token.strip()=="1": k+=1 self.Nresponses.append(k) self.Npatients.append(n)
[docs] def write(self,fnOut): fh=open(fnOut,"w") for i in range(len(self.doses)): fh.write("%f: %s\n"%(self.doses[i],self.responses[i])) fh.close()
[docs] def findDose(self,dose): # for n in range(len(self.doses)): # if abs(self.doses[n]-dose)<1e-6: # return n if len(self.doses)>0 and abs(self.doses[-1]-dose)<1e-6: return len(self.doses)-1 else: return None
[docs] def appendResponse(self,dose,response): # response must be Boolean n = self.findDose(dose) if n is None: n=len(self.doses) self.doses.append(dose) self.responses.append("") self.Nresponses.append(0) self.Npatients.append(0) responseStr=" 1" if response else " 0" self.responses[n]=(self.responses[n]+responseStr).strip() self.Npatients[n]+=1 if response: self.Nresponses[n]+=1
[docs]def flattenArray(y): if type(y[0])!=list and type(y[0])!=np.ndarray: y = [np.array(y,dtype=np.float32)] else: y = [np.array(yi,dtype=np.float32) for yi in y] return y
[docs]def smartLog(y): return np.array([math.log10(yi) if np.isfinite(yi) and yi>0 else float("inf") for yi in y])
[docs]class PKPDDataSet: _datasetDict = {} # store all created datasets def __init__(self, name, folder, files, url=None): """ Params: #filesDict is dict with key, value pairs for each file """ self._datasetDict[name] = self self.folder = folder import pkg_resources package = PluginInfo('scipion-pkpd', 'scipion-pkpd', remote=False).pipName dist = pkg_resources.get_distribution(package).location self.path = join(dist, 'pkpd', 'data', 'test', folder) self.filesDict = files self.url = url
[docs] def getFile(self, key): if key in self.filesDict: return join(self.path, self.filesDict[key]) return join(self.path, key)
[docs] def getPath(self): return self.path
[docs] @classmethod def getDataSet(cls, name): """ This method is called every time the dataset want to be retrieved """ assert name in cls._datasetDict, "Dataset: %s dataset doesn't exist." % name ds = cls._datasetDict[name] folder = ds.folder url = '' if ds.url is None else ' -u ' + ds.url if not pwutils.envVarOn('SCIPION_TEST_NOSYNC'): command = ("%s %s testdata --download %s %s" % (pw.PYTHON, pw.getSyncDataScript(), folder, url)) print(">>>> %s" % command) os.system(command) return cls._datasetDict[name]
# Inhalation ======================================================================== from .inhalation import PKSubstanceLungParameters as PKSubstanceLungParameters2 from .inhalation import PKDepositionParameters as PKDepositionParameters2 from .inhalation import PKPhysiologyLungParameters as PKPhysiologyLungParameters2 from .inhalation import PKLung as PKLung2
[docs]class PKSubstanceLungParameters(PKSubstanceLungParameters2): pass
[docs]class PKPhysiologyLungParameters(PKPhysiologyLungParameters2): pass
[docs]class PKDepositionParameters(PKDepositionParameters2): pass
[docs]class PKLung(PKLung2): pass