# **************************************************************************
# *
# * Authors: J.M. De la Rosa Trevin (delarosatrevin@gmail.com)
# * Carlos Oscar Sorzano (info@kinestat.com)
# *
# * Kinestat Pharma
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 2 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307 USA
# *
# * All comments concerning this program package may be sent to the
# * e-mail address 'info@kinestat.com'
# *
# **************************************************************************
import math
import numpy as np
try:
from itertools import izip
except ImportError:
izip = zip
from datetime import datetime
import tkinter as tk
import pyworkflow.gui.dialog as dialog
import pyworkflow.gui as gui
from pyworkflow.gui.tree import TreeProvider, BoundTree
from pwem.viewers.plotter import EmPlotter
from pkpd.pkpd_units import strUnit
class SamplesTreeProvider(TreeProvider):
    """Tree provider that exposes the samples of an experiment.

    Each sample becomes one row showing its name and its comma-separated
    list of doses.
    """

    def __init__(self, experiment, fitting=None):
        # 'fitting' is accepted for interface compatibility; it is not used here.
        self.experiment = experiment

    def getColumns(self):
        """Return the (title, width) pairs of the tree columns."""
        return [('Name', 60), ('Dose', 60)]

    def getObjects(self):
        """Return the samples of the experiment ordered by their key."""
        samples = self.experiment.samples
        return [samples[name] for name in sorted(samples.keys())]

    def getObjectInfo(self, obj):
        """Return the row description (key, text and values) of a sample."""
        name = obj.sampleName
        doses = ','.join(obj.doseList)
        return {'key': name, 'text': name, 'values': (doses,)}

    def _sortEnabled(self):
        return None
class MinMaxSlider(tk.Frame):
    """
    Custom frame laid out from left to right as:
        label, minimum entry, slider, maximum entry
    The current value and both bounds are stored in Tk variables.
    """

    def __init__(self, master, label, from_=0, to=100, callback=None,
                 numberOfSteps=25):
        self.callback = callback
        self.numberOfSteps = numberOfSteps
        sliderStep = (to - from_) / numberOfSteps

        def _doubleVar(initial):
            holder = tk.DoubleVar()
            holder.set(float(initial))
            return holder

        # The Tk variables are created before the frame is initialised.
        # The slider starts at the middle of the range.
        self.var = _doubleVar((from_ + to) * 0.5)
        self.varMin = _doubleVar(from_)
        self.varMax = _doubleVar(to)

        tk.Frame.__init__(self, master)

        # Label and entries share the same packing options
        packOptions = dict(side=tk.LEFT, padx=2, pady=2, anchor='s')
        tk.Label(self, text=label).pack(**packOptions)

        def _boundEntry(boundVar):
            # Pressing <Return> in an entry applies the typed bound
            widget = tk.Entry(self, textvariable=boundVar, bg='white',
                              width=10)
            widget.pack(**packOptions)
            widget.bind('<Return>', self._onBoundChanged)

        _boundEntry(self.varMin)

        self.slider = tk.Scale(self, from_=from_, to=to, variable=self.var,
                               bigincrement=sliderStep, resolution=sliderStep,
                               orient=tk.HORIZONTAL)
        self.slider.pack(side=tk.LEFT, padx=2)
        self.slider.bind('<ButtonRelease-1>', self._onButtonRelease)

        _boundEntry(self.varMax)

    def getMinMax(self):
        """Return the (minimum, maximum) pair currently typed in the entries."""
        lower = self.varMin.get()
        upper = self.varMax.get()
        return (lower, upper)

    def getValue(self):
        """Return the current value of the slider."""
        return self.var.get()

    def _onBoundChanged(self, e=None):
        """Reconfigure the slider after one of the bounds has been edited."""
        current = self.getValue()
        lower, upper = self.getMinMax()
        sliderStep = (upper - lower) / self.numberOfSteps
        self.slider.config(from_=lower, to=upper,
                           bigincrement=sliderStep, resolution=sliderStep)
        # A value left outside the new range is moved to its middle point
        if current < lower or current > upper:
            self.var.set(0.5 * (lower + upper))

    def _onButtonRelease(self, e=None):
        """Notify the callback, if there is one, that the slider was released."""
        if not self.callback:
            return
        self.callback(e)
