Source code for xmipptomo.protocols.protocol_score_transform

# **************************************************************************
# *
# * Authors:    David Herreros Calero (dherreros@cnb.csic.es)
# *
# * Unidad de  Bioinformatica of Centro Nacional de Biotecnologia , CSIC
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 2 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307  USA
# *
# *  All comments concerning this program package may be sent to the
# *  e-mail address 'coss@cnb.csic.es'
# *
# **************************************************************************


import numpy as np

from pyworkflow import BETA
from pyworkflow.object import Float
from pyworkflow.protocol import params

from tomo.protocols import ProtTomoPicking
from tomo.objects import SubTomogram, TomoAcquisition

from pwem.convert.transformations import listQuaternions, quaternion_distance


[docs]class XmippProtScoreTransform(ProtTomoPicking): '''Protocol to score a series of alignments stored in a SetOfSubtomograms by quaternion distance analysis''' _label = 'score transformations' _devStatus = BETA def __init__(self, **args): ProtTomoPicking.__init__(self, **args) # self.stepsExecutionMode = STEPS_PARALLEL def _defineParams(self, form): form.addSection(label='Input') form.addParam('firstSubtomos', params.PointerParam, pointerClass='SetOfSubTomograms', label="First Subtomograms to compare", important=True) form.addParam('secondSubtomos', params.PointerParam, pointerClass='SetOfSubTomograms', label="Second Subtomograms to compare", important=True) # --------------------------- INSERT steps functions --------------------------- def _insertAllSteps(self): self._insertFunctionStep('scoreTransformStep') self._insertFunctionStep('createOutputStep') # --------------------------- STEPS functions ---------------------------
[docs] def scoreTransformStep(self): self.first_subtomos = self.firstSubtomos.get() self.second_subtomos = self.secondSubtomos.get() # Extract Transformation Matrices from input SubTomograms first_matrices = self.queryMatrices(self.first_subtomos) second_matrices = self.queryMatrices(self.second_subtomos) # Convert Trasnformation Matrices to Quaternions aux = list(zip(*first_matrices)) first_quaternions = list(zip(aux[0], listQuaternions(aux[1]))) aux = list(zip(*second_matrices)) second_quaternions = list(zip(aux[0], listQuaternions(aux[1]))) # Compute distance matrix from quaternions self.dist = [(t1[0], quaternion_distance(t1[1], t2[1])) for t1, t2 in zip(first_quaternions, second_quaternions) if t1[0] == t2[0]] # Save summary to use it in the protocol Info only_distances = np.asarray(list(zip(*self.dist))[1]) mean_dist = np.mean(only_distances) std_dist = np.std(only_distances) percentage_outliers = np.sum(only_distances > mean_dist + 3 * std_dist) \ + np.sum(only_distances < mean_dist - 3 * std_dist) percentage_outliers = 100 * percentage_outliers / len(only_distances) summary = self._getExtraPath('Summary.txt') with open(summary, 'w') as file: file.write('Mean distance between the two sets: %.2f\n' % mean_dist) file.write('Estimated percentage of outliers: %.2f%%\n' % percentage_outliers)
[docs] def createOutputStep(self): outSubtomos = self._createSetOfSubTomograms() outSubtomos.setSamplingRate(self.second_subtomos.getSamplingRate()) outSubtomos.setCoordinates3D(self.second_subtomos.getCoordinates3D()) # acquisition = TomoAcquisition() # acquisition.setAngleMin(self.second_subtomos.getFirstItem().getAcquisition().getAngleMin()) # acquisition.setAngleMax(self.second_subtomos.getFirstItem().getAcquisition().getAngleMax()) # acquisition.setStep(self.second_subtomos.getFirstItem().getAcquisition().getStep()) # outSubtomos.setAcquisition(acquisition) for ids, inSubtomo in enumerate(self.first_subtomos.iterItems()): subtomogram = SubTomogram() subtomogram.setObjId(self.dist[ids][0]) subtomogram.setLocation(inSubtomo.getLocation()) subtomogram.setCoordinate3D(inSubtomo.getCoordinate3D()) subtomogram.setTransform(inSubtomo.getTransform()) subtomogram.setVolName(inSubtomo.getVolName()) subtomogram.distanceScore = Float(self.dist[ids][1]) outSubtomos.append(subtomogram) self._defineOutputs(outputSetOfSubtomogram=outSubtomos) self._defineSourceRelation(self.firstSubtomos, outSubtomos) self._defineSourceRelation(self.secondSubtomos, outSubtomos)
# --------------------------- UTILS functions ---------------------------
[docs] def queryMatrices(self, subtomos): matrices = [(subtomo.getObjId(), subtomo.getTransform().getMatrix()) for subtomo in subtomos.iterItems()] return matrices
# --------------------------- INFO functions --------------------------- def _summary(self): summary = [] if self.getOutputsSize() >= 1: summary_file = self._getExtraPath('Summary.txt') with open(summary_file, 'r') as file: summary.append(file.read()) else: summary.append('Output not ready yet') return summary def _validate(self): validateMsgs = [] firstTransform = self.firstSubtomos.get().getFirstItem().getTransform() secondTransform = self.secondSubtomos.get().getFirstItem().getTransform() if firstTransform is None or secondTransform is None: validateMsgs.append('Please provide subtomograms which have transformation matrix in "inputAlignment".') return validateMsgs