import os
import sys
from os.path import join, exists
from pyworkflow.utils import getParentFolder, removeBaseExt
from pathlib import PureWindowsPath
[docs]class MDoc:
"""class to interact with the IMOD "autodoc" files used by serialEM.
This format consists of keyword-value pairs organized into blocks
called sections.
A section begins with a bracketed key-value pair:
[sectionType = name]
where the section "name" or value will typically be unique.
Lines below a section header of the form
key = value
provide data associated with that section.
In addition, key-value pairs can occur at the beginning of the file,
before any section header, and these are referred to as global values.
Files with extension ".mdoc" provides data about an MRC file and has
the same name as the image file, with the additional extension ".mdoc".
In these files, the main section type is "ZValue" and the name
for each section is the Z value of the image in the file, numbered from 0.
A description of each key is available at URL:
https://bio3d.colorado.edu/SerialEM/hlp/html/about_formats.htm
Additional information may be stored in section headers of the type "T"
(i.e. [T a = b ]). In theory these information in also stored in the
"titles" of the MRC files.
Example of mdoc file
DataMode = 6 # MRC type
ImageSize = 4096 4096 # x, y dim
ImageFile = 20211130_PKV_tomo_01.mrc # image with tilt series
PixelSpacing = 3.64 # A/px
Voltage = 200.00 # kvolt
[T = Tomography: TALOS-D3558 21-Nov-30 17:42:06]
[T = TiltAxisAngle = -91.81 Binning = 1 SpotSize = 7]
[ZValue = 0] # first movie
TiltAngle = -0.01
...
[ZValue = 1] # second movie
TiltAngle = -10.00
...
"""
def __init__(self, fileName, voltage=None, magnification=None,
samplingRate=None, doseProvidedByUser=None,
tiltAngleProvidedByUser=None):
self._mdocFileName = fileName
self._tsId = None
# Dose related attributes
self.doseProvidedByUser = doseProvidedByUser
self.mdocHasDose = False
# Acquisition general attributes
self._voltage = voltage
self._magnification = magnification
self._samplingRate = samplingRate
self._tiltAxisAngle = tiltAngleProvidedByUser
# Acquisition specific attributes (per angle)
self._tiltsMetadata = []
[docs] def read(self, isImportingTsMovies=True, ignoreFilesValidation=False):
""" Parse mdoc file. There is a collection of getters that may be
used to access this information:
"""
validateTSFromMdocErrMsg = ''
tsFile = None
mdoc = self._mdocFileName
headerDict, zSlices = self._parseMdoc()
# Get acquisition general info
self._getAcquisitionInfoFromMdoc(headerDict, zSlices[0])
self._tsId = normalizeTSId(mdoc)
parentFolder = getParentFolder(mdoc)
if not isImportingTsMovies:
# Some mdoc files point to an .st file stored in the ImageFile
# header line
tsFile = join(parentFolder, headerDict.get("ImageFile", None))
if not os.path.exists(tsFile):
tsFile = join(parentFolder, self._tsId + '.mrcs')
if not os.path.exists(tsFile):
tsFile = join(parentFolder, self._tsId + '.mrc')
if not os.path.exists(tsFile):
tsFile = join(parentFolder, self._tsId + '.st')
validateTSFromMdocErrMsg = self._validateTSFromMdoc(mdoc, tsFile)
# Get acquisition specific (per angle) info
self._getSlicesData(zSlices, tsFile)
# Check Mdoc info read
validateMdocContentsErrorMsgList = self._validateMdocInfoRead(
ignoreFilesValidation=ignoreFilesValidation or not
isImportingTsMovies)
# Check all the possible errors found
exceptionMsg = ''
if validateTSFromMdocErrMsg:
exceptionMsg += validateTSFromMdocErrMsg
if validateMdocContentsErrorMsgList:
exceptionMsg += ' '.join(validateMdocContentsErrorMsgList)
# If ts images we are assuming slices follow the angle order
if not isImportingTsMovies and not exceptionMsg:
self._tiltsMetadata.sort(key=lambda x: float(x.getTiltAngle()),
reverse=False)
for index, tiMd in enumerate(self._tiltsMetadata):
tiMd.setAngleMovieFile((index+1, tiMd.getAngleMovieFile()))
return exceptionMsg
def _parseMdoc(self):
"""
Parse the mdoc file and return a list with a dict key=value for each
of the [Zvalue = X] sections and a dictionary for the first lines
global variables.
:return: dictionary (header), list of dictionaries (Z slices)
"""
headerDict = {}
headerParsed = False
zvalueList = [] # list of dictionaries with
with open(self._mdocFileName) as f:
for line in f:
if line.startswith('[ZValue'): # each tilt movie
# We have found a new z value
headerParsed = True
zvalue = int(line.split(']')[0].split('=')[1])
if zvalue != len(zvalueList):
raise Exception("Unexpected ZValue = %d" % zvalue)
zvalueDict = {}
zvalueList.append(zvalueDict)
elif line.startswith('[T'): # auxiliary global information
if self.getTiltAxisAngle():
# It's in the mdoc, but the user has specified
# it manually
continue
else:
strLine = line.strip().replace(' ', '').\
replace(',', '').lower()
pattern = 'tiltaxisangle='
if pattern in strLine:
# Example of the most common syntax
# [T = Tilt axis angle = 90.1, binning = 1
# spot = 9 camera = 0]
# [T = TiltAxisAngle = -91.81 Binning = 1
# SpotSize = 7]
tiltAxisAngle = \
strLine.split('=')[2].split('binning')[0]
# Check if it's a string which
# represents a float or not
if tiltAxisAngle.lstrip('-+').\
replace('.', '', 1).isdigit():
self._tiltAxisAngle = float(tiltAxisAngle)
elif line.strip(): # global variables no in [T sections]
key, value = line.split('=')
if not headerParsed:
headerDict[key.strip()] = value.strip()
if zvalueList:
zvalueDict[key.strip()] = value.strip()
return headerDict, zvalueList
def _getAcquisitionInfoFromMdoc(self, headerDict, firstSlice):
"""Acquisition data is read from to data sources
(from higher to lower priority):
- From the values introduced in the form by
the user (introduced as attributes of the MDoc object)
- From the first ZSlice data.
- From the file header data."""
if not self.getVoltage():
VOLTAGE = 'Voltage'
self._voltage = \
firstSlice.get(VOLTAGE, headerDict.get(VOLTAGE, self._voltage))
if not self.getMagnification():
MAGNIFICATION = 'Magnification'
self._magnification = \
firstSlice.get(MAGNIFICATION,
headerDict.get(MAGNIFICATION,
self._magnification))
if not self.getSamplingRate():
PIXEL_SPACING = 'PixelSpacing'
self._samplingRate = \
firstSlice.get(PIXEL_SPACING,
headerDict.get(PIXEL_SPACING,
self._samplingRate))
def _getSlicesData(self, zSlices, tsFile):
parentFolder = getParentFolder(self._mdocFileName)
accumulatedDose = 0
for counter, zSlice in enumerate(zSlices):
if self.doseProvidedByUser:
incomingDose = self.doseProvidedByUser
else:
incomingDose = \
self._getDoseFromMdoc(zSlice, self.getSamplingRate())
accumulatedDose += incomingDose
self._tiltsMetadata.append(TiltMetadata(
angle=zSlice.get('TiltAngle', None),
angleFile=self._getAngleMovieFileName(
parentFolder, zSlice, tsFile),
acqOrder=counter+1,
accumDose=accumulatedDose,
incomingDose=incomingDose
))
# round is used to make the condition more robust
# for cases like 0.0000000001
if round(accumulatedDose) > 0:
self.mdocHasDose = True
@staticmethod
def _getAngleMovieFileName(parentFolder, zSlice, tsFile):
if tsFile:
return tsFile
else:
# PureWindowsPath pathlib is used to make
# possible deal with different path separators, like \\
return join(parentFolder, PureWindowsPath(
zSlice['SubFramePath']).parts[-1])
@staticmethod
def _getDoseFromMdoc(zSlice, pixelSize):
"""It calculates the accumulated dose on the frames represented by
zSlice, and add it to the previous accumulated dose"""
# Dose on specimen during camera exposure in electrons/sq. Angstrom
EXPOSURE_DOSE = 'ExposureDose'
# Dose per frame in electrons per square Angstrom followed
# by number of frames at that dose
FRAME_DOSES_AND_NUMBERS = 'FrameDosesAndNumber'
# Dose rate to the camera, in electrons per unbinned pixel per second
DOSE_RATE = 'DoseRate'
# Image exposure time
EXPOSURE_TIME = 'ExposureTime'
# Minimum, maximum, and mean value for this image
MIN_MAX_MEAN = 'MinMaxMean'
COUNTS_PER_ELECTRON = 'CountsPerElectron'
DIVIDED_BY_TWO = 'DividedBy2'
def _keysInDict(listOfKeys):
return all([key in zSlice.keys() for key in listOfKeys])
# Different ways of calculating the dose, ordered by priority
# considering the possible variability between different mdoc files
newDose = 0
if pixelSize:
pixelSize = float(pixelSize)
else:
# This case cover the possibility of no sampling rate in form
# nor mdoc, avoiding the error execution before the whole
# information is gathered and the exception is raised
pixelSize = 1
# Directly from field ExposureDose
if EXPOSURE_DOSE in zSlice:
expDoseVal = zSlice[EXPOSURE_DOSE]
if expDoseVal:
newDose = float(expDoseVal)
# Directly from field FrameDosesAndNumbers
if not newDose and FRAME_DOSES_AND_NUMBERS in zSlice:
frameDoseAndNums = zSlice[FRAME_DOSES_AND_NUMBERS]
if frameDoseAndNums:
# Get the mean from a string like '0 6'
data = frameDoseAndNums.split()
dosePerFrame = data[0]
nFrames = data[1]
newDose = float(dosePerFrame) * float(nFrames)
# Calculated from fields DoseRate and ExposureTime
if not newDose and _keysInDict([DOSE_RATE, EXPOSURE_TIME]):
doseRate = zSlice[DOSE_RATE]
expTime = zSlice[EXPOSURE_TIME]
if doseRate and expTime:
newDose = float(doseRate) * float(expTime) / pixelSize ** 2
# Calculated from fields MinMaxMean, PixelSpacing and CountsPerElectron
if not newDose and _keysInDict([MIN_MAX_MEAN, COUNTS_PER_ELECTRON]):
minMaxMean = zSlice[MIN_MAX_MEAN]
counts = zSlice[COUNTS_PER_ELECTRON]
if all([minMaxMean, pixelSize, counts]):
# Get the mean from a string like '-42 2441 51.7968'
meanVal = minMaxMean.split()[-1]
newDose = (float(meanVal) / float(counts)) / pixelSize ** 2
# # Calculated as in Grigorieff paper -->
# https://doi.org/10.7554/eLife.06980.001
# if not newDose and
# _keysInDict([MIN_MAX_MEAN, COUNTS_PER_ELECTRON,
# EXPOSURE_TIME, DIVIDED_BY_TWO]):
# minMaxMean = zSlice[MIN_MAX_MEAN]
# counts = zSlice[COUNTS_PER_ELECTRON]
# expTime = zSlice[EXPOSURE_TIME]
# divByTwo = zSlice[DIVIDED_BY_TWO]
# if all([minMaxMean, counts, expTime]):
# # Get the mean from a string like '-42 2441 51.7968'
# meanVal = minMaxMean.split()[-1]
# newDose = _getDivByTwoFactor(divByTwo) * float(meanVal) /
# (float(counts) * float(expTime))
divByTwo = 0
if _keysInDict([DIVIDED_BY_TWO]):
divByTwo = int(zSlice[DIVIDED_BY_TWO])
divByTwoFactor = 2 if divByTwo else 1
return newDose * divByTwoFactor
@staticmethod
def _validateTSFromMdoc(mdoc, tsFile):
errMsg = ''
if not exists(tsFile):
errMsg = '\nMdoc --> %s\nExpected tilt series file not found \n%s'\
% (mdoc, tsFile)
return errMsg
def _validateMdocInfoRead(self, ignoreFilesValidation=False):
validateMdocContentsErrorMsgList = []
msg = ['\n*Data not found in file*\n%s:\n' % self._mdocFileName]
missingFiles = []
missingAnglesIndices = []
for i, tiltMetadata in enumerate(self._tiltsMetadata):
# Check the angles
if not tiltMetadata.getTiltAngle():
missingAnglesIndices.append(str(i))
# Check the angle stack files read
if not ignoreFilesValidation:
# Ignore the files validation is sometimes used for
# test purposes
file = tiltMetadata.getAngleMovieFile()
if not exists(file):
missingFiles.append(file)
if not self._voltage:
msg.append('*Voltage*\n')
if not self._magnification:
msg.append('*Magnification*\n')
if not self._samplingRate:
msg.append('*PixelSpacing*\n')
if not self.mdocHasDose:
msg.append('Not able to get the *dose* with the data read.\n'
'Dose related mdoc labels are:\n\n'
'- ExposureDose or\n'
'- FrameDosesAndNumber or\n'
'- DoseRate and ExposureTime or\n'
'- MinMaxMean and CountsPerElectron'
)
if not self.getTiltAxisAngle():
msg.append('*RotationAngle (tilt axis angle)*')
if missingAnglesIndices:
msg.append('*TiltAngle*: %s\n' % ' '.join(missingAnglesIndices))
if missingFiles:
msg.append('*Missing files*:\n%s\n' % ' '.join(missingFiles))
if len(msg) > 1:
validateMdocContentsErrorMsgList.append(' '.join(msg))
return validateMdocContentsErrorMsgList
[docs] def getFileName(self):
return self._mdocFileName
[docs] def getTsId(self):
return self._tsId
[docs] def getVoltage(self):
return self._voltage
[docs] def getMagnification(self):
return self._magnification
[docs] def getSamplingRate(self):
return self._samplingRate
[docs] def getTiltAxisAngle(self):
return self._tiltAxisAngle
[docs]def normalizeTSId(rawTSId):
""" Normalizes the name of a TS to prevent sqlite errors,
it ends up as a table in a set"""
# remove paths and extension
normTSID = removeBaseExt(rawTSId)
# Avoid dots case: TS_234.mrc.mdoc
normTSID = normTSID.split(".")[0]
if normTSID[0].isdigit():
normTSID = "TS_" + normTSID
return normTSID.replace('-', '_').replace('.', '')