From ee3fe37158422902162257c123ea234da999c961 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 9 Oct 2019 12:50:47 -0500 Subject: [PATCH] ceph-daemon: cleanly capture stdout, err; log - New Popen wrapper that grabs stdout, stderr, and return code. This is adapted from ceph-volume's process.py helper(s). - Optionally log output to logger - By default, be more verbose if the command fails. Signed-off-by: Sage Weil --- src/ceph-daemon | 177 ++++++++++++++++++++++++++++++++++---------- test_ceph_daemon.sh | 12 +-- 2 files changed, 145 insertions(+), 44 deletions(-) diff --git a/src/ceph-daemon b/src/ceph-daemon index b427dc6f84b..c46684a3684 100755 --- a/src/ceph-daemon +++ b/src/ceph-daemon @@ -31,9 +31,11 @@ You can invoke ceph-daemon in two ways: import argparse import configparser +import fcntl import json import logging import os +import select import socket import subprocess import sys @@ -53,6 +55,94 @@ except ImportError: podman_path = None +################################## +# Popen wrappers, lifted from ceph-volume + +def call(command, desc, verbose=False, **kw): + """ + Wrap subprocess.Popen to + + - log stdout/stderr to a logger, + - decode utf-8 + - cleanly return out, err, returncode + + If verbose=True, log at info (instead of debug) level. + + :param verbose_on_failure: On a non-zero exit status, it will forcefully set + logging ON for the terminal + """ + verbose_on_failure = kw.pop('verbose_on_failure', True) + + logger.debug("Running command: %s" % ' '.join(command)) + process = subprocess.Popen( + command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + close_fds=True, + **kw + ) + # get current p.stdout flags, add O_NONBLOCK + stdout_flags = fcntl.fcntl(process.stdout, fcntl.F_GETFL) + stderr_flags = fcntl.fcntl(process.stderr, fcntl.F_GETFL) + fcntl.fcntl(process.stdout, fcntl.F_SETFL, stdout_flags | os.O_NONBLOCK) + fcntl.fcntl(process.stderr, fcntl.F_SETFL, stderr_flags | os.O_NONBLOCK) + + out = '' + err = '' + reads = None + stop = False + while not stop: + if reads and process.poll() is not None: + # we want to stop, but first read off anything remaining + # on stdout/stderr + stop = True + else: + reads, _, _ = select.select( + [process.stdout.fileno(), process.stderr.fileno()], + [], [] + ) + for fd in reads: + try: + message = os.read(fd, 1024) + if not isinstance(message, str): + message = message.decode('utf-8') + if fd == process.stdout.fileno(): + out += message + for line in message.splitlines(): + if verbose: + logger.info(desc + ':stdout ' + line) + else: + logger.debug(desc + ':stdout ' + line) + elif fd == process.stderr.fileno(): + err += message + for line in message.splitlines(): + if verbose: + logger.info(desc + ':stderr ' + line) + else: + logger.debug(desc + ':stderr ' + line) + else: + assert False + except (IOError, OSError): + pass + + returncode = process.wait() + + if returncode != 0 and verbose_on_failure and not verbose: + # dump stdout + stderr + logger.info('Non-zero exit code %d from %s' % (returncode, ' '.join(command))) + for line in out.splitlines(): + logger.info(desc + ':stdout ' + line) + for line in err.splitlines(): + logger.info(desc + ':stderr ' + line) + + return out, err, returncode + +def call_throws(command, **kwargs): + out, err, ret = call(command, command[0], **kwargs) + if ret: + raise RuntimeError('Failed command: %s' % ' '.join(command)) + return out, err, ret + ################################## def pathify(p): @@ -118,14 +208,18 @@ def get_unit_name(fsid, daemon_type, daemon_id): def check_unit(unit_name): try: - out = subprocess.check_output(['systemctl', 'is-enabled', unit_name]) - enabled = out.decode('utf-8').strip() == 'enabled' + out, err, code = call(['systemctl', 'is-enabled', unit_name], 'systemctl') + if code: + raise RuntimeError('exited with %d' % code) + enabled = out.strip() == 'enabled' except Exception as e: logger.warning('unable to run systemctl' % e) enabled = False try: - out = subprocess.check_output(['systemctl', 'is-active', unit_name]) - active = out.decode('utf-8').strip() == 'active' + out, err, code = call(['systemctl', 'is-active', unit_name], 'systemctl') + if code: + raise RuntimeError('exited with %d' % code) + active = out.strip() == 'active' except Exception as e: logger.warning('unable to run systemctl: %s' % e) active = False @@ -354,13 +448,13 @@ def deploy_daemon_units(fsid, daemon_type, daemon_id, c, f.write(unit) os.rename(args.unit_dir + '/' + unit_file + '.new', args.unit_dir + '/' + unit_file) - subprocess.check_output(['systemctl', 'daemon-reload']) + call_throws(['systemctl', 'daemon-reload']) unit_name = get_unit_name(fsid, daemon_type, daemon_id) if enable: - subprocess.check_output(['systemctl', 'enable', unit_name]) + call_throws(['systemctl', 'enable', unit_name]) if start: - subprocess.check_output(['systemctl', 'start', unit_name]) + call_throws(['systemctl', 'start', unit_name]) def install_base_units(fsid): """ @@ -377,8 +471,8 @@ def install_base_units(fsid): os.rename(args.unit_dir + '/ceph.target.new', args.unit_dir + '/ceph.target') if not existed: - subprocess.check_output(['systemctl', 'enable', 'ceph.target']) - subprocess.check_output(['systemctl', 'start', 'ceph.target']) + call_throws(['systemctl', 'enable', 'ceph.target']) + call_throws(['systemctl', 'start', 'ceph.target']) # cluster unit existed = os.path.exists(args.unit_dir + '/ceph-%s.target' % fsid) @@ -395,8 +489,8 @@ def install_base_units(fsid): os.rename(args.unit_dir + '/ceph-%s.target.new' % fsid, args.unit_dir + '/ceph-%s.target' % fsid) if not existed: - subprocess.check_output(['systemctl', 'enable', 'ceph-%s.target' % fsid]) - subprocess.check_output(['systemctl', 'start', 'ceph-%s.target' % fsid]) + call_throws(['systemctl', 'enable', 'ceph-%s.target' % fsid]) + call_throws(['systemctl', 'start', 'ceph-%s.target' % fsid]) def deploy_crash(fsid, uid, gid, config, keyring): crash_dir = os.path.join(args.data_dir, fsid, 'crash') @@ -495,7 +589,7 @@ WantedBy=ceph-{fsid}.target def gen_ssh_key(fsid): tmp_dir = tempfile.TemporaryDirectory() path = tmp_dir.name + '/key' - subprocess.check_output([ + call_throws([ 'ssh-keygen', '-C', 'ceph-%s' % fsid, '-N', '', @@ -575,13 +669,15 @@ class CephContainer: def run(self): logger.debug(self.run_cmd()) - return subprocess.check_output(self.run_cmd()).decode('utf-8') + out, _, _ = call(self.run_cmd(), self.entrypoint) + return out + ################################## def command_version(): out = CephContainer(args.image, 'ceph', ['--version']).run() - print(out.decode('utf-8'), end='') + print(out, end='') return 0 ################################## @@ -743,7 +839,7 @@ def command_bootstrap(): with open(mon_dir + '/config', 'r') as f: config = f.read() logger.info('Restarting the monitor...') - subprocess.call([ + call_throws([ 'systemctl', 'restart', get_unit_name(fsid, 'mon', mon_id) @@ -1005,14 +1101,14 @@ def command_ceph_volume(): podman_args=['--privileged'], volume_mounts=mounts, ) - subprocess.call(c.run_cmd()) + call_throws(c.run_cmd(), verbose=True) ################################## def command_unit(): (daemon_type, daemon_id) = args.name.split('.') unit_name = get_unit_name(args.fsid, daemon_type, daemon_id) - subprocess.call([ + call_throws([ 'systemctl', args.command, unit_name]) @@ -1080,19 +1176,19 @@ def command_adopt(): if active: logger.info('Stopping old systemd unit %s...' % unit_name) - subprocess.check_output(['systemctl', 'stop', unit_name]) + call_throws(['systemctl', 'stop', unit_name]) if enabled: logger.info('Disabling old systemd unit %s...' % unit_name) - subprocess.check_output(['systemctl', 'disable', unit_name]) + call_throws(['systemctl', 'disable', unit_name]) logger.info('Moving data...') make_data_dir_base(fsid, uid, gid) data_dir = get_data_dir(fsid, daemon_type, daemon_id) - subprocess.check_output([ + call_throws([ 'mv', '/var/lib/ceph/%s/%s-%s' % (daemon_type, args.cluster, daemon_id), data_dir]) - subprocess.check_output([ + call_throws([ 'cp', '/etc/ceph/%s.conf' % args.cluster, os.path.join(data_dir, 'config')]) @@ -1102,7 +1198,7 @@ def command_adopt(): logger.info('Moving logs...') log_dir = make_log_dir(fsid, uid=uid, gid=gid) try: - subprocess.check_output( + call_throws( ['mv', '/var/log/ceph/%s-%s.%s.log*' % (args.cluster, daemon_type, daemon_id), @@ -1128,10 +1224,10 @@ def command_rm_daemon(): raise RuntimeError('must pass --force to proceed: ' 'this command may destroy precious data!') unit_name = get_unit_name(args.fsid, daemon_type, daemon_id) - subprocess.check_output(['systemctl', 'stop', unit_name]) - subprocess.check_output(['systemctl', 'disable', unit_name]) + call_throws(['systemctl', 'stop', unit_name]) + call_throws(['systemctl', 'disable', unit_name]) data_dir = get_data_dir(args.fsid, daemon_type, daemon_id) - subprocess.check_output(['rm', '-rf', data_dir]) + call_throws(['rm', '-rf', data_dir]) ################################## @@ -1142,9 +1238,11 @@ def command_rm_cluster(): unit_name = 'ceph-%s.target' % args.fsid try: - subprocess.check_output(['systemctl', 'stop', unit_name]) - subprocess.check_output(['systemctl', 'disable', unit_name]) - except subprocess.CalledProcessError: + call_throws(['systemctl', 'stop', unit_name], + verbose_on_failure=False) + call_throws(['systemctl', 'disable', unit_name], + verbose_on_failure=False) + except RuntimeError: pass crash_unit_name = 'ceph-%s-crash.service' % args.fsid try: @@ -1156,26 +1254,27 @@ def command_rm_cluster(): slice_name = 'system-%s.slice' % ( ('ceph-%s' % args.fsid).replace('-', '\\x2d')) try: - subprocess.check_output(['systemctl', 'stop', slice_name]) - except subprocess.CalledProcessError: + call_throws(['systemctl', 'stop', slice_name], + verbose_on_failure=False) + except RuntimeError: pass # FIXME: stop + disable individual daemon units, too? # rm units - subprocess.check_output(['rm', '-f', args.unit_dir + + call_throws(['rm', '-f', args.unit_dir + '/ceph-%s@.service' % args.fsid]) - subprocess.check_output(['rm', '-f', args.unit_dir + - '/ceph-%s-crash.service' % args.fsid]) - subprocess.check_output(['rm', '-f', args.unit_dir + + call_throws(['rm', '-f', args.unit_dir + + '/ceph-%s-crash@.service' % args.fsid]) + call_throws(['rm', '-f', args.unit_dir + '/ceph-%s.target' % args.fsid]) - subprocess.check_output(['rm', '-rf', + call_throws(['rm', '-rf', args.unit_dir + '/ceph-%s.target.wants' % args.fsid]) # rm data - subprocess.check_output(['rm', '-rf', args.data_dir + '/' + args.fsid]) + call_throws(['rm', '-rf', args.data_dir + '/' + args.fsid]) # rm logs - subprocess.check_output(['rm', '-rf', args.log_dir + '/' + args.fsid]) - subprocess.check_output(['rm', '-rf', args.log_dir + + call_throws(['rm', '-rf', args.log_dir + '/' + args.fsid]) + call_throws(['rm', '-rf', args.log_dir + '/*.wants/ceph-%s@*' % args.fsid]) @@ -1436,7 +1535,7 @@ if args.debug: logging.basicConfig(level=logging.DEBUG) else: logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) +logger = logging.getLogger('ceph-daemon') # podman or docker? if args.docker: diff --git a/test_ceph_daemon.sh b/test_ceph_daemon.sh index 331ebcfa473..a37687f4629 100755 --- a/test_ceph_daemon.sh +++ b/test_ceph_daemon.sh @@ -4,14 +4,16 @@ fsid=2a833e3f-53e4-49a7-a7a0-bd89d193ab62 image=ceph/daemon-base:latest-master-devel [ -z "$ip" ] && ip=127.0.0.1 -../src/ceph-daemon rm-cluster --fsid $fsid --force +#A="-d" + +../src/ceph-daemon $A rm-cluster --fsid $fsid --force cat < c [global] log to file = true EOF -../src/ceph-daemon \ +../src/ceph-daemon $A \ --image $image \ bootstrap \ --mon-id a \ @@ -26,7 +28,7 @@ chmod 644 k c if [ -n "$ip2" ]; then # mon.b - ../src/ceph-daemon \ + ../src/ceph-daemon $A \ --image $image \ deploy --name mon.b \ --fsid $fsid \ @@ -40,7 +42,7 @@ bin/ceph -c c -k k auth get-or-create mgr.y \ mon 'allow profile mgr' \ osd 'allow *' \ mds 'allow *' > k-mgr.y -../src/ceph-daemon \ +../src/ceph-daemon $A \ --image $image \ deploy --name mgr.y \ --fsid $fsid \ @@ -54,7 +56,7 @@ for id in k j; do mgr 'allow profile mds' \ osd 'allow *' \ mds 'allow *' > k-mds.$id - ../src/ceph-daemon \ + ../src/ceph-daemon $A \ --image $image \ deploy --name mds.$id \ --fsid $fsid \ -- 2.39.5