Source code for emfacilities.protocols.protocol_monitor_2d_streamer

# **************************************************************************
# *
# * Authors:     J.M. De la Rosa Trevin (delarosatrevin@scilifelab.se) [1]
# *
# * [1] SciLifeLab, Stockholm University
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 2 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307  USA
# *
# *  All comments concerning this program package may be sent to the
# *  e-mail address 'scipion@cnb.csic.es'
# *
# **************************************************************************

import time

import pyworkflow.object as pwobj
import pyworkflow.protocol.params as params
from pyworkflow.project import Manager

from .protocol_monitor import ProtMonitor


[docs]class ProtMonitor2dStreamer(ProtMonitor): """ This protocol will monitor an input set of particles (usually in streaming) and will run/schedule many copies of a given 2D classification protocol but using subsets of the input particles as the 2D classification input. """ _label = '2d streamer' def __init__(self, **kwargs): ProtMonitor.__init__(self, **kwargs) self._runIds = pwobj.CsvList(pType=int) def _defineParams(self, form): form.addSection(label='Input') form.addParam('input2dProtocol', params.PointerParam, label="Input 2D protocol", important=True, pointerClass='ProtClassify2D', help="This protocol will serve as the template run" "that will be repeated with subsets of the " "input particles. ") form.addParam('inputParticles', params.PointerParam, pointerClass='SetOfParticles', important=True, label="Input particles", help='Input particles that will be used to execute' 'many 2D classification runs based on the 2D ' 'protocol template selected. ') form.addParam('batchSize', params.IntParam, label="Batch size", help="How many particles (approximately) you want to " "group to make the new batch and launch a new 2d" "classification job. ") form.addParam('startingNumber', params.IntParam, default=0, label="Starting number", help="Specify a value greater than 0 if you want to skip " "this amount of particles from the classification " "batches (e.g, if you have classified them for the " "initial 2D classification template. ") group = form.addGroup('Monitoring') group.addParam('samplingInterval', params.IntParam, default=10, label="Update interval (min)", help="After how many minutes the protocol should look " "for new input data and schedule more 2D classification" "jobs if necessary. ") # --------------------------- INSERT steps functions --------------------- def _insertAllSteps(self): self._insertFunctionStep('monitorStep') # --------------------------- STEPS functions ----------------------------
[docs] def monitorStep(self): interval = self.samplingInterval.get() * 60 # list of particles that will be inserted in the new set self._counter = 0 self._lastMicId = None self._lastPartId = 0 self._subset = self._createSubset() self._runPrerequisites = [] if self.input2dProtocol.get().isActive(): self._runPrerequisites.append(self.input2dProtocol.get().getObjId()) self._streamClosed = False # list of runs that has been (or will) be scheduled/run finished = False while not finished: self._checkNewInput() time.sleep(interval) finished = self._streamClosed
# -------------------------- UTILS functions ------------------------------ def _createSubset(self): """ Create a new empty set of particles with a given suffix. """ self._counter += 1 subset = self._createSetOfParticles(suffix="_%03d" % self._counter) subset.copyInfo(self.inputParticles.get()) return subset def _writeSubset(self, subset): """ Generated the output of this subset. """ newSubsetName = 'outputParticles_%03d' % self._counter self.info("Creating new subset: %s" % newSubsetName) subset.write() self._defineOutputs(**{newSubsetName: subset}) self._defineTransformRelation(self.inputParticles, subset) # The following is required to commit the changes to the database self._store(subset) subset.close() manager = Manager() project = manager.loadProject(self.getProject().getName()) input2D = self.input2dProtocol.get() copyProt = project.copyProtocol(project.getProtocol(input2D.getObjId())) copyProt.inputParticles.set(project.getProtocol(self.getObjId())) copyProt.inputParticles.setExtended(newSubsetName) project.scheduleProtocol(copyProt, self._runPrerequisites) # Next schedule will be after this one self._runPrerequisites.append(copyProt.getObjId()) def _checkNewInput(self): """ Check if there are new particles and generate a new set and its corresponding 2D classification. """ self.info("Checking new input...") subset = self._subset for particle in self._iterParticles(): micId = particle.getMicId() partId = particle.getObjId() subset.append(particle) self.info("micId: %03d, particle: %05s, size: %s" % (micId, partId, subset.getSize())) # Check the following after finding particles of a new micrograph if micId != self._lastMicId: if self._lastMicId is not None and subset.getSize() > self.batchSize: self._writeSubset(subset) subset = self._createSubset() self._lastMicId = micId self._lastPartId = partId # Write last group of particles if input stream is closed if self._streamClosed: self._writeSubset(subset) self._subset = subset def _iterParticles(self): inputParts = self.inputParticles.get() inputParts.load() inputParts.loadAllProperties() self._streamClosed = inputParts.isStreamClosed() for p in inputParts.iterItems(orderBy=['_micId', 'id'], direction='ASC', where='id > %d' % self._lastPartId): yield p inputParts.close()