# **************************************************************************
# *
# * Authors: Roberto Marabini (roberto@cnb.csic.es)
# *
# * Unidad de Bioinformatica of Centro Nacional de Biotecnologia , CSIC
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 2 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307 USA
# *
# * All comments concerning this program package may be sent to the
# * e-mail address 'scipion@cnb.csic.es'
# *
# **************************************************************************
import os
import sys
import time
import sqlite3 as lite
import datetime
import pytz
from configparser import ConfigParser
try:
import psutil
except ImportError:
print("Cannot import psutil module - this is needed for this application.")
print("Exiting...")
sys.exit()
from pyworkflow.utils import red
import pyworkflow.protocol.params as params
from pyworkflow import VERSION_1_1
from pyworkflow.protocol.constants import STATUS_RUNNING
from pyworkflow.protocol import getUpdatedProtocol
from pynvml import (nvmlInit, nvmlDeviceGetHandleByIndex,
nvmlDeviceGetMemoryInfo, nvmlDeviceGetUtilizationRates,
NVMLError, nvmlDeviceGetTemperature, NVML_TEMPERATURE_GPU,
nvmlDeviceGetComputeRunningProcesses)
from .protocol_monitor import ProtMonitor, Monitor
SYSTEM_LOG_SQLITE = 'system_log.sqlite'
[docs]def initGPU():
nvmlInit()
[docs]class ProtMonitorSystem(ProtMonitor):
""" check CPU, mem and IO usage.
"""
_label = 'system_monitor'
_lastUpdateVersion = VERSION_1_1
def __init__(self, **kwargs):
ProtMonitor.__init__(self, **kwargs)
self.dataBase = 'log.sqlite'
self.tableName = 'log'
# --------------------------- DEFINE param functions ---------------------
def _defineParams(self, form):
ProtMonitor._defineParams(self, form)
form.addParam('cpuAlert', params.FloatParam, default=101,
label="Raise Alarm if CPU > XX%",
help="Raise alarm if memory allocated is greater "
"than given percentage")
form.addParam('memAlert', params.FloatParam, default=101,
label="Raise Alarm if Memory > XX%",
help="Raise alarm if cpu allocated is greater "
"than given percentage")
form.addParam('swapAlert', params.FloatParam, default=101,
label="Raise Alarm if Swap > XX%",
help="Raise alarm if swap allocated is greater "
"than given percentage")
form.addParam('monitorTime', params.FloatParam, default=300,
label="Total Logging time (min)",
help="Log during this interval")
ProtMonitor._sendMailParams(self, form)
group = form.addGroup('GPU')
group.addParam('doGpu', params.BooleanParam, default=False,
label="Check GPU",
help="Set to true if you want to monitor the GPU")
group.addParam('gpusToUse', params.StringParam, default='0',
label='Which GPUs to use:', condition='doGpu',
help='Providing a list of which GPUs '
'(0,1,2,3, etc). Default is monitor GPU 0 only')
group = form.addGroup('GPU')
group.addParam('doGpu', params.BooleanParam, default=False,
label="Check GPU",
help="Set to true if you want to monitor the GPU")
group.addParam('gpusToUse', params.StringParam, default='0',
label='Which GPUs to use:', condition='doGpu',
help='Providing a list of which GPUs '
'(0,1,2,3, etc). Default is monitor GPU 0 only')
group = form.addGroup('NETWORK')
group.addParam('doNetwork', params.BooleanParam, default=False,
label="Check Network",
help="Set to true if you want to monitor the Network")
group.addParam('netInterfaces', params.EnumParam,
choices=MonitorSystem.getNifsNameList(),
default=1, # usually 0 is the loopback
label="Interface", condition='doNetwork',
help="Name of the network interface to be checked")
group = form.addGroup('Disk')
group.addParam('doDiskIO', params.BooleanParam, default=False,
label="Check Disk IO",
help="Set to true if you want to monitor the Disk "
"Access")
# --------------------------- STEPS functions ----------------------------
[docs] def monitorStep(self):
self.createMonitor().loop()
[docs] def createMonitor(self):
protocols = []
for protPointer in self.inputProtocols:
prot = protPointer.get()
prot.setProject(self.getProject())
protocols.append(prot)
sysMon = MonitorSystem(protocols,
workingDir=self.workingDir.get(),
samplingInterval=self.samplingInterval.get(),
monitorTime=self.monitorTime.get(),
email=self.createEmailNotifier(),
stdout=True,
cpuAlert=self.cpuAlert.get(),
memAlert=self.memAlert.get(),
swapAlert=self.swapAlert.get(),
doGpu=self.doGpu.get(),
doNetwork=self.doNetwork.get(),
doDiskIO=self.doDiskIO.get(),
nif=MonitorSystem.getNifsNameList()[
self.netInterfaces.get()],
gpusToUse=self.gpusToUse.get())
return sysMon
# --------------------------- INFO functions -----------------------------
def _validate(self):
# TODO if less than 20 sec complain
return [] # no errors
def _summary(self):
summary = []
summary.append("GPU running Processes:")
initGPU()
try:
gpusToUse = [int(n) for n in (self.gpusToUse.get()).split()]
for i in gpusToUse:
handle = nvmlDeviceGetHandleByIndex(i)
cps = nvmlDeviceGetComputeRunningProcesses(handle)
for ps in cps:
# p_tags['pid'] = ps.pid
msg = " %d) " % i + psutil.Process(ps.pid).name()
msg += " (mem =%.2f MB)" % (float(ps.usedGpuMemory) /
1048576.)
summary.append(msg)
except NVMLError as err:
summary.append(str(err))
return summary
def _methods(self):
return []
[docs]class MonitorSystem(Monitor):
""" This will will be monitoring a System protocol.
It will internally handle a database to store produced
system values.
"""
mega = 1048576.
_nifsNameList = None
[docs] @classmethod
def getNifsNameList(cls):
from . import getnifs
if cls._nifsNameList is None:
# get list with network interfaces
nifs = getnifs.get_network_interfaces()
cls._nifsNameList = [nif.getName() for nif in nifs]
return cls._nifsNameList
def __init__(self, protocols, influx=False, **kwargs):
Monitor.__init__(self, **kwargs)
self.protocols = protocols
self.cpuAlert = kwargs['cpuAlert']
self.memAlert = kwargs['memAlert']
self.swapAlert = kwargs['swapAlert']
self._dataBase = kwargs.get('dbName', SYSTEM_LOG_SQLITE)
self._tableName = kwargs.get('tableName', 'log')
self.doGpu = kwargs['doGpu']
self.doNetwork = kwargs['doNetwork']
self.doDiskIO = kwargs['doDiskIO']
self.samplingTime = 1. # seconds
self.labelList = ["cpu", "mem", "swap"]
if self.doGpu:
self.gpuLabelList = []
# get Gpus to monitor
self.gpusToUse = [int(n) for n in (kwargs['gpusToUse']).split()]
for i in self.gpusToUse:
self.gpuLabelList.append("gpuMem_%d" % i)
self.gpuLabelList.append("gpuUse_%d" % i)
self.gpuLabelList.append("gpuTem_%d" % i)
# init GPUs
nvmlInit()
self.labelList += self.gpuLabelList
else:
self.gpusToUse = None
if self.doNetwork:
self.nif = kwargs['nif']
self.netLabelList = [] # in the future we may display
# all the network interfaces
self.netLabelList.append("%s_send" % self.nif)
self.netLabelList.append("%s_recv" % self.nif)
self.labelList += self.netLabelList
else:
self.nif = None
if self.doDiskIO:
self.netLabelList = [] # in the future we may display
# all the network interfaces
self.netLabelList.append("disk_read")
self.netLabelList.append("disk_write")
self.labelList += self.netLabelList
else:
pass
self.conn = lite.connect(os.path.join(self.workingDir,
self._dataBase),
isolation_level=None)
self.influx = influx
if influx:
# get results as a list of dictionaries
# versus a list of tuples
self.conn.row_factory = \
lambda c, r: dict([(col[0], r[idx])
for idx, col in enumerate(c.description)])
# read timezone and offset
from emfacilities.constants import (SECRETSFILE,
EMFACILITIES_HOME_VARNAME)
_path = os.getenv(EMFACILITIES_HOME_VARNAME)
secretsfile = os.path.join(_path, SECRETSFILE)
confParser = ConfigParser()
confParser.read(secretsfile)
self.timeDelta = int(confParser.get('influx', 'timeDelta'))
self.timeZone = confParser.get('influx', 'timeZone')
self.cur = self.conn.cursor()
[docs] def warning(self, msg):
self.notify("Scipion System Monitor WARNING", msg)
[docs] def initLoop(self):
self._createTable()
psutil.cpu_percent(True)
psutil.virtual_memory()
[docs] def step(self):
valuesDict = {}
valuesDict['table'] = self._tableName
cpu = valuesDict['cpu'] = psutil.cpu_percent(interval=0)
mem = valuesDict['mem'] = psutil.virtual_memory().percent
swap = valuesDict['swap'] = psutil.swap_memory().percent
# some code examples:
# https://github.com/ngi644/datadog_nvml/blob/master/nvml.py
if self.doGpu:
for i in self.gpusToUse:
try:
handle = nvmlDeviceGetHandleByIndex(i)
memInfo = nvmlDeviceGetMemoryInfo(handle)
valuesDict["gpuMem_%d" % i] = \
float(memInfo.used)*100./float(memInfo.total)
util = nvmlDeviceGetUtilizationRates(handle)
valuesDict["gpuUse_%d" % i] = util.gpu
temp = nvmlDeviceGetTemperature(handle,
NVML_TEMPERATURE_GPU)
valuesDict["gpuTem_%d" % i] = temp
except NVMLError as err:
msg = "ERROR monitoring GPU %d: %s." \
" Remove device %d from FORM" % (i, err, i)
print(red(msg))
if self.doNetwork:
try:
# measure a sort interval
pnic_before = psutil.net_io_counters(pernic=True)[self.nif]
time.sleep(self.samplingTime) # sec
pnic_after = psutil.net_io_counters(pernic=True)[self.nif]
bytes_sent = pnic_after.bytes_sent - pnic_before.bytes_sent
bytes_recv = pnic_after.bytes_recv - pnic_before.bytes_recv
valuesDict["%s_send" % self.nif] = \
bytes_sent * self.samplingTime / 1048576
valuesDict["%s_recv" % self.nif] = \
bytes_recv * self.samplingTime / 1048576
except Exception as ex:
msg = "cannot get information of network interface %s" % \
self.nif
if self.doDiskIO:
try:
# measure a sort interval
disk_before = psutil.disk_io_counters(perdisk=False)
time.sleep(self.samplingTime) # sec
disk_after = psutil.disk_io_counters(perdisk=False)
bytes_read = disk_after.read_bytes - disk_before.read_bytes
bytes_write = disk_after.write_bytes - disk_before.write_bytes
valuesDict["disk_read"] = \
self.samplingTime * bytes_read / self.mega
valuesDict["disk_write"] = \
self.samplingTime * bytes_write / self.mega
except Exception as ex:
msg = "cannot get information of disk usage "
if self.cpuAlert < 100 and cpu > self.cpuAlert:
self.warning("CPU allocation =%f." % cpu)
self.cpuAlert = cpu
if self.memAlert < 100 and mem > self.memAlert:
self.warning("Memory allocation =%f." % mem)
self.memAlert = mem
if self.swapAlert < 100 and swap > self.swapAlert:
self.warning("SWAP allocation =%f." % swap)
self.swapAlert = swap
sqlCommand = "INSERT INTO %(table)s ("
for label in self.labelList:
sqlCommand += "%s, " % label
# remove last comma
sqlCommand = sqlCommand[:-2]
sqlCommand += ") VALUES("
for label in self.labelList:
sqlCommand += "%"+"(%s)f, " % label
# remove last comma
sqlCommand = sqlCommand[:-2]
sqlCommand += ");"
sql = sqlCommand % valuesDict
try:
self.cur.execute(sql)
except Exception as e:
print("ERROR: saving one data point (monitor). I continue")
# Return finished = True if all protocols have finished
finished = []
for prot in self.protocols:
updatedProt = getUpdatedProtocol(prot)
finished.append(updatedProt.getStatus() != STATUS_RUNNING)
return all(finished)
def _createTable(self):
sqlCommand = """CREATE TABLE IF NOT EXISTS %s(
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATE DEFAULT
(datetime('now')),
""" % self._tableName
for label in self.labelList:
sqlCommand += "%s FLOAT,\n" % label
# remove last comma and new line
sqlCommand = sqlCommand[:-2]
sqlCommand += ")"
self.cur.execute(sqlCommand)
[docs] def getLabels(self):
return self.labelList
[docs] def getData(self, lastId=-1):
if self.influx:
return self.getDataInflux(lastId)
else:
return self.getDataHtml()
[docs] def getDataInflux(self, lastId=-1):
"get resulset as a a list of dictionaries"
try:
self.cur.execute("select * from %s where id > %d "
"order by id" % (self._tableName, lastId))
except Exception as e:
print("MonitorCTF, ERROR reading data from db: %s" %
os.path.join(self.workingDir, self._dataBase))
# As we are using a row factory, fetchall returns a list of
# dictionaries, each item in list(each dictionary)
# represents a row of the table
listOfDictionaries = self.cur.fetchall()
for item in listOfDictionaries:
local = pytz.timezone(self.timeZone)
# convert dates from scipion to datetime.datetime
for d in listOfDictionaries:
datum = d['timestamp']
# string -> date time
# oposite -> strftime
if isinstance(datum, str):
naive = datetime.datetime.strptime(datum, "%Y-%m-%d %H:%M:%S")
if self.timeDelta != 0:
naive = naive + datetime.timedelta(hours=self.timeDelta)
elif isinstance(datum, datetime.datetime):
continue
else:
raise Exception('Error: (CTF:getData()) Can not convert timestamp')
local_dt = local.localize(naive, is_dst=None)
d['timestamp'] = local_dt.astimezone(pytz.utc)
return listOfDictionaries
[docs] def getDataHtml(self):
"""Fill a dictionary for each label in self.labeldisk.
The key is the label name. The value a list with
data read from the database"""
cur = self.cur
# Starting time
cur.execute("select julianday(timestamp) from %s where id=1" %
self._tableName)
data = cur.fetchone()
# fill list with adquisition times
if (data is None) or (len(data) == 0):
initTime = 0
initTimeTitle = 0
idValues = [0]
else:
initTime = data[0]
cur.execute("select timestamp from %s where id=1" %
self._tableName)
initTimeTitle = cur.fetchone()[0]
cur.execute("select (julianday(timestamp) - %f)*24 from %s" %
(initTime, self._tableName))
idValues = [r[0] for r in cur.fetchall()]
def get(name):
try:
cur.execute("select %s from %s" % (name, self._tableName))
except Exception as e:
print("ERROR readind data (plotter). I continue")
print("SQLCOMMAND", "select %s from %s" %
(name, self._tableName))
data = cur.fetchall()
if len(data) == 0:
return [0]
else:
return [r[0] for r in data]
# TODO: this multiple calls to get may raise a
# race condition. That is data may be added to the DB
# between two get call and therefore the lengths of
# the list can be different. IT would be better to do a single
# call to the database and retrieve all at the same time
# see getDataInflux for details
data = {'initTime': initTime,
'initTimeTitle': initTimeTitle,
'idValues': idValues}
# fill several lists with requested data
for label in self.labelList:
data[label] = get(label)
# conn.close()
return data