]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
ceph-daemon: cleanly capture stdout, err; log
authorSage Weil <sage@redhat.com>
Wed, 9 Oct 2019 17:50:47 +0000 (12:50 -0500)
committerSage Weil <sage@redhat.com>
Mon, 21 Oct 2019 15:43:48 +0000 (10:43 -0500)
- New Popen wrapper that grabs stdout, stderr, and return code.  This is
adapted from ceph-volume's process.py helper(s).

- Optionally log output to logger

- By default, be more verbose if the command fails.

Signed-off-by: Sage Weil <sage@redhat.com>
src/ceph-daemon
test_ceph_daemon.sh

index b427dc6f84b08562bc015eba78b76e508fed059a..c46684a368420543f2ae192ef759dc13df3b495e 100755 (executable)
@@ -31,9 +31,11 @@ You can invoke ceph-daemon in two ways:
 
 import argparse
 import configparser
+import fcntl
 import json
 import logging
 import os
+import select
 import socket
 import subprocess
 import sys
@@ -53,6 +55,94 @@ except ImportError:
 
 podman_path = None
 
+##################################
+# Popen wrappers, lifted from ceph-volume
+
+def call(command, desc, verbose=False, **kw):
+    """
+    Wrap subprocess.Popen to
+
+    - log stdout/stderr to a logger,
+    - decode utf-8
+    - cleanly return out, err, returncode
+
+    If verbose=True, log at info (instead of debug) level.
+
+    :param verbose_on_failure: On a non-zero exit status, it will forcefully set
+                               logging ON for the terminal
+    """
+    verbose_on_failure = kw.pop('verbose_on_failure', True)
+
+    logger.debug("Running command: %s" % ' '.join(command))
+    process = subprocess.Popen(
+        command,
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE,
+        close_fds=True,
+        **kw
+    )
+    # get current p.stdout flags, add O_NONBLOCK
+    stdout_flags = fcntl.fcntl(process.stdout, fcntl.F_GETFL)
+    stderr_flags = fcntl.fcntl(process.stderr, fcntl.F_GETFL)
+    fcntl.fcntl(process.stdout, fcntl.F_SETFL, stdout_flags | os.O_NONBLOCK)
+    fcntl.fcntl(process.stderr, fcntl.F_SETFL, stderr_flags | os.O_NONBLOCK)
+
+    out = ''
+    err = ''
+    reads = None
+    stop = False
+    while not stop:
+        if reads and process.poll() is not None:
+            # we want to stop, but first read off anything remaining
+            # on stdout/stderr
+            stop = True
+        else:
+            reads, _, _ = select.select(
+                [process.stdout.fileno(), process.stderr.fileno()],
+                [], []
+            )
+        for fd in reads:
+            try:
+                message = os.read(fd, 1024)
+                if not isinstance(message, str):
+                    message = message.decode('utf-8')
+                if fd == process.stdout.fileno():
+                    out += message
+                    for line in message.splitlines():
+                        if verbose:
+                            logger.info(desc + ':stdout ' + line)
+                        else:
+                            logger.debug(desc + ':stdout ' + line)
+                elif fd == process.stderr.fileno():
+                    err += message
+                    for line in message.splitlines():
+                        if verbose:
+                            logger.info(desc + ':stderr ' + line)
+                        else:
+                            logger.debug(desc + ':stderr ' + line)
+                else:
+                    assert False
+            except (IOError, OSError):
+                pass
+
+    returncode = process.wait()
+
+    if returncode != 0 and verbose_on_failure and not verbose:
+        # dump stdout + stderr
+        logger.info('Non-zero exit code %d from %s' % (returncode, ' '.join(command)))
+        for line in out.splitlines():
+            logger.info(desc + ':stdout ' + line)
+        for line in err.splitlines():
+            logger.info(desc + ':stderr ' + line)
+
+    return out, err, returncode
+
+def call_throws(command, **kwargs):
+    out, err, ret = call(command, command[0], **kwargs)
+    if ret:
+        raise RuntimeError('Failed command: %s' % ' '.join(command))
+    return out, err, ret
+
 ##################################
 
 def pathify(p):
