Source code for tomo.protocols.protocol_assign_tomo2tomoMask

# **************************************************************************
# *
# * Authors:     Scipion Team
# *
# * Unidad de  Bioinformatica of Centro Nacional de Biotecnologia , CSIC
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 2 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307  USA
# *
# *  All comments concerning this program package may be sent to the
# *  e-mail address 'scipion@cnb.csic.es'
# *
# **************************************************************************
from os import symlink
from os.path import basename, abspath
from pwem.protocols import EMProtocol
from pyworkflow import BETA
from pyworkflow.protocol.params import PointerParam
from tomo.objects import TomoMask
from tomo.utils import isMatchingByTsId


[docs]class ProtAssignTomo2TomoMask(EMProtocol): """ This protocol assign tomograms to tomomasks (segmentations).""" _devStatus = BETA _label = 'assign tomograms to tomo masks (segmentations)' MATERIALS_SUFFIX = '_materials' def _defineParams(self, form): form.addSection(label='Input') form.addParam('inTomoMasks', PointerParam, pointerClass='SetOfTomoMasks', label='Tomo masks (segmentations)', help='Select the tomo masks desired to be referred to the introduced tomograms. The match ' 'between both sets is carried out firstly by tsId and if not possible, then it will try ' 'to do it by filename.') form.addParam('inputTomos', PointerParam, pointerClass='SetOfTomograms', label='Tomograms', help='Select the tomograms to be assigned to the input tomo masks.') # --------------------------- INSERT steps functions -------------------------- def _insertAllSteps(self): self._insertFunctionStep(self.createOutputStep) # --------------------------- STEPS functions --------------------------------------------
[docs] def createOutputStep(self): inTomos = self.inputTomos.get() inTomoMasks = self.inTomoMasks.get() outputSetOfTomoMasks = inTomoMasks.create(self._getPath(), template='tomomasks%s.sqlite') outputSetOfTomoMasks.setSamplingRate(inTomos.getSamplingRate()) if isMatchingByTsId(self.inTomoMasks.get(), self.inputTomos.get()): tomoTsIds = [tomo.getTsId() for tomo in inTomos] for inTomoMask in inTomoMasks: outTomoMask = self.setMatchingTomogram(tomoTsIds, inTomoMask, inTomos) outputSetOfTomoMasks.append(outTomoMask) else: # Membrane Annotator tool adds suffix _materials to the generated tomomasks tomoBaseNames = [basename(tomo.getFileName().replace(self.MATERIALS_SUFFIX, '')) for tomo in inTomos] for inTomoMask in inTomoMasks: outTomoMask = self.setMatchingTomogram(tomoBaseNames, inTomoMask, inTomos, isMatchingByTsId=False) outputSetOfTomoMasks.append(outTomoMask) self._defineOutputs(outputTomoMasks=outputSetOfTomoMasks) self._defineSourceRelation(inTomos, outputSetOfTomoMasks)
# --------------------------- INFO functions -------------------------------------------- def _validate(self): errors = [] inTomoMasks = self.inTomoMasks.get() inTomos = self.inputTomos.get() tol = 0.01 if abs(inTomos.getSamplingRate() - inTomoMasks.getSamplingRate()) > tol: errors.append('The sampling rate of input sets of tomomasks [%.2f Å/pix] and tomograms [%.2f Å/pix] differ ' 'in more than %.2f Å/pix.' % (inTomoMasks.getSamplingRate(), inTomos.getSamplingRate(), tol)) else: # Check match by tsId tomoMaskTsIds = [tomoMask.getTsId() for tomoMask in inTomoMasks] tomoTsIds = [tomo.getTsId() for tomo in inTomos] numberMatchesByTsId = len(set(tomoTsIds) & set(tomoMaskTsIds)) # Length of the intersection of both lists # Check match by basename tomoMaskBaseNames = [tomoMask.getFileName().replace(self.MATERIALS_SUFFIX, '') for tomoMask in inTomoMasks] tomoBaseNames = [tomo.getFileName() for tomo in inTomos] numberMatchesByBaseName = len(set(tomoMaskBaseNames) & set(tomoBaseNames)) # Length of the intersection of both lists if numberMatchesByTsId == 0 and numberMatchesByBaseName: errors.append('Unable to find any match between the introduced datasets after checking the tsIds and ' 'the basename of all elements.') return errors def _warnings(self): warnings = [] inTomoMasks = self.inTomoMasks.get() inTomos = self.inputTomos.get() if isMatchingByTsId(inTomoMasks, inTomos): notMatchingMsg = '' # Check match by tsId tomoTsIds = [tomo.getTsId() for tomo in inTomos] for tomoMask in inTomoMasks: if tomoMask.getTsId() not in tomoTsIds: notMatchingMsg += '\n\t-%s' % tomoMask.getFileName() if notMatchingMsg: warnings.append('Not able to find a tsId-based match for the following tomomasks in ' 'the introduced tomograms:' + notMatchingMsg) else: notMatchingMsg = '' # Check match by basename tomoBaseNames = [basename(tomo.getFileName().replace(self.MATERIALS_SUFFIX, '')) for tomo in inTomos] for tomoMask in inTomoMasks: tomoMaskName = tomoMask.getFileName() tomoMaskBaseName = basename(tomoMaskName.replace(self.MATERIALS_SUFFIX, '')) if tomoMaskBaseName not in tomoBaseNames: notMatchingMsg += '\n\t-%s' % tomoMaskName if notMatchingMsg: warnings.append('Not able to find a basename-based match for the following tomomasks in ' 'the introduced tomograms:' + notMatchingMsg) return warnings def _summary(self): summary = [] if self.isFinished(): summary.append("%s tomograms assigned to %s tomomasks." % (self.getObjectTag('inputTomos'), self.getObjectTag('inTomoMasks'))) return summary # --------------------------- UTILS functions --------------------------------------------
[docs] def setMatchingTomogram(self, idList, inTomoMask, inTomos, isMatchingByTsId=True): inFileName = inTomoMask.getFileName() outFileName = self._getExtraPath(basename(inFileName)) symlink(abspath(inFileName), abspath(outFileName)) outTomoMask = TomoMask() outTomoMask.setLocation(inFileName) outTomoMask.copyInfo(inTomoMask) outTomoMask.setFileName = outFileName if isMatchingByTsId: outTomoMask.setVolName(inTomos[idList.index(inTomoMask.getTsId()) + 1].getFileName()) else: outTomoMask.setVolName(inTomos[idList.index(basename(inTomoMask.getFileName().replace( self.MATERIALS_SUFFIX, ''))) + 1].getFileName()) # TODO: if the volume has not been matched at this point try to find out if the tsId is contained in the basename return outTomoMask