Source code for pwem.protocols.protocol_movie_eraser

import os
import time

from pwem.objects import SetOfMovies, SetOfCoordinates, SetOfMicrographs
from pwem.protocols import EMProtocol
from pyworkflow import BETA
from pyworkflow.protocol import params
import pyworkflow.utils as pwutils
from pwem.objects.data import Movie
from copy import deepcopy
from pyworkflow.utils import ProgressBar

[docs]class ProtMovieEraser(EMProtocol): """ Protocol for removing movies based on different conditions: - If the input is SetOfMicrographs, it removes movies already aligned into micrographs. - If the input is SetOfCoordinates, it removes movies with unselected particles. WARNING: There is no way back. Be sure you understand the consequences. """ _label = "movie eraser" _devStatus = BETA def __init__(self, **kwargs): super().__init__(**kwargs) def _defineParams(self, form): form.addSection(label='Input') form.addParam('sourceSet', params.PointerParam, pointerClass='SetOfMicrographs, SetOfCoordinates', label="Input set", important=True, help='Select the SetOfMicrographs or Coordinates to drive the movie DELETION process.') # TODO: ADD SetOFCoordinates form.addParam('movieSet', params.PointerParam, pointerClass='SetOfMovies', label="Movies source", important=True, help='Select the SetOfMovies to locate the movie files. Match will be done by ids!.') form.addParam("dryMode", params.BooleanParam, label="Dry mode", default=True, expertLevel=params.LEVEL_ADVANCED) # -------------------------- STEP methods --------------------------------- def _insertAllSteps(self): # Insert different steps based on the input type sourceSet = self.sourceSet.get() if isinstance(sourceSet, SetOfMicrographs): self._insertFunctionStep(self.deleteMoviesMicStep) state = False elif isinstance(sourceSet, SetOfCoordinates): self._insertFunctionStep(self.deleteMoviesCoorStep) state = True else: raise NotImplementedError( "Only SetOfMicrographs or SetOfCoordinates are supported") self._insertFunctionStep(self.deleteMoviesStep) # Initialize a dictionary to keep track of movies and # their deletion status self.movMicNameList = {} self.movieSet = self.movieSet.get() self.coordSet = self.sourceSet.get() for mov in self.movieSet: newMovie = Movie() newMovie = deepcopy(mov) self.movMicNameList[mov.getMicName()] =\ {'mov': newMovie, 'deletethis' : state}
[docs] def deleteMoviesMicStep(self): """ Delete all movies origin of the micset until set is closed""" lastMicId = 0 # create an auxiliary list with micNames so we # do not need to make a search in the databse for each mic # For each mic micSet = self.sourceSet.get() for mic in micSet: self.movMicNameList[mic.getMicName()]['deletethis'] = True
[docs] def deleteMoviesCoorStep(self): """ Delete movies that are not related with the coordinate set""" movieSetWithCoordinates = self.coordSet.getUniqueValues("_micName") for micName in movieSetWithCoordinates: self.movMicNameList[micName]['deletethis'] = False
[docs] def deleteMoviesStep(self): """ Delete movies using the movMicNameList""" deletedMovies = self._createSetOfMovies(suffix="deleted") keptMovies = self._createSetOfMovies(suffix="kept") deletedMovies.copyInfo(self.movieSet ) keptMovies.copyInfo(self.movieSet ) try: ## PHN: for security deletedMovies.setGain(self.movieSet.getGain()) deletedMovies.setDark(self.movieSet.getDark()) deletedMovies.setFramesRange(self.movieSet.getFramesRange()) deletedMovies._firstFramesRange = self.movieSet._firstFramesRange keptMovies.setGain(self.movieSet.getGain()) keptMovies.setDark(self.movieSet.getDark()) keptMovies.setFramesRange(self.movieSet.getFramesRange()) keptMovies._firstFramesRange = self.movieSet._firstFramesRange except Exception as e: raise AttributeError(f"Missing parameter {e}") # Iterate through movies, delete and update sets progress = ProgressBar(total=len(self.movieSet), fmt=ProgressBar.NOBAR) progress.start() step = max(100, len(self.movieSet) // 100) for i, mov in enumerate(self.movMicNameList.values()): if i % step == 0: progress.update(i+1) newMovie = Movie() oldMovie = mov['mov'] newMovie.copyInfo(oldMovie) if mov['deletethis']: deletedMovies.append(newMovie) self._deleteMovie(oldMovie.getFileName()) else: keptMovies.append(newMovie) progress.finish() outputArgs = {"deletedMovies": deletedMovies, "keptMovies": keptMovies} self._defineOutputs(**outputArgs) self._defineSourceRelation(self.movieSet, deletedMovies) self._defineSourceRelation(self.movieSet, keptMovies)
def _deleteMovie(self, movieFile): """ Deletes the movie named movieFile""" # Check existence if not os.path.exists(movieFile): print("%s is gone already!." % movieFile) # if dry mode elif self.dryMode.get(): print("%s would have been deleted" % movieFile, flush=True) # Delete!!! else: # Delete the movie if os.path.islink(movieFile): movieFile2 = os.path.realpath(movieFile) else: movieFile2 = movieFile os.remove(movieFile2) # print("%s deleted." % movieFile, movieFile2, flush=True) # -------------------------- INFO functions ------------------------------- def _warnings(self): if not self.dryMode.get(): return ["If you continue, all movies related to the set of micrographs will be DELETED!. Try to run it in \"dry mode\" to double check first."] def _summary(self): summary = [] summary.append("Dry mode is active. Harmless" if self.dryMode.get() else "Dry mode deactivated, harm can be done!") return summary