# **************************************************************************
# *
# * Authors: Carlos Oscar Sorzano (info@kinestat.com)
# *
# * Kinestat Pharma
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 2 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307 USA
# *
# * All comments concerning this program package may be sent to the
# * e-mail address 'info@kinestat.com'
# *
# **************************************************************************
import copy
import numpy as np
from scipy.interpolate import InterpolatedUnivariateSpline
import pyworkflow.protocol.params as params
from .protocol_pkpd import ProtPKPD
from pkpd.objects import PKPDExperiment, PKPDSample
from pkpd.utils import uniqueFloatValues
# Tested in test_workflow_dissolution_f2.py
[docs]class ProtPKPDAverageSample(ProtPKPD):
""" Produce an experiment with a single sample whose value is the average of all the input samples.\n
Protocol created by http://www.kinestatpharma.com\n """
_label = 'average sample'
MODE_MEAN = 0
MODE_MEDIAN = 1
#--------------------------- DEFINE param functions --------------------------------------------
def _defineParams(self, form):
form.addSection('Input')
form.addParam('inputExperiment', params.PointerParam, label="Input experiment",
pointerClass='PKPDExperiment',
help='Select an experiment with samples')
form.addParam('mode', params.EnumParam, label='Aggregation mode', choices=['Mean','Median'],
default=self.MODE_MEAN)
form.addParam('resampleT', params.FloatParam, label="Resample profiles (time step)", default=-1,
help='Resample the input profiles at this time step (make sure it is in the same units as the input). '
'Leave it to -1 for no resampling. This is only valid when the label to compare is a measurement.')
form.addParam('condition', params.StringParam, default="", label="Condition",
help='You must use available labels. Example: $(Oral_F0)<0.25 and $(sex)=="Female"')
#--------------------------- INSERT steps functions --------------------------------------------
def _insertAllSteps(self):
self._insertFunctionStep('runJoin',self.inputExperiment.get().getObjId(), self.resampleT.get())
self._insertFunctionStep('createOutputStep')
#--------------------------- STEPS functions --------------------------------------------
[docs] def runJoin(self, objId, resampleT):
experiment = self.readExperiment(self.inputExperiment.get().fnPKPD)
tvarName = experiment.getTimeVariable()
mvarNames = experiment.getMeasurementVariables()
self.experiment = PKPDExperiment()
# General
self.experiment.general["title"]="Average of "+experiment.general["title"]
if self.condition.get()!="":
self.experiment.general["title"]+=" Condition: %s"%self.condition.get()
self.experiment.general["comment"]=copy.copy(experiment.general["comment"])
for key, value in experiment.general.items():
if not (key in self.experiment.general):
self.experiment.general[key] = copy.copy(value)
# Variables
for key, value in experiment.variables.items():
if not (key in self.experiment.variables):
self.experiment.variables[key] = copy.copy(value)
# Vias
for key, value in experiment.vias.items():
if not (key in self.experiment.vias):
self.experiment.vias[key] = copy.copy(value)
# Doses
doseName = None
for key, value in experiment.doses.items():
dose = copy.copy(value)
self.experiment.doses[dose.doseName] = dose
doseName = dose.doseName
# Samples
self.printSection("Averaging")
self.experiment.samples["avg"] = PKPDSample()
tokens=["AverageSample"]
if doseName is not None:
tokens.append("dose=%s"%doseName)
self.experiment.samples["avg"].parseTokens(tokens, self.experiment.variables, self.experiment.doses,
self.experiment.groups)
for mvarName in mvarNames:
allt = {}
for sampleName, sample in experiment.getSubGroup(self.condition.get()).items():
t, _ = sample.getXYValues(tvarName,mvarName)
t=t[0] # [[...]] -> [...]
for i in range(len(t)):
ti = float(t[i])
if not ti in allt:
allt[ti] = True
allt = sorted(allt.keys())
observations={}
for sampleName, sample in experiment.getSubGroup(self.condition.get()).items():
for ti in allt:
observations[ti] = []
for sampleName, sample in experiment.getSubGroup(self.condition.get()).items():
print("%s participates in the average"%sampleName)
t, y = sample.getXYValues(tvarName,mvarName)
t=t[0] # [array]
y=y[0] # [array]
tUnique, yUnique = uniqueFloatValues(t,y)
B = InterpolatedUnivariateSpline(tUnique, yUnique, k=1)
mint = np.min(t)
maxt = np.max(t)
for ti in allt:
if ti>=mint and ti<=maxt:
observations[ti].append(B(ti))
for ti in observations:
if len(observations[ti])>0:
if self.mode.get()==self.MODE_MEAN:
observations[ti]=np.mean(observations[ti])
else:
observations[ti] = np.median(observations[ti])
else:
observations[ti]=np.nan
t=[]
yavg=[]
for ti in sorted(observations):
t.append(ti)
yavg.append(observations[ti])
if self.resampleT.get()>0:
t,yavg=uniqueFloatValues(t,yavg)
B = InterpolatedUnivariateSpline(t, yavg, k=1)
t = np.arange(np.min(t), np.max(t) + self.resampleT.get(), self.resampleT.get())
yavg = B(t)
self.experiment.samples["avg"].addMeasurementColumn(tvarName,t)
self.experiment.samples["avg"].addMeasurementColumn(mvarName,yavg)
print(" ")
# Print and save
self.writeExperiment(self.experiment,self._getPath("experiment.pkpd"))
[docs] def createOutputStep(self):
self._defineOutputs(outputExperiment=self.experiment)
self._defineSourceRelation(self.inputExperiment, self.experiment)