# **************************************************************************
# *
# * Authors: J.M. De la Rosa Trevin [1]
# * Adrian Quintana (aquintana@cnb.csic.es)
# *
# * [1] SciLifeLab, Stockholm University
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 2 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307 USA
# *
# * All comments concerning this program package may be sent to the
# * e-mail address 'scipion@cnb.csic.es'
# *
# **************************************************************************
import os
import subprocess
from pyworkflow.protocol import Protocol
from pyworkflow.utils.path import cleanPath
from pwem import emlib
import pwem
from .constants import XMIPP_DLTK_NAME
try: # If binding is not already done, this will fail.
from xmipp_base import * # xmipp_base and xmippViz come from the binding and
from xmippViz import * # it is not available before installing the binaries
except Exception as exc: # TODO: catch exception by type and ensure it is caused by CondaEnvManager
error = exc
class CondaEnvManager:
""" Fake class to avoid very early fails (during installation). """
@classmethod
def yieldInstallAllCmds(*args, **kwargs):
yield ('echo "Some error occurred when importing xmipp_base '
'(from Xmipp binding): %s"' % error, 'void.target')
import xmipp3
LABEL_TYPES = {
emlib.LABEL_SIZET: int,
emlib.LABEL_DOUBLE: float,
emlib.LABEL_INT: int,
emlib.LABEL_BOOL: bool
}
[docs]def getXmippPath(*paths):
'''Return the path the the Xmipp installation folder
if a subfolder is provided, will be concatenated to the path'''
return os.path.join(pwem.Config.XMIPP_HOME, *paths)
[docs]def isXmippCudaPresent(program=""):
if program=="":
return os.path.isfile(getXmippPath("bin","xmipp_cuda_reconstruct_fourier"))
else:
return os.path.isfile(getXmippPath("bin",program))
[docs]def getLabelPythonType(label):
""" From xmipp label to python variable type """
labelType = emlib.labelType(label)
return LABEL_TYPES.get(labelType, str)
[docs]class XmippProtocol:
""" This class groups some common functionalities that
share some Xmipp protocols, like converting steps.
"""
def _insertConvertStep(self, inputName, xmippClass, resultFn):
""" Insert the convertInputToXmipp if the inputName attribute
is not an instance of xmippClass.
It will return the result filename, if the
conversion is needed, this will be input resultFn.
If not, it will be inputAttr.getFileName()
"""
inputAttr = getattr(self, inputName)
if not isinstance(inputAttr, xmippClass):
self._insertFunctionStep('convertInputToXmipp', inputName, xmippClass,
resultFn)
return resultFn
return inputAttr.getFileName()
[docs] @classmethod
def getModel(cls, *modelPath, **kwargs):
""" Returns the path to the models folder followed by
the given relative path.
.../xmipp/models/myModel/myFile.h5 <= getModel('myModel', 'myFile.h5')
NOTE: it raise and exception when model not found, set doRaise=False
in the arguments to skip that raise, especially in validation
asserions!
"""
os.environ['XMIPP_HOME'] = getXmippPath()
return getModel(*modelPath, **kwargs)
[docs] @classmethod
def getCondaEnv(cls, **kwargs):
""" Returns the environ corresponding to the condaEnv of the protocol.
kwargs can contain:
- 'env': Any environ (the xmipp one is the default)
- '_conda_env': An installed condaEnv name
> _conda_env preference: kwargs > protocol default > general default
"""
envName = CondaEnvManager.getCondaName(cls, **kwargs)
env = kwargs.get('env', xmipp3.Plugin.getEnviron())
return CondaEnvManager.getCondaEnv(env, envName)
# findRow() cannot go to xmipp_base (binding) because depends on emlib.metadata.Row()
[docs]def findRow(md, label, value):
""" Query the metadata for a row with label=value.
Params:
md: metadata to query.
label: label to check value
value: value for equal condition
Returns:
Row object of the row found.
None if no row is found with label=value
"""
mdQuery = emlib.MetaData() # store result
mdQuery.importObjects(md, emlib.MDValueEQ(label, value))
n = mdQuery.size()
if n == 0:
row = None
elif n == 1:
row = emlib.metadata.Row()
row.readFromMd(mdQuery, mdQuery.firstObject())
else:
raise Exception("findRow: more than one row found matching the query "
"%s = %s" % (emlib.label2Str(label), value))
return row
[docs]def findRowById(md, value):
""" Same as findRow, but using MDL_ITEM_ID for label. """
return findRow(md, emlib.MDL_ITEM_ID, int(value))
# getMdFirstRow() cannot go to xmipp_base (binding) because depends on emlib.metadata.Row()
[docs]def getMdFirstRow(filename):
""" Create a MetaData but only read the first row.
This method should be used for validations of labels
or metadata size, but the full metadata is not needed.
"""
md = emlib.MetaData()
md.read(filename, 1)
if md.getParsedLines():
row = emlib.metadata.Row()
row.readFromMd(md, md.firstObject())
else:
row = None
return row
# iterMdRows() cannot go to xmipp_base (binding) because depends on emlib.metadata.Row()
[docs]def iterMdRows(md):
""" Iterate over the rows of the given metadata. """
# If md is string, take as filename and create the metadata
if isinstance(md, str):
md = emlib.MetaData(md)
row = emlib.metadata.Row()
for objId in md:
row.readFromMd(md, objId)
yield row
[docs]class XmippSet:
# FIXME: It seems unused...
""" Support class to store sets in Xmipp base on a MetaData. """
def __init__(self, itemClass):
""" Create new set, base on a Metadata.
itemClass: Class that represent the items.
A method .getFileName should be available to store the md.
Items contained in XmippSet are supposed to inherit from Row.
"""
self._itemClass = itemClass
# self._fileName = fileName
self._md = emlib.MetaData()
def __iter__(self):
"""Iterate over the set of images in the MetaData"""
#self._md.read(self._fileName)
for objId in self._md:
item = self._itemClass()
item.readFromMd(self._md, objId)
#m = emib.Image(md.getValue(emlib.MDL_IMAGE, objId))
#if self.hasCTF():
# m.ctfModel = XmippCTFModel(md.getValue(emlib.MDL_CTF_MODEL, objId))
yield item
#
# def setFileName(self, filename):
# self._fileName = filename
[docs] def setMd(self, md):
self._md = md
[docs] def write(self, filename, mode):
self._md.write(filename, mode)
[docs] def append(self, item):
"""Add a new item to the set, the item can be of a base class (EM)
and will be try to convert it to the respective _itemClass."""
objId = self._md.addObject()
# Convert to xmipp micrograph if necessary
if isinstance(item, self._itemClass):
itemXmipp = item
else:
itemXmipp = self._itemClass.convert(item)
itemXmipp.writeToMd(self._md, objId)
[docs] def sort(self, label):
self._md.sort(label)
[docs] def read(self, filename):
self._md.read(filename)
[docs] def isEmpty(self):
return self._md.isEmpty()
[docs] def getItemClass(self):
""" Return the Class of the items in the Set. """
return self._itemClass
[docs] def getSize(self):
return self._md.size()
[docs] def convert(self, xmippSetClass, filename):
""" Convert from a generic set to a xmippSetClass.
In particular a filename is requiered to store the result MetaData.
It is also asummed that this class have a .copyInfo method.
"""
if isinstance(self, xmippSetClass):
return self
setOut = xmippSetClass(filename)
setOut.copyInfo(self)
for item in self:
setOut.append(item)
setOut.write()
return setOut
[docs]class ProjMatcher:
""" Base class for protocols that use a projection """
[docs] def projMatchStep(self, volume, angularSampling, symmetryGroup, images, fnAngles, Xdim):
# Generate gallery of projections
fnGallery = self._getExtraPath('gallery.stk')
if volume.endswith('.mrc'):
volume+=":mrc"
self.runJob("xmipp_angular_project_library",
"-i %s -o %s --sampling_rate %f --sym %s --method fourier 1 0.25 bspline "
"--compute_neighbors --angular_distance -1 --experimental_images %s"
% (volume, fnGallery, angularSampling, symmetryGroup, images))
# Assign angles
self.runJob("xmipp_angular_projection_matching",
"-i %s -o %s --ref %s --Ri 0 --Ro %s --max_shift 1000 "
"--search5d_shift %s --search5d_step %s --append"
% (images, fnAngles, fnGallery, str(Xdim/2),
str(int(Xdim/10)), str(int(Xdim/25))))
cleanPath(self._getExtraPath('gallery_sampling.xmd'))
cleanPath(self._getExtraPath('gallery_angles.doc'))
cleanPath(self._getExtraPath('gallery.doc'))
# Write angles in the original file and sort
MD=emlib.MetaData(fnAngles)
for id in MD:
galleryReference = MD.getValue(emlib.MDL_REF,id)
MD.setValue(emlib.MDL_IMAGE_REF, "%05d@%s" % (galleryReference+1, fnGallery), id)
MD.write(fnAngles)
[docs] def produceAlignedImagesStep(self, volumeIsCTFCorrected, fn, images):
from numpy import array, dot
fnOut = 'classes_aligned@' + fn
MDin = emlib.MetaData(images)
MDout = emlib.MetaData()
n = 1
hasCTF = MDin.containsLabel(emlib.MDL_CTF_MODEL)
for i in MDin:
fnImg = MDin.getValue(emlib.MDL_IMAGE,i)
fnImgRef = MDin.getValue(emlib.MDL_IMAGE_REF,i)
maxCC = MDin.getValue(emlib.MDL_MAXCC,i)
rot = MDin.getValue(emlib.MDL_ANGLE_ROT,i)
tilt = MDin.getValue(emlib.MDL_ANGLE_TILT,i)
psi =-1.*MDin.getValue(emlib.MDL_ANGLE_PSI,i)
flip = MDin.getValue(emlib.MDL_FLIP,i)
if flip:
psi = -psi
eulerMatrix = emlib.Euler_angles2matrix(0., 0., psi)
x = MDin.getValue(emlib.MDL_SHIFT_X,i)
y = MDin.getValue(emlib.MDL_SHIFT_Y,i)
shift = array([x, y, 0])
shiftOut = dot(eulerMatrix, shift)
[x,y,z]= shiftOut
if flip:
x = -x
id = MDout.addObject()
MDout.setValue(emlib.MDL_IMAGE, fnImg, id)
MDout.setValue(emlib.MDL_IMAGE_REF, fnImgRef, id)
MDout.setValue(emlib.MDL_IMAGE1, "%05d@%s"%(n, self._getExtraPath("diff.stk")), id)
if hasCTF:
fnCTF = MDin.getValue(emlib.MDL_CTF_MODEL,i)
MDout.setValue(emlib.MDL_CTF_MODEL,fnCTF,id)
MDout.setValue(emlib.MDL_MAXCC, maxCC, id)
MDout.setValue(emlib.MDL_ANGLE_ROT, rot, id)
MDout.setValue(emlib.MDL_ANGLE_TILT, tilt, id)
MDout.setValue(emlib.MDL_ANGLE_PSI, psi, id)
MDout.setValue(emlib.MDL_SHIFT_X, x,id)
MDout.setValue(emlib.MDL_SHIFT_Y, y,id)
MDout.setValue(emlib.MDL_FLIP,flip,id)
MDout.setValue(emlib.MDL_ENABLED,1,id)
n+=1
MDout.write(fnOut,emlib.MD_APPEND)
# Actually create the differences
img = emlib.Image()
imgRef = emlib.Image()
if hasCTF and volumeIsCTFCorrected:
Ts = MDin.getValue(emlib.MDL_SAMPLINGRATE, MDin.firstObject())
for i in MDout:
img.readApplyGeo(MDout,i)
imgRef.read(MDout.getValue(emlib.MDL_IMAGE_REF,i))
if hasCTF and volumeIsCTFCorrected:
fnCTF = MDout.getValue(emlib.MDL_CTF_MODEL,i)
emlib.applyCTF(imgRef, fnCTF, Ts)
img.convert2DataType(emlib.DT_DOUBLE)
imgDiff = img-imgRef
imgDiff.write(MDout.getValue(emlib.MDL_IMAGE1,i))
[docs]class HelicalFinder:
""" Base class for protocols that find helical symmetry """
[docs] def getSymmetry(self,dihedral):
if dihedral:
return "helicalDihedral"
else:
return "helical"
[docs] def runCoarseSearch(self, fnVol, dihedral, heightFraction, z0, zF, zStep,
rot0, rotF, rotStep, Nthr, fnOut, cylinderInnerRadius,
cylinderOuterRadius, height, Ts, Cn="c1"):
args = ("-i %s --sym %s --heightFraction %f -z %f %f %f "
"--rotHelical %f %f %f --thr %d -o %s --sampling %f"
%(fnVol, self.getSymmetry(dihedral), heightFraction, z0, zF, zStep,
rot0, rotF, rotStep, Nthr, fnOut, Ts))
if Cn!="c1":
args += " --sym2 %s"%Cn
if cylinderOuterRadius > 0 and cylinderInnerRadius < 0:
args += " --mask cylinder %d %d"%(-cylinderOuterRadius, -height)
elif cylinderOuterRadius > 0 and cylinderInnerRadius > 0:
args += " --mask tube %d %d %d" % (-cylinderInnerRadius, -cylinderOuterRadius, -height)
self.runJob('xmipp_volume_find_symmetry', args, numberOfMpi=1)
[docs] def runFineSearch(self, fnVol, dihedral, fnCoarse, fnFine, heightFraction,
z0, zF, rot0, rotF, cylinderInnerRadius, cylinderOuterRadius,
height, Ts, Cn="c1"):
md=emlib.MetaData(fnCoarse)
objId=md.firstObject()
rotInit=md.getValue(emlib.MDL_ANGLE_ROT,objId)
zInit=md.getValue(emlib.MDL_SHIFT_Z,objId)
args=("-i %s --sym %s --heightFraction %f --localHelical %f %f -o %s "
"-z %f %f 1 --rotHelical %f %f 1 --sampling %f"
%(fnVol, self.getSymmetry(dihedral), heightFraction, zInit, rotInit,
fnFine, z0, zF, rot0, rotF, Ts))
if Cn!="c1":
args += " --sym2 %s"%Cn
if cylinderOuterRadius>0 and cylinderInnerRadius<0:
args+=" --mask cylinder %d %d"%(-cylinderOuterRadius,-height)
elif cylinderOuterRadius>0 and cylinderInnerRadius>0:
args+=" --mask tube %d %d %d"%(-cylinderInnerRadius,-cylinderOuterRadius,-height)
self.runJob('xmipp_volume_find_symmetry',args, numberOfMpi=1)
[docs] def runSymmetrize(self, fnVol, dihedral, fnParams, fnOut, heightFraction,
cylinderInnerRadius, cylinderOuterRadius, height, Ts, Cn="c1"):
md=emlib.MetaData(fnParams)
objId=md.firstObject()
rot0=md.getValue(emlib.MDL_ANGLE_ROT,objId)
z0=md.getValue(emlib.MDL_SHIFT_Z,objId)
args=("-i %s --sym %s --helixParams %f %f --heightFraction %f -o %s "
"--sampling %f --dont_wrap"
% (fnVol,self.getSymmetry(dihedral),z0,rot0,heightFraction,fnOut,Ts))
if Cn!="c1":
args += " --sym2 %s"%Cn
self.runJob('xmipp_transform_symmetrize',args,numberOfMpi=1)
doMask=False
if cylinderOuterRadius>0 and cylinderInnerRadius<0:
args="-i %s --mask cylinder %d %d"%(fnVol,-cylinderOuterRadius,-height)
doMask=True
elif cylinderOuterRadius>0 and cylinderInnerRadius>0:
args="-i %s --mask tube %d %d %d"%(fnVol,-cylinderInnerRadius,
-cylinderOuterRadius,-height)
doMask=True
if doMask:
self.runJob('xmipp_transform_mask',args,numberOfMpi=1)