Creating the monitor
What is a monitor?
A monitor is a non-GUI class which main action would be to do something, periodically and notify any possible “listeners”. All code presented in this guide is available in our docs repository
Monitors anatomy
The base monitor class
looks like this:
class Monitor():
def __init__(self, **kwargs):
# Where to store any data from this monitor
self.workingDir = kwargs['workingDir']
self.samplingInterval = kwargs.get('samplingInterval', None)
self.monitorTime = kwargs.get('monitorTime', None)
self._notifiers = []
if kwargs.get('email', None) is not None:
self._notifiers.append(kwargs['email'])
if 'stdout' in kwargs:
self._notifiers.append(PrintNotifier())
def notify(self, title, message):
for n in self._notifiers:
if n:
n.notify(title, message)
def info(self, message):
self.notify("INFO", message)
def initLoop(self):
""" To be defined in subclasses. """
pass
def loop(self):
self.initLoop()
timeout = time.time() + 60. * self.monitorTime # interval minutes from now
while True:
finished = self.step()
if (time.time() > timeout) or finished:
break
time.sleep(self.samplingInterval)
def step(self):
""" To be defined in subclasses. """
pass
def addNotifier(self, notifier):
self._notifiers.append(notifier)
The most important method of a monitor is
def step(self):
""" To be defined in subclasses. """
pass
In this base class it is not implemented, but if you want to develop a monitor, you have to implement this method. The step() method will be called from the loop() in a regular manner until a self.monitorTime is reached or it is intentionally stopped.
Optionally, you can code an initLoop() method that will be called before the loops start, suitable for some initializations.
Additionally, monitors follow an observer pattern where any interested “listener/observer” could register and receive notifications from the monitor.
Developing a disk space monitor
Let’s start coding something. Let’s create a python file in our tutorial package and create there our new monitor class.
First iteration
Create a new folder called
myfacility
, and inside it, a folder calledprotocols
.Add a
space_monitor.py
file to<your-path>/myfacility/protocols
Import
Monitor
base class, and define your new class based onMonitor
from pyworkflow.em.protocol.monitors import Monitor
class SpaceMonitor(Monitor):
"""
Monitor to monitor free space on the HD where scipion project is placed
"""
Implement the step method. This method should find the HD where the project is placed.
Import some modules at the top of the file (import section)
import os
import collections
Add the following to the step method:
def step(self):
""" Using the workingdir attribute has to find the HD and then get the
available free space."""
usage = disk_usage(self.workingDir)
print(usage)
# Taken from http://code.activestate.com/recipes/577972-disk-usage/
def disk_usage(path):
_ntuple_diskusage = collections.namedtuple('usage', 'total used free')
st = os.statvfs(path)
free = st.f_bavail * st.f_frsize
total = st.f_blocks * st.f_frsize
used = (st.f_blocks - st.f_bfree) * st.f_frsize
return _ntuple_diskusage(total, used, free)
This is adding a new method disk_usage
which receives a path and
uses it in the step method, printing it (temporarily).
Expose new monitor to the package: import the monitors in
<your-path>/myfacility/protocols/__init__.py
from space_monitor import SpaceMonitor, ProtMonitorSpace
In addition, for Scipion to detect myfacility
, we need to add its container path to the PYTHONPATH
.
Remember to do this in your terminal before you test anything related with this tutorial.
$ export PYTHONPATH=~/Desktop/scipion-em-myfacility:$PYTHONPATH
Testing first iteration
We are going now to add some tests to check our progress.
Add a tests folder to the package (
<your-path>/myfacility/tests
Add an empty
__init__.py
to make it a moduleAdd
test_monitor.py
to the tests folderAdd the code below
import os
from pyworkflow.tests import *
from myfacility.protocols import SpaceMonitor
# Test monitor functionality
class TestMonitor(BaseTest):
def test_monitor(self):
# Instantiate the monitor
spaceMonitor = SpaceMonitor(workingDir=os.getcwd())
spaceMonitor.step()
Run the test. Take into account that
PYTHONPATH
needs to be set.
scipion test myfacility.tests.test_monitor.TestMonitor
Output should look like this:
Scipion (2018-06-12) ((HEAD detached at june_2018_course) b9d0e37)
>>>>> python scripts/run_tests.py myfacility.tests.test_monitor.TestMonitor
Running tests....
usage(total=245527773184, used=127268225024, free=105763827712)
[ RUN OK ] TestMonitor.test_monitor (0.001 secs)
[==========] run 1 tests (0.001 secs)
[ PASSED ] 1 tests
The disk stats are printed on the screen. This is not a proper test since it will never fail and there are no checks (assertions) but soon we will add them.
2nd iteration: persist the values in a txt file
We need to persist/store the values, and the easiest approach here is to write the values in a text file.
Test first
Following a
TDD
approach, we will first modify the test to expect the new behaviour.
Since we are going to generate files, let’s tell the monitor to use a
temporary folder instead of the os.getcwd()
. For this we need to
first import at the top mkdtemp function from tempfile
from tempfile import mkdtemp
and then use it when creating the
monitor in myfacility/tests/test_monitor.py
file:
from tempfile import mkdtemp
[ . . . . . ]
def test_monitor(self):
# Instantiate the monitor
spaceMonitor = SpaceMonitor(workingDir=mkdtemp())
spaceMonitor.step()
# Check storage file exists
fnStorageFile = spaceMonitor.getStorageFilePath()
# Check that the file exists
self.assertTrue(os.path.exists(fnStorageFile),
"Storage file %s not created." % fnStorageFile)
# Check there are 2 lines (headers and first data line)
num_lines = sum(1 for line in open(fnStorageFile,'r'))
# Assert lines are 2
self.assertEqual(2, num_lines,
"First step of the monitor does not "
"have the expected lines: %s" % 2)
Additionally, we have added some lines to: * get the name of the storage file * assert that it exists * and test that it has 2 lines * Since we haven’t modified our Monitor yet this test should fail
[ FAILED ] TestMonitor.test_monitor
Traceback (most recent call last):
File "/home/pablo/desarrollo/scipion/software/lib/python2.7/unittest/case.py", line 329, in run
testMethod()
File "/home/pablo/desarrollo/scipion/pyworkflow/em/packages/myfacility/tests/test_monitor.py", line 16, in test_monitor
fnStorageFile = spaceMonitor.getStorageFilePath()
AttributeError: SpaceMonitor instance has no attribute 'getStorageFilePath'
[==========] run 1 tests (0.003 secs)
[ FAILED ] 1 tests
[ PASSED ] 0 tests
Our monitor does not have the getStorageFilePath()
function.
Monitor second
Let’s implement what the test is expecting:
def step(self):
""" Using the workingdir attribute has to find the HD and then get the
available free space."""
usage = disk_usage(self.workingDir)
self.storeUsageData(usage)
def getStorageFilePath(self):
return os.path.join(self.workingDir, 'space_usage.txt')
def storeUsageData(self, usageData):
fnStorageFile = self.getStorageFilePath()
if not os.path.exists(fnStorageFile):
fhStorage = open(fnStorageFile, "w")
fhStorage.write("total\tused\tfree\n")
else:
fhStorage = open(fnStorageFile, "a")
fhStorage.write("%s\t%s\t%s\n" % (usageData.total, usageData.used, usageData.free))
fhStorage.close()
Note that we have:
1. Implemented the getStorageFilePath()
method.
2. Implemented a storeUsageData()
to store our usage data
3. In the step()
function we have called the storage function: self.storeUsageData(usage)
Using the Monitor
What we have done is just a piece of “behaviour” it will calculate the statistics of the HD where a certain folder (workingFolder) belongs. But how can we start it from a Scipion project?
Protocol monitor
There is a special protocol (ProtMonitor) that is designed to have
monitors running. It can be found at
pyworkflow.em.protocol.monitors.protocol_monitor.ProtMonitor
. It inherits
from EMProtocol
and defines several parameters:
inputProtocols
: Protocols to be monitoredsamplingInterval
: Monitoring time interval between samplesMail params
(Optional): All email params needed to send emails
Note that our case, monitoring the HD, does not require any specific inputProtocol to monitor.
Additionally, this special protocol will create a “monitorStep” that any “implementer” has to implement:
# -------------------------- INSERT steps functions -----------------------
def _insertAllSteps(self):
self._insertFunctionStep('monitorStep')
# -------------------------- STEPS functions ------------------------------
def monitorStep(self):
pass
First iteration: Space monitor protocol
Let’s define a new Class for our Space Monitor Protocol at space_monitor.py
.
1. Import ProtMonitor and PrintNotifier at the top:
2. Import last version
3. Add our ProtMonitorSpace:
from pyworkflow.em import ProtMonitor, PrintNotifier
from pyworkflow import VERSION_2_0
class ProtMonitorSpace(ProtMonitor):
_label = 'monitor of HD space'
_lastUpdateVersion = VERSION_2_0
# Overwrite the monitor step function
def monitorStep(self):
# Instantiate a Space Monitor
monitor = SpaceMonitor(workingDir=self._getExtraPath(),
samplingInterval=self.samplingInterval.get(),
monitorTime=100)
monitor.addNotifier(PrintNotifier())
monitor.loop()
Make our monitor to notify something, at the end of out SpaceMonitor.step() add
self.notify("HD stats", str(usage))
Expose the new protocol to Scipion. In
myfacility/protocols/__init__.py
add:
from space_monitor import SpaceMonitor, ProtMonitorSpace``
Now Scipion should be able to discover it and you should be able to run it:
So far this is OK, but:
The output is hardly readable….we should convert bytes into GB (at least) to be human-friendly
More important, there is only a Print notifier, so someone has to actively look at the logs.
A plot might be useful to plot the HD stats.
2nd iteration: Space monitor protocol tweaks
Human-friendly output
Import
from pyworkflow.utils import prettySize
In the step function of SpaceMonitor, just before calling notify, convert the values and modify the notify call:
from pyworkflow.utils import prettySize
[ . . . ]
self.storeUsageData(usage)
# Stats line readable:
free = prettySize(usage.free)
total = prettySize(usage.total)
used = prettySize(usage.used)
self.notify("HD stats (%s)" % self.workingDir,
"Free: %s, Total: %s, Used: %s" % (free, total, used))```
Add an Email notifier… and remove the inputProtocols
Let’s tweak the parameters of the protocol:
Import params from em, like so:
from pyworkflow.em import ProtMonitor, PrintNotifier, params
from pyworkflow.em import ProtMonitor, PrintNotifier, params
[ . . . ]
_label = 'monitor of HD space'
_lastUpdateVersion = VERSION_2_0
def _defineParams(self, form):
""" Overwrite the standard define params """
# This should define the inputProtocols and the sampling interval
ProtMonitor._defineParams(self, form)
# Remove the inputProtocols
section = form.getSection('Input')
section._paramList.remove('inputProtocols')
# Add a threshold for the email notifier
form.addParam('minimumFreeSpace', params.IntParam, default=500,
label="Minimum free space (GB)",
help="Notify by email or console when HD free space"
" drops below the minimum")
self._sendMailParams(form)
# Overwrite the monitor step function
def monitorStep(self):
# Instantiate a Space Monitor
monitor = SpaceMonitor(self.minimumFreeSpace,
workingDir=self._getExtraPath(),
samplingInterval=self.samplingInterval.get(),
monitorTime=100)
monitor.addNotifier(PrintNotifier())
# Create the email notifier
email = self.createEmailNotifier()
# If email option active
if email is not None:
monitor.addNotifier(email)
monitor.loop()
We have added the _defineParams(self, form)
method. There we have
called the ProtMonitor._defineParams(self, form)
to have the default
parent params. Next thing is to delete the inputProtocols
. This wasn’t
expected in our API, but python is flexible enough to make this happen:
# Remove the inputProtocols
section = form.getSection('Input')
section._paramList.remove('inputProtocols')
Next thing is to add a minimumFreeSpace
parameter to serve as a
threshold to send email in case free space goes below that threshold.
We also add the email notifications parameters with
self._sendMailParams(form)
.
Secondly, we have also made some changes in the monitorStep()
method. We
are passing the minimumFreeSpace value to the SpaceMonitor:
# Instantiate a Space Monitor
monitor = SpaceMonitor(self.minimumFreeSpace,
workingDir=self._getExtraPath(),
samplingInterval=self.samplingInterval.get(),
monitorTime=100)
and finally, at the end we have requested for an EmailNotifier and added
it to the Monitor if condition apply. Our protocol looks ok, but our
monitor is not yet aware of the new parameter 'minimumFreeSpace'
and
is notifying in any loop.
Make SpaceMonitor to understand and react to ‘minimumFreeSpace’
class SpaceMonitor(Monitor):
"""
Monitor to monitor free space on the HD where scipion project is placed
"""
def __init__(self, minimumFreeSpace, **kwargs):
Monitor.__init__(self, **kwargs)
self.minimumFreeSpace = minimumFreeSpace
And modify the
step()
method ofSpaceMonitor
to take the threshold into account
def step(self):
""" Using the workingdir attribute has to find the HD and then get the
available free space."""
usage = disk_usage(self.workingDir)
self.storeUsageData(usage)
# Stats line readable:
free = prettySize(usage.free)
total = prettySize(usage.total)
used = prettySize(usage.used)
# Free space in GB
freeGB = usage.free/(1024.0**3.0)
# Notify only if free space is bellow the threshold in GB
if freeGB < self.minimumFreeSpace:
self.notify("WARNING: There is only %s left for %s" %
(free, self.workingDir),
"Free: %s, Total: %s, Used: %s, Threshold: %s" %
(free, total, used, self.minimumFreeSpace))
Don’t forget the test!!
We haven’t touched the monitor test and more important, we are not testing the protocol.
Our test should be failing because we are not passing the
minimumFreeSpace
. Let’s do that:
Update Monitor test: Replace (for simplicity) all content in
test_monitor.py
with:
import math
from pyworkflow.tests import *
from myfacility.protocols import SpaceMonitor, ProtMonitorSpace
from myfacility.protocols.space_monitor import disk_usage
from tempfile import mkdtemp
# Test monitor functionality
class TestMonitor(BaseTest):
def test_monitor(self):
# Get a tmp folder
tmpFolder = mkdtemp()
# Get freespace
diskUsage = disk_usage(tmpFolder)
freeSpaceInGB = diskUsage.free/(1024.0**3)
# Round it to the "foor"
freeSpaceInGB = math.floor(freeSpaceInGB)
# Instantiate the monitor
spaceMonitor = SpaceMonitor(freeSpaceInGB, workingDir=tmpFolder, stdout=True)
testNotifier = TestNotifier()
spaceMonitor.addNotifier(testNotifier)
spaceMonitor.step()
# Check storage file exists
fnStorageFile = spaceMonitor.getStorageFilePath()
# Check that the file exists
self.assertTrue(os.path.exists(fnStorageFile),
"Storage file %s not created." % fnStorageFile)
# Check there are 2 lines (headers and first data line)
num_lines = sum(1 for line in open(fnStorageFile,'r'))
# Assert lines are 2
self.assertEqual(2, num_lines,
"First step of the monitor does not "
"have the expected lines: %s" % 2)
# Check notifications are empty
self.assertEqual(0, len(testNotifier.getNotifications()), "Notifications are not empty!")
# Move the threshold to trigger notifications
spaceMonitor.minimumFreeSpace = freeSpaceInGB + 1
spaceMonitor.step()
# Check there is a notification
self.assertEqual(1, len(testNotifier.getNotifications()), "There isn't a notification")
class TestNotifier():
def __init__(self):
self._notifications=[]
def getNotifications(self):
return self._notifications
def notify(self, title, message):
# Just store the message
self._notifications.append(message)
We have used the diskUsage()
method to get the free space of the HD
of the /tmp
folder. We round it “down” and use that as the threshold for
the SpaceMonitor. After the first step there should not be any
notification. After the first assertions, we add another to check there
are no notifications. Secondly, we increase the threshold by one and
call for a second time step()
. This time, there should be a
notification.
Note that we have to make our own TestNotifier
in order to check the
notifications. Something we should provide, but wasn’t available in
Scipion today.
Add a test for the protocol monitor.
We also have to test the protocol. You will need to import
ProtMonitorSpace and a wait
function:
import math
from pyworkflow.tests import *
from pyworkflow.tests.test_utils import wait
from myfacility.protocols import SpaceMonitor, ProtMonitorSpace
from myfacility.protocols.space_monitor import disk_usage
from tempfile import mkdtemp
Now add the code below inside the class TestMonitor
right after
self.assertEqual(1, len(testNotifier.getNotifications()), "There isn't a notification")
and right above our custom class TestNotifier
@classmethod
def setUpClass(cls):
setupTestProject(cls)
def test_spacemonitor_protocol(self):
prot = self.newProtocol(ProtMonitorSpace,
objLabel='HD free Space monitor',
samplingInterval=10)
self.proj.launchProtocol(prot, wait=False)
# Test that the spaceMonitor txt file is where expected
spaceMon = SpaceMonitor(10, workingDir=prot._getExtraPath())
txtPath = spaceMon.getStorageFilePath()
# Wait for a minute maximum or if file exists
wait(lambda: not os.path.exists(txtPath), timeout=15)
self.assertTrue(os.path.exists(txtPath), "Space monitor txt file not "
"found at %s" % txtPath)
# Stop the protocol. Do not wait for its timeout
self.proj.stopProtocol(prot)
With the setUpClass()
we are creating an empty Scipion project
Run the test:
scipion test myfacility.tests.test_monitor.TestMonitor
.
In the test_spacemonitor_protocol
we are creating one
ProtMonitorSpace
and launching it. Since we want to test the
existence of txt file, we make use of a temporary SpaceMonitor to get
the exact txt file at the extra folder of our protocol. We must give
some time to the monitor (15 secs) before checking the file existence,
thus wait(lambda: not os.path.exists(txtPath), timeout=15)
. Then, we
make the assertion and stop the protocol…since the only stop mechanism
it has is the default timeout (100 minutes).
You should get something like:
➜ ~ scipion test myfacility.tests.test_monitor.TestMonitor
Scipion v2.0 (2019-03-15) Diocletian (release-2.0.0 584fbfa)
>>>>> python /home/yaiza/git/scipion/pyworkflow/apps/pw_run_tests.py "myfacility.tests.test_monitor.TestMonitor"
Running tests....
('Creating project at: ', '/home/yaiza/ScipionUserData/projects/TestMonitor/project.sqlite')
WARNING: There is only 185.83 GB left for /tmp/tmptRIM1E Free: 185.83 GB, Total: 217.91 GB, Used: 20.99 GB, Threshold: 186.0
[ RUN OK ] TestMonitor.test_monitor (0.001 secs)
** Running command: 'python /home/yaiza/git/scipion/scipion runprotocol pw_protocol_run.py "/home/yaiza/ScipionUserData/projects/TestMonitor" "Runs/000002_ProtMonitorSpace/logs/run.db" 2'
Scipion v2.0 (2019-03-15) Diocletian (release-2.0.0 584fbfa)
>>>>> python /home/yaiza/git/scipion/pyworkflow/apps/pw_protocol_run.py "/home/yaiza/ScipionUserData/projects/TestMonitor" "Runs/000002_ProtMonitorSpace/logs/run.db" "2"
Terminating child pid: 27553
Terminating child pid: 27555
Terminating child pid: 27562
Terminating child pid: 27563
Terminating process pid: None
WARNING! Got None PID!!!
[ RUN OK ] TestMonitor.test_spacemonitor_protocol (2.304 secs)
[==========] run 2 tests (3.099 secs)
[ PASSED ] 2 tests