Source code for pyworkflow.utils.utils

# **************************************************************************
# *
# * Authors:     J.M. De la Rosa Trevin (delarosatrevin@scilifelab.se) [1]
# *
# * [1] SciLifeLab, Stockholm University
# *
# * This program is free software: you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation, either version 3 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program.  If not, see <https://www.gnu.org/licenses/>.
# *
# *  All comments concerning this program package may be sent to the
# *  e-mail address 'scipion@cnb.csic.es'
# *
# **************************************************************************

import sys
import os
import re
from datetime import datetime
import traceback
from enum import Enum

import bibtexparser
import numpy as np
import math

from pyworkflow import Config


def prettyDate(time=False):
    """ Return a human friendly description of how long ago 'time' was,
    like 'an hour ago', 'Yesterday', '3 months ago', 'just now', etc.

    Params:
        time: a datetime object, an int/float Epoch timestamp (floats are
            truncated to whole seconds) or any falsy non-numeric value
            (the default), which is taken as "now".
    Returns:
        The descriptive string, or '' if 'time' is in the future.
    Raises:
        TypeError: if 'time' is none of the supported types.
    """
    now = datetime.now()

    if isinstance(time, datetime):
        moment = time
    elif isinstance(time, (int, float)) and not isinstance(time, bool):
        # bool is excluded on purpose: False (the default) means "now"
        moment = datetime.fromtimestamp(int(time))
    elif not time:
        moment = now
    else:
        raise TypeError("prettyDate expects a datetime or an Epoch "
                        "timestamp, got %s" % type(time).__name__)

    diff = now - moment
    second_diff = diff.seconds
    day_diff = diff.days

    if day_diff < 0:
        return ''

    if day_diff == 0:
        if second_diff < 10:
            return "just now"
        if second_diff < 60:
            return str(second_diff) + " seconds ago"
        if second_diff < 120:
            return "a minute ago"
        if second_diff < 3600:
            return str(second_diff // 60) + " minutes ago"
        if second_diff < 7200:
            return "an hour ago"
        if second_diff < 86400:
            return str(second_diff // 3600) + " hours ago"
    if day_diff == 1:
        return "Yesterday"
    if day_diff < 7:
        return str(day_diff) + " days ago"
    if day_diff < 31:
        return str(day_diff // 7) + " weeks ago"
    if day_diff < 365:
        return str(day_diff // 30) + " months ago"
    return str(day_diff // 365) + " years ago"
def dateStr(dt=None, time=True, secs=False, dateFormat=None):
    """ Format a date as a string.

    Params:
        dt: datetime object or int/float Epoch timestamp. If None, use NOW.
        time: if True, append hours and minutes to the format.
        secs: if True (and time is True), append the seconds as well.
        dateFormat: strftime format for the date part ('%d-%m-%Y' if None).
    """
    if dt is None:
        moment = datetime.now()
    elif isinstance(dt, (float, int)):
        moment = datetime.fromtimestamp(dt)
    else:
        moment = dt

    pieces = ['%d-%m-%Y' if dateFormat is None else dateFormat]
    if time:
        pieces.append(' %H:%M')
        if secs:
            pieces.append(':%S')

    return moment.strftime(''.join(pieces))


# Alternative name for dateStr
prettyTime = dateStr
def prettyTimestamp(dt=None, format='%Y-%m-%d_%H%M%S'):
    """ Return the datetime 'dt' (NOW if None) formatted with 'format',
    which by default gives a compact, sortable timestamp. """
    moment = datetime.now() if dt is None else dt
    return moment.strftime(format)
def prettySize(size):
    """ Human friendly file size.

    Params:
        size: size in bytes (int or float). Negative values are formatted
            as their absolute value preceded by '-'.
    Returns:
        A string like '1 byte', '512 bytes', '1.5 MB' or '2.00 GB'.
    """
    # (unit name, number of decimals shown for that unit)
    unit_list = (('bytes', 0), ('kB', 0), ('MB', 1),
                 ('GB', 2), ('TB', 2), ('PB', 2))

    if size < 0:
        return '-' + prettySize(-size)
    if size == 1:
        return '1 byte'

    # Repeated division by 1024 is exact for floats, so the chosen unit does
    # not depend on the rounding of a logarithm.
    quotient = float(size)
    exponent = 0
    while quotient >= 1024 and exponent < len(unit_list) - 1:
        quotient /= 1024
        exponent += 1

    unit, num_decimals = unit_list[exponent]
    format_string = '{:.%sf} {}' % num_decimals
    return format_string.format(quotient, unit)
def prettyDelta(timedelta):
    """ Return the string form of the timedelta without its fractional
    seconds (whatever follows the first '.'). """
    wholePart, _, _ = str(timedelta).partition('.')
    return wholePart
class UtcConverter:
    """ Class to make date conversions to utc"""
    # Amount to add to a naive local datetime to obtain the UTC one. It is
    # taken from the local zone's UTC offset, so it is an exact number of
    # seconds (no jitter from reading the clock twice). It is computed once,
    # when the class is created, so later DST switches are not reflected.
    utc_delta = -datetime.now().astimezone().utcoffset()

    def __call__(self, t):
        """ Return the naive local datetime 't' shifted to UTC. """
        return t + self.utc_delta


# Use to_utc like a function: to_utc(date)
to_utc = UtcConverter()
def prettyLog(msg):
    """ Print msg preceded by the current date and time (including
    seconds) colored in cyan. """
    stamp = prettyTime(datetime.now(), secs=True)
    print(cyan(stamp), msg)
class Timer(object):
    """ Simple Timer base in datetime.now and timedelta.

    It can be used explicitly (tic / toc) or as a context manager:

        with Timer() as t:
            ...

    in which case the elapsed time is printed when the block is left.
    """

    def tic(self):
        """ Start (or restart) the timer. """
        self._dt = datetime.now()

    def getElapsedTime(self):
        """ Return the timedelta since the last tic(), which must have been
        called before. """
        return datetime.now() - self._dt

    def toc(self, message='Elapsed:'):
        """ Print message followed by the elapsed time. """
        print(message, self.getElapsedTime())

    def getToc(self):
        """ Return the elapsed time as a string without fractional seconds. """
        return prettyDelta(self.getElapsedTime())

    def __enter__(self):
        self.tic()
        # Return the timer so 'with Timer() as t' gives access to it
        return self

    def __exit__(self, type, value, traceback):
        # Nothing is returned, so exceptions raised in the block propagate
        self.toc()
def timeit(func):
    """ Decorator function to have a simple measurement of the execution
    time of a given function. Just use:

        @timeit
        def func(...)
            ...

    to use it. The elapsed time is printed after each successful call (it is
    not printed if func raises).
    """
    import functools

    # wraps keeps the name, docstring, etc. of the decorated function
    @functools.wraps(func)
    def timedFunc(*args, **kwargs):
        t = Timer()
        t.tic()
        result = func(*args, **kwargs)
        t.toc("Function '%s' took" % func)
        return result

    return timedFunc
def trace(nlevels, separator=' --> ', stream=sys.stdout):
    """ Return a decorator that writes, each time the decorated function is
    called, the last 'nlevels' entries of the call stack followed by the
    function name.

    Example:
        @trace(3)
        def doRefresh(...

    gives as output whenever doRefresh is called lines like:
        text.py:486 _addFileTab --> text.py:330 __init__ --> doRefresh

    Params:
        nlevels: number of calling frames to show.
        separator: string written between the entries.
        stream: object with a write method where the line is written
            (the default is bound when this module is imported).
    """
    import functools

    def realTrace(f):
        """ Decorator function to print stack call in a human-readable way. """
        @functools.wraps(f)
        def tracedFunc(*args, **kwargs):
            # The last frame is this wrapper itself: skip it and keep the
            # nlevels frames before it.
            stack = traceback.extract_stack()[-nlevels - 1:-1]
            stList = ['%s:%d %s' % (os.path.basename(frame[0]), frame[1], frame[2])
                      for frame in stack]
            stream.write(separator.join(stList + [f.__name__]) + '\n')
            return f(*args, **kwargs)

        return tracedFunc

    return realTrace
def prettyDict(d):
    """ Print the dictionary, one 'key: value' pair per line, between
    braces. """
    print("{")
    for pair in d.items():
        print(" %s: %s" % pair)
    print("}")
def prettyXml(elem, level=0):
    """ Add indentation for XML elements for more human readable text.

    The text and tail of elem (and, recursively, of its children) are
    modified in place; only elements having children are touched.

    Params:
        elem: the element to indent (ElementTree-like).
        level: nesting level of elem.
    """
    def _isBlank(text):
        return not text or not text.strip()

    if not len(elem):
        return

    indent = "\n" + level * " "
    if _isBlank(elem.text):
        elem.text = indent + " "
    if _isBlank(elem.tail):
        elem.tail = indent

    child = None
    for child in elem:
        prettyXml(child, level + 1)
    # After the loop 'child' is the last one: its tail closes this level
    if _isBlank(child.tail):
        child.tail = indent
def getUniqueItems(originalList):
    """ Method to remove repeated items from one list.

    Params:
        originalList: original list with repeated items, or not. The items
            must be hashable.
    Returns:
        New list with the items of the original list, in order of first
        appearance and without repetitions.
    """
    seen = set()
    uniqueItems = []
    for item in originalList:
        if item not in seen:
            seen.add(item)
            uniqueItems.append(item)
    return uniqueItems
def executeRemoteX(command, hostName, userName, password):
    """ Execute a remote command with X11 forwarding.

    Params:
        command: Command to execute.
        hostName: Remote host name.
        userName: User name.
        password: Password.
    Returns:
        Tuple with standard output and error output (bytes).
    """
    import subprocess

    scriptPath = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                              "sshAskPass.sh"))
    # NOTE: command, userName and hostName are still interpreted by the local
    # shell, so they must come from a trusted source.
    sshCommand = scriptPath + " ssh -X " + userName + "@" + hostName + " " + command

    p = subprocess.Popen(sshCommand, shell=True,
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    # The password is written to the script's standard input (as the former
    # "echo '<password>' | ..." did) instead of being spliced into the shell
    # command line, where quotes in it broke the command and it was visible
    # in the process list.
    stdout, stderr = p.communicate(input=(password + "\n").encode())
    return stdout, stderr
def executeRemote(command, hostName, userName, password):
    """ Execute a remote command.

    Params:
        command: Command to execute.
        hostName: Remote host name.
        userName: User name.
        password: Password.
    Returns:
        Tuple with standard input, standard output and error output.
    """
    import paramiko

    ssh = paramiko.SSHClient()
    try:
        ssh.load_system_host_keys()
        # NOTE(review): AutoAddPolicy accepts unknown host keys - confirm
        # this is acceptable for the hosts being contacted.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(hostName, 22, userName, password)
        stdin, stdout, stderr = ssh.exec_command(command)
    finally:
        # Close the client also when connecting or executing fails
        ssh.close()

    # NOTE(review): the streams are returned after the client was closed, as
    # before; verify that callers can still read what they need from them.
    return stdin, stdout, stderr
def executeLongRemote(command, hostName, userName, password):
    """ Execute a remote command, printing its output while it runs.

    Params:
        command: Command to execute.
        hostName: Remote host name.
        userName: User name.
        password: Password.
    Returns:
        None. The data received from the command is printed as it arrives.
    """
    import paramiko
    import select

    ssh = paramiko.SSHClient()
    try:
        ssh.load_system_host_keys()
        # NOTE(review): AutoAddPolicy accepts unknown host keys - confirm
        # this is acceptable for the hosts being contacted.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(hostName, 22, userName, password)

        channel = ssh.get_transport().open_session()
        channel.exec_command(command)

        while not channel.exit_status_ready():
            # A short timeout instead of polling with 0.0, which kept a CPU
            # busy for as long as the command was running.
            readable, _, _ = select.select([channel], [], [], 0.1)
            if readable:
                print(channel.recv(1024))

        # Print whatever was still buffered when the command finished
        while channel.recv_ready():
            print(channel.recv(1024))
    finally:
        ssh.close()
def getLocalUserName():
    """ Recover local machine user name.

    Returns:
        Local machine user name, as given by getpass.getuser.
    """
    from getpass import getuser
    return getuser()
def getLocalHostName():
    """ Return the name of the local machine (same as getHostName). """
    hostName = getHostName()
    return hostName
def getHostName():
    """ Return the name of the local machine, as given by
    socket.gethostname. """
    from socket import gethostname
    return gethostname()
def getHostFullName():
    """ Return the fully-qualified name of the local machine, as given by
    socket.getfqdn. """
    from socket import getfqdn
    return getfqdn()
def isInFile(text, filePath):
    """ Checks if given text is in the given file.

    Params:
        text: Text to check.
        filePath: File path to check.
    Returns:
        True if the given text is in some line of the given file,
        False if it is not in the file.
    """
    # The with block closes the file also when the search stops early
    with open(filePath) as f:
        return any(text in line for line in f)
def getLineInFile(text, fileName):
    """ Find the line where the given text is located in the given file.

    Params:
        text: Text to look for.
        fileName: Path of the file to check.
    Returns:
        Number (starting at 1) of the first line containing the text,
        or None if no line contains it.
    """
    with open(fileName) as f:
        for lineNumber, line in enumerate(f, start=1):
            if text in line:
                return lineNumber
    return None
# ------------- Colored message strings -----------------------------
class StrColors(Enum):
    """ Colors available for getColorStr. The value of each member is the
    number used in the ANSI escape sequence. """
    gray = 30
    red = 31
    green = 32
    yellow = 33
    blue = 34
    magenta = 35
    cyan = 36
def getColorStr(text, color, bold=False):
    """ Add ANSI color codes to the string.

    The text is returned unchanged when the SCIPION_SAFE_COLORS variable is
    on and sys.stdout is not a terminal.

    Params:
        text: text to be colored
        color: member of StrColors
        bold: bold the text
    """
    plainOutput = envVarOn('SCIPION_SAFE_COLORS') and not sys.stdout.isatty()
    if plainOutput:
        return text

    codes = str(color.value) + (';1' if bold else '')
    return '\x1b[%sm%s\x1b[0m' % (codes, text)
def grayStr(text):
    """ Return text colored in gray (see getColorStr). """
    return getColorStr(text, StrColors.gray)
def redStr(text):
    """ Return text colored in red (see getColorStr). """
    return getColorStr(text, StrColors.red)
def greenStr(text):
    """ Return text colored in green (see getColorStr). """
    return getColorStr(text, StrColors.green)
def yellowStr(text):
    """ Return text colored in yellow (see getColorStr). """
    return getColorStr(text, StrColors.yellow)
def blueStr(text):
    """ Return text colored in blue (see getColorStr). """
    return getColorStr(text, StrColors.blue)
def magentaStr(text):
    """ Return text colored in magenta (see getColorStr). """
    return getColorStr(text, StrColors.magenta)
def cyanStr(text):
    """ Return text colored in cyan (see getColorStr). """
    return getColorStr(text, StrColors.cyan)
def ansi(n, bold=False):
    """Return function that escapes text with ANSI color n.

    Params:
        n: number of the ANSI color (30 to 37 for the standard colors).
        bold: if True, the bold attribute is added to the color.
    """
    return lambda txt: '\x1b[%d%sm%s\x1b[0m' % (n, ';1' if bold else '', txt)


# One function per standard ANSI color, and their bold counterparts
black, red, green, yellow, blue, magenta, cyan, white = map(ansi, range(30, 38))
blackB, redB, greenB, yellowB, blueB, magentaB, cyanB, whiteB = [
    ansi(i, bold=True) for i in range(30, 38)]

# -------------- Hyper text highlighting ----------------------------
#
# We use a subset of TWiki hyper text conventions.
# In particular:
#     *some_text* will display some_text in bold
#     _some_text_ will display some_text in italic
#     Links:
#         http://www.link-page.com -> hyperlink using the url as label
#         [[http://www.link-page.com][Link page]] -> hyperlink using
#             "Link page" as label

# Types of recognized styles
HYPER_BOLD = 'bold'
HYPER_ITALIC = 'italic'
HYPER_LINK1 = 'link1'
HYPER_SCIPION_OPEN = 'sci-open'
HYPER_LINK2 = 'link2'
HYPER_ALL = 'all'

# Associated regular expressions. They are raw strings so that sequences
# such as \s or \[ reach the re module untouched instead of being invalid
# string escapes.
PATTERN_BOLD = r"(^|[\s])[*](?P<bold>[^\s*][^*]*[^\s*]|[^\s*])[*]"
# PATTERN_BOLD = r"[\s]+[*]([^\s][^*]+[^\s])[*][\s]+"
PATTERN_ITALIC = r"(^|[\s])[_](?P<italic>[^\s_][^_]*[^\s_]|[^\s_])[_]"
# PATTERN_ITALIC = r"[\s]+[_]([^\s][^_]+[^\s])[_][\s]+"
PATTERN_LINK1 = r'(?P<link1>http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+#]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+)'
PATTERN_LINK2 = r"[\[]{2}(?P<link2>[^\s][^\]]+[^\s])[\]][\[](?P<link2_label>[^\s][^\]]+[^\s])[\]]{2}"
# __PATTERN_LINK2 should be first since it could contain __PATTERN_LINK1
PATTERN_ALL = '|'.join([PATTERN_BOLD, PATTERN_ITALIC, PATTERN_LINK2, PATTERN_LINK1])

# Compiled regex
# Not need now, each pattern compiled separately
# HYPER_REGEX = {
#     HYPER_BOLD: re.compile(PATTERN_BOLD),
#     HYPER_ITALIC: re.compile(PATTERN_ITALIC),
#     HYPER_LINK1: re.compile(PATTERN_LINK1),
#     HYPER_LINK2: re.compile(PATTERN_LINK1),
# }
HYPER_ALL_RE = re.compile(PATTERN_ALL)
def parseHyperText(text, matchCallback):
    """ Replace the Hyper text definitions found in the text.

    Params:
        text: the text to parse.
        matchCallback: a callback function to process each match. It is
            called with the match object and the type of match (HYPER_BOLD,
            HYPER_ITALIC, HYPER_LINK1 or HYPER_LINK2) and should return the
            replacement string.
    Return:
        The input text with the replacements made by matchCallback
    """
    # The type of each match is deduced from how the matched text starts
    prefixTags = (('*', HYPER_BOLD),
                  ('_', HYPER_ITALIC),
                  ('http', HYPER_LINK1),
                  ('[[', HYPER_LINK2))

    def _match(match):
        """ Call the proper matchCallback with some extra info. """
        matched = match.group().strip()
        for prefix, tag in prefixTags:
            if matched.startswith(prefix):
                return matchCallback(match, tag)
        raise Exception("Bad prefix for HyperText match")

    return HYPER_ALL_RE.sub(_match, text)
# for hyperMode, hyperRegex in HYPER_REGEX.iteritems(): # text = hyperRegex.sub(lambda match: matchCallback(match, hyperMode), text) # # return text
class LazyDict(object):
    """ Dictionary to be initialized in the moment it is accessed for the
    first time. Initialization is done by a callback passed at instantiation.

    Item access, iteration, len(), 'in' and any other attribute (keys, get,
    items...) are delegated to the dictionary returned by the callback.
    """

    def __init__(self, callback=dict):
        """
        :param callback: method to initialize the dictionary. Should return
            a dictionary
        """
        self.data = None
        self.callback = callback

    def evaluate_callback(self):
        """ Call the callback and keep its result as the dictionary. """
        self.data = self.callback()

    def _getData(self):
        """ Return the dictionary, initializing it first if needed. """
        if self.data is None:
            self.evaluate_callback()
        return self.data

    def __getitem__(self, name):
        return self._getData().__getitem__(name)

    def __setitem__(self, name, value):
        return self._getData().__setitem__(name, value)

    def __delitem__(self, name):
        return self._getData().__delitem__(name)

    def __contains__(self, name):
        return name in self._getData()

    def __len__(self):
        return len(self._getData())

    def __getattr__(self, name):
        # Only called for attributes not found in the usual way. If that
        # happens for our own attributes (e.g. while the instance is being
        # copied or unpickled, before __init__ data exists), delegating
        # would recurse forever.
        if name in ('data', 'callback'):
            raise AttributeError(name)
        return getattr(self._getData(), name)

    def __iter__(self):
        return self._getData().__iter__()
def parseBibTex(bibtexStr):
    """ Parse a string with bibtex content using bibtexparser and return
    the entries_dict of the resulting database. """
    database = bibtexparser.loads(bibtexStr)
    return database.entries_dict
def isPower2(num):
    """ Return True if 'num' is a power of 2. """
    if num == 0:
        return False
    # A power of 2 has a single bit set, which num - 1 clears
    return num & (num - 1) == 0
# --------------------------------------------------------------------------- # Parsing of arguments # ---------------------------------------------------------------------------
def getListFromRangeString(rangeStr):
    """ Create a list of integers from a string with range definitions.

    Examples:
        "1,5-8,10" -> [1,5,6,7,8,10]
        "2,6,9-11" -> [2,6,9,10,11]
        "2 5, 6-8" -> [2,5,6,7,8]

    Raises:
        ValueError: if some element is not an integer or a range 'a-b' of
            integers.
    """
    values = []
    # Split elements by comma or space (raw string: \s is for the regex)
    for element in re.split(r',|\s', rangeStr):
        if '-' in element:
            limits = element.split('-')
            values.extend(range(int(limits[0]), int(limits[1]) + 1))
        else:
            # Consecutive separators give empty elements, which add nothing
            values.extend(map(int, element.split()))
    return values
def getRangeStringFromList(list):
    """ Create a string with range definitions from a list of integers,
    joining the runs of consecutive values.

    Examples:
        [1,5,6,7,8,10] -> "1,5-8,10"
        [] -> ""

    Params:
        list: list of integers. Values are grouped in the given order.
    """
    ranges = []
    left = None
    right = None

    def formatRange(first, last):
        if first == last:  # Single element
            return "%d" % last
        return "%d-%d" % (first, last)

    for item in list:
        if right is None:
            left = right = item
        elif item == right + 1:
            right += 1
        else:
            ranges.append(formatRange(left, right))
            left = right = item

    # With an empty list there is no pending range to add
    if right is not None:
        ranges.append(formatRange(left, right))

    return ','.join(ranges)
def getListFromValues(valuesStr, length=None):
    """ Convert a string representing list items into a list.
    The items should be separated by spaces (commas are also accepted) and
    a multiplier 'x' can be used.
    If length is not None, then the last element will be repeated
    until the desired length is reached.

    Examples:
        '1 1 2x2 4 4' -> ['1', '1', '2', '2', '4', '4']
        '2x3, 3x4, 1' -> ['3', '3', '4', '4', '4', '1']

    Raises:
        Exception: if some item contains more than one 'x'.
    """
    result = []

    # Commas are treated as separators, as in the second example above
    for chunk in valuesStr.replace(',', ' ').split():
        values = chunk.split('x')
        n = len(values)
        if n == 1:  # 'x' is not present in the chunk, single value
            result += values
        elif n == 2:  # multiple the values by the number after 'x'
            result += [values[1]] * int(values[0])
        else:
            raise Exception("More than one 'x' is not allowed in list string value.")

    # If length is passed, we fill the list with
    # the last element until length is reached
    # (nothing to repeat if there are no elements)
    if length is not None and result and length > len(result):
        result += [result[-1]] * (length - len(result))

    return result
def getFloatListFromValues(valuesStr, length=None):
    """ Convert a string to a list of floats (see getListFromValues for the
    accepted string and the meaning of length). """
    items = getListFromValues(valuesStr, length)
    return list(map(float, items))
def getBoolListFromValues(valuesStr, length=None):
    """ Convert a string to a list of booleans. Each item given by
    getListFromValues is converted through pyworkflow's Boolean object. """
    from pyworkflow.object import Boolean

    items = getListFromValues(valuesStr, length)
    return [Boolean(value=item).get() for item in items]
def getStringListFromValues(valuesStr, length=None):
    """ Convert a string to a list of strings. Each item given by
    getListFromValues is converted through pyworkflow's String object. """
    from pyworkflow.object import String

    items = getListFromValues(valuesStr, length)
    return [String(value=item).get() for item in items]
class Environ(dict):
    """ Dictionary of environment variables with some helpers to compose
    their values. """
    # Values for the 'position' argument of set, update and addLibrary
    REPLACE = 0
    BEGIN = 1
    END = 2

    def getFirst(self, keys, mandatory=False):
        """ Return the value of the first key present in the environment.
        If none is found, None is returned and, if 'mandatory' is True,
        a message is printed.
        """
        present = [key for key in keys if key in self]
        if present:
            return self.get(present[0])

        if mandatory:
            print("None of the variables: %s found in the Environment. "
                  "Please check scipion.conf files." % (str(keys)))
        return None

    def set(self, varName, varValue, position=REPLACE):
        """ Modify the value for some variable.

        Params:
            varName: for example LD_LIBRARY_PATH
            varValue: the value to add or replace.
            position: controls how the value will be changed.
                If REPLACE, it will overwrite the value of the var.
                BEGIN or END will preserve the current value
                and add (at begin or end) the new value, separated
                by os.pathsep.
                A variable not yet present is always just set.
        """
        if position == self.REPLACE or varName not in self:
            self[varName] = varValue
            return

        current = self[varName]
        if position == self.BEGIN:
            self[varName] = varValue + os.pathsep + current
        elif position == self.END:
            self[varName] = current + os.pathsep + varValue

    def update(self, valuesDict, position=REPLACE):
        """ Use set for each key, value pair in valuesDict. """
        for name, value in valuesDict.items():
            self.set(name, value, position)

    def addLibrary(self, libraryPath, position=BEGIN):
        """ Adds a path to LD_LIBRARY_PATH at the requested position
        if the provided paths exist. Otherwise a message is printed.
        Nothing is done if libraryPath is None. """
        if libraryPath is None:
            return

        if not existsVariablePaths(libraryPath):
            print("Some paths do not exist in: % s" % libraryPath)
            return

        self.update({'LD_LIBRARY_PATH': libraryPath}, position=position)

    def setPrepend(self, prepend):
        """ Use this method to set a prepend string that will be added at
        the beginning of any command that will be run in this environment.
        This can be useful for example when 'modules' need to be loaded and
        a simple environment variables setup is not enough.
        """
        # Kept as an attribute of the object, not as an entry of the dict
        setattr(self, '__prepend', prepend)

    def getPrepend(self):
        """ Return the prepend value ('' if none was set). See setPrepend
        function. """
        return getattr(self, '__prepend', '')
def existsVariablePaths(variableValue):
    """ Check if the path (or paths) in variableValue exists.
    Multiple paths are allowed if separated by os.pathsep. Empty or blank
    paths are not checked. """
    for path in variableValue.split(os.pathsep):
        if path.split() and not os.path.exists(path):
            return False
    return True
def environAdd(varName, newValue, valueFirst=False):
    """ Add a new value to some environ variable (in os.environ), separated
    from its current value by os.pathsep.
    If valueFirst is true, the new value will be at the beginning.
    If the variable is not defined yet, it is set to the new value.
    """
    current = os.environ.get(varName)
    varList = [] if current is None else [current]

    if valueFirst:
        varList.insert(0, newValue)
    else:
        varList.append(newValue)

    os.environ[varName] = os.pathsep.join(varList)
def envVarOn(varName, env=None):
    """ Is variable set to True in the environment?

    Params:
        varName: name of the variable.
        env: dictionary where to look for the variable. If it is None or
            empty, os.environ is used.
    """
    source = env if env else os.environ
    return strToBoolean(source.get(varName))
def strToBoolean(string):
    """ Return True if the string is, ignoring case, one of 'true', 'yes',
    'on' or '1'. False otherwise, also for None. """
    if string is None:
        return False
    return string.lower() in ['true', 'yes', 'on', '1']
def getMemoryAvailable():
    """ Return the total memory of the system in MB, as reported by
    psutil.virtual_memory. """
    import psutil

    totalBytes = psutil.virtual_memory().total
    return totalBytes // 1024 ** 2
def startDebugger(password='a'):
    """ Start the rpdb2 embedded debugger with the given password, only if
    Config.debugOn() is true. If it can not be started a message is
    printed. """
    if not Config.debugOn():
        return

    try:
        # FIXME: rpdb2 does not support python 3
        from rpdb2 import start_embedded_debugger
        print("Starting debugger...")
        start_embedded_debugger(password)
    except Exception:
        print("Error importing rpdb2 debugging module, consider installing winpdb.")
def getFreePort(basePort=0, host=''):
    """ Return a port number where a TCP socket could be bound.

    Params:
        basePort: port to try. With 0 (the default) the system chooses a
            free one.
        host: address to bind to ('' means all interfaces).
    Returns:
        The port number, or 0 if the socket could not be bound (the error
        is printed).
    """
    import socket

    try:
        # The with block closes the socket also when bind fails
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind((host, basePort))
            return s.getsockname()[1]
    except Exception as e:
        print(e)
        return 0
def readProperties(propsFile):
    """ Read a file with 'key=value' lines and return them as a dictionary.

    Lines without '=' and lines starting with '#' are skipped. Each line is
    split by its first '='; only its trailing whitespace is removed.
    """
    props = {}
    with open(propsFile, 'r') as handle:
        for rawLine in handle:
            entry = rawLine.rstrip()  # removes trailing whitespace and '\n' chars
            # skips blanks, lines w/o = and comments
            if "=" not in entry or entry.startswith("#"):
                continue
            key, _, value = entry.partition("=")
            props[key] = value
    return props
# ---------------------Color utils --------------------------
def hex_to_rgb(value):
    """ Convert a hexadecimal color string (with or without leading '#')
    into a tuple of 3 integers, one for each third of the string. """
    digits = value.lstrip('#')
    width = len(digits) // 3
    starts = range(0, len(digits), width)
    return tuple(int(digits[start:start + width], 16) for start in starts)
def rgb_to_hex(rgb):
    """ Convert a sequence of 3 integers (red, green, blue) into a
    hexadecimal color string like '#ff0080'.

    Params:
        rgb: tuple, list or any other iterable with the 3 integer values.
    """
    # The % operator only unpacks tuples: convert other sequences
    return '#%02x%02x%02x' % tuple(rgb)
def lighter(color, percent):
    """ Return the color moved towards white by the given fraction.

    Params:
        color: rgb values, assumed between (0, 0, 0) and (255, 255, 255).
        percent: fraction of the distance to white to move (0 gives the
            same color, 1 gives white).
    Returns:
        Tuple with the resulting values, rounded with numpy.around.
    """
    base = np.array(color)
    toWhite = np.array([255, 255, 255]) - base
    return tuple(np.around(base + toWhite * percent))
def formatExceptionInfo(level=6):
    """ Return a string describing the exception being handled: its type,
    its value and, at most, 'level' entries of its traceback.
    To be called while an exception is being handled. """
    error_type, error_value, trbk = sys.exc_info()
    header = "Error: %s \nDescription: %s \nTraceback:" % (error_type.__name__,
                                                           error_value)
    entries = traceback.format_tb(trbk, level)
    return header + "".join("\n" + entry for entry in entries)
def printTraceBack():
    """ Print the current call stack (see traceback.print_stack). """
    traceback.print_stack()
def getEnvVariable(variableName, default=None, exceptionMsg=None):
    """ Returns the value of an environment variable or raise an exception
    message. Useful when adding variable to the config file and report
    accurate messages.

    Params:
        variableName: name of the environment variable.
        default: value returned if the variable is not set.
        exceptionMsg: message of the exception raised when the variable is
            not set and default is None. A generic one is used if None.
    """
    value = os.getenv(variableName)
    if value is not None:
        return value

    if default is not None:
        return default

    if exceptionMsg is None:
        exceptionMsg = ("Environment variable %s not found. "
                        "Please check scipion configuration. "
                        "Try running : scipion config." % variableName)
    raise Exception(exceptionMsg)