Source code for continuousflex.protocols.utilities.genesis_utilities

import numpy as np
import os
import copy
from Bio.SVDSuperimposer import SVDSuperimposer
from pyworkflow.utils import runCommand, buildRunCommand
from xmippLib import SymList
import pwem.emlib.metadata as md
import sys
from subprocess import Popen
import re


EMFIT_NONE = 0
EMFIT_VOLUMES = 1
EMFIT_IMAGES = 2

FORCEFIELD_CHARMM = 0
FORCEFIELD_AAGO = 1
FORCEFIELD_CAGO = 2

SIMULATION_MIN = 0
SIMULATION_MD = 1
SIMULATION_NMMD = 2
SIMULATION_REMD = 3
SIMULATION_RENMMD = 4

PROGRAM_ATDYN = 0
PROGRAM_SPDYN= 1

INTEGRATOR_VVERLET = 0
INTEGRATOR_LEAPFROG = 1

IMPLICIT_SOLVENT_GBSA = 0
IMPLICIT_SOLVENT_NONE = 1

TPCONTROL_NONE = 0
TPCONTROL_LANGEVIN = 1
TPCONTROL_BERENDSEN = 2
TPCONTROL_BUSSI = 3

ENSEMBLE_NVT = 0
ENSEMBLE_NVE = 1
ENSEMBLE_NPT = 2

BOUNDARY_NOBC = 0
BOUNDARY_PBC = 1

ELECTROSTATICS_PME = 0
ELECTROSTATICS_CUTOFF = 1

NUCLEIC_NO = 0
NUCLEIC_RNA =1
NUCLEIC_DNA = 2

RB_PROJMATCH = 0
RB_WAVELET = 1

