]> git.apps.os.sepia.ceph.com Git - teuthology.git/commitdiff
Move some stream management into RemoteProcess
authorZack Cerza <zack@redhat.com>
Tue, 19 Jul 2016 16:25:05 +0000 (10:25 -0600)
committerZack Cerza <zack@redhat.com>
Wed, 20 Jul 2016 23:42:48 +0000 (17:42 -0600)
For better organization and less copypasta.

Signed-off-by: Zack Cerza <zack@redhat.com>
teuthology/orchestra/run.py

index d146ca6798a124d9a62b13a24288fad56d13c41b..74d68936dedcdbd915f14fb13da32d42d9e1fbd3 100644 (file)
@@ -28,12 +28,16 @@ class RemoteProcess(object):
         '_stdin_buf', '_stdout_buf', '_stderr_buf',
         'returncode', 'exitstatus', 'timeout',
         'greenlets',
+        '_wait', 'logger',
         # for orchestra.remote.Remote to place a backreference
         'remote',
         'label',
         ]
 
-    def __init__(self, client, args, check_status=True, hostname=None, label=None, timeout=None):
+    deadlock_warning = "Using PIPE for %s without wait=False would deadlock"
+
+    def __init__(self, client, args, check_status=True, hostname=None,
+                 label=None, timeout=None, wait=True, logger=None):
         """
         Create the object. Does not initiate command execution.
 
@@ -48,6 +52,8 @@ class RemoteProcess(object):
                              command is doing.
         :param timeout:      timeout value for arg that is passed to
                              exec_command of paramiko
+        :param wait:         Whether self.wait() will be called automatically
+        :param logger:       Alternative logger to use (optional)
         """
         self.client = client
         self.args = args
@@ -68,6 +74,8 @@ class RemoteProcess(object):
         self.greenlets = []
         self.stdin, self.stdout, self.stderr = (None, None, None)
         self.returncode = self.exitstatus = None
+        self._wait = wait
+        self.logger = logger or log
 
     def execute(self):
         """
@@ -91,6 +99,36 @@ class RemoteProcess(object):
     def add_greenlet(self, greenlet):
         self.greenlets.append(greenlet)
 
+    def setup_stdin(self, stream_obj):
+        self.stdin = KludgeFile(wrapped=self.stdin)
+        if stream_obj is not PIPE:
+            greenlet = gevent.spawn(copy_and_close, stream_obj, self.stdin)
+            self.add_greenlet(greenlet)
+            self.stdin = None
+        elif self._wait:
+            # FIXME: Is this actually true?
+            raise RuntimeError(self.deadlock_warning % 'stdin')
+
+    def setup_output_stream(self, stream_obj, stream_name):
+        if stream_obj is not PIPE:
+            # Log the stream
+            host_log = self.logger.getChild(self.hostname)
+            stream_log = host_log.getChild(stream_name)
+            # If stream_obj is an actual stream, have the logging module write
+            # to it as well
+            if stream_obj is not None:
+                stream_log.addHandler(logging.StreamHandler(stream_obj))
+            greenlet = gevent.spawn(
+                copy_file_to,
+                getattr(self, stream_name),
+                stream_log,
+            )
+            self.add_greenlet(greenlet)
+            setattr(self, stream_name, stream_obj)
+        elif self._wait:
+            # FIXME: Is this actually true?
+            raise RuntimeError(self.deadlock_warning % stream_name)
+
     def wait(self):
         """
         Block until remote process finishes.
@@ -360,56 +398,13 @@ def run(
     if timeout:
         log.info("Running command with timeout %d", timeout)
     r = RemoteProcess(client, args, check_status=check_status, hostname=name,
-                      label=label, timeout=timeout)
+                      label=label, timeout=timeout, wait=wait, logger=logger)
     r.execute()
-
-    r.stdin = KludgeFile(wrapped=r.stdin)
-
-    g_in = None
-    if stdin is not PIPE:
-        g_in = gevent.spawn(copy_and_close, stdin, r.stdin)
-        r.add_greenlet(g_in)
-        r.stdin = None
-    else:
-        assert not wait, \
-            "Using PIPE for stdin without wait=False would deadlock."
-
-    if logger is None:
-        logger = log
-
-    g_err = None
-    if stderr is not PIPE:
-        # Log stderr
-        stderr_logger = logger.getChild(name).getChild('stderr')
-        # If the stderr arg is a stream, have the logging module write to it as
-        # well
-        if stderr is not None:
-            stderr_logger.addHandler(logging.StreamHandler(stdout))
-        g_err = gevent.spawn(copy_file_to, r.stderr, stderr_logger)
-        r.add_greenlet(g_err)
-        r.stderr = stderr
-    else:
-        assert not wait, \
-            "Using PIPE for stderr without wait=False would deadlock."
-
-    g_out = None
-    if stdout is not PIPE:
-        # Log stdout
-        stdout_logger = logger.getChild(name).getChild('stdout')
-        # If the stdout arg is a stream, have the logging module write to it as
-        # well
-        if stdout is not None:
-            stdout_logger.addHandler(logging.StreamHandler(stdout))
-        g_out = gevent.spawn(copy_file_to, r.stdout, stdout_logger)
-        r.add_greenlet(g_out)
-        r.stdout = stdout
-    else:
-        assert not wait, \
-            "Using PIPE for stdout without wait=False would deadlock."
-
+    r.setup_stdin(stdin)
+    r.setup_output_stream(stderr, 'stderr')
+    r.setup_output_stream(stdout, 'stdout')
     if wait:
         r.wait()
-
     return r