import argparse
import configparser
+import fcntl
import json
import logging
import os
+import select
import socket
import subprocess
import sys
podman_path = None
+##################################
+# Popen wrappers, lifted from ceph-volume
+
+def call(command, desc, verbose=False, **kw):
+ """
+ Wrap subprocess.Popen to
+
+ - log stdout/stderr to a logger,
+ - decode utf-8
+ - cleanly return out, err, returncode
+
+ If verbose=True, log at info (instead of debug) level.
+
+ :param verbose_on_failure: On a non-zero exit status, it will forcefully set
+ logging ON for the terminal
+ """
+ verbose_on_failure = kw.pop('verbose_on_failure', True)
+
+ logger.debug("Running command: %s" % ' '.join(command))
+ process = subprocess.Popen(
+ command,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ close_fds=True,
+ **kw
+ )
+ # get current p.stdout flags, add O_NONBLOCK
+ stdout_flags = fcntl.fcntl(process.stdout, fcntl.F_GETFL)
+ stderr_flags = fcntl.fcntl(process.stderr, fcntl.F_GETFL)
+ fcntl.fcntl(process.stdout, fcntl.F_SETFL, stdout_flags | os.O_NONBLOCK)
+ fcntl.fcntl(process.stderr, fcntl.F_SETFL, stderr_flags | os.O_NONBLOCK)
+
+ out = ''
+ err = ''
+ reads = None
+ stop = False
+ while not stop:
+ if reads and process.poll() is not None:
+ # we want to stop, but first read off anything remaining
+ # on stdout/stderr
+ stop = True
+ else:
+ reads, _, _ = select.select(
+ [process.stdout.fileno(), process.stderr.fileno()],
+ [], []
+ )
+ for fd in reads:
+ try:
+ message = os.read(fd, 1024)
+ if not isinstance(message, str):
+ message = message.decode('utf-8')
+ if fd == process.stdout.fileno():
+ out += message
+ for line in message.splitlines():
+ if verbose:
+ logger.info(desc + ':stdout ' + line)
+ else:
+ logger.debug(desc + ':stdout ' + line)
+ elif fd == process.stderr.fileno():
+ err += message
+ for line in message.splitlines():
+ if verbose:
+ logger.info(desc + ':stderr ' + line)
+ else:
+ logger.debug(desc + ':stderr ' + line)
+ else:
+ assert False
+ except (IOError, OSError):
+ pass
+
+ returncode = process.wait()
+
+ if returncode != 0 and verbose_on_failure and not verbose:
+ # dump stdout + stderr
+ logger.info('Non-zero exit code %d from %s' % (returncode, ' '.join(command)))
+ for line in out.splitlines():
+ logger.info(desc + ':stdout ' + line)
+ for line in err.splitlines():
+ logger.info(desc + ':stderr ' + line)
+
+ return out, err, returncode
+
+def call_throws(command, **kwargs):
+ out, err, ret = call(command, command[0], **kwargs)
+ if ret:
+ raise RuntimeError('Failed command: %s' % ' '.join(command))
+ return out, err, ret
+
##################################
def pathify(p):
def check_unit(unit_name):
try:
- out = subprocess.check_output(['systemctl', 'is-enabled', unit_name])
- enabled = out.decode('utf-8').strip() == 'enabled'
+ out, err, code = call(['systemctl', 'is-enabled', unit_name], 'systemctl')
+ if code:
+ raise RuntimeError('exited with %d' % code)
+ enabled = out.strip() == 'enabled'
except Exception as e:
logger.warning('unable to run systemctl' % e)
enabled = False
try:
- out = subprocess.check_output(['systemctl', 'is-active', unit_name])
- active = out.decode('utf-8').strip() == 'active'
+ out, err, code = call(['systemctl', 'is-active', unit_name], 'systemctl')
+ if code:
+ raise RuntimeError('exited with %d' % code)
+ active = out.strip() == 'active'
except Exception as e:
logger.warning('unable to run systemctl: %s' % e)
active = False
f.write(unit)
os.rename(args.unit_dir + '/' + unit_file + '.new',
args.unit_dir + '/' + unit_file)
- subprocess.check_output(['systemctl', 'daemon-reload'])
+ call_throws(['systemctl', 'daemon-reload'])
unit_name = get_unit_name(fsid, daemon_type, daemon_id)
if enable:
- subprocess.check_output(['systemctl', 'enable', unit_name])
+ call_throws(['systemctl', 'enable', unit_name])
if start:
- subprocess.check_output(['systemctl', 'start', unit_name])
+ call_throws(['systemctl', 'start', unit_name])
def install_base_units(fsid):
"""
os.rename(args.unit_dir + '/ceph.target.new',
args.unit_dir + '/ceph.target')
if not existed:
- subprocess.check_output(['systemctl', 'enable', 'ceph.target'])
- subprocess.check_output(['systemctl', 'start', 'ceph.target'])
+ call_throws(['systemctl', 'enable', 'ceph.target'])
+ call_throws(['systemctl', 'start', 'ceph.target'])
# cluster unit
existed = os.path.exists(args.unit_dir + '/ceph-%s.target' % fsid)
os.rename(args.unit_dir + '/ceph-%s.target.new' % fsid,
args.unit_dir + '/ceph-%s.target' % fsid)
if not existed:
- subprocess.check_output(['systemctl', 'enable', 'ceph-%s.target' % fsid])
- subprocess.check_output(['systemctl', 'start', 'ceph-%s.target' % fsid])
+ call_throws(['systemctl', 'enable', 'ceph-%s.target' % fsid])
+ call_throws(['systemctl', 'start', 'ceph-%s.target' % fsid])
def deploy_crash(fsid, uid, gid, config, keyring):
crash_dir = os.path.join(args.data_dir, fsid, 'crash')
def gen_ssh_key(fsid):
tmp_dir = tempfile.TemporaryDirectory()
path = tmp_dir.name + '/key'
- subprocess.check_output([
+ call_throws([
'ssh-keygen',
'-C', 'ceph-%s' % fsid,
'-N', '',
def run(self):
logger.debug(self.run_cmd())
- return subprocess.check_output(self.run_cmd()).decode('utf-8')
+ out, _, _ = call(self.run_cmd(), self.entrypoint)
+ return out
+
##################################
def command_version():
out = CephContainer(args.image, 'ceph', ['--version']).run()
- print(out.decode('utf-8'), end='')
+ print(out, end='')
return 0
##################################
with open(mon_dir + '/config', 'r') as f:
config = f.read()
logger.info('Restarting the monitor...')
- subprocess.call([
+ call_throws([
'systemctl',
'restart',
get_unit_name(fsid, 'mon', mon_id)
podman_args=['--privileged'],
volume_mounts=mounts,
)
- subprocess.call(c.run_cmd())
+ call_throws(c.run_cmd(), verbose=True)
##################################
def command_unit():
(daemon_type, daemon_id) = args.name.split('.')
unit_name = get_unit_name(args.fsid, daemon_type, daemon_id)
- subprocess.call([
+ call_throws([
'systemctl',
args.command,
unit_name])
if active:
logger.info('Stopping old systemd unit %s...' % unit_name)
- subprocess.check_output(['systemctl', 'stop', unit_name])
+ call_throws(['systemctl', 'stop', unit_name])
if enabled:
logger.info('Disabling old systemd unit %s...' % unit_name)
- subprocess.check_output(['systemctl', 'disable', unit_name])
+ call_throws(['systemctl', 'disable', unit_name])
logger.info('Moving data...')
make_data_dir_base(fsid, uid, gid)
data_dir = get_data_dir(fsid, daemon_type, daemon_id)
- subprocess.check_output([
+ call_throws([
'mv',
'/var/lib/ceph/%s/%s-%s' % (daemon_type, args.cluster, daemon_id),
data_dir])
- subprocess.check_output([
+ call_throws([
'cp',
'/etc/ceph/%s.conf' % args.cluster,
os.path.join(data_dir, 'config')])
logger.info('Moving logs...')
log_dir = make_log_dir(fsid, uid=uid, gid=gid)
try:
- subprocess.check_output(
+ call_throws(
['mv',
'/var/log/ceph/%s-%s.%s.log*' % (args.cluster,
daemon_type, daemon_id),
raise RuntimeError('must pass --force to proceed: '
'this command may destroy precious data!')
unit_name = get_unit_name(args.fsid, daemon_type, daemon_id)
- subprocess.check_output(['systemctl', 'stop', unit_name])
- subprocess.check_output(['systemctl', 'disable', unit_name])
+ call_throws(['systemctl', 'stop', unit_name])
+ call_throws(['systemctl', 'disable', unit_name])
data_dir = get_data_dir(args.fsid, daemon_type, daemon_id)
- subprocess.check_output(['rm', '-rf', data_dir])
+ call_throws(['rm', '-rf', data_dir])
##################################
unit_name = 'ceph-%s.target' % args.fsid
try:
- subprocess.check_output(['systemctl', 'stop', unit_name])
- subprocess.check_output(['systemctl', 'disable', unit_name])
- except subprocess.CalledProcessError:
+ call_throws(['systemctl', 'stop', unit_name],
+ verbose_on_failure=False)
+ call_throws(['systemctl', 'disable', unit_name],
+ verbose_on_failure=False)
+ except RuntimeError:
pass
crash_unit_name = 'ceph-%s-crash.service' % args.fsid
try:
slice_name = 'system-%s.slice' % (
('ceph-%s' % args.fsid).replace('-', '\\x2d'))
try:
- subprocess.check_output(['systemctl', 'stop', slice_name])
- except subprocess.CalledProcessError:
+ call_throws(['systemctl', 'stop', slice_name],
+ verbose_on_failure=False)
+ except RuntimeError:
pass
# FIXME: stop + disable individual daemon units, too?
# rm units
- subprocess.check_output(['rm', '-f', args.unit_dir +
+ call_throws(['rm', '-f', args.unit_dir +
'/ceph-%s@.service' % args.fsid])
- subprocess.check_output(['rm', '-f', args.unit_dir +
- '/ceph-%s-crash.service' % args.fsid])
- subprocess.check_output(['rm', '-f', args.unit_dir +
+ call_throws(['rm', '-f', args.unit_dir +
+ '/ceph-%s-crash@.service' % args.fsid])
+ call_throws(['rm', '-f', args.unit_dir +
'/ceph-%s.target' % args.fsid])
- subprocess.check_output(['rm', '-rf',
+ call_throws(['rm', '-rf',
args.unit_dir + '/ceph-%s.target.wants' % args.fsid])
# rm data
- subprocess.check_output(['rm', '-rf', args.data_dir + '/' + args.fsid])
+ call_throws(['rm', '-rf', args.data_dir + '/' + args.fsid])
# rm logs
- subprocess.check_output(['rm', '-rf', args.log_dir + '/' + args.fsid])
- subprocess.check_output(['rm', '-rf', args.log_dir +
+ call_throws(['rm', '-rf', args.log_dir + '/' + args.fsid])
+ call_throws(['rm', '-rf', args.log_dir +
'/*.wants/ceph-%s@*' % args.fsid])
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger(__name__)
+logger = logging.getLogger('ceph-daemon')
# podman or docker?
if args.docker: