class RemoteProcess(object):
-
"""
- Remote process object used to keep track of attributes of a process.
+ An object to begin and monitor execution of a process on a remote host
"""
__slots__ = [
- 'command', 'stdin', 'stdout', 'stderr', 'exitstatus', 'exited',
+ 'client', 'args', 'check_status', 'command', 'hostname',
+ 'stdin', 'stdout', 'stderr',
+ '_stdin_buf', '_stdout_buf', '_stderr_buf',
+ 'returncode', 'exitstatus',
+ 'greenlets',
# for orchestra.remote.Remote to place a backreference
'remote',
]
- def __init__(self, command, stdin, stdout, stderr, exitstatus, exited):
- self.command = command
- self.stdin = stdin
- self.stdout = stdout
- self.stderr = stderr
- self.exitstatus = exitstatus
- self.exited = exited
+ def __init__(self, client, args, check_status=True, hostname=None):
+ """
+ Create the object. Does not initiate command execution.
+
+ :param client: paramiko.SSHConnection to run the command with
+ :param args: Command to run.
+ :type args: String or list of strings
+ :param check_status: Whether to raise CommandFailedError on non-zero
+ exit status, and . Defaults to True. All signals
+ and connection loss are made to look like SIGHUP.
+ :param hostname: Name of remote host (optional)
+ """
+ self.client = client
+ self.args = args
+ if isinstance(args, basestring):
+ self.command = args
+ else:
+ self.command = quote(args)
+
+ self.check_status = check_status
+
+ if hostname:
+ self.hostname = hostname
+ else:
+ (self.hostname, port) = client.get_transport().getpeername()
+
+ self.greenlets = []
+ self.stdin, self.stdout, self.stderr = (None, None, None)
+ self.returncode = self.exitstatus = None
+
+ def execute(self):
+ """
+ Execute remote command
+ """
+ log.getChild(self.hostname).info(u"Running: {cmd!r}".format(
+ cmd=self.command))
+
+ (self._stdin_buf, self._stdout_buf, self._stderr_buf) = \
+ self.client.exec_command(self.command)
+ (self.stdin, self.stdout, self.stderr) = \
+ (self._stdin_buf, self._stdout_buf, self._stderr_buf)
+
+ def add_greenlet(self, greenlet):
+ self.greenlets.append(greenlet)
+
+ def wait(self):
+ """
+ Block until remote process finishes.
+
+ :returns: self.returncode
+ """
+ for greenlet in self.greenlets:
+ greenlet.get()
+
+ status = self._get_exitstatus()
+ self.exitstatus = self.returncode = status
+ if self.check_status:
+ if status is None:
+ # command either died due to a signal, or the connection
+ # was lost
+ transport = self.client.get_transport()
+ if not transport.is_active():
+ # look like we lost the connection
+ raise ConnectionLostError(command=self.command)
+
+ # connection seems healthy still, assuming it was a
+ # signal; sadly SSH does not tell us which signal
+ raise CommandCrashedError(command=self.command)
+ if status != 0:
+ raise CommandFailedError(command=self.command,
+ exitstatus=status, node=self.hostname)
+ return status
+
+ def _get_exitstatus(self):
+ """
+ :returns: the remote command's exit status (return code). Note that
+ if the connection is lost, or if the process was killed by a
+ signal, this returns None instead of paramiko's -1.
+ """
+ status = self._stdout_buf.channel.recv_exit_status()
+ if status == -1:
+ status = None
+ return status
+
+ @property
+ def finished(self):
+ return self._stdout_buf.channel.exit_status_ready()
+
+ def poll(self):
+ """
+ :returns: self.returncode if the process is finished; else None
+ """
+ if self.finished:
+ return self.returncode
+ return None
class Raw(object):
return ' '.join(_quote(args))
-def execute(client, args, name=None):
- """
- Execute a command remotely.
-
- Caller needs to handle stdin etc.
-
- :param client: SSHConnection to run the command with
- :param args: command to run
- :param name: name of client (optional)
- :type args: string or list of strings
-
- Returns a RemoteProcess, where exitstatus is a callable that will
- block until the exit status is available.
- """
- if isinstance(args, basestring):
- cmd = args
- else:
- cmd = quote(args)
-
- if name:
- host = name
- else:
- (host, port) = client.get_transport().getpeername()
- log.getChild(host).info(u"Running: {cmd!r}".format(cmd=cmd))
-
- (in_, out, err) = client.exec_command(cmd)
-
- def get_exitstatus():
- """
- Get exit status.
-
- When -1 on connection loss *and* signals occur, this
- maps to more pythonic None
- """
- status = out.channel.recv_exit_status()
- if status == -1:
- status = None
- return status
-
- def exitstatus_ready():
- """
- out.channel exit wrapper.
- """
- return out.channel.exit_status_ready()
-
- r = RemoteProcess(
- command=cmd,
- stdin=in_,
- stdout=out,
- stderr=err,
- # this is a callable that will block until the status is
- # available
- exitstatus=get_exitstatus,
- exited=exitstatus_ready,
- )
- return r
-
-
def copy_to_log(f, logger, loglevel=logging.INFO):
"""
Interface to older xreadlines api.
if name is None:
name = host
- r = execute(client, args, name=name)
+ r = RemoteProcess(client, args, check_status=check_status, hostname=name)
+ r.execute()
r.stdin = KludgeFile(wrapped=r.stdin)
g_in = None
if stdin is not PIPE:
g_in = gevent.spawn(copy_and_close, stdin, r.stdin)
+ r.add_greenlet(g_in)
r.stdin = None
else:
assert not wait, \
if stderr is None:
stderr = logger.getChild(name).getChild('stderr')
g_err = gevent.spawn(copy_file_to, r.stderr, stderr)
+ r.add_greenlet(g_err)
r.stderr = stderr
else:
assert not wait, \
if stdout is None:
stdout = logger.getChild(name).getChild('stdout')
g_out = gevent.spawn(copy_file_to, r.stdout, stdout)
+ r.add_greenlet(g_out)
r.stdout = stdout
else:
assert not wait, \
"Using PIPE for stdout without wait=False would deadlock."
- def _check_status(status):
- """
- get values needed if uninitialized. Handle ssh issues when checking
- the status.
- """
- if g_err is not None:
- g_err.get()
- if g_out is not None:
- g_out.get()
- if g_in is not None:
- g_in.get()
-
- status = status()
- if check_status:
- if status is None:
- # command either died due to a signal, or the connection
- # was lost
- transport = client.get_transport()
- if not transport.is_active():
- # look like we lost the connection
- raise ConnectionLostError(command=r.command)
-
- # connection seems healthy still, assuming it was a
- # signal; sadly SSH does not tell us which signal
- raise CommandCrashedError(command=r.command)
- if status != 0:
- raise CommandFailedError(
- command=r.command, exitstatus=status, node=name)
- return status
-
if wait:
- r.exitstatus = _check_status(r.exitstatus)
- else:
- r.exitstatus = spawn_asyncresult(_check_status, r.exitstatus)
+ r.wait()
return r
while len(not_ready) > 0:
check_time()
for proc in list(not_ready):
- if proc.exitstatus.ready():
+ if proc.finished:
not_ready.remove(proc)
for proc in processes:
- assert isinstance(proc.exitstatus, gevent.event.AsyncResult)
- proc.exitstatus.get()
+ proc.wait()
from cStringIO import StringIO
import fudge
-import gevent.event
import logging
from .. import run
wait=False,
)
assert r.command == 'foo'
- assert isinstance(r.exitstatus, gevent.event.AsyncResult)
e = assert_raises(
run.CommandFailedError,
- r.exitstatus.get,
+ r.wait,
)
- assert e.exitstatus == 42
+ assert r.returncode == 42
assert str(e) == "Command failed on HOST with status 42: 'foo'"
@fudge.with_fakes
logger = fudge.Fake('logger').is_a_stub()
channel = fudge.Fake('channel')
out.has_attr(channel=channel)
+ channel.expects('exit_status_ready').with_args().returns(False)
channel.expects('recv_exit_status').with_args().returns(0)
r = run.run(
client=ssh,
)
r.stdin.write('bar')
assert r.command == 'foo'
- assert isinstance(r.exitstatus, gevent.event.AsyncResult)
- assert r.exitstatus.ready() == False
- got = r.exitstatus.get()
+ assert r.poll() is None
+ got = r.wait()
+ assert isinstance(r.returncode, int)
assert got == 0
@fudge.with_fakes
logger = fudge.Fake('logger').is_a_stub()
channel = fudge.Fake('channel')
out.has_attr(channel=channel)
+ channel.expects('exit_status_ready').with_args().returns(False)
channel.expects('recv_exit_status').with_args().returns(0)
r = run.run(
client=ssh,
stdout=run.PIPE,
wait=False,
)
+ assert r.exitstatus is None
assert r.command == 'foo'
- assert isinstance(r.exitstatus, gevent.event.AsyncResult)
- assert r.exitstatus.ready() == False
+ assert r.poll() is None
assert r.stdout.read() == 'one'
assert r.stdout.read() == 'two'
assert r.stdout.read() == ''
- got = r.exitstatus.get()
+ got = r.wait()
+ assert isinstance(r.exitstatus, int)
assert got == 0
@fudge.with_fakes
logger = fudge.Fake('logger').is_a_stub()
channel = fudge.Fake('channel')
out.has_attr(channel=channel)
+ channel.expects('exit_status_ready').with_args().returns(False)
channel.expects('recv_exit_status').with_args().returns(0)
r = run.run(
client=ssh,
stderr=run.PIPE,
wait=False,
)
+ assert r.exitstatus is None
assert r.command == 'foo'
- assert isinstance(r.exitstatus, gevent.event.AsyncResult)
- assert r.exitstatus.ready() is False
+ assert r.poll() is None
assert r.stderr.read() == 'one'
assert r.stderr.read() == 'two'
assert r.stderr.read() == ''
- got = r.exitstatus.get()
+ got = r.wait()
+ assert isinstance(r.exitstatus, int)
assert got == 0
def test_quote_simple(self):