Source code for tomo.objects

# -*- coding: utf-8 -*-
#  **************************************************************************
# *
# * Authors:     J.M. De la Rosa Trevin (delarosatrevin@scilifelab.se) [1]
# *
# * [1] SciLifeLab, Stockholm University
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 2 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307  USA
# *
# *  All comments concerning this program package may be sent to the
# *  e-mail address 'scipion@cnb.csic.es'
# *
# **************************************************************************

import os
from datetime import datetime
import threading
from collections import OrderedDict
import numpy as np
import math
import statistics
import csv

from pyworkflow.object import Integer, Float, String, Pointer, Boolean, CsvList
from pwem.objects import Transform
import pyworkflow.utils.path as path
import pwem.objects.data as data
from pwem.convert.transformations import euler_matrix
from pwem.emlib.image import ImageHandler

import tomo.constants as const


class TiltImageBase:
    """Attributes shared by TiltImage and TiltImageM.

    Holds the tilt-series id, the tilt angle and the acquisition order of
    one tilt image. It is meant to be combined with an image class that
    provides ``copyAttributes``, ``copyObjId`` and ``hasTransform``.
    """

    def __init__(self, tsId=None, tiltAngle=None, acquisitionOrder=None,
                 **kwargs):
        # Any extra keyword argument is accepted but not used here.
        self._tiltAngle = Float(tiltAngle)
        self._tsId = String(tsId)
        self._acqOrder = Integer(acquisitionOrder)

    def getTsId(self):
        """Return the unique tilt-series id, usually taken from the file
        pattern given by the user at import time.
        """
        tsId = self._tsId.get()
        return tsId

    def setTsId(self, value):
        """Store ``value`` as the tilt-series id."""
        self._tsId.set(value)

    def getTiltAngle(self):
        """Return the stored tilt angle."""
        angle = self._tiltAngle.get()
        return angle

    def setTiltAngle(self, value):
        """Store ``value`` as the tilt angle."""
        self._tiltAngle.set(value)

    def getAcquisitionOrder(self):
        """Return the stored acquisition order."""
        order = self._acqOrder.get()
        return order

    def setAcquisitionOrder(self, value):
        """Store ``value`` as the acquisition order."""
        self._acqOrder.set(value)

    def copyInfo(self, other, copyId=False, copyTM=True):
        """Copy tilt angle, tilt-series id and acquisition order from
        ``other``.

        :param other: object to copy from.
        :param copyId: if True, the object id of ``other`` is copied too.
        :param copyTM: if True and ``other`` has a transform, the
            ``_transform`` attribute is copied as well.
        """
        self.copyAttributes(other, '_tiltAngle', '_tsId', '_acqOrder')
        if copyId:
            self.copyObjId(other)
        # hasTransform() is only queried when the caller asked for it
        wantsTransform = copyTM and other.hasTransform()
        if wantsTransform:
            self.copyAttributes(other, '_transform')
class TiltImage(data.Image, TiltImageBase):
    """Single image belonging to a tilt series."""

    def __init__(self, location=None, **kwargs):
        # Both parents receive the same keyword arguments.
        data.Image.__init__(self, location, **kwargs)
        TiltImageBase.__init__(self, **kwargs)

    def copyInfo(self, other, copyId=False, copyTM=True):
        """Copy the image information and then the tilt-specific one."""
        data.Image.copyInfo(self, other)
        TiltImageBase.copyInfo(self, other, copyId=copyId, copyTM=copyTM)

    def parseFileName(self, suffix="", extension=None):
        """Build a file name from the base name of this tilt image.

        :param suffix: string inserted between the base name and the
            extension.
        :param extension: replacement extension; when None the current
            extension of the file is kept.
        :return: the base name (directory removed) with the suffix and the
            chosen extension.
        """
        baseName = os.path.basename(self.getFileName())
        stem, currentExt = os.path.splitext(baseName)
        chosenExt = currentExt if extension is None else extension
        return stem + suffix + chosenExt
class TiltSeriesBase(data.SetOfImages):
    """Base class for TiltSeries and TiltSeriesM: a set of tilt images
    identified by a tilt-series id.
    """

    def __init__(self, **kwargs):
        data.SetOfImages.__init__(self, **kwargs)
        self._tsId = String(kwargs.get('tsId', None))
        # TiltSeries will always be used inside a SetOfTiltSeries
        # so, let's not store the mapper path by default
        self._mapperPath.setStore(False)
        self._acquisition = TomoAcquisition()
        self._origin = Transform()
        self._anglesCount = Integer()

    def getAnglesCount(self):
        """Return the attribute holding the number of tilt angles.

        The attribute object itself is returned, not its plain value.
        """
        return self._anglesCount

    def setAnglesCount(self, value):
        """Set the number of tilt angles.

        :param value: either an Integer attribute, which replaces the
            current one, or a plain value, which is stored into the
            existing attribute so that it is not lost.
        """
        if isinstance(value, Integer):
            self._anglesCount = value
        else:
            self._anglesCount.set(value)

    def getTsId(self):
        """Return the unique tilt-series id, usually taken from the file
        pattern given by the user at import time.
        """
        return self._tsId.get()

    def setTsId(self, value):
        """Store ``value`` as the tilt-series id."""
        self._tsId.set(value)

    def copyInfo(self, other, copyId=False):
        """Copy basic information (id and other properties) but not
        _mapperPath or _size from ``other`` to the current object.
        """
        self.copy(other, copyId=copyId, ignoreAttrs=['_mapperPath', '_size'])
        # self.copyAttributes(other, ['_tsId', '_anglesCount'])

    def append(self, tiltImage):
        """Add a tilt image, stamping it with the id of this series."""
        tiltImage.setTsId(self.getTsId())
        data.SetOfImages.append(self, tiltImage)

    def clone(self, ignoreAttrs=('_mapperPath', '_size')):
        """Return a new object of the same class with the attributes of
        this one copied, except those listed in ``ignoreAttrs``.
        """
        clone = self.getClass()()
        clone.copy(self, ignoreAttrs=ignoreAttrs)
        return clone

    def close(self):
        # Do nothing on close, since the db will be closed by SetOfTiltSeries
        pass

    def getScannedPixelSize(self):
        """Return sampling rate * 1e-4 * magnification."""
        mag = self._acquisition.getMagnification()
        return self._samplingRate.get() * 1e-4 * mag

    def generateTltFile(self, tltFilePath, reverse=False):
        """Write the tilt angles, one per line, to a .tlt file.

        :param tltFilePath: path of the file to be created.
        :param reverse: if True, the angle list (read ordered by
            ``_tiltAngle``) is written in the opposite order.
        """
        angleList = [ti.getTiltAngle()
                     for ti in self.iterItems(orderBy="_tiltAngle")]
        if reverse:
            angleList.reverse()

        with open(tltFilePath, 'w') as f:
            f.writelines("%s\n" % angle for angle in angleList)

    def hasOrigin(self):
        """Return True if the origin attribute is not None."""
        return self._origin is not None

    def setOrigin(self, newOrigin):
        """Set the origin of the tilt series.

        :param newOrigin: Transform object to be used as origin.
        """
        self._origin = newOrigin

    def getOrigin(self, force=False):
        """Return the origin of the tilt series.

        :param force: when there is no origin, return a default one
            instead of None.
        """
        if self.hasOrigin():
            return self._origin
        return self._getDefaultOrigin() if force else None

    def _getDefaultOrigin(self):
        """Build a Transform whose shifts place the origin at minus half
        of the X and Y dimensions (and of Z when Z > 1), scaled by the
        sampling rate.
        """
        sampling = self.getSamplingRate()
        t = Transform()
        x, y, z = self.getDim()
        if z > 1:
            z /= -2.
        t.setShifts(x / -2. * sampling, y / -2. * sampling, z * sampling)
        return t

    def getShiftsFromOrigin(self):
        """Return the (x, y, z) shifts of the origin, using a default
        origin when none is set. Values are floats in Angstroms.
        """
        origin = self.getOrigin(force=True).getShifts()
        return origin[0], origin[1], origin[2]

    def updateOriginWithResize(self, resizeFactor):
        """Multiply the origin shifts by ``resizeFactor``.

        A default origin is used when the tilt series has none, so the
        method no longer fails on a missing origin.
        """
        origin = self.getOrigin(force=True)
        xOri, yOri, zOri = self.getShiftsFromOrigin()
        origin.setShifts(xOri * resizeFactor,
                         yOri * resizeFactor,
                         zOri * resizeFactor)
        self.setOrigin(origin)
class TiltSeries(TiltSeriesBase):
    """Tilt series made of TiltImage items, with helpers to write the
    IMOD-style files (newst.com, tilt.com, .tlt, .xtilt, .xf).
    """
    ITEM_TYPE = TiltImage

    def applyTransform(self, outputFilePath):
        """Write a stack with the transform of every tilt image applied.

        If the first item has no transform, a link to the input file is
        created instead.

        :param outputFilePath: path of the output stack (or link).
        :raises Exception: if the first image has a transform but some
            other image does not.
        """
        ih = ImageHandler()
        inputFilePath = self.getFirstItem().getFileName()

        # TODO: Handle output tilt-series datatype format

        if not self.getFirstItem().hasTransform():
            path.createLink(inputFilePath, outputFilePath)
            return

        newStack = True
        for index, ti in enumerate(self):
            if not ti.hasTransform():
                raise Exception('ERROR: Some tilt-image is missing from transform object associated.')

            if newStack:
                # The empty output stack is created once, sized from the
                # first image visited
                ih.createEmptyImage(fnOut=outputFilePath,
                                    xDim=ti.getXDim(),
                                    yDim=ti.getYDim(),
                                    nDim=self.getSize())
                newStack = False

            transformArray = np.array(ti.getTransform().getMatrix())
            ih.applyTransform(inputFile=str(index + 1) + ':mrcs@' + inputFilePath,
                              outputFile=str(index + 1) + '@' + outputFilePath,
                              transformMatrix=transformArray,
                              shape=(ti.getYDim(), ti.getXDim()))

    def _dimStr(self):
        """ Return the string representing the dimensions. """
        return '%s x %s' % (self._firstDim[0],
                            self._firstDim[1])

    def writeNewstcomFile(self, ts_folder, **kwargs):
        """Write an artificial newst.com file inside ``ts_folder``.

        Recognised keyword arguments (with defaults): taperAtFill (1, 0),
        offsetsInXandY (0.0, 0.0), imagesAreBinned 1.0, binByFactor 1.

        :return: path of the written file.
        """
        newstcomPath = ts_folder + '/newst.com'
        pathi = self.getTsId()
        taperAtFill = kwargs.get('taperAtFill', (1, 0))
        offsetsInXandY = kwargs.get('offsetsInXandY', (0.0, 0.0))
        imagesAreBinned = kwargs.get('imagesAreBinned', 1.0)
        binByFactor = kwargs.get('binByFactor', 1)

        template = ('$newstack -StandardInput\n'
                    'InputFile {}.st\n'
                    'OutputFile {}.ali\n'
                    'TransformFile {}.xf\n'
                    'TaperAtFill {},{}\n'
                    'AdjustOrigin\n'
                    'OffsetsInXandY {},{}\n'
                    '#DistortionField .idf\n'
                    'ImagesAreBinned {}\n'
                    'BinByFactor {}\n'
                    '#GradientFile {}.maggrad\n'
                    '$if (-e ./savework) ./savework')

        with open(newstcomPath, 'w') as f:
            f.write(template.format(pathi, pathi, pathi,
                                    taperAtFill[0], taperAtFill[1],
                                    offsetsInXandY[0], offsetsInXandY[1],
                                    imagesAreBinned, binByFactor, pathi))

        return newstcomPath

    def writeTiltcomFile(self, ts_folder, **kwargs):
        """Write an artificial tilt.com file inside ``ts_folder``.

        Recognised keyword arguments (with defaults): thickness 500,
        binned 1, offset 0.0, shift (0.0, 0.0), radial (0.35, 0.035),
        xAxisTill 0.0, log 0.0, scale (0.0, 1000.0), mode 2,
        subsetStart (0, 0), actionIfGPUFails (1, 2).

        :return: path of the written file.
        """
        tiltcomPath = ts_folder + '/tilt.com'
        pathi = self.getTsId()
        thickness = kwargs.get('thickness', 500)
        binned = kwargs.get('binned', 1)
        offset = kwargs.get('offset', 0.0)
        shift = kwargs.get('shift', (0.0, 0.0))
        radial = kwargs.get('radial', (0.35, 0.035))
        xAxisTill = kwargs.get('xAxisTill', 0.0)
        log = kwargs.get('log', 0.0)
        scale = kwargs.get('scale', (0.0, 1000.0))
        mode = kwargs.get('mode', 2)
        subsetStart = kwargs.get('subsetStart', (0, 0))
        actionIfGPUFails = kwargs.get('actionIfGPUFails', (1, 2))
        dims = self.getDim()

        # SUBSETSTART must consume its two arguments: when it was
        # hard-coded, every following field received the value meant for
        # the previous one (e.g. XTILTFILE got the GPU action code).
        template = ('$tilt -StandardInput\n'
                    'InputProjections {}.ali\n'
                    'OutputFile {}.rec\n'
                    'IMAGEBINNED {} \n'
                    'TILTFILE {}.tlt\n'
                    'THICKNESS {}\n'
                    'RADIAL {} {}\n'
                    'FalloffIsTrueSigma 1\n'
                    'XAXISTILT {}\n'
                    'LOG {}\n'
                    'SCALE {} {}\n'
                    'PERPENDICULAR\n'
                    'MODE {}\n'
                    'FULLIMAGE {} {}\n'
                    'SUBSETSTART {} {}\n'
                    'AdjustOrigin\n'
                    'ActionIfGPUFails {},{}\n'
                    'XTILTFILE {}.xtilt\n'
                    'OFFSET {}\n'
                    'SHIFT {} {}\n'
                    '$if (-e ./savework) ./savework')

        with open(tiltcomPath, 'w') as f:
            f.write(template.format(pathi, pathi, binned, pathi, thickness,
                                    radial[0], radial[1], xAxisTill, log,
                                    scale[0], scale[1], mode,
                                    dims[0], dims[1],
                                    subsetStart[0], subsetStart[1],
                                    actionIfGPUFails[0], actionIfGPUFails[1],
                                    pathi, offset, shift[0], shift[1]))

        return tiltcomPath

    def writeTltFile(self, ts_folder):
        """Write ``<tsId>.tlt`` in ``ts_folder`` with one tilt angle per
        line, in iteration order.
        """
        tltPath = ts_folder + '/%s.tlt' % self.getTsId()
        with open(tltPath, 'w') as f:
            for ti in self:
                f.write(str(ti.getTiltAngle()) + '\n')

    def writeXtiltFile(self, ts_folder):
        """Write ``<tsId>.xtilt`` in ``ts_folder`` with one ``0.00`` line
        per tilt image.
        """
        xtiltPath = ts_folder + '/%s.xtilt' % self.getTsId()
        with open(xtiltPath, 'w') as f:
            for _ in self:
                f.write('0.00\n')

    def writeXfFile(self, transformFilePath):
        """Write an IMOD-style transform file for this tilt series.

        For each tilt image the flattened transform matrix elements
        0, 1, 3, 4 (7 decimals) and 2, 5 (3 decimals) are written as one
        tab-separated row.

        :param transformFilePath: path of the file to be created.
        """
        tsMatrixTransformList = []

        for ti in self:
            transform = ti.getTransform().getMatrix().flatten()
            transformIMOD = ['%.7f' % transform[0],
                             '%.7f' % transform[1],
                             '%.7f' % transform[3],
                             '%.7f' % transform[4],
                             '%.3f' % transform[2],
                             '%.3f' % transform[5]]
            tsMatrixTransformList.append(transformIMOD)

        with open(transformFilePath, 'w') as f:
            csvW = csv.writer(f, delimiter='\t')
            csvW.writerows(tsMatrixTransformList)

    def writeImodFiles(self, folderName, **kwargs):
        """Write newst.com, tilt.com, .tlt, .xtilt and .xf files inside
        ``folderName``. Keyword arguments are forwarded to both .com
        writers.
        """
        # Create a newst.com file
        self.writeNewstcomFile(folderName, **kwargs)
        # Create a tilt.com file
        self.writeTiltcomFile(folderName, **kwargs)
        # Create a .tlt file
        self.writeTltFile(folderName)
        # Create a .xtilt file
        self.writeXtiltFile(folderName)
        # Create a .xf file
        transformFilePath = folderName + '/%s.xf' % self.getTsId()
        self.writeXfFile(transformFilePath)
class SetOfTiltSeriesBase(data.SetOfImages):
    """ Base class for SetOfTiltImages and SetOfTiltImagesM. """
    EXPOSE_ITEMS = True
    USE_CREATE_COPY_FOR_SUBSET = True

    def __init__(self, **kwargs):
        data.SetOfImages.__init__(self, **kwargs)
        self._anglesCount = Integer()
        self._acquisition = TomoAcquisition()

    def getAnglesCount(self):
        """Return the stored number of angles."""
        return self._anglesCount.get()

    def setAnglesCount(self, value):
        """Store ``value`` as the number of angles."""
        self._anglesCount.set(value)

    def copyInfo(self, other):
        """Copy the inherited information plus the angles count from
        ``other``.
        """
        super().copyInfo(other)
        self.copyAttributes(other, '_anglesCount')

    def iterClassItems(self, iterDisabled=False):
        """Iterate over the images contained in every item of the set.

        :param iterDisabled: if True, disabled items and disabled images
            are yielded too.
        """
        for series in self.iterItems():
            if not (iterDisabled or series.isEnabled()):
                continue
            for image in series:
                if iterDisabled or image.isEnabled():
                    yield image

    def _setItemMapperPath(self, item):
        """Point the item mapper to '<set file name>,<item tsId>' and load
        the item.
        """
        mapperPath = '%s,%s' % (self.getFileName(), item.getTsId())
        item._mapperPath.set(mapperPath)
        item.load()

    def _insertItem(self, item):
        """Set the mapper path of the item, insert it in this set and
        write it without properties.
        """
        self._setItemMapperPath(item)
        data.EMSet._insertItem(self, item)
        item.write(properties=False)  # Set.write(self)

    def __getitem__(self, itemId):
        """Set up the mapper path before returning the item."""
        found = data.SetOfImages.__getitem__(self, itemId)
        self._setItemMapperPath(found)
        return found

    def getFirstItem(self):
        """Return the first item with its mapper path already set."""
        first = data.EMSet.getFirstItem(self)
        self._setItemMapperPath(first)
        return first

    def iterItems(self, **kwargs):
        """Iterate over the items, setting the mapper path of each one
        before it is yielded.
        """
        for entry in data.EMSet.iterItems(self, **kwargs):
            self._setItemMapperPath(entry)
            yield entry

    def copyItems(self, inputTs,
                  orderByTs='id', updateTsCallback=None,
                  orderByTi='id', updateTiCallback=None):
        """Copy the enabled tilt series of ``inputTs``, and all their tilt
        images, into this set.

        :param inputTs: input set of tilt series (or movies).
        :param orderByTs: orderBy value used to iterate the tilt series.
        :param updateTsCallback: optional callable invoked as
            ``(index, ts, tsOut)`` before the new series is appended.
        :param orderByTi: orderBy value used to iterate the tilt images.
        :param updateTiCallback: optional callable invoked as
            ``(index, ts, ti, tsOut, tiOut)`` before the new image is
            appended.
        """
        for tsIndex, ts in enumerate(inputTs.iterItems(orderBy=orderByTs)):
            if not ts.isEnabled():
                continue

            tsOut = self.ITEM_TYPE()
            tsOut.copyInfo(ts)
            tsOut.copyObjId(ts)
            if updateTsCallback:
                updateTsCallback(tsIndex, ts, tsOut)
            self.append(tsOut)

            for tiIndex, ti in enumerate(ts.iterItems(orderBy=orderByTi)):
                tiOut = tsOut.ITEM_TYPE()
                tiOut.copyInfo(ti)
                tiOut.copyObjId(ti)
                tiOut.setLocation(ti.getLocation())
                if updateTiCallback:
                    updateTiCallback(tiIndex, ts, ti, tsOut, tiOut)
                tsOut.append(tiOut)

            # Persist the series once all its images have been added
            self.update(tsOut)

    def updateDim(self):
        """Take the dimensions and the angles count (size) from the first
        item of the set.
        """
        first = self.getFirstItem()
        self.setDim(first.getDim())
        self._anglesCount.set(first.getSize())

    def getScannedPixelSize(self):
        """Return sampling rate * 1e-4 * magnification."""
        magnification = self._acquisition.getMagnification()
        return self._samplingRate.get() * 1e-4 * magnification
class SetOfTiltSeries(SetOfTiltSeriesBase):
    """Set whose items are TiltSeries objects."""
    ITEM_TYPE = TiltSeries

    def _dimStr(self):
        """ Return the string representing the dimensions. """
        parts = (self._anglesCount, self._firstDim[0], self._firstDim[1])
        return '%s x %s x %s' % parts
class TiltImageM(data.Movie, TiltImageBase):
    """Movie acquired at one tilt of a tilt series."""

    def __init__(self, location=None, **kwargs):
        # Both parents receive the same keyword arguments.
        data.Movie.__init__(self, location, **kwargs)
        TiltImageBase.__init__(self, **kwargs)

    def copyInfo(self, other, copyId=False, copyTM=True):
        """Copy the movie information and then the tilt-specific one."""
        data.Movie.copyInfo(self, other)
        TiltImageBase.copyInfo(self, other, copyId=copyId, copyTM=copyTM)
class TiltSeriesM(TiltSeriesBase):
    """Tilt series whose items are TiltImageM objects."""
    ITEM_TYPE = TiltImageM
class SetOfTiltSeriesM(SetOfTiltSeriesBase):
    """Set of TiltSeriesM items that also records gain and dark files and
    a frames range.
    """
    ITEM_TYPE = TiltSeriesM

    def __init__(self, **kwargs):
        SetOfTiltSeriesBase.__init__(self, **kwargs)
        self._gainFile = String()
        self._darkFile = String()
        # Store the frames range to avoid loading the items
        self._firstFramesRange = data.FramesRange()

    def setGain(self, gain):
        """Store the gain file."""
        self._gainFile.set(gain)

    def getGain(self):
        """Return the stored gain file."""
        return self._gainFile.get()

    def setDark(self, dark):
        """Store the dark file."""
        self._darkFile.set(dark)

    def getDark(self):
        """Return the stored dark file."""
        return self._darkFile.get()

    def getFramesRange(self):
        """Return the frames range attribute (the object, not a copy)."""
        return self._firstFramesRange

    def setFramesRange(self, value):
        """Store ``value`` into the frames range attribute."""
        self._firstFramesRange.set(value)

    def copyInfo(self, other):
        """Copy the inherited information plus the gain and dark files
        from ``other``. The frames range is not copied.
        """
        SetOfTiltSeriesBase.copyInfo(self, other)
        gain, dark = other.getGain(), other.getDark()
        self._gainFile.set(gain)
        self._darkFile.set(dark)
        # self._firstFramesRange.set(other.getFramesRange())

    def _dimStr(self):
        """ Return the string representing the dimensions. """
        parts = (self._anglesCount, self._firstDim)
        return '%s x %s' % parts
class TiltSeriesDict:
    """Helper class to store TiltSeries and TiltImage objects in
    dictionaries for quick access.

    It also contains some logic related to streaming:
    - Check for new input items that need to be processed.
    - Check for items already done that need to be saved.
    """

    def __init__(self, inputSet=None, outputSet=None,
                 newItemsCallback=None,
                 doneItemsCallback=None):
        """Initialize the dict.

        :param inputSet: set with the input items. It is monitored for new
            items coming from streaming. When None there is nothing to
            monitor and the input is considered closed.
        :param outputSet: set with items that are already done; they are
            registered (without tilt images) and marked as done.
        :param newItemsCallback: optional callable invoked with the list
            of new tilt-series ids when new items are discovered.
        :param doneItemsCallback: optional callable invoked with the list
            of tilt-series ids that have just been reported as finished.
        """
        self.__dict = OrderedDict()
        self.__inputSet = inputSet
        # Always define the flag: allDone() reads it even when no input
        # set was given.
        self.__inputClosed = (inputSet.isStreamClosed()
                              if inputSet is not None else True)
        self.__lastCheck = None
        self.__finalCheck = False
        self.__newItemsCallback = newItemsCallback
        self.__doneItemsCallback = doneItemsCallback

        self.__new = set()
        self.__finished = set()  # Reported as finished tasks, but not saved
        self.__done = set()  # Finished and saved tasks
        self.__lock = threading.Lock()

        if outputSet is not None:
            for ts in outputSet:
                # We don't need tilt-images for done items
                self.addTs(ts, includeTi=False)
                self.__done.add(ts.getTsId())

    def addTs(self, ts, includeTi=False):
        """Add a clone of the tilt series, optionally with clones of its
        tilt images.
        """
        self.__dict[ts.getTsId()] = (ts.clone(), OrderedDict())
        if includeTi:
            for ti in ts:
                self.addTi(ti)

    def hasTs(self, tsId):
        """Return True if a tilt series with that id is registered."""
        return tsId in self.__dict

    def getTs(self, tsId):
        """Return the stored tilt series clone for ``tsId``."""
        return self.__dict[tsId][0]

    def addTi(self, ti):
        """Store a clone of the tilt image under its tilt-series id and
        object id.
        """
        self.getTiDict(ti.getTsId())[ti.getObjId()] = ti.clone()

    def getTi(self, tsId, tiObjId):
        """Return the stored tilt image clone."""
        return self.getTiDict(tsId)[tiObjId]

    def getTiDict(self, tsId):
        """Return the ordered dict (object id -> tilt image) of ``tsId``."""
        return self.__dict[tsId][1]

    def getTiList(self, tsId):
        """Return the tilt images of ``tsId`` as a list."""
        return list(self.getTiDict(tsId).values())

    def __iter__(self):
        """Iterate over the stored tilt series, in insertion order."""
        for ts, _ in self.__dict.values():
            yield ts

    # ---- Streaming related methods -------------

    def update(self):
        """Check for new input items and for newly finished items."""
        self._checkNewInput()
        self._checkNewOutput()

    def _checkNewInput(self):
        """Reload the input set if its file changed since the last check,
        register unseen tilt series and notify them through the
        new-items callback.
        """
        if self.__inputSet is None:
            # Nothing to monitor
            return

        inputSetFn = self.__inputSet.getFileName()
        mTime = datetime.fromtimestamp(os.path.getmtime(inputSetFn))

        if self.__lastCheck is None or self.__lastCheck <= mTime:
            updatedSet = self.__inputSet.getClass()(filename=inputSetFn)
            updatedSet.loadAllProperties()
            newItems = []
            for ts in updatedSet:
                if not self.hasTs(ts.getTsId()):
                    self.addTs(ts, includeTi=True)
                    newItems.append(ts.getTsId())
            self.__inputClosed = updatedSet.isStreamClosed()
            updatedSet.close()
            if newItems and self.__newItemsCallback is not None:
                self.__newItemsCallback(newItems)
        self.__lastCheck = datetime.now()

    def _checkNewOutput(self):
        """Move the ids reported as finished to the done set and notify
        them. The callback is also invoked once, possibly with an empty
        list, when everything is done.
        """
        # Take a snapshot of the finished ids under the lock, since
        # setFinished may be called from other threads.
        with self.__lock:
            doneItems = list(self.__finished)
            self.__finished.clear()

        if doneItems or (self.allDone() and not self.__finalCheck):
            self.__done.update(doneItems)
            if self.__doneItemsCallback is not None:
                self.__doneItemsCallback(doneItems)
            if self.allDone():
                self.__finalCheck = True

    def setFinished(self, *tsIdList):
        """Notify that all TiltSeries in the list of ids are finished."""
        with self.__lock:
            self.__finished.update(tsIdList)

    def allDone(self):
        """Return True if the input stream is closed and all registered
        tilt series are done.
        """
        return self.__inputClosed and len(self.__dict) == len(self.__done)
class TomoAcquisition(data.Acquisition):
    """Acquisition extended with tomography parameters: angle range and
    step, two axis angles, accumulated dose and tilt axis angle.
    """

    def __init__(self, angleMin=None, angleMax=None, step=None,
                 angleAxis1=None, angleAxis2=None, accumDose=None,
                 tiltAxisAngle=None, **kwargs):
        data.Acquisition.__init__(self, **kwargs)
        self._angleMin = Float(angleMin)
        self._angleMax = Float(angleMax)
        self._step = Float(step)
        self._angleAxis1 = Float(angleAxis1)
        self._angleAxis2 = Float(angleAxis2)
        self._accumDose = Float(accumDose)
        self._tiltAxisAngle = Float(tiltAxisAngle)

    def getTiltAxisAngle(self):
        """Return the stored tilt axis angle."""
        return self._tiltAxisAngle.get()

    def setTiltAxisAngle(self, value):
        """Store the tilt axis angle."""
        self._tiltAxisAngle.set(value)

    def getAngleMax(self):
        """Return the stored maximum angle."""
        return self._angleMax.get()

    def setAngleMax(self, value):
        """Store the maximum angle."""
        self._angleMax.set(value)

    def getAngleMin(self):
        """Return the stored minimum angle."""
        return self._angleMin.get()

    def setAngleMin(self, value):
        """Store the minimum angle."""
        self._angleMin.set(value)

    def getStep(self):
        """Return the stored angular step."""
        return self._step.get()

    def setStep(self, value):
        """Store the angular step and return whatever the underlying
        ``set`` call returns.
        """
        outcome = self._step.set(value)
        return outcome

    def getAngleAxis1(self):
        """Return the stored angle of axis 1."""
        return self._angleAxis1.get()

    def setAngleAxis1(self, value):
        """Store the angle of axis 1."""
        self._angleAxis1.set(value)

    def getAngleAxis2(self):
        """Return the stored angle of axis 2."""
        return self._angleAxis2.get()

    def setAngleAxis2(self, value):
        """Store the angle of axis 2."""
        self._angleAxis2.set(value)

    def getAccumDose(self):
        """Return the stored accumulated dose."""
        return self._accumDose.get()

    def setAccumDose(self, value):
        """Store the accumulated dose."""
        self._accumDose.set(value)
class Tomogram(data.Volume):
    """ Class to hold the tomogram abstraction inside Scipion. The origin
    (self._origin) of the volume is set as the location of the first
    coordinate loaded from the binary file. The volume may be displaced by
    setting a different origin using the methods implemented in the
    inherited class data.Image in scipion-em plugin.
    """

    def __init__(self, **kwargs):
        data.Volume.__init__(self, **kwargs)
        self._acquisition = None
        self._tsId = String(kwargs.get('tsId', None))
        # Cache for getDim(); filled the first time the file is read
        self._dim = None

    def getTsId(self):
        """Return the unique tilt-series id, usually taken from the file
        pattern given by the user at import time.
        """
        return self._tsId.get()

    def setTsId(self, value):
        """Store ``value`` as the tilt-series id."""
        self._tsId.set(value)

    def getAcquisition(self):
        """Return the acquisition object (may be None)."""
        return self._acquisition

    def setAcquisition(self, acquisition):
        """Set the acquisition object."""
        self._acquisition = acquisition

    def hasAcquisition(self):
        """Return True when there is an acquisition whose minimum and
        maximum angles are both set.
        """
        acq = self._acquisition
        if acq is None:
            return False
        return (acq.getAngleMin() is not None
                and acq.getAngleMax() is not None)

    def getDim(self):
        """Return image dimensions as tuple: (Xdim, Ydim, Zdim).

        The value is cached after the first successful read. None is
        returned when nothing is cached and the file name is None or the
        file does not exist.
        """
        if self._dim is not None:
            return self._dim

        from pwem.emlib.image import ImageHandler

        fn = self.getFileName()
        if fn is None or not os.path.exists(fn.replace(':mrc', '')):
            return None

        x, y, z, n = ImageHandler().getDimensions(self)
        # Some volumes in mrc format can have the z dimension
        # as n dimension, so we need to consider this case.
        depth = z if z > 1 else n
        self._dim = (x, y, depth)
        return x, y, depth

    def copyInfo(self, other):
        """Copy the inherited information plus acquisition, tilt-series id
        and origin from ``other``.
        """
        super().copyInfo(other)
        self.copyAttributes(other, '_acquisition', '_tsId', '_origin')
class SetOfTomograms(data.SetOfVolumes):
    """Set whose items are Tomogram objects."""
    ITEM_TYPE = Tomogram
    EXPOSE_ITEMS = True

    def __init__(self, *args, **kwargs):
        # Positional arguments are accepted but only the keyword ones are
        # forwarded to the parent constructor.
        data.SetOfVolumes.__init__(self, **kwargs)
        self._acquisition = TomoAcquisition()

    def updateDim(self):
        """Take the dimensions of the set from its first item."""
        firstDim = self.getFirstItem().getDim()
        self.setDim(firstDim)
class TomoMask(Tomogram):
    """ Object used to represent segmented tomograms
    """

    def __init__(self, **kwargs):
        Tomogram.__init__(self, **kwargs)
        self._volName = String()

    def getVolName(self):
        """Return the reference tomogram file of this mask."""
        return self._volName.get()

    def setVolName(self, tomoName):
        """Set the reference tomogram file of this mask."""
        self._volName.set(tomoName)

    def getTomogram(self):
        """Build a new Tomogram located at the reference file, carrying
        the sampling rate and the acquisition of this mask.
        """
        reference = Tomogram()
        reference.setLocation(self.getVolName())
        reference.setSamplingRate(self.getSamplingRate())
        reference.setAcquisition(self.getAcquisition())
        return reference
class SetOfTomoMasks(SetOfTomograms):
    """Set whose items are TomoMask objects."""
    ITEM_TYPE = TomoMask
    EXPOSE_ITEMS = True
class Coordinate3D(data.EMObject):
    """This class holds the (x,y,z) position and other information
    associated with a coordinate"""

    def __init__(self, **kwargs):
        data.EMObject.__init__(self, **kwargs)
        self._volumePointer = Pointer(objDoStore=False)
        self._x = Float()
        self._y = Float()
        self._z = Float()
        self._volId = Integer()
        self._eulerMatrix = data.Transform()
        # This may refer to a mesh, ROI, vesicle or any group of coordinates
        self._groupId = Integer()
        # Used to access to the corresponding tomogram from each coord
        # (it's the tsId)
        self._tomoId = String(kwargs.get('tomoId', None))
        # Plain value; defined here so getBoxSize() works before
        # setBoxSize() has been called
        self._boxSize = None

    def _getOffset(self, dim, originFunction=const.SCIPION):
        """Return the offset to apply to one of the coordinates.

        :param dim: index of the dimension (X=0, Y=1, Z=2).
        :param originFunction: function called with the volume dimensions
            to do the conversion. With const.SCIPION the offset is 0 and
            no volume is needed.
        """
        if originFunction == const.SCIPION:
            return 0

        origin_Scipion = self.getVolumeOrigin()[dim]
        aux = originFunction(self.getVolume().getDim())
        origin = aux[dim] if aux is not None else -origin_Scipion
        return origin + origin_Scipion

    def getX(self, originFunction):
        """See getPosition for how ``originFunction`` works."""
        return self._x.get() - self._getOffset(0, originFunction)

    def setX(self, x, originFunction):
        """See setPosition for how ``originFunction`` works."""
        self._x.set(x + self._getOffset(0, originFunction))

    def shiftX(self, shiftX):
        """Add ``shiftX`` to the stored x value."""
        self._x.sum(shiftX)

    def getY(self, originFunction):
        """See getPosition for how ``originFunction`` works."""
        return self._y.get() - self._getOffset(1, originFunction)

    def setY(self, y, originFunction):
        """See setPosition for how ``originFunction`` works."""
        self._y.set(y + self._getOffset(1, originFunction))

    def shiftY(self, shiftY):
        """Add ``shiftY`` to the stored y value."""
        self._y.sum(shiftY)

    def getZ(self, originFunction):
        """See getPosition for how ``originFunction`` works."""
        return self._z.get() - self._getOffset(2, originFunction)

    def setZ(self, z, originFunction):
        """See setPosition for how ``originFunction`` works."""
        self._z.set(z + self._getOffset(2, originFunction))

    def shiftZ(self, shiftZ):
        """Add ``shiftZ`` to the stored z value."""
        self._z.sum(shiftZ)

    def setMatrix(self, matrix):
        """Set the matrix of the orientation transform."""
        self._eulerMatrix.setMatrix(matrix)

    def getMatrix(self):
        """Return the matrix of the orientation transform."""
        return self._eulerMatrix.getMatrix()

    def hasTransform(self):
        """Return True if the orientation transform attribute is not None."""
        return self._eulerMatrix is not None

    def euler2Matrix(self, r, p, y):
        """Set the orientation matrix from three Euler angles using
        ``euler_matrix``.
        """
        self._eulerMatrix.setMatrix(euler_matrix(r, p, y))

    def eulerAngles(self):
        """Return the three angles extracted from the orientation matrix
        as a numpy array [x, y, z]. When the matrix is close to singular
        the third angle is set to 0.
        """
        R = self.getMatrix()
        sy = math.sqrt(R[0, 0] * R[0, 0] + R[1, 0] * R[1, 0])
        singular = sy < 1e-6

        if not singular:
            x = math.atan2(R[2, 1], R[2, 2])
            y = math.atan2(-R[2, 0], sy)
            z = math.atan2(R[1, 0], R[0, 0])
        else:
            x = math.atan2(-R[1, 2], R[1, 1])
            y = math.atan2(-R[2, 0], sy)
            z = 0

        return np.array([x, y, z])

    def scale(self, factor):
        """Scale x, y and z coordinates by a given factor."""
        self._x.multiply(factor)
        self._y.multiply(factor)
        self._z.multiply(factor)

    def getPosition(self, originFunction):
        """Return the (x, y, z) position referred to the origin defined by
        ``originFunction``.

        :param originFunction: either const.SCIPION, in which case the
            stored values are returned unchanged, or a function that
            receives the volume dimensions (Lx, Ly, Lz) and returns the
            vector (vx, vy, vz) used together with the volume origin to
            compute the offset (see _getOffset). In the second case the
            coordinate must already have a volume.
        """
        return (self.getX(originFunction),
                self.getY(originFunction),
                self.getZ(originFunction))

    def setPosition(self, x, y, z, originFunction):
        """Store the position of the coordinate.

        :param x: position in the X axis.
        :param y: position in the Y axis.
        :param z: position in the Z axis.
        :param originFunction: either const.SCIPION, in which case the
            values are stored unchanged, or a function that receives the
            volume dimensions (Lx, Ly, Lz) and returns the vector
            (vx, vy, vz) used together with the volume origin to compute
            the offset (see _getOffset).

        IMPORTANT NOTE: unless const.SCIPION is used, the volume must be
        associated with ``setVolume`` before calling this method, because
        the offset is computed from the volume dimensions and origin. The
        remaining attributes can be set before or after ``setVolume``.
        """
        self.setX(x, originFunction)
        self.setY(y, originFunction)
        self.setZ(z, originFunction)

    def getVolume(self):
        """Return the volume object to which this coordinate is
        associated.
        """
        return self._volumePointer.get()

    def setVolume(self, volume):
        """Set the volume to which this coordinate belongs, storing also
        its object id and, when it has one, its tilt-series id.
        """
        self._volumePointer.set(volume)
        self._volId.set(volume.getObjId())
        if volume.getTsId():
            # See getCoordinate3D() --> as a tomo is necessary to be
            # created, the tomoId (tsId), which may have been previously
            # stored is deleted when calling setVolume
            self.setTomoId(volume.getTsId())

    def copyInfo(self, coord):
        """Copy position, object id and box size from another coordinate.

        The position is read and written with the same origin convention
        (const.CENTER_GRAVITY); the origin function was previously missing
        from the setPosition call, which made this method fail.
        """
        self.setPosition(*coord.getPosition(const.CENTER_GRAVITY),
                         const.CENTER_GRAVITY)
        self.setObjId(coord.getObjId())
        self.setBoxSize(coord.getBoxSize())

    def setBoxSize(self, boxSize):
        """Set the box size (kept as a plain attribute)."""
        self._boxSize = boxSize

    def getBoxSize(self):
        """Return the box size, or None if it was never set."""
        return self._boxSize

    def getVolId(self):
        """Return the stored volume id."""
        return self._volId.get()

    def setVolId(self, volId):
        """Store the volume id."""
        self._volId.set(volId)

    def invertY(self):
        """Replace y by (volume height - y) when a volume is associated;
        do nothing otherwise.
        """
        volume = self.getVolume()
        if volume is not None:
            height = volume.getDim()[1]
            self.setY(height - self.getY(const.SCIPION), const.SCIPION)
        # else: error TODO

    def getVolName(self):
        """Return the file name of the associated volume."""
        return self.getVolume().getFileName()

    def getGroupId(self):
        """Return the stored group id."""
        return self._groupId.get()

    def setGroupId(self, groupId):
        """Store the group id."""
        self._groupId.set(groupId)

    def hasGroupId(self):
        """Return True if the group id attribute is not None.

        NOTE(review): the attribute is created in __init__, so this looks
        like it is always True even when no value was set - confirm
        whether a value check was intended.
        """
        return self._groupId is not None

    def getVolumeOrigin(self, angstrom=False):
        """Return the origin shifts of the associated volume.

        :param angstrom: if True the shifts are returned as given by the
            volume; otherwise each one is divided by the sampling rate.
        :raises Exception: if the coordinate has no volume.
        """
        vol = self.getVolume()
        if not vol:
            raise Exception("3D coordinate must be referred to a volume to get its origin.")
        if angstrom:
            return vol.getShiftsFromOrigin()

        sr = vol.getSamplingRate()
        origin = vol.getShiftsFromOrigin()
        return origin[0] / sr, origin[1] / sr, origin[2] / sr

    def getTomoId(self):
        """Return the stored tomogram id."""
        return self._tomoId.get()

    def setTomoId(self, tomoId):
        """Store the tomogram id."""
        self._tomoId.set(tomoId)
[docs]class SetOfCoordinates3D(data.EMSet): """ Encapsulate the logic of a set of volumes coordinates. Each coordinate has a (x,y,z) position and is related to a Volume The SetOfCoordinates3D can also have information about TiltPairs. """ ITEM_TYPE = Coordinate3D def __init__(self, **kwargs): data.EMSet.__init__(self, **kwargs) self._boxSize = Integer() self._samplingRate = Float() self._precedentsPointer = Pointer()
[docs] def getBoxSize(self): """ Return the box size of the particles. """ return self._boxSize.get()
[docs] def setBoxSize(self, boxSize): """ Set the box size of the particles. """ self._boxSize.set(boxSize)
[docs] def getSamplingRate(self): """ Return the sampling rate of the particles. """ return self._samplingRate.get()
[docs] def setSamplingRate(self, sampling): """ Set the sampling rate of the particles. """ self._samplingRate.set(sampling)
[docs] def iterVolumes(self): """ Iterate over the objects set associated with this set of coordinates. """ return self.getPrecedents()
[docs] def iterVolumeCoordinates(self, volume): """ Iterates over the set of coordinates belonging to that micrograph. """ pass
[docs] def iterCoordinates(self, volume=None, orderBy='id'): """ Iterate over the coordinates associated with a tomogram. If volume=None, the iteration is performed over the whole set of coordinates. IMPORTANT NOTE: During the storing process in the database, Coordinates3D will lose their pointer to ther associated Tomogram. This method overcomes this problem by retrieving and relinking the Tomogram as if nothing would ever happened. It is recommended to use this method when working with Coordinates3D, being the common "iterItems" deprecated for this set. Example: >>> for coord in coordSet.iterItems() >>> print(coord.getVolName()) >>> Error: Tomogram associated to Coordinate3D is NoneType (pointer lost) >>> for coord in coordSet.iterCoordinates() >>> print(coord.getVolName()) >>> '/path/to/Tomo.file' retrieved correctly """ if volume is None: volId = None elif isinstance(volume, int): volId = volume elif isinstance(volume, data.Volume): volId = volume.getObjId() else: raise Exception('Invalid input tomogram of type %s' % type(volume)) # Iterate over all coordinates if tomoId is None, # otherwise use tomoId to filter the where selection coordWhere = '1' if volId is None else '_volId=%d' % int(volId) for coord in self.iterItems(where=coordWhere, orderBy=orderBy): coord.setVolume(self.getPrecedents()[coord.getVolId()]) yield coord
[docs] def getPrecedents(self): """ Returns the SetOfTomograms or Tilt Series associated with this SetOfCoordinates""" return self._precedentsPointer.get()
[docs] def setPrecedents(self, precedents): """ Set the tomograms or Tilt Series associated with this set of coordinates. Params: tomograms: Either a SetOfTomograms or Tilt Series object or a pointer to it. """ if precedents.isPointer(): self._precedentsPointer.copy(precedents) else: self._precedentsPointer.set(precedents)
[docs] def getFiles(self): filePaths = set() filePaths.add(self.getFileName()) return filePaths
[docs] def getSummary(self): summary = [] summary.append("Number of particles picked: %s" % self.getSize()) summary.append("Particle size: %s" % self.getBoxSize()) return "\n".join(summary)
[docs] def copyInfo(self, other): """ Copy basic information (id and other properties) but not _mapperPath or _size from other set of objects to current one. """ self.setBoxSize(other.getBoxSize()) self.setSamplingRate(other.getSamplingRate()) self.setPrecedents(other.getPrecedents())
def __str__(self):
    """ Return "<ClassName> (<n> items, <box><stream state>)", where the box
    part is the cubic box size or 'No-Box' when no box size is set. """
    if not self._boxSize.hasValue():
        boxStr = 'No-Box'
    else:
        side = self._boxSize.get()
        boxStr = ' %d x %d x %d' % (side, side, side)

    return "%s (%d items, %s%s)" % (self.getClassName(), self.getSize(),
                                    boxStr, self._appendStreamState())
def getFirstItem(self):
    """ Return the first coordinate of the set with its volume set to the
    precedent indexed by the coordinate's volume id. """
    first = data.EMSet.getFirstItem(self)
    precedents = self.getPrecedents()
    first.setVolume(precedents[first.getVolId()])
    return first
def __getitem__(self, itemId):
    """ Return the Coordinate3D stored under ``itemId`` with its volume set
    to the precedent indexed by the coordinate's volume id. """
    item = data.EMSet.__getitem__(self, itemId)
    precedents = self.getPrecedents()
    item.setVolume(precedents[item.getVolId()])
    return item
class SubTomogram(data.Volume):
    """ Volume that can carry a Coordinate3D, an acquisition and the id and
    file name of the tomogram it refers to. """

    def __init__(self, **kwargs):
        data.Volume.__init__(self, **kwargs)
        self._acquisition = None
        self._coordinate = None
        self._volId = Integer()
        self._volName = String()

    def hasCoordinate3D(self):
        """ Tell whether a coordinate has been assigned. """
        return self._coordinate is not None

    def setCoordinate3D(self, coordinate):
        """ Store the coordinate associated with this subtomogram. """
        self._coordinate = coordinate

    def getCoordinate3D(self):
        """ Return the stored coordinate after giving it a volume.

        Since a Coordinate3D needs a volume, a new Tomogram is built from
        the information kept in the subtomogram (origin when available,
        volume name as location, sampling rate) and assigned to the
        coordinate before it is returned.
        """
        rebuilt = Tomogram()
        origin = self.getOrigin()
        if origin:
            rebuilt.setOrigin(origin)
        rebuilt.setLocation(self.getVolName())
        rebuilt.setSamplingRate(self.getSamplingRate())

        self._coordinate.setVolume(rebuilt)
        return self._coordinate

    def getAcquisition(self):
        """ Return the stored acquisition (None if never set). """
        return self._acquisition

    def setAcquisition(self, acquisition):
        """ Store the acquisition of this subtomogram. """
        self._acquisition = acquisition

    def hasAcquisition(self):
        """ Tell whether an acquisition is stored and both its minimum and
        maximum angles are not None. """
        acquisition = self._acquisition
        if acquisition is None:
            return False
        return (acquisition.getAngleMin() is not None
                and acquisition.getAngleMax() is not None)

    def getVolId(self):
        """ Return the tomogram id: the ``_volId`` property when it has a
        value, otherwise the volume id of the coordinate when there is
        one, otherwise None. """
        if not self._volId.hasValue():
            if self.hasCoordinate3D():
                return self.getCoordinate3D().getVolId()
            return None
        return self._volId.get()

    def setVolId(self, volId):
        """ Set the tomogram id. """
        self._volId.set(volId)

    def getVolName(self):
        """ Return the tomogram file name: the ``_volName`` property when it
        has a value, otherwise the file name of ``self.getVolume()`` when
        that is truthy, otherwise None. """
        # NOTE(review): getVolume is not defined in this class; presumably
        # it is inherited - confirm.
        if not self._volName.hasValue():
            if self.getVolume():
                return self.getVolume().getFileName()
            return None
        return self._volName.get()

    def setVolName(self, volName):
        """ Set the tomogram file name. """
        self._volName.set(volName)

    def getVolumeOrigin(self, angstrom=False):
        """ Return the vector that moves the position of the Coordinate3D
        associated with the subtomogram (referred to the center of the
        tomogram or to another origin chosen by the user) to the bottom
        left corner of the tomogram.

        :param angstrom: when True the shifts from the origin are returned
            as they are; otherwise each of the three shifts is divided by
            the sampling rate and truncated to int.
        """
        if angstrom:
            return self.getShiftsFromOrigin()

        samplingRate = self.getSamplingRate()
        shifts = self.getShiftsFromOrigin()
        return tuple(int(shifts[axis] / samplingRate) for axis in range(3))
class SetOfSubTomograms(data.SetOfVolumes):
    """ Set of SubTomogram items that also keeps an acquisition and a
    pointer to a set of 3D coordinates. """
    ITEM_TYPE = SubTomogram
    REP_TYPE = SubTomogram

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._acquisition = TomoAcquisition()
        self._coordsPointer = Pointer()

    def copyInfo(self, other):
        """ Copy the basic information handled by the parent class and, when
        ``other`` has one, its ``_coordsPointer`` attribute. """
        super().copyInfo(other)
        # Not every source set has coordinates (like the vesicles in pyseg)
        if not hasattr(other, '_coordsPointer'):
            return
        self.copyAttributes(other, '_coordsPointer')

    def hasCoordinates3D(self):
        """ Tell whether the coordinates pointer has a value. """
        return self._coordsPointer.hasValue()

    def getCoordinates3D(self):
        """ Return the set of coordinates referenced by the pointer. """
        coordinates = self._coordsPointer.get()
        return coordinates

    def setCoordinates3D(self, coordinates):
        """ Make the coordinates pointer refer to ``coordinates``. """
        self._coordsPointer.set(coordinates)
class AverageSubTomogram(SubTomogram):
    """ Average subtomogram. It is a SubTomogram with no behaviour of its
    own; the separate type makes it possible to tell outputs apart. """

    def __init__(self, **kwargs):
        SubTomogram.__init__(self, **kwargs)
class SetOfAverageSubTomograms(SetOfSubTomograms):
    """ Set of averages. It is a SetOfSubTomograms whose item and
    representative type is AverageSubTomogram; the separate type makes it
    possible to tell outputs apart. """
    ITEM_TYPE = AverageSubTomogram
    REP_TYPE = AverageSubTomogram

    def __init__(self, **kwargs):
        SetOfSubTomograms.__init__(self, **kwargs)
class ClassSubTomogram(SetOfSubTomograms):
    """ Class that groups SubTomogram objects. Its representative is an
    AverageSubTomogram. """
    REP_TYPE = AverageSubTomogram

    def copyInfo(self, other):
        """ Copy everything from ``other`` except its object id and the
        ``_mapperPath`` and ``_size`` attributes. """
        skipped = ['_mapperPath', '_size']
        self.copy(other, copyId=False, ignoreAttrs=skipped)

    def clone(self):
        """ Return a new instance of the same class holding a copy of this
        object without ``_mapperPath`` and ``_size``. """
        duplicate = self.getClass()()
        duplicate.copy(self, ignoreAttrs=['_mapperPath', '_size'])
        return duplicate

    def close(self):
        """ Do nothing: the database is closed by the SetOfClasses. """
        pass
