# **************************************************************************
# *
# * Authors: Yaiza Rancel (cyrancel@cnb.csic.es) [1]
# * Pablo Conesa (pconesa@cnb.csic.es) [1]
# * J.M. De la Rosa Trevin (delarosatrevin@scilifelab.se) [2]
# *
# * [1] Unidad de Bioinformatica of Centro Nacional de Biotecnologia , CSIC
# * [2] SciLifeLab, Stockholm University
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 3 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307 USA
# *
# * All comments concerning this program package may be sent to the
# * e-mail address 'scipion@cnb.csic.es'
# *
# **************************************************************************
import logging
import sys
from pyworkflow import Variable, VariablesRegistry, VarTypes
from .protocol import Protocol
from .viewer import Viewer
from .wizard import Wizard
logger = logging.getLogger(__name__)
import glob
import os
import importlib
import inspect
import traceback
import types
from abc import ABCMeta, abstractmethod
import pyworkflow as pw
import pyworkflow.utils as pwutils
import pyworkflow.object as pwobj
from pyworkflow.template import LocalTemplate
from pyworkflow.utils import sortListByList
try:
import importlib_metadata
except ModuleNotFoundError:
raise ModuleNotFoundError('You are missing importlib-metadata package. '
'Please run: scipion3 pip install importlib-metadata')
from .constants import *
class Domain:
    """ Class to represent the application domain.

    It will allow to specify new objects, protocols, viewers and wizards
    through the registration of new plugins.

    All state is kept at class level and every public entry point is a
    classmethod/staticmethod, so the class itself is used as the registry.
    """

    # The following classes should be defined in subclasses of Domain
    _name = __name__
    _protocolClass = Protocol
    _objectClass = pwobj.Object
    _viewerClass = Viewer
    _wizardClass = Wizard
    _baseClasses = {}  # Update this with the base classes of the Domain

    # Dictionaries to store different type of objects
    _plugins = {}
    _protocols = {}
    _objects = {}
    _viewers = {}
    _wizards = {}

    @classmethod
    def registerPlugin(cls, name):
        """ Register a new plugin. This function should only be called when
        creating a class with __metaclass__=PluginMeta that will trigger this.

        The module ``name`` is imported, its ``Plugin`` class instantiated,
        its variables defined and its optional ``bibtex`` submodule loaded.
        Any failure is logged and swallowed so one broken plugin does not
        prevent the rest from loading.

        :param name: importable module name of the plugin.
        """
        try:
            m = importlib.import_module(name)

            # Define variables
            m._pluginInstance = m.Plugin()
            # This needs an explanation. _defineVariables() are classmethods,
            # therefore we need a class variable and thus .name can't be used.
            # Ideally, since Plugin class is instantiated we could transform
            # _defineVariables methods into instance methods and only then we
            # could use .name.
            Plugin._tmpName = name
            m._pluginInstance.name = name
            m._pluginInstance._defineVariables()
            m.Domain = cls  # Register the domain class for this module

            # TODO avoid loading bibtex here and make a lazy load like the rest.
            # Load bibtex
            m._bibtex = {}
            bib = cls.__getSubmodule(name, 'bibtex')
            if bib is not None:
                if hasattr(bib, "_bibtex"):
                    logger.info("WARNING FOR DEVELOPERS: %s/%s._bibtex unnecessarily declared. Just the doc string is enough." % (name, "bibtex"))
                else:
                    try:
                        # Parsing is deferred until the dict is first used
                        m._bibtex = pwutils.LazyDict(lambda: pwutils.parseBibTex(bib.__doc__))
                    except Exception:
                        pass

            cls._plugins[name] = m  # Register the name to as a plugin

        # Catch any import exception, warn about it but continue.
        except ModuleNotFoundError as e:
            if e.name == name:
                # This is probably due to a priority package like pwchem not being installed
                pass
            else:
                logger.warning("Plugin '%s' has import errors: %s. Maybe a missing dependency?. "
                               "Is it devel mode and need to be reinstalled?. Ignoring it and continuing." % (name, str(e)))
        except Exception as e:
            # The message used to be built and then discarded; emit it.
            logger.warning(pwutils.yellow("WARNING!!: Plugin containing module %s does not import properly. "
                                          "All its content will be missing in this execution." % name))
            logger.info("Please, contact developers at %s and send this ugly information below. They'll understand it!." % DOCSITEURLS.CONTACTUS)
            logger.info("Error message: %s"% str(e))
            logger.info(pwutils.yellow(traceback.format_exc()))

    @classmethod
    def getPlugins(cls):
        """ Return existing plugins for this Domain.

        Plugins are discovered on the first call only; a copy of the
        registry is returned so callers cannot alter it.
        """
        loaded = getattr(cls, '_pluginsLoaded', False)
        if not loaded:  # Load plugin only once
            cls._discoverPlugins()
            cls._pluginsLoaded = True
        return dict(cls._plugins)

    @classmethod
    def _discoverPlugins(cls):
        """ Register every module published under the 'pyworkflow.plugin'
        entry point group, honouring the configured priority list. """
        # Get the list of plugins registered
        plugin_modules = importlib_metadata.entry_points(group='pyworkflow.plugin')
        # Sort the list taking into account the priority
        plugin_modules = sortListByList(plugin_modules.names, pw.Config.getPriorityPackageList())

        for module in plugin_modules:
            cls.registerPlugin(module)

    @classmethod
    def _discoverGUIPlugins(cls):
        """ Load every entry point in the 'pyworkflow.guiplugin' group,
        logging (not raising) the ones that fail. """
        for entry_point in importlib_metadata.entry_points(group='pyworkflow.guiplugin'):
            try:
                entry_point.load()
            except Exception as e:
                logger.warning("Can't import %s GUI plugin: %s" % (entry_point, e))

    @classmethod
    def getPluginModule(cls, name):
        """ Return the root of a plugin module initialized properly.

        :param name: plugin module name.
        :raises KeyError: if the plugin could not be registered.
        """
        cls.registerPlugin(name)
        return cls._plugins[name]

    @classmethod
    def getPlugin(cls, name):
        """ Deprecated alias of getPluginModule (logs a warning). """
        logger.warning("This method will return the plugin class in the future. Please use getPluginModule instead")
        return cls.getPluginModule(name)

    @classmethod
    def refreshPlugin(cls, name):
        """ Refresh a given plugin name.

        Reloads the plugin root module and every module reachable from it
        whose file lives under the plugin directory.

        :param name: plugin module name.
        """
        plugin = cls.getPluginModule(name)
        if plugin is None:
            return

        pluginFile = plugin.__file__
        pluginDir = os.path.dirname(pluginFile) + os.sep
        pending = {plugin}
        seenFiles = {pluginFile}

        def collectModules(module):
            """ Get all plugin modules recursively """
            for child in vars(module).values():
                if not isinstance(child, types.ModuleType):
                    continue
                childFile = getattr(child, "__file__", None)
                if childFile is None or not childFile.startswith(pluginDir):
                    continue
                if childFile not in seenFiles:
                    pending.add(child)
                    seenFiles.add(childFile)
                    collectModules(child)

        collectModules(plugin)

        # Reload all plugin modules. A module may fail until the ones it
        # depends on have been reloaded, so after each success we start
        # again over the remaining ones.
        while pending:
            reloaded = None
            lastError = None
            for module in pending:
                try:
                    importlib.reload(module)
                except Exception as ex:
                    lastError = ex
                    continue
                reloaded = module
                break

            if reloaded is None:
                # No remaining module can be reloaded: stop instead of
                # spinning forever on the same failures.
                logger.warning("Could not reload %d module(s) of plugin %s: %s. Last error: %s"
                               % (len(pending), name,
                                  ", ".join(sorted(mod.__name__ for mod in pending)),
                                  lastError))
                break
            pending.discard(reloaded)

    @classmethod
    def __getSubclasses(cls, submoduleName, BaseClass,
                        updateBaseClasses=False, setPackage=False):
        """ Load all detected subclasses of a given BaseClass.

        Params:
            submoduleName: name of the plugin submodule to scan; it is also
                the suffix of the class dict used as cache ('_' + name).
            BaseClass: only subclasses of this class are collected.
            updateBaseClasses: if True, it will try to load classes from the
                Domain submodule that was not imported as globals()
            setPackage: if True, _plugin and _package are set on each class.

        Returns the (cached) dict of class name -> class.
        """
        subclasses = getattr(cls, '_%s' % submoduleName)

        if not subclasses:  # Only discover subclasses once
            if updateBaseClasses:
                sub = cls.__getSubmodule(cls.getName(), submoduleName)
                if sub is not None:
                    for name in cls.getModuleClasses(sub):
                        attr = getattr(sub, name)
                        if inspect.isclass(attr) and issubclass(attr, BaseClass):
                            cls._baseClasses[name] = attr

            for pluginName, module in cls.getPlugins().items():
                sub = cls.__getSubmodule(pluginName, submoduleName)
                if sub is None:
                    continue
                for name in cls.getModuleClasses(sub):
                    attr = getattr(sub, name)
                    if not (inspect.isclass(attr) and issubclass(attr, BaseClass)):
                        continue
                    # Check if the class already exists (to prevent
                    # naming collisions)
                    if name in subclasses:
                        # Get already added class plugin. _package is only
                        # set when setPackage is True, so fall back to the
                        # module where the class was declared.
                        existing = subclasses[name]
                        package = getattr(existing, '_package', None)
                        pluginCollision = getattr(package, '__name__',
                                                  existing.__module__)
                        logger.info("ERROR: Name collision (%s) detected "
                                    "while discovering %s.%s.\n"
                                    " It conflicts with %s" %
                                    (name, pluginName, submoduleName,
                                     pluginCollision))
                    else:
                        # Set this special property used by Scipion
                        # Protocols need the package to be set
                        if setPackage:
                            attr._plugin = getattr(module, "_pluginInstance", None)
                            attr._package = module
                        subclasses[name] = attr

            subclasses.update(
                pwutils.getSubclasses(BaseClass, cls._baseClasses))

        return subclasses

    @classmethod
    def getModuleClasses(cls, module):
        """ Yield the names of the classes declared in the given module.

        Classes merely imported into the module are skipped by comparing
        the class' ``__module__`` with the module name.
        """
        # Dir was used before but dir returns all imported elements
        # included those imported to be BaseClasses.
        # return dir(module)

        # Get the module name
        moduleName = module.__name__

        # Get any module class
        for name, declaredClass in inspect.getmembers(module, inspect.isclass):
            if moduleName in declaredClass.__module__:
                yield name

    @classmethod
    def getProtocols(cls):
        """ Return all Protocol subclasses from all plugins for this domain."""
        return cls.__getSubclasses('protocols', cls._protocolClass, setPackage=True)

    @classmethod
    def getObjects(cls):
        """ Return all EMObject subclasses from all plugins for this domain."""
        return cls.__getSubclasses('objects', cls._objectClass)

    @classmethod
    def viewersLoaded(cls):
        """ Returns true if viewers have been already discovered"""
        return len(cls._viewers) != 0

    @classmethod
    def getViewers(cls):
        """ Return all Viewer subclasses from all plugins for this domain."""
        return cls.__getSubclasses('viewers', cls._viewerClass,
                                   updateBaseClasses=True, setPackage=True)

    @classmethod
    def getWizards(cls):
        """ Return all Wizard subclasses from all plugins for this domain."""
        return cls.__getSubclasses('wizards', cls._wizardClass)

    @classmethod
    def getMapperDict(cls):
        """ Return a dictionary that can be used with subclasses of Mapper
        to store/retrieve objects (including protocols) defined in this
        Domain.

        The dictionary is built once per Domain class and then reused.
        """
        # A plain attribute name is used on purpose: a double underscore
        # name is mangled on assignment but not inside a getattr() string,
        # which made the previous cache lookup always miss.
        mapperDict = cls.__dict__.get('_mapperDictCache')

        if mapperDict is None:
            mapperDict = dict(pwobj.OBJECTS_DICT)
            mapperDict.update(cls.getObjects())
            mapperDict.update(cls.getProtocols())
            cls._mapperDictCache = mapperDict

        return mapperDict

    @classmethod
    def getName(cls):
        """ Return the name of this Domain. """
        return cls._name

    @staticmethod
    def importFromPlugin(module, objects=None, errorMsg='', doRaise=False):
        """
        This method try to import either a list of objects from the
        module/plugin or the whole module/plugin and returns what is
        imported if not fails.
        When the import fails (due to the plugin or the object is not found),
        it prints a common message + optional errorMsg;
        or it raise an error with the same message, if doRaise is True.

        :param module: Module name to import
        :param objects: Optional, string with objects to return present in module
        :param errorMsg: Optional, extra error message to append to the main message.
        :param doRaise: If True it will raise an exception instead of tolerating the import error
        :return: the module, the object, or a tuple of objects. Items that
                 failed to import are None when doRaise is False.

        Usages::

             # Import the whole plugin 'plugin1' as 'plug1'
             plug1 = importFromPlugin('plugin1')

             # Import a plugin's module
             pl1Cons = importFromPlugin('plug1.constants')

             # Import a single class from a plugin's module
             p1prot1 = importFromPlugin('plug1.protocols', 'prot1')

             # Import some classes from a plugin's module,
             #   the returned tuple has the same length of the second argument
             pt1, pt2, ... = importFromPlugin('plugin1.protocols',
                                              ['pt1', 'pt2', ...])
        """
        def _tryImportFromPlugin(submodule=None):
            try:
                if submodule is None:  # Import only the module
                    output = importlib.import_module(module)
                else:  # Import the class of that module
                    output = getattr(importlib.import_module(module), submodule)
                return output
            except Exception as e:
                plugName = module.split('.')[0]  # The Main module is the plugin
                errMsg = (str(e) if errorMsg == ''
                          else "%s. %s" % (str(e), errorMsg))
                Domain.__pluginNotFound(plugName, errMsg, doRaise)

        if objects is None or isinstance(objects, str):
            output = _tryImportFromPlugin(objects)
        else:
            output = tuple(_tryImportFromPlugin(obj) for obj in objects)

        return output

    @classmethod
    def findClass(cls, className):
        """ Find a class object given its name.
        The search will start with protocols and then with objects.

        :raises Exception: if no class with that name is found.
        """
        # FIXME: Why not also Viewers, Wizards?
        c = cls.getProtocols().get(className,
                                   cls.getObjects().get(className, None))
        if c is None:
            raise Exception("findClass: class '%s' not found." % className)
        return c

    @classmethod
    def findSubClasses(cls, classDict, className):
        """ Find all subclasses of a give className.

        :param classDict: dict of class name -> class to search in.
        :param className: key in classDict of the base class.
        :return: dict with the entries of classDict that subclass it
                 (the class itself included).
        """
        clsObj = classDict[className]
        return {k: v for k, v in classDict.items() if issubclass(v, clsObj)}

    @classmethod
    def getPreferredViewers(cls, className):
        """ Find and import the preferred viewers for this class.

        Names come from pw.Config.VIEWERS; those that cannot be imported
        are logged and skipped.
        """
        viewerNames = pw.Config.VIEWERS.get(className, [])
        if not isinstance(viewerNames, list):
            viewerNames = [viewerNames]
        viewers = []  # we will try to import them and store here
        for prefViewerStr in viewerNames:
            try:
                viewerModule, viewerClassName = prefViewerStr.rsplit('.', 1)
                prefViewer = cls.importFromPlugin(viewerModule,
                                                  viewerClassName,
                                                  doRaise=True)
                viewers.append(prefViewer)
            except Exception as e:
                logger.error("Couldn't load \"%s\" as preferred viewer for %s.\n"
                             "There might be a typo in your VIEWERS variable "
                             "or an error in the viewer's plugin installation"
                             % (prefViewerStr, className), exc_info=e)
        return viewers

    @classmethod
    def findViewers(cls, className, environment):
        """ Find the available viewers in this Domain for this class.

        Preferred viewers go first, then viewers targeting exactly the
        class, then viewers targeting one of its base classes. Any error
        (e.g. class not found) yields the viewers collected so far.
        """
        viewers = []
        try:
            clazz = cls.findClass(className)
            baseClasses = clazz.mro()
            preferredViewers = cls.getPreferredViewers(className)
            preferedFlag = 0  # becomes 1 once a preferred viewer is at index 0

            for viewer in cls.getViewers().values():
                if environment not in viewer._environments:
                    continue
                for t in viewer._targets:
                    if t not in baseClasses:
                        continue
                    if any(viewer is prefViewer for prefViewer in preferredViewers):
                        viewers.insert(0, viewer)
                        preferedFlag = 1
                    elif t == clazz:
                        viewers.insert(preferedFlag, viewer)
                    else:
                        viewers.append(viewer)
                    # Each viewer is added at most once
                    break
        except Exception as e:
            # Catch if there is a missing plugin, we will get Legacy which triggers and Exception
            logger.debug("findViewers: no viewers resolved for %s: %s", className, e)

        return viewers

    @classmethod
    def findWizards(cls, protocol, environment):
        """
        Find available wizards for this class, in this Domain.

        :param protocol: Protocol instance for which wizards will be search.
        :param environment: The environment name for wizards (e.g TKINTER)
        :return A dict with the paramName and wizards for the protocol passed."""
        return cls.__findWizardsFromDict(protocol, environment,
                                         cls.getWizards())

    @classmethod
    def printInfo(cls):
        """ Simple function (mainly for debugging) that prints basic
        information about this Domain. """
        logger.info("Domain: %s" % cls._name)
        logger.info(" objects: %s" % len(cls._objects))
        logger.info(" protocols: %s" % len(cls._protocols))
        logger.info(" viewers: %s" % len(cls._viewers))
        logger.info(" wizards: %s" % len(cls._wizards))

    # ---------- Private methods of Domain class ------------------------------
    @staticmethod
    def __pluginNotFound(plugName, errorMsg='', doRaise=False):
        """ Prints or raise (depending on the doRaise) telling why it is failing

        :param plugName: name of the plugin/module that failed to import.
        :param errorMsg: error text (or exception) describing the failure.
        :param doRaise: raise an Exception instead of logging the message.
        """
        hint = (" Check the plugin manager (Configuration->Plugins in "
                "Scipion manager window) \n")
        # the str casting is to work with Exceptions as errorMsg
        errorText = str(errorMsg)
        # Python 3 quotes the module name ("No module named 'x'"); the
        # unquoted form is kept for backward compatibility.
        notFound = ("No module named '%s'" % plugName in errorText
                    or 'No module named %s' % plugName in errorText)
        if notFound:
            msgStr = " > %s plugin not found. %s" % (plugName, errorMsg)
            hint += " or use 'scipion installp --help' in the command line "
            hint += "to install it."
        else:
            msgStr = " > error when importing from %s: %s" % (plugName, errorMsg)
            if errorMsg != '':  # if empty we know nothing...
                hint += (" or use 'scipion installp --help --checkUpdates' "
                         "in the command line to check for upgrades,\n "
                         "it could be a versions compatibility issue.")

        stackList = traceback.extract_stack()
        if len(stackList) > 3:
            callIdx = -3  # We use the most probable index as default
            for idx, stackLine in enumerate(stackList):
                if stackLine[0].endswith('/unittest/loader.py'):
                    # Clamped so it can never point past the last frame
                    callIdx = min(idx + 1, len(stackList) - 1)
        else:
            callIdx = 0

        callBy = stackList[callIdx][0]
        if callBy.endswith('pyworkflow/plugin.py'):
            # This special case is to know why is failing and not where is called
            # because we know that we call all plugins.protocols at the beginning
            excLines = traceback.format_exc().split('\n')
            calling = excLines[-4] if len(excLines) >= 4 else excLines[0]
        else:
            line = stackList[callIdx][1]
            calling = " Called by %s, line %s" % (callBy, line)

        raiseMsg = "%s\n %s\n%s\n" % (msgStr, calling, hint)
        if doRaise:
            raise Exception("\n\n" + raiseMsg)
        else:
            logger.info(raiseMsg)

    @staticmethod
    def __getSubmodule(name, subname):
        """ Import and return the submodule name.subname, or None if the
        plugin is not a folder or the submodule cannot be imported.
        Failures other than "submodule not present" are reported. """
        try:
            completeModuleText = '%s.%s' % (name, subname)
            if pwutils.isModuleAFolder(name):
                return importlib.import_module(completeModuleText)
        except Exception as e:
            msg = str(e)
            # FIXME: The following is a quick and dirty way to filter
            # when the submodule is not present
            if msg != 'No module named \'%s\'' % completeModuleText:
                Domain.__pluginNotFound(completeModuleText, msg)
        return None

    @classmethod
    def __isPlugin(cls, m):
        """ Return True if the module is a Scipion plugin. """
        return m.__name__ in cls._plugins

    @staticmethod
    def __findWizardsFromDict(protocol, environment, wizDict):
        """ Return a dict paramName -> wizard class with the wizards of
        wizDict available in the environment whose targets match (by class
        name) the protocol class or any of its base classes. """
        wizards = {}
        baseClasses = [c.__name__ for c in protocol.getClass().mro()]

        for wiz in wizDict.values():
            if environment in wiz._environments:
                for c, params in wiz._targets:
                    if c.__name__ in baseClasses:
                        for p in params:
                            wizards[p] = wiz
        return wizards
