# Source code for tomo.protocols.protocol_import_coordinates_from_scipion

# *
# * Authors:     Scipion Team
# *
# * Unidad de  Bioinformatica of Centro Nacional de Biotecnologia , CSIC
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 2 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307  USA
# *
# *  All comments concerning this program package may be sent to the
# *  e-mail address 'scipion-users@lists.sourceforge.net'
# *
# **************************************************************************

from os.path import basename, exists
from pwem.protocols import EMProtocol
from pyworkflow import BETA
from pyworkflow.object import String
from pyworkflow.protocol import FileParam, IntParam, PointerParam
from pyworkflow.utils import Message, removeBaseExt, yellowStr
from ..constants import SCIPION, ERR_COORDS_FROM_SQLITE_NO_MATCH
from .protocol_base import ProtTomoBase
from ..objects import SetOfTomograms, SetOfCoordinates3D


class ProtImportCoordinates3DFromScipion(EMProtocol, ProtTomoBase):
    """Protocol to import a set of 3d coordinates from Scipion sqlite file.

    Each coordinate read from the sqlite file is related to one of the input
    tomograms, first comparing its tomoId with the tsId of the tomograms and,
    if that fails, checking if the tomoId is contained in the base name of
    any tomogram file. Coordinates that cannot be related to a tomogram are
    excluded and reported. If some of the input tomograms are not referred to
    by any coordinate, an additional output set of tomograms containing only
    the matching ones is generated.
    """

    _label = 'import 3D coordinates from scipion'
    _devStatus = BETA

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Report about the excluded tomograms and/or coordinates. It is filled by _checkCoordinatesMatching
        # and shown in the summary
        self.notMatchingMsg = None

    def _defineParams(self, form):
        """Define the protocol form: sqlite file, reference tomograms and box size."""
        form.addSection(label=Message.LABEL_INPUT)
        form.addParam('sqliteFile', FileParam,
                      label='Scipion sqlite file')
        form.addParam('importTomograms', PointerParam,
                      pointerClass='SetOfTomograms',
                      label='Input tomograms',
                      help='Select the tomograms to which the coordinates should be referred to. '
                           'The matching between coordinates and tomograms is made checking the tsId/tomoId '
                           'attribute. If no matches are found, then it tries to do it comparing the filenames. '
                           '*IMPORTANT*: the coordinates will be assumed to be at the same sampling rate as the '
                           'introduced tomograms.')
        form.addParam('boxSize', IntParam,
                      label='Box Size [pix]',
                      default=20)

    def _insertAllSteps(self):
        self._insertFunctionStep(self.importCoordinatesStep)

    # --------------------------- STEPS functions -----------------------------
    def importCoordinatesStep(self):
        """Load the coordinates from the sqlite file, relate them to the input tomograms and
        register the resulting set of 3d coordinates as output."""
        inTomoSet = self.importTomograms.get()
        samplingRate = inTomoSet.getSamplingRate()
        boxSize = self.boxSize.get()
        outCoordsSet = self._createSetOfCoordinates3D(inTomoSet)

        # Generate a set of 3d coordinates and assign the mapper of the introduced sqlite file
        inCoordsSet = SetOfCoordinates3D()
        inCoordsSet.setSamplingRate(samplingRate)
        inCoordsSet.setBoxSize(boxSize)
        # The mapper path is set as '<file>, <prefix>', being the prefix empty in this case
        inCoordsSet._mapperPath.set('%s, %s' % (self.sqliteFile.get(), ''))
        inCoordsSet.load()

        # Check if the coordinates and the tomograms can be related via the tomoId or the filename
        self._checkCoordinatesMatching(inTomoSet, inCoordsSet, outCoordsSet)
        if self.notMatchingMsg:
            # Store the report message, so it is available for the summary
            self._store()

        # Define the outputs
        outCoordsSet.setSamplingRate(samplingRate)
        outCoordsSet.setBoxSize(boxSize)
        self._defineOutputs(outputCoordinates=outCoordsSet)

    # --------------------------- INFO functions ------------------------------
    def _validate(self):
        """Return a list of error messages, which is empty if the introduced sqlite file exists."""
        errorList = []
        sqliteFile = self.sqliteFile.get()
        # An empty value is reported as a missing file instead of making exists() fail with a TypeError
        if not sqliteFile or not exists(sqliteFile):
            errorList.append('Introduced file was not found:\n\t%s' % sqliteFile)
        return errorList

    def _summary(self):
        """Return the summary lines, which are only generated when the protocol has finished."""
        summaryMsg = []
        if self.isFinished():
            if getattr(self, 'outputTomograms', None):
                summaryMsg.append('A *set of tomograms was generated* containing only the ones which there are\n'
                                  'at least one coordinate referred to.\n')
            statusMsg = getattr(self, 'notMatchingMsg', None)
            if statusMsg:
                summaryMsg.append(statusMsg.get())
        return summaryMsg

    # --------------------------- UTILS functions ----------------------------
    def _checkCoordinatesMatching(self, inTomoSet, inCoordsSet, outCoordsSet):
        """Relate each coordinate to a tomogram and fill the output set with the matching ones.

        :param inTomoSet: set of tomograms the coordinates should be referred to.
        :param inCoordsSet: set of 3d coordinates loaded from the sqlite file.
        :param outCoordsSet: set of 3d coordinates to which the matching coordinates are appended.
        :raises Exception: with ERR_COORDS_FROM_SQLITE_NO_MATCH if no coordinate could be related to
            any of the tomograms (including the case of an empty set of tomograms).

        Side effects: it may register the output 'outputTomograms', prints the excluded coordinates in
        the output log and sets the attribute notMatchingMsg.
        """
        notFoundCoords = []
        notFoundCoordsMsg = ''
        notFoundTomosMsg = ''
        matchingTomoIds = set()
        bulletPattern = '\t-{}\n'
        coordPattern = 'Row %i - tomoId = %s - (x, y, z) = (%.2f, %.2f, %.2f)'

        # Read in one pass the object id, tsId and base name of each tomogram. The real object ids are
        # kept because the items of a set are accessed by object id, and they are not guaranteed to be 1..N
        tomoObjIdList, tomoTsIdList, tomoBaseNameList = [], [], []
        for tomo in inTomoSet:
            tomoObjIdList.append(tomo.getObjId())
            tomoTsIdList.append(tomo.getTsId())
            tomoBaseNameList.append(removeBaseExt(tomo.getFileName()))
        if not tomoObjIdList:
            # Nothing can match an empty set of tomograms
            raise Exception(ERR_COORDS_FROM_SQLITE_NO_MATCH)

        for coord in inCoordsSet:
            coordTomoId = coord.getTomoId()
            tomoObjId = None
            if coordTomoId:
                if coordTomoId in tomoTsIdList:
                    tomoObjId = tomoObjIdList[tomoTsIdList.index(coordTomoId)]
                else:
                    # Second chance: the tomoId is contained in the base name of a tomogram
                    position = self._getMatchingIndexByFileName(coordTomoId, tomoBaseNameList)
                    if position:
                        tomoObjId = tomoObjIdList[position - 1]

            if tomoObjId is None:
                self._appendBaddCoordMsgToList(coord, notFoundCoords, inTomoSet,
                                               coordTomoId or 'NoTomoId', coordPattern)
                continue

            coord.setVolume(inTomoSet[tomoObjId])
            matchingTomoIds.add(tomoObjId)
            # Add it to the output set of coordinates
            outCoordsSet.append(coord)

        if not matchingTomoIds:
            raise Exception(ERR_COORDS_FROM_SQLITE_NO_MATCH)

        # Build a precedents set with only the matching tomograms, in case there are not all the ones present in the
        # input set
        if len(matchingTomoIds) < inTomoSet.getSize():
            # Create the output set of tomograms
            outTomoSet = SetOfTomograms.create(self._getPath(), template='tomograms%s.sqlite')
            outTomoSet.copyInfo(inTomoSet)
            # Sorted to append the tomograms in a deterministic order
            for tomoObjId in sorted(matchingTomoIds):
                outTomoSet.append(inTomoSet[tomoObjId])
            # Update the precedents to the output set of 3d coordinates and register them
            outCoordsSet.setPrecedents(outTomoSet)
            self._defineOutputs(outputTomograms=outTomoSet)
            # Generate a message to report about the non-matching tomograms found
            notMatchingTomoFiles = self._getNotMatchingTomoFiles(inTomoSet, matchingTomoIds)
            excludedTomos = ''.join(bulletPattern.format(fileName) for fileName in notMatchingTomoFiles)
            notFoundTomosMsg = ('*[%i] tomograms were excluded:*\n'
                                'The following tomograms were excluded from the set because no coordinates are '
                                'referred to them:\n%s' % (len(notMatchingTomoFiles), excludedTomos))

        if notFoundCoords:
            nOfNonMatchingCoords = len(notFoundCoords)
            # Format the non-matching coordinates message and add the header
            notFoundCoordsMsg = ('*[%i] coordinates were excluded*.\nThey have a tomoId which was not found in the '
                                 'tsId attribute of none of the tomograms introduced nor contained in their basename.'
                                 '\nThe details can be checked in the output log.' % nOfNonMatchingCoords)
            # Print the detailed information in the output log
            excludedCoords = ''.join(bulletPattern.format(coordMsg) for coordMsg in notFoundCoords)
            print(yellowStr('EXCLUDED COORDINATES [%i]:\n%s' % (nOfNonMatchingCoords, excludedCoords)))

        self.notMatchingMsg = String(notFoundTomosMsg + '\n\n' + notFoundCoordsMsg if notFoundTomosMsg
                                     else notFoundCoordsMsg)

    @staticmethod
    def _getMatchingIndexByFileName(coordTomoId, tomoBaseNameList):
        """Return the 1-based position of the first base name which contains coordTomoId, or None
        if it is not contained in any of them."""
        for position, baseName in enumerate(tomoBaseNameList, start=1):
            if coordTomoId in baseName:
                return position
        return None

    @staticmethod
    def _getNotMatchingTomoFiles(inTomoSet, inTomoSetMatchingIndices):
        """Return the base names of the files of the tomograms whose object id is not contained
        in inTomoSetMatchingIndices."""
        return [basename(tomo.getFileName()) for tomo in inTomoSet
                if tomo.getObjId() not in inTomoSetMatchingIndices]

    @staticmethod
    def _appendBaddCoordMsgToList(coord, notFoundCoordsList, inTomoSet, coordTomoId, pattern):
        """Append to notFoundCoordsList the description of a coordinate which could not be related
        to any tomogram, formatted with pattern (object id, tomoId and the three position values)."""
        # 3D coordinate must be referred to a volume to get its origin. The first tomogram is used for
        # that, without assuming that its object id is 1
        coord.setVolume(inTomoSet.getFirstItem())
        notFoundCoordsList.append(pattern % (coord.getObjId(), coordTomoId, *coord.getPosition(SCIPION)))