From 073a4bbc18840a82d372f835a0ed3c9f87563055 Mon Sep 17 00:00:00 2001 From: Tommi Virtanen Date: Tue, 24 May 2011 13:06:00 -0700 Subject: [PATCH] Paramiko ChannelFile.close() didn't actually close the remote stdin. Add a wrapper that does the calls shutdown on the channel itself, to actually cause EOF. Add integration test using remote cat. Move shuffling stdout bytes to background, so run.run returns before seeing EOF on stdout, and thus actually making the stdin useful. Similarly, don't wait for EOF on stderr before returning from run.run. --- orchestra/run.py | 28 +++++++++++++++++++++++----- orchestra/test/test_integration.py | 19 +++++++++++++++++++ orchestra/test/test_run.py | 6 ++++++ 3 files changed, 48 insertions(+), 5 deletions(-) diff --git a/orchestra/run.py b/orchestra/run.py index da740957c01dc..d82ac655ab69b 100644 --- a/orchestra/run.py +++ b/orchestra/run.py @@ -146,6 +146,21 @@ class Sentinel(object): PIPE = Sentinel('PIPE') +class KludgeFile(object): + """ + Wrap Paramiko's ChannelFile in a way that lets ``f.close()`` + actually cause an EOF for the remote command. + """ + def __init__(self, wrapped): + self._wrapped = wrapped + + def __getattr__(self, name): + return getattr(self._wrapped, name) + + def close(self): + self._wrapped.close() + self._wrapped.channel.shutdown_write() + def run( client, args, stdin=None, stdout=None, stderr=None, @@ -168,6 +183,8 @@ def run( """ r = execute(client, args) + r.stdin = KludgeFile(wrapped=r.stdin) + g_in = None if stdin is not PIPE: g_in = gevent.spawn(copy_and_close, stdin, r.stdin) @@ -185,14 +202,15 @@ def run( if stdout is None: stdout = logger.getChild('out') - copy_file_to(r.stdout, stdout) + g_out = gevent.spawn(copy_file_to, r.stdout, stdout) r.stdout = stdout - g_err.get() - if g_in is not None: - g_in.get() - def _check_status(status): + g_err.get() + g_out.get() + if g_in is not None: + g_in.get() + status = status() if check_status: if status is None: diff --git a/orchestra/test/test_integration.py b/orchestra/test/test_integration.py index f1f43931ffca6..ee3c34391bba2 100644 --- a/orchestra/test/test_integration.py +++ b/orchestra/test/test_integration.py @@ -1,6 +1,7 @@ from .. import monkey; monkey.patch_all() from nose.tools import eq_ as eq +from cStringIO import StringIO import os import nose @@ -41,3 +42,21 @@ def test_lost(): ) eq(e.command, "sh -c 'kill -ABRT $PPID'") eq(str(e), "SSH connection was lost: \"sh -c 'kill -ABRT $PPID'\"") + +def test_pipe(): + ssh = connection.connect(HOST) + r = run.run( + client=ssh, + args=['cat'], + stdin=run.PIPE, + stdout=StringIO(), + wait=False, + ) + eq(r.stdout.getvalue(), '') + r.stdin.write('foo\n') + r.stdin.write('bar\n') + r.stdin.close() + + got = r.exitstatus.get() + eq(got, 0) + eq(r.stdout.getvalue(), 'foo\nbar\n') diff --git a/orchestra/test/test_run.py b/orchestra/test/test_run.py index 79bfd7508b6a2..d3213aabefed5 100644 --- a/orchestra/test/test_run.py +++ b/orchestra/test/test_run.py @@ -22,6 +22,9 @@ def test_run_log_simple(): err = fudge.Fake('ChannelFile(stderr)') cmd.returns((in_, out, err)) in_.expects('close').with_args() + in_chan = fudge.Fake('channel') + in_chan.expects('shutdown_write').with_args() + in_.has_attr(channel=in_chan) out.expects('xreadlines').with_args().returns(['foo', 'bar']) err.expects('xreadlines').with_args().returns(['bad']) logger = fudge.Fake('logger') @@ -54,6 +57,9 @@ def test_run_capture_stdout(): err = fudge.Fake('ChannelFile(stderr)') cmd.returns((in_, out, err)) in_.expects('close').with_args() + in_chan = fudge.Fake('channel') + in_chan.expects('shutdown_write').with_args() + in_.has_attr(channel=in_chan) out.remember_order() out.expects('read').with_args().returns('foo\nb') out.expects('read').with_args().returns('ar\n') -- 2.39.5