Source code for scipion.install.plugin_funcs

import logging
logger = logging.getLogger(__name__)
import requests
import os
import re
import sys
import json
from packaging.version import Version
from urllib.request import url2pathname
from urllib.parse import urlparse
from .funcs import Environment
from pwem import Domain
from pyworkflow.utils import redStr, yellowStr
from pyworkflow.utils.path import cleanPath
from pyworkflow import LAST_VERSION, CORE_VERSION, Config
import importlib_metadata

NULL_VERSION = "0.0.0"
# This constant is used in order to install all plugins taking into account a
# json file
DEVEL_VERSION = "999.9.9"

REPOSITORY_URL = Config.SCIPION_PLUGIN_JSON

if REPOSITORY_URL is None:
    REPOSITORY_URL = Config.SCIPION_PLUGIN_REPO_URL

PIP_BASE_URL = 'https://pypi.python.org/pypi'
PIP_CMD = '{0} -m pip install %(installSrc)s'.format(
    Environment.getPython())

PIP_UNINSTALL_CMD = '{0} -m pip uninstall -y %s'.format(
    Environment.getPython())


[docs]class PluginInfo(object): def __init__(self, pipName="", name="", pluginSourceUrl="", remote=True, plugin=None, **kwargs): self.pipName = pipName self.name = name self.pluginSourceUrl = pluginSourceUrl self.remote = remote # things from pypi self.homePage = "" self.summary = "" self.author = "" self.email = "" self.compatibleReleases = {} self.latestRelease = "" # things we have when installed self.dirName = "" self.pipVersion = "" self.binVersions = [] self.pluginEnv = None # Distribution self._dist = None self._plugin = plugin if self.remote: self.setRemotePluginInfo() else: self.setFakedRemotePluginInfo() self.setLocalPluginInfo() # get local info if installed # ###################### Install funcs ############################
[docs] def install(self): """Installs both pip module and default binaries of the plugin""" self.installPipModule() self.installBin() self.setLocalPluginInfo()
[docs] def getDistribution(self): if self._dist is None: try: self._dist = importlib_metadata.distribution(self.pipName) except Exception as e: logger.debug("Distribution not found: %s" % e) pass return self._dist
def _getPlugin(self): if self._plugin is None: try: dirname = self.getDirName() if dirname: ## For non installed plugins getDirName does return "". Avoid asking for the plugin self._plugin = Config.getDomain().getPluginModule(dirname) except Exception as e: logger.debug("Can't get the plugin for %s( dir name:%s )." % (self.pipName, dirname)) pass return self._plugin
[docs] def hasPipPackage(self): """Checks if the current plugin is installed via pip""" return self.getDistribution() is not None
[docs] def isInstalled(self): """Checks if the current plugin is installed (i.e. has pip package). NOTE: we might want to change definition of isInstalled, hence the extra function.""" # This is too expensive: reload(pkg_resources) return self.hasPipPackage()
[docs] def installPipModule(self, version=""): """Installs the version specified of the pip plugin, as long as it is compatible with the current Scipion version. If no version specified, will install latest compatible one.""" environment = Environment() if not version: version = self.latestRelease elif version not in self.compatibleReleases: if self.compatibleReleases: print('%s version %s not compatible with current Scipion ' 'version %s.' % (self.pipName, version, LAST_VERSION)) print("Please choose a compatible release: %s" % " ".join( self.compatibleReleases.keys())) else: print("%s has no compatible versions with current Scipion " "version %s." % (self.pipName, LAST_VERSION)) return False if version == NULL_VERSION: print("Plugin %s is not available for this Scipion %s yet" % (self.pipName, LAST_VERSION)) return False if self.pluginSourceUrl: if os.path.exists(self.pluginSourceUrl): # install from dir in editable mode installSrc = '-e %s' % self.pluginSourceUrl target = "%s*" % self.pipName if os.path.exists(os.path.join(self.pluginSourceUrl, 'pyproject.toml')): target = target.replace('-', '_') else: # path doesn't exist, we assume is git and force install installSrc = '--upgrade git+%s' % self.pluginSourceUrl target = "%s*" % self.pipName.replace('-', '_') else: # install from pypi installSrc = "%s==%s" % (self.pipName, version) target = "%s*" % self.pipName.replace('-', '_') cmd = PIP_CMD % {'installSrc': installSrc} environment.addPipModule(self.pipName, target=target, pipCmd=cmd) # Install the package/plugin environment.execute() # if plugin was already installed, we need to clean cached elements self._dist = None # Trigger reloading the distribution # Not needed? reload(pkg_resources) self.dirName = self.getDirName() Domain.refreshPlugin(self.dirName) return True
[docs] def installBin(self, args=None): """Install binaries of the plugin. Args is the list of args to be passed to the install environment.""" environment = self.getInstallenv(envArgs=args) environment.execute()
[docs] def uninstallBins(self, binList=None): """Uninstall binaries of the plugin. :param binList: if given, will uninstall the binaries in it. The binList may contain strings with only the name of the binary or name and version in the format name-version :returns None """ if binList is None: binList = self.binVersions binFolder = Environment.getEmFolder() for binVersion in binList: f = os.path.join(binFolder, binVersion) if os.path.exists(f): print('Removing %s binaries...' % binVersion) realPath = os.path.realpath(f) # in case it's a link cleanPath(f, realPath) print('Binary %s has been uninstalled successfully ' % binVersion) else: print('The binary %s does not exist ' % binVersion) return
[docs] def uninstallPip(self): """Removes pip package from site-packages""" print('Removing %s plugin...' % self.pipName) import subprocess args = (PIP_UNINSTALL_CMD % self.pipName).split() subprocess.call(PIP_UNINSTALL_CMD % self.pipName, shell=True, stdout=sys.stdout, stderr=sys.stderr)
# ###################### Remote data funcs ############################
[docs] def getPipJsonData(self): """"Request json data from pypi, return json content""" pipData = requests.get("%s/%s/json" % (PIP_BASE_URL, self.pipName)) if pipData.ok: pipData = pipData.json() return pipData else: print("Warning: Couldn't get remote plugin data for %s" % self.pipName) return {}
[docs] def getCompatiblePipReleases(self, pipJsonData=None): """Get pip releases of this plugin that are compatible with current Scipion version. Returns dict with all compatible releases and a special key "latest" with the most recent one.""" if pipJsonData is None: pipJsonData = self.getPipJsonData() reg = r'scipion-([\d.]*\d)' releases = {} latestCompRelease = NULL_VERSION for release, releaseData in pipJsonData['releases'].items(): releaseData = releaseData[0] scipionVersions = [Version(v) for v in re.findall(reg, releaseData['comment_text'])] if len(scipionVersions) != 0: releases[release] = releaseData if any([v == Version(CORE_VERSION) for v in scipionVersions]): if Version(latestCompRelease) < Version(release): latestCompRelease = release else: print(yellowStr("WARNING: %s's release %s did not specify a " "compatible Scipion version. Please, remove this " "release from pypi") % (self.pipName, release)) releases['latest'] = latestCompRelease return releases
[docs] def setRemotePluginInfo(self): """Sets value for the attributes that need to be obtained from pypi""" pipData = self.getPipJsonData() if not pipData: return info = pipData['info'] releases = self.getCompatiblePipReleases(pipJsonData=pipData) self.homePage = self.getPipData(info, ['home_page', 'project_urls.Homepage']) self.summary = self.getPipData(info, ['summary']) self.author = self.getPipData(info, ['author', 'author_email']) self.email = self.getPipData(info, ['author_email']) self.compatibleReleases = releases self.latestRelease = releases['latest']
[docs] def setFakedRemotePluginInfo(self): """Sets value for the attributes that need to be obtained from json file""" self.homePage = self.pluginSourceUrl self.compatibleReleases = {DEVEL_VERSION: {'upload_time': ' devel_mode'}} self.latestRelease = DEVEL_VERSION self.author = ' Developer mode'
[docs] def getPipData(self, info, keys): """ Extracts a value from a dictionary (info) based on the primary key. Supports nested keys using dot notation (e.g., "project_urls.Homepage"). :param info: Dictionary containing data (e.g., pip info). :param keys: List of keys to extract the value (dot notation for nested keys). :return: The extracted value or '' if no keys match. """ def getNestedValue(data, key): keysList = key.split(".") for k in keysList: if isinstance(data, dict) and k in data: data = data[k] else: return None return data # Try backup keys if the main key doesn't yield a value for key in keys: value = getNestedValue(info, key) if value: return value return ' '
# ###################### Local data funcs ############################
[docs] def setLocalPluginInfo(self): """Sets value for the attributes that can be obtained locally if the plugin is installed.""" if self.isInstalled(): metadata = {} # Take into account 2 cases here: # A.: plugin is a proper pipmodule and is installed as such # B.: Plugin is not yet a pipmodule but a local folder. try: def getValueFromMetadata(prodKey, altKey): """Gets the value from the metadata dictionary using the prodKey (package in production) if key does not exist uses the altKey :param prodKey: key in the metadata to get the value :param altKey: alternative key in case prodKey does not exist""" if prodKey in jsonMetadata: return jsonMetadata[prodKey] else: try: value = jsonMetadata[altKey] if isinstance(value, str): # author_email return value else: # Project_url: ['Homepage, https://github.com/scipion-em/scipion-em-warp', 'Issues, https://github.com/scipion-em/scipion-em-warp/issues'] return value[0].split()[1] except Exception as e: logger.debug("Can't get %s value for %s package." % (altKey, self.pipName)) return altKey jsonMetadata = self.getDistribution().metadata.json self.summary = jsonMetadata.get("summary", "") self.author = getValueFromMetadata("author", "author_email") self.homePage = getValueFromMetadata("home_page", "project_url") self.email = jsonMetadata.get("author_email", "") self.pipVersion = self._dist.version self.dirName = self.getDirName() self.binVersions = self.getBinVersions() except: # Case B: code local but not yet a pipmodule. pass if not self.remote: # only do this if we don't already have it from remote self.homePage = metadata.get('Home-page', "") self.summary = metadata.get('Summary', "") self.author = metadata.get('Author', "") self.email = metadata.get('Author-email', "")
[docs] def getPluginClass(self): """ Tries to find the Plugin object.""" pluginModule = self._getPlugin() if pluginModule is not None: pluginClass = pluginModule._pluginInstance else: print("Warning: couldn't find Plugin for %s" % self.pipName) print("Dirname: %s" % self.getDirName()) pluginClass = None return pluginClass
[docs] def getInstallenv(self, envArgs=None): """Reads the defineBinaries function from Plugin class and returns an Environment object with the plugin's binaries.""" if envArgs is None: envArgs = dict() env = Environment(**envArgs) env.setDefault(False) plugin = self.getPluginClass() if plugin is not None: try: plugin.defineBinaries(env) except Exception as e: print("Couldn't get binaries definition of %s plugin: %s" % (self.name, e)) import traceback traceback.print_exc() return env else: return None
[docs] def getBinVersions(self): """Get list with names of binaries of this plugin""" env = Environment() env.setDefault(False) defaultTargets = [target.getName() for target in env.getTargetList()] plugin = self.getPluginClass() if plugin is not None: try: plugin.defineBinaries(env) except Exception as e: print( redStr("Error retrieving plugin %s binaries: " % self.name), e) binVersions = [target.getName() for target in env.getTargetList() if target.getName() not in defaultTargets] return binVersions
[docs] def getDirName(self): """Get the name of the folder that contains the plugin code itself (e.g. to import the _plugin object.)""" if not self.dirName: try: dist = self.getDistribution() # Here, at least there are 2 cases: normal mode or editable mode. # Normal mode have __init__.py files. We will look for the first one to get the dirname # editable mode is more indirect. We need the top_level.txt to get the dirname but also # since code is somewhere else, we need to find the folder where the code is using "direct_url.json" for path in dist.files: if path.name == "__init__.py": self.dirName = path.parts[0] break elif path.name == "top_level.txt": logger.debug("Package %s in editable mode." % self.pipName) self.dirName = self._getDirNameFromTopLevel(path) elif path.name == "direct_url.json": self._addPluginPathToModules(path) except Exception as e: logger.debug("getDirName does not work for %s: %s" % (self.pipName, e)) return self.dirName
def _getDirNameFromTopLevel(self, path): topLevel = str(path.locate()) with open(topLevel) as fh: line = fh.readline().strip() return line def _addPluginPathToModules(self, path): logger.warning("Getting actual path for %s" % self.pipName) urlJson = str(path.locate()) with open(urlJson) as fh: jsonObj = json.load(fh) url= jsonObj["url"] url = urlparse(url) file_path = url2pathname(url.path) logger.warning("Path found: %s" % file_path) sys.path.append(file_path)
[docs] def printBinInfoStr(self): """Returns string with info of binaries installed to print in console with flag --help""" try: env = self.getInstallenv() return env.printHelp().split('\n', 1)[1] except IndexError as noBins: return " ".rjust(14) + "No binaries information defined.\n" except Exception as e: return " ".rjust(14) + "Error getting binaries info: %s" % \ str(e) + "\n"
[docs] def getPluginName(self): """Return the plugin name""" return self.name
[docs] def getPipName(self): """Return the plugin pip name""" return self.pipName
[docs] def getPipVersion(self): """Return the plugin pip version""" return self.pipVersion
[docs] def getSourceUrl(self): """Return the plugin source url""" return self.pluginSourceUrl
[docs] def getHomePage(self): """Return the plugin Home page""" return self.homePage
[docs] def getSummary(self): """Return the plugin summary""" return self.summary
[docs] def getAuthor(self): """Return the plugin author""" return self.author
[docs] def getReleaseDate(self, release): """Return the uploaded date from the release""" return self.compatibleReleases[release]['upload_time']
[docs] def getLatestRelease(self): """Get the plugin latest release""" return self.latestRelease
[docs]class PluginRepository(object): def __init__(self, repoUrl=REPOSITORY_URL): self.repoUrl = repoUrl self.plugins = None
[docs] @staticmethod def getBinToPluginDict(): localPlugins = Domain.getPlugins() binToPluginDict = {} for p, pobj in localPlugins.items(): pinfo = PluginInfo(name=p, plugin=pobj, remote=False) pbins = pinfo.getBinVersions() binToPluginDict.update({k: p for k in pbins}) pbinsNoVersion = set([b.split('-', 1)[0] for b in pbins]) binToPluginDict.update({k: p for k in pbinsNoVersion}) return binToPluginDict
[docs] def getPlugins(self, pluginList=None, getPipData=False): """Reads available plugins from self.repoUrl and returns a dict with PluginInfo objects. Params: - pluginList: A list with specific plugin pip-names we want to get. - getPipData: If true, each PluginInfo object will try to get the data of the plugin from pypi.""" pluginsJson = {} if self.plugins is None: self.plugins = {} if os.path.isfile(self.repoUrl): with open(self.repoUrl) as f: pluginsJson = json.load(f) getPipData = False else: try: r = requests.get(self.repoUrl) getPipData = True except requests.ConnectionError as e: print("\nWARNING: Error while trying to connect with a server:\n" " > Please, check your internet connection!\n") print(e) return self.plugins if r.ok: pluginsJson = r.json() else: print("WARNING: Can't get Scipion's plugin list, the plugin " "repository is not available") return self.plugins availablePlugins = pluginsJson.keys() if pluginList is None: targetPlugins = availablePlugins else: targetPlugins = set(availablePlugins).intersection(set(pluginList)) if len(targetPlugins) < len(pluginList): wrongPluginNames = set(pluginList) - set(availablePlugins) print("WARNING - The following plugins didn't match available " "plugin names:") print(" ".join(wrongPluginNames)) print("You can see the list of available plugins with the following command:\n" "scipion installp --help") for pluginName in targetPlugins: pluginsJson[pluginName].update(remote=getPipData) pluginInfo = PluginInfo(**pluginsJson[pluginName]) if pluginInfo.getLatestRelease() != NULL_VERSION: self.plugins[pluginName] = pluginInfo return self.plugins
[docs] def printPluginInfoStr(self, withBins=False, withUpdates=False): """Returns string to print in console which plugins are installed. :param withBins: If true, will add binary info for the plugins installed :param withUpdates: If true, will check if the installed plugins have new releases. :return A string with the plugin information summarized""" def ansi(n): """Return function that escapes text with ANSI color n.""" return lambda txt: '\x1b[%dm%s\x1b[0m' % (n, txt) black, red, green, yellow, blue, magenta, cyan, white = map(ansi, range(30, 38)) printStr = "" pluginDict = self.getPlugins(getPipData=withUpdates) if pluginDict: withBinsStr = "Installed plugins and their %s" % green("binaries") \ if withBins else "Available plugins" printStr += ("%s: " "([ ] not installed, [X] seems already installed)\n\n" % withBinsStr) keys = sorted(pluginDict.keys()) for name in keys: plugin = pluginDict[name] if withBins and not plugin.isInstalled(): continue printStr += "{0:30} {1:10} [{2}]".format(name, plugin.pipVersion, 'X' if plugin.isInstalled() else ' ') if withUpdates and plugin.isInstalled(): if plugin.latestRelease != plugin.pipVersion: printStr += yellow('\t(%s available)' % plugin.latestRelease) printStr += "\n" if withBins: printStr += green(plugin.printBinInfoStr()) else: printStr = "List of available plugins in plugin repository inaccessible at this time." return printStr
[docs]def installBinsDefault(): """ Returns the default behaviour for installing binaries By default it is TRUE, define "SCIPION_DONT_INSTALL_BINARIES" to anything to deactivate binaries installation""" return os.environ.get("SCIPION_DONT_INSTALL_BINARIES", True) == True