[docs]class PDBMol: def __init__(self, pdb_file): """ Contructor :param pdb_file: PDB file """ atom = [] atomNum = [] atomName = [] resName = [] resAlter = [] chainName = [] resNum = [] coords = [] occ = [] temp = [] chainID = [] elemName = [] print("> Reading pdb file %s ..." % pdb_file) with open(pdb_file, "r") as f: for line in f: spl = line.split() if len(spl) > 0: if (spl[0] == 'ATOM'): # or (hetatm and spl[0] == 'HETATM'): l = [line[:6], line[6:11], line[12:16], line[16], line[17:21], line[21], line[22:26], line[30:38], line[38:46], line[46:54], line[54:60], line[60:66], line[72:76], line[76:78]] l = [i.strip() for i in l] atom.append(l[0]) atomNum.append(l[1]) atomName.append(l[2]) resAlter.append(l[3]) resName.append(l[4]) chainName.append(l[5]) resNum.append(l[6]) coords.append([float(l[7]), float(l[8]), float(l[9])]) occ.append(l[10]) temp.append(l[11]) chainID.append(l[12]) elemName.append(l[13]) atomNum = np.array(atomNum) atomNum[np.where(atomNum == "*****")[0]] = "-1" self.atom = np.array(atom, dtype='<U6') self.n_atoms = len(self.atom) self.atomNum = np.array(atomNum).astype(int) self.atomName = np.array(atomName, dtype='<U4') self.resName = np.array(resName, dtype='<U4') self.resAlter = np.array(resAlter, dtype='<U1') self.chainName = np.array(chainName, dtype='<U1') self.resNum = np.array(resNum).astype(int) self.coords = np.array(coords).astype(float) self.occ = np.array(occ).astype(float) self.temp = np.array(temp).astype(float) self.chainID = np.array(chainID, dtype='<U4') self.elemName = np.array(elemName, dtype='<U2') if self.n_atoms == 0 : raise RuntimeError("Could not read PDB file : PDB file is empty") print("\t Done \n")
[docs] def save(self, file): """ Save to PDB Format :param file: pdb file path """ print("> Saving pdb file %s ..." % file) with open(file, "w") as file: past_chainName = self.chainName[0] past_chainID = self.chainID[0] for i in range(len(self.atom)): if past_chainName != self.chainName[i] or past_chainID != self.chainID[i]: past_chainName = self.chainName[i] past_chainID = self.chainID[i] file.write("TER\n") atom = self.atom[i].ljust(6) # atom#6s if self.atomNum[i] == -1 or self.atomNum[i] >= 100000: atomNum = "99999" # aomnum#5d else: atomNum = str(self.atomNum[i]).rjust(5) # aomnum#5d atomName = self.atomName[i].ljust(4) # atomname$#4s resAlter = self.resAlter[i].ljust(1) # resAlter#1 resName = self.resName[i].ljust(4) # resname#1s chainName = self.chainName[i].rjust(1) # Astring resNum = str(self.resNum[i]).rjust(4) # resnum coordx = str('%8.3f' % (float(self.coords[i][0]))).rjust(8) # x coordy = str('%8.3f' % (float(self.coords[i][1]))).rjust(8) # y coordz = str('%8.3f' % (float(self.coords[i][2]))).rjust(8) # z\ occ = str('%6.2f' % self.occ[i]).rjust(6) # occ temp = str('%6.2f' % self.temp[i]).rjust(6) # temp chainID = str(self.chainID[i]).ljust(4) # elname elemName = str(self.elemName[i]).rjust(2) # elname file.write("%s%s %s%s%s%s%s %s%s%s%s%s %s%s\n" % ( atom, atomNum, atomName, resAlter, resName, chainName, resNum, coordx, coordy, coordz, occ, temp, chainID, elemName)) file.write("END\n") print("\t Done \n")
[docs] def select_atoms(self, idx): self.coords = self.coords[idx] self.n_atoms = self.coords.shape[0] self.atom = self.atom[idx] self.atomNum = self.atomNum[idx] self.atomName = self.atomName[idx] self.resName = self.resName[idx] self.resAlter = self.resAlter[idx] self.chainName = self.chainName[idx] self.resNum = self.resNum[idx] self.elemName = self.elemName[idx] self.occ = self.occ[idx] self.temp = self.temp[idx] self.chainID = self.chainID[idx]
[docs] def get_chain(self, chainName): if not isinstance(chainName, list): chainName=[chainName] chainidx =[] for i in chainName: idx = np.where(self.chainName == i)[0] if len(idx) == 0: idx= np.where(self.chainID == i)[0] chainidx = chainidx + list(idx) return np.array(chainidx)
[docs] def select_chain(self, chainName): self.select_atoms(self.get_chain(chainName))
[docs] def copy(self): return copy.deepcopy(self)
[docs] def remove_alter_atom(self): idx = [] for i in range(self.n_atoms): if self.resAlter[i] != "": print("!!! Alter residue %s for atom %i"%(self.resName[i], self.atomNum[i])) if self.resAlter[i] == "A": idx.append(i) self.resAlter[i]="" else: idx.append(i) self.select_atoms(idx)
[docs] def remove_hydrogens(self): idx=[] for i in range(self.n_atoms): if not self.atomName[i].startswith("H"): idx.append(i) self.select_atoms(idx)
[docs] def alias_atom(self, atomName, atomNew, resName=None): n_alias = 0 for i in range(self.n_atoms): if self.atomName[i] == atomName: if resName is not None : if self.resName[i] == resName : self.atomName[i] = atomNew n_alias+=1 else: self.atomName[i] = atomNew n_alias+=1 print("%s -> %s : %i lines changed"%(atomName, atomNew, n_alias))
[docs] def alias_res(self, resName, resNew): n_alias=0 for i in range(self.n_atoms): if self.resName[i] == resName : self.resName[i] = resNew n_alias+=1 print("%s -> %s : %i lines changed"%(resName ,resNew, n_alias))
[docs] def add_terminal_res(self): aa = ["ALA", "CYS", "ASP", "GLU", "PHE", "GLY", "HIS", "ILE", "LYS", "LEU", "MET", "ASN", "PRO", "GLN", "ARG", "SER", "THR", "VAL", "TRP", "TYR"] past_chainName = self.chainName[0] past_chainID = self.chainID[0] for i in range(self.n_atoms-1): if past_chainName != self.chainName[i+1] or past_chainID != self.chainID[i+1]: if self.resName[i] in aa : print("End of chain %s ; adding terminal residue to %s %i %s"% (past_chainID,self.resName[i],self.resNum[i],self.atomName[i])) resNum = self.resNum[i] j=0 while self.resNum[i-j] ==resNum : self.resName[i - j] += "T" j+=1 else: print("End of chain %s %s %i"% (past_chainID,self.resName[i],self.resNum[i])) past_chainName = self.chainName[i+1] past_chainID = self.chainID[i+1] i = self.n_atoms-1 if self.resName[i] in aa: print("End of chain %s ; adding terminal residue to %s %i %s" % ( past_chainID, self.resName[i], self.resNum[i], self.atomName[i])) resNum = self.resNum[i] j = 0 while self.resNum[i - j] == resNum: self.resName[i - j] += "T" j += 1 else: print("End of chain %s %s %i" % (past_chainID, self.resName[i], self.resNum[i]))
[docs] def check_res_order(self): chains = list(set(self.chainID)) chains.sort() new_idx = [] for c in chains: chain_idx = self.get_chain(c) resNumlist = list(set(self.resNum[chain_idx])) resNumlist.sort() for i in range(len(resNumlist)): idx = np.where(self.resNum[chain_idx] == resNumlist[i])[0] new_idx += list(chain_idx[idx]) self.select_atoms(np.array(new_idx))
[docs] def atom_res_reorder(self): chains = list(set(self.chainID)) chains.sort() # reorder atoms and res for c in chains: chain_idx = self.get_chain(c) past_resNum = self.resNum[chain_idx[0]] resNum = 1 for i in range(len(chain_idx)): if self.resNum[chain_idx[i]] != past_resNum: if self.resNum[chain_idx[i]] != past_resNum+1: print("ERROR : non sequential residue number in one segment") # past_resNum = self.resNum[chain_idx[i]] # resNum += 1 # self.resNum[chain_idx[i]] = resNum self.atomNum[chain_idx[i]] = i + 1
[docs] def allatoms2ca(self): new_idx = [] for i in range(self.n_atoms): if self.atomName[i] == "CA" or self.atomName[i] == "P": new_idx.append(i) return np.array(new_idx)
[docs] def center(self): self.coords -= np.mean(self.coords, axis=0)
[docs]def matchPDBatoms(mols, ca_only=False): print("> Matching PDBs atoms ...") n_mols = len(mols) if mols[0].chainID[0] in mols[1].chainID: chaintype = 1 print("\t Matching segments ... ") elif mols[0].chainName[0] in mols[1].chainName: chaintype = 0 print("\t Matching chains ... ") else: raise RuntimeError("\t Warning : No matching chains") ids = [] ids_idx = [] for m in mols : id_tmp=[] id_idx_tmp=[] for i in range(m.n_atoms): if (not ca_only) or m.atomName[i] == "CA" or m.atomName[i] == "P": id_tmp.append("%s_%i_%s_%s"%(m.chainName[i] if chaintype == 0 else m.chainID[i], m.resNum[i], m.resName[i] , m.atomName[i])) id_idx_tmp.append(i) ids.append(np.array(id_tmp)) ids_idx.append(np.array(id_idx_tmp)) idx = [] for i in range(len(ids[0])): idx_line = [ids_idx[0][i]] for m in range(1,n_mols): idx_tmp = np.where(ids[0][i] == ids[m])[0] if len(idx_tmp) == 1: idx_line.append(ids_idx[m][idx_tmp[0]]) elif len(idx_tmp) > 1: print("\t Warning : One atom in mol#0 is matching several atoms in mol#%i : "%m) if len(idx_line) == n_mols : idx.append(idx_line) if len(idx)==0: print("\t Warning : No matching coordinates") print("\t %i matching atoms "%len(np.array(idx))) print("\t Done") return np.array(idx)
[docs]def generatePSF(inputPDB, inputTopo, outputPrefix, nucleicChoice): fnPSFgen = outputPrefix+"psfgen.tcl" with open(fnPSFgen, "w") as psfgen: psfgen.write("mol load pdb %s\n" % inputPDB) psfgen.write("\n") psfgen.write("package require psfgen\n") psfgen.write("topology %s\n" % inputTopo) psfgen.write("pdbalias residue HIS HSE\n") psfgen.write("pdbalias residue MSE MET\n") psfgen.write("pdbalias atom ILE CD1 CD\n") if nucleicChoice == NUCLEIC_RNA: psfgen.write("pdbalias residue A ADE\n") psfgen.write("pdbalias residue G GUA\n") psfgen.write("pdbalias residue C CYT\n") psfgen.write("pdbalias residue U URA\n") elif nucleicChoice == NUCLEIC_DNA: psfgen.write("pdbalias residue DA ADE\n") psfgen.write("pdbalias residue DG GUA\n") psfgen.write("pdbalias residue DC CYT\n") psfgen.write("pdbalias residue DT THY\n") psfgen.write("\n") if nucleicChoice == NUCLEIC_RNA or nucleicChoice == NUCLEIC_DNA: psfgen.write("set nucleic [atomselect top nucleic]\n") psfgen.write("set chains [lsort -unique [$nucleic get chain]] ;\n") psfgen.write("foreach chain $chains {\n") psfgen.write(" set sel [atomselect top \"nucleic and chain $chain\"]\n") psfgen.write(" $sel writepdb %s_tmp.pdb\n" % outputPrefix) psfgen.write(" segment N${chain} { pdb %s_tmp.pdb }\n" % outputPrefix) psfgen.write(" coordpdb %s_tmp.pdb N${chain}\n" % outputPrefix) if nucleicChoice == NUCLEIC_DNA: psfgen.write(" set resids [lsort -unique [$sel get resid]]\n") psfgen.write(" foreach r $resids {\n") psfgen.write(" patch DEOX N${chain}:$r\n") psfgen.write(" }\n") psfgen.write("}\n") if nucleicChoice == NUCLEIC_DNA: psfgen.write("regenerate angles dihedrals\n") psfgen.write("\n") psfgen.write("set protein [atomselect top protein]\n") psfgen.write("set chains [lsort -unique [$protein get pfrag]]\n") psfgen.write("foreach chain $chains {\n") psfgen.write(" set sel [atomselect top \"protein and pfrag $chain\"]\n") psfgen.write(" $sel writepdb %s_tmp.pdb\n" % outputPrefix) psfgen.write(" segment P${chain} {pdb %s_tmp.pdb}\n" % outputPrefix) psfgen.write(" coordpdb %s_tmp.pdb P${chain}\n" % outputPrefix) psfgen.write("}\n") psfgen.write("rm -f %s_tmp.pdb\n" % outputPrefix) psfgen.write("\n") psfgen.write("guesscoord\n") psfgen.write("writepdb %s.pdb\n" % outputPrefix) psfgen.write("writepsf %s.psf\n" % outputPrefix) psfgen.write("exit\n") #Run VMD PSFGEN runCommand("vmd -dispdev text -e %s > %s.log " %(fnPSFgen,outputPrefix)) # Check PDB outMol = PDBMol(outputPrefix+".pdb") if outMol.n_atoms == 0: raise RuntimeError("VMD psfgen failed, check %s.log for details"%outputPrefix) #Clean os.system("rm -f " + fnPSFgen)
[docs]def generateGROTOP(inputPDB, outputPrefix, forcefield, smog_dir, nucleicChoice): mol = PDBMol(inputPDB) # mol.remove_alter_atom() mol.remove_hydrogens() mol.check_res_order() moltmp = mol.copy() moltmp.alias_atom("CD", "CD1", "ILE") moltmp.alias_atom("OT1", "O") moltmp.alias_atom("OT2", "OXT") moltmp.alias_res("HSE", "HIS") if nucleicChoice == NUCLEIC_RNA: moltmp.alias_res("CYT", "C") moltmp.alias_res("GUA", "G") moltmp.alias_res("ADE", "A") moltmp.alias_res("URA", "U") elif nucleicChoice == NUCLEIC_DNA: moltmp.alias_res("CYT", "DC") moltmp.alias_res("GUA", "DG") moltmp.alias_res("ADE", "DA") moltmp.alias_res("THY", "DT") moltmp.alias_atom("O1'", "O1*") moltmp.alias_atom("O2'", "O2*") moltmp.alias_atom("O3'", "O3*") moltmp.alias_atom("O4'", "O4*") moltmp.alias_atom("O5'", "O5*") moltmp.alias_atom("C1'", "C1*") moltmp.alias_atom("C2'", "C2*") moltmp.alias_atom("C3'", "C3*") moltmp.alias_atom("C4'", "C4*") moltmp.alias_atom("C5'", "C5*") moltmp.alias_atom("C5M", "C7") moltmp.add_terminal_res() # moltmp.atom_res_reorder() moltmp.save(inputPDB) # Run Smog2 runCommand("%s/bin/smog2" % smog_dir+\ " -i %s -dname %s -%s -limitbondlength -limitcontactlength > %s.log" % (inputPDB, outputPrefix, "CA" if forcefield == FORCEFIELD_CAGO else "AA", outputPrefix)) if forcefield == FORCEFIELD_CAGO: mol.select_atoms(mol.allatoms2ca()) mol.save(outputPrefix+".pdb") # ADD CHARGE TO TOP FILE grotopFile = outputPrefix + ".top" with open(grotopFile, 'r') as f1: with open(grotopFile + ".tmp", 'w') as f2: atom_scope = False write_line = False for line in f1: if "[" in line and "]" in line: if "atoms" in line: atom_scope = True if atom_scope: if "[" in line and "]" in line: if not "atoms" in line: atom_scope = False write_line = False elif not ";" in line and not (not line or line.isspace()): write_line = True else: write_line = False if write_line: f2.write("%s\t0.0\n" % line[:-1]) else: f2.write(line) os.system("cp %s.tmp %s" % (grotopFile, grotopFile)) os.system("rm -f %s.tmp" % grotopFile)
[docs]def save_dcd(mol, coords_list, prefix): print("> Saving DCD trajectory ...") n_frames = len(coords_list) # saving PDBs mol = mol.copy() for i in range(n_frames): mol.coords = coords_list[i] mol.save("%s_frame%i.pdb" % (prefix, i)) # VMD command with open(prefix+"_cmd.tcl", "w") as f : f.write("mol new %s_frame0.pdb\n" % prefix) for i in range(1,n_frames): f.write("mol addfile %s_frame%i.pdb\n" % (prefix, i)) f.write("animate write dcd %s.dcd\n" % prefix) f.write('exit\n') # Running VMD runCommand("vmd -dispdev text -e %s_cmd.tcl" % prefix) # Cleaning for i in range(n_frames): runCommand("rm -f %s_frame%i.pdb\n" % (prefix, i)) runCommand("rm -f %s_cmd.tcl" % prefix) print("\t Done \n")
[docs]def alignMol(mol1, mol2, idx=None): print("> Aligning PDB ...") sup = SVDSuperimposer() if idx is not None: c1 = mol1.coords[idx[:, 0]] c2 = mol2.coords[idx[:, 1]] else: c1 = mol1.coords c2 = mol2.coords sup.set(c1, c2) sup.run() rot, tran = sup.get_rotran() mol2.coords = np.dot(mol2.coords, rot) + tran print("\t Done \n")
[docs]def readLogFile(log_file): with open(log_file,"r") as file: header = None dic = {} for line in file: if line.startswith("INFO:"): if header is None: header = line.split() for i in range(1,len(header)): dic[header[i]] = [] else: splitline = line.split() if len(splitline) == len(header): for i in range(1,len(header)): try : dic[header[i]].append(float(splitline[i])) except ValueError: pass return dic
[docs]def getRMSD(mol1,mol2, align = False, idx=None): if align: alignMol(mol1, mol2, idx=idx) if idx is not None: coord1 = mol1.coords[idx[:, 0]] coord2 = mol2.coords[idx[:, 1]] else: coord1 = mol1.coords coord2 = mol2.coords return np.sqrt(np.mean(np.square(np.linalg.norm(coord1 - coord2, axis=1))))
[docs]def rmsdFromDCD(outputPrefix, inputPDB, targetPDB, idx, align=False): # COMPUTE RMSD rmsd = [] inputPDBmol = PDBMol(inputPDB) targetPDBmol = PDBMol(targetPDB) rmsd.append(getRMSD(mol1 = inputPDBmol, mol2=targetPDBmol, align=align, idx=idx)) coord_arr = dcd2numpyArr(outputPrefix+".dcd") for i in range(len(coord_arr)): inputPDBmol.coords[:,:] = coord_arr[i] rmsd.append(getRMSD(mol1 = inputPDBmol, mol2=targetPDBmol, align=align, idx=idx)) # CLEAN TMP FILES AND SAVE runCommand("rm -f %stmp*" % (outputPrefix)) return rmsd
[docs]def lastPDBFromDCD(inputPDB,inputDCD, outputPDB): # EXTRACT PDB from dcd file with open("%s_tmp_dcd2pdb.tcl" % outputPDB, "w") as f: s = "" s += "mol load pdb %s dcd %s\n" % (inputPDB, inputDCD) s += "set nf [molinfo top get numframes]\n" s += "[atomselect top all frame [expr $nf - 1]] writepdb %s\n" % outputPDB s += "exit\n" f.write(s) runCommand("vmd -dispdev text -e %s_tmp_dcd2pdb.tcl" % outputPDB) # CLEAN TMP FILES runCommand("rm -f %s_tmp_dcd2pdb.tcl" % (outputPDB))
[docs]def runParallelJobs(commands, env=None, numberOfThreads=1, numberOfMpi=1, hostConfig=None): """ Run multiple commands in parallel. Wait until all commands returned :param list commands: list of commands to run in parallel :param dict env: Running environement of subprocesses :param numberOfThreads: Number of openMP threads :param numberOfMpi: Number of MPI cores :return None: """ # Set env if env is None: env = os.environ env["OMP_NUM_THREADS"] = str(numberOfThreads) # run process processes = [] for cmd in commands: programname, params = cmd.split(" ",1) cmd = buildRunCommand(programname, params, numberOfMpi=numberOfMpi, hostConfig=hostConfig, env=env) print("Running command : %s" %cmd) processes.append(Popen(cmd, shell=True, env=env, stdout=sys.stdout, stderr = sys.stderr)) # Wait for processes for i in range(len(processes)): exitcode = processes[i].wait() print("Process done %s" %str(exitcode)) if exitcode != 0: # raise RuntimeError("Command returned with errors : %s" %str(commands[i])) print("Command returned with errors : %s" %str(commands[i]))
[docs]def pdb2vol(inputPDB, outputVol, sampling_rate, image_size): """ Create a density volume from a pdb :param str inputPDB: input pdb file name :param str outputVol: output vol file name :param float sampling_rate: Sampling rate :param int image_size: Size of the output volume :return str: the Xmipp command to run """ cmd = "xmipp_volume_from_pdb" args = "-i %s -o %s --sampling %f --size %i %i %i --centerPDB"%\ (inputPDB, outputVol,sampling_rate,image_size,image_size,image_size) return cmd+ " "+ args
[docs]def projectVol(inputVol, outputProj, expImage, sampling_rate=5.0, angular_distance=-1, compute_neighbors=True): """ Create a set of projections from an input volume :param str inputVol: Input volume file name :param str outputProj: Output set of proj file name :param str expImage: Experimental image to project in the neighborhood :param float sampling_rate: Samplign rate :param float angular_distance: Do not search a distance larger than... :param bool compute_neighbors: Compute projection nearby the experimental image :return str: the Xmipp command to run """ cmd = "xmipp_angular_project_library" args = "-i %s.vol -o %s.stk --sampling_rate %f " % (inputVol, outputProj, sampling_rate) if compute_neighbors : args +="--compute_neighbors --angular_distance %f " % angular_distance args += "--experimental_images %s "%expImage if angular_distance != -1 : args += "--near_exp_data" return cmd+ " "+ args
[docs]def projectMatch(inputImage, inputProj, outputMeta): """ Projection matching of an input experimental image with a set of projections :param str inputImage: File name of the input experimental image :param str inputProj: File name of the input set of projections :param str outputMeta: File name of the output Xmipp metadata file with the angles of the matching :return str: the Xmipp command to run """ cmd = "xmipp_angular_projection_matching " args= "-i %s -o %s --ref %s.stk "%(inputImage, outputMeta, inputProj) args +="--search5d_shift 10.0 --search5d_step 1.0" return cmd + " "+ args
[docs]def waveletAssignement(inputImage, inputProj, outputMeta): """ Make a discrete angular assignment of angles from a set of projections :param str inputImage: File name of input experimental image :param str inputProj: File name of the input set of projections :param str outputMeta: File name of the output Xmipp metadata file with the angles assigned :return str: the Xmipp command to run """ cmd = "xmipp_angular_discrete_assign " args= "-i %s -o %s --ref %s.doc "%(inputImage, outputMeta, inputProj) args +="--psi_step 5.0 --max_shift_change 10.0 --search5D" return cmd + " "+ args
[docs]def continuousAssign(inputMeta, inputVol, outputMeta): """ Make a continuous angular assignment of angles from a volume :param str inputMeta: File name of input Xmipp metadata file with angles to assign :param str inputVol: File name of the input Volume :param str outputMeta: File name of the output Xmipp metadata file with the angles :return str: the Xmipp command to run """ cmd = "xmipp_angular_continuous_assign " args= "-i %s -o %s --ref %s.vol "%(inputMeta, outputMeta, inputVol) return cmd + " "+ args
[docs]def flipAngles(inputMeta, outputMeta): """ Flip angles from Xmipp representation to Euler angles :param str inputMeta : File name of input Xmipp metadata file containing the angles to flip :param str outputMeta: file name of the output Xmipp matadata file :return None: """ Md1 = md.MetaData(inputMeta) flip = Md1.getValue(md.MDL_FLIP, 1) tilt1 = Md1.getValue(md.MDL_ANGLE_TILT, 1) psi1 = Md1.getValue(md.MDL_ANGLE_PSI, 1) x1 = Md1.getValue(md.MDL_SHIFT_X, 1) if flip: Md1.setValue(md.MDL_SHIFT_X, -x1, 1) Md1.setValue(md.MDL_ANGLE_TILT, tilt1 + 180, 1) Md1.setValue(md.MDL_ANGLE_PSI, -psi1, 1) Md1.write(outputMeta)
# def getAngularDist(md1, md2, idx1=1, idx2=1): # rot1 = md1.getValue(md.MDL_ANGLE_ROT, int(idx1)) # tilt1 = md1.getValue(md.MDL_ANGLE_TILT, int(idx1)) # psi1 = md1.getValue(md.MDL_ANGLE_PSI, int(idx1)) # rot2 = md2.getValue(md.MDL_ANGLE_ROT, int(idx2)) # tilt2 = md2.getValue(md.MDL_ANGLE_TILT, int(idx2)) # psi2 = md2.getValue(md.MDL_ANGLE_PSI, int(idx2)) # # return SymList.computeDistanceAngles(SymList(), rot1, tilt1, psi1, rot2, tilt2, psi2, False, True, False) # # # def getShiftDist(md1, md2, idx1=1, idx2=1): # shiftx1 = md1.getValue(md.MDL_SHIFT_X, int(idx1)) # shifty1 = md1.getValue(md.MDL_SHIFT_Y, int(idx1)) # shiftx2 = md2.getValue(md.MDL_SHIFT_X, int(idx2)) # shifty2 = md2.getValue(md.MDL_SHIFT_Y, int(idx2)) # return np.linalg.norm(np.array([shiftx1, shifty1, 0.0]) - np.array([shiftx2, shifty2, 0.0]))
[docs]def getAngularShiftDist(angle1MetaFile, angle2MetaData, angle2Idx, tmpPrefix, symmetry): mdImgTmp = md.MetaData() mdImgTmp.addObject() mdImgTmp.setValue(md.MDL_ANGLE_ROT, angle2MetaData.getValue(md.MDL_ANGLE_ROT, angle2Idx), 1) mdImgTmp.setValue(md.MDL_ANGLE_TILT,angle2MetaData.getValue(md.MDL_ANGLE_TILT,angle2Idx), 1) mdImgTmp.setValue(md.MDL_ANGLE_PSI, angle2MetaData.getValue(md.MDL_ANGLE_PSI, angle2Idx), 1) mdImgTmp.setValue(md.MDL_SHIFT_X, angle2MetaData.getValue(md.MDL_SHIFT_X, angle2Idx), 1) mdImgTmp.setValue(md.MDL_SHIFT_Y, angle2MetaData.getValue(md.MDL_SHIFT_Y, angle2Idx), 1) mdImgTmp.write(tmpPrefix + ".xmd") cmd = "xmipp_angular_distance --ang1 %s --ang2 %s.xmd --oroot %sDist --sym %s --check_mirrors > %s.log" % \ (angle1MetaFile, tmpPrefix, tmpPrefix, symmetry, tmpPrefix) runCommand(cmd) with open(tmpPrefix + ".log", "r") as f: for line in f: if "angular" in line: angDist = float(re.findall("\d+\.\d+", line)[0]) if "shift" in line: shftDist = float(re.findall("\d+\.\d+", line)[0]) return angDist, shftDist
[docs]def dcd2numpyArr(filename): print("> Reading dcd file %s"%filename) with open(filename, 'rb') as f: # Header # ---------------- INIT start_size = int.from_bytes((f.read(4)), "little") crd_type = f.read(4).decode('ascii') nframe = int.from_bytes((f.read(4)), "little") start_frame = int.from_bytes((f.read(4)), "little") len_frame = int.from_bytes((f.read(4)), "little") len_total = int.from_bytes((f.read(4)), "little") for i in range(5): f.read(4) time_step = np.frombuffer(f.read(4), dtype=np.float32) for i in range(9): f.read(4) charmm_version = int.from_bytes((f.read(4)), "little") end_size = int.from_bytes((f.read(4)), "little") if end_size != start_size: raise RuntimeError("Can not read dcd file") # ---------------- TITLE start_size = int.from_bytes((f.read(4)), "little") ntitle = int.from_bytes((f.read(4)), "little") title = f.read(80 * ntitle).decode('ascii') end_size = int.from_bytes((f.read(4)), "little") if end_size != start_size: raise RuntimeError("Can not read dcd file") # ---------------- NATOM start_size = int.from_bytes((f.read(4)), "little") natom = int.from_bytes((f.read(4)), "little") end_size = int.from_bytes((f.read(4)), "little") if end_size != start_size: raise RuntimeError("Can not read dcd file") # ----------------- DCD COORD dcd_list = [] for i in range(nframe): coordarr = np.zeros((natom, 3)) for j in range(3): start_size = int.from_bytes((f.read(4)), "little") while (start_size != 4 * natom): # print("\n-- UNKNOWN %s -- " % start_size) f.read(start_size) end_size = int.from_bytes((f.read(4)), "little") if end_size != start_size: raise RuntimeError("Can not read dcd file") start_size = int.from_bytes((f.read(4)), "little") bin_arr = f.read(4 * natom) if len(bin_arr) == 4 * natom: coordarr[:, j] = np.frombuffer(bin_arr, dtype=np.float32) else: break end_size = int.from_bytes((f.read(4)), "little") if end_size != start_size: raise RuntimeError("Can not read dcd file %i %i " % (start_size, end_size)) dcd_list.append(coordarr) print("\t Done \n") return np.array(dcd_list)