'_stdin_buf', '_stdout_buf', '_stderr_buf',
'returncode', 'exitstatus', 'timeout',
'greenlets',
+ '_wait', 'logger',
# for orchestra.remote.Remote to place a backreference
'remote',
'label',
]
- def __init__(self, client, args, check_status=True, hostname=None, label=None, timeout=None):
+ deadlock_warning = "Using PIPE for %s without wait=False would deadlock"
+
+ def __init__(self, client, args, check_status=True, hostname=None,
+ label=None, timeout=None, wait=True, logger=None):
"""
Create the object. Does not initiate command execution.
command is doing.
:param timeout: timeout value for arg that is passed to
exec_command of paramiko
+ :param wait: Whether self.wait() will be called automatically
+ :param logger: Alternative logger to use (optional)
"""
self.client = client
self.args = args
self.greenlets = []
self.stdin, self.stdout, self.stderr = (None, None, None)
self.returncode = self.exitstatus = None
+ self._wait = wait
+ self.logger = logger or log
def execute(self):
"""
def add_greenlet(self, greenlet):
self.greenlets.append(greenlet)
+ def setup_stdin(self, stream_obj):
+ self.stdin = KludgeFile(wrapped=self.stdin)
+ if stream_obj is not PIPE:
+ greenlet = gevent.spawn(copy_and_close, stream_obj, self.stdin)
+ self.add_greenlet(greenlet)
+ self.stdin = None
+ elif self._wait:
+ # FIXME: Is this actually true?
+ raise RuntimeError(self.deadlock_warning % 'stdin')
+
+ def setup_output_stream(self, stream_obj, stream_name):
+ if stream_obj is not PIPE:
+ # Log the stream
+ host_log = self.logger.getChild(self.hostname)
+ stream_log = host_log.getChild(stream_name)
+ # If stream_obj is an actual stream, have the logging module write
+ # to it as well
+ if stream_obj is not None:
+ stream_log.addHandler(logging.StreamHandler(stream_obj))
+ greenlet = gevent.spawn(
+ copy_file_to,
+ getattr(self, stream_name),
+ stream_log,
+ )
+ self.add_greenlet(greenlet)
+ setattr(self, stream_name, stream_obj)
+ elif self._wait:
+ # FIXME: Is this actually true?
+ raise RuntimeError(self.deadlock_warning % stream_name)
+
def wait(self):
"""
Block until remote process finishes.
if timeout:
log.info("Running command with timeout %d", timeout)
r = RemoteProcess(client, args, check_status=check_status, hostname=name,
- label=label, timeout=timeout)
+ label=label, timeout=timeout, wait=wait, logger=logger)
r.execute()
-
- r.stdin = KludgeFile(wrapped=r.stdin)
-
- g_in = None
- if stdin is not PIPE:
- g_in = gevent.spawn(copy_and_close, stdin, r.stdin)
- r.add_greenlet(g_in)
- r.stdin = None
- else:
- assert not wait, \
- "Using PIPE for stdin without wait=False would deadlock."
-
- if logger is None:
- logger = log
-
- g_err = None
- if stderr is not PIPE:
- # Log stderr
- stderr_logger = logger.getChild(name).getChild('stderr')
- # If the stderr arg is a stream, have the logging module write to it as
- # well
- if stderr is not None:
- stderr_logger.addHandler(logging.StreamHandler(stdout))
- g_err = gevent.spawn(copy_file_to, r.stderr, stderr_logger)
- r.add_greenlet(g_err)
- r.stderr = stderr
- else:
- assert not wait, \
- "Using PIPE for stderr without wait=False would deadlock."
-
- g_out = None
- if stdout is not PIPE:
- # Log stdout
- stdout_logger = logger.getChild(name).getChild('stdout')
- # If the stdout arg is a stream, have the logging module write to it as
- # well
- if stdout is not None:
- stdout_logger.addHandler(logging.StreamHandler(stdout))
- g_out = gevent.spawn(copy_file_to, r.stdout, stdout_logger)
- r.add_greenlet(g_out)
- r.stdout = stdout
- else:
- assert not wait, \
- "Using PIPE for stdout without wait=False would deadlock."
-
+ r.setup_stdin(stdin)
+ r.setup_output_stream(stderr, 'stderr')
+ r.setup_output_stream(stdout, 'stdout')
if wait:
r.wait()
-
return r