class Plugin:
    """ Base class for plugins: holds the plugin variables and gives access
    to its location, templates and binaries information. """
    # NOTE: Python 3 ignores the __metaclass__ attribute, so the
    # @abstractmethod markers below are not enforced and the class can be
    # instantiated. Kept as is because Domain.registerPlugin instantiates
    # the Plugin class of every plugin.
    __metaclass__ = ABCMeta

    _vars = {}  # Dictionary shared by all the plugins: varName -> value
    _homeVar = ''  # Change in subclasses to define the "home" variable
    _pathVars = []
    _supportedVersions = []
    _url = ""  # For the plugin
    _condaActivationCmd = None
    _tmpName = None  # This would be temporary. It will hold the plugin name during the call to defineVariables

    def __init__(self):
        # Values computed on demand by getPath, inDevelMode and getName
        self._path = None
        self._inDevelMode = None
        self._name = None

    @classmethod
    def _defineVar(cls, varName, defaultValue, description="Missing", var_type:VarTypes=None):
        """ Internal method to define variables trying to get it from the environment first. """
        cls._addVar(varName, os.environ.get(varName, defaultValue), default=defaultValue, description=description, var_type=var_type)

    @classmethod
    def _addVar(cls, varName, value, default=None, description="Missing", var_type:VarTypes=None):
        """ Adds a variable to the local variable dictionary directly. Avoiding the environment"""
        # _tmpName holds the name of the plugin being registered
        var = Variable(varName, description, cls._tmpName, value, default, var_type=var_type)
        VariablesRegistry.register(var)
        cls._vars[varName] = value

    @classmethod
    @abstractmethod
    def getEnviron(cls):
        """ Set up the environment variables needed to launch programs. """
        pass

    @classmethod
    def getCondaActivationCmd(cls):
        """ Returns the conda activation command with && at the end if defined otherwise empty"""
        if cls._condaActivationCmd is None:
            condaActivationCmd = pw.Config.CONDA_ACTIVATION_CMD
            if not condaActivationCmd:
                logger.info("WARNING!!_condaActivationCmd: %s variable not defined. "
                            "Relying on conda being in the PATH" % CONDA_ACTIVATION_CMD_VAR)
            elif condaActivationCmd[-1] not in [";", "&"]:
                # Make the command chainable with whatever follows
                condaActivationCmd += "&&"

            cls._condaActivationCmd = condaActivationCmd

        return cls._condaActivationCmd

    @abstractmethod
    def _defineVariables(cls):
        """ Method to define variables and their default values.
        It will use the method _defineVar that will take a variable value
        from the environment or from an optional default value.

        This method is not supposed to be called from client code,
        except from the Domain class when registering a Plugin.
        """
        pass

    @classmethod
    @abstractmethod
    def defineBinaries(cls, env):
        """ Define required binaries in the given Environment. """
        pass

    @classmethod
    def getVar(cls, varName, defaultValue=None):
        """ Return the value of a given variable. """
        return cls._vars.get(varName, defaultValue)

    @classmethod
    def getVars(cls):
        """ Return the dictionary (name -> value) with all the variables. """
        return cls._vars

    @classmethod
    def getHome(cls, *paths):
        """ Return a path from the "home" of the package
        if the _homeVar is defined in the plugin, otherwise ''. """
        home = cls.getVar(cls._homeVar)
        return os.path.join(home, *paths) if home else ''

    @classmethod
    def getSupportedVersions(cls):
        """ Return the list of supported binary versions. """
        return cls._supportedVersions

    @classmethod
    def getActiveVersion(cls, home=None, versions=None):
        """
        Returns the version of the binaries that are currently active.
        In the current implementation it will be inferred from the *_HOME
        variable, so it should contain the version number in it.

        :param home: path to use instead of getHome()
        :param versions: versions to look for instead of the supported ones
        :return: first version contained in the home basename, or ''
        """
        # FIXME: (JMRT) Use the basename might alleviate the issue with matching
        # the binaries version, but we might consider to find a better solution
        home = os.path.basename(home or cls.getHome())
        versions = versions or cls.getSupportedVersions()

        for v in versions:
            if v in home:
                return v

        return ''

    @classmethod
    def validateInstallation(cls):
        """
        Check if the binaries are properly installed and if not, return
        a list with the error messages.

        The default implementation will check if the _pathVars exists.
        """
        try:
            missing = ["%s: %s" % (var, cls.getVar(var))
                       for var in cls._pathVars if not os.path.exists(cls.getVar(var))]

            return (["Missing paths: the variables below point to non existing paths."]
                    + missing + [
                        "Either install the software ( %s )" % DOCSITEURLS.PLUGIN_MANAGER,
                        "or edit the config file ( %s )" % DOCSITEURLS.CONFIG]) if missing else []
        except Exception as e:
            # e.g. a variable of _pathVars that has not been defined
            return ["validateInstallation fails: %s" % e]

    @classmethod
    def getUrl(cls, protClass=None):
        """ Url for the plugin to point users to it"""
        return cls._url

    # -------------- Instance methods ----------------
    def getName(self):
        """ Return the plugin name: the module where its class is declared. """
        if self._name is None:
            self._name = self.__class__.__module__
        return self._name

    def getPluginDir(self):
        """ Return the plugin directory (same as getPath). """
        return self.getPath()

    def getPluginTemplateDir(self):
        """ Return the 'templates' folder inside the plugin directory. """
        return os.path.join(self.getPath(), 'templates')

    def getTemplates(self):
        """ Get the plugin templates from the templates directory.

        :return: list with a LocalTemplate for each file of the templates
                 directory ending in SCIPION_JSON_TEMPLATES; empty list if
                 the directory does not exist.
        """
        tempList = []
        pluginName = self.getName()
        tDir = self.getPluginTemplateDir()
        if os.path.exists(tDir):
            # glob.glob1 is undocumented and deprecated since Python 3.13.
            # The directory is escaped so only the file name is a pattern.
            pattern = os.path.join(glob.escape(tDir),
                                   "*" + SCIPION_JSON_TEMPLATES)
            for templatePath in glob.glob(pattern):
                tempList.append(LocalTemplate(pluginName, templatePath))
        return tempList

    def getPath(self):
        """ Return the first entry of the __path__ of the module where the
        plugin class is declared. """
        if self._path is None:
            self._path = sys.modules[self.__class__.__module__].__path__[0]
        return self._path

    def inDevelMode(self) -> bool:
        """ Returns true if code is not in python's site-packages folder"""
        if self._inDevelMode is None:
            self._inDevelMode = pwutils.getPythonPackagesFolder() not in self.getPath()
        return self._inDevelMode