]> git.apps.os.sepia.ceph.com Git - teuthology.git/commitdiff
teuthology/task/install: add stdin-killer helper 1846/head
authorPatrick Donnelly <pdonnell@redhat.com>
Thu, 18 May 2023 13:24:57 +0000 (09:24 -0400)
committerPatrick Donnelly <pdonnell@redhat.com>
Wed, 7 Jun 2023 16:21:13 +0000 (12:21 -0400)
This helper tool runs commands which may or may not take data on stdin.
Like "daemon-helper", if stdin signals EOF, stdin-killer will kill the
command but only as a last resort. It forwards EOF to the command by
closing the command's stdin (pipe) and then waiting a configurable
amount of time for the command to gracefully exit.

Additionally, if stdout or stderr are hung up -- i.e. the ssh parent
process has terminated -- then stdin-killer also detects this and
initiates the graceful shutdown of the command. This is something
daemon-helper does not do.

In general, this tool is a superior replacement of the daemon-helper
tool because you can write to the command's stdin normally.

Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
setup.cfg
teuthology/task/install/bin/stdin-killer [new file with mode: 0755]
teuthology/task/install/util.py

index 2f21662b12891637728aa4a4dd9d01e36e065aa3..ae201dac1677843f9ea8b41bf5b4c7f395bc5a04 100644 (file)
--- a/setup.cfg
+++ b/setup.cfg
@@ -62,6 +62,7 @@ python_requires = >=3.6
 scripts =
     teuthology/task/install/bin/adjust-ulimits
     teuthology/task/install/bin/daemon-helper
+    teuthology/task/install/bin/stdin-killer
 
 [options.entry_points]
 console_scripts =
@@ -128,6 +129,7 @@ teuthology.suite =
 teuthology.task.install =
     bin/adjust-ulimits
     bin/daemon-helper
+    bin/stdin-killer
 teuthology.task.internal =
     edit_sudoers.sh
 
