# **************************************************************************
# *
# * Authors: J.M. De la Rosa Trevin (jmdelarosa@cnb.csic.es)
# *
# * Unidad de Bioinformatica of Centro Nacional de Biotecnologia , CSIC
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 3 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307 USA
# *
# * All comments concerning this program package may be sent to the
# * e-mail address 'scipion@cnb.csic.es'
# *
# **************************************************************************
"""
This module is responsible for launching local protocol executions:
1- Check if the protocol will be submitted to a queue (using Queue template from config)
2- Build the command that will be launched.
"""
import os
import re
import logging
logger = logging.getLogger(__file__)
from subprocess import Popen, PIPE
import pyworkflow as pw
from pyworkflow.exceptions import PyworkflowException
from pyworkflow.utils import (redStr, greenStr, makeFilePath, join, process,
getHostFullName)
from pyworkflow.protocol.constants import UNKNOWN_JOBID
# ******************************************************************
# * Public functions provided by the module
# ******************************************************************
def launch(protocol, wait=False, stdin=None, stdout=None, stderr=None):
    """ Public entry point to launch a protocol.

    Delegates to _launchLocal and stores the identifier it returns in the
    protocol through protocol.setJobId.

    :param protocol: protocol to launch
    :param wait: forwarded unchanged to _launchLocal
    :param stdin: forwarded unchanged to _launchLocal
    :param stdout: forwarded unchanged to _launchLocal
    :param stderr: forwarded unchanged to _launchLocal
    :return: the identifier returned by _launchLocal
    """
    launchedId = _launchLocal(protocol, wait, stdin, stdout, stderr)
    # Keep the protocol in sync with what was actually launched
    protocol.setJobId(launchedId)
    return launchedId
def stop(protocol):
    """ Stop a protocol execution.

    When the protocol uses a queue and is not in scheduled state, the
    host's cancel command is formatted with the protocol job id and run
    (waiting for it to finish). Otherwise the process identified by the
    protocol PID is killed together with its children.

    :param protocol: protocol to stop
    """
    queuedAndStarted = protocol.useQueue() and not protocol.isScheduled()

    if not queuedAndStarted:
        # Plain process: kill it along with its children
        process.killWithChilds(protocol.getPid())
        return

    # Queue job: cancel it through the host's cancel command template
    queueJobId = protocol.getJobId()
    hostConfig = protocol.getHostConfig()
    cancelTemplate = hostConfig.getCancelCommand()
    _run(cancelTemplate % {'JOB_ID': queueJobId}, wait=True)
def schedule(protocol, initialSleepTime=0, wait=False):
    """ Schedule a protocol that is not ready to run yet.

    Builds a command that runs the schedule script (pw.getScheduleScript())
    with the Python interpreter (pw.PYTHON) and executes it through _run.
    The identifier returned by _run is stored in the protocol.

    :param protocol: protocol to schedule
    :param initialSleepTime: value passed as --initial_sleep to the script
    :param wait: forwarded to _run
    :return: the identifier returned by _run
    """
    launcher = '%s %s' % (pw.PYTHON, pw.getScheduleScript())

    # Positional arguments of the schedule script followed by its option
    scriptArgs = (protocol.getProject().path,
                  protocol.getDbPath(),
                  protocol.strId(),
                  protocol.getScheduleLog(),
                  initialSleepTime)
    arguments = ' "%s" "%s" %s "%s" --initial_sleep %s' % scriptArgs

    scheduledId = _run(launcher + arguments, wait)
    protocol.setJobId(scheduledId)
    return scheduledId
# ******************************************************************
# * Internal utility functions
# ******************************************************************
def _runsLocally(protocol):
    """ Tell whether the protocol host is this machine.

    Compares the host full name stored in the protocol with the one
    returned by getHostFullName() for the current machine; only when
    both match does the protocol PID make sense here.

    :param protocol: protocol to check
    :return: True if both host names are equal
    """
    protocolHost = protocol.getHostFullName()
    thisHost = getHostFullName()
    return protocolHost == thisHost
# ******************************************************************
# * Function related to LAUNCH
# ******************************************************************
def _getAppsProgram(prog):
    """ Compose the command that runs a program of the apps folder.

    :param prog: file name of the program, relative to pw.APPS
    :return: string with pw.PYTHON followed by the program path
    """
    interpreter = pw.PYTHON
    programPath = pw.join(pw.APPS, prog)
    return "%s %s" % (interpreter, programPath)
