Source code for xmipp3.protocols.protocol_convert_pdb

# -*- coding: utf-8 -*-
# **************************************************************************
# *
# * Authors:  Jesus Cuenca (jcuenca@cnb.csic.es)
# *           Roberto Marabini (rmarabini@cnb.csic.es)
# *           Ignacio Foche
# * 
# *           
# * Unidad de  Bioinformatica of Centro Nacional de Biotecnologia , CSIC
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 2 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307  USA
# *
# *  All comments concerning this program package may be sent to the
# *  e-mail address 'scipion@cnb.csic.es'
# *
# **************************************************************************

# General imports
import os

# Scipion em imports
from pwem.convert.headers import setMRCSamplingRate
from pwem.emlib.image import ImageHandler
from pwem.convert import cifToPdb, headers
from pwem.objects import Volume, Transform, SetOfVolumes, AtomStruct, SetOfAtomStructs
from pwem.protocols import ProtInitialVolume
import pyworkflow.protocol.params as params
import pyworkflow.protocol.constants as const
from pyworkflow.utils import replaceBaseExt, removeExt, getExt, createLink, replaceExt, removeBaseExt
from pyworkflow import UPDATED

[docs]class XmippProtConvertPdb(ProtInitialVolume): """ Convert atomic structure(s) into volume(s) """ _label = 'convert pdbs to volumes' _devStatus = UPDATED OUTPUT_NAME1 = "outputVolume" OUTPUT_NAME2 = "outputVolumes" OUTPUT_NAME3 = "outputPdb" OUTPUT_NAME4 = "outputAtomStructs" _possibleOutputs = {OUTPUT_NAME1: Volume, OUTPUT_NAME2: SetOfVolumes, OUTPUT_NAME3: AtomStruct, OUTPUT_NAME4: SetOfAtomStructs} # --------------------------- Class constructor -------------------------------------------- def __init__(self, **args): # Calling parent class constructor super().__init__(**args) # Defining execution mode. Steps will take place in parallel now # Full tutorial on how to parallelize protocols can be read here: # https://scipion-em.github.io/docs/release-3.0.0/docs/developer/parallelization.html self.stepsExecutionMode = params.STEPS_PARALLEL # --------------------------- DEFINE param functions -------------------------------------------- def _defineParams(self, form): """ Define the parameters that will be input for the Protocol. This definition is also used to generate automatically the GUI. """ # Defining condition string for x, y, z coords coordsCondition = 'setSize and not vol' # Defining parallel arguments form.addParallelSection(threads=4, mpi=1) # Generating form form.addSection(label='Input') form.addParam('pdbObj', params.PointerParam, pointerClass='AtomStruct, SetOfAtomStructs', label="Input structure(s) ", help='Specify input atomic structure(s).') form.addParam('sampling', params.FloatParam, default=1.0, label="Sampling rate (Å/px)", help='Sampling rate (Angstroms/pixel)') form.addParam('vol', params.BooleanParam, label='Use a volume as an empty template?', default=False, condition='not isinstance(pdbObj, SetOfAtomStructs)', help='Use an existing volume to define the size and origin for the output volume. If this option' 'is selected, make sure that "Center PDB" in advanced parameters is set to *No*.') form.addParam('volObj', params.PointerParam, pointerClass='Volume', label="Input volume ", condition='not isinstance(pdbObj, SetOfAtomStructs) and vol', allowsNull=True, help='The origin and the final size of the output volume will be taken from this volume.') form.addParam('setSize', params.BooleanParam, label='Set final size?', default=False, condition='not vol and not isinstance(pdbObj, SetOfAtomStructs)') form.addParam('size', params.IntParam, label="Box side size (px)", condition='isinstance(pdbObj, SetOfAtomStructs)', allowsNull=True, help='This size should apply to all volumes') form.addParam('size_z', params.IntParam, condition=coordsCondition, allowsNull=True, label="Final size (px) Z", help='Final size in Z in pixels. If no value is provided, protocol will estimate it.') form.addParam('size_y', params.IntParam, condition=coordsCondition, allowsNull=True, label="Final size (px) Y", help='Final size in Y in pixels. If no value is provided, protocol will estimate it.') form.addParam('size_x', params.IntParam, condition=coordsCondition, allowsNull=True, label="Final size (px) X", help='Final size in X in pixels. If desired output size is x = y = z you can only fill this ' 'field. If no value is provided, protocol will estimate it.') form.addParam('centerPdb', params.BooleanParam, default=True, expertLevel=const.LEVEL_ADVANCED, label="Center PDB", help='Center PDB with the center of mass.') form.addParam('outPdb', params.BooleanParam, default=False, expertLevel=const.LEVEL_ADVANCED, label="Store centered PDB", help='Set to \'Yes\' if you want to save centered PDB. ' 'It will be stored in the output directory of this protocol.') form.addParam('convertCif', params.BooleanParam, default=True, expertLevel=const.LEVEL_ADVANCED, label="Convert CIF to PDB", help='If set to true and input atom struct file is a CIF, it will get converted to PDB.') form.addParam('clean', params.BooleanParam, default=True, expertLevel=const.LEVEL_ADVANCED, label="Clean tmp files", help='Delete all non-output files once the protocol has finished producing them.') # --------------------------- INSERT steps functions -------------------------------------------- def _insertAllSteps(self): """ In this function the steps that are going to be executed should be defined. Two of the most used functions are: _insertFunctionStep or _insertRunJobStep """ # Checking if input is set or not isSet = not isinstance(self.pdbObj.get(), AtomStruct) # Generating pdb list and obtaining input sampling rate pdbList = self._getPdbFileNames() samplingR = self.sampling.get() # Defining list of function ids to be waited by the createOutput function deps = [] for pdbFn in pdbList: # Calling processConversion in parallel with each input data deps.append(self._insertFunctionStep(self.processConversion, pdbFn, samplingR, isSet, prerequisites=[])) # Insert output conversion step self._insertFunctionStep(self.createOutput, isSet, samplingR, prerequisites=deps) # --------------------------- STEPS functions --------------------------------------------
[docs] def processConversion(self, pdb, samplingR, isSet): """ This step runs the pdb conversion. """ # Conditionally converting all input atomic structures to .pdb pdb = self._convertAtomStruct(pdb) # Generating output file for each input outFile = removeExt(pdb) # Getting args for program call args = self._getConversionArgs(pdb, samplingR, outFile, isSet=isSet) # Showing input and output file names self.info("Input file: " + pdb) self.info("Output file: " + outFile) # Calling program self.runJob("xmipp_volume_from_pdb", args)
[docs] def createOutput(self, isSet, samplingR): extraPath = self._getExtraPath() # Saving centered pdbs if wanted if self.centerPdb and self.outPdb: centeredPdbs = self._getExtraPath('centerted') self.runJob('mkdir -p', centeredPdbs) self.runJob('mv', '{}/*_centered.pdb {}'.format(extraPath, centeredPdbs)) # Generating output objects outputVol = self._createSetOfVolumes() if isSet else None pdbOut = self.centerPdb and self.outPdb if pdbOut: outputPdb = self._createSetOfPDBs() if isSet else None # Instantiating image handler ih = ImageHandler() # Generating input list pdbFns = self._getPdbFileNames() # Converting volumes. Since xmipp generates always a .vol we do the conversion here for pdbFn in pdbFns: # Generating ouput mrc volumeFn = self._getExtraPath(replaceBaseExt(pdbFn, 'vol')) mrcFn = replaceExt(volumeFn, 'mrc') ih.convert(volumeFn, mrcFn) setMRCSamplingRate(mrcFn, samplingR) # Generating output volume object from mrc volume = Volume() volume.setSamplingRate(samplingR) volume.setFileName(mrcFn) volume.fixMRCVolume(setSamplingRate=True) # If input is set, append volume to set if isSet: outputVol.append(volume) # Generating output PDB if pdbOut: centeredPdbFn = '{}/{}_centered.pdb'.format(centeredPdbs, removeBaseExt(pdbFn)) atom = AtomStruct(filename=centeredPdbFn) atom.setVolume(volume) if isSet: outputPdb.append(atom) if not isSet: # If input is single atom struct, get produced volume as output outputVol = volume if self.vol: origin = Transform() origin.setShiftsTuple(self.shifts) outputVol.setOrigin(origin) if pdbOut: outputPdb = atom # Setting ouput sampling rate outputVol.setSamplingRate(samplingR) # Removing temporary files if self.clean: self.runJob('rm', '-rf {}/*.vol {}/*.cif {}/*.pdb'.format(extraPath, extraPath, extraPath)) if pdbOut: outputPdbName = self.OUTPUT_NAME4 if isSet else self.OUTPUT_NAME3 self._defineOutputs(**{outputPdbName: outputPdb}) # Defining output outputVolName = self.OUTPUT_NAME2 if isSet else self.OUTPUT_NAME1 self._defineOutputs(**{outputVolName: outputVol}) self._defineSourceRelation(self.pdbObj, outputVol)
# --------------------------- INFO functions -------------------------------------------- def _summary(self): """ Even if the full set of parameters is available, this function provides summary information about an specific run. """ summary = [] # Add some lines of summary information if not (hasattr(self, self.OUTPUT_NAME1) or hasattr(self, self.OUTPUT_NAME2)): summary.append("The output is not ready yet.") return summary def _validate(self): """ The function of this hook is to add some validation before the protocol is launched to be executed. It should return a list of errors. If the list is empty the protocol can be executed. """ errors = [] # Checking if MPI is selected (only threads are allowed) if self.numberOfMpi > 1: errors.append('MPI cannot be selected, because Scipion is going to drop support for it. Select threads instead.') return errors # --------------------------- UTLIS functions -------------------------------------------- def _getPdbFileNames(self): """ This function returns the input file/s stored in a list. """ pbdObj = self.pdbObj.get() if isinstance(pbdObj, AtomStruct): # If it is a single AtomStruct, place it inside a list of 1 element return [pbdObj.getFileName()] else: # If it is a SetOfAtom Structs, get all of the elements iterating the set return [i.getFileName() for i in pbdObj] def _convertAtomStruct(self, pdbRaw): """ This function receives an atomic struct file, and conditionally converts it to .pdb if it is in .cif format. """ # Get output path for atomic struct file baseExt = replaceBaseExt(pdbRaw, 'pdb') if self.convertCif.get() else os.path.basename(pdbRaw) convertedPdb = self._getExtraPath(baseExt).replace(" ", "_") # If file extension is .cif and conversion is required, convert to .pdb, or else we link it as is if getExt(pdbRaw) == ".cif" and self.convertCif.get(): cifToPdb(pdbRaw, convertedPdb) else: createLink(pdbRaw, convertedPdb) # Return optionally converted file return convertedPdb def _getConversionArgs(self, pdb, samplingRate, outputFile, isSet=False): """ This function generates the arguments to convert an input pdb to a volume. """ # Main arguments args = '-i "%s" --sampling %f -o "%s"' % (pdb, samplingRate, outputFile) # Flag for centered pdbs if self.centerPdb: args += ' --centerPDB' # Flag for output pdbs if self.outPdb: args += ' --oPDB' # Setting size and origin if selected if isSet: args += ' --size %d' % (self.size.get()) else: if self.vol: vol = self.volObj.get() size = vol.getDim() ccp4header = headers.Ccp4Header(vol.getFileName(), readHeader=True) self.shifts = ccp4header.getOrigin() args += ' --size %d %d %d --orig %d %d %d' % (size[2], size[1], size[0], self.shifts[0]/samplingRate, self.shifts[1]/samplingRate, self.shifts[2]/samplingRate) else: if self.setSize: args += ' --size' if self.size_x.hasValue(): args += ' %d' % self.size_x.get() if self.size_y.hasValue() and self.size_z.hasValue(): args += ' %d %d' % (self.size_y.get(), self.size_z.get()) # Returning produced args return args