# **************************************************************************
# *
# * Authors: J.M. De la Rosa Trevin (jmdelarosa@cnb.csic.es)
# *
# * Unidad de Bioinformatica of Centro Nacional de Biotecnologia , CSIC
# *
# * This program is free software; you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation; either version 3 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program; if not, write to the Free Software
# * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
# * 02111-1307 USA
# *
# * All comments concerning this program package may be sent to the
# * e-mail address 'scipion@cnb.csic.es'
# *
# **************************************************************************
"""
This module is responsible for launching protocol executions.
There are two main scenarios: local execution and remote execution.
A. Local execution:
This depends on the 'localhost' configuration:
1- Check whether the protocol will be launched with MPI or not (using the MPI template from the config).
2- Check whether the protocol will be submitted to a queue (using the queue template from the config).
3- Build the command that will be launched.
B. Remote execution:
1- Establish a connection with the remote host for protocol execution.
2- Copy the necessary files to the remote host.
3- Run a local process (for local execution, see case A) on the remote host.
4- Get the result back after the remote launch.
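
A minimal usage sketch (assuming this module is importable as
pyworkflow.protocol.launch and that the protocol is already saved in its
project; names are illustrative):

    from pyworkflow.protocol.launch import launch, schedule, stop

    jobId = launch(protocol, wait=False)  # case A or B is picked automatically
    # ...or, if the protocol is not ready to run yet:
    # jobId = schedule(protocol, initialSleepTime=30)
    # ...later, to cancel the run:
    # stop(protocol)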
"""
import os
import re
import logging
from subprocess import Popen, PIPE

import pyworkflow as pw
from pyworkflow.exceptions import PyworkflowException
from pyworkflow.utils import (redStr, greenStr, makeFilePath, join, process,
                              getHostFullName)
from pyworkflow.protocol.constants import UNKNOWN_JOBID

logger = logging.getLogger(__file__)

LOCALHOST = 'localhost'
# ******************************************************************
# * Public functions provided by the module
# ******************************************************************
def launch(protocol, wait=False, stdin=None, stdout=None, stderr=None):
    """ Launch a protocol. This function decides which scenario applies,
    A (local) or B (remote), and delegates accordingly.
    """
if _isLocal(protocol):
jobId = _launchLocal(protocol, wait, stdin, stdout, stderr)
else:
jobId = _launchRemote(protocol, wait)
protocol.setJobId(jobId)
return jobId
def stop(protocol):
    """ Stop a running protocol, either locally or on its remote execution host.
    """
if _isLocal(protocol):
return _stopLocal(protocol)
else:
return _stopRemote(protocol)
def schedule(protocol, initialSleepTime=0, wait=False):
    """ Use this function to schedule protocols that are not ready to
    run yet. Right now it only makes sense to schedule jobs locally.
    """
cmd = '%s %s' % (pw.PYTHON, pw.getScheduleScript())
cmd += ' "%s" "%s" %s "%s" --initial_sleep %s' % (protocol.getProject().path,
protocol.getDbPath(),
protocol.strId(),
protocol.getScheduleLog(),
initialSleepTime)
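    # With placeholders, the resulting command is:
    #   python <schedule script> "<project path>" "<protocol db>" <protocol id> "<schedule log>" --initial_sleep <seconds>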
jobId = _run(cmd, wait)
protocol.setJobId(jobId)
return jobId
# ******************************************************************
# * Internal utility functions
# ******************************************************************
def _isLocal(protocol):
return protocol.getHostName() == LOCALHOST
def _runsLocally(protocol):
""" Return True if this protocol is running in this machine,
where the PID makes sense.
"""
return protocol.getHostFullName() == getHostFullName()
# ******************************************************************
# * Function related to LAUNCH
# ******************************************************************
def _getAppsProgram(prog):
""" Get a command to launch a program under the apps folder.
"""
return "%s %s" % (pw.PYTHON, pw.join(pw.APPS, prog))
def _launchLocal(protocol, wait, stdin=None, stdout=None, stderr=None):
    """
    :param protocol: Protocol to launch
    :param wait: pass True to wait for the process to finish
    :param stdin: stdin object to redirect stdin to
    :param stdout: stdout object to send process stdout
    :param stderr: stderr object to send process stderr
    :return: PID of the process or, if submitted to a queue, the queue JOBID
    """
    command = '{python} {prot_run} "{project_path}" "{db_path}" {prot_id} "{stdout_log}" "{stderr_log}"'.format(
        python=pw.PYTHON,
        prot_run=pw.join(pw.APPS, 'pw_protocol_run.py'),
        project_path=protocol.getProject().path,
        db_path=protocol.getDbPath(),
        prot_id=protocol.strId(),
        # We make the log paths absolute in case the working dir is not passed to the node when running
        # through a queue. Since 3.0.27, the first thing affected by the current working dir is the
        # creation of the logs, even before loading the project (which was, and still is, what sets the
        # working dir to the project path). IMPORTANT: this assumes the paths before the queue and on the
        # queue nodes are the same, which should be safe since the "project_path" passed here is absolute.
        stdout_log=os.path.abspath(protocol.getStdoutLog()),
        stderr_log=os.path.abspath(protocol.getStderrLog())
    )
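    # With placeholders, the command expands to:
    #   python <APPS>/pw_protocol_run.py "<project path>" "<protocol db>" <protocol id> "<abs stdout log>" "<abs stderr log>"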
hostConfig = protocol.getHostConfig()
useQueue = protocol.useQueue()
    # Reset the PID: 0 means there is no local process yet
    protocol.setPid(0)
    # Check if we need to submit to a queue
if useQueue and (protocol.getSubmitDict()["QUEUE_FOR_JOBS"] == "N"):
submitDict = dict(hostConfig.getQueuesDefault())
submitDict.update(protocol.getSubmitDict())
submitDict['JOB_COMMAND'] = command
jobId = _submit(hostConfig, submitDict)
else:
jobId = _run(command, wait, stdin, stdout, stderr)
return jobId
def _runRemote(protocol, mode):
""" Launch remotely 'pw_protocol_remote.py' script to run or stop a protocol.
Params:
protocol: the protocol to be ran or stopped.
mode: should be either 'run' or 'stop'
"""
host = protocol.getHostConfig()
tpl = "ssh %(address)s '%(scipion)s/scipion "
if host.getScipionConfig() is not None:
tpl += "--config %(config)s "
tpl += "runprotocol pw_protocol_remote.py %(mode)s "
tpl += "%(project)s %(protDb)s %(protId)s' "
    # Use the project base name; on the remote side,
    # SCIPION_USER_DATA/projects will be prepended
projectPath = os.path.basename(protocol.getProject().path)
args = {'address': host.getAddress(),
'mode': mode,
'scipion': host.getScipionHome(),
'config': host.getScipionConfig(),
'project': projectPath,
'protDb': protocol.getDbPath(),
'protId': protocol.getObjId()
}
cmd = tpl % args
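    # With placeholders, cmd looks like:
    #   ssh <address> '<scipion home>/scipion [--config <config>] runprotocol pw_protocol_remote.py <mode> <project> <protDb> <protId>'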
logger.info("** Running remote: %s" % greenStr(cmd))
p = Popen(cmd, shell=True, stdout=PIPE)
return p
def _launchRemote(protocol, wait):
p = _runRemote(protocol, 'run')
jobId = UNKNOWN_JOBID
out, err = p.communicate()
if err:
raise Exception(err)
    s = re.search(r'Scipion remote jobid: (\d+)', out.decode())
    if s:
        jobId = int(s.group(1))
    else:
        raise Exception("** Couldn't parse output: %s" % redStr(out.decode()))
return jobId
def _copyFiles(protocol, rpath):
""" Copy all required files for protocol to run
in a remote execution host.
    NOTE: this function should always be executed with
    the current working dir pointing to the project dir,
    and the remote path is assumed to be protocol.getHostConfig().getHostPath().
    Params:
        protocol: protocol whose files will be copied.
        rpath: remote path object (providing putFile) used to copy the files.
"""
remotePath = protocol.getHostConfig().getHostPath()
for f in protocol.getFiles():
remoteFile = join(remotePath, f)
rpath.putFile(f, remoteFile)
def _submit(hostConfig, submitDict, cwd=None, env=None):
""" Submit a protocol to a queue system. Return its job id.
"""
    # First create the submission script to be launched,
    # formatting it using the submission template
template = hostConfig.getSubmitTemplate()
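    # The submit template is a shell script containing %(...)s placeholders that are filled
    # from submitDict (queue defaults, the protocol submit dict and JOB_COMMAND).
    # Illustrative sketch of such a template (the real one comes from the host configuration):
    #   #!/bin/bash
    #   ### scheduler directives built from the queue parameters go here...
    #   %(JOB_COMMAND)s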
try:
template = template % submitDict
except Exception as e:
# Capture parsing errors
exception = analyzeFormattingTypeError(template, submitDict)
if exception:
raise exception
else:
# If there is no exception, then raise actual one
raise e
    scriptPath = submitDict['JOB_SCRIPT']
    # Make sure the folder for the script exists before writing it
    makeFilePath(scriptPath)
    with open(scriptPath, 'w') as f:
        # Add some line ends because in some clusters it fails
        # to submit jobs if the submit script does not end with a newline
        f.write(template + '\n\n')
# This should format the command using a template like:
# "qsub %(JOB_SCRIPT)s"
command = hostConfig.getSubmitCommand() % submitDict
gcmd = greenStr(command)
logger.info("** Submitting to queue: '%s'" % gcmd)
p = Popen(command, shell=True, stdout=PIPE, cwd=cwd, env=env)
out = p.communicate()[0]
    # Try to parse the submission output, searching for a number (the jobId)
    # TODO: review this, it seems to be specific to the Torque/PBS batch system
    s = re.search(r'(\d+)', str(out))
if p.returncode == 0 and s:
job = int(s.group(0))
logger.info("Launched job with id %s" % job)
return job
else:
logger.info("Couldn't submit to queue for reason: %s " % redStr(out.decode()))
return UNKNOWN_JOBID
def _run(command, wait, stdin=None, stdout=None, stderr=None):
""" Execute a command in a subprocess and return the pid. """
gcmd = greenStr(command)
logger.info("** Running command: '%s'" % gcmd)
p = Popen(command, shell=True, stdout=stdout, stderr=stderr)
jobId = p.pid
if wait:
p.wait()
return jobId
# ******************************************************************
# * Function related to STOP
# ******************************************************************
def _stopLocal(protocol):
if protocol.useQueue() and not protocol.isScheduled():
jobId = protocol.getJobId()
host = protocol.getHostConfig()
cancelCmd = host.getCancelCommand() % {'JOB_ID': jobId}
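        # The cancel command template comes from the host configuration; illustrative
        # examples are "scancel %(JOB_ID)s" (SLURM) or "qdel %(JOB_ID)s" (Torque/PBS).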
_run(cancelCmd, wait=True)
else:
process.killWithChilds(protocol.getPid())
def _stopRemote(protocol):
_runRemote(protocol, 'stop')