import time
import errno
import struct
+from enum import Enum
try:
from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO
except ImportError:
##################################
# Popen wrappers, lifted from ceph-volume
-def call(command, # type: List[str]
- desc=None, # type: Optional[str]
- verbose=False, # type: bool
- verbose_on_failure=True, # type: bool
- timeout=DEFAULT_TIMEOUT, # type: Optional[int]
- **kwargs):
+class CallVerbosity(Enum):
+ SILENT = 0
+ # log stdout/stderr to logger.debug
+ DEBUG = 1
+ # On a non-zero exit status, it will forcefully set
+ # logging ON for the terminal
+ VERBOSE_ON_FAILURE = 2
+ # log at info (instead of debug) level.
+ VERBOSE = 3
+
+
+def call(command: List[str],
+ desc: Optional[str] = None,
+ verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE,
+ timeout: Optional[int] = DEFAULT_TIMEOUT,
+ **kwargs) -> Tuple[str, str, int]:
"""
Wrap subprocess.Popen to
- decode utf-8
- cleanly return out, err, returncode
- If verbose=True, log at info (instead of debug) level.
-
- :param verbose_on_failure: On a non-zero exit status, it will forcefully set
- logging ON for the terminal
:param timeout: timeout in seconds
"""
if desc is None:
lines = message.split('\n')
out_buffer = lines.pop()
for line in lines:
- if verbose:
+ if verbosity == CallVerbosity.VERBOSE:
logger.info(desc + 'stdout ' + line)
- else:
+ elif verbosity != CallVerbosity.SILENT:
logger.debug(desc + 'stdout ' + line)
elif fd == process.stderr.fileno():
err += message
lines = message.split('\n')
err_buffer = lines.pop()
for line in lines:
- if verbose:
+ if verbosity == CallVerbosity.VERBOSE:
logger.info(desc + 'stderr ' + line)
- else:
+ elif verbosity != CallVerbosity.SILENT:
logger.debug(desc + 'stderr ' + line)
else:
assert False
except (IOError, OSError):
pass
- if verbose:
+ if verbosity == CallVerbosity.VERBOSE:
logger.debug(desc + 'profile rt=%s, stop=%s, exit=%s, reads=%s'
% (time.time()-start_time, stop, process.poll(), reads))
returncode = process.wait()
if out_buffer != '':
- if verbose:
+ if verbosity == CallVerbosity.VERBOSE:
logger.info(desc + 'stdout ' + out_buffer)
- else:
+ elif verbosity != CallVerbosity.SILENT:
logger.debug(desc + 'stdout ' + out_buffer)
if err_buffer != '':
- if verbose:
+ if verbosity == CallVerbosity.VERBOSE:
logger.info(desc + 'stderr ' + err_buffer)
- else:
+ elif verbosity != CallVerbosity.SILENT:
logger.debug(desc + 'stderr ' + err_buffer)
- if returncode != 0 and verbose_on_failure and not verbose:
+ if returncode != 0 and verbosity == CallVerbosity.VERBOSE_ON_FAILURE:
# dump stdout + stderr
logger.info('Non-zero exit code %d from %s' % (returncode, ' '.join(command)))
for line in out.splitlines():
return out, err, returncode
-def call_throws(command, **kwargs):
- # type: (List[str], Any) -> Tuple[str, str, int]
- out, err, ret = call(command, **kwargs)
+def call_throws(command: List[str],
+ desc: Optional[str] = None,
+ verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE,
+ timeout: Optional[int] = DEFAULT_TIMEOUT,
+ **kwargs) -> Tuple[str, str, int]:
+ out, err, ret = call(command, desc, verbosity, timeout, **kwargs)
if ret:
raise RuntimeError('Failed command: %s' % ' '.join(command))
return out, err, ret
installed = False
try:
out, err, code = call(['systemctl', 'is-enabled', unit_name],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
if code == 0:
enabled = True
installed = True
state = 'unknown'
try:
out, err, code = call(['systemctl', 'is-active', unit_name],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
out = out.strip()
if out in ['active']:
state = 'running'
unit_name = get_unit_name(fsid, daemon_type, daemon_id)
call(['systemctl', 'stop', unit_name],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
call(['systemctl', 'reset-failed', unit_name],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
if enable:
call_throws(['systemctl', 'enable', unit_name])
if start:
else:
return
- out, err, ret = call([self.cmd, '--permanent', '--query-service', svc], verbose_on_failure=False)
+ out, err, ret = call([self.cmd, '--permanent', '--query-service', svc], verbosity=CallVerbosity.DEBUG)
if ret:
logger.info('Enabling firewalld service %s in current zone...' % svc)
out, err, ret = call([self.cmd, '--permanent', '--add-service', svc])
for port in fw_ports:
tcp_port = str(port) + '/tcp'
- out, err, ret = call([self.cmd, '--permanent', '--query-port', tcp_port], verbose_on_failure=False)
+ out, err, ret = call([self.cmd, '--permanent', '--query-port', tcp_port], verbosity=CallVerbosity.DEBUG)
if ret:
logger.info('Enabling firewalld port %s in current zone...' % tcp_port)
out, err, ret = call([self.cmd, '--permanent', '--add-port', tcp_port])
else:
logger.debug('firewalld port %s is enabled in current zone' % tcp_port)
+ out, err, ret = call([self.cmd, '--permanent', '--query-port', tcp_port], verbose_on_failure=False)
def apply_rules(self):
# type: () -> None
if not self.available:
privileged=True,
volume_mounts=mounts,
)
- out, err, code = call_throws(c.run_cmd(), verbose=True)
+ out, err, code = call_throws(c.run_cmd(), verbosity=CallVerbosity.VERBOSE)
if not code:
print(out)
'systemctl',
args.command,
unit_name],
- verbose=True,
+ verbosity=CallVerbosity.VERBOSE,
desc=''
)
'--format', '{{.Id}},{{.Config.Image}},{{%s}},{{.Created}},{{index .Config.Labels "io.ceph.version"}}' % image_field,
'ceph-%s-%s' % (fsid, j)
],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
if not code:
(container_id, image_name, image_id, start,
version) = out.strip().split(',')
args=['lvm', 'list', '--format=json'],
privileged=True
)
- out, err, code = call_throws(c.run_cmd(), verbose=False)
+ out, err, code = call_throws(c.run_cmd())
if not code:
try:
js = json.loads(out)
'this command may destroy precious data!')
call(['systemctl', 'stop', unit_name],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
call(['systemctl', 'reset-failed', unit_name],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
call(['systemctl', 'disable', unit_name],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
data_dir = get_data_dir(args.fsid, daemon_type, daemon_id)
if daemon_type in ['mon', 'osd', 'prometheus'] and \
not args.force_delete_data:
continue
unit_name = get_unit_name(args.fsid, d['name'])
call(['systemctl', 'stop', unit_name],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
call(['systemctl', 'reset-failed', unit_name],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
call(['systemctl', 'disable', unit_name],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
# cluster units
for unit_name in ['ceph-%s.target' % args.fsid]:
call(['systemctl', 'stop', unit_name],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
call(['systemctl', 'reset-failed', unit_name],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
call(['systemctl', 'disable', unit_name],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
slice_name = 'system-%s.slice' % (('ceph-%s' % args.fsid).replace('-',
'\\x2d'))
call(['systemctl', 'stop', slice_name],
- verbose_on_failure=False)
+ verbosity=CallVerbosity.DEBUG)
# rm units
call_throws(['rm', '-f', args.unit_dir +
"""Get kernel parameters required/used in Ceph clusters"""
k_param = {}
- out, _, _ = call_throws(['sysctl', '-a'])
+ out, _, _ = call_throws(['sysctl', '-a'], verbosity=CallVerbosity.SILENT)
if out:
param_list = out.split('\n')
param_dict = { param.split(" = ")[0]:param.split(" = ")[-1] for param in param_list}