]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Refactor teuthology.orchestra.run
authorZack Cerza <zack@cerza.org>
Fri, 30 May 2014 16:59:09 +0000 (11:59 -0500)
committerZack Cerza <zack@cerza.org>
Fri, 30 May 2014 21:05:23 +0000 (16:05 -0500)
RemoteProcess behaves more like subprocess.Popen, with some important
differences.

A summary of the API changes:
* RemoteProcess.exitstatus is either an int or None; it is never a callable
  nor a gevent.AsyncResult.
* New method: RemoteProcess.execute()
* New method: RemoteProcess.poll()
* New method: RemoteProcess.wait()
* New attribute: RemoteProcess.returncode - alias to exitstatus
* New property: RemoteProcess.finished - added because returncode can be None
  if the connection was interrupted
* run.execute() is removed.

Signed-off-by: Zack Cerza <zack.cerza@inktank.com>
teuthology/orchestra/run.py
teuthology/orchestra/test/test_integration.py
teuthology/orchestra/test/test_remote.py
teuthology/orchestra/test/test_run.py

index 5895ed48c239be5887ad9f8aca38518f69d197b1..fd03a2d28d3692402fa57dcc7c375369d9ee6615 100644 (file)
@@ -16,23 +16,114 @@ log = logging.getLogger(__name__)
 
 
 class RemoteProcess(object):
-
     """
-    Remote process object used to keep track of attributes of a process.
+    An object to begin and monitor execution of a process on a remote host
     """
     __slots__ = [
-        'command', 'stdin', 'stdout', 'stderr', 'exitstatus', 'exited',
+        'client', 'args', 'check_status', 'command', 'hostname',
+        'stdin', 'stdout', 'stderr',
+        '_stdin_buf', '_stdout_buf', '_stderr_buf',
+        'returncode', 'exitstatus',
+        'greenlets',
         # for orchestra.remote.Remote to place a backreference
         'remote',
         ]
 
-    def __init__(self, command, stdin, stdout, stderr, exitstatus, exited):
-        self.command = command
-        self.stdin = stdin
-        self.stdout = stdout
-        self.stderr = stderr
-        self.exitstatus = exitstatus
-        self.exited = exited
+    def __init__(self, client, args, check_status=True, hostname=None):
+        """
+        Create the object. Does not initiate command execution.
+
+        :param client:       paramiko.SSHConnection to run the command with
+        :param args:         Command to run.
+        :type args:          String or list of strings
+        :param check_status: Whether to raise CommandFailedError on non-zero
+                             exit status, and . Defaults to True. All signals
+                             and connection loss are made to look like SIGHUP.
+        :param hostname: Name of remote host (optional)
+        """
+        self.client = client
+        self.args = args
+        if isinstance(args, basestring):
+            self.command = args
+        else:
+            self.command = quote(args)
+
+        self.check_status = check_status
+
+        if hostname:
+            self.hostname = hostname
+        else:
+            (self.hostname, port) = client.get_transport().getpeername()
+
+        self.greenlets = []
+        self.stdin, self.stdout, self.stderr = (None, None, None)
+        self.returncode = self.exitstatus = None
+
+    def execute(self):
+        """
+        Execute remote command
+        """
+        log.getChild(self.hostname).info(u"Running: {cmd!r}".format(
+            cmd=self.command))
+
+        (self._stdin_buf, self._stdout_buf, self._stderr_buf) = \
+            self.client.exec_command(self.command)
+        (self.stdin, self.stdout, self.stderr) = \
+            (self._stdin_buf, self._stdout_buf, self._stderr_buf)
+
+    def add_greenlet(self, greenlet):
+        self.greenlets.append(greenlet)
+
+    def wait(self):
+        """
+        Block until remote process finishes.
+
+        :returns: self.returncode
+        """
+        for greenlet in self.greenlets:
+            greenlet.get()
+
+        status = self._get_exitstatus()
+        self.exitstatus = self.returncode = status
+        if self.check_status:
+            if status is None:
+                # command either died due to a signal, or the connection
+                # was lost
+                transport = self.client.get_transport()
+                if not transport.is_active():
+                    # look like we lost the connection
+                    raise ConnectionLostError(command=self.command)
+
+                # connection seems healthy still, assuming it was a
+                # signal; sadly SSH does not tell us which signal
+                raise CommandCrashedError(command=self.command)
+            if status != 0:
+                raise CommandFailedError(command=self.command,
+                                         exitstatus=status, node=self.hostname)
+        return status
+
+    def _get_exitstatus(self):
+        """
+        :returns: the remote command's exit status (return code). Note that
+                  if the connection is lost, or if the process was killed by a
+                  signal, this returns None instead of paramiko's -1.
+        """
+        status = self._stdout_buf.channel.recv_exit_status()
+        if status == -1:
+            status = None
+        return status
+
+    @property
+    def finished(self):
+        return self._stdout_buf.channel.exit_status_ready()
+
+    def poll(self):
+        """
+        :returns: self.returncode if the process is finished; else None
+        """
+        if self.finished:
+            return self.returncode
+        return None
 
 
 class Raw(object):