class SetOfClassesSubTomograms(data.SetOfClasses):
    """ Results of a subtomogram averaging method: a set whose items are
    ClassSubTomogram objects and whose representatives are
    AverageSubTomogram objects. """
    ITEM_TYPE = ClassSubTomogram
    REP_TYPE = AverageSubTomogram
class LandmarkModel(data.EMObject):
    """ Set of landmarks belonging to a specific tilt-series. The landmarks
    themselves live in a tab-separated text file whose path is kept in
    ``_fileName``. """

    def __init__(self, tsId=None, fileName=None, modelName=None, **kwargs):
        data.EMObject.__init__(self, **kwargs)
        self._tsId = String(tsId)
        self._fileName = String(fileName)
        self._modelName = String(modelName)
        # Runtime-only link to the tilt-series (created with objDoStore=False)
        self._tiltSeries = Pointer(objDoStore=False)

    def getTiltSeries(self):
        """ Return the tilt-series associated with this landmark model. """
        return self._tiltSeries.get()

    def setTiltSeries(self, tiltSeries):
        """ Set the tilt-series from which this landmark model was
        calculated.

        :param tiltSeries: either a TiltSeries object or a pointer to it.
        """
        if tiltSeries.isPointer():
            self._tiltSeries.copy(tiltSeries)
        else:
            self._tiltSeries.set(tiltSeries)

    def getTsId(self):
        """ Return the tilt-series id converted with ``str``. """
        return str(self._tsId)

    def getFileName(self):
        """ Return the landmark file path converted with ``str``. """
        return str(self._fileName)

    def getModelName(self):
        """ Return the model name converted with ``str``. """
        return str(self._modelName)

    def setTsId(self, tsId):
        """ Replace the tilt-series id. """
        self._tsId = String(tsId)

    def setFileName(self, fileName):
        """ Replace the landmark file path. """
        self._fileName = String(fileName)

    def setModelName(self, modelName):
        """ Replace the model name. """
        self._modelName = String(modelName)

    def addLandmark(self, xCoor, yCoor, tiltIm, chainId, xResid, yResid):
        """ Append one landmark to the landmark file.

        The file is created, with a header row, the first time a landmark
        is added; later calls append one tab-separated row each.
        """
        fieldNames = ['xCoor', 'yCoor', 'tiltIm', 'chainId', 'xResid', 'yResid']
        fileName = self.getFileName()
        mode = "a" if os.path.exists(fileName) else "w"

        # newline='' lets the csv module write its own line terminator, so
        # no extra carriage return is inserted on platforms that translate
        # line endings.
        with open(fileName, mode, newline='') as f:
            writer = csv.DictWriter(f, delimiter='\t', fieldnames=fieldNames)
            if mode == "w":
                writer.writeheader()

            writer.writerow({'xCoor': xCoor,
                             'yCoor': yCoor,
                             'tiltIm': tiltIm,
                             'chainId': chainId,
                             'xResid': xResid,
                             'yResid': yResid})

    def retrieveInfoTable(self):
        """ Return the content of the landmark file as a list of rows.

        Each row is a list of strings in the order xCoor, yCoor, tiltIm,
        chainId, xResid, yResid. The header row is skipped. An empty file
        yields an empty list and blank rows are ignored.
        """
        outputInfo = []

        with open(self.getFileName(), newline='') as f:
            # The reader uses the default comma dialect, so every
            # tab-separated row arrives as a single field that is then
            # split on whitespace.
            reader = csv.reader(f)

            # Ignore header (the default avoids StopIteration on an empty file)
            next(reader, None)

            for line in reader:
                if not line:
                    continue
                outputInfo.append(line[0].split())

        return outputInfo
class SetOfLandmarkModels(data.EMSet):
    """ Set that groups landmark models and keeps a pointer to the set of
    tilt-series they belong to. """
    ITEM_TYPE = LandmarkModel

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._setOfTiltSeriesPointer = Pointer()

    def __getitem__(self, itemId):
        """ Return the landmark model stored under ``itemId`` after linking
        it to its tilt-series. """
        model = super().__getitem__(itemId)
        return self.completeLandmarkModel(model)

    def completeLandmarkModel(self, lm):
        """ Link a landmark model to its tilt-series at execution time.

        The tilt-series cannot be saved as a pointer in the items of the
        set, so it is looked up by tsId in the associated set of tilt-series
        and assigned to the model. Call this whenever information from the
        tilt-series of a landmark model is needed.

        :param lm: landmark model to complete.
        :return: the same landmark model.
        """
        whereClause = "_tsId=='%s'" % lm.getTsId()

        # Assign every tilt-series of the associated set with a matching tsId
        for tiltSeries in self.getSetOfTiltSeries().iterItems(where=whereClause):
            lm.setTiltSeries(tiltSeries)

        return lm

    def getLandmarkModelFromTsId(self, tsId):
        """ Return the first landmark model of the set whose tsId matches,
        or None when there is no match.

        :param tsId: tilt-series id to look for.
        """
        for model in self.iterItems(where="_tsId=='%s'" % tsId):
            return model
        return None

    def getSetOfTiltSeries(self, pointer=False):
        """ Return the set of tilt-series associated with this set of
        landmark models.

        :param pointer: when True the pointer itself is returned instead of
            the object it refers to.
        """
        if not pointer:
            return self._setOfTiltSeriesPointer.get()
        return self._setOfTiltSeriesPointer

    def setSetOfTiltSeries(self, setOfTiltSeries):
        """ Set the set of tilt-series from which this set of landmark
        models was calculated.

        :param setOfTiltSeries: either the object or a pointer to it.
        """
        if not setOfTiltSeries.isPointer():
            self._setOfTiltSeriesPointer.set(setOfTiltSeries)
        else:
            self._setOfTiltSeriesPointer.copy(setOfTiltSeries)
class MeshPoint(Coordinate3D):
    """ Point of a mesh: it stores the coordinates of a point (specified by
    the user) needed to define the triangulation of a volume. A mesh can be
    seen as a 3D point cloud with the coordinates needed to divide a region
    of space into interconnected planar triangles forming a closed
    surface. """

    def __init__(self, **kwargs):
        Coordinate3D.__init__(self, **kwargs)
        self._volumeName = String()
        # Algebraic description of fitted mesh
        self._description = None

    def getVolumeName(self):
        """ Return the ``_volumeName`` attribute itself (the String object,
        not the value it holds). """
        return self._volumeName

    def setVolumeName(self, volName):
        """ Set the value held by ``_volumeName``. """
        self._volumeName.set(volName)

    def getDescription(self):
        """ Return the stored description (None if never set). """
        return self._description

    def setDescription(self, description):
        """ Store the description of the fitted mesh. """
        self._description = description

    def hasDescription(self):
        """ Tell whether a description has been stored. """
        return self._description is not None
class SetOfMeshes(SetOfCoordinates3D):
    """ Set of MeshPoint items that also records how many meshes it
    contains. """
    ITEM_TYPE = MeshPoint

    def __init__(self, **kwargs):
        SetOfCoordinates3D.__init__(self, **kwargs)
        # Indicates how many meshes are in the set
        self._numberOfMeshes = Integer()

    def getNumberOfMeshes(self):
        """ Return the stored number of meshes. """
        count = self._numberOfMeshes.get()
        return count

    def setNumberOfMeshes(self, n):
        """ Store the number of meshes. """
        self._numberOfMeshes.set(n)
class Ellipsoid(data.EMObject):
    """ Ellipsoid description. Instances are meant to be used as the
    description attribute of a MeshPoint. The center, the radii and the
    algebraic description are each kept in a String attribute. """

    def __init__(self, **kwargs):
        data.EMObject.__init__(self, **kwargs)
        self._center = String()
        self._radii = String()
        self._algebraicDesc = String()

    def getCenter(self):
        """ Return the stored center. """
        return self._center.get()

    def setCenter(self, center):
        """ Store the center. """
        self._center.set(center)

    def getRadii(self):
        """ Return the stored radii. """
        return self._radii.get()

    def setRadii(self, radii):
        """ Store the radii. """
        self._radii.set(radii)

    def getAlgebraicDesc(self):
        """ Return the stored algebraic description. """
        # Previously this returned the center by mistake.
        return self._algebraicDesc.get()

    def setAlgebraicDesc(self, algebraicDesc):
        """ Store the algebraic description. """
        self._algebraicDesc.set(algebraicDesc)

    def hasAlgebraicDesc(self):
        """ Tell whether an algebraic description has been stored. """
        # The attribute is always a String object, so comparing it with
        # None was always True; ask the String whether it holds a value.
        return self._algebraicDesc.hasValue()
