Source code for pkpd.protocols.protocol_pkpd_filter_measurements

# **************************************************************************
# *
# * Authors:     Carlos Oscar Sorzano (info@kinestat.com)
# *
# * Kinestat Pharma
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 2 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307  USA
# *
# *  All comments concerning this program package may be sent to the
# *  e-mail address 'info@kinestat.com'
# *
# **************************************************************************

import pyworkflow.protocol.params as params
from .protocol_pkpd import ProtPKPD
from pkpd.objects import PKPDExperiment, PKPDSample, PKPDVariable

# TESTED in test_workflow_gabrielsson_pk02.py
# TESTED in test_workflow_gabrielsson_pk04.py
# TESTED in test_workflow_gabrielsson_pk06.py
# TESTED in test_workflow_gabrielsson_pk07.py
# TESTED in test_workflow_gabrielsson_pk11.py

[docs]class ProtPKPDFilterMeasurements(ProtPKPD): """ Filter measurements.\n Protocol created by http://www.kinestatpharma.com\n """ _label = 'filter measurements' #--------------------------- DEFINE param functions -------------------------------------------- def _defineParams(self, form): form.addSection('Input') form.addParam('inputExperiment', params.PointerParam, label="Input experiment", important=True, pointerClass='PKPDExperiment', help='Select an experiment with samples') form.addParam('filterType', params.EnumParam, choices=["Exclude","Keep","Remove NA","Remove ULOQ & LLOQ","Substitute LLOQ","Substitute ULOQ"], label="Filter mode", default=0, help='Exclude or keep measurements meeting the following condition.\n"\ "NA values are excluded in keep filters, and kept in exclude filters') form.addParam('condition', params.StringParam, label="Condition", condition="filterType<=1", help='Example: $(t)<200\n $(Cp)>=1000 and $(Cp)<=2000"') form.addParam('substitute', params.FloatParam, label="Substitute by", condition="filterType==4 or filterType==5") #--------------------------- INSERT steps functions -------------------------------------------- def _insertAllSteps(self): self._insertFunctionStep('runFilter',self.inputExperiment.get().getObjId(), self.filterType.get(), \ self.condition.get()) self._insertFunctionStep('createOutputStep') #--------------------------- STEPS functions --------------------------------------------
[docs] def runFilter(self, objId, filterType, condition): import copy experiment = self.readExperiment(self.inputExperiment.get().fnPKPD) self.printSection("Filtering") if self.filterType.get()==0: filterType="exclude" elif self.filterType.get()==1: filterType="keep" elif self.filterType.get()==2: filterType="rmNA" elif self.filterType.get()==3: filterType="rmLL" elif self.filterType.get()==4: filterType="subsLL" elif self.filterType.get()==5: filterType="subsUL" filteredExperiment = PKPDExperiment() filteredExperiment.general = copy.copy(experiment.general) filteredExperiment.variables = copy.copy(experiment.variables) filteredExperiment.samples = {} filteredExperiment.doses = {} filteredExperiment.vias = {} usedDoses = [] for sampleKey, sample in experiment.samples.items(): candidateSample = PKPDSample() candidateSample.variableDictPtr = copy.copy(sample.variableDictPtr) candidateSample.doseDictPtr = copy.copy(sample.doseDictPtr) candidateSample.sampleName = copy.copy(sample.sampleName) candidateSample.doseList = copy.copy(sample.doseList) candidateSample.descriptors = copy.copy(sample.descriptors) candidateSample.measurementPattern = copy.copy(sample.measurementPattern) N = 0 # Number of initial measurements if len(sample.measurementPattern)>0: aux=getattr(sample,"measurement_%s"%sample.measurementPattern[0]) N = len(aux) if N==0: continue # Create empty output variables Nvar = len(sample.measurementPattern) convertToFloat = [] for i in range(0,Nvar): exec("candidateSample.measurement_%s = []"%sample.measurementPattern[i]) convertToFloat.append(sample.variableDictPtr[sample.measurementPattern[i]].varType == PKPDVariable.TYPE_NUMERIC) for n in range(0,N): toAdd = [] okToAddTimePoint = True conditionPython = copy.copy(condition) for i in range(0,Nvar): ldict = {} exec("aux=sample.measurement_%s[%d]"%(sample.measurementPattern[i],n),locals(),ldict) aux = ldict['aux'] if filterType=="rmNA": if aux=="NA" or aux=="None": okToAddTimePoint = False else: toAdd.append(aux) elif filterType=="rmLL": if aux=="LLOQ" or aux=="ULOQ": okToAddTimePoint = False else: toAdd.append(aux) elif filterType=="subsLL": okToAddTimePoint = True if aux=="LLOQ": toAdd.append(str(self.substitute.get())) else: toAdd.append(aux) elif filterType=="subsUL": okToAddTimePoint = True if aux=="ULOQ": toAdd.append(str(self.substitute.get())) else: toAdd.append(aux) else: # Keep or exclude toAdd.append(aux) varString = "$(%s)"%sample.measurementPattern[i] if varString in conditionPython: if aux=="NA": okToAddTimePoint = False else: if convertToFloat[i]: conditionPython = conditionPython.replace(varString,"%f"%float(aux)) else: conditionPython = conditionPython.replace(varString,"'%s'"%aux) if (filterType=="exclude" or filterType=="keep") and okToAddTimePoint: okToAddTimePoint = eval(conditionPython, {"__builtins__" : None }, {}) if filterType=="exclude": okToAddTimePoint = not okToAddTimePoint if okToAddTimePoint: for i in range(0,Nvar): exec("candidateSample.measurement_%s.append('%s')"%(sample.measurementPattern[i],toAdd[i])) N = len(getattr(sample,"measurement_%s"%sample.measurementPattern[0])) # Number of final measurements if N!=0: filteredExperiment.samples[candidateSample.sampleName] = candidateSample for doseName in candidateSample.doseList: if not doseName in usedDoses: usedDoses.append(doseName) if len(usedDoses)>0: for doseName in usedDoses: filteredExperiment.doses[doseName] = copy.copy(experiment.doses[doseName]) viaName=experiment.doses[doseName].via.viaName if not viaName in filteredExperiment.vias: filteredExperiment.vias[viaName] = copy.copy(experiment.vias[viaName]) self.writeExperiment(filteredExperiment,self._getPath("experiment.pkpd")) self.experiment = filteredExperiment
[docs] def createOutputStep(self): self._defineOutputs(outputExperiment=self.experiment) self._defineSourceRelation(self.inputExperiment, self.experiment)
#--------------------------- INFO functions -------------------------------------------- def _summary(self): msg=[] if self.filterType.get()==0: msg.append("Exclude %s"%self.condition.get()) elif self.filterType.get()==1: msg.append("Keep %s"%self.condition.get()) elif self.filterType.get()==2: msg.append("Remove NA") return msg