Source code for pyworkflow.plugin

# **************************************************************************
# *
# * Authors:     Yaiza Rancel ( [1]
# *              Pablo Conesa ( [1]
# *              J.M. De la Rosa Trevin ( [2]
# *
# * [1] Unidad de  Bioinformatica of Centro Nacional de Biotecnologia , CSIC
# * [2] SciLifeLab, Stockholm University
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 3 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307  USA
# *
# *  All comments concerning this program package may be sent to the
# *  e-mail address ''
# *
# **************************************************************************
import glob
import os
import importlib
import inspect
import traceback
import types
import pkg_resources
from email import message_from_string
from collections import OrderedDict
from abc import ABCMeta, abstractmethod

import pyworkflow as pw
import pyworkflow.utils as pwutils
import pyworkflow.object as pwobj
from pyworkflow.template import Template

from .constants import *

[docs]class Domain: """ Class to represent the application domain. It will allow to specify new objects, protocols, viewers and wizards through the registration of new plugins. """ # The following classes should be defined in subclasses of Domain _name = None _protocolClass = None _objectClass = pwobj.Object _viewerClass = None _wizardClass = None _baseClasses = {} # Update this with the base classes of the Domain # Dictionaries to store different type of objects _plugins = {} _protocols = {} _objects = {} _viewers = {} _wizards = {}
[docs] @classmethod def registerPlugin(cls, name): """ Register a new plugin. This function should only be called when creating a class with __metaclass__=PluginMeta that will trigger this. """ try: m = importlib.import_module(name) # Define variables m.Plugin._defineVariables() m.Domain = cls # Register the domain class for this module # TODO avoid loading bibtex here and make a lazy load like the rest. # Load bibtex m._bibtex = {} bib = cls.__getSubmodule(name, 'bibtex') if bib is not None: if hasattr(bib, "_bibtex"): print("WARNING FOR DEVELOPERS: %s/%s._bibtex unnecessarily declared. Just the doc string is enough." % (name, "bibtex")) else: try: m._bibtex = pwutils.LazyDict(lambda: pwutils.parseBibTex(bib.__doc__)) except Exception: pass cls._plugins[name] = m # Register the name to as a plugin # Catch any import exception, warn about it but continue. except Exception as e: print(pwutils.yellow("WARNING!!: Plugin containing module %s does not import properly. " "All its content will be missing in this execution." % name)) print("Please, contact developers at %s and send this ugly information bellow. They'll understand it!." % DOCSITEURLS.CONTACTUS) print(pwutils.yellow(traceback.format_exc()))
[docs] @classmethod def getPlugins(cls): """ Return existing plugins for this Domain. """ loaded = getattr(cls, '_pluginsLoaded', False) if not loaded: # Load plugin only once cls._discoverPlugins() cls._pluginsLoaded = True return dict(cls._plugins)
@classmethod def _discoverPlugins(cls): for entry_point in pkg_resources.iter_entry_points('pyworkflow.plugin'): cls.registerPlugin( @classmethod def _discoverGUIPlugins(cls): for entry_point in pkg_resources.iter_entry_points('pyworkflow.guiplugin'): entry_point.load()
[docs] @classmethod def getPlugin(cls, name): """ Load a given plugin name. """ m = importlib.import_module(name) # if not cls.__isPlugin(m): # raise Exception("Invalid plugin '%s'. " # "Class Plugin with __metaclass__=PluginMeta " # "not found" % name) return m
[docs] @classmethod def refreshPlugin(cls, name): """ Refresh a given plugin name. """ plugin = cls.getPlugin(name) if plugin is not None: fn = plugin.__file__ fn_dir = os.path.dirname(fn) + os.sep module_visit = {plugin} module_visit_path = {fn} del fn def get_recursive_modules(module): """ Get all plugin modules recursively """ for module_child in vars(module).values(): if isinstance(module_child, types.ModuleType): fn_child = getattr(module_child, "__file__", None) if (fn_child is not None) and fn_child.startswith( fn_dir): if fn_child not in module_visit_path: module_visit.add(module_child) module_visit_path.add(fn_child) get_recursive_modules(module_child) get_recursive_modules(plugin) # reload all plugin modules while module_visit: for module in module_visit: try: importlib.reload(module) module_visit.remove(module) break except Exception as ex: pass
@classmethod def __getSubclasses(cls, submoduleName, BaseClass, updateBaseClasses=False): """ Load all detected subclasses of a given BaseClass. Params: updateBaseClasses: if True, it will try to load classes from the Domain submodule that was not imported as globals() """ subclasses = getattr(cls, '_%s' % submoduleName) if not subclasses: # Only discover subclasses once if updateBaseClasses: sub = cls.__getSubmodule(cls.getName(), submoduleName) if sub is not None: for name in cls.getModuleClasses(sub): attr = getattr(sub, name) if inspect.isclass(attr) and issubclass(attr, BaseClass): cls._baseClasses[name] = attr for pluginName, plugin in cls.getPlugins().items(): sub = cls.__getSubmodule(pluginName, submoduleName) if sub is not None: for name in cls.getModuleClasses(sub): attr = getattr(sub, name) if inspect.isclass(attr) and issubclass(attr, BaseClass): # Check if the class already exists (to prevent # naming collisions) if name in subclasses: # Get already added class plugin pluginCollision = subclasses[name]._package.__name__ print("ERROR: Name collision (%s) detected " "while discovering %s.%s.\n" " It conflicts with %s" % (name, pluginName, submoduleName, pluginCollision)) else: # Set this special property used by Scipion attr._package = plugin subclasses[name] = attr subclasses.update( pwutils.getSubclasses(BaseClass, cls._baseClasses)) return subclasses
[docs] @classmethod def getModuleClasses(cls, module): # Dir was used before but dir returns all imported elements # included those imported to be BaseClasses. # return dir(module) # Get the module name moduleName = module.__name__ # Get any module class for name, declaredClass in inspect.getmembers(module, inspect.isclass): if moduleName in declaredClass.__module__: yield name
[docs] @classmethod def getProtocols(cls): """ Return all Protocol subclasses from all plugins for this domain.""" return cls.__getSubclasses('protocols', cls._protocolClass)
[docs] @classmethod def getObjects(cls): """ Return all EMObject subclasses from all plugins for this domain.""" return cls.__getSubclasses('objects', cls._objectClass)
[docs] @classmethod def getViewers(cls): """ Return all Viewer subclasses from all plugins for this domain.""" return cls.__getSubclasses('viewers', cls._viewerClass, updateBaseClasses=True)
[docs] @classmethod def getWizards(cls): """ Return all Wizard subclasses from all plugins for this domain.""" return cls.__getSubclasses('wizards', cls._wizardClass)
[docs] @classmethod def getMapperDict(cls): """ Return a dictionary that can be used with subclasses of Mapper to store/retrieve objects (including protocols) defined in this Domain. """ mapperDict = getattr(cls, '__mapperDict', None) if mapperDict is None: mapperDict = dict(pwobj.OBJECTS_DICT) mapperDict.update(cls.getObjects()) mapperDict.update(cls.getProtocols()) cls.__mapperDict = mapperDict return mapperDict
[docs] @classmethod def getName(cls): """ Return the name of this Domain. """ return cls._name
[docs] @staticmethod def importFromPlugin(module, objects=None, errorMsg='', doRaise=False): """ This method try to import either a list of objects from the module/plugin or the whole module/plugin and returns what is imported if not fails. When the import fails (due to the plugin or the object is not found), it prints a common message + optional errorMsg; or it raise an error with the same message, if doRaise is True. -> Usages: # Import the whole plugin 'plugin1' as 'plug1' plug1 = importFromPlugin('plugin1') # Import a plugin's module pl1Cons = importFromPlugin('plug1.constants') # Import a single class from a plugin's module p1prot1 = importFromPlugin('plug1.protocols', 'prot1') # Import some classes from a plugin's module, # the returned tuple has the same length of the second argument pt1, pt2, ... = importFromPlugin('plugin1.protocols', ['pt1', 'pt2', ...]) """ def _tryImportFromPlugin(submodule=None): try: if submodule is None: # Import only the module output = importlib.import_module(module) else: # Import the class of that module output = getattr(importlib.import_module(module), submodule) return output except Exception as e: plugName = module.split('.')[0] # The Main module is the plugin errMsg = (str(e) if errorMsg == '' else "%s. %s" % (str(e), errorMsg)) Domain.__pluginNotFound(plugName, errMsg, doRaise) if objects is None or isinstance(objects, str): output = _tryImportFromPlugin(objects) else: output = tuple() for obj in objects: output += (_tryImportFromPlugin(obj), ) # append in tuple return output
[docs] @classmethod def findClass(cls, className): """ Find a class object given its name. The search will start with protocols and then with protocols. """ # FIXME: Why not also Viewers, Wizards? c = cls.getProtocols().get(className, cls.getObjects().get(className, None)) if c is None: raise Exception("findClass: class '%s' not found." % className) return c
[docs] @classmethod def findSubClasses(cls, classDict, className): """ Find all subclasses of a give className. """ clsObj = classDict[className] subclasses = {} for k, v in classDict.items(): if issubclass(v, clsObj): subclasses[k] = v return subclasses
[docs] @classmethod def getPreferredViewers(cls, className): """ Find and import the preferred viewers for this class. """ viewerNames = pw.Config.VIEWERS.get(className, []) if not isinstance(viewerNames, list): viewerNames = [viewerNames] viewers = [] # we will try to import them and store here for prefViewerStr in viewerNames: try: viewerModule, viewerClassName = prefViewerStr.rsplit('.', 1) prefViewer = cls.importFromPlugin(viewerModule, viewerClassName, doRaise=True) viewers.append(prefViewer) except Exception as e: print("Couldn't load \"%s\" as preferred viewer for %s.\n" "There might be a typo in your VIEWERS variable " "or an error in the viewer's plugin installation" % (prefViewerStr, className)) print(e) return viewers
[docs] @classmethod def findViewers(cls, className, environment): """ Find the available viewers in this Domain for this class. """ viewers = [] try: clazz = cls.findClass(className) baseClasses = clazz.mro() preferredViewers = cls.getPreferredViewers(className) preferedFlag = 0 for viewer in cls.getViewers().values(): if environment in viewer._environments: for t in viewer._targets: if t in baseClasses: for prefViewer in preferredViewers: if viewer is prefViewer: viewers.insert(0, viewer) preferedFlag = 1 break else: if t == clazz: viewers.insert(preferedFlag, viewer) else: viewers.append(viewer) break except Exception as e: # Catch if there is a missing plugin, we will get Legacy which triggers and Exception pass return viewers
[docs] @classmethod def findWizards(cls, protocol, environment): """ Find available wizards for this class, in this Domain. Params: protocols: Protocol instance for which wizards will be search. environment: The environment name for wizards (e.g TKINTER) Returns: a dict with the paramName and wizards for this class.""" return cls.__findWizardsFromDict(protocol, environment, cls.getWizards())
[docs] @classmethod def printInfo(cls): """ Simple function (mainly for debugging) that prints basic information about this Domain. """ print("Domain: %s" % cls._name) print(" objects: %s" % len(cls._objects)) print(" protocols: %s" % len(cls._protocols)) print(" viewers: %s" % len(cls._viewers)) print(" wizards: %s" % len(cls._wizards))
# ---------- Private methods of Domain class ------------------------------ @staticmethod def __pluginNotFound(plugName, errorMsg='', doRaise=False): """ Prints or raise (depending on the doRaise) telling why it is failing """ hint = (" Check the plugin manager (Configuration->Plugins in " "Scipion manager window) \n") # the str casting is to work with Exceptions as errorMsg if 'No module named %s' % plugName in str(errorMsg): msgStr = " > %s plugin not found. %s" % (plugName, errorMsg) hint += " or use 'scipion installp --help' in the command line " hint += "to install it." else: msgStr = " > error when importing from %s: %s" % (plugName, errorMsg) if errorMsg != '': # if empty we know nothing... hint += (" or use 'scipion installp --help --checkUpdates' " "in the command line to check for upgrades,\n " "it could be a versions compatibility issue.") stackList = traceback.extract_stack() if len(stackList) > 3: callIdx = -3 # We use the most probable index as default for idx, stackLine in enumerate(stackList): if stackLine[0].endswith('/unittest/'): callIdx = idx + 1 else: callIdx = 0 callBy = stackList[callIdx][0] if callBy.endswith('pyworkflow/'): # This special case is to know why is failing and not where is called # because we know that we call all plugins.protocols at the beginning calling = traceback.format_exc().split('\n')[-4] else: line = stackList[callIdx][1] calling = " Called by %s, line %s" % (callBy, line) raiseMsg = "%s\n %s\n%s\n" % (msgStr, calling, hint) if doRaise: raise Exception("\n\n" + raiseMsg) else: print(raiseMsg) @staticmethod def __getSubmodule(name, subname): try: completeModuleText = '%s.%s' % (name, subname) if pwutils.isModuleAFolder(name): return importlib.import_module(completeModuleText) except Exception as e: msg = str(e) # FIXME: The following is a quick and dirty way to filter # when the submodule is not present if msg != 'No module named \'%s\'' % completeModuleText: Domain.__pluginNotFound(completeModuleText, msg) return None @classmethod def __isPlugin(cls, m): """ Return True if the module is a Scipion plugin. """ return m.__name__ in cls._plugins @staticmethod def __findWizardsFromDict(protocol, environment, wizDict): wizards = {} baseClasses = [c.__name__ for c in protocol.getClass().mro()] for wiz in wizDict.values(): if environment in wiz._environments: for c, params in wiz._targets: if c.__name__ in baseClasses: for p in params: wizards[p] = wiz return wizards
[docs]class Plugin: __metaclass__ = ABCMeta _vars = {} _homeVar = '' # Change in subclasses to define the "home" variable _pathVars = [] _supportedVersions = [] _name = "" _url = "" # For the plugin _condaActivationCmd = None @classmethod def _defineVar(cls, varName, defaultValue): """ Internal method to define variables trying to get it from the environment first. """ cls._addVar(varName, os.environ.get(varName, defaultValue)) @classmethod def _addVar(cls, varName, value): """ Adds a variable to the local variable dictionary directly. Avoiding the environment""" cls._vars[varName] = value
[docs] @classmethod @abstractmethod def getEnviron(cls): """ Setup the environment variables needed to launch programs. """ pass
[docs] @classmethod def getCondaActivationCmd(cls): if cls._condaActivationCmd is None: condaActivationCmd = os.environ.get(CONDA_ACTIVATION_CMD_VAR, "") if not condaActivationCmd: print("WARNING!!_condaActivationCmd: %s variable not defined. " "Relying on conda being in the PATH" % CONDA_ACTIVATION_CMD_VAR) elif condaActivationCmd[-1] not in [";", "&"]: condaActivationCmd += "&&" cls._condaActivationCmd = condaActivationCmd return cls._condaActivationCmd
@classmethod @abstractmethod def _defineVariables(cls): """ Method to define variables and their default values. It will use the method _defineVar that will take a variable value from the environment or from an optional default value. This method is not supposed to be called from client code, except from the Domain class when registering a Plugin. """ pass
[docs] @classmethod @abstractmethod def defineBinaries(cls, env): """ Define required binaries in the given Environment. """ pass
[docs] @classmethod def getVar(cls, varName, defaultValue=None): """ Return the value of a given variable. """ return cls._vars.get(varName, defaultValue)
[docs] @classmethod def getVars(cls): """ Return the value of a given variable. """ return cls._vars
[docs] @classmethod def getHome(cls, *paths): """ Return a path from the "home" of the package if the _homeVar is defined in the plugin. """ home = cls.getVar(cls._homeVar) return os.path.join(home, *paths) if home else ''
[docs] @classmethod def getSupportedVersions(cls): """ Return the list of supported binary versions. """ return cls._supportedVersions
[docs] @classmethod def getActiveVersion(cls, home=None, versions=None): """ Return the version of the binaries that are currently active. In the current implementation it will be inferred from the *_HOME variable, so it should contain the version number in it. """ # FIXME: (JMRT) Use the basename might alleviate the issue with matching # the binaries version, but we might consider to find a better solution home = os.path.basename(home or cls.getHome()) versions = versions or cls.getSupportedVersions() for v in versions: if v in home: return v return ''
[docs] @classmethod def getName(cls): return cls.__module__
[docs] @classmethod def validateInstallation(cls): """ Check if the binaries are properly installed and if not, return a list with the error messages. The default implementation will check if the _pathVars exists. """ try: missing = ["%s: %s" % (var, cls.getVar(var)) for var in cls._pathVars if not os.path.exists(cls.getVar(var))] return (["Missing paths: the variables bellow point to non existing paths."] + missing + [ "Either install the software ( %s )" % DOCSITEURLS.PLUGIN_MANAGER, "or edit the config file ( %s )" % DOCSITEURLS.CONFIG]) if missing else [] except Exception as e: return ["validateInstallation fails: %s" % e]
[docs] @classmethod def getPluginTemplateDir(cls): return os.path.join(pw.getModuleFolder(cls.getName()), 'templates')
[docs] @classmethod def getTemplates(cls): """ Get the plugin templates from the templates directory. If more than one template is found or passed, a dialog is raised to choose one. """ tempList = [] pluginName = cls.getName() tDir = cls.getPluginTemplateDir() if os.path.exists(tDir): for file in glob.glob1(tDir, "*" + SCIPION_JSON_TEMPLATES): t = Template(pluginName, os.path.join(tDir, file)) tempList.append(t) return tempList
[docs] @classmethod def getUrl(cls, protClass=None): """ Url for the plugin to point users to it""" return cls._url
[docs]class PluginInfo: """ Information related to a given plugin when it is installed via PIP """ def __init__(self, name): try: dist = pkg_resources.get_distribution(name) lines = [l for l in dist._get_metadata(dist.PKG_INFO)] tuples = message_from_string('\n'.join(lines)) except Exception: print("Plugin %s seems is not a pip module yet. " "No metadata found" % name) tuples = message_from_string('Author: plugin in development mode?') self._name = name self._metadata = OrderedDict() for v in tuples.items(): if v[0] == 'Keywords': break self._metadata[v[0]] = v[1]
[docs] def getAuthor(self): return self._metadata.get('Author', "")
[docs] def getAuthorEmail(self): return self._metadata.get('Author-email', '')
[docs] def getHomePage(self): return self._metadata.get('Home-page', '')
[docs] def getKeywords(self): return self._metadata.get('Keywords', '')