# Source code for xmipp3.protocols.protocol_trigger_data

# **************************************************************************
# *
# * Authors:     Tomas Majtner (tmajtner@cnb.csic.es)
# *              David Maluenda (dmaluenda@cnb.csic.es)
# *              Daniel Marchán (da.marchan@cnb.csic.es)
# *
# * Unidad de  Bioinformatica of Centro Nacional de Biotecnologia , CSIC
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 2 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307  USA
# *
# *  All comments concerning this program package may be sent to the
# *  e-mail address 'scipion@cnb.csic.es'
# *
# **************************************************************************

import os
import time
from datetime import datetime

import pyworkflow.protocol.constants as cons
from pyworkflow import VERSION_3_0
from pyworkflow.protocol import Protocol
from pwem.protocols import EMProtocol
from pyworkflow.object import Set
from pyworkflow.protocol.params import BooleanParam, IntParam, PointerParam, GT

# Sentinel file name. A protocol with "Send signal to stop a stream?" enabled
# creates this file in the *connected* protocol's extra directory; a protocol
# with "Wait for signal to stop the stream?" enabled polls for it in its own
# extra directory and finishes once it appears.
SIGNAL_FILENAME = "STOP_STREAM.TXT"

class XmippProtTriggerData(EMProtocol, Protocol):
    """ Waits until certain number of images is prepared and then
    send them to output.
    It can be done in 3 ways:
        - If "Send all items to output?" is _No_:
            Once the number of items is reached, a setOfImages is returned and
            the protocol finishes (ending the streaming from this point).
        - If "Send all items to output?" is _Yes_ and:
            - If "Split items to multiple sets?" is _Yes_:
                Multiple closed outputs will be returned as soon as
                the number of items is reached.
            - If "Split items to multiple sets?" is _No_:
                Only one output is returned and it is growing up in batches of
                a certain number of items (completely in streaming).
    Optionally, the protocol can send a stop signal to a connected trigger
    data protocol, or wait for such a signal to stop its own stream.
    """
    _label = 'trigger data'
    _lastUpdateVersion = VERSION_3_0

    # --------------------------- DEFINE param functions ----------------------
    def _defineParams(self, form):
        form.addSection(label='Input')
        form.addParam('inputImages', PointerParam, pointerClass='SetOfImages',
                      label='Input images', important=True)
        form.addParam('triggerWait', BooleanParam, default=False,
                      label='Wait for signal to stop the stream?',
                      help='If NO is selected, normal functionality.\n'
                           'If YES is selected it will wait for a signal to stop the stream.'
                           '\n For this option, select send all items to output with a '
                           'minimum size of 1')
        form.addParam('outputSize', IntParam, default=10000,
                      label='Minimum output size',
                      help='How many particles need to be on input to '
                           'create output set.')
        form.addParam('allImages', BooleanParam, default=True,
                      label='Send all items to output?',
                      help='If NO is selected, only a closed subset of '
                           '"Output size" items will be send to output.\n'
                           'If YES is selected it will still running in streaming.')
        form.addParam('splitImages', BooleanParam, default=False,
                      label='Split items to multiple sets?',
                      condition='allImages',
                      help='If YES is selected, multiple closed outputs of '
                           '"Output size" are returned.\n'
                           'If NO is selected, only one open and growing output '
                           'is returned')
        form.addParam('triggerSignal', BooleanParam, default=False,
                      label='Send signal to stop a stream?',
                      help='If NO is selected, normal functionality.\n'
                           'If YES is selected it will send a signal to a connected Trigger data protocol.'
                           '\n For this option, select the option send all items to output.')
        form.addParam('triggerProt', PointerParam,
                      pointerClass=self.getClassName(),
                      condition='triggerSignal',
                      label='Trigger data protocol',
                      help='Select the trigger data protocol that you will send a signal to stop the stream.')
        form.addParam('delay', IntParam, default=10, label="Delay (sec)",
                      validators=[GT(3, "must be larger than 3sec.")],
                      help="Delay in seconds before checking new output")

    # --------------------------- INSERT steps functions ----------------------
    def _insertAllSteps(self):
        """Reset the in-memory streaming state and insert the initial steps.

        A single 'delayStep' is inserted, followed by 'createOutputStep'
        which is created in waiting mode; it is released from
        _checkNewOutput once the protocol decides it has finished.
        """
        # initializing variables
        self.finished = False
        self.images = []          # every item taken from the input so far
        self.splitedImages = []   # items not yet released in split mode
        self.outputCount = 0      # number of split outputs released so far
        self.setImagesClass()
        self.setImagesType()

        # steps
        imsSteps = self._insertFunctionStep('delayStep')
        self._insertFunctionStep('createOutputStep',
                                 prerequisites=[imsSteps], wait=True)

    def _stepsCheck(self):
        """Streaming hook: pull new input items, then decide whether to finish."""
        self._checkNewInput()
        self._checkNewOutput()

    def createOutputStep(self):
        """Final step: close the output set(s)."""
        self._closeOutputSet()

    def _checkNewInput(self):
        """Load input items added since the last check and refresh outputs.

        The input sqlite is not re-read when its modification time is older
        than the previous check (once at least one read has happened).
        """
        imsFile = self.inputImages.get().getFileName()
        # Take the timestamp BEFORE reading: a write (e.g. the stream being
        # closed) that happens while we read must still trigger a re-read on
        # the next check instead of being considered already seen.
        checkTime = datetime.now()
        self.lastCheck = getattr(self, 'lastCheck', checkTime)
        mTime = datetime.fromtimestamp(os.path.getmtime(imsFile))
        # If the input's sqlite have not changed since our last check,
        # it does not make sense to check for new input data
        if self.lastCheck > mTime and hasattr(self, 'newImages'):
            return None

        # loading the input set in a dynamic way
        inputClass = self.getImagesClass()
        self.imsSet = inputClass(filename=imsFile)
        try:
            self.imsSet.loadAllProperties()

            allImages = self.allImages.get()
            query = {'orderBy': 'creation'}
            if self.images:
                # taking the non-processed yet
                query['where'] = 'creation>"' + str(self.check) + '"'
                query['limit'] = (-1 if allImages
                                  else self.outputSize.get() - len(self.images))
            else:
                # first time
                query['limit'] = -1 if allImages else self.outputSize.get()

            newImages = []
            lastCreation = None
            for item in self.imsSet.iterItems(**query):
                newImages.append(item.clone())
                # Items come in ascending creation order, so the last one
                # seen is the newest item actually loaded. Taking 'check'
                # from it (rather than from a second query over the whole
                # set) avoids skipping items appended meanwhile.
                lastCreation = item.getObjCreation()
            self.newImages = newImages
            if lastCreation is not None:
                self.check = lastCreation

            self.streamClosed = self.imsSet.isStreamClosed()
        finally:
            self.imsSet.close()

        self.splitedImages.extend(self.newImages)
        self.images.extend(self.newImages)
        self.lastCheck = checkTime

        # filling the output if needed
        self._fillingOutput()

    def _checkNewOutput(self):
        """Decide whether the protocol is finished and schedule next steps.

        When finished, a last output filling is done and 'createOutputStep'
        is released; otherwise another 'delayStep' is inserted before it.
        """
        if getattr(self, 'finished', False):
            return

        enoughImages = len(self.images) >= self.outputSize.get()
        if getattr(self, 'streamClosed', False):
            self.finished = True
        elif not self.allImages.get() and not self.triggerSignal.get():
            self.finished = enoughImages
        else:
            self.finished = False

        # Send the signal to the connected protocol
        if self.triggerSignal.get() and enoughImages:
            self.info('Sending signal to stop the input trigger data protocol')
            self.stopWait()

        # Wait for trigger data signal
        if self.triggerWait.get():
            self.info('Waiting for signal to stop the stream')
            if self.waitingHasFinished():
                self.info('Stopped by received signal from a trigger data protocol')
                self.finished = True

        outputStep = self._getFirstJoinStep()
        if self.finished:  # Unlock createOutputStep if finished all jobs
            self._fillingOutput()  # To do the last filling
            if outputStep and outputStep.isWaiting():
                outputStep.setStatus(cons.STATUS_NEW)
        else:
            delayId = self._insertFunctionStep('delayStep', prerequisites=[])
            if outputStep is not None:
                outputStep.addPrerequisites(delayId)
        self.updateSteps()

    def _fillingOutput(self):
        """Create or update the output set(s) according to the selected mode.

        Nothing is released until at least 'outputSize' items were taken
        from the input, unless the protocol is already finished.
        """
        imsSqliteFn = '%s.sqlite' % self.getImagesType('lower')
        outputName = self.getOututName()
        outputSize = self.outputSize.get()
        if len(self.images) < outputSize and not self.finished:
            return

        if not self.allImages.get():
            # Static output: created once and always closed
            if not os.path.exists(self._getPath(imsSqliteFn)):
                imageSet = self._loadOutputSet(self.getImagesClass(),
                                               imsSqliteFn, self.images)
                self._updateOutputSet(outputName, imageSet, Set.STREAM_CLOSED)
        elif self.splitImages.get():
            # Semi-streaming: splitting the input into closed batches
            self._fillSplitOutputs(outputName, outputSize)
        else:
            # Full streaming: a single output growing with the input
            if not os.path.exists(self._getPath(imsSqliteFn)):
                imagesToAdd = self.images
            elif self.finished:
                # no images to add, but we need to close the set
                imagesToAdd = []
            else:
                imagesToAdd = self.newImages
            imageSet = self._loadOutputSet(self.getImagesClass(),
                                           imsSqliteFn, imagesToAdd)
            streamMode = Set.STREAM_CLOSED if self.finished else Set.STREAM_OPEN
            self._updateOutputSet(outputName, imageSet, streamMode)

    def _fillSplitOutputs(self, outputName, outputSize):
        """Release pending items as numbered, closed output sets.

        Full batches of 'outputSize' items are released as soon as they are
        available. Once the protocol is finished, any leftover items are
        released as a final (smaller) batch, so no empty sets are created.
        """
        # A non-positive size would never shrink the pending list
        batchSize = max(outputSize, 1)
        while (len(self.splitedImages) >= batchSize or
               (self.finished and self.splitedImages)):
            batch = self.splitedImages[:batchSize]
            self.splitedImages = self.splitedImages[batchSize:]
            self.outputCount += 1
            imageSet = self._loadOutputSet(
                self.getImagesClass(),
                '%s%d.sqlite' % (self.getImagesType('lower'), self.outputCount),
                batch)
            # The splitted outputSets are always closed
            self._updateOutputSet("%s%d" % (outputName, self.outputCount),
                                  imageSet, Set.STREAM_CLOSED)

    def _loadOutputSet(self, SetClass, baseName, newImages):
        """Open (for append) or create the set file 'baseName' and add items.

        A newly created set starts in STREAM_OPEN state. The input set's
        info is copied onto the output in both cases.
        """
        setFile = self._getPath(baseName)
        outputSet = SetClass(filename=setFile)
        if os.path.exists(setFile):
            outputSet.loadAllProperties()
            outputSet.enableAppend()
        else:
            outputSet.setStreamState(outputSet.STREAM_OPEN)
        outputSet.copyInfo(self.inputImages.get())
        outputSet.copyItems(newImages)
        return outputSet

    # --------------------------- INFO functions ------------------------------
    def _summary(self):
        summary = []
        outputStr = self.getOututName()
        outputSize = self.outputSize.get()
        if not self.allImages.get():
            # 'splitImages' is ignored when not sending all items
            summary.append("MODE: *static output*.")
            triggeredMsg = ("'%s' released and closed. Nothing else to do."
                            % outputStr)
        elif self.splitImages.get():
            summary.append("MODE: *semi streaming (batches)*.")
            triggeredMsg = ("%d '%s' are being released with %d items, "
                            "each. A new batch will be created when ready."
                            % (self.getOutputsSize(), outputStr, outputSize))
        else:
            summary.append("MODE: *full streaming*.")
            triggeredMsg = ("'%s' released, it will be growing up "
                            "as soon as the input does." % outputStr)

        if self.getOutputsSize():
            summary.append(triggeredMsg)
        else:
            inputStr = (self.getImagesType() if self.inputImages.get()
                        else '(or not ready)')
            summary.append("Not enough input %s to release an output, yet."
                           % inputStr)
            summary.append("At least, %d items are needed to trigger an output."
                           % outputSize)

        # A finished protocol may have no output at all (e.g. empty input)
        firstOutput = next(iter(self.iterOutputAttributes()), None)
        if (self.isFinished() and firstOutput is not None and
                outputSize > firstOutput[1].getSize()):
            summary.append("Output released because streaming finished.")
        return summary

    def _validate(self):
        errors = []
        if self.triggerSignal.get():
            if not isinstance(self.triggerProt.get(), XmippProtTriggerData):
                errors.append("There is not a Trigger protocol connected to send a stop signal.")
        return errors

    # --------------------------- UTILS functions -----------------------------
    def _getFirstJoinStepName(self):
        # This function will be used for streaming, to check which is
        # the first function that need to wait for all micrographs
        # to have completed, this can be overriden in subclasses
        # (e.g., in Xmipp 'sortPSDStep')
        return 'createOutputStep'

    def _getFirstJoinStep(self):
        """Return the first step whose function is the join step, or None."""
        joinName = self._getFirstJoinStepName()
        return next((s for s in self._steps if s.funcName == joinName), None)

    def delayStep(self):
        """Sleep for 'delay' seconds."""
        time.sleep(self.delay.get())

    def setImagesClass(self):
        """Remember the concrete class of the input set."""
        self._inputClass = self.inputImages.get().getClass()

    def getImagesClass(self):
        return self._inputClass

    def setImagesType(self):
        """Store the item type name, e.g. 'Particles' for 'SetOfParticles'."""
        inputSet = self.inputImages.get()
        if inputSet:
            inputClassName = inputSet.getClassName()
            self._inputType = inputClassName.split('SetOf')[1]
        else:
            self._inputType = None

    def getImagesType(self, letters='default'):
        """Return the item type name; lowercase when letters == 'lower'."""
        if not self.hasAttribute('_inputType'):
            self.setImagesType()
        typeStr = str(self._inputType)
        if letters == 'lower':
            return typeStr.lower()
        return typeStr

    def getOututName(self):
        return 'output%s' % self.getImagesType()

    def stopWait(self):
        """Create the stop-signal file in the connected protocol's extra dir."""
        fileName = self._getStopConnectingFilename()
        if fileName is None:
            # No trigger protocol connected (rejected by _validate); nothing
            # to signal.
            return
        with open(fileName, 'w'):
            pass

    def _getStopConnectingFilename(self):
        triggerProtocol = self.triggerProt.get()
        if triggerProtocol is not None:
            fileName = triggerProtocol._getExtraPath(SIGNAL_FILENAME)
        else:
            fileName = None
        return fileName

    def waitingHasFinished(self):
        """True once the stop-signal file exists in this protocol's extra dir."""
        return os.path.exists(self._getStopWaitingFilename())

    def _getStopWaitingFilename(self):
        return self._getExtraPath(SIGNAL_FILENAME)