# **************************************************************************
# *
# * Authors: J.M. De la Rosa Trevin (jmdelarosa@cnb.csic.es)
# *
# * Unidad de Bioinformatica of Centro Nacional de Biotecnologia , CSIC
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 3 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307 USA
# *
# * All comments concerning this program package may be sent to the
# * e-mail address 'scipion@cnb.csic.es'
# *
# **************************************************************************
import time
import pyworkflow.object as pwobj
import pyworkflow.protocol as pwprot
import pyworkflow.protocol.params as params
import pyworkflow.utils as pwutils
import pwem.objects as emobj
import pwem.constants as emcts


class EMProtocol(pwprot.Protocol):
""" Base class to all EM protocols.
It will contains some common functionalities.
"""
_base = True
def __init__(self, **kwargs):
pwprot.Protocol.__init__(self, **kwargs)
def __createSet(self, SetClass, template, suffix, **kwargs):
""" Create a set and set the filename using the suffix.
If the file exists, it will be delete. """
setFn = self._getPath(template % suffix)
# Close the connection to the database if
# it is open before deleting the file
pwutils.cleanPath(setFn)
setObj = SetClass(filename=setFn, **kwargs)
return setObj
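    # Illustrative note (a sketch, not executed code): the _createSetOf*
    # helpers below simply forward to __createSet with a filename template,
    # so, for example,
    #     self._createSetOfParticles('_run1')
    # creates particles_run1.sqlite inside the protocol working directory
    # (resolved through self._getPath) and returns an empty
    # emobj.SetOfParticles bound to that file.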
def _createSetOfMicrographs(self, suffix=''):
return self.__createSet(emobj.SetOfMicrographs,
'micrographs%s.sqlite', suffix,
indexes=['_index'])
def _createSetOfCoordinates(self, micSet, suffix=''):
coordSet = self.__createSet(emobj.SetOfCoordinates,
'coordinates%s.sqlite', suffix,
indexes=['_micId'])
coordSet.setMicrographs(micSet)
return coordSet
def _createCoordinatesTiltPair(self, micTiltPairs, uCoords, tCoords,
angles, suffix):
coordTiltPairs = self.__createSet(emobj.CoordinatesTiltPair,
'coordinates_pairs%s.sqlite', suffix,
indexes=['_untilted._micId',
'_tilted._micId'])
coordTiltPairs.setUntilted(uCoords)
coordTiltPairs.setTilted(tCoords)
coordTiltPairs.setAngles(angles)
coordTiltPairs.setMicsPair(micTiltPairs)
for coordU, coordT in zip(uCoords, tCoords):
coordTiltPairs.append(emobj.TiltPair(coordU, coordT))
return coordTiltPairs
def _createSetFromName(self, className, suffix=''):
""" Create a new set from the given className. """
_createSetFunc = getattr(self, '_create%s' % className)
return _createSetFunc(suffix)
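    # Example of the dynamic dispatch above (illustrative call only):
    #     self._createSetFromName('SetOfParticles', suffix='_filtered')
    # resolves the attribute self._createSetOfParticles via getattr and
    # calls it with the given suffix, so it is equivalent to
    #     self._createSetOfParticles('_filtered')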
def _createSetOfImages(self, suffix=''):
return self.__createSet(emobj.SetOfImages, 'images%s.sqlite', suffix)
def _createSetOfParticles(self, suffix=''):
return self.__createSet(emobj.SetOfParticles, 'particles%s.sqlite', suffix,
indexes=['_classId', '_micId'])
def _createSetOfAverages(self, suffix=''):
return self.__createSet(emobj.SetOfAverages, 'averages%s.sqlite', suffix)
def _createSetOfMovieParticles(self, suffix=''):
return self.__createSet(emobj.SetOfMovieParticles,
'movie_particles%s.sqlite', suffix)
def _createSetOfClasses2D(self, imgSet, suffix=''):
classes = self.__createSet(emobj.SetOfClasses2D,
'classes2D%s.sqlite', suffix)
classes.setImages(imgSet)
return classes
def _createSetOfClasses3D(self, imgSet, suffix=''):
classes = self.__createSet(emobj.SetOfClasses3D,
'classes3D%s.sqlite', suffix)
classes.setImages(imgSet)
return classes
def _createSetOfClassesVol(self, suffix=''):
return self.__createSet(emobj.SetOfClassesVol, 'classesVol%s.sqlite', suffix)
def _createSetOfVolumes(self, suffix=''):
return self.__createSet(emobj.SetOfVolumes, 'volumes%s.sqlite', suffix)
def _createSetOfCTF(self, suffix=''):
return self.__createSet(emobj.SetOfCTF, 'ctfs%s.sqlite', suffix)
def _createSetOfNormalModes(self, suffix=''):
return self.__createSet(emobj.SetOfNormalModes, 'modes%s.sqlite', suffix)
def _createSetOfMovies(self, suffix=''):
return self.__createSet(emobj.SetOfMovies, 'movies%s.sqlite', suffix)
def _createSetOfAngles(self, suffix=''):
return self.__createSet(emobj.SetOfAngles,
'tiltpairs_angles%s.sqlite', suffix)
def _createSetOfFSCs(self, suffix=''):
return self.__createSet(emobj.SetOfFSCs, 'fscs%s.sqlite', suffix)
def _createSetOfPDBs(self, suffix=''):
return self.__createSet(emobj.SetOfAtomStructs, 'pdbs%s.sqlite', suffix)
def _defineSourceRelation(self, srcObj, dstObj):
""" Add a DATASOURCE relation between srcObj and dstObj """
self._defineRelation(pwobj.RELATION_SOURCE, srcObj, dstObj)
def _defineTransformRelation(self, srcObj, dstObj):
self._defineRelation(pwobj.RELATION_TRANSFORM, srcObj, dstObj)
        # A transform relation always implies a source relation
self._defineSourceRelation(srcObj, dstObj)
def _defineCtfRelation(self, micsObj, ctfsObj):
self._defineRelation(emcts.RELATION_CTF, micsObj, ctfsObj)
        # A ctf relation always implies a source relation
self._defineSourceRelation(micsObj, ctfsObj)
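    # A minimal sketch (assumed usage, not part of this module) of how a
    # protocol typically registers an output together with its relation:
    #     micSet = self._createSetOfMicrographs()
    #     ...  # fill the set
    #     self._defineOutputs(outputMicrographs=micSet)
    #     self._defineSourceRelation(self.inputMovies, micSet)
    # The input pointer name (inputMovies) is only an example taken from
    # common Scipion protocols, not a requirement of this class.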
def _insertChild(self, key, child):
if isinstance(child, emobj.Set):
child.write()
pwprot.Protocol._insertChild(self, key, child)
def _validateDim(self, obj1, obj2, errors,
label1='Input 1', label2='Input 2'):
""" Validate that obj1 and obj2 has the same dimensions.
Params:
obj1, obj2: input objects that can be Images or SetOfImages subclasses.
errors: an error list where to append the error if not same dimensions
label1, label2: labels for input objects used for the error message.
"""
if obj1 is not None and obj2 is not None:
d1 = obj1.getXDim()
d2 = obj2.getXDim()
            if d1 is None:
                errors.append("Cannot get dimensions from %s." % label1)
            elif d2 is None:
                errors.append("Cannot get dimensions from %s." % label2)
            elif d1 != d2:
                msg = '%s and %s do not have the same dimensions, \n' % (label1, label2)
                msg += 'which are %d and %d, respectively' % (d1, d2)
                errors.append(msg)
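    # A short usage sketch (assumed; it follows the usual Scipion _validate
    # convention, and the attribute names inputParticles/inputVolume are
    # hypothetical):
    #     def _validate(self):
    #         errors = []
    #         self._validateDim(self.inputParticles.get(), self.inputVolume.get(),
    #                           errors, 'Input particles', 'Input volume')
    #         return errors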
def __str__(self):
return self.getObjLabel()
    def allowsDelete(self, obj):
        return isinstance(obj, (emobj.SetOfCoordinates,
                                emobj.CoordinatesTiltPair))
# ------ Methods for Streaming picking --------------
def _defineStreamingParams(self, form):
""" This function can be called during the _defineParams method
of some protocols that support stream processing.
It will add an Streaming section together with the following
params:
streamingSleepOnWait: Some streaming protocols are quite fast,
so, checking input/ouput updates creates an IO overhead.
This params allows them to sleep (without consuming resources)
to wait for new work to be done.
streamingBatchSize: For some programs it is more efficient to process
many items at once and not one by one. So this parameter will
allow to group a number of items to be processed in the same
protocol step. This can also reduce some IO overhead and spawning
new OS processes.
"""
form.addSection("Streaming")
form.addParam("streamingWarning", params.LabelParam, important=True,
label="The following params are related to how "
"streaming is done in Scipion.")
form.addParam("streamingSleepOnWait", params.IntParam, default=0,
label="Sleep when waiting (secs)",
help="If you specify a value greater than zero, "
"it will be the number of seconds that the "
"protocol will sleep when waiting for new "
"input data in streaming mode. ")
form.addParam("streamingBatchSize", params.IntParam, default=1,
label="Batch size",
help="This value allows to group several items to be "
"processed inside the same protocol step. You can "
"use the following values: \n"
"*1* The default behavior, the items will be "
"processed one by one.\n"
"*0* Put in the same step all the items "
"available. If the sleep time is short, it could be "
"practically the same of one by one. If not, you "
"could have steps with more items. If the steps will "
"be executed in parallel, it is better not to use "
"this option.\n"
"*>1* The number of items that will be grouped into "
"a step.")
def _getStreamingSleepOnWait(self):
return self.getAttributeValue('streamingSleepOnWait', 0)
def _getStreamingBatchSize(self):
return self.getAttributeValue('streamingBatchSize', 1)
def _streamingSleepOnWait(self):
""" This method should be used by protocols that want to sleep
when there is not more work to do.
"""
sleepOnWait = self._getStreamingSleepOnWait()
if sleepOnWait > 0:
self.info("Not much work to do now, sleeping %s seconds."
% sleepOnWait)
time.sleep(sleepOnWait)
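    # Typical call site (a sketch; _checkNewInput and _loadNewMics are
    # hypothetical methods of a streaming subclass): when a periodic check
    # finds no new items, sleep for the user-configured interval:
    #     def _checkNewInput(self):
    #         newMics = self._loadNewMics()
    #         if not newMics:
    #             self._streamingSleepOnWait()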
def _insertNewMics(self, inputMics, getMicKeyFunc,
insertStepFunc, insertStepListFunc, *args):
""" Insert steps of new micrographs taking into account the batch size.
It is assumed that a self.micDict exists mapping between micKey and mic.
It is also assumed that self.streamClosed is defined...with True value
if the input stream is closed.
This function can be used from several base protocols that support
streaming and batch:
- ProtCTFMicrographs
- ProtParticlePickingAuto
- ProtExtractParticles
Params:
inputMics: the input micrographs to be inserted into steps
getMicKeyFunc: function to get the key of a micrograph
(usually mic.getMicName()
insertStepFunc: function used to insert a single step
insertStepListFunc: function used to insert many steps.
*args: argument list to be passed to step functions
Returns:
The list of step Ids that can be used as dependencies.
"""
deps = []
insertedMics = inputMics
        # Although this function should only receive new micrographs,
        # double-check that they have not been inserted already
micList = [mic for mic in inputMics
if getMicKeyFunc(mic) not in self.micDict]
def _insertSubset(micSubset):
stepId = insertStepListFunc(micSubset, self.initialIds, *args)
deps.append(stepId)
# Now handle the steps depending on the streaming batch size
batchSize = self._getStreamingBatchSize()
        if batchSize == 1:  # One by one, the default behavior (as before batch size existed)
for mic in micList:
stepId = insertStepFunc(mic, self.initialIds, *args)
deps.append(stepId)
elif batchSize == 0: # Greedy, take all available ones
_insertSubset(micList)
        else:  # batchSize > 1, insert only complete batches of this size
n = len(inputMics)
d = int(n / batchSize) # number of batches to insert
nd = d * batchSize
for i in range(d):
_insertSubset(micList[i * batchSize:(i + 1) * batchSize])
if n > nd and self.streamClosed: # insert last ones
_insertSubset(micList[nd:])
else:
insertedMics = micList[:nd]
for mic in insertedMics:
self.micDict[getMicKeyFunc(mic)] = mic
return deps
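    # A usage sketch (assumed; _insertMicStep and _insertMicListStep are
    # hypothetical step-inserting methods of a subclass such as a picking
    # protocol):
    #     deps = self._insertNewMics(newMics,
    #                                lambda mic: mic.getMicName(),
    #                                self._insertMicStep,
    #                                self._insertMicListStep)
    #     self._insertFunctionStep('createOutputStep',
    #                              prerequisites=deps, wait=True)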