import logging
logger = logging.getLogger(__name__)
import requests
import os
import re
import sys
import json
from packaging.version import Version
from urllib.request import url2pathname
from urllib.parse import urlparse
from .funcs import Environment
from pwem import Domain
from pyworkflow.utils import redStr, yellowStr
from pyworkflow.utils.path import cleanPath
from pyworkflow import LAST_VERSION, CORE_VERSION, Config
import importlib_metadata
NULL_VERSION = "0.0.0"
# This constant is used in order to install all plugins taking into account a
# json file
DEVEL_VERSION = "999.9.9"
REPOSITORY_URL = Config.SCIPION_PLUGIN_JSON
if REPOSITORY_URL is None:
REPOSITORY_URL = Config.SCIPION_PLUGIN_REPO_URL
PIP_BASE_URL = 'https://pypi.python.org/pypi'
PIP_CMD = '{0} -m pip install %(installSrc)s'.format(
Environment.getPython())
PIP_UNINSTALL_CMD = '{0} -m pip uninstall -y %s'.format(
Environment.getPython())
[docs]class PluginInfo(object):
def __init__(self, pipName="", name="", pluginSourceUrl="", remote=True,
plugin=None, **kwargs):
self.pipName = pipName
self.name = name
self.pluginSourceUrl = pluginSourceUrl
self.remote = remote
# things from pypi
self.homePage = ""
self.summary = ""
self.author = ""
self.email = ""
self.compatibleReleases = {}
self.latestRelease = ""
# things we have when installed
self.dirName = ""
self.pipVersion = ""
self.binVersions = []
self.pluginEnv = None
# Distribution
self._dist = None
self._plugin = plugin
if self.remote:
self.setRemotePluginInfo()
else:
self.setFakedRemotePluginInfo()
self.setLocalPluginInfo() # get local info if installed
# ###################### Install funcs ############################
[docs] def install(self):
"""Installs both pip module and default binaries of
the plugin"""
self.installPipModule()
self.installBin()
self.setLocalPluginInfo()
[docs] def getDistribution(self):
if self._dist is None:
try:
self._dist = importlib_metadata.distribution(self.pipName)
except Exception as e:
logger.debug("Distribution not found: %s" % e)
pass
return self._dist
def _getPlugin(self):
if self._plugin is None:
try:
dirname = self.getDirName()
if dirname: ## For non installed plugins getDirName does return "". Avoid asking for the plugin
self._plugin = Config.getDomain().getPluginModule(dirname)
except Exception as e:
logger.debug("Can't get the plugin for %s( dir name:%s )." % (self.pipName, dirname))
pass
return self._plugin
[docs] def hasPipPackage(self):
"""Checks if the current plugin is installed via pip"""
return self.getDistribution() is not None
[docs] def isInstalled(self):
"""Checks if the current plugin is installed (i.e. has pip package).
NOTE: we might want to change definition of isInstalled, hence the extra function."""
# This is too expensive: reload(pkg_resources)
return self.hasPipPackage()
[docs] def installPipModule(self, version=""):
"""Installs the version specified of the pip plugin, as long as it is compatible
with the current Scipion version. If no version specified, will install latest
compatible one."""
environment = Environment()
if not version:
version = self.latestRelease
elif version not in self.compatibleReleases:
if self.compatibleReleases:
print('%s version %s not compatible with current Scipion '
'version %s.' % (self.pipName, version, LAST_VERSION))
print("Please choose a compatible release: %s" % " ".join(
self.compatibleReleases.keys()))
else:
print("%s has no compatible versions with current Scipion "
"version %s." % (self.pipName, LAST_VERSION))
return False
if version == NULL_VERSION:
print("Plugin %s is not available for this Scipion %s yet" % (self.pipName, LAST_VERSION))
return False
if self.pluginSourceUrl:
if os.path.exists(self.pluginSourceUrl):
# install from dir in editable mode
installSrc = '-e %s' % self.pluginSourceUrl
target = "%s*" % self.pipName
if os.path.exists(os.path.join(self.pluginSourceUrl, 'pyproject.toml')):
target = target.replace('-', '_')
else:
# path doesn't exist, we assume is git and force install
installSrc = '--upgrade git+%s' % self.pluginSourceUrl
target = "%s*" % self.pipName.replace('-', '_')
else:
# install from pypi
installSrc = "%s==%s" % (self.pipName, version)
target = "%s*" % self.pipName.replace('-', '_')
cmd = PIP_CMD % {'installSrc': installSrc}
environment.addPipModule(self.pipName,
target=target,
pipCmd=cmd)
# Install the package/plugin
environment.execute()
# if plugin was already installed, we need to clean cached elements
self._dist = None # Trigger reloading the distribution
# Not needed? reload(pkg_resources)
self.dirName = self.getDirName()
Domain.refreshPlugin(self.dirName)
return True
[docs] def installBin(self, args=None):
"""Install binaries of the plugin. Args is the list of args to be
passed to the install environment."""
environment = self.getInstallenv(envArgs=args)
environment.execute()
[docs] def uninstallBins(self, binList=None):
"""Uninstall binaries of the plugin.
:param binList: if given, will uninstall the binaries in it. The binList
may contain strings with only the name of the binary or
name and version in the format name-version
:returns None
"""
if binList is None:
binList = self.binVersions
binFolder = Environment.getEmFolder()
for binVersion in binList:
f = os.path.join(binFolder, binVersion)
if os.path.exists(f):
print('Removing %s binaries...' % binVersion)
realPath = os.path.realpath(f) # in case it's a link
cleanPath(f, realPath)
print('Binary %s has been uninstalled successfully ' % binVersion)
else:
print('The binary %s does not exist ' % binVersion)
return
[docs] def uninstallPip(self):
"""Removes pip package from site-packages"""
print('Removing %s plugin...' % self.pipName)
import subprocess
args = (PIP_UNINSTALL_CMD % self.pipName).split()
subprocess.call(PIP_UNINSTALL_CMD % self.pipName, shell=True,
stdout=sys.stdout,
stderr=sys.stderr)
# ###################### Remote data funcs ############################
[docs] def getPipJsonData(self):
""""Request json data from pypi, return json content"""
pipData = requests.get("%s/%s/json" % (PIP_BASE_URL, self.pipName))
if pipData.ok:
pipData = pipData.json()
return pipData
else:
print("Warning: Couldn't get remote plugin data for %s" % self.pipName)
return {}
[docs] def getCompatiblePipReleases(self, pipJsonData=None):
"""Get pip releases of this plugin that are compatible with
current Scipion version. Returns dict with all compatible releases and
a special key "latest" with the most recent one."""
if pipJsonData is None:
pipJsonData = self.getPipJsonData()
reg = r'scipion-([\d.]*\d)'
releases = {}
latestCompRelease = NULL_VERSION
for release, releaseData in pipJsonData['releases'].items():
releaseData = releaseData[0]
scipionVersions = [Version(v)
for v in re.findall(reg,
releaseData['comment_text'])]
if len(scipionVersions) != 0:
releases[release] = releaseData
if any([v == Version(CORE_VERSION)
for v in scipionVersions]):
if Version(latestCompRelease) < Version(release):
latestCompRelease = release
else:
print(yellowStr("WARNING: %s's release %s did not specify a "
"compatible Scipion version. Please, remove this "
"release from pypi") % (self.pipName, release))
releases['latest'] = latestCompRelease
return releases
[docs] def setRemotePluginInfo(self):
"""Sets value for the attributes that need to be obtained from pypi"""
pipData = self.getPipJsonData()
if not pipData:
return
info = pipData['info']
releases = self.getCompatiblePipReleases(pipJsonData=pipData)
self.homePage = self.getPipData(info, ['home_page', 'project_urls.Homepage'])
self.summary = self.getPipData(info, ['summary'])
self.author = self.getPipData(info, ['author', 'author_email'])
self.email = self.getPipData(info, ['author_email'])
self.compatibleReleases = releases
self.latestRelease = releases['latest']
[docs] def setFakedRemotePluginInfo(self):
"""Sets value for the attributes that need to be obtained from json file"""
self.homePage = self.pluginSourceUrl
self.compatibleReleases = {DEVEL_VERSION: {'upload_time': ' devel_mode'}}
self.latestRelease = DEVEL_VERSION
self.author = ' Developer mode'
[docs] def getPipData(self, info, keys):
"""
Extracts a value from a dictionary (info) based on the primary key.
Supports nested keys using dot notation (e.g., "project_urls.Homepage").
:param info: Dictionary containing data (e.g., pip info).
:param keys: List of keys to extract the value (dot notation for nested keys).
:return: The extracted value or '' if no keys match.
"""
def getNestedValue(data, key):
keysList = key.split(".")
for k in keysList:
if isinstance(data, dict) and k in data:
data = data[k]
else:
return None
return data
# Try backup keys if the main key doesn't yield a value
for key in keys:
value = getNestedValue(info, key)
if value:
return value
return ' '
# ###################### Local data funcs ############################
[docs] def setLocalPluginInfo(self):
"""Sets value for the attributes that can be obtained locally if the
plugin is installed."""
if self.isInstalled():
metadata = {}
# Take into account 2 cases here:
# A.: plugin is a proper pipmodule and is installed as such
# B.: Plugin is not yet a pipmodule but a local folder.
try:
def getValueFromMetadata(prodKey, altKey):
"""Gets the value from the metadata dictionary using the prodKey (package in production)
if key does not exist uses the altKey
:param prodKey: key in the metadata to get the value
:param altKey: alternative key in case prodKey does not exist"""
if prodKey in jsonMetadata:
return jsonMetadata[prodKey]
else:
try:
value = jsonMetadata[altKey]
if isinstance(value, str):
# author_email
return value
else:
# Project_url: ['Homepage, https://github.com/scipion-em/scipion-em-warp', 'Issues, https://github.com/scipion-em/scipion-em-warp/issues']
return value[0].split()[1]
except Exception as e:
logger.debug("Can't get %s value for %s package." % (altKey, self.pipName))
return altKey
jsonMetadata = self.getDistribution().metadata.json
self.summary = jsonMetadata.get("summary", "")
self.author = getValueFromMetadata("author", "author_email")
self.homePage = getValueFromMetadata("home_page", "project_url")
self.email = jsonMetadata.get("author_email", "")
self.pipVersion = self._dist.version
self.dirName = self.getDirName()
self.binVersions = self.getBinVersions()
except:
# Case B: code local but not yet a pipmodule.
pass
if not self.remote:
# only do this if we don't already have it from remote
self.homePage = metadata.get('Home-page', "")
self.summary = metadata.get('Summary', "")
self.author = metadata.get('Author', "")
self.email = metadata.get('Author-email', "")
[docs] def getPluginClass(self):
""" Tries to find the Plugin object."""
pluginModule = self._getPlugin()
if pluginModule is not None:
pluginClass = pluginModule._pluginInstance
else:
print("Warning: couldn't find Plugin for %s" % self.pipName)
print("Dirname: %s" % self.getDirName())
pluginClass = None
return pluginClass
[docs] def getInstallenv(self, envArgs=None):
"""Reads the defineBinaries function from Plugin class and returns an
Environment object with the plugin's binaries."""
if envArgs is None:
envArgs = dict()
env = Environment(**envArgs)
env.setDefault(False)
plugin = self.getPluginClass()
if plugin is not None:
try:
plugin.defineBinaries(env)
except Exception as e:
print("Couldn't get binaries definition of %s plugin: %s" % (self.name, e))
import traceback
traceback.print_exc()
return env
else:
return None
[docs] def getBinVersions(self):
"""Get list with names of binaries of this plugin"""
env = Environment()
env.setDefault(False)
defaultTargets = [target.getName() for target in env.getTargetList()]
plugin = self.getPluginClass()
if plugin is not None:
try:
plugin.defineBinaries(env)
except Exception as e:
print(
redStr("Error retrieving plugin %s binaries: " % self.name), e)
binVersions = [target.getName() for target in env.getTargetList() if target.getName() not in defaultTargets]
return binVersions
[docs] def getDirName(self):
"""Get the name of the folder that contains the plugin code
itself (e.g. to import the _plugin object.)"""
if not self.dirName:
try:
dist = self.getDistribution()
# Here, at least there are 2 cases: normal mode or editable mode.
# Normal mode have __init__.py files. We will look for the first one to get the dirname
# editable mode is more indirect. We need the top_level.txt to get the dirname but also
# since code is somewhere else, we need to find the folder where the code is using "direct_url.json"
for path in dist.files:
if path.name == "__init__.py":
self.dirName = path.parts[0]
break
elif path.name == "top_level.txt":
logger.debug("Package %s in editable mode." % self.pipName)
self.dirName = self._getDirNameFromTopLevel(path)
elif path.name == "direct_url.json":
self._addPluginPathToModules(path)
except Exception as e:
logger.debug("getDirName does not work for %s: %s" % (self.pipName, e))
return self.dirName
def _getDirNameFromTopLevel(self, path):
topLevel = str(path.locate())
with open(topLevel) as fh:
line = fh.readline().strip()
return line
def _addPluginPathToModules(self, path):
logger.warning("Getting actual path for %s" % self.pipName)
urlJson = str(path.locate())
with open(urlJson) as fh:
jsonObj = json.load(fh)
url= jsonObj["url"]
url = urlparse(url)
file_path = url2pathname(url.path)
logger.warning("Path found: %s" % file_path)
sys.path.append(file_path)
[docs] def printBinInfoStr(self):
"""Returns string with info of binaries installed to print in console
with flag --help"""
try:
env = self.getInstallenv()
return env.printHelp().split('\n', 1)[1]
except IndexError as noBins:
return " ".rjust(14) + "No binaries information defined.\n"
except Exception as e:
return " ".rjust(14) + "Error getting binaries info: %s" % \
str(e) + "\n"
[docs] def getPluginName(self):
"""Return the plugin name"""
return self.name
[docs] def getPipName(self):
"""Return the plugin pip name"""
return self.pipName
[docs] def getPipVersion(self):
"""Return the plugin pip version"""
return self.pipVersion
[docs] def getSourceUrl(self):
"""Return the plugin source url"""
return self.pluginSourceUrl
[docs] def getHomePage(self):
"""Return the plugin Home page"""
return self.homePage
[docs] def getSummary(self):
"""Return the plugin summary"""
return self.summary
[docs] def getAuthor(self):
"""Return the plugin author"""
return self.author
[docs] def getReleaseDate(self, release):
"""Return the uploaded date from the release"""
return self.compatibleReleases[release]['upload_time']
[docs] def getLatestRelease(self):
"""Get the plugin latest release"""
return self.latestRelease
[docs]class PluginRepository(object):
def __init__(self, repoUrl=REPOSITORY_URL):
self.repoUrl = repoUrl
self.plugins = None
[docs] @staticmethod
def getBinToPluginDict():
localPlugins = Domain.getPlugins()
binToPluginDict = {}
for p, pobj in localPlugins.items():
pinfo = PluginInfo(name=p, plugin=pobj, remote=False)
pbins = pinfo.getBinVersions()
binToPluginDict.update({k: p for k in pbins})
pbinsNoVersion = set([b.split('-', 1)[0] for b in pbins])
binToPluginDict.update({k: p for k in pbinsNoVersion})
return binToPluginDict
[docs] def getPlugins(self, pluginList=None, getPipData=False):
"""Reads available plugins from self.repoUrl and returns a dict with
PluginInfo objects. Params:
- pluginList: A list with specific plugin pip-names we want to get.
- getPipData: If true, each PluginInfo object will try to get the data
of the plugin from pypi."""
pluginsJson = {}
if self.plugins is None:
self.plugins = {}
if os.path.isfile(self.repoUrl):
with open(self.repoUrl) as f:
pluginsJson = json.load(f)
getPipData = False
else:
try:
r = requests.get(self.repoUrl)
getPipData = True
except requests.ConnectionError as e:
print("\nWARNING: Error while trying to connect with a server:\n"
" > Please, check your internet connection!\n")
print(e)
return self.plugins
if r.ok:
pluginsJson = r.json()
else:
print("WARNING: Can't get Scipion's plugin list, the plugin "
"repository is not available")
return self.plugins
availablePlugins = pluginsJson.keys()
if pluginList is None:
targetPlugins = availablePlugins
else:
targetPlugins = set(availablePlugins).intersection(set(pluginList))
if len(targetPlugins) < len(pluginList):
wrongPluginNames = set(pluginList) - set(availablePlugins)
print("WARNING - The following plugins didn't match available "
"plugin names:")
print(" ".join(wrongPluginNames))
print("You can see the list of available plugins with the following command:\n"
"scipion installp --help")
for pluginName in targetPlugins:
pluginsJson[pluginName].update(remote=getPipData)
pluginInfo = PluginInfo(**pluginsJson[pluginName])
if pluginInfo.getLatestRelease() != NULL_VERSION:
self.plugins[pluginName] = pluginInfo
return self.plugins
[docs] def printPluginInfoStr(self, withBins=False, withUpdates=False):
"""Returns string to print in console which plugins are installed.
:param withBins: If true, will add binary info for the plugins installed
:param withUpdates: If true, will check if the installed plugins have new releases.
:return A string with the plugin information summarized"""
def ansi(n):
"""Return function that escapes text with ANSI color n."""
return lambda txt: '\x1b[%dm%s\x1b[0m' % (n, txt)
black, red, green, yellow, blue, magenta, cyan, white = map(ansi,
range(30, 38))
printStr = ""
pluginDict = self.getPlugins(getPipData=withUpdates)
if pluginDict:
withBinsStr = "Installed plugins and their %s" % green("binaries") \
if withBins else "Available plugins"
printStr += ("%s: "
"([ ] not installed, [X] seems already installed)\n\n" % withBinsStr)
keys = sorted(pluginDict.keys())
for name in keys:
plugin = pluginDict[name]
if withBins and not plugin.isInstalled():
continue
printStr += "{0:30} {1:10} [{2}]".format(name,
plugin.pipVersion,
'X' if plugin.isInstalled() else ' ')
if withUpdates and plugin.isInstalled():
if plugin.latestRelease != plugin.pipVersion:
printStr += yellow('\t(%s available)' % plugin.latestRelease)
printStr += "\n"
if withBins:
printStr += green(plugin.printBinInfoStr())
else:
printStr = "List of available plugins in plugin repository inaccessible at this time."
return printStr
[docs]def installBinsDefault():
""" Returns the default behaviour for installing binaries
By default it is TRUE, define "SCIPION_DONT_INSTALL_BINARIES" to anything to deactivate binaries installation"""
return os.environ.get("SCIPION_DONT_INSTALL_BINARIES", True) == True