def _launchLocal(protocol, wait, stdin=None, stdout=None, stderr=None):
    """ Build the pw_protocol_run.py command for a protocol and execute it,
    either by submitting it to a queue or by running it as a subprocess.

    :param protocol: Protocol to launch
    :param wait: Pass True to wait for the process to finish (only used
        when the command is run directly, not when it is submitted)
    :param stdin: stdin object, forwarded to _run
    :param stdout: stdout object, forwarded to _run
    :param stderr: stderr object, forwarded to _run
    :return: value returned by _submit (queue job id) or by _run (PID)
    """
    commandTemplate = '{python} {prot_run} "{project_path}" "{db_path}" {prot_id} "{stdout_log}" "{stderr_log}"'
    fields = {
        'python': pw.PYTHON,
        'prot_run': pw.join(pw.APPS, 'pw_protocol_run.py'),
        'project_path': protocol.getProject().path,
        'db_path': protocol.getDbPath(),
        'prot_id': protocol.strId(),
        # Log paths are made absolute in case the working dir is not passed
        # to the node when running through a queue. Since 3.0.27 the first
        # thing affected by the current working dir is the creation of the
        # logs, even before loading the project (which sets the working dir
        # to the project path). IMPORTANT: this assumes the paths are the
        # same before the queue and on the nodes, which should be safe
        # since "project_path" is passed here as an absolute path.
        'stdout_log': os.path.abspath(protocol.getStdoutLog()),
        'stderr_log': os.path.abspath(protocol.getStderrLog()),
    }
    command = commandTemplate.format(**fields)

    hostConfig = protocol.getHostConfig()
    usesQueue = protocol.useQueue()

    # Reset the PID (0 means "empty") before launching
    protocol.setPid(0)

    # NOTE(review): "QUEUE_FOR_JOBS" == "N" presumably means the protocol
    # itself (not its individual jobs) goes to the queue -- confirm.
    submitToQueue = usesQueue and (protocol.getSubmitDict()["QUEUE_FOR_JOBS"] == "N")

    if not submitToQueue:
        return _run(command, wait, stdin, stdout, stderr)

    # Queue defaults first, then the protocol values override them
    submitDict = dict(hostConfig.getQueuesDefault())
    submitDict.update(protocol.getSubmitDict())
    submitDict['JOB_COMMAND'] = command
    return _submit(hostConfig, submitDict)
def _submit(hostConfig, submitDict, cwd=None, env=None):
    """ Submit a protocol to a queue system and return its job id.

    The submit template of the host is formatted with submitDict and
    written to the path given by submitDict['JOB_SCRIPT']. Then the host
    submit command (e.g. "qsub %(JOB_SCRIPT)s") is formatted with the same
    dict and executed through a shell.

    :param hostConfig: host configuration providing getSubmitTemplate()
        and getSubmitCommand()
    :param submitDict: dict with the values to replace in the template and
        in the submit command. Must contain 'JOB_SCRIPT'.
    :param cwd: working directory for the submit command (passed to Popen)
    :param env: environment for the submit command (passed to Popen)
    :return: the job id (int) parsed from the command output, or
        UNKNOWN_JOBID if the command failed or no number was found
    :raises: the exception returned by analyzeFormattingTypeError when the
        template can not be formatted, or the original formatting error
        if that helper returns nothing
    """
    # Create first the submission script to be launched
    # formatting using the template
    template = hostConfig.getSubmitTemplate()

    try:
        script = template % submitDict
    except Exception:
        # Capture parsing errors. analyzeFormattingTypeError is defined
        # outside this block; it is expected to return either a more
        # descriptive exception or a falsy value.
        exception = analyzeFormattingTypeError(template, submitDict)

        if exception:
            raise exception
        # If there is no better exception, re-raise the actual one
        raise

    scriptPath = submitDict['JOB_SCRIPT']

    # Ensure the path exists BEFORE opening the file: opening for writing
    # fails if the parent folder is missing.
    makeFilePath(scriptPath)

    # Add some line ends because in some clusters it fails
    # to submit jobs if the submit script does not have end of line.
    # The context manager closes the file even if the write fails.
    with open(scriptPath, 'w') as scriptFile:
        scriptFile.write(script + '\n\n')

    # This should format the command using a template like:
    # "qsub %(JOB_SCRIPT)s"
    command = hostConfig.getSubmitCommand() % submitDict
    gcmd = greenStr(command)
    logger.info("** Submitting to queue: '%s'" % gcmd)

    p = Popen(command, shell=True, stdout=PIPE, cwd=cwd, env=env)
    # Decode once, tolerating bytes that are not valid in the default
    # encoding, so neither the parsing nor the error report can fail on them.
    out = p.communicate()[0].decode(errors='replace')

    # Try to parse the result of qsub, searching for a number (jobId).
    # Review this, it seems to be exclusive to the torque batch system.
    # The search runs on the decoded text (not on the repr of the bytes,
    # where escape sequences such as \x89 would introduce spurious digits).
    match = re.search(r'(\d+)', out)

    if p.returncode == 0 and match:
        job = int(match.group(0))
        logger.info("Launched job with id %s" % job)
        return job

    logger.info("Couldn't submit to queue for reason: %s " % redStr(out))
    return UNKNOWN_JOBID
def _run(command, wait, stdin=None, stdout=None, stderr=None):
    """ Execute a command in a subprocess (through a shell) and return
    its pid.

    :param command: command line to execute
    :param wait: if True, block until the subprocess finishes
    :param stdin: stdin for the subprocess (None inherits the parent's)
    :param stdout: stdout for the subprocess (None inherits the parent's)
    :param stderr: stderr for the subprocess (None inherits the parent's)
    :return: PID of the launched subprocess
    """
    gcmd = greenStr(command)
    logger.info("** Running command: '%s'" % gcmd)

    # stdin is now forwarded as well: previously the parameter was accepted
    # but silently ignored. With the default (None) behaviour is unchanged.
    p = Popen(command, shell=True, stdin=stdin, stdout=stdout, stderr=stderr)
    jobId = p.pid

    if wait:
        p.wait()

    # When not waiting, the subprocess is intentionally left running and
    # only its pid is handed back to the caller.
    return jobId