""" Module to host templates classes"""
import collections
import glob
import os
import tempfile
from datetime import datetime
from pyworkflow import SCIPION_JSON_TEMPLATES, Config
from pyworkflow.utils import greenStr
[docs]class Template:
def __init__(self, pluginName, tempPath):
self.pluginName = pluginName
# Tidy up templates names: removing .json.template and .json (when passed as parameter)
self.templateName = os.path.basename(tempPath).replace(SCIPION_JSON_TEMPLATES, "").replace(".json", "")
self.templatePath = os.path.abspath(tempPath)
self.description, self.content = self._parseTemplate()
self.params = None
self.projectName = None
def __str__(self):
return self.templatePath
[docs] def getObjId(self):
return self.pluginName + '-' + self.templateName
[docs] def genProjectName(self):
self.projectName = self.getObjId() + '-' + datetime.now().strftime("%y%m%d-%H%M%S")
[docs] def replaceEnvVariables(self):
self.content = (self.content % os.environ).split('~')
def _parseTemplate(self):
with open(self.templatePath, 'r') as myFile:
allContents = myFile.read().splitlines()
description, index = Template.getDescription(allContents)
if not description:
description = 'Not provided'
content = ''.join(allContents[index:])
return description, content
[docs] @staticmethod
def getDescription(strList):
# Example of json.template file with description:
# -----------------------------------------------
# Here goes the description
# Description
# Another description line...
# [
# {
# "object.className": "ProtImportMovies",
# "object.id": "2",...
# -----------------------------------------------
contents_start_1 = '['
contents_start_2 = '{'
description = []
counter = 0
nLines = len(strList)
while counter + 1 < nLines:
currentLine = strList[counter]
nextLine = strList[counter + 1]
if contents_start_1 not in currentLine:
description.append(currentLine)
else:
if contents_start_2 in nextLine:
break
else:
description.append(currentLine)
counter += 1
return ''.join(description), counter
[docs] def parseContent(self):
def paramStr2Param(fieldIndex, fieldString):
fieldLst = fieldString.split('|')
title = fieldLst[0]
defaultValue = fieldLst[1] if len(fieldLst) >= 2 else None
varType = fieldLst[2] if len(fieldLst) >= 3 else None
alias = fieldLst[3] if len(fieldLst) >= 4 else None
return TemplateParam(fieldIndex, title, defaultValue, varType, alias)
# Fill each field in the template in order to prevent spreading in the form
self.params = collections.OrderedDict()
for index in range(1, len(self.content), 2):
param = paramStr2Param(index, self.content[index])
self.params[param.getTitle()] = param
[docs] def createTemplateFile(self):
# Where to write the json file.
(fileHandle, path) = tempfile.mkstemp()
self._replaceFields()
finalJson = "".join(self.content)
os.write(fileHandle, finalJson.encode())
os.close(fileHandle)
print("New workflow saved at " + path)
return path
def _replaceFields(self):
for field in self.params.values():
self.content[field.getIndex()] = field.getValue()
[docs] def getParams(self):
return self.params
[docs] def setParamValue(self, alias, newValue):
paramsSetted = 0
for field in self.params.values():
if field.getAlias() == alias:
oldValue = field.getValue()
field.setValue(newValue)
if field.validate():
paramsSetted += 1
print(greenStr("%s set to %s") %
(field.getTitle(), str(newValue)))
else:
field.setValue(oldValue)
raise Exception("%s is not compatible with %s(%s) parameter." % (newValue, field.getTitle(), alias))
if not paramsSetted:
raise Exception("Alias %s not recognized." % alias)
return paramsSetted
[docs]class TemplateParam(object):
def __init__(self, index, title, value=None, varType=None, alias=None):
self._index = index
self._title = title
self._value = value
self._type = varType
self._alias = alias
[docs] def getTitle(self):
return self._title
[docs] def getIndex(self):
return self._index
[docs] def getType(self):
return self._type
[docs] def getValue(self):
return self._value
[docs] def setValue(self, value):
self._value = value
[docs] def getAlias(self):
return self._alias
[docs] def validate(self):
return Validations.check(self._value, self._type)
[docs]class Validations:
""" FIELDS VALIDATION """
""" FIELDS TYPES"""
FIELD_TYPE_STR = "0"
FIELD_TYPE_BOOLEAN = "1"
FIELD_TYPE_PATH = "2"
FIELD_TYPE_INTEGER = "3"
FIELD_TYPE_DECIMAL = "4"
[docs] @classmethod
def check(cls, value, fieldType):
if fieldType == cls.FIELD_TYPE_BOOLEAN:
return cls.validBoolean(value)
elif fieldType == cls.FIELD_TYPE_DECIMAL:
return cls.validDecimal(value)
elif fieldType == cls.FIELD_TYPE_INTEGER:
return cls.validInteger(value)
elif fieldType == cls.FIELD_TYPE_PATH:
return cls.validPath(value)
elif fieldType == cls.FIELD_TYPE_STR:
return cls.validString(value)
else:
print("Type %s for %s not recognized. Review the template."
% (type, value))
return
[docs] @staticmethod
def validString(value):
if value is None:
return "String does not accept None/empty values."
[docs] @staticmethod
def validInteger(value):
if not value.isdigit():
return "Value does not seem to be an integer number."
[docs] @staticmethod
def validPath(value):
if not os.path.exists(value):
return "Path does not exists."
[docs] @staticmethod
def validDecimal(value):
try:
float(value)
return None
except Exception as e:
return "Value can't be converted to a float (%s)" % str(e)
[docs] @staticmethod
def validBoolean(value):
validValues = ["true", "1", "false", "0"]
valueL = value.lower()
if valueL not in validValues:
return "Only valid values for a boolean type are: %s" % validValues
[docs]class TemplateList:
def __init__(self, templates=None):
self.templates = templates if templates else []
[docs] def addTemplate(self, t):
self.templates.append(t)
[docs] def genFromStrList(self, templateList):
for t in templateList:
parsedPath = t.split(os.path.sep)
pluginName = parsedPath[parsedPath.index('templates') - 1]
self.addTemplate(Template(pluginName, t))
[docs] def sortListByPluginName(self):
# Create a identifier with both plugin and template names to sort by both
self.templates = sorted(self.templates, key=lambda template: '.' + template.getObjId()
if template.getObjId().startswith('local') else template.getObjId())
return self
[docs] def addScipionTemplates(self, tempId=None):
# Check if there is any .json.template in the template folder
# get the template folder (we only want it to be included once)
templateFolder = Config.getExternalJsonTemplates()
for templateName in glob.glob1(templateFolder,
"*" + SCIPION_JSON_TEMPLATES):
t = Template("local", os.path.join(templateFolder, templateName))
if tempId is not None:
if t.getObjId() == tempId:
self.addTemplate(t)
break
else:
self.addTemplate(t)
[docs] def addPluginTemplates(self, tempId=None):
"""
Get the templates provided by all plugins.
:return: a list of templates
"""
# Check if other plugins have json.templates
domain = Config.getDomain()
# Check if there is any .json.template in the template folder
# get the template folder (we only want it to be included once)
for pluginName, pluginModule in domain.getPlugins().items():
tempListPlugin = pluginModule._pluginInstance.getTemplates()
for t in tempListPlugin:
if tempId is not None:
if t.getObjId() == tempId:
self.addTemplate(t)
break
else:
self.addTemplate(t)