Source code for tomo.convert.mdoc

import os
import sys
from os.path import join, exists
from pyworkflow.utils import getParentFolder, removeBaseExt
from pathlib import PureWindowsPath


[docs]class MDoc: """class to interact with the IMOD "autodoc" files used by serialEM. This format consists of keyword-value pairs organized into blocks called sections. A section begins with a bracketed key-value pair: [sectionType = name] where the section "name" or value will typically be unique. Lines below a section header of the form key = value provide data associated with that section. In addition, key-value pairs can occur at the beginning of the file, before any section header, and these are referred to as global values. Files with extension ".mdoc" provides data about an MRC file and has the same name as the image file, with the additional extension ".mdoc". In these files, the main section type is "ZValue" and the name for each section is the Z value of the image in the file, numbered from 0. A description of each key is available at URL: https://bio3d.colorado.edu/SerialEM/hlp/html/about_formats.htm Additional information may be stored in section headers of the type "T" (i.e. [T a = b ]). In theory these information in also stored in the "titles" of the MRC files. Example of mdoc file DataMode = 6 # MRC type ImageSize = 4096 4096 # x, y dim ImageFile = 20211130_PKV_tomo_01.mrc # image with tilt series PixelSpacing = 3.64 # A/px Voltage = 200.00 # kvolt [T = Tomography: TALOS-D3558 21-Nov-30 17:42:06] [T = TiltAxisAngle = -91.81 Binning = 1 SpotSize = 7] [ZValue = 0] # first movie TiltAngle = -0.01 ... [ZValue = 1] # second movie TiltAngle = -10.00 ... """ def __init__(self, fileName, voltage=None, magnification=None, samplingRate=None, doseProvidedByUser=None, tiltAngleProvidedByUser=None): self._mdocFileName = fileName self._tsId = None # Dose related attributes self.doseProvidedByUser = doseProvidedByUser self.mdocHasDose = False # Acquisition general attributes self._voltage = voltage self._magnification = magnification self._samplingRate = samplingRate self._tiltAxisAngle = tiltAngleProvidedByUser # Acquisition specific attributes (per angle) self._tiltsMetadata = []
[docs] def read(self, isImportingTsMovies=True, ignoreFilesValidation=False): """ Parse mdoc file. There is a collection of getters that may be used to access this information: """ validateTSFromMdocErrMsg = '' tsFile = None mdoc = self._mdocFileName headerDict, zSlices = self._parseMdoc() # Get acquisition general info self._getAcquisitionInfoFromMdoc(headerDict, zSlices[0]) self._tsId = normalizeTSId(mdoc) parentFolder = getParentFolder(mdoc) if not isImportingTsMovies: # Some mdoc files point to an .st file stored in the ImageFile # header line tsFile = join(parentFolder, headerDict.get("ImageFile", None)) if not os.path.exists(tsFile): tsFile = join(parentFolder, self._tsId + '.mrcs') if not os.path.exists(tsFile): tsFile = join(parentFolder, self._tsId + '.mrc') if not os.path.exists(tsFile): tsFile = join(parentFolder, self._tsId + '.st') validateTSFromMdocErrMsg = self._validateTSFromMdoc(mdoc, tsFile) # Get acquisition specific (per angle) info self._getSlicesData(zSlices, tsFile) # Check Mdoc info read validateMdocContentsErrorMsgList = self._validateMdocInfoRead( ignoreFilesValidation=ignoreFilesValidation or not isImportingTsMovies) # Check all the possible errors found exceptionMsg = '' if validateTSFromMdocErrMsg: exceptionMsg += validateTSFromMdocErrMsg if validateMdocContentsErrorMsgList: exceptionMsg += ' '.join(validateMdocContentsErrorMsgList) # If ts images we are assuming slices follow the angle order if not isImportingTsMovies and not exceptionMsg: self._tiltsMetadata.sort(key=lambda x: float(x.getTiltAngle()), reverse=False) for index, tiMd in enumerate(self._tiltsMetadata): tiMd.setAngleMovieFile((index+1, tiMd.getAngleMovieFile())) return exceptionMsg
def _parseMdoc(self): """ Parse the mdoc file and return a list with a dict key=value for each of the [Zvalue = X] sections and a dictionary for the first lines global variables. :return: dictionary (header), list of dictionaries (Z slices) """ headerDict = {} headerParsed = False zvalueList = [] # list of dictionaries with with open(self._mdocFileName) as f: for line in f: if line.startswith('[ZValue'): # each tilt movie # We have found a new z value headerParsed = True zvalue = int(line.split(']')[0].split('=')[1]) if zvalue != len(zvalueList): raise Exception("Unexpected ZValue = %d" % zvalue) zvalueDict = {} zvalueList.append(zvalueDict) elif line.startswith('[T'): # auxiliary global information if self.getTiltAxisAngle(): # It's in the mdoc, but the user has specified # it manually continue else: strLine = line.strip().replace(' ', '').\ replace(',', '').lower() pattern = 'tiltaxisangle=' if pattern in strLine: # Example of the most common syntax # [T = Tilt axis angle = 90.1, binning = 1 # spot = 9 camera = 0] # [T = TiltAxisAngle = -91.81 Binning = 1 # SpotSize = 7] tiltAxisAngle = \ strLine.split('=')[2].split('binning')[0] # Check if it's a string which # represents a float or not if tiltAxisAngle.lstrip('-+').\ replace('.', '', 1).isdigit(): self._tiltAxisAngle = float(tiltAxisAngle) elif line.strip(): # global variables no in [T sections] key, value = line.split('=') if not headerParsed: headerDict[key.strip()] = value.strip() if zvalueList: zvalueDict[key.strip()] = value.strip() return headerDict, zvalueList def _getAcquisitionInfoFromMdoc(self, headerDict, firstSlice): """Acquisition data is read from to data sources (from higher to lower priority): - From the values introduced in the form by the user (introduced as attributes of the MDoc object) - From the first ZSlice data. - From the file header data.""" if not self.getVoltage(): VOLTAGE = 'Voltage' self._voltage = \ firstSlice.get(VOLTAGE, headerDict.get(VOLTAGE, self._voltage)) if not self.getMagnification(): MAGNIFICATION = 'Magnification' self._magnification = \ firstSlice.get(MAGNIFICATION, headerDict.get(MAGNIFICATION, self._magnification)) if not self.getSamplingRate(): PIXEL_SPACING = 'PixelSpacing' self._samplingRate = \ firstSlice.get(PIXEL_SPACING, headerDict.get(PIXEL_SPACING, self._samplingRate)) def _getSlicesData(self, zSlices, tsFile): parentFolder = getParentFolder(self._mdocFileName) accumulatedDose = 0 for counter, zSlice in enumerate(zSlices): if self.doseProvidedByUser: incomingDose = self.doseProvidedByUser else: incomingDose = \ self._getDoseFromMdoc(zSlice, self.getSamplingRate()) accumulatedDose += incomingDose self._tiltsMetadata.append(TiltMetadata( angle=zSlice.get('TiltAngle', None), angleFile=self._getAngleMovieFileName( parentFolder, zSlice, tsFile), acqOrder=counter+1, accumDose=accumulatedDose, incomingDose=incomingDose )) # round is used to make the condition more robust # for cases like 0.0000000001 if round(accumulatedDose) > 0: self.mdocHasDose = True @staticmethod def _getAngleMovieFileName(parentFolder, zSlice, tsFile): if tsFile: return tsFile else: # PureWindowsPath pathlib is used to make # possible deal with different path separators, like \\ return join(parentFolder, PureWindowsPath( zSlice['SubFramePath']).parts[-1]) @staticmethod def _getDoseFromMdoc(zSlice, pixelSize): """It calculates the accumulated dose on the frames represented by zSlice, and add it to the previous accumulated dose""" # Dose on specimen during camera exposure in electrons/sq. Angstrom EXPOSURE_DOSE = 'ExposureDose' # Dose per frame in electrons per square Angstrom followed # by number of frames at that dose FRAME_DOSES_AND_NUMBERS = 'FrameDosesAndNumber' # Dose rate to the camera, in electrons per unbinned pixel per second DOSE_RATE = 'DoseRate' # Image exposure time EXPOSURE_TIME = 'ExposureTime' # Minimum, maximum, and mean value for this image MIN_MAX_MEAN = 'MinMaxMean' COUNTS_PER_ELECTRON = 'CountsPerElectron' DIVIDED_BY_TWO = 'DividedBy2' def _keysInDict(listOfKeys): return all([key in zSlice.keys() for key in listOfKeys]) # Different ways of calculating the dose, ordered by priority # considering the possible variability between different mdoc files newDose = 0 if pixelSize: pixelSize = float(pixelSize) else: # This case cover the possibility of no sampling rate in form # nor mdoc, avoiding the error execution before the whole # information is gathered and the exception is raised pixelSize = 1 # Directly from field ExposureDose if EXPOSURE_DOSE in zSlice: expDoseVal = zSlice[EXPOSURE_DOSE] if expDoseVal: newDose = float(expDoseVal) # Directly from field FrameDosesAndNumbers if not newDose and FRAME_DOSES_AND_NUMBERS in zSlice: frameDoseAndNums = zSlice[FRAME_DOSES_AND_NUMBERS] if frameDoseAndNums: # Get the mean from a string like '0 6' data = frameDoseAndNums.split() dosePerFrame = data[0] nFrames = data[1] newDose = float(dosePerFrame) * float(nFrames) # Calculated from fields DoseRate and ExposureTime if not newDose and _keysInDict([DOSE_RATE, EXPOSURE_TIME]): doseRate = zSlice[DOSE_RATE] expTime = zSlice[EXPOSURE_TIME] if doseRate and expTime: newDose = float(doseRate) * float(expTime) / pixelSize ** 2 # Calculated from fields MinMaxMean, PixelSpacing and CountsPerElectron if not newDose and _keysInDict([MIN_MAX_MEAN, COUNTS_PER_ELECTRON]): minMaxMean = zSlice[MIN_MAX_MEAN] counts = zSlice[COUNTS_PER_ELECTRON] if all([minMaxMean, pixelSize, counts]): # Get the mean from a string like '-42 2441 51.7968' meanVal = minMaxMean.split()[-1] newDose = (float(meanVal) / float(counts)) / pixelSize ** 2 # # Calculated as in Grigorieff paper --> # https://doi.org/10.7554/eLife.06980.001 # if not newDose and # _keysInDict([MIN_MAX_MEAN, COUNTS_PER_ELECTRON, # EXPOSURE_TIME, DIVIDED_BY_TWO]): # minMaxMean = zSlice[MIN_MAX_MEAN] # counts = zSlice[COUNTS_PER_ELECTRON] # expTime = zSlice[EXPOSURE_TIME] # divByTwo = zSlice[DIVIDED_BY_TWO] # if all([minMaxMean, counts, expTime]): # # Get the mean from a string like '-42 2441 51.7968' # meanVal = minMaxMean.split()[-1] # newDose = _getDivByTwoFactor(divByTwo) * float(meanVal) / # (float(counts) * float(expTime)) divByTwo = 0 if _keysInDict([DIVIDED_BY_TWO]): divByTwo = int(zSlice[DIVIDED_BY_TWO]) divByTwoFactor = 2 if divByTwo else 1 return newDose * divByTwoFactor @staticmethod def _validateTSFromMdoc(mdoc, tsFile): errMsg = '' if not exists(tsFile): errMsg = '\nMdoc --> %s\nExpected tilt series file not found \n%s'\ % (mdoc, tsFile) return errMsg def _validateMdocInfoRead(self, ignoreFilesValidation=False): validateMdocContentsErrorMsgList = [] msg = ['\n*Data not found in file*\n%s:\n' % self._mdocFileName] missingFiles = [] missingAnglesIndices = [] for i, tiltMetadata in enumerate(self._tiltsMetadata): # Check the angles if not tiltMetadata.getTiltAngle(): missingAnglesIndices.append(str(i)) # Check the angle stack files read if not ignoreFilesValidation: # Ignore the files validation is sometimes used for # test purposes file = tiltMetadata.getAngleMovieFile() if not exists(file): missingFiles.append(file) if not self._voltage: msg.append('*Voltage*\n') if not self._magnification: msg.append('*Magnification*\n') if not self._samplingRate: msg.append('*PixelSpacing*\n') if not self.mdocHasDose: msg.append('Not able to get the *dose* with the data read.\n' 'Dose related mdoc labels are:\n\n' '- ExposureDose or\n' '- FrameDosesAndNumber or\n' '- DoseRate and ExposureTime or\n' '- MinMaxMean and CountsPerElectron' ) if not self.getTiltAxisAngle(): msg.append('*RotationAngle (tilt axis angle)*') if missingAnglesIndices: msg.append('*TiltAngle*: %s\n' % ' '.join(missingAnglesIndices)) if missingFiles: msg.append('*Missing files*:\n%s\n' % ' '.join(missingFiles)) if len(msg) > 1: validateMdocContentsErrorMsgList.append(' '.join(msg)) return validateMdocContentsErrorMsgList
[docs] def getFileName(self): return self._mdocFileName
[docs] def getTsId(self): return self._tsId
[docs] def getVoltage(self): return self._voltage
[docs] def getMagnification(self): return self._magnification
[docs] def getSamplingRate(self): return self._samplingRate
[docs] def getTiltsMetadata(self): return self._tiltsMetadata
[docs] def getTiltAxisAngle(self): return self._tiltAxisAngle
[docs]class TiltMetadata: def __init__(self, angle=None, angleFile=None, acqOrder=None, accumDose=None, incomingDose=None): self._angle = angle self._angleFile = angleFile self._acqOrder = acqOrder self._accumDose = accumDose self._incomingDose = incomingDose
[docs] def setTiltAngle(self, tiltAngle): self._angle = tiltAngle
[docs] def setAngleMovieFile(self, angleMovieFile): self._angleFile = angleMovieFile
[docs] def setAcqOorder(self, order): self._acqOrder = order
[docs] def setAccumDose(self, accumDose): self._accumDose = accumDose
[docs] def setIncomingDose(self, incDose): self._incomingDose = incDose
[docs] def getTiltAngle(self): return self._angle
[docs] def getAngleMovieFile(self): return self._angleFile
[docs] def getAcqOrder(self): return self._acqOrder
[docs] def getAccumDose(self): return self._accumDose
# TODO: remove since this is a redefinition (see above) # def getAcqOrder(self): # return self._acqOrder
[docs] def getIncomingDose(self): return self._incomingDose
[docs]def normalizeTSId(rawTSId): """ Normalizes the name of a TS to prevent sqlite errors, it ends up as a table in a set""" # remove paths and extension normTSID = removeBaseExt(rawTSId) # Avoid dots case: TS_234.mrc.mdoc normTSID = normTSID.split(".")[0] if normTSID[0].isdigit(): normTSID = "TS_" + normTSID return normTSID.replace('-', '_').replace('.', '')