Source code for cistem.viewers.viewers

# **************************************************************************
# *
# * Authors:     Josue Gomez Blanco (josue.gomez-blanco@mcgill.ca) [1]
# *              Grigory Sharov (gsharov@mrc-lmb.cam.ac.uk) [2]
# *
# * [1] Department of Anatomy and Cell Biology, McGill University
# * [2] MRC Laboratory of Molecular Biology (MRC-LMB)
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 3 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307  USA
# *
# *  All comments concerning this program package may be sent to the
# *  e-mail address 'scipion@cnb.csic.es'
# *
# **************************************************************************

from pyworkflow.protocol.params import LabelParam
from pyworkflow.utils import removeExt, cleanPath
from pyworkflow.viewer import DESKTOP_TKINTER, Viewer
from pyworkflow.gui.project import ProjectWindow
from pwem.viewers import CtfView, EmPlotter, MicrographsView, EmProtocolViewer
import pwem.viewers.showj as showj
from pwem.objects import SetOfMovies


from cistem.protocols import CistemProtCTFFind, CistemProtUnblur


def createCtfPlot(ctfSet, ctfId):
    """ Create an EmPlotter instance and show the CTFFind curves of one item.

    :param ctfSet: container indexed with ``ctfSet[ctfId]`` to obtain the
        CTF model to plot.
    :param ctfId: key of the CTF model inside ``ctfSet``.

    The curves are read from the file whose name is the model's PSD file
    name with its extension removed and "_avrot.txt" appended. Nothing is
    returned; the plot window is shown before the function ends.
    """
    ctfModel = ctfSet[ctfId]
    psdFn = ctfModel.getPsdFile()
    fn = removeExt(psdFn) + "_avrot.txt"

    xplotter = EmPlotter(windowTitle='CTFFind results')
    plot_title = getPlotSubtitle(ctfModel)
    # Axis label spelling corrected: "Spatial" (was "Spacial").
    a = xplotter.createSubPlot(plot_title, 'Spatial frequency (1/A)',
                               'Amplitude (or cross-correlation)')

    # One legend entry per curve, in the order _plotCurves draws them.
    legendName = ['Amplitude spectrum',
                  'CTF Fit',
                  'Quality of fit']
    _plotCurves(a, fn)
    xplotter.showLegend(legendName, loc='upper right')

    a.set_ylim([-0.1, 1.1])
    a.grid(True)
    xplotter.show()


def getPlotSubtitle(ctf):
    """ Build the plot subtitle from the values stored in a CTF model.

    :param ctf: object providing getDefocus() (returning three values:
        two defoci and an angle), getPhaseShift(), getFitQuality() and
        getResolution().
    :return: a single-line string with the segments separated by " | ".
        The phase-shift segment is only present when getPhaseShift()
        does not return None.
    """
    angstrom = u"\u212B"
    degree = u"\u00b0"

    defocusU, defocusV, defocusAngle = ctf.getDefocus()
    phaseShift = ctf.getPhaseShift()
    fitQuality = ctf.getFitQuality()
    resolution = ctf.getResolution()

    # %d is kept on purpose: it truncates non-integer defocus values.
    segments = [
        "Def1: %d %s" % (defocusU, angstrom),
        "Def2: %d %s" % (defocusV, angstrom),
        "Angle: %0.1f%s" % (defocusAngle, degree),
    ]
    if phaseShift is not None:
        segments.append("Phase shift: %0.2f %s" % (phaseShift, degree))
    segments.append("Fit: %0.1f %s" % (resolution, angstrom))
    segments.append("Score: %0.3f" % fitQuality)

    return " | ".join(segments)


def _plotCurves(a, fn):
    """ Draw the three curves parsed from ``fn`` on the given axes.

    :param a: axes-like object exposing ``plot(x, y)``.
    :param fn: path of the text file handed to _getValues().
    """
    values = _getValues(fn)
    for curve in ('amp', 'fit', 'quality'):
        a.plot(values['freq'], values[curve])


def _getValues(fn):
    """ Parse the input file and return a dict with the results.

    Lines starting with "#" (after stripping) are ignored. The remaining
    lines are numbered from 0 and split on whitespace into floats:
    line 0 -> 'freq', line 2 -> 'amp', line 3 -> 'fit', line 4 -> 'quality'.
    Line 1 is not stored and reading stops right after line 4.

    :param fn: path of the text file to read.
    :return: dict holding a list of floats for every key that was found.
    """
    # Position among the non-comment lines -> key in the returned dict.
    keyByRow = {0: 'freq', 2: 'amp', 3: 'fit', 4: 'quality'}
    lastRow = 4

    parsed = dict()
    with open(fn) as f:
        stripped = (raw.strip() for raw in f)
        dataRows = (row for row in stripped if not row.startswith("#"))
        for index, row in enumerate(dataRows):
            key = keyByRow.get(index)
            if key is not None:
                parsed[key] = [float(token) for token in row.split()]
            if index == lastRow:
                break

    return parsed


# Register createCtfPlot as an object command under this label.
OBJCMD_CTFFIND4 = "CTFFind plot results"

ProjectWindow.registerObjectCommand(OBJCMD_CTFFIND4, createCtfPlot)