@@ -118,14 +208,18 @@ def get_unit_name(fsid, daemon_type, daemon_id):
 
 def check_unit(unit_name):
     try:
-        out = subprocess.check_output(['systemctl', 'is-enabled', unit_name])
-        enabled = out.decode('utf-8').strip() == 'enabled'
+        out, err, code = call(['systemctl', 'is-enabled', unit_name], 'systemctl')
+        if code:
+            raise RuntimeError('exited with %d' % code)
+        enabled = out.strip() == 'enabled'
     except Exception as e:
         logger.warning('unable to run systemctl' % e)
         enabled = False
     try:
-        out = subprocess.check_output(['systemctl', 'is-active', unit_name])
-        active = out.decode('utf-8').strip() == 'active'
+        out, err, code = call(['systemctl', 'is-active', unit_name], 'systemctl')
+        if code:
+            raise RuntimeError('exited with %d' % code)
+        active = out.strip() == 'active'
     except Exception as e:
         logger.warning('unable to run systemctl: %s' % e)
         active = False
@@ -354,13 +448,13 @@ def deploy_daemon_units(fsid, daemon_type, daemon_id, c,
         f.write(unit)
         os.rename(args.unit_dir + '/' + unit_file + '.new',
                   args.unit_dir + '/' + unit_file)
-    subprocess.check_output(['systemctl', 'daemon-reload'])
+    call_throws(['systemctl', 'daemon-reload'])
 
     unit_name = get_unit_name(fsid, daemon_type, daemon_id)
     if enable:
-        subprocess.check_output(['systemctl', 'enable', unit_name])
+        call_throws(['systemctl', 'enable', unit_name])
     if start:
-        subprocess.check_output(['systemctl', 'start', unit_name])
+        call_throws(['systemctl', 'start', unit_name])
 
 def install_base_units(fsid):
     """
@@ -377,8 +471,8 @@ def install_base_units(fsid):
         os.rename(args.unit_dir + '/ceph.target.new',
                   args.unit_dir + '/ceph.target')
     if not existed:
-        subprocess.check_output(['systemctl', 'enable', 'ceph.target'])
-        subprocess.check_output(['systemctl', 'start', 'ceph.target'])
+        call_throws(['systemctl', 'enable', 'ceph.target'])
+        call_throws(['systemctl', 'start', 'ceph.target'])
 
     # cluster unit
     existed = os.path.exists(args.unit_dir + '/ceph-%s.target' % fsid)
@@ -395,8 +489,8 @@ def install_base_units(fsid):
         os.rename(args.unit_dir + '/ceph-%s.target.new' % fsid,
                   args.unit_dir + '/ceph-%s.target' % fsid)
     if not existed:
-        subprocess.check_output(['systemctl', 'enable', 'ceph-%s.target' % fsid])
-        subprocess.check_output(['systemctl', 'start', 'ceph-%s.target' % fsid])
+        call_throws(['systemctl', 'enable', 'ceph-%s.target' % fsid])
+        call_throws(['systemctl', 'start', 'ceph-%s.target' % fsid])
 
 def deploy_crash(fsid, uid, gid, config, keyring):
     crash_dir = os.path.join(args.data_dir, fsid, 'crash')
@@ -495,7 +589,7 @@ WantedBy=ceph-{fsid}.target
 def gen_ssh_key(fsid):
     tmp_dir = tempfile.TemporaryDirectory()
     path = tmp_dir.name + '/key'
-    subprocess.check_output([
+    call_throws([
         'ssh-keygen',
         '-C', 'ceph-%s' % fsid,
         '-N', '',
@@ -575,13 +669,15 @@ class CephContainer:
 
     def run(self):
         logger.debug(self.run_cmd())
-        return subprocess.check_output(self.run_cmd()).decode('utf-8')
+        out, _, _ = call(self.run_cmd(), self.entrypoint)
+        return out
+
 
 ##################################
 
 def command_version():
     out = CephContainer(args.image, 'ceph', ['--version']).run()
-    print(out.decode('utf-8'), end='')
+    print(out, end='')
     return 0
 
 ##################################
@@ -743,7 +839,7 @@ def command_bootstrap():
         with open(mon_dir + '/config', 'r') as f:
             config = f.read()
         logger.info('Restarting the monitor...')
-        subprocess.call([
+        call_throws([
             'systemctl',
             'restart',
             get_unit_name(fsid, 'mon', mon_id)
@@ -1005,14 +1101,14 @@ def command_ceph_volume():
         podman_args=['--privileged'],
         volume_mounts=mounts,
     )
-    subprocess.call(c.run_cmd())
+    call_throws(c.run_cmd(), verbose=True)
 
 ##################################
 
 def command_unit():
     (daemon_type, daemon_id) = args.name.split('.')
     unit_name = get_unit_name(args.fsid, daemon_type, daemon_id)
-    subprocess.call([
+    call_throws([
         'systemctl',
         args.command,
         unit_name])
@@ -1080,19 +1176,19 @@ def command_adopt():
 
         if active:
             logger.info('Stopping old systemd unit %s...' % unit_name)
-            subprocess.check_output(['systemctl', 'stop', unit_name])
+            call_throws(['systemctl', 'stop', unit_name])
         if enabled:
             logger.info('Disabling old systemd unit %s...' % unit_name)
-            subprocess.check_output(['systemctl', 'disable', unit_name])
+            call_throws(['systemctl', 'disable', unit_name])
 
         logger.info('Moving data...')
         make_data_dir_base(fsid, uid, gid)
         data_dir = get_data_dir(fsid, daemon_type, daemon_id)
-        subprocess.check_output([
+        call_throws([
             'mv',
             '/var/lib/ceph/%s/%s-%s' % (daemon_type, args.cluster, daemon_id),
             data_dir])
-        subprocess.check_output([
+        call_throws([
             'cp',
             '/etc/ceph/%s.conf' % args.cluster,
             os.path.join(data_dir, 'config')])
@@ -1102,7 +1198,7 @@ def command_adopt():
         logger.info('Moving logs...')
         log_dir = make_log_dir(fsid, uid=uid, gid=gid)
         try:
-            subprocess.check_output(
+            call_throws(
                 ['mv',
                  '/var/log/ceph/%s-%s.%s.log*' % (args.cluster,
                                                   daemon_type, daemon_id),
@@ -1128,10 +1224,10 @@ def command_rm_daemon():
         raise RuntimeError('must pass --force to proceed: '
                            'this command may destroy precious data!')
     unit_name = get_unit_name(args.fsid, daemon_type, daemon_id)
-    subprocess.check_output(['systemctl', 'stop', unit_name])
-    subprocess.check_output(['systemctl', 'disable', unit_name])
+    call_throws(['systemctl', 'stop', unit_name])
+    call_throws(['systemctl', 'disable', unit_name])
     data_dir = get_data_dir(args.fsid, daemon_type, daemon_id)
-    subprocess.check_output(['rm', '-rf', data_dir])
+    call_throws(['rm', '-rf', data_dir])
 
 ##################################
 
@@ -1142,9 +1238,11 @@ def command_rm_cluster():
 
     unit_name = 'ceph-%s.target' % args.fsid
     try:
-        subprocess.check_output(['systemctl', 'stop', unit_name])
-        subprocess.check_output(['systemctl', 'disable', unit_name])
-    except subprocess.CalledProcessError:
+        call_throws(['systemctl', 'stop', unit_name],
+                    verbose_on_failure=False)
+        call_throws(['systemctl', 'disable', unit_name],
+                    verbose_on_failure=False)
+    except RuntimeError:
         pass
     crash_unit_name = 'ceph-%s-crash.service' % args.fsid
     try:
@@ -1156,26 +1254,27 @@ def command_rm_cluster():
     slice_name = 'system-%s.slice' % (
         ('ceph-%s' % args.fsid).replace('-', '\\x2d'))
     try:
-        subprocess.check_output(['systemctl', 'stop', slice_name])
-    except subprocess.CalledProcessError:
+        call_throws(['systemctl', 'stop', slice_name],
+                    verbose_on_failure=False)
+    except RuntimeError:
         pass
 
     # FIXME: stop + disable individual daemon units, too?
 
     # rm units
-    subprocess.check_output(['rm', '-f', args.unit_dir +
+    call_throws(['rm', '-f', args.unit_dir +
                              '/ceph-%s@.service' % args.fsid])
-    subprocess.check_output(['rm', '-f', args.unit_dir +
-                             '/ceph-%s-crash.service' % args.fsid])
-    subprocess.check_output(['rm', '-f', args.unit_dir +
+    call_throws(['rm', '-f', args.unit_dir +
+                             '/ceph-%s-crash@.service' % args.fsid])
+    call_throws(['rm', '-f', args.unit_dir +
                              '/ceph-%s.target' % args.fsid])
-    subprocess.check_output(['rm', '-rf',
+    call_throws(['rm', '-rf',
                   args.unit_dir + '/ceph-%s.target.wants' % args.fsid])
     # rm data
-    subprocess.check_output(['rm', '-rf', args.data_dir + '/' + args.fsid])
+    call_throws(['rm', '-rf', args.data_dir + '/' + args.fsid])
     # rm logs
-    subprocess.check_output(['rm', '-rf', args.log_dir + '/' + args.fsid])
-    subprocess.check_output(['rm', '-rf', args.log_dir +
+    call_throws(['rm', '-rf', args.log_dir + '/' + args.fsid])
+    call_throws(['rm', '-rf', args.log_dir +
                              '/*.wants/ceph-%s@*' % args.fsid])
 
 
@@ -1436,7 +1535,7 @@ if args.debug:
     logging.basicConfig(level=logging.DEBUG)
 else:
     logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger(__name__)
+logger = logging.getLogger('ceph-daemon')
 
 # podman or docker?
 if args.docker:
index 331ebcfa473def9fbac25fb45ccd4686f1ecea05..a37687f4629b5a5b7e011f15a7af18820cb73648 100755 (executable)
@@ -4,14 +4,16 @@ fsid=2a833e3f-53e4-49a7-a7a0-bd89d193ab62
 image=ceph/daemon-base:latest-master-devel
 [ -z "$ip" ] && ip=127.0.0.1
 
-../src/ceph-daemon rm-cluster --fsid $fsid --force
+#A="-d"
+
+../src/ceph-daemon $A rm-cluster --fsid $fsid --force
 
 cat <<EOF > c
 [global]
 log to file = true
 EOF
 
-../src/ceph-daemon \
+../src/ceph-daemon $A \
     --image $image \
     bootstrap \
     --mon-id a \
@@ -26,7 +28,7 @@ chmod 644 k c
 
 if [ -n "$ip2" ]; then
     # mon.b
-    ../src/ceph-daemon \
+    ../src/ceph-daemon $A \
     --image $image \
     deploy --name mon.b \
     --fsid $fsid \
@@ -40,7 +42,7 @@ bin/ceph -c c -k k auth get-or-create mgr.y \
         mon 'allow profile mgr' \
         osd 'allow *' \
         mds 'allow *' > k-mgr.y
-../src/ceph-daemon \
+../src/ceph-daemon $A \
     --image $image \
     deploy --name mgr.y \
     --fsid $fsid \
@@ -54,7 +56,7 @@ for id in k j; do
             mgr 'allow profile mds' \
             osd 'allow *' \
             mds 'allow *' > k-mds.$id
-    ../src/ceph-daemon \
+    ../src/ceph-daemon $A \
        --image $image \
        deploy --name mds.$id \
        --fsid $fsid \