class PKPDResponsiveDialog(dialog.Dialog):
    """Dialog to explore interactively how a model responds to its parameters.

    It shows a tree with the samples of the experiment, one MinMaxSlider per
    protocol parameter and a Log10 check box for each plotted variable.
    Whenever a sample is clicked, a slider is released or a check box is
    toggled, the forward model of the target protocol is evaluated and
    plotted together with the observations of the selected sample.

    Subclasses must implement _updateModel() so that it defines
    self.xpValues, the x values at which the model is evaluated.
    """

    def __init__(self, parent, title, **kwargs):
        """ From kwargs:
                targetProtocol: (required) protocol that provides the
                    experiment, the model and its parameters.
                varNameX: (required) name of the variable in the X axis.
                varNameY: (required) name of the variable in the Y axis.
                validateSelectionCallback: optional callback; it is only
                    stored, this class never calls it.
        """
        self.values = []
        self.plotter = None
        self.targetProtocol = kwargs['targetProtocol']
        self.experiment = self.targetProtocol.experiment
        self.varNameX = kwargs['varNameX']
        self.varNameY = kwargs['varNameY']
        self.provider = SamplesTreeProvider(self.experiment)
        self.model = self.loadModel()
        self.validateSelectionCallback = kwargs.get('validateSelectionCallback', None)
        self.setLabels()

        # All the attributes above are set before the base constructor runs
        dialog.Dialog.__init__(self, parent, title,
                               buttons=[('Select', dialog.RESULT_YES),
                                        ('Cancel', dialog.RESULT_CANCEL)])

    def setLabels(self):
        """Hook called from the constructor; it does nothing by default."""
        pass

    def loadModel(self):
        """Create the model of the target protocol and bind it to the
        experiment and to the X and Y variables.
        """
        model = self.targetProtocol.createModel()
        model.setExperiment(self.experiment)
        model.setXVar(self.varNameX)
        model.setYVar(self.varNameY)
        return model

    def body(self, bodyFrame):
        """Fill the dialog with the samples, sliders and log frames."""
        bodyFrame.config(bg='white')
        gui.configureWeigths(bodyFrame)
        self._createSamplesFrame(bodyFrame)
        self._createSlidersFrame(bodyFrame)
        self._createLogsFrame(bodyFrame)

    def _createSamplesFrame(self, content):
        """Create the tree listing the samples (row 0, column 0)."""
        frame = tk.Frame(content, bg='white')
        lfSamples = tk.LabelFrame(frame, text="Samples", bg='white')
        gui.configureWeigths(frame)
        lfSamples.grid(row=0, column=0, sticky='news', padx=5, pady=5)
        self.samplesTree = self._addBoundTree(lfSamples,
                                              self.provider, 10)
        self.samplesTree.itemClick = self._onSampleChanged
        frame.grid(row=0, column=0, sticky='news', padx=5, pady=5)

    def _createSlidersFrame(self, content):
        """Create one MinMaxSlider per protocol parameter (row 0, column 1)."""
        frame = tk.Frame(content, bg='white')
        lfBounds = tk.LabelFrame(frame, text="Parameter Bounds", bg='white')
        gui.configureWeigths(frame)
        self.sliders = {}
        paramUnits = self.targetProtocol.parameterUnits
        paramBounds = self.targetProtocol.getParameterBounds()
        # The units are looked up by the position of the parameter in the
        # bounds dictionary
        for row, (paramName, bounds) in enumerate(paramBounds.items()):
            bounds = bounds or (0, 1)  # Parameters without bounds use [0, 1]
            label = "%s [%s]" % (paramName, strUnit(paramUnits[row]))
            slider = MinMaxSlider(lfBounds, label,
                                  bounds[0], bounds[1],
                                  callback=self._onVarChanged)
            slider.grid(row=row, column=0, padx=5, pady=5)
            self.sliders[paramName] = slider
        lfBounds.grid(row=0, column=0, sticky='news', padx=5, pady=5)
        frame.grid(row=0, column=1, sticky='news', padx=5, pady=5)

    def _createLogsFrame(self, content):
        """Create the Log10 check boxes of both variables (row 1)."""
        frame = tk.Frame(content)

        def addVar(text, col, varName):
            # Returns (name label, check button, IntVar holding its state)
            varFrame = tk.Frame(frame)
            varFrame.grid(row=0, column=col, sticky='new')
            label = tk.Label(varFrame, text=text)
            label.grid(row=0, column=0, padx=5, pady=2, sticky='nw')
            nameLabel = tk.Label(varFrame, text=varName, width=10)
            nameLabel.grid(row=0, column=1, sticky='nw', padx=5, pady=5)
            logVar = tk.IntVar()
            check = tk.Checkbutton(varFrame, text='Log10', variable=logVar)
            check.grid(row=0, column=2, sticky='nw', padx=5, pady=5)
            return nameLabel, check, logVar

        self.timeWidget = addVar('Time variable', 0, self.varNameX)
        self.timeWidget[2].trace('w', self._onLogChanged)
        self.measureWidget = addVar('Measure variable', 1, self.varNameY)
        measureVar = self.measureWidget[2]
        # The measure is shown in log10 by default. The value is set before
        # the trace is installed so that no callback is fired here.
        measureVar.set(True)
        measureVar.trace('w', self._onLogChanged)
        frame.grid(row=1, column=0, columnspan=2, sticky='news', padx=5, pady=5)

    def _addBoundTree(self, parent, provider, height):
        """Create a BoundTree inside parent and return it."""
        bt = BoundTree(parent, provider, height=height)
        bt.grid(row=0, column=0, sticky='news', padx=5, pady=5)
        gui.configureWeigths(parent)
        return bt

    def apply(self):
        self.values = []

    def _warnNoSelection(self):
        """Tell the user that a sample has to be selected first."""
        dialog.showInfo("Warning", "Please select some sample(s) to plot.", self)

    def _onVarChanged(self, *args):
        """Recompute and redraw the fit after a slider has been released."""
        if self.samplesTree.selection():
            self.computeFit()
            self.plotResults()
        else:
            self._warnNoSelection()

    def computeFit(self):
        """Evaluate the forward model at self.xpValues with the current
        slider values and store the result in self.ypValues.
        """
        currentParams = [self.sliders[paramName].getValue()
                         for paramName in self.targetProtocol.getParameterNames()]
        self.targetProtocol.setParameters(currentParams)
        self.ypValues = self.targetProtocol.forwardModel(currentParams, self.xpValues)

    def getBoundsList(self):
        """Return the (min, max) pair of every parameter, in the order given
        by the parameter names of the protocol.
        """
        return [self.sliders[paramName].getMinMax()
                for paramName in self.targetProtocol.getParameterNames()]

    def useTimeLog(self):
        """Return the state of the Log10 check box of the time variable."""
        return self.timeWidget[2].get()

    def useMeasureLog(self):
        """Return the state of the Log10 check box of the measure variable."""
        return self.measureWidget[2].get()

    def getUnits(self, varName):
        """Return the units string of an experiment variable."""
        return self.experiment.variables[varName].getUnitsString()

    def getLabel(self, varName, useLog):
        """Return the axis label 'name [units]', wrapped in log10() if
        useLog is true.
        """
        varLabel = '%s [%s]' % (varName, self.getUnits(varName))
        if useLog:
            varLabel = "log10(%s)" % varLabel
        return varLabel

    def getTimeLabel(self):
        return self.getLabel(self.varNameX, self.useTimeLog())

    def getMeasureLabel(self):
        return self.getLabel(self.varNameY, self.useMeasureLog())

    def computePlotValues(self, xValues, yValues):
        """Return the (x, y) values to plot according to the log check boxes.

        Without any logarithm the inputs are returned untouched. Otherwise
        log10 is applied to the requested variables and the points whose
        logarithm cannot be computed (value not larger than 0) are dropped.
        """
        useMeasureLog = self.useMeasureLog()
        useTimeLog = self.useTimeLog()

        if not (useMeasureLog or useTimeLog):
            return xValues, yValues

        def _value(v, useLog):
            # None marks a value that cannot be represented in log scale
            if useLog:
                return math.log10(v) if v > 0 else None
            return v

        newXValues = []
        newYValues = []
        for x, y in izip(xValues, yValues):
            x = _value(x, useTimeLog)
            y = _value(y, useMeasureLog)
            if x is not None and y is not None:
                newXValues.append(x)
                newYValues.append(y)

        return newXValues, newYValues

    def _updateModel(self):
        """ This function should be called whenever the sample changes """
        pass

    def _onLogChanged(self, *args):
        # We will treat a log change as a sample change to plot
        self._onSampleChanged()

    def _onSampleChanged(self, e=None):
        """Load the selected sample, update the model and redraw the plot."""
        sampleKeys = self.samplesTree.selection()

        if not sampleKeys:
            self._warnNoSelection()
            return

        # When the sample is changed we need to re-compute (with log or not)
        # the x, y values. Only the first selected sample is used.
        self.sample = self.experiment.samples[sampleKeys[0]]
        self.xValues, self.yValues = self.sample.getXYValues(self.varNameX,
                                                             self.varNameY)
        self.newXValues, self.newYValues = self.computePlotValues(self.xValues[0],
                                                                  self.yValues[0])
        self._updateModel()
        self.computeFit()
        self.plotResults()

    @staticmethod
    def _firstSeries(values):
        """Return the series to plot from either [series, ...] or a series.

        Subclasses store the model values either as a list with one array
        or directly as a flat array; both layouts are accepted.
        """
        first = values[0]
        return first if np.ndim(first) > 0 else values

    def plotResults(self):
        """Plot the observations of the current sample and the model fit."""
        doShow = self.plotter is None or self.plotter.isClosed()
        if doShow:
            self.plotter = EmPlotter(style='seaborn-whitegrid')
        else:
            # Reuse the window that is already open
            self.plotter.clear()

        ax = self.plotter.createSubPlot("Sample: %s" % self.sample.sampleName,
                                        self.getTimeLabel(),
                                        self.getMeasureLabel())
        self.newXPValues, self.newYPValues = self.computePlotValues(
            self._firstSeries(self.xpValues),
            self._firstSeries(self.ypValues))
        ax.plot(self.newXValues, self.newYValues, 'x', label="Observations")
        ax.plot(self.newXPValues, self.newYPValues, label="Fit")
        ax.legend()

        if doShow:
            self.plotter.show()
        else:
            self.plotter.draw()

    def destroy(self):
        """Destroy the window"""
        if not (self.plotter is None or self.plotter.isClosed()):
            self.plotter.close()
        dialog.Dialog.destroy(self)