class CtffindViewer(Viewer):
    """ Specific way to visualize SetOfCtf. """
    _environments = [DESKTOP_TKINTER]
    _targets = [CistemProtCTFFind]

    def _visualize(self, prot, **kwargs):
        """ Return the list of views to display for ``prot``.

        When the protocol has a non-None ``outputCTF`` attribute, a CtfView
        is returned with the CTFFind plot command set in its view
        parameters; otherwise an info message is returned instead.
        """
        ctfSet = getattr(prot, 'outputCTF', None)

        if ctfSet is None:
            return [self.infoMessage("The output SetOfCTFs has not been "
                                     "produced", "Missing output")]

        view = CtfView(self._project, ctfSet)
        # The command label is passed wrapped in single quotes.
        view.getViewParams()[showj.OBJCMDS] = "'%s'" % OBJCMD_CTFFIND4
        return [view]


class ProtUnblurViewer(EmProtocolViewer):
    """ Viewer for the outputs of CistemProtUnblur: micrographs,
    dose-weighted micrographs, movies and the set of failed movies.
    """
    _targets = [CistemProtUnblur]
    _environments = [DESKTOP_TKINTER]
    _label = 'viewer unblur'

    def _defineParams(self, form):
        """ Add the visualization options to the form.

        The two micrograph options are only added when the protocol has
        the corresponding output attribute.
        """
        form.addSection(label='Visualization')

        if self.hasMics():
            form.addParam('doShowMics', LabelParam,
                          label="Show aligned micrographs?",
                          default=True,
                          help="Show the output aligned micrographs.")

        if self.hasDWMics():
            form.addParam('doShowMicsDW', LabelParam,
                          label="Show aligned DOSE-WEIGHTED micrographs?",
                          default=True,
                          help="Show the output aligned dose-weighted "
                               "micrographs.")

        form.addParam('doShowMovies', LabelParam,
                      label="Show output movies?",
                      default=True,
                      help="Show the output movies with alignment "
                           "information.")
        form.addParam('doShowFailedMovies', LabelParam,
                      label="Show FAILED movies?",
                      default=True,
                      help="Create a set of failed movies "
                           "and display it.")

    def _getVisualizeDict(self):
        """ Map every available option name to its handler (_viewParam).

        Also resets ``self._errors`` to an empty list.
        """
        self._errors = []

        handlers = {
            'doShowMovies': self._viewParam,
            'doShowFailedMovies': self._viewParam,
        }
        if self.hasMics():
            handlers['doShowMics'] = self._viewParam
        if self.hasDWMics():
            handlers['doShowMicsDW'] = self._viewParam

        return handlers

    def hasMics(self):
        """ Tell whether the protocol has an 'outputMicrographs' attribute. """
        return hasattr(self.protocol, 'outputMicrographs')

    def hasDWMics(self):
        """ Tell whether the protocol has an
        'outputMicrographsDoseWeighted' attribute.
        """
        return hasattr(self.protocol, 'outputMicrographsDoseWeighted')

    def _viewParam(self, param=None):
        """ Return the list of views (or messages) for the given option.

        :param param: one of 'doShowMics', 'doShowMicsDW', 'doShowMovies'
            or 'doShowFailedMovies'. Any other value yields None.
        """
        # Same column list is used for both ordering and visibility.
        columns = ('enabled id _filename _samplingRate '
                   '_acquisition._dosePerFrame _acquisition._doseInitial ')
        tableParams = {
            showj.MODE: showj.MODE_MD,
            showj.ORDER: columns,
            showj.VISIBLE: columns,
            showj.RENDER: None,
        }

        if param == 'doShowMics':
            return [MicrographsView(self.getProject(),
                                    self.protocol.outputMicrographs)]

        if param == 'doShowMicsDW':
            return [MicrographsView(
                self.getProject(),
                self.protocol.outputMicrographsDoseWeighted)]

        if param == 'doShowMovies':
            if getattr(self.protocol, 'outputMovies', None) is None:
                return [self.errorMessage('No output movies found!',
                                          title="Visualization error")]
            movies = self.protocol.outputMovies
            return [self.objectView(movies, viewParams=tableParams)]

        if param == 'doShowFailedMovies':
            # Kept on the instance: _findFailedMovies reads it later.
            self.failedList = self.protocol._readFailedList()
            if self.failedList:
                sqliteFn = self.protocol._getPath('movies_failed.sqlite')
                self.createFailedMoviesSqlite(sqliteFn)
                return [self.objectView(sqliteFn, viewParams=tableParams)]
            return [self.errorMessage('No failed movies found!',
                                      title="Visualization error")]

        return None

    def createFailedMoviesSqlite(self, path):
        """ Write a SetOfMovies at ``path`` from the protocol input movies.

        Any existing file at ``path`` is removed with cleanPath() first.
        Items are copied from the input set with _findFailedMovies as the
        per-item callback. The set is written and closed before being
        returned.

        :param path: file name of the set to create.
        :return: the SetOfMovies instance that was written.
        """
        sourceMovies = self.protocol.inputMovies.get()
        cleanPath(path)

        failedSet = SetOfMovies(filename=path)
        failedSet.copyInfo(sourceMovies)
        failedSet.copyItems(sourceMovies,
                            updateItemCallback=self._findFailedMovies)
        failedSet.write()
        failedSet.close()

        return failedSet

    def _findFailedMovies(self, item, row):
        """ copyItems() callback: set ``_appendItem`` to False on every item
        whose object id is not in ``self.failedList``.
        """
        if item.getObjId() in self.failedList:
            return
        item._appendItem = False