Source code for emfacilities.viewers.viewer_monitors

# **************************************************************************
# *
# * Authors:     J.M. De la Rosa Trevin (delarosatrevin@scilifelab.se) [1]
# *
# * [1] SciLifeLab, Stockholm University
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 2 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307  USA
# *
# *  All comments concerning this program package may be sent to the
# *  e-mail address 'scipion@cnb.csic.es'
# *
# **************************************************************************

import sys
import tkinter as tk
from matplotlib import animation

from pyworkflow.gui.plotter import plt
from pyworkflow.gui.tree import BoundTree
from pyworkflow.gui.widgets import Button, HotButton
import pyworkflow.gui as pwgui
import pyworkflow.gui.text as text
import pyworkflow.utils as pwutils
import pyworkflow.viewer as pwviewer
from pwem.viewers.plotter import EmPlotter

import emfacilities.protocols as monitorProt
from emfacilities.protocols.protocol_monitor_system import MonitorSystem


[docs]class ProtMonitorCTFViewer(pwviewer.Viewer): _environments = [pwviewer.DESKTOP_TKINTER] _label = 'ctf monitor' _targets = [monitorProt.ProtMonitorCTF] def _visualize(self, obj, **kwargs): return [CtfMonitorPlotter(obj.createMonitor())]
[docs]class CtfMonitorPlotter(EmPlotter): def __init__(self, monitor): EmPlotter.__init__(self, windowTitle="CTF Monitor") self.monitor = monitor self.y2 = 0. self.y1 = 100. self.win = 250 # number of samples to be ploted self.step = 50 # self.win will be modified in steps of this size self.createSubPlot(self._getTitle(), "Micrographs", "Defocus (A)") self.fig = self.getFigure() self.ax = self.getLastSubPlot() self.ax.margins(0.05) self.ax.grid(True) self.oldWin = self.win self.lines = {} self.init = True self.stop = False def _getTitle(self): return ("Use scrool wheel to change view window (win=%d)\n " "S stops, C continues plotting" % self.win)
[docs] def onscroll(self, event): if event.button == 'up': self.win += self.step else: self.win -= self.step if self.win < self.step: self.win = self.step if self.oldWin != self.win: self.ax.set_title(self._getTitle()) self.oldWin = self.win self.animate() EmPlotter.show(self)
[docs] def press(self, event): sys.stdout.flush() if event.key == 'S': self.stop = True self.ax.set_title('Plot is Stopped. Press C to continue plotting') elif event.key == 'C': self.ax.set_title(self._getTitle()) self.stop = False self.animate() EmPlotter.show(self)
[docs] def has_been_closed(self, ax): fig = ax.figure.canvas.manager active_fig_managers = plt._pylab_helpers.Gcf.figs.values() return fig not in active_fig_managers
[docs] def animate(self, i=0): # do NOT remove i # FuncAnimation adds it as argument if self.stop: return data = self.monitor.getData() self.x = data['idValues'] for k, v in self.lines.items(): self.y = data[k] lenght = len(self.x) imin = max(0, len(self.x) - self.win) xdata = self.x[imin:lenght] ydata = self.y[imin:lenght] v.set_data(xdata, ydata) self.ax.relim() self.ax.autoscale() self.ax.grid(True) self.ax.legend(loc=2).get_frame().set_alpha(0.5) self.ax.axhline(y=self.monitor.minDefocus, c="red", linewidth=0.5, linestyle='dashed', zorder=0) self.ax.axhline(y=self.monitor.maxDefocus, c="red", linewidth=0.5, linestyle='dashed', zorder=0)
[docs] def show(self): self.paint([('defocusU', 'r'), ('defocusV', 'b')])
[docs] def paint(self, labels): for label in labels: labelValue = self.monitor.getData()[label[0]] color = label[1] self.lines[label[0]], = self.ax.plot(labelValue, '-o', label=label[0], color=color) self.legend() anim = animation.FuncAnimation(self.fig, self.animate, interval=self.monitor.samplingInterval * 1000) # miliseconds self.fig.canvas.mpl_connect('scroll_event', self.onscroll) self.fig.canvas.mpl_connect('key_press_event', self.press) EmPlotter.show(self)
[docs]class ProtMonitorMovieGainViewer(pwviewer.Viewer): _environments = [pwviewer.DESKTOP_TKINTER] _label = 'movie gain monitor' _targets = [monitorProt.ProtMonitorMovieGain] def _visualize(self, obj, **kwargs): return [MovieGainMonitorPlotter(obj.createMonitor())]
[docs]class MovieGainMonitorPlotter(EmPlotter): def __init__(self, monitor): EmPlotter.__init__(self, windowTitle="Movie Gain Monitor") self.monitor = monitor self.y2 = 0. self.y1 = 100. self.win = 250 # number of samples to be plotted self.step = 50 # self.win will be modified in steps of this size self.createSubPlot(self._getTitle(), "", "") self.fig = self.getFigure() self.ax = self.getLastSubPlot() self.ax2 = self.ax.twinx() self.ax2.margins(0.05) self.oldWin = self.win self.lines = {} self.init = True self.stop = False def _getTitle(self): return ("Use scrool wheel to change view window (win=%d)\n " "S stops, C continues plotting" % self.win)
[docs] def onscroll(self, event): if event.button == 'up': self.win += self.step else: self.win -= self.step if self.win < self.step: self.win = self.step if self.oldWin != self.win: self.ax.set_title(self._getTitle()) self.oldWin = self.win self.animate() EmPlotter.show(self)
[docs] def press(self, event): sys.stdout.flush() if event.key == 'S': self.stop = True self.ax.set_title('Plot is Stopped. Press C to continue plotting') elif event.key == 'C': self.ax.set_title(self._getTitle()) self.stop = False self.animate() EmPlotter.show(self)
[docs] def has_been_closed(self, ax): fig = ax.figure.canvas.manager active_fig_managers = plt._pylab_helpers.Gcf.figs.values() return fig not in active_fig_managers
[docs] def animate(self, i=0): # do NOT remove i # FuncAnimation adds it as argument if self.stop: return data = self.monitor.getData() self.x = data['idValues'] for k, v in self.lines.items(): self.y = data[k] lenght = len(self.x) imin = max(0, len(self.x) - self.win) xdata = self.x[imin:lenght] ydata = self.y[imin:lenght] v.set_data(xdata, ydata) self.ax.set_ylabel('Ratios between specified percentiles', color='b', size=10) self.ax.spines['left'].set_color('blue') self.ax.spines['right'].set_color('red') self.ax.tick_params(axis='y', labelsize=8, colors='blue') self.ax.relim() self.ax.autoscale() self.ax.grid(False) self.ax.get_xaxis().set_visible(False) self.ax2.set_ylabel('Residual gain standard deviation', color='r', size=10) self.ax2.tick_params(axis='y', labelsize=8, colors='red') self.ax2.relim() self.ax2.autoscale() lines, labels = self.ax.get_legend_handles_labels() lines2, labels2 = self.ax2.get_legend_handles_labels() self.ax2.legend(lines + lines2, labels + labels2, loc=2, prop={'size': 10}).get_frame().set_alpha(0.5)
[docs] def show(self): self.paint([('ratio1', 'b'), ('ratio2', 'b'), ('standard_deviation', 'r')])
[docs] def paint(self, labels): for label in labels: labelValue = self.monitor.getData()[label[0]] color = label[1] if label == 'standard_deviation': self.lines[label[0]], = self.ax2.plot(labelValue, '-o', label=label[0], color=color) if label == 'ratio1': self.lines[label[0]], = self.ax.plot(labelValue, '-o', label='97.5/2.5 percentile', color=color) if label == 'ratio2': self.lines[label[0]], = self.ax.plot(labelValue, '-*', label='max/97.5 percentile', color=color) self.legend() anim = animation.FuncAnimation(self.fig, self.animate, interval=self.monitor.samplingInterval * 1000) # miliseconds self.fig.canvas.mpl_connect('scroll_event', self.onscroll) self.fig.canvas.mpl_connect('key_press_event', self.press) EmPlotter.show(self)
[docs] def empty(self): return self.init
[docs]class ProtMonitorSystemViewer(pwviewer.Viewer): _environments = [pwviewer.DESKTOP_TKINTER, pwviewer.WEB_DJANGO] _label = 'system monitor' _targets = [monitorProt.ProtMonitorSystem] def __init__(self, **args): pwviewer.Viewer.__init__(self, **args) def _visualize(self, obj, **kwargs): return [SystemMonitorPlotter(obj.createMonitor(), nifName=MonitorSystem.getNifsNameList()[ self.protocol.netInterfaces.get()])]
[docs]class SystemMonitorPlotter(EmPlotter): _environments = [pwviewer.DESKTOP_TKINTER, pwviewer.WEB_DJANGO] _label = 'System Monitor' _targets = [monitorProt.ProtMonitorSystem] def __init__(self, monitor, nifName=None): EmPlotter.__init__(self, windowTitle="system Monitor") self.monitor = monitor self.y2 = 0. self.y1 = 100. self.win = 250 # number of samples to be ploted self.step = 50 # self.win will be modified in steps of this size self.createSubPlot(self._getTitle(), "time (hours)", "percentage (or MB for IO or NetWork)") self.fig = self.getFigure() self.ax = self.getLastSubPlot() self.ax.margins(0.05) self.ax.grid(True) self.oldWin = self.win self.lines = {} self.init = True self.stop = False self.nifName = nifName def _getTitle(self): return ("Use scrool wheel to change view window (win=%d)\n " "S stops, C continues plotting. Toggle ON/OFF GPU_X " "by pressing X\n" "c/n/d toggle ON-OFF cpu/network/disk usage\n" % self.win)
[docs] def onscroll(self, event): if event.button == 'up': self.win += self.step else: self.win -= self.step if self.win < self.step: self.win = self.step if self.oldWin != self.win: self.ax.set_title(self._getTitle()) self.oldWin = self.win self.animate() EmPlotter.show(self)
[docs] def press(self, event): def numericKey(key): self.colorChanged = True number = int(key) index = 3 + number * 3 if (index + 3) > self.lenPlots: return if self.color['gpuMem_%d' % number] != 'w': self.oldColor['gpuMem_%d' % number] = \ self.color['gpuMem_%d' % number] self.oldColor['gpuUse_%d' % number] = \ self.color['gpuUse_%d' % number] self.oldColor['gpuTem_%d' % number] = \ self.color['gpuTem_%d' % number] self.color['gpuMem_%d' % number] = "w" self.color['gpuUse_%d' % number] = "w" self.color['gpuTem_%d' % number] = "w" else: self.color['gpuMem_%d' % number] = \ self.oldColor['gpuMem_%d' % number] self.color['gpuUse_%d' % number] = \ self.oldColor['gpuUse_%d' % number] self.color['gpuTem_%d' % number] = \ self.oldColor['gpuTem_%d' % number] def cpuKey(key): self.colorChanged = True if self.color['cpu'] != 'w': self.oldColor['cpu'] = self.color['cpu'] self.oldColor['mem'] = self.color['mem'] self.oldColor['swap'] = self.color['swap'] self.color['cpu'] = "w" self.color['mem'] = "w" self.color['swap'] = "w" else: self.color['cpu'] = self.oldColor['cpu'] self.color['swap'] = self.oldColor['swap'] self.color['mem'] = self.oldColor['mem'] def netKey(key): self.colorChanged = True if self.color['%s_send' % self.nifName] != 'w': self.oldColor['%s_send' % self.nifName] = \ self.color['%s_send' % self.nifName] self.oldColor['%s_recv' % self.nifName] = \ self.color['%s_recv' % self.nifName] self.color['%s_send' % self.nifName] = "w" self.color['%s_recv' % self.nifName] = "w" else: self.color['%s_send' % self.nifName] = \ self.oldColor['%s_send' % self.nifName] self.color['%s_recv' % self.nifName] = \ self.oldColor['%s_recv' % self.nifName] def diskKey(key): self.colorChanged = True if self.color['disk_read'] != 'w': self.oldColor['disk_read'] = self.color['disk_read'] self.oldColor['disk_write'] = self.color['disk_write'] self.color['disk_read'] = "w" self.color['disk_write'] = "w" else: self.color['disk_read'] = self.oldColor['disk_read'] self.color['disk_write'] = self.oldColor['disk_write'] sys.stdout.flush() if event.key == 'S': self.stop = True self.ax.set_title('Plot has been Stopped. ' 'Press C to continue plotting') elif event.key == 'C': self.ax.set_title(self._getTitle()) self.stop = False self.animate() elif event.key.isdigit(): numericKey(event.key) self.animate() elif event.key == 'c': cpuKey(event.key) self.animate() elif event.key == 'n': netKey(event.key) self.animate() elif event.key == 'd': diskKey(event.key) self.animate() EmPlotter.show(self)
[docs] def has_been_closed(self, ax): fig = ax.figure.canvas.manager active_fig_managers = plt._pylab_helpers.Gcf.figs.values() return fig not in active_fig_managers
[docs] def animate(self, i=0): # do NOT remove i if self.stop: return data = self.monitor.getData() self.x = data['idValues'] for k, v in self.lines.items(): self.y = data[k] lenght = len(self.x) imin = max(0, len(self.x) - self.win) xdata = self.x[imin:lenght] ydata = self.y[imin:lenght] v.set_data(xdata, ydata) if self.colorChanged: v.set_color(self.color[k]) self.colorChanged = False self.ax.relim() self.ax.autoscale() self.ax.grid(True) self.ax.legend(loc=2).get_frame().set_alpha(0.5)
[docs] def paint(self, labels): for label in labels: labelValue = self.monitor.getData()[label] color = self.color[label] self.lines[label], = self.ax.plot(labelValue, '-', label=label, color=color) self.ax.legend() anim = animation.FuncAnimation( self.fig, self.animate, interval=self.monitor.samplingInterval * 1000) # miliseconds self.fig.canvas.mpl_connect('scroll_event', self.onscroll) self.fig.canvas.mpl_connect('key_press_event', self.press) EmPlotter.show(self)
[docs] def show(self): colortypes = ["k", "b", "r", "g", "y", "c", "m"] lenColortypes = len(colortypes) self.colorChanged = True self.color = {} self.oldColor = {} counter = 0 for key in self.monitor.getLabels(): self.color[key] = colortypes[counter % lenColortypes] self.oldColor[key] = colortypes[counter % lenColortypes] counter += 1 self.lenPlots = len(self.color) self.paint(self.monitor.getLabels())
[docs]class ViewerMonitorSummary(pwviewer.Viewer): """ Wrapper to visualize PDF objects. """ _environments = [pwviewer.DESKTOP_TKINTER] _targets = [monitorProt.ProtMonitorSummary]
[docs] def visualize(self, obj, **kwargs): self.summaryWindow = self.tkWindow(SummaryWindow, title='Scipion-Box Summary', protocol=obj ) self.summaryWindow.show()
[docs]class SummaryWindow(pwgui.Window): def __init__(self, **kwargs): pwgui.Window.__init__(self, **kwargs) self.protocol = kwargs.get('protocol') self.refresh = self.protocol.samplingInterval.get() self.provider = monitorProt.SummaryProvider(self.protocol) content = tk.Frame(self.root) self._createContent(content) content.grid(row=0, column=0, sticky='news') content.columnconfigure(0, weight=1) def _createContent(self, content): topFrame = tk.Frame(content) content.columnconfigure(0, weight=1) topFrame.grid(row=0, column=0, sticky='new', padx=5, pady=5) treeFrame = tk.Frame(content) content.rowconfigure(1, weight=1) treeFrame.grid(row=1, column=0, sticky='news', padx=5, pady=5) buttonsFrame = tk.Frame(content) buttonsFrame.grid(row=2, column=0, sticky='new', padx=5, pady=5) self._fillTreeFrame(treeFrame) # JMRT: We fill the top frame after the tree, to make sure # the provider has updated the Acquisition info self._fillTopFrame(topFrame) self._fillButtonsFrame(buttonsFrame) def _fillTopFrame(self, frame): p1 = tk.Label(frame, text='Project: ') p1.grid(row=0, column=0, sticky='nw', padx=5, pady=5) projName = self.protocol.getProject().getShortName() p2 = tk.Label(frame, text=projName, font=self.fontBold) p2.grid(row=0, column=1, sticky='nw', padx=5, pady=0) lf = tk.LabelFrame(frame, text='Acquisition') lf.grid(row=1, column=0, columnspan=2, sticky='new') lf.columnconfigure(0, weight=1) lf.columnconfigure(1, weight=1) self.r = 0 def add(t1, t2): tk.Label(lf, text=t1).grid(row=self.r, column=0, sticky='ne', padx=(10, 5), pady=(5, 0)) tk.Label(lf, text=t2, font=self.fontBold).grid(row=self.r, column=1, sticky='nw', padx=(5, 25), pady=0) self.r += 1 for k, v in self.provider.acquisition: add(k, v) def _fillTreeFrame(self, frame): self.tree = BoundTree(frame, self.provider) self.tree.grid(row=0, column=0) self.updateVar = tk.StringVar() updateLabel = tk.Label(frame, textvariable=self.updateVar) updateLabel.grid(row=1, column=0, sticky='nw', padx=5, pady=5) self._updateLabel() def _fillButtonsFrame(self, frame): subframe = tk.Frame(frame) subframe.grid(row=0, column=0, sticky='nw') frame.columnconfigure(1, weight=1) ctfBtn = Button(subframe, "CTF Monitor", command=self._monitorCTF) ctfBtn.grid(row=0, column=0, sticky='nw', padx=(0, 5)) if self.protocol.createCtfMonitor() is None: ctfBtn['state'] = 'disabled' movieGainBtn = Button(subframe, "Movie Gain Monitor", command=self._monitorMovieGain) movieGainBtn.grid(row=0, column=1, sticky='nw', padx=(0, 5)) if self.protocol.createMovieGainMonitor() is None: movieGainBtn['state'] = 'disabled' sysBtn = Button(subframe, "System Monitor", command=self._monitorSystem) sysBtn.grid(row=0, column=2, sticky='nw', padx=(0, 5)) if self.protocol.createSystemMonitor() is None: sysBtn['state'] = 'disabled' htmlBtn = HotButton(subframe, 'Open HTML Report', command=self._openHTML) htmlBtn.grid(row=0, column=3, sticky='nw', padx=(0, 5)) closeBtn = self.createCloseButton(frame) closeBtn.grid(row=0, column=1, sticky='ne') def _monitorCTF(self, e=None): CtfMonitorPlotter(self.protocol.createCtfMonitor()).show() def _monitorMovieGain(self, e=None): MovieGainMonitorPlotter(self.protocol.createMovieGainMonitor()).show() def _monitorSystem(self, e=None): nifName = MonitorSystem.getNifsNameList()[self.protocol.netInterfaces.get()] SystemMonitorPlotter(self.protocol.createSystemMonitor(), nifName).show() def _updateLabel(self): self.updateVar.set('Updated: %s' % pwutils.prettyTime(secs=True)) # Schedule a refresh in some seconds self.tree.after(self.refresh * 1000, self._updateData) def _updateData(self): self.provider.refreshObjects() self.tree.update() self._updateLabel() def _openHTML(self, e=None): reportHtml = self.protocol.createHtmlReport() reportPath = reportHtml.reportPath if pwutils.exists(reportPath): text._open_cmd(reportPath) else: self.showInfo('Your html file is not ready yet. Please try again in a minute.')