diff --git a/teuthology/task/install/bin/stdin-killer b/teuthology/task/install/bin/stdin-killer
new file mode 100755 (executable)
index 0000000..8a78744
--- /dev/null
@@ -0,0 +1,251 @@
+#!/bin/python3
+
+# Forward stdin to a subcommand. If EOF is read from stdin or
+# stdin/stdout/stderr are closed or hungup, then give the command "timeout"
+# seconds to complete before it is killed.
+#
+# The command is run in a separate process group. This is mostly to simplify
+# killing the set of processes (if well-behaving). You can configure that with
+# --setpgrp switch.
+
+# usage: stdin-killer [-h] [--timeout TIMEOUT] [--debug DEBUG] [--signal SIGNAL] [--verbose] [--setpgrp {no,self,child}] command [arguments ...]
+#
+# wait for stdin EOF then kill forked subcommand
+#
+# positional arguments:
+#   command            command to execute
+#   arguments          arguments to command
+#
+# options:
+#   -h, --help         show this help message and exit
+#   --timeout TIMEOUT  time to wait for forked subcommand to willing terminate
+#   --debug DEBUG      debug file
+#   --signal SIGNAL    signal to send
+#   --verbose          increase debugging
+#  --setpgrp {no,self,child}
+#                        create process group
+
+
+import argparse
+import fcntl
+import logging
+import os
+import select
+import signal
+import struct
+import subprocess
+import sys
+import time
+
+NAME = "stdin-killer"
+
+log = logging.getLogger(NAME)
+PAGE_SIZE = 4096
+
+POLL_HANGUP = select.POLLHUP | select.POLLRDHUP | select.POLLERR
+
+
+def handle_event(poll, buffer, fd, event, p):
+    if sigfdr == fd:
+        b = os.read(sigfdr, 1)
+        (signum,) = struct.unpack("B", b)
+        log.debug("got signal %d", signum)
+        try:
+            p.wait(timeout=0)
+            return True
+        except subprocess.TimeoutExpired:
+            pass
+    elif 0 == fd:
+        if event & POLL_HANGUP:
+            log.debug("peer closed connection, waiting for process exit")
+            poll.unregister(0)
+            sys.stdin.close()
+            if len(buffer) == 0 and p.stdin is not None:
+                p.stdin.close()
+                p.stdin = None
+            return True
+        elif event & select.POLLIN:
+            b = os.read(0, PAGE_SIZE)
+            if b == b"":
+                log.debug("read EOF")
+                poll.unregister(0)
+                sys.stdin.close()
+                if len(buffer) == 0:
+                    p.stdin.close()
+                return True
+            if p.stdin is not None:
+                buffer += b
+                # ignore further POLLIN until buffer is written to p.stdin
+                poll.register(0, POLL_HANGUP)
+                poll.register(p.stdin.fileno(), select.POLLOUT)
+    elif p.stdin is not None and p.stdin.fileno() == fd:
+        assert event & select.POLLOUT
+        b = buffer[:PAGE_SIZE]
+        log.debug("sending %d bytes to process", len(b))
+        try:
+            n = p.stdin.write(b)
+            p.stdin.flush()
+            log.debug("wrote %d bytes", n)
+            buffer = buffer[n:]
+            poll.register(0, select.POLLIN | POLL_HANGUP)
+            poll.unregister(p.stdin.fileno())
+        except BrokenPipeError:
+            log.debug("got SIGPIPE")
+            poll.unregister(p.stdin.fileno())
+            p.stdin.close()
+            p.stdin = None
+            return True
+        except BlockingIOError:
+            poll.register(p.stdin.fileno(), select.POLLOUT | POLL_HANGUP)
+    elif 1 == fd:
+        assert event & POLL_HANGUP
+        log.debug("stdout pipe has closed")
+        poll.unregister(1)
+        return True
+    elif 2 == fd:
+        assert event & POLL_HANGUP
+        log.debug("stderr pipe has closed")
+        poll.unregister(2)
+        return True
+    else:
+        assert False
+    return False
+
+
+def listen_for_events(sigfdr, p, timeout):
+    poll = select.poll()
+    # listen for data on stdin
+    poll.register(0, select.POLLIN | POLL_HANGUP)
+    # listen for stdout/stderr to be closed, if they are closed then my parent
+    # is gone and I should expire the command and myself.
+    poll.register(1, POLL_HANGUP)
+    poll.register(2, POLL_HANGUP)
+    # for SIGCHLD
+    poll.register(sigfdr, select.POLLIN)
+    buffer = bytearray()
+    expired = 0.0
+    while True:
+        if expired > 0.0:
+            since = time.monotonic() - expired
+            wait = int((timeout - since) * 1000.0)
+            if wait <= 0:
+                return
+        else:
+            wait = 5000
+        log.debug("polling for %d milliseconds", wait)
+        events = poll.poll(wait)
+        for fd, event in events:
+            log.debug("event: (%d, %d)", fd, event)
+            if handle_event(poll, buffer, fd, event, p):
+                if p.returncode is not None:
+                    return
+                if expired == 0.0:
+                    expired = time.monotonic()
+                    log.info(
+                        "expiration expected; waiting %d seconds for command to complete",
+                        NS.timeout,
+                    )
+
+
+if __name__ == "__main__":
+    signal.signal(signal.SIGPIPE, signal.SIG_IGN)
+    (sigfdr, sigfdw) = os.pipe2(os.O_NONBLOCK | os.O_CLOEXEC)
+    signal.set_wakeup_fd(sigfdw)
+
+    def do_nothing(signum, frame):
+        pass
+
+    signal.signal(signal.SIGCHLD, do_nothing)
+
+    P = argparse.ArgumentParser(
+        description="wait for stdin EOF then kill forked subcommand"
+    )
+    P.add_argument(
+        "--timeout",
+        action="store",
+        default=5,
+        help="time to wait for forked subcommand to willing terminate",
+        type=int,
+    )
+    P.add_argument("--debug", action="store", help="debug file", type=str)
+    P.add_argument(
+        "--signal",
+        action="store",
+        help="signal to send",
+        type=int,
+        default=signal.SIGKILL,
+    )
+    P.add_argument("--verbose", action="store_true", help="increase debugging")
+    P.add_argument(
+        "--setpgrp",
+        action="store",
+        choices=["no", "self", "child"],
+        default="self",
+        help="create process group",
+    )
+    P.add_argument(
+        "cmd", metavar="command", type=str, nargs=1, help="command to execute"
+    )
+    P.add_argument(
+        "args", metavar="arguments", type=str, nargs="*", help="arguments to command"
+    )
+    NS = P.parse_args()
+
+    logargs = {}
+    if NS.debug is not None:
+        logargs["filename"] = NS.debug
+    else:
+        logargs["stream"] = sys.stderr
+    if NS.verbose:
+        logargs["level"] = logging.DEBUG
+    else:
+        logargs["level"] = logging.INFO
+    logargs["format"] = f"%(asctime)s {NAME} %(levelname)s: %(message)s"
+    logargs["datefmt"] = "%Y-%m-%dT%H:%M:%S"
+    logging.basicConfig(**logargs)
+
+    cargs = NS.cmd + NS.args
+    popen_kwargs = {
+        "stdin": subprocess.PIPE,
+    }
+
+    if NS.setpgrp == "self":
+        os.setpgrp()
+        pgrp = os.getpgrp()
+    elif NS.setpgrp == "child":
+        popen_kwargs["preexec_fn"] = os.setpgrp
+        pgrp = None
+    elif NS.setpgrp == "no":
+        pgrp = 0
+    else:
+        assert False
+
+    log.debug("executing %s", cargs)
+    p = subprocess.Popen(cargs, **popen_kwargs)
+    if pgrp is None:
+        pgrp = p.pid
+    flags = fcntl.fcntl(p.stdin.fileno(), fcntl.F_GETFL)
+    fcntl.fcntl(p.stdin.fileno(), fcntl.F_SETFL, flags | os.O_NONBLOCK)
+
+    listen_for_events(sigfdr, p, NS.timeout)
+
+    if p.returncode is None:
+        log.error("timeout expired: sending signal %d to command and myself", NS.signal)
+        if pgrp == 0:
+            os.kill(p.pid, NS.signal)
+        else:
+            os.killpg(pgrp, NS.signal)  # should kill me too
+        os.kill(os.getpid(), NS.signal)  # to exit abnormally with same signal
+        log.error("signal did not cause termination, sending myself SIGKILL")
+        os.kill(os.getpid(), signal.SIGKILL)  # failsafe
+    rc = p.returncode
+    log.debug("rc = %d", rc)
+    assert rc is not None
+    if rc < 0:
+        log.error("command terminated with signal %d: sending same signal to myself!", -rc)
+        os.kill(os.getpid(), -rc)  # kill myself with the same signal
+        log.error("signal did not cause termination, sending myself SIGKILL")
+        os.kill(os.getpid(), signal.SIGKILL)  # failsafe
+    else:
+        log.info("command exited with status %d: exiting normally with same code!", rc)
+        sys.exit(rc)
index 6e58c9a5785ec781b026db13c8d4d33bd4392981..46fbde9c9ce2cd6d96d5b6b573f7f0579c668e5b 100644 (file)
@@ -81,7 +81,7 @@ def _ship_utilities(ctx):
     except IOError as e:
         log.info('Cannot ship supression file for valgrind: %s...', e.strerror)
 
-    FILES = ['daemon-helper', 'adjust-ulimits']
+    FILES = ['daemon-helper', 'adjust-ulimits', 'stdin-killer']
     destdir = '/usr/bin'
     for filename in FILES:
         log.info('Shipping %r...', filename)