Source code for pwem.protocols.protocol_movies

# **************************************************************************
# *
# * Authors:     Airen Zaldivar Peraza (
# *              Roberto Marabini (
# *              J.M. de la Rosa Trevin (
# *              Vahid Abrishami (
# *
# * Unidad de  Bioinformatica of Centro Nacional de Biotecnologia , CSIC
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 3 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307  USA
# *
# *  All comments concerning this program package may be sent to the
# *  e-mail address ''
# *
# **************************************************************************

import os
from os.path import join, basename, exists
from datetime import datetime

from pyworkflow import SCIPION_DEBUG_NOCLEAN
import pyworkflow.object as pwobj
import pyworkflow.protocol.params as params
import pyworkflow.protocol.constants as pwcts
import pyworkflow.utils as pwutils

import pwem.objects as emobj
from pwem import emlib

from .protocol_micrographs import ProtPreprocessMicrographs
from .protocol_particles import ProtExtractParticles

[docs]class ProtProcessMovies(ProtPreprocessMicrographs): """ Protocol base for processing movies from direct detectors cameras. This base class will iterate through the movies (extract them if compressed) and call a _processMovie method for each one. """ # Redefine this in subclasses if want to convert the movies to mrc # the value should be either 'mrc' or 'mrcs' CONVERT_TO_MRC = None CORRECT_GAIN = False def __init__(self, **kwargs): ProtPreprocessMicrographs.__init__(self, **kwargs) self.stepsExecutionMode = pwcts.STEPS_PARALLEL def _getConvertExtension(self, filename): """ This method will be used to check whether a movie needs to be converted to be used in a given program. Returns: If return None, it means that not conversion is required. If not None, the return value should be the extension that it should be converted. NOTE: Now by default this function use the CONVERT_TO_MRC property for backward compatibility reasons, but this method could be implemented in any subclass of ProtProcessMovies. """ if (self.CONVERT_TO_MRC and not (filename.endswith("mrc") or filename.endswith("mrcs"))): return self.CONVERT_TO_MRC return None
[docs] def getGainAndDark(self): inputMovs = self.inputMovies.get() return inputMovs.getGain(), inputMovs.getDark()
def _doCorrectGain(self): gain, dark = self.getGainAndDark() return getattr(self, 'CORRECT_GAIN', False) and (gain or dark) # --------------------------- DEFINE param functions ---------------------- def _defineParams(self, form): form.addSection(label=pwutils.Message.LABEL_INPUT) form.addParam('inputMovies', params.PointerParam, pointerClass='SetOfMovies', important=True, label=pwutils.Message.LABEL_INPUT_MOVS, help='Select a set of previously imported movies.') # --------------------------- INSERT steps functions ---------------------- def _insertAllSteps(self): # Build the list of all processMovieStep ids by # inserting each of the steps for each movie self.insertedDict = {} self.samplingRate = self.inputMovies.get().getSamplingRate() # Gain and Dark conversion step self.convertCIStep = [] convertStepId = self._insertFunctionStep('_convertInputStep', prerequisites=[]) self.convertCIStep.append(convertStepId) # Conversion step is part of processMovieStep because of streaming. movieSteps = self._insertNewMoviesSteps(self.insertedDict, self.inputMovies.get()) finalSteps = self._insertFinalSteps(movieSteps) self._insertFunctionStep('createOutputStep', prerequisites=finalSteps, wait=True) # STEP to convert correction images if apply def _convertInputStep(self): pwutils.makePath(self._getExtraPath('DONE')) movs = self.inputMovies.get() # Convert gain gain = movs.getGain() movs.setGain(self.__convertCorrectionImage(gain)) # Convert dark dark = movs.getDark() movs.setDark(self.__convertCorrectionImage(dark)) def __convertCorrectionImage(self, correctionImage): """ Will convert a gain or dark file to a compatible one and return the final file name. Otherwise, will return same passed parameter""" # Get final correction image file finalName = self.getFinalCorrectionImagePath(correctionImage) # If correctionImage is None, finalName will be None if finalName is None: return None elif not os.path.exists(finalName): # Conversion never happened... print('converting %s to %s' % (correctionImage, finalName)) if correctionImage.endswith('.gain'): # treat as tif file fnBase = basename(correctionImage) tmpLink = self._getTmpPath(pwutils.replaceExt(fnBase, 'tif')) pwutils.createAbsLink(correctionImage, tmpLink) correctionImage = tmpLink emlib.image.ImageHandler().convert(correctionImage, finalName) # return final name return os.path.abspath(finalName)
[docs] def getFinalCorrectionImagePath(self, correctionImage): """ Returns the final path to the correction image (converted or not) or and exception correctionImage does not exists""" # Return if the correctionImage is None or the empty string if not correctionImage: return None elif not os.path.exists(correctionImage): raise Exception("Correction image is set but not present in the " "file system. Either clean the value in the import " "protocol or copy the file to %s." % correctionImage) # Do we need to convert? convertTo = self._getConvertExtension(correctionImage) if convertTo is not None: # Get the base name fileName = basename(correctionImage) # Replace extension fileName = pwutils.replaceExt(fileName, convertTo) # "Place" it at extra folder. return self._getExtraPath(fileName) else: return correctionImage
def _insertFinalSteps(self, deps): """ This should be implemented in subclasses""" return deps def _getFirstJoinStepName(self): # This function will be used for streaming, to check which is # the first function that need to wait for all micrographs # to have completed, this can be overwritten in subclasses # (eg in Xmipp 'sortPSDStep') return 'createOutputStep' def _getFirstJoinStep(self): for s in self._steps: if s.funcName == self._getFirstJoinStepName(): return s return None def _loadInputList(self): """ Load the input set of movies and create a list. """ moviesFile = self.inputMovies.get().getFileName() self.debug("Loading input db: %s" % moviesFile) movieSet = emobj.SetOfMovies(filename=moviesFile) movieSet.loadAllProperties() self.listOfMovies = [m.clone() for m in movieSet] self.streamClosed = movieSet.isStreamClosed() movieSet.close() self.debug("Closed db.") def _checkNewInput(self): # Check if there are new movies to process from the input set localFile = self.inputMovies.get().getFileName() now = self.lastCheck = getattr(self, 'lastCheck', now) mTime = datetime.fromtimestamp(os.path.getmtime(localFile)) self.debug('Last check: %s, modification: %s' % (pwutils.prettyTime(self.lastCheck), pwutils.prettyTime(mTime))) # If the input movies.sqlite have not changed since our last check, # it does not make sense to check for new input data if self.lastCheck > mTime and hasattr(self, 'listOfMovies'): return None self.lastCheck = now # Open input movies.sqlite and close it as soon as possible self._loadInputList() newMovies = any(m.getObjId() not in self.insertedDict for m in self.listOfMovies) outputStep = self._getFirstJoinStep() if newMovies: fDeps = self._insertNewMoviesSteps(self.insertedDict, self.listOfMovies) if outputStep is not None: outputStep.addPrerequisites(*fDeps) self.updateSteps() def _checkNewOutput(self): pass # To be implemented in sub-classes def _stepsCheck(self): # Input movie set can be loaded or None when checked for new inputs # If None, we load it self._checkNewInput() self._checkNewOutput() def _insertNewMoviesSteps(self, insertedDict, inputMovies): """ Insert steps to process new movies (from streaming) Params: insertedDict: contains already processed movies inputMovies: input movies set to be check """ deps = [] # For each movie insert the step to process it for movie in inputMovies: if movie.getObjId() not in insertedDict: stepId = self._insertMovieStep(movie) deps.append(stepId) insertedDict[movie.getObjId()] = stepId return deps def _insertMovieStep(self, movie): """ Insert the processMovieStep for a given movie. """ # Note1: At this point is safe to pass the movie, since this # is not executed in parallel, here we get the params # to pass to the actual step that is gone to be executed later on # Note2: We are serializing the Movie as a dict that can be passed # as parameter for a functionStep movieDict = movie.getObjDict(includeBasic=True) movieStepId = self._insertFunctionStep('processMovieStep', movieDict, movie.hasAlignment(), prerequisites=self.convertCIStep) return movieStepId # --------------------------- STEPS functions -----------------------------
[docs] def convertInputStep(self): """ Should be implemented in sub-classes if needed. """ pass
[docs] def processMovieStep(self, movieDict, hasAlignment): movie = emobj.Movie() movie.setAcquisition(emobj.Acquisition()) if hasAlignment: movie.setAlignment(emobj.MovieAlignment()) movie.setAttributesFromDict(movieDict, setBasic=True, ignoreMissing=True) movieFolder = self._getOutputMovieFolder(movie) movieFn = movie.getFileName() movieName = basename(movieFn) movieDoneFn = self._getMovieDone(movie) if self.isContinued() and os.path.exists(movieDoneFn):"Skipping movie: %s, seems to be done" % movieFn) return # Clean old finished files pwutils.cleanPath(movieDoneFn) if self._filterMovie(movie): pwutils.makePath(movieFolder) pwutils.createAbsLink(os.path.abspath(movieFn), join(movieFolder, movieName)) if movieName.endswith('bz2'): newMovieName = movieName.replace('.bz2', '') # We assume that if compressed the name ends with .mrc.bz2 if not exists(newMovieName): self.runJob('bzip2', '-d -f %s' % movieName, cwd=movieFolder) elif movieName.endswith('tbz'): newMovieName = movieName.replace('.tbz', '.mrc') # We assume that if compressed the name ends with .tbz if not exists(newMovieName): self.runJob('tar', 'jxf %s' % movieName, cwd=movieFolder) elif movieName.endswith('.txt'): # Support a list of frame as a simple .txt file containing # all the frames in a raw list, we could use a xmd as well, # but a plain text was choose to simply its generation movieTxt = os.path.join(movieFolder, movieName) with open(movieTxt) as f: movieOrigin = os.path.basename(os.readlink(movieFn)) newMovieName = movieName.replace('.txt', '.mrcs') ih = emlib.image.ImageHandler() for i, line in enumerate(f): if line.strip(): inputFrame = os.path.join(movieOrigin, line.strip()) ih.convert(inputFrame, (i+1, os.path.join(movieFolder, newMovieName))) else: newMovieName = movieName convertExt = self._getConvertExtension(newMovieName) correctGain = self._doCorrectGain() if convertExt or correctGain: inputMovieFn = os.path.join(movieFolder, newMovieName) if inputMovieFn.endswith('.em'): inputMovieFn += ":ems" if convertExt: newMovieName = pwutils.replaceExt(newMovieName, convertExt) else: newMovieName = '%s_corrected.%s' % os.path.splitext(newMovieName) outputMovieFn = os.path.join(movieFolder, newMovieName) # If the protocols wants Scipion to apply the gain, then # there is no reason to convert, since we can produce the # output in the format expected by the program. In some cases, # the alignment programs can directly deal with gain and dark # correction images, so we don't need to apply it if self._doCorrectGain():"Correcting gain and dark '%s' -> '%s'" % (inputMovieFn, outputMovieFn)) gain, dark = self.getGainAndDark() self.correctGain(inputMovieFn, outputMovieFn, gainFn=gain, darkFn=dark) else:"Converting movie '%s' -> '%s'" % (inputMovieFn, outputMovieFn)) emlib.image.ImageHandler().convertStack(inputMovieFn, outputMovieFn) # Just store the original name in case it is needed in _processMovie movie._originalFileName = pwobj.String(objDoStore=False) movie._originalFileName.set(movie.getFileName()) # Now set the new filename (either linked or converted) movie.setFileName(os.path.join(movieFolder, newMovieName))"Processing movie: %s" % movie.getFileName()) self._processMovie(movie) if self._doMovieFolderCleanUp(): self._cleanMovieFolder(movieFolder) # Mark this movie as finished open(movieDoneFn, 'w').close()
# --------------------------- UTILS functions ---------------------------- def _getOutputMovieFolder(self, movie): """ Create a Movie folder where to work with it. """ return self._getTmpPath('movie_%06d' % movie.getObjId()) def _getMovieName(self, movie, ext='.mrc'): return self._getExtraPath('movie_%06d%s' % (movie.getObjId(), ext)) def _getMovieDone(self, movie): return self._getExtraPath('DONE', 'movie_%06d.TXT' % movie.getObjId()) def _isMovieDone(self, movie): """ A movie is done if the marker file exists. """ return os.path.exists(self._getMovieDone(movie)) def _getAllDone(self): return self._getExtraPath('DONE', 'all.TXT') def _getAllFailed(self): return self._getExtraPath('FAILED_all.TXT') def _readDoneList(self): """ Read from a text file the id's of the items that have been done. """ doneFile = self._getAllDone() doneList = [] # Check what items have been previously done if os.path.exists(doneFile): with open(doneFile) as f: doneList += [int(line.strip()) for line in f] return doneList def _writeDoneList(self, movieList): """ Write to a text file the items that have been done. """ with open(self._getAllDone(), 'a') as f: for movie in movieList: f.write('%d\n' % movie.getObjId()) # --------------------------- OVERRIDE functions -------------------------- def _filterMovie(self, movie): """ Check if process or not this movie. """ return True def _processMovie(self, movie): """ Process the movie actions, remember to: 1) Generate all output files inside movieFolder (usually with cwd in runJob) 2) Copy the important result files after processing (movieFolder will be deleted!!!) """ pass # FIXME: check if the following functions could be removed def _getNameExt(self, movieName, postFix, ext): if movieName.endswith("bz2"): # removeBaseExt function only eliminate the last extension, # but if files are compressed, we need to eliminate two extensions: # bz2 and its own image extension (e.g: mrcs, em, etc) return pwutils.removeBaseExt(pwutils.removeBaseExt(movieName)) + postFix + '.' + ext else: return pwutils.removeBaseExt(movieName) + postFix + '.' + ext def _getCorrMovieName(self, movieId, ext='.mrcs'): return 'movie_%06d%s' % (movieId, ext) def _getLogFile(self, movieId): return 'micrograph_%06d_Log.txt' % movieId def _doMovieFolderCleanUp(self): """ This functions allows subclasses to change the default behaviour of cleanup the movie folders after the _processMovie function. In some cases it makes sense that the protocol subclass take cares of when to do the clean up. """ return True def _cleanMovieFolder(self, movieFolder): if pwutils.envVarOn(SCIPION_DEBUG_NOCLEAN):'Clean movie data DISABLED. ' 'Movie folder will remain in disk!!!') else:"Erasing.....movieFolder: %s" % movieFolder) os.system('rm -rf %s' % movieFolder)
# cleanPath(movieFolder)
[docs]class ProtExtractMovieParticles(ProtExtractParticles, ProtProcessMovies): """ Extract a set of Particles from each frame of a set of Movies. """ def _defineParams(self, form): ProtExtractParticles._defineParams(self, form) ProtProcessMovies._defineParams(self, form)
[docs]class ProtMovieAssignGain(ProtPreprocessMicrographs): """ Assign a gain image to a set of movies """ _label = 'assign gain to movies' def __init__(self, **kwargs): ProtPreprocessMicrographs.__init__(self, **kwargs) # --------------------------- DEFINE param functions ---------------------- def _defineParams(self, form): form.addSection(label=pwutils.Message.LABEL_INPUT) form.addParam('inputMovies', params.PointerParam, pointerClass='SetOfMovies', important=True, label=pwutils.Message.LABEL_INPUT_MOVS, help='Select a set of previously imported movies.') form.addParam('gainImage', params.PointerParam, pointerClass='Image', label="Gain image", help="Select a gain image. The movie will be corrected " "as newMovie=Movie/gain") # --------------------------- INSERT steps functions ---------------------- def _insertAllSteps(self): self._insertFunctionStep('createOutputStep') # --------------------------- STEPS functions -----------------------------
[docs] def createOutputStep(self): moviesIn = self.inputMovies.get() moviesOut = self._createSetOfMovies() moviesOut.copyInfo(moviesIn) moviesOut.setGain(self.gainImage.get().getFileName()) moviesOut.copyItems(moviesIn) self._defineOutputs(outputMovies=moviesOut) self._defineSourceRelation(self.inputMovies, moviesOut)