Source code for pyworkflow.mapper.xmlmapper

# **************************************************************************
# *
# * Authors:     J.M. De la Rosa Trevin (jmdelarosa@cnb.csic.es)
# *              Roberto Marabini       (roberto@cnb.csic.es)
# *
# * Unidad de  Bioinformatica of Centro Nacional de Biotecnologia , CSIC
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 3 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307  USA
# *
# *  All comments concerning this program package may be sent to the
# *  e-mail address 'scipion@cnb.csic.es'
# *
# **************************************************************************

from os.path import exists

try:
    import xml.etree.cElementTree as ET
except ImportError:
    import xml.etree.ElementTree as ET

from .mapper import Mapper


[docs]class XmlMapper(Mapper): """Mapper for XML""" def __init__(self, filename, dictClasses=None, rootName='ALL', **args): self.filename = filename if exists(filename): self._read() else: self._create(rootName, **args) Mapper.__init__(self, dictClasses) # Objects map (id should be defined) self.objDict = {} # Store some objects during parsing # that have some Pointer attributes and need to be fixed later self.pendingPtrDict = {} # This dictionary serve to define how to write classes # for example if the pair 'Integer': 'attribute' is present # all Integer will be store as attributes in the xml # possible values are: # 'attribute', 'class_only', 'class_name', 'name_class', 'name_only' self.classTags = {} # Counter to provide default objects id's self.objCount = 0
[docs] def setClassTag(self, classname, tag): self.classTags[classname] = tag
[docs] def indent(self, elem, level=0): i = "\n" + level*" " if len(elem): if not elem.text or not elem.text.strip(): elem.text = i + " " if not elem.tail or not elem.tail.strip(): elem.tail = i for _elem in elem: self.indent(_elem, level+1) if not _elem.tail or not _elem.tail.strip(): _elem.tail = i else: # TODO: generalize the matching of consecutive items if level and (not elem.tail or not elem.tail.strip()): for ii in ['t11', 't12', 't13', 't21', 't22', 't23', 't31', 't32', 't33']: if elem.tag == ii: elem.tail = " " return elem.tail = i
def _create(self, rootName, **args): self.root = ET.Element(rootName) if 'header' in args: self.root.append(ET.Comment(args.get('header'))) if 'version' in args: self.root.set("version", str(args.get('version'))) def _read(self): if self.filename is None: raise Exception("XmlMapper.read: filename is None.") if not exists(self.filename): raise Exception("XmlMapper.read: filename '%s' does not exists.") self.tree = ET.parse(self.filename) self.root = self.tree.getroot()
[docs] def strId(self, objId): """Create an unique id from the objId, specially for dictionaries, where all keys and values will be converted to string""" if type(objId) is dict: ordKeys = objId.keys() ordKeys.sort() sid = '' for k in ordKeys: sid += '%s:%s ' % (k, objId[k]) return sid return str(objId)
[docs] def selectById(self, objId): """Retrieve an object give the id""" return self.objDict.get(self.strId(objId), None)
def _addObjectToDict(self, obj, objDict): """Add object if have id""" if obj.hasObjId(): key = self.strId(obj.getObjId()) else: self.objCount += 1 key = '%s_%06d' % (obj.getClassName(), self.objCount) objDict[key] = obj
[docs] def selectAll(self, iterate=False): """Select object from storage""" for child in self.root: obj = self._buildObjectFromClass(child.tag) self.fillObject(obj, child) self._addObjectToDict(obj, self.objDict) # set properly primary keys for obj in self.pendingPtrDict.values(): for key, attr in obj.getAttributesToStore(): if attr.isPointer(): ptr = self.selectById(attr._objId) attr._objId = None attr.set(ptr) return self.objDict.values()
[docs] def setChildObject(self, obj, childName, childClass=None): childObj = getattr(obj, childName, None) if childObj is None: if childClass is None: raise Exception('Attribute "%s" was not found in parent object and classname not stored' % childName) childObj = self._buildObjectFromClass(childClass) setattr(obj, childName, childObj) return childObj
[docs] def fillObject(self, obj, objElem): # Set attributes first if not obj.isPointer(): for k, v in objElem.attrib.items(): if k not in ['id', 'classname', 'attrname']: childObj = self.setChildObject(obj, k) childObj.set(v) # Set tree childs attributes for child in objElem: if 'classname' in child.attrib: childName = child.tag childClass = child.attrib['classname'] elif 'attrname' in child.attrib: childName = child.attrib['attrname'] childClass = child.tag else: tagKey = '%s.%s' % (obj.getClassName(), child.tag) tag = self.classTags.get(tagKey, None) if tag == 'name_only' or child.tag not in self.dictClasses: childName = child.tag childClass = None else: childName = '' childClass = child.tag childObj = self.setChildObject(obj, childName, childClass) if child.text and len(child.text.strip()): childObj.set(child.text.strip()) else: # does have attributes? attributes = child.attrib if attributes and childObj.isPointer(): childObj._objId = attributes # save obj in auxiliary file self._addObjectToDict(obj, self.pendingPtrDict) self.fillObject(childObj, child)
[docs] def commit(self): if self.filename is None: raise Exception("XmlMapper.commit: filename is None") self.indent(self.root) tree = ET.ElementTree(self.root) tree.write(self.filename, xml_declaration=True, encoding='utf-8')
[docs] def setObjectId(self, objElem, objId): # Set attributes of this object element if objId: if type(objId) is dict: for k, v in objId.items(): if v: objElem.set(k, str(v)) else: objElem.set('id', str(objId))
[docs] def insert(self, obj): """Insert a new object into the system""" objElem = self.addSubElement(self.root, obj.getClassName(), obj._objValue) self.setObjectId(objElem, obj.getObjId()) # Insert object childs self.insertObjectWithChilds(obj, objElem)
[docs] def addSubElement(self, parentElem, name, value=None): childElem = ET.SubElement(parentElem, name) if value is not None: childElem.text = str(value) return childElem
[docs] def insertObjectWithChilds(self, obj, parentElem): for key, attr in obj.getAttributesToStore(): if attr.hasValue(): attrClass = attr.getClassName() objClass = obj.getClassName() allKey = '%s.ALL' % objClass # First try with .ALL tag = self.classTags.get(allKey, '') allKey = 'ALL.%s' % attrClass tag = self.classTags.get(allKey, tag) allKey = 'ALL.%s' % key # also with ALL.attribute tag = self.classTags.get(allKey, tag) classKey = '%s.%s' % (objClass, attrClass) # Second, with .childClass tag = self.classTags.get(classKey, tag) attrKey = '%s.%s' % (objClass, key) # Finally with .attribute tag = self.classTags.get(attrKey, tag) if tag == 'attribute': parentElem.set(key, str(attr)) else: if attr.isPointer(): childElem = self.addSubElement(parentElem, key) self.setObjectId(childElem, attr.get().getObjId()) else: # Select if we want to use the classname or elem name # as element tagname if tag.startswith('class'): name = attrClass else: name = key childElem = self.addSubElement(parentElem, name, attr._objValue) if tag.endswith('class'): childElem.set('classname', attrClass) elif tag.endswith('name'): childElem.set('attrname', key) self.insertObjectWithChilds(attr, childElem)
[docs] def updateFrom(self, obj): """Update object data with storage info""" pass
[docs] def updateTo(self, obj): """Update storage with object info""" pass