@@ -66,64 +157,6 @@ def quote(args):
     return ' '.join(_quote(args))
 
 
-def execute(client, args, name=None):
-    """
-    Execute a command remotely.
-
-    Caller needs to handle stdin etc.
-
-    :param client: SSHConnection to run the command with
-    :param args: command to run
-    :param name: name of client (optional)
-    :type args: string or list of strings
-
-    Returns a RemoteProcess, where exitstatus is a callable that will
-    block until the exit status is available.
-    """
-    if isinstance(args, basestring):
-        cmd = args
-    else:
-        cmd = quote(args)
-
-    if name:
-        host = name
-    else:
-        (host, port) = client.get_transport().getpeername()
-    log.getChild(host).info(u"Running: {cmd!r}".format(cmd=cmd))
-
-    (in_, out, err) = client.exec_command(cmd)
-
-    def get_exitstatus():
-        """
-        Get exit status.
-
-        When -1 on connection loss *and* signals occur, this
-        maps to more pythonic None
-        """
-        status = out.channel.recv_exit_status()
-        if status == -1:
-            status = None
-        return status
-
-    def exitstatus_ready():
-        """
-        out.channel exit wrapper.
-        """
-        return out.channel.exit_status_ready()
-
-    r = RemoteProcess(
-        command=cmd,
-        stdin=in_,
-        stdout=out,
-        stderr=err,
-        # this is a callable that will block until the status is
-        # available
-        exitstatus=get_exitstatus,
-        exited=exitstatus_ready,
-        )
-    return r
-
-
 def copy_to_log(f, logger, loglevel=logging.INFO):
     """
     Interface to older xreadlines api.
@@ -316,13 +349,15 @@ def run(
     if name is None:
         name = host
 
-    r = execute(client, args, name=name)
+    r = RemoteProcess(client, args, check_status=check_status, hostname=name)
+    r.execute()
 
     r.stdin = KludgeFile(wrapped=r.stdin)
 
     g_in = None
     if stdin is not PIPE:
         g_in = gevent.spawn(copy_and_close, stdin, r.stdin)
+        r.add_greenlet(g_in)
         r.stdin = None
     else:
         assert not wait, \
@@ -336,6 +371,7 @@ def run(
         if stderr is None:
             stderr = logger.getChild(name).getChild('stderr')
         g_err = gevent.spawn(copy_file_to, r.stderr, stderr)
+        r.add_greenlet(g_err)
         r.stderr = stderr
     else:
         assert not wait, \
@@ -346,45 +382,14 @@ def run(
         if stdout is None:
             stdout = logger.getChild(name).getChild('stdout')
         g_out = gevent.spawn(copy_file_to, r.stdout, stdout)
+        r.add_greenlet(g_out)
         r.stdout = stdout
     else:
         assert not wait, \
             "Using PIPE for stdout without wait=False would deadlock."
 
-    def _check_status(status):
-        """
-        get values needed if uninitialized.  Handle ssh issues when checking
-        the status.
-        """
-        if g_err is not None:
-            g_err.get()
-        if g_out is not None:
-            g_out.get()
-        if g_in is not None:
-            g_in.get()
-
-        status = status()
-        if check_status:
-            if status is None:
-                # command either died due to a signal, or the connection
-                # was lost
-                transport = client.get_transport()
-                if not transport.is_active():
-                    # look like we lost the connection
-                    raise ConnectionLostError(command=r.command)
-
-                # connection seems healthy still, assuming it was a
-                # signal; sadly SSH does not tell us which signal
-                raise CommandCrashedError(command=r.command)
-            if status != 0:
-                raise CommandFailedError(
-                    command=r.command, exitstatus=status, node=name)
-        return status
-
     if wait:
-        r.exitstatus = _check_status(r.exitstatus)
-    else:
-        r.exitstatus = spawn_asyncresult(_check_status, r.exitstatus)
+        r.wait()
 
     return r
 
@@ -403,9 +408,8 @@ def wait(processes, timeout=None):
             while len(not_ready) > 0:
                 check_time()
                 for proc in list(not_ready):
-                    if proc.exitstatus.ready():
+                    if proc.finished:
                         not_ready.remove(proc)
 
     for proc in processes:
-        assert isinstance(proc.exitstatus, gevent.event.AsyncResult)
-        proc.exitstatus.get()
+        proc.wait()
index 03aa962ad9b719ffd529f6710440edb291c3bafc..7009b8994a8394e72d93ed8027a0a3e018f23a0f 100644 (file)
@@ -59,7 +59,8 @@ class TestIntegration():
         r.stdin.write('bar\n')
         r.stdin.close()
 
-        got = r.exitstatus.get()
+        r.wait()
+        got = r.exitstatus
         assert got == 0
         assert r.stdout.getvalue() == 'foo\nbar\n'
 
index e52a2f7a2f3555ab2ac2da55b1d60bec573a037b..b79cd176972c795c34064a153dea85ec283701d6 100644 (file)
@@ -27,6 +27,8 @@ class TestRemote(object):
     def test_run(self):
         fudge.clear_expectations()
         ssh = fudge.Fake('SSHConnection')
+        ssh.expects('get_transport').returns_fake().expects('getpeername')\
+            .returns(('name', 22))
         run = fudge.Fake('run')
         args = [
             'something',
@@ -34,12 +36,8 @@ class TestRemote(object):
             ]
         foo = object()
         ret = RemoteProcess(
-            command='fakey',
-            stdin=None,
-            stdout=None,
-            stderr=None,
-            exitstatus=None,
-            exited=None,
+            client=ssh,
+            args='fakey',
             )
         r = remote.Remote(name='jdoe@xyzzy.example.com', ssh=ssh)
         run.expects_call().with_args(
index 75d45dcdb38ea766020ff0d62130a8717ff99e9f..49f52ee960a3986f64c89fcd0468691bc2a086e5 100644 (file)
@@ -1,7 +1,6 @@
 from cStringIO import StringIO
 
 import fudge
-import gevent.event
 import logging
 
 from .. import run
@@ -280,12 +279,11 @@ class TestRun(object):
             wait=False,
             )
         assert r.command == 'foo'
-        assert isinstance(r.exitstatus, gevent.event.AsyncResult)
         e = assert_raises(
             run.CommandFailedError,
-            r.exitstatus.get,
+            r.wait,
             )
-        assert e.exitstatus == 42
+        assert r.returncode == 42
         assert str(e) == "Command failed on HOST with status 42: 'foo'"
 
     @fudge.with_fakes
@@ -305,6 +303,7 @@ class TestRun(object):
         logger = fudge.Fake('logger').is_a_stub()
         channel = fudge.Fake('channel')
         out.has_attr(channel=channel)
+        channel.expects('exit_status_ready').with_args().returns(False)
         channel.expects('recv_exit_status').with_args().returns(0)
         r = run.run(
             client=ssh,
@@ -315,9 +314,9 @@ class TestRun(object):
             )
         r.stdin.write('bar')
         assert r.command == 'foo'
-        assert isinstance(r.exitstatus, gevent.event.AsyncResult)
-        assert r.exitstatus.ready() == False
-        got = r.exitstatus.get()
+        assert r.poll() is None
+        got = r.wait()
+        assert isinstance(r.returncode, int)
         assert got == 0
 
     @fudge.with_fakes
@@ -339,6 +338,7 @@ class TestRun(object):
         logger = fudge.Fake('logger').is_a_stub()
         channel = fudge.Fake('channel')
         out.has_attr(channel=channel)
+        channel.expects('exit_status_ready').with_args().returns(False)
         channel.expects('recv_exit_status').with_args().returns(0)
         r = run.run(
             client=ssh,
@@ -347,13 +347,14 @@ class TestRun(object):
             stdout=run.PIPE,
             wait=False,
             )
+        assert r.exitstatus is None
         assert r.command == 'foo'
-        assert isinstance(r.exitstatus, gevent.event.AsyncResult)
-        assert r.exitstatus.ready() == False
+        assert r.poll() is None
         assert r.stdout.read() == 'one'
         assert r.stdout.read() == 'two'
         assert r.stdout.read() == ''
-        got = r.exitstatus.get()
+        got = r.wait()
+        assert isinstance(r.exitstatus, int)
         assert got == 0
 
     @fudge.with_fakes
@@ -375,6 +376,7 @@ class TestRun(object):
         logger = fudge.Fake('logger').is_a_stub()
         channel = fudge.Fake('channel')
         out.has_attr(channel=channel)
+        channel.expects('exit_status_ready').with_args().returns(False)
         channel.expects('recv_exit_status').with_args().returns(0)
         r = run.run(
             client=ssh,
@@ -383,13 +385,14 @@ class TestRun(object):
             stderr=run.PIPE,
             wait=False,
             )
+        assert r.exitstatus is None
         assert r.command == 'foo'
-        assert isinstance(r.exitstatus, gevent.event.AsyncResult)
-        assert r.exitstatus.ready() is False
+        assert r.poll() is None
         assert r.stderr.read() == 'one'
         assert r.stderr.read() == 'two'
         assert r.stderr.read() == ''
-        got = r.exitstatus.get()
+        got = r.wait()
+        assert isinstance(r.exitstatus, int)
         assert got == 0
 
     def test_quote_simple(self):