Source code for xmipp3.protocols.protocol_movie_max_shift

# **************************************************************************
# *
# * Authors:    David Maluenda    (dmaluenda@cnb.csic.es)
# *             Daniel Marchan (da.marchan@cnb.csic.es) REFACTOR
# *
# * Unidad de  Bioinformatica of Centro Nacional de Biotecnologia , CSIC
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 2 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307  USA
# *
# *  All comments concerning this program package may be sent to the
# *  e-mail address 'scipion@cnb.csic.es'
# *
# **************************************************************************
import os
from datetime import datetime
from os.path import exists
import numpy as np
import copy

from pyworkflow import VERSION_3_0
from pyworkflow import UPDATED, PROD
import pyworkflow.protocol.params as params
import pyworkflow.utils as pwutils
from pyworkflow.object import Set
from pyworkflow.protocol.constants import STATUS_NEW
from pyworkflow.protocol.params import PointerParam
from pyworkflow.utils.properties import Message

from pwem.protocols import ProtProcessMovies
from pwem.objects import SetOfMicrographs, SetOfMovies


OUTPUT_MICS = "outputMicrographs"
OUTPUT_MICS_DISCARDED = "outputMicrographs"
OUTPUT_MOVIES = "outputMovies"
OUTPUT_MOVIES_DISCARDED = "outputMoviesDiscarded"
OUTPUT_MICS_DW = "outputMicrographsDoseWeighted"
OUTPUT_MICS_DW_DISCARDED = "outputMicrographsDoseWeightedDiscarded"

