from cStringIO import StringIO
import gevent
+import gevent.event
import pipes
import logging
import shutil
self.stderr = stderr
self.exitstatus = exitstatus
+def spawn_asyncresult(fn, *args, **kwargs):
+ """
+ Spawn a Greenlet and pass it's results to an AsyncResult.
+
+ This function is useful to shuffle data from a Greenlet to
+ AsyncResult, which then again is useful because any Greenlets that
+ raise exceptions will cause tracebacks to be shown on stderr by
+ gevent, even when ``.link_exception`` has been called. Using an
+ AsyncResult avoids this.
+ """
+ r = gevent.event.AsyncResult()
+ def wrapper():
+ try:
+ value = fn(*args, **kwargs)
+ except Exception as e:
+ r.set_exception(e)
+ else:
+ r.set(value)
+ g = gevent.spawn(wrapper)
+
+ return r
+
def run(
client, args,
stdin=None, stdout=None, stderr=None,
logger=None,
check_status=True,
+ wait=True,
):
"""
Run a command remotely.
:param stderr: What to do with standard error. See `stdout`.
:param logger: If logging, write stdout/stderr to "out" and "err" children of this logger. Defaults to logger named after this module.
:param check_status: Whether to raise CalledProcessError on non-zero exit status, and . Defaults to True. All signals and connection loss are made to look like SIGHUP.
+ :param wait: Whether to wait for process to exit. If False, returned ``r.exitstatus`` s a `gevent.event.AsyncResult`, and the actual status is available via ``.get()``.
"""
r = execute(client, args)
g_err.get()
g_in.get()
- status = r.exitstatus
- if check_status:
- if status is None:
- # command either died due to a signal, or the connection
- # was lost
- transport = client.get_transport()
- if not transport.is_active():
- # look like we lost the connection
- raise ConnectionLostError(command=r.command)
-
- # connection seems healthy still, assuming it was a
- # signal; sadly SSH does not tell us which signal
- raise CommandCrashedError(command=r.command)
- if status != 0:
- raise CommandFailedError(command=r.command, exitstatus=status)
+ def get_status():
+ status = r.exitstatus
+ if check_status:
+ if status is None:
+ # command either died due to a signal, or the connection
+ # was lost
+ transport = client.get_transport()
+ if not transport.is_active():
+ # look like we lost the connection
+ raise ConnectionLostError(command=r.command)
+
+ # connection seems healthy still, assuming it was a
+ # signal; sadly SSH does not tell us which signal
+ raise CommandCrashedError(command=r.command)
+ if status != 0:
+ raise CommandFailedError(command=r.command, exitstatus=status)
+ return status
+
+ if wait:
+ status = get_status()
+ else:
+ status = spawn_asyncresult(get_status)
return CommandResult(
command=r.command,
from cStringIO import StringIO
import fudge
+import gevent.event
import nose
import logging
check_status=False,
)
assert r.exitstatus is None
+
+
+@nose.with_setup(fudge.clear_expectations)
+@fudge.with_fakes
+def test_run_nowait():
+ ssh = fudge.Fake('SSHConnection')
+ cmd = ssh.expects('exec_command')
+ cmd.with_args("foo")
+ in_ = fudge.Fake('ChannelFile').is_a_stub()
+ out = fudge.Fake('ChannelFile').is_a_stub()
+ err = fudge.Fake('ChannelFile').is_a_stub()
+ cmd.returns((in_, out, err))
+ out.expects('xreadlines').with_args().returns([])
+ err.expects('xreadlines').with_args().returns([])
+ logger = fudge.Fake('logger').is_a_stub()
+ channel = fudge.Fake('channel')
+ out.has_attr(channel=channel)
+ channel.expects('recv_exit_status').with_args().returns(42)
+ r = run.run(
+ client=ssh,
+ logger=logger,
+ args=['foo'],
+ wait=False,
+ )
+ eq(r.command, 'foo')
+ assert isinstance(r.exitstatus, gevent.event.AsyncResult)
+ e = assert_raises(
+ run.CommandFailedError,
+ r.exitstatus.get,
+ )
+ eq(e.exitstatus, 42)
+ eq(str(e), "Command failed with status 42: 'foo'")