From: Zack Cerza Date: Tue, 19 Jul 2016 16:25:05 +0000 (-0600) Subject: Move some stream management into RemoteProcess X-Git-Tag: 1.1.0~560^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ca97867d2e76f48eb5ab76004f5215715e18592a;p=teuthology.git Move some stream management into RemoteProcess For better organization and less copypasta. Signed-off-by: Zack Cerza --- diff --git a/teuthology/orchestra/run.py b/teuthology/orchestra/run.py index d146ca679..74d68936d 100644 --- a/teuthology/orchestra/run.py +++ b/teuthology/orchestra/run.py @@ -28,12 +28,16 @@ class RemoteProcess(object): '_stdin_buf', '_stdout_buf', '_stderr_buf', 'returncode', 'exitstatus', 'timeout', 'greenlets', + '_wait', 'logger', # for orchestra.remote.Remote to place a backreference 'remote', 'label', ] - def __init__(self, client, args, check_status=True, hostname=None, label=None, timeout=None): + deadlock_warning = "Using PIPE for %s without wait=False would deadlock" + + def __init__(self, client, args, check_status=True, hostname=None, + label=None, timeout=None, wait=True, logger=None): """ Create the object. Does not initiate command execution. @@ -48,6 +52,8 @@ class RemoteProcess(object): command is doing. :param timeout: timeout value for arg that is passed to exec_command of paramiko + :param wait: Whether self.wait() will be called automatically + :param logger: Alternative logger to use (optional) """ self.client = client self.args = args @@ -68,6 +74,8 @@ class RemoteProcess(object): self.greenlets = [] self.stdin, self.stdout, self.stderr = (None, None, None) self.returncode = self.exitstatus = None + self._wait = wait + self.logger = logger or log def execute(self): """ @@ -91,6 +99,36 @@ class RemoteProcess(object): def add_greenlet(self, greenlet): self.greenlets.append(greenlet) + def setup_stdin(self, stream_obj): + self.stdin = KludgeFile(wrapped=self.stdin) + if stream_obj is not PIPE: + greenlet = gevent.spawn(copy_and_close, stream_obj, self.stdin) + self.add_greenlet(greenlet) + self.stdin = None + elif self._wait: + # FIXME: Is this actually true? + raise RuntimeError(self.deadlock_warning % 'stdin') + + def setup_output_stream(self, stream_obj, stream_name): + if stream_obj is not PIPE: + # Log the stream + host_log = self.logger.getChild(self.hostname) + stream_log = host_log.getChild(stream_name) + # If stream_obj is an actual stream, have the logging module write + # to it as well + if stream_obj is not None: + stream_log.addHandler(logging.StreamHandler(stream_obj)) + greenlet = gevent.spawn( + copy_file_to, + getattr(self, stream_name), + stream_log, + ) + self.add_greenlet(greenlet) + setattr(self, stream_name, stream_obj) + elif self._wait: + # FIXME: Is this actually true? + raise RuntimeError(self.deadlock_warning % stream_name) + def wait(self): """ Block until remote process finishes. @@ -360,56 +398,13 @@ def run( if timeout: log.info("Running command with timeout %d", timeout) r = RemoteProcess(client, args, check_status=check_status, hostname=name, - label=label, timeout=timeout) + label=label, timeout=timeout, wait=wait, logger=logger) r.execute() - - r.stdin = KludgeFile(wrapped=r.stdin) - - g_in = None - if stdin is not PIPE: - g_in = gevent.spawn(copy_and_close, stdin, r.stdin) - r.add_greenlet(g_in) - r.stdin = None - else: - assert not wait, \ - "Using PIPE for stdin without wait=False would deadlock." - - if logger is None: - logger = log - - g_err = None - if stderr is not PIPE: - # Log stderr - stderr_logger = logger.getChild(name).getChild('stderr') - # If the stderr arg is a stream, have the logging module write to it as - # well - if stderr is not None: - stderr_logger.addHandler(logging.StreamHandler(stdout)) - g_err = gevent.spawn(copy_file_to, r.stderr, stderr_logger) - r.add_greenlet(g_err) - r.stderr = stderr - else: - assert not wait, \ - "Using PIPE for stderr without wait=False would deadlock." - - g_out = None - if stdout is not PIPE: - # Log stdout - stdout_logger = logger.getChild(name).getChild('stdout') - # If the stdout arg is a stream, have the logging module write to it as - # well - if stdout is not None: - stdout_logger.addHandler(logging.StreamHandler(stdout)) - g_out = gevent.spawn(copy_file_to, r.stdout, stdout_logger) - r.add_greenlet(g_out) - r.stdout = stdout - else: - assert not wait, \ - "Using PIPE for stdout without wait=False would deadlock." - + r.setup_stdin(stdin) + r.setup_output_stream(stderr, 'stderr') + r.setup_output_stream(stdout, 'stdout') if wait: r.wait() - return r