class PKPDODEDialog(PKPDResponsiveDialog):
    """Responsive dialog for protocols whose model is driven by a drug source."""

    def _updateModel(self):
        """Prepare the evaluation times, the model interval and the drug
        source for the sample that has just been selected.
        """
        protocol = self.targetProtocol
        lastTime = np.max(self.xValues)

        # The model is evaluated every 4 time units from 0 up to (but not
        # including) the largest observed x value
        timeGrid = np.arange(0, lastTime, 4)
        self.xpValues = [np.asarray(list(timeGrid))]

        protocol.model.t0 = 0
        protocol.model.tF = lastTime
        protocol.drugSource.setDoses(self.sample.parsedDoseList,
                                     protocol.model.t0,
                                     protocol.model.tF)
        protocol.configureSource(protocol.drugSource)
        protocol.model.drugSource = protocol.drugSource

        # Necessary to count the number of source and PK parameters
        protocol.getParameterNames()
class PKPDFitDialog(PKPDResponsiveDialog):
    """Responsive dialog for protocols that fit a model directly to the
    observed (x, y) values of a sample.
    """

    # Number of x values at which the fitted curve is evaluated
    numberOfPoints = 25

    def _updateModel(self):
        """Define the x values at which the model is evaluated for the sample
        that has just been selected.

        The values are evenly spaced from the smallest observed x value up to
        (but not including) the largest one. linspace is used instead of
        arange with a fractional step so that the number of points is always
        the same and a sample whose x values are all equal does not produce a
        zero step.
        """
        xMin = np.min(self.xValues)
        xMax = np.max(self.xValues)
        # NOTE(review): unlike PKPDODEDialog, xpValues is a flat array here
        # rather than a one-element list; confirm this is the layout that
        # targetProtocol.forwardModel expects.
        self.xpValues = np.linspace(xMin, xMax, self.numberOfPoints,
                                    endpoint=False)
        self.targetProtocol.getParameterNames()