[docs]class XmippProtMovieMaxShift(ProtProcessMovies): """ Protocol to make an automatic rejection of those movies whose frames move more than a given threshold. Rejection criteria: - *by frame*: Rejects movies with drifts between frames bigger than a certain maximum. - *by whole movie*: Rejects movies with a total travel bigger than a certain maximum. - *by frame and movie*: Rejects movies if both conditions above are met. - *by frame or movie*: Rejects movies if one of the conditions above are met. AI Generated ## Overview The Movie Maxshift protocol automatically separates aligned movies according to the amount of motion estimated during movie alignment. During cryo-EM movie acquisition, particles and ice may move because of beam-induced motion, mechanical instabilities, charging, or other acquisition effects. Movie-alignment protocols estimate this motion as a sequence of shifts between frames. If a movie shows excessive motion, it may produce a blurred or unreliable averaged micrograph even after alignment. This protocol evaluates the alignment shifts already stored in a set of aligned movies. It can reject movies based on large jumps between consecutive frames, large total accumulated motion across the whole movie, or a combination of both criteria. The output consists of accepted and discarded movie sets. When associated micrographs are available from the previous movie-alignment protocol, the protocol also produces accepted and discarded micrograph sets. ## Inputs and General Workflow The main input is a set of previously aligned movies. The protocol does not perform movie alignment itself. Instead, it reads the shift trajectory already associated with each movie. For each movie, it computes motion statistics from the X and Y shifts and applies the selected rejection rule. Movies that satisfy the selected motion-quality criteria are placed in the accepted output set. Movies that exceed the selected thresholds are placed in the discarded output set. If the input aligned movies are associated with output micrographs from the previous alignment protocol, those micrographs are also separated into accepted and discarded sets using the same movie decisions. ## Input Movies The **Input movies** parameter must point to a set of aligned movies. This is required because the protocol evaluates the shifts estimated during movie alignment. If the movies do not contain alignment information, the protocol cannot compute frame-to-frame motion or total movie travel. Typical inputs are movie sets produced by movie-alignment protocols such as FlexAlign, MotionCorr-like workflows, or other Scipion-compatible alignment steps. ## Rejection Type The **Rejection type** parameter defines how the protocol decides whether a movie should be discarded. There are four possible criteria: **By frame** rejects movies if the maximum displacement between two consecutive frames is larger than the selected frame-shift threshold. **By whole movie** rejects movies if the total accumulated trajectory length over the full movie is larger than the selected movie-shift threshold. **By frame and movie** rejects movies only if both conditions are met: the movie has a large frame-to-frame jump and a large total displacement. **By frame or movie** rejects movies if either condition is met. This is the most conservative option and is the default behavior. The best choice depends on how strict the user wants the quality-control step to be. The “or” criterion removes any movie that violates either motion limit, whereas the “and” criterion removes only movies that are problematic by both measures. ## Maximum Frame Shift The **Max. frame shift** parameter defines the largest allowed displacement between consecutive frames. The protocol computes the difference between the X and Y shifts of consecutive frames and converts this displacement into angstroms using the movie sampling rate. If the largest frame-to-frame displacement exceeds the threshold, the movie satisfies the frame-rejection condition. Large frame-to-frame jumps may indicate unstable alignment, sudden specimen movement, stage instability, charging events, or problematic frames. Such jumps can be especially damaging because they may not be well corrected by smooth motion models. This threshold is used when the rejection type includes the frame criterion. ## Maximum Movie Shift The **Max. movie shift** parameter defines the largest allowed total travel across the movie. The protocol computes the distance between consecutive points of the shift trajectory and sums these distances along the movie. This gives the total path length traveled by the aligned frames, expressed in angstroms. A movie may have no single extreme jump but still show a large accumulated motion. Such movies may be less reliable because the specimen has moved substantially during exposure. This threshold is used when the rejection type includes the whole-movie criterion. ## Frame-to-Frame Motion Versus Total Motion The two rejection criteria capture different types of problems. Frame-to-frame motion detects sudden jumps. A movie with one abrupt movement may be rejected even if its total motion is not very large. Total movie motion detects accumulated drift. A movie may move gradually and smoothly, but if the accumulated travel is very large, the final correction may still be less reliable. Using both criteria gives a more complete assessment of movie behavior. The default “by frame or movie” option is strict because it discards movies that fail either test. ## Accepted Movies The **outputMovies** set contains the movies that pass the selected motion criteria. These movies keep their original alignment information. They can be used for downstream processing or for generating accepted micrographs when available. The accepted set should be understood as the subset whose estimated motion is within the user-defined limits. ## Discarded Movies The **outputMoviesDiscarded** set contains movies that exceed the selected motion thresholds. A discarded movie is not necessarily completely unusable, but it has motion behavior that the user decided to consider risky. Users may inspect discarded movies to decide whether the thresholds are too strict or whether the discarded movies show real acquisition problems. This output is useful for troubleshooting, quality-control reports, and understanding how much data is lost because of excessive motion. ## Associated Micrograph Outputs If the input movies come from a previous protocol that also produced micrographs, Movie Maxshift can separate those micrographs into accepted and discarded sets using the same movie decisions. Depending on the previous protocol, these may be regular averaged micrographs or dose-weighted micrographs. This is useful because downstream processing often continues from micrographs rather than movies. By propagating the movie-level decision to the micrographs, the protocol allows the user to continue the workflow only with motion-acceptable micrographs. If no associated micrograph set is found, the protocol still produces the movie outputs, but no micrograph outputs are created. ## Streaming Behavior The protocol supports streaming input. As new aligned movies appear, their shifts are evaluated and the accepted and discarded outputs are updated. The output sets remain open while the input stream is open and are closed when all movies have been processed. This makes the protocol useful for online quality control during data acquisition or automated processing. ## Practical Recommendations Use this protocol after movie alignment, not before. It requires alignment shifts stored in the input movies. Start with the default rejection type, **by frame or movie**, if you want a conservative quality-control filter. Use **by frame** when you mainly want to detect abrupt jumps between frames. Use **by whole movie** when you mainly want to detect large accumulated drift. Use **by frame and movie** when you want a more permissive rule that discards only movies with both sudden jumps and large total travel. Inspect a subset of discarded movies and their corresponding micrographs. This helps determine whether the thresholds are appropriate for the dataset. Be careful with very strict thresholds. They may remove useful data, especially in datasets with strong but well-corrected beam-induced motion. Remember that this protocol evaluates the magnitude of estimated motion. It does not directly evaluate CTF quality, particle quality, ice contamination, or final reconstruction performance. ## Final Perspective Movie Maxshift is a movie-alignment quality-control protocol. It uses the motion trajectory estimated during alignment to identify movies with excessive frame-to-frame jumps or excessive accumulated drift. For biological users and facility workflows, this provides a simple and interpretable way to remove movies that may compromise downstream processing. It is especially useful in automated pipelines, where problematic movies should be detected and separated before CTF estimation, particle picking, and reconstruction. The protocol is most effective when thresholds are chosen in relation to the expected motion behavior of the dataset and when accepted/discarded results are checked visually during workflow setup. """ _label = 'movie maxshift' _devStatus = PROD _lastUpdateVersion = VERSION_3_0 _possibleOutputs = {OUTPUT_MICS: SetOfMicrographs, OUTPUT_MICS_DW: SetOfMicrographs, OUTPUT_MICS_DISCARDED: SetOfMicrographs, OUTPUT_MICS_DW_DISCARDED: SetOfMicrographs, OUTPUT_MOVIES: SetOfMovies, OUTPUT_MOVIES_DISCARDED: SetOfMovies } REJ_TYPES = ['by frame', 'by whole movie', 'by frame and movie', 'by frame or movie'] REJ_FRAME = 0 REJ_MOVIE = 1 REJ_AND = 2 REJ_OR = 3 def __init__(self, **args): ProtProcessMovies.__init__(self, **args) #self.stepsExecutionMode = STEPS_PARALLEL # -------------------------- DEFINE param functions ------------------------ def _defineParams(self, form): form.addSection(label=Message.LABEL_INPUT) form.addParam('inputMovies', PointerParam, important=True, label=Message.LABEL_INPUT_MOVS, pointerClass='SetOfMovies', help='Select a set of previously aligned Movies.') form.addParam('rejType', params.EnumParam, choices=self.REJ_TYPES, label='Rejection type', default=self.REJ_OR, help='Rejection criteria:\n' ' - *by frame*: Rejects movies with drifts between ' 'frames bigger than a certain maximum.\n' ' - *by whole movie*: Rejects movies with a total ' 'travel bigger than a certain maximmum.\n' ' - *by frame and movie*: Rejects movies if both ' 'conditions above are met.\n' ' - *by frame or movie*: Rejects movies if one of ' 'the conditions above are met.') form.addParam('maxFrameShift', params.FloatParam, default=10, label='Max. frame shift (A)', condition='rejType==%s or rejType==%s or rejType==%s' % (self.REJ_FRAME, self.REJ_AND, self.REJ_OR), help='Maximum drift between consecutive frames ' 'to evaluate the frame condition.') form.addParam('maxMovieShift', params.FloatParam, default=45, label='Max. movie shift (A)', condition='rejType==%s or rejType==%s or rejType==%s' % (self.REJ_MOVIE, self.REJ_AND, self.REJ_OR), help='Maximum total travel to evaluate the whole movie ' 'condition.') #--------------------------- INSERT steps functions ------------------------ def _insertAllSteps(self): """ Insert the steps to perform movie alignment evaluation """ self.initializeStep() self._insertFunctionStep(self.createOutputStep, prerequisites=[], wait=True, needsGPU=False)
[docs] def initializeStep(self): self.samplingRate = self.inputMovies.get().getSamplingRate() self.movsFn = self.inputMovies.get().getFileName() # Important to have both: self.insertedIds = [] # Contains images that have been inserted in a Step (checkNewInput). # Contains images that have been processed in a Step (checkNewOutput). self.acceptedIds = [] self.discardedIds = [] self.isStreamClosed = self.inputMovies.get().isStreamClosed() self.alreadyLoad = False
[docs] def createOutputStep(self): self._closeOutputSet()
def _loadInputSet(self, movsFn): """ Load the input set of movies and create a list. """ self.debug("Loading input db: %s" % movsFn) movSet = SetOfMovies(filename=movsFn) movSet.loadAllProperties() movSet.close() self.debug("Closed db.") return movSet def _loadMicAssociatedInputSet(self): """ Load the input set of mics and create a list. """ parentProt = self.getMapper().getParent(self.inputMovies.get()) micSet = getattr(parentProt, self.outMicName, None) micSet.loadAllProperties() micSet.close() return micSet def _stepsCheck(self): # Input micrograph set can be loaded or None when checked for new inputs # If None, we load it self._checkNewInput() self._checkNewOutput() def _checkNewInput(self): # Check if there are new micrographs to process from the input set self.lastCheck = getattr(self, 'lastCheck', datetime.now()) mTime = datetime.fromtimestamp(os.path.getmtime(self.movsFn)) self.debug('Last check: %s, modification: %s' % (pwutils.prettyTime(self.lastCheck), pwutils.prettyTime(mTime))) # If the input micrographs.sqlite have not changed since our last check, # it does not make sense to check for new input data if self.lastCheck > mTime and self.insertedIds: # If this is empty it is dut to a static "continue" action or it is the first round return None # Open input micrographs.sqlite and close it as soon as possible movSet = self._loadInputSet(self.movsFn) movSetIds = movSet.getIdSet() newIds = [idMic for idMic in movSetIds if idMic not in self.insertedIds] self.isStreamClosed = movSet.isStreamClosed() self.lastCheck = datetime.now() movSet.close() outputStep = self._getFirstJoinStep() if self.isContinued() and not self.insertedIds: # For "Continue" action and the first round doneIds, _, _, _ = self._getAllDoneIds() skipIds = list(set(newIds).intersection(set(doneIds))) newIds = list(set(newIds).difference(set(doneIds))) self.info("Skipping Mics with ID: %s, seems to be done" % skipIds) self.insertedIds = doneIds # During the first round of "Continue" action it has to be filled if newIds: fDeps = self._insertNewMoviesSteps(newIds) if outputStep is not None: outputStep.addPrerequisites(*fDeps) self.updateSteps() def _insertNewMoviesSteps(self, newIds): """ Insert the processMovieStep for a given movie. """ if not self.alreadyLoad: # looking for a setOfMicrographs related to the inputMovies self.setInputMics() # Insert the selection/rejection step process deps = [] stepId = self._insertFunctionStep(self._evaluateMovieAlign, newIds, needsGPU=False, prerequisites=[]) deps.append(stepId) for movId in newIds: self.insertedIds.append(movId) return deps def _evaluateMovieAlign(self, movIds): """ Fill the accepted or the rejected list with the movie. """ inputMovies = self._loadInputSet(self.movsFn) sampling = self.samplingRate for movieId in movIds: movie = inputMovies.getItem("id", movieId).clone() alignment = movie.getAlignment() # getShifts() returns the absolute shifts from a certain reference shiftListX, shiftListY = alignment.getShifts() # initialize the criteria values rejectedByMovie = False rejectedByFrame = False if any(shiftListX) or any(shiftListY): # we use np.arrays to use np.diff() shiftArrayX = np.asarray(shiftListX) shiftArrayY = np.asarray(shiftListY) evalBoth = self.rejType==self.REJ_AND or self.rejType==self.REJ_OR # --- Evaluation for accumulated displacement --- if self.rejType == self.REJ_MOVIE or evalBoth: deltaX = np.diff(shiftArrayX) deltaY = np.diff(shiftArrayY) # Magnitude for each step stepDistances = np.sqrt(deltaX ** 2 + deltaY ** 2) # Total sum of the displacement totalPath = np.sum(stepDistances) * sampling rejectedByMovie = totalPath > self.maxMovieShift.get() # --- Evaluation by maximum displacement between frames --- if self.rejType == self.REJ_FRAME or evalBoth: frameShiftX = np.diff(shiftArrayX) frameShiftY = np.diff(shiftArrayY) frameShifts = np.sqrt(frameShiftX ** 2 + frameShiftY ** 2) maxShiftM = np.max(frameShifts) * sampling rejectedByFrame = maxShiftM > self.maxFrameShift.get() if self.rejType == self.REJ_AND: if rejectedByFrame and rejectedByMovie: self.discardedIds.append(movieId) else: self.acceptedIds.append(movieId) else: # for the OR and the individuals evaluations if rejectedByFrame or rejectedByMovie: self.discardedIds.append(movieId) else: self.acceptedIds.append(movieId) else: # we accept the movie if no shifts is associated self.acceptedIds.append(movieId) def _checkNewOutput(self): """ Check for already selected Movies and update the output set. """ # load if first time in order to make dataSets relations _, _, doneListAccepted, doneListDiscarded = self._getAllDoneIds() # Check for newly done items acceptedIds = copy.deepcopy(self.acceptedIds) discardedIds = copy.deepcopy(self.discardedIds) newDoneAccepted = [movId for movId in acceptedIds if movId not in doneListAccepted] newDoneDiscarded = [movId for movId in discardedIds if movId not in doneListDiscarded] firstTimeAccepted = len(doneListAccepted) == 0 firstTimeDiscarded = len(doneListDiscarded) == 0 allDone = len(doneListAccepted) + len(doneListDiscarded) + \ len(newDoneAccepted) + len(newDoneDiscarded) maxMicSize = self._loadInputSet(self.movsFn).getSize() # We have finished when there is not more input movies # (stream closed) and the number of processed movies is # equal to the number of inputs self.finished = self.isStreamClosed and allDone == maxMicSize streamMode = Set.STREAM_CLOSED if self.finished else Set.STREAM_OPEN # Some checks to debug self.debug('_checkNewOutput: ') self.debug(' newDoneAccepted: %d.' % len(newDoneAccepted)) self.debug(' newDoneDiscarded: %d,' % len(newDoneDiscarded)) if not self.finished and (not newDoneDiscarded and not newDoneAccepted): # If we are not finished and no new output have been produced # it does not make sense to proceed and updated the outputs # so we exit from the function here return def fillOutput(newDoneList, firstTime, AccOrDisc='Accepted'): """ A single function is used to fill the two sets (Accepted/Rej.) """ suffix1 = '' if self.outMicName == OUTPUT_MICS else '_dose-weighted' suffix = 'Discarded' if AccOrDisc=='Discarded' else '' enable = False if AccOrDisc=='Discarded' else True movieSet = self._loadOutputSet(SetOfMovies, 'movies%s.sqlite' % suffix) micsSet = self._loadOutputSet(SetOfMicrographs,'micrographs%s%s.sqlite'%(suffix1, suffix)) print('micrographs%s%s.sqlite'%(suffix1, suffix)) def tryToAppend(outSet, micOut): """ When micrograph is very big, sometimes it's not ready to be read Then we will wait for it up to a minute in 6 time-growing tries. Returns True if fails! """ if micOut is None: print('Mic/Movie is None do not introduce') else: micOut.setEnabled(enable) outSet.append(micOut) inputMovies = self._loadInputSet(self.movsFn) inputMics = self._loadMicAssociatedInputSet() inputMicsIds = inputMics.getIdSet() for movieId in newDoneList: movie = inputMovies.getItem("id", movieId).clone() tryToAppend(movieSet, movie) if movieId in inputMicsIds: mic = inputMics.getItem("id", movieId).clone() tryToAppend(micsSet, mic) else: self.info("Movie with id %d has not a micrograph associated" %movieId) if movieSet.getSize() > 0: self._updateOutputSet(OUTPUT_MOVIES + suffix, movieSet, streamMode) if self.inputMics is not None and micsSet.getSize() > 0: self._updateOutputSet(self.outMicName + suffix, micsSet, streamMode) if firstTime: # define relation just the first time if movieSet.getSize() > 0: self._defineTransformRelation(self.inputMovies.get(), movieSet) if self.inputMics is not None and micsSet.getSize() > 0: self._defineTransformRelation(self.inputMics, micsSet) movieSet.close() if self.inputMics is not None: micsSet.close() # We fill/update the output if there are something new or to close sets if newDoneAccepted: fillOutput(newDoneAccepted, firstTimeAccepted, AccOrDisc='Accepted') # We fill/update the output for the discarded movies if newDoneDiscarded: fillOutput(newDoneDiscarded, firstTimeDiscarded, AccOrDisc='Discarded') # Unlock createOutputStep if finished all jobs if self.finished: outputStep = self._getFirstJoinStep() if outputStep and outputStep.isWaiting(): outputStep.setStatus(STATUS_NEW) self._store() #--------------------------- UTILS functions ------------------------------- def _getAllDoneIds(self): doneIds = [] acceptedIds = [] discardedIds = [] sizeOutput = 0 if hasattr(self, OUTPUT_MOVIES): sizeOutput += self.outputMovies.getSize() acceptedIds.extend(list(self.outputMovies.getIdSet())) doneIds.extend(acceptedIds) if hasattr(self, OUTPUT_MOVIES_DISCARDED): sizeOutput += self.outputMoviesDiscarded.getSize() discardedIds.extend(list(self.outputMoviesDiscarded.getIdSet())) doneIds.extend(discardedIds) return doneIds, sizeOutput, acceptedIds, discardedIds def _loadOutputSet(self, SetClass, baseName): """ Load the output set if it exists or create a new one based on the inputs. """ if SetClass == SetOfMicrographs: if self.inputMics is None: # if no mics to do, do nothing and exit return None inputSet = self.inputMics else: inputSet = self._loadInputSet(self.movsFn) setFile = self._getPath(baseName) print(setFile) if exists(setFile): outputSet = SetClass(filename=setFile) if len(outputSet) == 0: pwutils.path.cleanPath(setFile) outputSet.loadAllProperties() outputSet.enableAppend() else: outputSet = SetClass(filename=setFile) outputSet.setStreamState(outputSet.STREAM_OPEN) outputSet.copyInfo(inputSet) return outputSet
[docs] def setInputMics(self): """ Setting the self.inputMics to the SetOfMics associated to the input movies (or None if no relation is found). The same for DoseWeighted mics """ self.alreadyLoad = True parentProt = self.getMapper().getParent(self.inputMovies.get()) self.outMicName = None self.inputMics = None if hasattr(parentProt, OUTPUT_MICS): self.outMicName = OUTPUT_MICS self.inputMics = getattr(parentProt, OUTPUT_MICS, None) if hasattr(parentProt, OUTPUT_MICS_DW): self.outMicName = OUTPUT_MICS_DW self.inputMics = getattr(parentProt, OUTPUT_MICS_DW, None) if not self.inputMics: self.warning("WARNING: The outputMovies has no outputMicrographs associated. Then, no outputMicrographs will be " "produced.")
# ---------------------- INFO functions ------------------------------------ def _validate(self): errors = [] if not self.inputMovies.get().getFirstItem().hasAlignment(): errors.append('The _Input Movies_ must come from an alignment ' 'protocol.') return errors def _summary(self): def getSize(outNameCondition): for outName, outObj in self.iterOutputAttributes(): if outNameCondition(outName): return outObj.getSize() return 0 moviesAcc = getSize(lambda x: not x.endswith('Discarded')) outDisc = getSize(lambda x: x.endswith('Discarded')) summary = ['Movies processed: %d' % (moviesAcc+outDisc), 'Movies rejected: *%d*' % outDisc] return summary