Source code for emxlib.utils

# **************************************************************************
# *
# * Authors:     Roberto Marabini (roberto@cnb.csic.es) [1]
# *              J.M. De la Rosa Trevin (delarosatrevin@scilifelab.se) [2]
# *
# * [1] SciLifeLab, Stockholm University
# * [2] Unidad de  Bioinformatica of Centro Nacional de Biotecnologia , CSIC
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 2 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307  USA
# *
# *  All comments concerning this program package may be sent to the
# *  e-mail address 'scipion@cnb.csic.es'
# *
# **************************************************************************
"""
MODIFICATION ADVICE:

Please, do not generate or distribute
a modified version of this file under its original name.
"""


import sys
try:
    import collections
except ImportError:
    sys.stderr.write('Could not import OrderedDict. For Python versions '
                     'earlier than 2.7 this module may be missing. '
                     )

"""
NAMING CONVENTIONS:
1) 
 xxxxx__yy will be written in XML as
<xxxx>
   <yy> value </yy>
</xxxx>

anything starting by the name of a class (i.e. micrographXXXX) 
 is a pointer to that class

GLOSARY:

 primary key: A primary key is a set of labels/attributes
 that uniquely identify an object (i.e: a micrograph)
 
 foreign key: Given two objects (i.e one micrograph an one particle) 
 a foreign key is a set of labels/attributes in the first object 
 that uniquely identify the second object 
"""
EMX_SEP = '__'
#classes 
MICROGRAPH = 'micrograph'
PARTICLE   = 'particle'
#order in which items should be read
CLASSLIST  = [MICROGRAPH, PARTICLE]
#primary keys
FILENAME   = 'fileName'
INDEX      = 'index'
COMMENT    = 'comment'