class CTFTomo(data.CTFModel):
    """ Represents a generic CTF model for a tilt-image.

    Besides the single-valued attributes, an instance may carry optional
    list attributes (``_defocusUList``, ``_defocusVList``,
    ``_defocusAngleList``, ``_phaseShiftList``, ``_cutOnFreqList``). They
    are not created in ``__init__``; their presence is detected with
    ``hasattr``.
    """

    def __init__(self, **kwargs):
        data.CTFModel.__init__(self, **kwargs)
        self._index = Integer(kwargs.get('index', None))
        self._defocusUDeviation = Float()
        self._isDefocusUDeviationInRange = Boolean(True)
        self._defocusVDeviation = Float()
        self._isDefocusVDeviationInRange = Boolean(True)

    def copyInfo(self, other, copyId=False):
        """ Copy the CTF information of ``other`` into this object.

        :param other: CTFTomo to copy from.
        :param copyId: when True the object id of ``other`` is copied too.
        """
        self.copyAttributes(other, '_defocusU', '_defocusV', '_defocusAngle', '_defocusRatio', '_psdFile',
                            '_resolution', '_fitQuality', '_index', '_defocusUDeviation', '_defocusVDeviation')

        self.setEnabled(other.isEnabled())
        self.setIsDefocusUDeviationInRange(other.getIsDefocusUDeviationInRange())
        self.setIsDefocusVDeviationInRange(other.getIsDefocusVDeviationInRange())

        if other.hasPhaseShift():
            self.setPhaseShift(other.getPhaseShift())

        if other.hasEstimationInfoAsList():
            if other.hasAstigmatismInfoAsList():
                self._defocusUList = CsvList(pType=float)
                self._defocusVList = CsvList(pType=float)
                self._defocusAngleList = CsvList(pType=float)
                self.setDefocusUList(other.getDefocusUList())
                self.setDefocusVList(other.getDefocusVList())
                self.setDefocusAngleList(other.getDefocusAngleList())
            elif hasattr(other, "_defocusUList"):
                self._defocusUList = CsvList(pType=float)
                self.setDefocusUList(other.getDefocusUList())
            else:
                # Only the V list is present in the source object
                self._defocusVList = CsvList(pType=float)
                self.setDefocusVList(other.getDefocusVList())

        if other.hasPhaseShiftInfoAsList():
            self._phaseShiftList = CsvList(pType=float)
            self.setPhaseShiftList(other.getPhaseShiftList())

        if other.hasCutOnFrequncyInfoAsList():
            self._cutOnFreqList = CsvList(pType=float)
            self.setCutOnFreqList(other.getCutOnFreqList())
            # NOTE(review): _cutOnFreq is not created in __init__, so the
            # single value is copied only together with its list - confirm
            # against the original indentation.
            self.setCutOnFreq(other.getCutOnFreq())

        if copyId:
            self.copyObjId(other)

    @staticmethod
    def ctfModelToCtfTomo(ctfModel):
        """ Build a new CTFTomo holding the defocus, psd file, resolution
        and fit quality attributes of ``ctfModel``. """
        newCTFTomo = CTFTomo()
        newCTFTomo.copyAttributes(ctfModel, '_defocusU', '_defocusV', '_defocusAngle', '_defocusRatio', '_psdFile',
                                  '_resolution', '_fitQuality')
        return newCTFTomo

    def getIndex(self):
        """ Return the ``_index`` attribute (the Integer object itself). """
        return self._index

    def setIndex(self, value):
        """ Replace the index. """
        self._index = Integer(value)

    def getdefocusUDeviation(self):
        """ Return the ``_defocusUDeviation`` attribute (the Float object). """
        return self._defocusUDeviation

    def setIsDefocusUDeviationInRange(self, value):
        """ Replace the flag telling whether the defocus U deviation is in
        range. """
        self._isDefocusUDeviationInRange = Boolean(value)

    def getIsDefocusUDeviationInRange(self):
        """ Return the ``_isDefocusUDeviationInRange`` attribute (the
        Boolean object). """
        return self._isDefocusUDeviationInRange

    def getdefocusVDeviation(self):
        """ Return the ``_defocusVDeviation`` attribute (the Float object). """
        return self._defocusVDeviation

    def setIsDefocusVDeviationInRange(self, value):
        """ Replace the flag telling whether the defocus V deviation is in
        range. """
        self._isDefocusVDeviationInRange = Boolean(value)

    def getIsDefocusVDeviationInRange(self):
        """ Return the ``_isDefocusVDeviationInRange`` attribute (the
        Boolean object). """
        return self._isDefocusVDeviationInRange

    def getCutOnFreq(self):
        """ Return the ``_cutOnFreq`` attribute; it exists only after
        ``setCutOnFreq`` has been called. """
        return self._cutOnFreq

    def setCutOnFreq(self, value):
        """ Replace the cut-on frequency. """
        self._cutOnFreq = Float(value)

    # List data methods allow compatibility with IMOD metadata.

    def getDefocusUList(self):
        return self._defocusUList.get()

    def setDefocusUList(self, defList):
        self._defocusUList.set(defList)

    def appendDefocusUList(self, value):
        self._defocusUList.append(value)

    def getDefocusVList(self):
        return self._defocusVList.get()

    def setDefocusVList(self, defList):
        self._defocusVList.set(defList)

    def appendDefocusVList(self, value):
        self._defocusVList.append(value)

    def getDefocusAngleList(self):
        return self._defocusAngleList.get()

    def setDefocusAngleList(self, defList):
        self._defocusAngleList.set(defList)

    def appendDefocusAngleList(self, value):
        self._defocusAngleList.append(value)

    def getPhaseShiftList(self):
        return self._phaseShiftList.get()

    def setPhaseShiftList(self, defList):
        self._phaseShiftList.set(defList)

    def appendPhaseShiftList(self, value):
        self._phaseShiftList.append(value)

    def getCutOnFreqList(self):
        return self._cutOnFreqList.get()

    def setCutOnFreqList(self, cutOnFreqList):
        self._cutOnFreqList.set(cutOnFreqList)

    def appendCutOnFreqList(self, value):
        self._cutOnFreqList.append(value)

    def hasEstimationInfoAsList(self):
        """ Tell whether the object carries estimation information as a
        list, i.e. a defocus U list or a defocus V list. """
        # The original tested _defocusUList twice and never the V list.
        return hasattr(self, "_defocusUList") or hasattr(self, "_defocusVList")

    def hasAstigmatismInfoAsList(self):
        """ Tell whether both the defocus U and the defocus V lists are
        present. """
        return hasattr(self, "_defocusUList") and hasattr(self, "_defocusVList")

    def hasPhaseShiftInfoAsList(self):
        """ Tell whether a phase shift list is present. """
        return hasattr(self, "_phaseShiftList")

    def hasCutOnFrequncyInfoAsList(self):
        """ Tell whether a cut-on frequency list is present. """
        return hasattr(self, "_cutOnFreqList")

    @staticmethod
    def _middleValue(values):
        """ Return, as float, the entry at the middle position of a list of
        numeric strings; for an even length, the mean of the two central
        entries. The list is used in its given order (it is not sorted). """
        middlePoint = len(values) // 2
        if len(values) % 2 == 0:
            return (float(values[middlePoint]) + float(values[middlePoint - 1])) / 2
        return float(values[middlePoint])

    def completeInfoFromList(self):
        """ Set the single-valued CTF attributes from the estimation lists.

        Based on the IMOD program ctfphaseflip: "The program will assign
        that defocus value to the midpoint of the range of views. For a
        view at a given tilt angle, it will find the defocus either by
        interpolating between two surrounding midpoint angles, if there are
        such angles, or by taking the nearest defocus value, if the angle is
        beyond the range of the available midpoint angles."
        - From IMOD documentation
        https://bio3d.colorado.edu/imod/doc/man/ctfphaseflip.html

        Every value is taken from the middle position of its list (mean of
        the two central entries for lists of even length):

        * one defocus list only: defocus U and V get the same value and the
          defocus angle is set to 0;
        * defocus U and V lists: defocus U, V and angle are taken from their
          own lists, which must have the same length;
        * phase shift and cut-on frequency lists, when present, must be as
          long as the defocus list.

        Finally ``standardize`` is called.

        :raises Exception: if no defocus list is available or the lengths
            of the lists do not match.
        """
        # DEFOCUS INFORMATION
        if not self.hasEstimationInfoAsList():
            raise Exception("CTFTomo object has no _defocusUList neither _defocusVList argument initialized. No "
                            "list information available.")

        hasUList = hasattr(self, "_defocusUList")
        hasVList = hasattr(self, "_defocusVList")

        if hasUList and hasVList:
            # Astigmatism is estimated (two lists are provided)
            providedDefocusUList = self.getDefocusUList().split(",")
            providedDefocusVList = self.getDefocusVList().split(",")
            providedDefocusAngleList = self.getDefocusAngleList().split(",")

            if not (len(providedDefocusUList) == len(providedDefocusVList) == len(providedDefocusAngleList)):
                raise Exception("DefocusUList, DefocusVList and DefocusAngleList lengths must be equal.")

            defocusU = self._middleValue(providedDefocusUList)
            defocusV = self._middleValue(providedDefocusVList)
            defocusAngle = self._middleValue(providedDefocusAngleList)

            self.setDefocusU(defocusU)
            self.setDefocusV(defocusV)
            self.setDefocusAngle(defocusAngle)
        else:
            # No astigmatism is estimated (only one list provided)
            providedDefocusUList = self.getDefocusUList() if hasUList else self.getDefocusVList()
            providedDefocusUList = providedDefocusUList.split(",")

            # DefocusAngle is set to 0 degrees
            self.setDefocusAngle(0)

            value = self._middleValue(providedDefocusUList)
            self.setDefocusU(value)
            self.setDefocusV(value)

        # Every optional list is checked against the defocus list length
        expectedLength = len(providedDefocusUList)

        # PHASE SHIFT INFORMATION
        if hasattr(self, "_phaseShiftList"):
            providedPhaseShiftList = self.getPhaseShiftList().split(",")

            if expectedLength != len(providedPhaseShiftList):
                raise Exception("PhaseShiftList length must be equal to DefocusUList, DefocusVList and "
                                "DefocusAngleList lengths.")

            self.setPhaseShift(self._middleValue(providedPhaseShiftList))

        # CUT-ON FREQUENCY INFORMATION
        if hasattr(self, "_cutOnFreqList"):
            providedCutOnFreqList = self.getCutOnFreqList().split(",")

            # Compared with the defocus list: the phase shift list may not
            # exist, and when it does it has already been checked to have
            # this same length.
            if expectedLength != len(providedCutOnFreqList):
                raise Exception("CutOnFreqList length must be equal to PhaseShiftList, DefocusUList, DefocusVList and "
                                "DefocusAngleList lengths.")

            self.setCutOnFreq(self._middleValue(providedCutOnFreqList))

        # Standardize the input values
        self.standardize()

    def getDefocusUDeviation(self, mean):
        """ Return the absolute difference between defocus U and ``mean``. """
        return abs(self.getDefocusU() - mean)

    def isDefocusUDeviationInRange(self, mean, percentage=20):
        """ Tell whether the defocus U deviation from ``mean`` is below
        ``percentage`` percent of ``mean``. """
        defocusUDeviation = self.getDefocusUDeviation(mean)
        return bool(defocusUDeviation < (percentage * mean / 100))

    def getDefocusVDeviation(self, mean):
        """ Return the absolute difference between defocus V and ``mean``. """
        return abs(self.getDefocusV() - mean)

    def isDefocusVDeviationInRange(self, mean, percentage=20):
        """ Tell whether the defocus V deviation from ``mean`` is below
        ``percentage`` percent of ``mean``. """
        defocusVDeviation = self.getDefocusVDeviation(mean)
        return bool(defocusVDeviation < (percentage * mean / 100))
