Source code for xmipptomo.protocols.protocol_score_transform
# **************************************************************************
# *
# * Authors:    David Herreros Calero (dherreros@cnb.csic.es)
# *
# * Unidad de  Bioinformatica of Centro Nacional de Biotecnologia , CSIC
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 2 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307  USA
# *
# *  All comments concerning this program package may be sent to the
# *  e-mail address 'coss@cnb.csic.es'
# *
# **************************************************************************
import numpy as np
from pyworkflow import BETA
from pyworkflow.object import Float
from pyworkflow.protocol import params
from tomo.protocols import ProtTomoPicking
from tomo.objects import SubTomogram, TomoAcquisition
from pwem.convert.transformations import listQuaternions, quaternion_distance
[docs]class XmippProtScoreTransform(ProtTomoPicking):
    '''Protocol to score a series of alignments stored in a SetOfSubtomograms by
    quaternion distance analysis'''
    _label = 'score transformations'
    _devStatus = BETA
    def __init__(self, **args):
        ProtTomoPicking.__init__(self, **args)
        # self.stepsExecutionMode = STEPS_PARALLEL
    def _defineParams(self, form):
        form.addSection(label='Input')
        form.addParam('firstSubtomos', params.PointerParam,
                      pointerClass='SetOfSubTomograms',
                      label="First Subtomograms to compare", important=True)
        form.addParam('secondSubtomos', params.PointerParam,
                      pointerClass='SetOfSubTomograms',
                      label="Second Subtomograms to compare", important=True)
    # --------------------------- INSERT steps functions ---------------------------
    def _insertAllSteps(self):
        self._insertFunctionStep('scoreTransformStep')
        self._insertFunctionStep('createOutputStep')
    # --------------------------- STEPS functions ---------------------------
[docs]    def scoreTransformStep(self):
        self.first_subtomos = self.firstSubtomos.get()
        self.second_subtomos = self.secondSubtomos.get()
        # Extract Transformation Matrices from input SubTomograms
        first_matrices = self.queryMatrices(self.first_subtomos)
        second_matrices = self.queryMatrices(self.second_subtomos)
        # Convert Trasnformation Matrices to Quaternions
        aux = list(zip(*first_matrices))
        first_quaternions = list(zip(aux[0], listQuaternions(aux[1])))
        aux = list(zip(*second_matrices))
        second_quaternions = list(zip(aux[0], listQuaternions(aux[1])))
        # Compute distance matrix from quaternions
        self.dist = [(t1[0], quaternion_distance(t1[1], t2[1]))
                     for t1, t2 in zip(first_quaternions, second_quaternions)
                     if t1[0] == t2[0]]
        # Save summary to use it in the protocol Info
        only_distances = np.asarray(list(zip(*self.dist))[1])
        mean_dist = np.mean(only_distances)
        std_dist = np.std(only_distances)
        percentage_outliers = np.sum(only_distances > mean_dist + 3 * std_dist) \
                              + np.sum(only_distances < mean_dist - 3 * std_dist)
        percentage_outliers = 100 * percentage_outliers / len(only_distances)
        summary = self._getExtraPath('Summary.txt')
        with open(summary, 'w') as file:
            file.write('Mean distance between the two sets: %.2f\n' % mean_dist)
            file.write('Estimated percentage of outliers: %.2f%%\n' % percentage_outliers)
[docs]    def createOutputStep(self):
        outSubtomos = self._createSetOfSubTomograms()
        outSubtomos.setSamplingRate(self.second_subtomos.getSamplingRate())
        outSubtomos.setCoordinates3D(self.second_subtomos.getCoordinates3D())
        # acquisition = TomoAcquisition()
        # acquisition.setAngleMin(self.second_subtomos.getFirstItem().getAcquisition().getAngleMin())
        # acquisition.setAngleMax(self.second_subtomos.getFirstItem().getAcquisition().getAngleMax())
        # acquisition.setStep(self.second_subtomos.getFirstItem().getAcquisition().getStep())
        # outSubtomos.setAcquisition(acquisition)
        for ids, inSubtomo in enumerate(self.first_subtomos.iterItems()):
            subtomogram = SubTomogram()
            subtomogram.setObjId(self.dist[ids][0])
            subtomogram.setLocation(inSubtomo.getLocation())
            subtomogram.setCoordinate3D(inSubtomo.getCoordinate3D())
            subtomogram.setTransform(inSubtomo.getTransform())
            subtomogram.setVolName(inSubtomo.getVolName())
            subtomogram.distanceScore = Float(self.dist[ids][1])
            outSubtomos.append(subtomogram)
        self._defineOutputs(outputSetOfSubtomogram=outSubtomos)
        self._defineSourceRelation(self.firstSubtomos, outSubtomos)
        self._defineSourceRelation(self.secondSubtomos, outSubtomos)
    # --------------------------- UTILS functions ---------------------------
[docs]    def queryMatrices(self, subtomos):
        matrices = [(subtomo.getObjId(), subtomo.getTransform().getMatrix())
                    for subtomo in subtomos.iterItems()]
        return matrices
    # --------------------------- INFO functions ---------------------------
    def _summary(self):
        summary = []
        if self.getOutputsSize() >= 1:
            summary_file = self._getExtraPath('Summary.txt')
            with open(summary_file, 'r') as file:
                summary.append(file.read())
        else:
            summary.append('Output not ready yet')
        return summary
    def _validate(self):
        validateMsgs = []
        firstTransform = self.firstSubtomos.get().getFirstItem().getTransform()
        secondTransform = self.secondSubtomos.get().getFirstItem().getTransform()
        if firstTransform is None or secondTransform is None:
            validateMsgs.append('Please provide subtomograms which have transformation matrix in "inputAlignment".')
        return validateMsgs