[docs]class EmxLabel: """ Auxiliary class to assign data type (i.e.: int, str, etc) and unit to each attribute/label pair. """ def __init__(self, type, unit=None): self.type = type self.unit = unit
[docs] def hasUnit(self): return not self.unit is None
[docs] def getUnit(self): return self.unit
[docs] def getType(self): return self.type
#Dictionary with attribute names, data types and units #By default an attribute does not has unit assign to it emxDataTypes={ FILENAME:EmxLabel(str) ,INDEX:EmxLabel(int) , COMMENT:EmxLabel(str) ,'acceleratingVoltage':EmxLabel(float,'kV') ,'activeFlag':EmxLabel(int) ,'amplitudeContrast':EmxLabel(float) ,'boxSize__X':EmxLabel(int,'px') ,'boxSize__Y':EmxLabel(int,'px') ,'boxSize__Z':EmxLabel(int,'px') ,'centerCoord__X':EmxLabel(float,'px') ,'centerCoord__Y':EmxLabel(float,'px') ,'centerCoord__Z':EmxLabel(float,'px') ,'cs':EmxLabel(float,'mm') ,'defocusU':EmxLabel(float,'nm') ,'defocusV':EmxLabel(float,'nm') ,'defocusUAngle':EmxLabel(float,'deg') ,'fom':EmxLabel(float) ,'pixelSpacing__X':EmxLabel(float,'A/px') ,'pixelSpacing__Y':EmxLabel(float,'A/px') ,'pixelSpacing__Z':EmxLabel(float,'A/px') ,'transformationMatrix__t11':EmxLabel(float) ,'transformationMatrix__t12':EmxLabel(float) ,'transformationMatrix__t13':EmxLabel(float) ,'transformationMatrix__t14':EmxLabel(float,'px') ,'transformationMatrix__t21':EmxLabel(float) ,'transformationMatrix__t22':EmxLabel(float) ,'transformationMatrix__t23':EmxLabel(float) ,'transformationMatrix__t24':EmxLabel(float,'px') ,'transformationMatrix__t31':EmxLabel(float) ,'transformationMatrix__t32':EmxLabel(float) ,'transformationMatrix__t33':EmxLabel(float) ,'transformationMatrix__t34':EmxLabel(float,'px') }
[docs]class EmxObject: """ Base class for all EMX objects/classes name is the class type so far micrograph or particles """ _foreignKeys = [] _primaryKey = [] _attributes = [] _name = None def __init__(self): self.dictPrimaryKeys = collections.OrderedDict() self.dictForeignKeys = collections.OrderedDict() self.dictAttributes = collections.OrderedDict() #---------- Public object methods ------------------------------------ def __str__(self): return self._pprint()
[docs] def clear(self): """ Generic clean. PK cannot be modified or clean. """ for key in self.dictAttributes: self.dictAttributes[key] = None for key, value in self.iterForeignKeys(): self.dictForeignKeys[key] = None
[docs] def get(self, key, default=None): """ Given a key (attribute name) returns the value assigned to it. If not present, the default will be returned. """ if key in self.dictPrimaryKeys: return self.dictPrimaryKeys[key] if key in self.dictAttributes: return self.dictAttributes[key] return default
[docs] def has(self, key): return self.get(key) is not None
[docs] def set(self, key, value=None): """ Given a key (attribute name) assigns a value to it. """ if value is None: if key in self._primaryKey: self.dictPrimaryKeys[key] = None elif key in self._attributes: self.dictAttributes[key] = None else: if key in self._primaryKey: self.dictPrimaryKeys[key] = emxDataTypes[key].getType()(value) elif key in self._attributes: self.dictAttributes[key] = emxDataTypes[key].getType()(value) else: raise Exception("Key %s not allowed in: %s" % (key, self._name))
[docs] def iterAttributes(self): """Returns list with valid keys (attribute names) and values for this class. Primary keys are ignored""" return self.dictAttributes.items()
[docs] def iterPrimaryKeys(self): """Returns list with valid primary keys (attribute names) and values for this class""" return self.dictPrimaryKeys.items()
[docs] def iterForeignKeys(self): """Returns list with valid primary keys (attribute names) and values for this class""" return self.dictForeignKeys.items()
def __eq__(self, other): """ If the primary keys of two objects are identical then both objects are identical""" return (self.dictPrimaryKeys == other.dictPrimaryKeys)
[docs] def strongEq(self, other): """ true if both objects are truly identical""" return (self.dictPrimaryKeys == other.dictPrimaryKeys and self.dictForeignKeys == other.dictForeignKeys and self.dictAttributes == other.dictAttributes)
#---------- Internal object methods---------------------------------- def _pprintPK(self, printNone=False): """ Print primary keys .""" out = "fileName: %(fileName)s" if (self.get(INDEX) != None) or printNone: out += ", index:%(index)s" return (out % self.dictPrimaryKeys) +'\n' def _pprint(self, printNone=False): """print ordered dictionaries, default routine is ugly. """ #primary key out = "\nObject type: %s\n"% self._name out += "Primary key:\n " out += self._pprintPK(printNone) #foreign key if len(self.dictForeignKeys) and\ self.dictForeignKeys[list(self.dictForeignKeys.keys())[0]]: out += "Foreign keys:\n " for key, value in self.dictForeignKeys.items(): if (value != None) or printNone: out += "%s -> " % key if value is None: out += "None" else: #value is not a EMobject and therefore has no _pprintPK #out += value._pprintPK(printNone) out += str(value) +"\n" #other attributes out += "Other Attributes:\n" for key, value in self.iterAttributes(): _unit = emxDataTypes[key].getUnit() if _unit is None: _unit="" else: _unit= "(%s)"%_unit if (value != None) or printNone: out += " %s %s:%s,\n" % (key, _unit, str(value) ) return out def _initAttribute(self,key, value=None): """Private function do not use outside this file """ self.dictAttributes[key] = value def _initPrimaryKey(self,key, value=None): """Private function do not use outside this file """ if value is None: pass else: self.dictPrimaryKeys[key] = emxDataTypes[key].getType()(value) def _validateForeignKey(self, className): """ Raise an Exception if className is not in foreign keys dict. """ if className not in self._foreignKeys: raise Exception("class %s does not have FK of type: %s" % (self._name, className)) def _setForeignKey(self, className, object): """ Set another object as foreign key of self for a given className. """ self._validateForeignKey(className) self.dictForeignKeys[className] = object def _getForeignKey(self, className, validate=False): """ Return the object assigned as foreign key for a given className. """ if validate: self._validateForeignKey(className) return self.dictForeignKeys.get(className, None)
[docs]class EmxImage(EmxObject): """ Base class for EmxMicrograph and EmxParticle. Both share that have FILENAME and INDEX as primary keys. """ _primaryKey = [FILENAME, INDEX] def __init__(self, fileName=None, index=None): #init emx object EmxObject.__init__(self) if fileName is None and index is None: raise Exception(self._name + "cannot be created with fileName=None and index=None") #set primary keys. At least one of this must be different from None self._initPrimaryKey(FILENAME, fileName) self._initPrimaryKey(INDEX, index)
[docs]class EmxMicrograph(EmxImage): """Class for Micrographs """ _foreignKey = None _name = MICROGRAPH _attributes = [ COMMENT ,'acceleratingVoltage' ,'activeFlag' ,'amplitudeContrast' ,'cs' ,'defocusU' ,'defocusV' ,'defocusUAngle' ,'fom' ,'pixelSpacing__X' ,'pixelSpacing__Y' ,'pixelSpacing__Z' ]
[docs]class EmxParticle(EmxImage): """Class for Particles """ _foreignKeys = [MICROGRAPH] _foreignKeysMap = {MICROGRAPH:EmxMicrograph('a',1)} _name = PARTICLE _attributes=[ COMMENT ,'activeFlag' ,'boxSize__X' ,'boxSize__Y' ,'boxSize__Z' ,'centerCoord__X' ,'centerCoord__Y' ,'centerCoord__Z' ,'defocusU' ,'defocusV' ,'defocusUAngle' ,'fom' ,'pixelSpacing__X' ,'pixelSpacing__Y' ,'pixelSpacing__Z' ,'transformationMatrix__t11' ,'transformationMatrix__t12' ,'transformationMatrix__t13' ,'transformationMatrix__t14' ,'transformationMatrix__t21' ,'transformationMatrix__t22' ,'transformationMatrix__t23' ,'transformationMatrix__t24' ,'transformationMatrix__t31' ,'transformationMatrix__t32' ,'transformationMatrix__t33' ,'transformationMatrix__t34' ]
[docs] def setMicrograph(self, micrograph): """ Set the micrograph associated with this particle. """ self._setForeignKey(MICROGRAPH, micrograph)
[docs] def getMicrograph(self): """ Return the micrograph associated with this particles. """ return self._getForeignKey(MICROGRAPH)
[docs]class EmxData(): """ Class to group EMX objects""" def __init__(self): self.objLists = {MICROGRAPH : [], PARTICLE : []} self.mapObjectPK = {} self._mapper = EmxXmlMapper(self)
[docs] def addObject(self, obj): self.objLists[obj._name].append(obj) self.mapObjectPK[str(obj.dictPrimaryKeys)] = obj
[docs] def getObject(self, mapPK): """ Return an object given its primary key. """ return self.mapObjectPK[str(mapPK)]
[docs] def clear(self): for listName in self.objLists: self.objLists[listName] = [] self.mapObjectPK = {}
[docs] def size(self): return len(self.mapObjectPK)
def __iter__(self): for node, nodelist in self.objLists.items(): for subnode in nodelist: yield subnode
[docs] def iterClasses(self, className): """ Iterate through objects of a particular class. """ return self.objLists[className]
def __str__(self): partStr="" for k, v in self.objLists.items(): if len(v): partStr += "\n****\n%sS\n****\n"% k.upper() for obj in v: partStr += obj.__str__()+"\n" return partStr
[docs] def read(self, emxFile): """ Read data from an emxFile. """ self._mapper.readEMXFile(emxFile)
[docs] def readFirstObject(self, className, emxFile): """ Read only the first object of a given className from file.""" return self._mapper.firstObject(className, emxFile)
[docs] def getFirstObject(self, className): """ Return the first object of a given className. This function should be called after read. """ objList = self.objLists[className] if len(objList): return objList[0] return None
[docs] def write(self, emxFile): """ Write data to an emxFile. """ self._mapper.writeEMXFile(emxFile)
#------------------- XmlMapper implementation ----------------------------------- """ Following from here there is the implementation of the XmlMapper to store EmxData objects in XML files as described in the EMX format. This mapper is the default one used by EmxData (and no other make sense at this moment) """ try: import xml.etree.cElementTree as ET except ImportError: import xml.etree.ElementTree as ET ERR_VALIDATION_WRONG=1 VERSION = 1.0 ROOTNAME = 'EMX' HEADER = """<%(ROOTNAME)s version="%(VERSION)s"> <!-- ########################################################################## # EMX Exchange file # Produced using the emx library # (http://i2pc.cnb.csic.es/emx/LoadTools.htm?type=Library) # # Information on this file format is available at # http://i2pc.cnb.csic.es/emx ########################################################################## # One of the best ways you can help us to improve this software # is to let us know about any problems you find with it. # Please report bugs to: emx@cnb.csic.es ########################################################################## --> """ % globals() EMXSCHEMA10 = 'https://raw.githubusercontent.com/scipion-em/scipion-em-emxlib/devel/emxlib/emx.xsd' EMXSCHEMA11 = 'https://raw.githubusercontent.com/scipion-em/scipion-em-emxlib/devel/emxlib/emx_11.xsd'
[docs]class ValidateError(Exception): def __init__(self, code, message): self.errorMessage = message self.errorCode = code def __str__(self): return "Error Code: %d. Message: %s" % (self.errorCode, self.errorMessage)
[docs] def getCode(self): return self.errorCode
[docs] def getMessage(self): return self.errorMessage
[docs]class EmxXmlMapper(): """Mapper for XML""" def __init__(self, emxData): self.emxData = emxData self.classObject = {MICROGRAPH: EmxMicrograph, PARTICLE: EmxParticle} def __del__(self): pass
[docs] def objectToXML(self, object): """ Given an object persist it in XML dataBase. Each object goes to a different element. Much much faster... """ # write primary key xmlString = r" <%s" % object._name for key, value in object.dictPrimaryKeys.items(): if value is None: continue xmlString += ' %(key)s="%(value)s"' % ({'key':key, 'value':str(value)}) xmlString += ">\n" # write attributes oldParent = "" for key, value in object.iterAttributes(): if value is None: continue unit = emxDataTypes[key].getUnit() # is this an special case, that is, # does the label contains '__'? # I asumme there is no grandchild if EMX_SEP in key: (parent, child) = key.split(EMX_SEP) # take care of cases like: # <pixelSpacing> # <X>5.6</X> # <Y>5.7</Y> # </pixelSpacing> # second entry if oldParent == parent: xmlString = xmlString.replace(" </%s>\n" % parent, "") # first entry else: xmlString += " <%s>\n " % parent if unit is None: xmlString += " <%(child)s>%(value)s"\ "</%(child)s>\n </%(parent)s>\n" % ({'parent':parent, 'child':child, 'value':str(value)}) else: xmlString += ' <%(child)s unit="%(unit)s">%(value)s'\ "</%(child)s>\n </%(parent)s>\n" % ({'parent':parent, 'child':child, 'value':str(value), 'unit':unit}) oldParent = parent # simple attributes with no child else: if unit is None: xmlString += " <%(key)s>%(value)s</%(key)s>\n" % ({'key':key, 'value':str(value)}) else: xmlString += ' <%(key)s unit="%(unit)s">%(value)s</%(key)s>\n' % ({'key':key, 'value':str(value), 'unit':unit}) # write foreign key if len(object.dictForeignKeys) and\ object.dictForeignKeys[object.dictForeignKeys.keys()[0]]: pointedObject = object.dictForeignKeys[object.dictForeignKeys.keys()[0]] xmlString += " <%s" % pointedObject._name for key, value in pointedObject.dictPrimaryKeys.items(): if value is None: continue xmlString += ' %(key)s="%(value)s"' % ({'key':key, 'value':str(value)}) xmlString += "/>\n" xmlString += " </%s>\n" % object._name # print xmlString return xmlString
_attributes = [ 'acceleratingVoltage' , 'activeFlag' , 'amplitudeContrast' , 'cs' , 'defocusU' , 'defocusV' , 'defocusUAngle' , 'fom' , 'pixelSpacing__X' , 'pixelSpacing__Y' , 'pixelSpacing__Z' ]
[docs] def readEMXFile(self, fileName, classElement=None): """ create tree from xml file If classElement is not None, the first element of this class will be returned """ # get context context = ET.iterparse(fileName, events=('start', 'end')) # turn it into an iterator context = iter(context) # get the root element event, root = next(context) # self.classObject = globals()['Emx'+element] doItPK = True skipLabelPK = 'kk' mergeParent = False parentLabel = 'kk' lastStartTagA = 'kk' lastEventStartA = False listObjectWithForeignKey = [] for event, elem in context: tag = elem.tag if tag == 'EMX': continue if event == 'start': # primary key and FK if tag in CLASSLIST: # only primary key if(doItPK): self.createObject(elem) doItPK = False skipLabelPK = tag # foreign key else: # get PF and save the map for the first pass # since the actual pointed object may not exists FK = self.readObjectPK(elem) self._object._setForeignKey(tag, FK) listObjectWithForeignKey.append(self._object) else: if lastEventStartA == True: mergeParent = True parentLabel = lastStartTagA lastStartTagA = tag lastEventStartA = True elif event == 'end': # PK or FG if tag in CLASSLIST and skipLabelPK == tag: doItPK = True if tag == classElement: return # other attributes else: # simple element if lastStartTagA == tag: if elem.text is None: raise Exception ("Element: " + tag + " is empty") else: text = elem.text.strip(' \n\t') if(len(text) < 1): raise Exception ("ZERO for tag=%s, value=%s" % (tag, text)) if mergeParent: self._object.set(parentLabel + EMX_SEP + tag, text) else: self._object.set(tag, text) elif parentLabel == tag: mergeParent = False parentLabel = 'kk' lastEventStartA = False else: raise Exception ("Unknown event type %s" % event) root.clear() # Now loop Trough all objects and fix the FK for object in listObjectWithForeignKey: for key in object._foreignKeys: fk = object._getForeignKey(key) object._setForeignKey(key, self.emxData.getObject(fk))
[docs] def createObject(self, elem): self.myClass = self.classObject[elem.tag] # primary key # get PK self.dict = self.readObjectPK(elem) # create object self._object = self.classObject[elem.tag](**(self.dict)) # add it to emxData self.emxData.addObject(self._object)
[docs] def readObjectPK(self, elem): """ read primary key. So far all entities has the same PK. We may need to specialize or use dictPrimaryKeys in the future """ mapPK = collections.OrderedDict() for attribute in elem.attrib: mapPK[attribute] = emxDataTypes[attribute].getType()(elem.get(attribute)) if mapPK: return collections.OrderedDict(sorted(mapPK.items(), key=lambda t: t[0])) else: raise Exception("readObjectPK: No fileName or index provided")
[docs] def firstObject(self, classname, fileName): """ Iterate over the tags elements and find the first one of type 'classname', build the object and return it. The foreing keys will be not updated. """ self._object = None # context = ET.iterparse(fileName, events=('start', 'end')) # for event, elem in iter(context): # tag = elem.tag # if event == 'start': # # print "tag: '%s'" % tag, "class: '%s'" % classname # if tag == classname: # # print "tag==class" # self.createObject(elem) # # print "self._object: ", self._object # return self._object self.readEMXFile(fileName, classElement=classname) return self._object
[docs] def writeEMXFile(self, fileName): """write xml file and store it in a document """ xmlFile = open(fileName, "w") xmlFile.write("<?xml version='1.0' encoding='utf-8'?>\n") xmlFile.write(HEADER) for object in self.emxData: text = self.objectToXML(object) # # #implement this with a regular expression # #format matrices properly for i, j in { '</t11>\n ':'</t11> ' , '</t12>\n ':'</t12> ' , '</t13>\n ':'</t13> ' , '</t21>\n ':'</t21> ' , '</t22>\n ':'</t22> ' , '</t23>\n ':'</t23> ' , '</t31>\n ':'</t31> ' , '</t32>\n ':'</t32> ' , '</t33>\n ':'</t33> ' }.items(): text = text.replace(i, j) text = text.replace('<comment>','<!--') text = text.replace('</comment>','-->') xmlFile.write(text) xmlFile.write("</EMX>") xmlFile.close()
[docs]def validateSchema(filename, schema_file=None): """ Code from astropy project released under BSD licence Validates an XML file against a schema or DTD. Functions to do XML schema and DTD validation. At the moment, this makes a subprocess call first to xerces then to xmllint. This could use a Python-based library at some point in the future, if something appropriate could be found. lxml is a possibility but has too many dependences if anyone knows about a pure python validator let my know Parameters ---------- filename : str The path to the XML file to validate schema : str The path to the XML schema or DTD Returns ------- returncode, stdout, stderr : int, str, str Returns the returncode from validator and the stdout and stderr as strings """ import subprocess, os ########### # try xerces ########### answerSize = 1024 # avoid overflow in web endding = '' if schema_file is None: _schema = EMXSCHEMA11 else: _schema = schema_file # print "java jaxp.SourceValidator -a %s -i %s -xsd11"% (_schema, filename) p = subprocess.Popen("java jaxp.SourceValidator -a %s -i %s -xsd11" % (_schema, filename), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = p.communicate() # xerces exists but is error if p.returncode == 0 and (stderr != ""): if len(stderr) > answerSize: endding = '... (too many errors, displayed first %d characters)' % (answerSize) raise ValidateError(ERR_VALIDATION_WRONG, """Error: when validating file %s with schema %s. \nError:%s""" % (filename, _schema, stderr[:answerSize] + endding)) ####### # no xerces available, let us try xmlint ###### if p.returncode != 0: print("validating with xmllint") if schema_file is None: _schema = EMXSCHEMA10 else: _schema = schema_file schema_part = '--schema ' + _schema p = subprocess.Popen( "xmllint --noout %s %s" % (schema_part, filename), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = p.communicate() if p.returncode == 127: raise ValidateError(127, """Error: neither xerces-f nor xmllint could be found, I cannot validate schema. Schema validation is based either on the xmllint program that belongs to the libxml2-tools package. or on the xerces-f project""") if p.returncode != 0: if len(stderr) > answerSize: endding = '... (too many errors, displayed first %d characters)' % (answerSize) message = """Error: when validating file %s with schema %s. \nError:%s""" % (filename, _schema, stderr[:answerSize] + endding) # print "message", message raise ValidateError(ERR_VALIDATION_WRONG, message) return p.returncode, stdout[:answerSize], stderr[:answerSize]