class CTFTomoSeries(data.EMSet):
    """ Represents a set of CTF models belonging to the same tilt-series. """
    ITEM_TYPE = CTFTomo

    def __init__(self, **kwargs):
        data.EMSet.__init__(self, **kwargs)
        self._tiltSeriesPointer = Pointer(kwargs.get('tiltSeriesPointer', None))
        self._tsId = String(kwargs.get('tsId', None))
        self._isDefocusUDeviationInRange = Boolean(True)
        self._isDefocusVDeviationInRange = Boolean(True)

        # CtfModels will always be used inside a SetOfTiltSeries
        # so, let's do no store the mapper path by default
        self._mapperPath.setStore(False)

    def clone(self, ignoreAttrs=('_mapperPath', '_size')):
        """ Return a new instance of the same class holding a copy of this
        object without the attributes listed in ``ignoreAttrs``. """
        clone = self.getClass()()
        clone.copy(self, ignoreAttrs=ignoreAttrs)
        return clone

    def __del__(self):
        # Cancel closing the mapper since this class is an item of a set and
        # shares the mapper with its parent set.
        pass

    def getTiltSeries(self):
        """ Return the tilt-series associated with this CTF model series. """
        return self._tiltSeriesPointer.get()

    def setTiltSeries(self, tiltSeries):
        """ Set the tilt-series from which this CTF model series was
        estimated.

        :param tiltSeries: either a TiltSeries object or a pointer to it.
        """
        if tiltSeries.isPointer():
            self._tiltSeriesPointer.copy(tiltSeries)
        else:
            self._tiltSeriesPointer.set(tiltSeries)

    def getTsId(self):
        """ Get unique TiltSeries ID, usually retrieved from the file
        pattern provided by the user at the import time. """
        return self._tsId.get()

    def setTsId(self, value):
        """ Set the tilt-series id. """
        self._tsId.set(value)

    def getNumberOfEstimationsInRange(self):
        """ Return the tilt-images range size used for estimation. The
        attribute exists only after one of the setters has been called. """
        return self._estimationsInRange.get()

    def setNumberOfEstimationsInRange(self, estimationRange):
        """ Set the tilt-images range size used for estimation.

        :param estimationRange: Integer of the range size.
        """
        self._estimationsInRange = Integer(estimationRange)

    def getIMODDefocusFileFlag(self):
        """ Return the format file from which the CTF estimation information
        has been acquired. This parameter is useful for posterior
        information and format conversions between IMOD and Scipion. The
        flag value "is the sum of:

              1 if the file has astigmatism values
              2 if the astigmatism axis angle is in radians, not degrees
              4 if the file has phase shifts
              8 if the phase shifts are in radians, not degrees
             16 if tilt angles need to be inverted to match what the
                 program expects (what Ctfplotter would produce)
                 with the -invert option
             32 if the file has cut-on frequencies attenuating the phase
                 at low frequencies"

        from https://bio3d.colorado.edu/imod/doc/man/ctfphaseflip.html

        The attribute exists only after ``setIMODDefocusFileFlag`` has been
        called.
        """
        return self._IMODDefocusFileFlag.get()

    def setIMODDefocusFileFlag(self, flag):
        """ Set the format file from which the CTF estimation information
        has been acquired.

        :param flag: Integer flag. See ``getIMODDefocusFileFlag`` for the
            meaning of its value, as documented at
            https://bio3d.colorado.edu/imod/doc/man/ctfphaseflip.html
        """
        self._IMODDefocusFileFlag = Integer(flag)

    def setNumberOfEstimationsInRangeFromDefocusList(self):
        """ Set the estimation range size from the defocus lists of the
        items: the largest value found of (number of comma-separated
        entries - 1), or 0 for an empty set.

        :raises Exception: if an item has neither a defocus U list nor a
            defocus V list.
        """
        estimationRange = 0

        for ctfEstimation in self:
            hasUList = hasattr(ctfEstimation, "_defocusUList")
            hasVList = hasattr(ctfEstimation, "_defocusVList")

            # Check that at least one list is provided (the original tested
            # the U list twice, so items with only a V list were rejected)
            if not (hasUList or hasVList):
                raise Exception("CTFTomo object has no _defocusUList neither _defocusVList argument initialized. No "
                                "list information available.")

            providedList = ctfEstimation.getDefocusUList() if hasUList else ctfEstimation.getDefocusVList()
            providedList = providedList.split(",")

            # NOTE(review): one is subtracted from the number of entries;
            # the reason is not visible in this file.
            listLength = len(providedList) - 1

            if listLength > estimationRange:
                estimationRange = listLength

        self.setNumberOfEstimationsInRange(estimationRange)

    def getIsDefocusUDeviationInRange(self):
        """ Return the ``_isDefocusUDeviationInRange`` attribute (the
        Boolean object). """
        return self._isDefocusUDeviationInRange

    def setIsDefocusUDeviationInRange(self, value):
        """ Replace the flag for the defocus U deviation of the series. """
        self._isDefocusUDeviationInRange = Boolean(value)

    def getIsDefocusVDeviationInRange(self):
        """ Return the ``_isDefocusVDeviationInRange`` attribute (the
        Boolean object). """
        return self._isDefocusVDeviationInRange

    def setIsDefocusVDeviationInRange(self, value):
        """ Replace the flag for the defocus V deviation of the series. """
        self._isDefocusVDeviationInRange = Boolean(value)

    def calculateDefocusUDeviation(self, defocusUTolerance=20):
        """ Store in every item its defocus U deviation from the mean of the
        series and whether it is within tolerance; the flag of the series
        is set to False as soon as one item is out of range.

        :param defocusUTolerance: tolerance as a percentage of the mean.
        """
        defocusUValueList = [ctfTomo.getDefocusU() for ctfTomo in self]
        mean = statistics.mean(defocusUValueList)

        for ctfTomo in self.iterItems(iterate=False):
            ctfTomo._defocusUDeviation.set(ctfTomo.getDefocusUDeviation(mean))

            isDefocusUDeviationInRange = ctfTomo.isDefocusUDeviationInRange(mean, percentage=defocusUTolerance)
            if not isDefocusUDeviationInRange:
                self._isDefocusUDeviationInRange.set(False)

            ctfTomo._isDefocusUDeviationInRange.set(isDefocusUDeviationInRange)

            self.update(ctfTomo)

    def calculateDefocusVDeviation(self, defocusVTolerance=20):
        """ Store in every item its defocus V deviation from the mean of the
        series and whether it is within tolerance; the flag of the series
        is set to False as soon as one item is out of range.

        :param defocusVTolerance: tolerance as a percentage of the mean.
        """
        defocusVValueList = [ctfTomo.getDefocusV() for ctfTomo in self]
        mean = statistics.mean(defocusVValueList)

        for ctfTomo in self.iterItems(iterate=False):
            ctfTomo._defocusVDeviation.set(ctfTomo.getDefocusVDeviation(mean))

            isDefocusVDeviationInRange = ctfTomo.isDefocusVDeviationInRange(mean, percentage=defocusVTolerance)
            if not isDefocusVDeviationInRange:
                self._isDefocusVDeviationInRange.set(False)

            ctfTomo._isDefocusVDeviationInRange.set(isDefocusVDeviationInRange)

            self.update(ctfTomo)
class SetOfCTFTomoSeries(data.EMSet):
    """ Represents a set of CTF model series belonging to the same set of
    tilt-series. """
    ITEM_TYPE = CTFTomoSeries
    USE_CREATE_COPY_FOR_SUBSET = True

    def __init__(self, **kwargs):
        data.EMSet.__init__(self, **kwargs)
        self._setOfTiltSeriesPointer = Pointer(kwargs.get('tiltSeriesPointer', None))

    def copyInfo(self, other):
        """ Copy the basic information of ``other`` and make this set refer
        to the same set of tilt-series. """
        data.EMSet.copyInfo(self, other)
        self.setSetOfTiltSeries(other.getSetOfTiltSeries(pointer=True))

    def getSetOfTiltSeries(self, pointer=False):
        """ Return the set of tilt-series associated with this set.

        :param pointer: when True the pointer itself is returned instead of
            the object it refers to.
        """
        return self._setOfTiltSeriesPointer.get() if not pointer else self._setOfTiltSeriesPointer

    def setSetOfTiltSeries(self, setOfTiltSeries):
        """ Set the set of tilt-series from which the CTF model series were
        estimated.

        :param setOfTiltSeries: either the object or a pointer to it.
        """
        if setOfTiltSeries.isPointer():
            self._setOfTiltSeriesPointer.copy(setOfTiltSeries)
        else:
            self._setOfTiltSeriesPointer.set(setOfTiltSeries)

    def iterClassItems(self, iterDisabled=False):
        """ Iterate over the items contained in every series of the set.

        :param iterDisabled: if True, also include the disabled series and
            the disabled items.
        """
        for cls in self.iterItems():
            if iterDisabled or cls.isEnabled():
                for img in cls:
                    if iterDisabled or img.isEnabled():
                        yield img

    def _setItemMapperPath(self, item):
        """ Set the mapper path of the item from the file name of this set
        and the object id of the item, then load the item. """
        item._mapperPath.set('%s,id%s' % (self.getFileName(), item.getObjId()))
        item.load()

    def _insertItem(self, item):
        """ Set the mapper path of the item, insert it in the set and write
        it (without properties). """
        self._setItemMapperPath(item)
        data.EMSet._insertItem(self, item)
        item.write(properties=False)  # Set.write(self)

    def __getitem__(self, itemId):
        """ Return the item stored under ``itemId`` linked to the
        tilt-series with the same tsId and with its mapper path set.

        :raises Exception: if no tilt-series of the associated set has the
            tsId of the item.
        """
        classItem = data.EMSet.__getitem__(self, itemId)

        objId = None
        for tiltSeries in self.getSetOfTiltSeries().iterItems(iterate=False):
            if tiltSeries.getTsId() == classItem.getTsId():
                objId = tiltSeries.getObjId()

        if objId is None:
            # A plain string cannot be raised; wrap the message in an
            # exception so the caller gets the intended error.
            raise Exception("Could not find tilt-series with tsId = %s" % classItem.getTsId())

        classItem.setTiltSeries(self.getSetOfTiltSeries()[objId])

        self._setItemMapperPath(classItem)
        return classItem

    def getFirstItem(self):
        """ Return the first item of the set with its mapper path set. """
        classItem = data.EMSet.getFirstItem(self)
        self._setItemMapperPath(classItem)
        return classItem

    def iterItems(self, orderBy='id', direction='ASC'):
        """ Iterate over the items of the set; each one is linked to the
        tilt-series with the same tsId and gets its mapper path set before
        being yielded.

        :raises Exception: if no tilt-series of the associated set has the
            tsId of an item.
        """
        for item in data.EMSet.iterItems(self, orderBy=orderBy, direction=direction):

            objId = None
            for tiltSeries in self.getSetOfTiltSeries():
                if tiltSeries.getTsId() == item.getTsId():
                    objId = tiltSeries.getObjId()

            if objId is None:
                # A plain string cannot be raised; wrap the message in an
                # exception so the caller gets the intended error.
                raise Exception("Could not find tilt-series with tsId = %s" % item.getTsId())

            item.setTiltSeries(self.getSetOfTiltSeries()[objId])
            self._setItemMapperPath(item)

            yield item