if sys.version_info > (3, 0):
unicode = str
-container_path = ''
cached_stdin = None
DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'
+
+logger: logging.Logger = None # type: ignore
+
+##################################
+
+class CephadmContext:
+
+ def __init__(self):
+ self._args: argparse.Namespace = None # type: ignore
+ self.container_path: str = None # type: ignore
+
+ @property
+ def args(self) -> argparse.Namespace:
+ return self._args
+
+ @args.setter
+ def args(self, args: argparse.Namespace) -> None:
+ self._args = args
+
+
+##################################
+
+
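+# A brief usage sketch (illustrative; helpers shown are hypothetical): each
+# entry point builds one CephadmContext and threads it through, replacing
+# the old module-level `args` and `container_path` globals:
+#
+#   ctx = CephadmContext()
+#   ctx.args = parser.parse_args()               # argparse.Namespace
+#   ctx.container_path = find_program('podman')  # hypothetical engine lookup
+#   command_version(ctx)
+#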
# Log and console output config
logging_config = {
'version': 1,
}
def __init__(self,
+ ctx,
fsid,
daemon_id,
config_json,
image=DEFAULT_IMAGE):
- # type: (str, Union[int, str], Dict, str) -> None
+ # type: (CephadmContext, str, Union[int, str], Dict, str) -> None
+ self.ctx = ctx
self.fsid = fsid
self.daemon_id = daemon_id
self.image = image
self.validate()
@classmethod
- def init(cls, fsid, daemon_id):
- # type: (str, Union[int, str]) -> NFSGanesha
- return cls(fsid, daemon_id, get_parm(args.config_json), args.image)
+ def init(cls, ctx, fsid, daemon_id):
+ # type: (CephadmContext, str, Union[int, str]) -> NFSGanesha
+ return cls(ctx, fsid, daemon_id, get_parm(ctx.args.config_json),
+ ctx.args.image)
def get_container_mounts(self, data_dir):
# type: (str) -> Dict[str, str]
return envs
@staticmethod
- def get_version(container_id):
- # type: (str) -> Optional[str]
+ def get_version(ctx, container_id):
+ # type: (CephadmContext, str) -> Optional[str]
version = None
- out, err, code = call(
- [container_path, 'exec', container_id,
+ out, err, code = call(ctx,
+ [ctx.container_path, 'exec', container_id,
NFSGanesha.entrypoint, '-v'])
if code == 0:
match = re.search(r'NFS-Ganesha Release\s*=\s*[V]*([\d.]+)', out)
args += ['--userid', self.userid]
args += [action, self.get_daemon_name()]
- data_dir = get_data_dir(self.fsid, self.daemon_type, self.daemon_id)
+ data_dir = get_data_dir(self.fsid, self.ctx.args.data_dir,
+ self.daemon_type, self.daemon_id)
volume_mounts = self.get_container_mounts(data_dir)
envs = self.get_container_envs()
logger.info('Creating RADOS grace for action: %s' % action)
c = CephContainer(
+ self.ctx,
image=self.image,
entrypoint=entrypoint,
args=args,
required_files = ['iscsi-gateway.cfg']
def __init__(self,
+ ctx,
fsid,
daemon_id,
config_json,
image=DEFAULT_IMAGE):
- # type: (str, Union[int, str], Dict, str) -> None
+ # type: (CephadmContext, str, Union[int, str], Dict, str) -> None
+ self.ctx = ctx
self.fsid = fsid
self.daemon_id = daemon_id
self.image = image
self.validate()
@classmethod
- def init(cls, fsid, daemon_id):
- # type: (str, Union[int, str]) -> CephIscsi
- return cls(fsid, daemon_id, get_parm(args.config_json), args.image)
+ def init(cls, ctx, fsid, daemon_id):
+ # type: (CephadmContext, str, Union[int, str]) -> CephIscsi
+ return cls(ctx, fsid, daemon_id,
+ get_parm(ctx.args.config_json), ctx.args.image)
@staticmethod
def get_container_mounts(data_dir, log_dir):
return binds
@staticmethod
- def get_version(container_id):
- # type: (str) -> Optional[str]
+ def get_version(ctx, container_id):
+ # type: (CephadmContext, str) -> Optional[str]
version = None
- out, err, code = call(
- [container_path, 'exec', container_id,
+ out, err, code = call(ctx,
+ [ctx.container_path, 'exec', container_id,
'/usr/bin/python3', '-c', "import pkg_resources; print(pkg_resources.require('ceph_iscsi')[0].version)"])
if code == 0:
version = out.strip()
def get_tcmu_runner_container(self):
# type: () -> CephContainer
- tcmu_container = get_container(self.fsid, self.daemon_type, self.daemon_id)
+ tcmu_container = get_container(self.ctx, self.fsid, self.daemon_type, self.daemon_id)
tcmu_container.entrypoint = "/usr/bin/tcmu-runner"
tcmu_container.cname = self.get_container_name(desc='tcmu')
# remove extra container args for tcmu container.
required_files = ['haproxy.cfg']
default_image = 'haproxy'
- def __init__(self, fsid: str, daemon_id: Union[int, str],
+ def __init__(self,
+ ctx: CephadmContext,
+ fsid: str, daemon_id: Union[int, str],
config_json: Dict, image: str) -> None:
+ self.ctx = ctx
self.fsid = fsid
self.daemon_id = daemon_id
self.image = image
self.validate()
@classmethod
- def init(cls, fsid: str, daemon_id: Union[int, str]) -> 'HAproxy':
- return cls(fsid, daemon_id, get_parm(args.config_json), args.image)
+ def init(cls, ctx: CephadmContext,
+ fsid: str, daemon_id: Union[int, str]) -> 'HAproxy':
+ return cls(ctx, fsid, daemon_id, get_parm(ctx.args.config_json),
+ ctx.args.image)
def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None:
"""Create files under the container data dir"""
def extract_uid_gid_haproxy(self):
# better directory for this?
- return extract_uid_gid(file_path='/var/lib')
+ return extract_uid_gid(self.ctx, file_path='/var/lib')
@staticmethod
def get_container_mounts(data_dir: str) -> Dict[str, str]:
required_files = ['keepalived.conf']
default_image = 'arcts/keepalived'
- def __init__(self, fsid: str, daemon_id: Union[int, str],
+ def __init__(self,
+ ctx: CephadmContext,
+ fsid: str, daemon_id: Union[int, str],
config_json: Dict, image: str) -> None:
+ self.ctx = ctx
self.fsid = fsid
self.daemon_id = daemon_id
self.image = image
self.validate()
@classmethod
- def init(cls, fsid: str, daemon_id: Union[int, str]) -> 'Keepalived':
- return cls(fsid, daemon_id, get_parm(args.config_json), args.image)
+ def init(cls, ctx: CephadmContext, fsid: str,
+ daemon_id: Union[int, str]) -> 'Keepalived':
+ return cls(ctx, fsid, daemon_id,
+ get_parm(ctx.args.config_json), ctx.args.image)
def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None:
"""Create files under the container data dir"""
def extract_uid_gid_keepalived(self):
# better directory for this?
- return extract_uid_gid(file_path='/var/lib')
+ return extract_uid_gid(self.ctx, file_path='/var/lib')
@staticmethod
def get_container_mounts(data_dir: str) -> Dict[str, str]:
"""Defines a custom container"""
daemon_type = 'container'
- def __init__(self, fsid: str, daemon_id: Union[int, str],
+ def __init__(self, ctx: CephadmContext,
+ fsid: str, daemon_id: Union[int, str],
config_json: Dict, image: str) -> None:
+ self.ctx = ctx
self.fsid = fsid
self.daemon_id = daemon_id
self.image = image
self.files = dict_get(config_json, 'files', {})
@classmethod
- def init(cls, fsid: str, daemon_id: Union[int, str]) -> 'CustomContainer':
- return cls(fsid, daemon_id, get_parm(args.config_json), args.image)
+ def init(cls, ctx: CephadmContext,
+ fsid: str, daemon_id: Union[int, str]) -> 'CustomContainer':
+ return cls(ctx, fsid, daemon_id,
+ get_parm(ctx.args.config_json), ctx.args.image)
def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None:
"""
##################################
-def attempt_bind(s, address, port):
- # type: (socket.socket, str, int) -> None
+def attempt_bind(ctx, s, address, port):
+ # type: (CephadmContext, socket.socket, str, int) -> None
try:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((address, port))
s.close()
-def port_in_use(port_num):
- # type: (int) -> bool
+def port_in_use(ctx, port_num):
+ # type: (CephadmContext, int) -> bool
"""Detect whether a port is in use on the local machine - IPv4 and IPv6"""
logger.info('Verifying port %d ...' % port_num)
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- attempt_bind(s, '0.0.0.0', port_num)
+ attempt_bind(ctx, s, '0.0.0.0', port_num)
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
- attempt_bind(s, '::', port_num)
+ attempt_bind(ctx, s, '::', port_num)
except OSError:
return True
else:
return False
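+# For example, port_in_use(ctx, 6789) reports True when the mon port is
+# already bound on either 0.0.0.0 or ::, because attempt_bind() raises
+# OSError and the except branch above is taken.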
-def check_ip_port(ip, port):
- # type: (str, int) -> None
- if not args.skip_ping_check:
+def check_ip_port(ctx, ip, port):
+ # type: (CephadmContext, str, int) -> None
+ if not ctx.args.skip_ping_check:
logger.info('Verifying IP %s port %d ...' % (ip, port))
- if is_ipv6(ip):
+ if is_ipv6(ctx, ip):
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
ip = unwrap_ipv6(ip)
else:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
- attempt_bind(s, ip, port)
+ attempt_bind(ctx, s, ip, port)
except OSError as e:
raise Error(e)
class FileLock(object):
- def __init__(self, name, timeout=-1):
+ def __init__(self, ctx: CephadmContext, name, timeout=-1):
if not os.path.exists(LOCK_DIR):
os.mkdir(LOCK_DIR, 0o700)
self._lock_file = os.path.join(LOCK_DIR, name + '.lock')
+ self.ctx = ctx
# The file descriptor for the *_lock_file* as it is returned by the
# os.open() function.
This method now returns a *proxy* object instead of *self*,
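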
so that it can be used in a with statement without side effects.
"""
+
# Use the default timeout, if no timeout is provided.
if timeout is None:
timeout = self.timeout
VERBOSE = 3
-def call(command: List[str],
+def call(ctx: CephadmContext,
+ command: List[str],
desc: Optional[str] = None,
verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE,
timeout: Optional[int] = DEFAULT_TIMEOUT,
:param timeout: timeout in seconds
"""
+
if desc is None:
desc = command[0]
if desc:
desc += ': '
- timeout = timeout or args.timeout
+ timeout = timeout or ctx.args.timeout
logger.debug("Running command: %s" % ' '.join(command))
process = subprocess.Popen(
return out, err, returncode
-def call_throws(command: List[str],
+def call_throws(
+ ctx: CephadmContext,
+ command: List[str],
desc: Optional[str] = None,
verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE,
timeout: Optional[int] = DEFAULT_TIMEOUT,
**kwargs) -> Tuple[str, str, int]:
- out, err, ret = call(command, desc, verbosity, timeout, **kwargs)
+ out, err, ret = call(ctx, command, desc, verbosity, timeout, **kwargs)
if ret:
raise RuntimeError('Failed command: %s' % ' '.join(command))
return out, err, ret
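+# Example (sketch): both helpers now take the context first, e.g.
+#
+#   out, err, code = call(ctx, ['systemctl', 'is-active', unit_name])
+#   out, err, _ = call_throws(ctx, [ctx.container_path, '--version'])
+#
+# call() hands the exit code back to the caller; call_throws() raises
+# RuntimeError on a non-zero exit instead.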
-def call_timeout(command, timeout):
- # type: (List[str], int) -> int
-
+def call_timeout(ctx, command, timeout):
+ # type: (CephadmContext, List[str], int) -> int
logger.debug('Running command (timeout=%s): %s'
% (timeout, ' '.join(command)))
##################################
-def is_available(what, func):
- # type: (str, Callable[[], bool]) -> None
+def is_available(ctx, what, func):
+ # type: (CephadmContext, str, Callable[[], bool]) -> None
"""
Wait for a service to become available
:param what: the name of the service
:param func: the callable object that determines availability
"""
- retry = args.retry
+ retry = ctx.args.retry
logger.info('Waiting for %s...' % what)
num = 1
while True:
return None
-def get_podman_version():
- # type: () -> Tuple[int, ...]
+def get_podman_version(ctx, container_path):
+ # type: (CephadmContext, str) -> Tuple[int, ...]
if 'podman' not in container_path:
raise ValueError('not using podman')
- out, _, _ = call_throws([container_path, '--version'])
+ out, _, _ = call_throws(ctx, [container_path, '--version'])
return _parse_podman_version(out)
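+# Sketch: for `podman --version` output such as "podman version 2.2.1",
+# _parse_podman_version() is assumed to return a comparable tuple like
+# (2, 2, 1).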
If we only find a single fsid in /var/lib/ceph/*, use that
"""
@wraps(func)
- def _infer_fsid():
- if args.fsid:
- logger.debug('Using specified fsid: %s' % args.fsid)
- return func()
+ def _infer_fsid(ctx: CephadmContext):
+ if ctx.args.fsid:
+ logger.debug('Using specified fsid: %s' % ctx.args.fsid)
+ return func(ctx)
fsids_set = set()
- daemon_list = list_daemons(detail=False)
+ daemon_list = list_daemons(ctx, detail=False)
for daemon in daemon_list:
if not is_fsid(daemon['fsid']):
# 'unknown' fsid
continue
- elif 'name' not in args or not args.name:
- # args.name not specified
+ elif 'name' not in ctx.args or not ctx.args.name:
+ # ctx.args.name not specified
fsids_set.add(daemon['fsid'])
- elif daemon['name'] == args.name:
- # args.name is a match
+ elif daemon['name'] == ctx.args.name:
+ # ctx.args.name is a match
fsids_set.add(daemon['fsid'])
fsids = sorted(fsids_set)
pass
elif len(fsids) == 1:
logger.info('Inferring fsid %s' % fsids[0])
- args.fsid = fsids[0]
+ ctx.args.fsid = fsids[0]
else:
raise Error('Cannot infer an fsid, one must be specified: %s' % fsids)
- return func()
+ return func(ctx)
return _infer_fsid
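+# Usage sketch (hypothetical command shown): decorated commands now receive
+# the context explicitly:
+#
+#   @infer_fsid
+#   def command_foo(ctx):
+#       ...  # may rely on ctx.args.fsid being filled in
+#
+# The wrapper sets ctx.args.fsid when exactly one cluster can be inferred
+# from /var/lib/ceph and raises Error when the choice is ambiguous.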
If we find a MON daemon, use the config from that container
"""
@wraps(func)
- def _infer_config():
- if args.config:
- logger.debug('Using specified config: %s' % args.config)
- return func()
+ def _infer_config(ctx: CephadmContext):
+ if ctx.args.config:
+ logger.debug('Using specified config: %s' % ctx.args.config)
+ return func(ctx)
config = None
- if args.fsid:
- name = args.name
+ if ctx.args.fsid:
+ name = ctx.args.name
if not name:
- daemon_list = list_daemons(detail=False)
+ daemon_list = list_daemons(ctx, detail=False)
for daemon in daemon_list:
if daemon['name'].startswith('mon.'):
name = daemon['name']
break
if name:
- config = '/var/lib/ceph/{}/{}/config'.format(args.fsid, name)
+ config = '/var/lib/ceph/{}/{}/config'.format(ctx.args.fsid,
+ name)
if config:
logger.info('Inferring config %s' % config)
- args.config = config
+ ctx.args.config = config
elif os.path.exists(SHELL_DEFAULT_CONF):
logger.debug('Using default config: %s' % SHELL_DEFAULT_CONF)
- args.config = SHELL_DEFAULT_CONF
- return func()
+ ctx.args.config = SHELL_DEFAULT_CONF
+ return func(ctx)
return _infer_config
-def _get_default_image():
+def _get_default_image(ctx: CephadmContext):
if DEFAULT_IMAGE_IS_MASTER:
warn = '''This is a development version of cephadm.
For information regarding the latest stable release:
Use the most recent ceph image
"""
@wraps(func)
- def _infer_image():
- if not args.image:
- args.image = os.environ.get('CEPHADM_IMAGE')
- if not args.image:
- args.image = get_last_local_ceph_image()
- if not args.image:
- args.image = _get_default_image()
- return func()
+ def _infer_image(ctx: CephadmContext):
+ if not ctx.args.image:
+ ctx.args.image = os.environ.get('CEPHADM_IMAGE')
+ if not ctx.args.image:
+ ctx.args.image = get_last_local_ceph_image(ctx, ctx.container_path)
+ if not ctx.args.image:
+ ctx.args.image = _get_default_image(ctx)
+ return func(ctx)
return _infer_image
def default_image(func):
@wraps(func)
- def _default_image():
- if not args.image:
- if 'name' in args and args.name:
- type_ = args.name.split('.', 1)[0]
+ def _default_image(ctx: CephadmContext):
+ if not ctx.args.image:
+ if 'name' in ctx.args and ctx.args.name:
+ type_ = ctx.args.name.split('.', 1)[0]
if type_ in Monitoring.components:
- args.image = Monitoring.components[type_]['image']
+ ctx.args.image = Monitoring.components[type_]['image']
if type_ == 'haproxy':
- args.image = HAproxy.default_image
+ ctx.args.image = HAproxy.default_image
if type_ == 'keepalived':
- args.image = Keepalived.default_image
- if not args.image:
- args.image = os.environ.get('CEPHADM_IMAGE')
- if not args.image:
- args.image = _get_default_image()
+ ctx.args.image = Keepalived.default_image
+ if not ctx.args.image:
+ ctx.args.image = os.environ.get('CEPHADM_IMAGE')
+ if not ctx.args.image:
+ ctx.args.image = _get_default_image(ctx)
- return func()
+ return func(ctx)
return _default_image
-def get_last_local_ceph_image():
+def get_last_local_ceph_image(ctx: CephadmContext, container_path: str):
"""
:return: The most recent local ceph image (already pulled)
"""
- out, _, _ = call_throws(
+ out, _, _ = call_throws(ctx,
[container_path, 'images',
'--filter', 'label=ceph=True',
'--filter', 'dangling=false',
'--format', '{{.Repository}}@{{.Digest}}'])
- return _filter_last_local_ceph_image(out)
+ return _filter_last_local_ceph_image(ctx, out)
-def _filter_last_local_ceph_image(out):
- # str -> Optional[str]
+def _filter_last_local_ceph_image(ctx, out):
+ # type: (CephadmContext, str) -> Optional[str]
for image in out.splitlines():
if image and not image.endswith('@'):
logger.info('Using recent ceph image %s' % image)
os.chmod(dir, mode) # the above is masked by umask...
-def get_data_dir(fsid, t, n):
- # type: (str, str, Union[int, str]) -> str
- return os.path.join(args.data_dir, fsid, '%s.%s' % (t, n))
+def get_data_dir(fsid, data_dir, t, n):
+ # type: (str, str, str, Union[int, str]) -> str
+ return os.path.join(data_dir, fsid, '%s.%s' % (t, n))
-def get_log_dir(fsid):
- # type: (str) -> str
- return os.path.join(args.log_dir, fsid)
+def get_log_dir(fsid, log_dir):
+ # type: (str, str) -> str
+ return os.path.join(log_dir, fsid)
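+# Path sketch, assuming the stock defaults arrive via ctx.args:
+#   get_data_dir(fsid, '/var/lib/ceph', 'mon', 'a') -> '/var/lib/ceph/<fsid>/mon.a'
+#   get_log_dir(fsid, '/var/log/ceph')              -> '/var/log/ceph/<fsid>'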
-def make_data_dir_base(fsid, uid, gid):
- # type: (str, int, int) -> str
- data_dir_base = os.path.join(args.data_dir, fsid)
+def make_data_dir_base(fsid, data_dir, uid, gid):
+ # type: (str, str, int, int) -> str
+ data_dir_base = os.path.join(data_dir, fsid)
makedirs(data_dir_base, uid, gid, DATA_DIR_MODE)
makedirs(os.path.join(data_dir_base, 'crash'), uid, gid, DATA_DIR_MODE)
makedirs(os.path.join(data_dir_base, 'crash', 'posted'), uid, gid,
return data_dir_base
-def make_data_dir(fsid, daemon_type, daemon_id, uid=None, gid=None):
- # type: (str, str, Union[int, str], Optional[int], Optional[int]) -> str
+def make_data_dir(ctx, fsid, daemon_type, daemon_id, uid=None, gid=None):
+ # type: (CephadmContext, str, str, Union[int, str], Optional[int], Optional[int]) -> str
if uid is None or gid is None:
- uid, gid = extract_uid_gid()
- make_data_dir_base(fsid, uid, gid)
- data_dir = get_data_dir(fsid, daemon_type, daemon_id)
+ uid, gid = extract_uid_gid(ctx)
+ make_data_dir_base(fsid, ctx.args.data_dir, uid, gid)
+ data_dir = get_data_dir(fsid, ctx.args.data_dir, daemon_type, daemon_id)
makedirs(data_dir, uid, gid, DATA_DIR_MODE)
return data_dir
-def make_log_dir(fsid, uid=None, gid=None):
- # type: (str, Optional[int], Optional[int]) -> str
+def make_log_dir(ctx, fsid, uid=None, gid=None):
+ # type: (CephadmContext, str, Optional[int], Optional[int]) -> str
if uid is None or gid is None:
- uid, gid = extract_uid_gid()
- log_dir = get_log_dir(fsid)
+ uid, gid = extract_uid_gid(ctx)
+ log_dir = get_log_dir(fsid, ctx.args.log_dir)
makedirs(log_dir, uid, gid, LOG_DIR_MODE)
return log_dir
-def make_var_run(fsid, uid, gid):
- # type: (str, int, int) -> None
- call_throws(['install', '-d', '-m0770', '-o', str(uid), '-g', str(gid),
+def make_var_run(ctx, fsid, uid, gid):
+ # type: (CephadmContext, str, int, int) -> None
+ call_throws(ctx, ['install', '-d', '-m0770', '-o', str(uid), '-g', str(gid),
'/var/run/ceph/%s' % fsid])
-def copy_tree(src, dst, uid=None, gid=None):
- # type: (List[str], str, Optional[int], Optional[int]) -> None
+def copy_tree(ctx, src, dst, uid=None, gid=None):
+ # type: (CephadmContext, List[str], str, Optional[int], Optional[int]) -> None
"""
Copy a directory tree from src to dst
"""
if uid is None or gid is None:
- (uid, gid) = extract_uid_gid()
+ (uid, gid) = extract_uid_gid(ctx)
for src_dir in src:
dst_dir = dst
os.chown(os.path.join(dirpath, filename), uid, gid)
-def copy_files(src, dst, uid=None, gid=None):
- # type: (List[str], str, Optional[int], Optional[int]) -> None
+def copy_files(ctx, src, dst, uid=None, gid=None):
+ # type: (CephadmContext, List[str], str, Optional[int], Optional[int]) -> None
"""
Copy files from src to dst
"""
if uid is None or gid is None:
- (uid, gid) = extract_uid_gid()
+ (uid, gid) = extract_uid_gid(ctx)
for src_file in src:
dst_file = dst
os.chown(dst_file, uid, gid)
-def move_files(src, dst, uid=None, gid=None):
- # type: (List[str], str, Optional[int], Optional[int]) -> None
+def move_files(ctx, src, dst, uid=None, gid=None):
+ # type: (CephadmContext, List[str], str, Optional[int], Optional[int]) -> None
"""
Move files from src to dst
"""
if uid is None or gid is None:
- (uid, gid) = extract_uid_gid()
+ (uid, gid) = extract_uid_gid(ctx)
for src_file in src:
dst_file = dst
return 'ceph-%s@%s' % (fsid, daemon_type)
-def get_unit_name_by_daemon_name(fsid, name):
- daemon = get_daemon_description(fsid, name)
+def get_unit_name_by_daemon_name(ctx: CephadmContext, fsid, name):
+ daemon = get_daemon_description(ctx, fsid, name)
try:
return daemon['systemd_unit']
except KeyError:
raise Error('Failed to get unit name for {}'.format(daemon))
-def check_unit(unit_name):
- # type: (str) -> Tuple[bool, str, bool]
+def check_unit(ctx, unit_name):
+ # type: (CephadmContext, str) -> Tuple[bool, str, bool]
# NOTE: we ignore the exit code here because systemctl outputs
# various exit codes based on the state of the service, but the
# string result is more explicit (and sufficient).
enabled = False
installed = False
try:
- out, err, code = call(['systemctl', 'is-enabled', unit_name],
+ out, err, code = call(ctx, ['systemctl', 'is-enabled', unit_name],
verbosity=CallVerbosity.DEBUG)
if code == 0:
enabled = True
state = 'unknown'
try:
- out, err, code = call(['systemctl', 'is-active', unit_name],
+ out, err, code = call(ctx, ['systemctl', 'is-active', unit_name],
verbosity=CallVerbosity.DEBUG)
out = out.strip()
if out in ['active']:
return (enabled, state, installed)
-def check_units(units, enabler=None):
- # type: (List[str], Optional[Packager]) -> bool
+def check_units(ctx, units, enabler=None):
+ # type: (CephadmContext, List[str], Optional[Packager]) -> bool
for u in units:
- (enabled, state, installed) = check_unit(u)
+ (enabled, state, installed) = check_unit(ctx, u)
if enabled and state == 'running':
logger.info('Unit %s is enabled and running' % u)
return True
return None
-def get_legacy_daemon_fsid(cluster, daemon_type, daemon_id, legacy_dir=None):
- # type: (str, str, Union[int, str], Optional[str]) -> Optional[str]
+def get_legacy_daemon_fsid(ctx, cluster,
+ daemon_type, daemon_id, legacy_dir=None):
+ # type: (CephadmContext, str, str, Union[int, str], Optional[str]) -> Optional[str]
fsid = None
if daemon_type == 'osd':
try:
- fsid_file = os.path.join(args.data_dir,
+ fsid_file = os.path.join(ctx.args.data_dir,
daemon_type,
'ceph-%s' % daemon_id,
'ceph_fsid')
return fsid
-def get_daemon_args(fsid, daemon_type, daemon_id):
- # type: (str, str, Union[int, str]) -> List[str]
+def get_daemon_args(ctx, fsid, daemon_type, daemon_id):
+ # type: (CephadmContext, str, str, Union[int, str]) -> List[str]
r = list() # type: List[str]
if daemon_type in Ceph.daemons and daemon_type != 'crash':
metadata = Monitoring.components[daemon_type]
r += metadata.get('args', list())
if daemon_type == 'alertmanager':
- config = get_parm(args.config_json)
+ config = get_parm(ctx.args.config_json)
peers = config.get('peers', list()) # type: ignore
for peer in peers:
r += ["--cluster.peer={}".format(peer)]
# some alertmanager, by default, look elsewhere for a config
r += ["--config.file=/etc/alertmanager/alertmanager.yml"]
elif daemon_type == NFSGanesha.daemon_type:
- nfs_ganesha = NFSGanesha.init(fsid, daemon_id)
+ nfs_ganesha = NFSGanesha.init(ctx, fsid, daemon_id)
r += nfs_ganesha.get_daemon_args()
elif daemon_type == HAproxy.daemon_type:
- haproxy = HAproxy.init(fsid, daemon_id)
+ haproxy = HAproxy.init(ctx, fsid, daemon_id)
r += haproxy.get_daemon_args()
elif daemon_type == CustomContainer.daemon_type:
- cc = CustomContainer.init(fsid, daemon_id)
+ cc = CustomContainer.init(ctx, fsid, daemon_id)
r.extend(cc.get_daemon_args())
return r
-def create_daemon_dirs(fsid, daemon_type, daemon_id, uid, gid,
+def create_daemon_dirs(ctx, fsid, daemon_type, daemon_id, uid, gid,
config=None, keyring=None):
- # type: (str, str, Union[int, str], int, int, Optional[str], Optional[str]) -> None
- data_dir = make_data_dir(fsid, daemon_type, daemon_id, uid=uid, gid=gid)
- make_log_dir(fsid, uid=uid, gid=gid)
+ # type: (CephadmContext, str, str, Union[int, str], int, int, Optional[str], Optional[str]) -> None
+ data_dir = make_data_dir(ctx, fsid, daemon_type, daemon_id, uid=uid, gid=gid)
+ make_log_dir(ctx, fsid, uid=uid, gid=gid)
if config:
config_path = os.path.join(data_dir, 'config')
f.write(keyring)
if daemon_type in Monitoring.components.keys():
- config_json: Dict[str, Any] = get_parm(args.config_json)
+ config_json: Dict[str, Any] = get_parm(ctx.args.config_json)
required_files = Monitoring.components[daemon_type].get('config-json-files', list())
# Set up directories specific to the monitoring component
config_dir = ''
data_dir_root = ''
if daemon_type == 'prometheus':
- data_dir_root = get_data_dir(fsid, daemon_type, daemon_id)
+ data_dir_root = get_data_dir(fsid, ctx.args.data_dir,
+ daemon_type, daemon_id)
config_dir = 'etc/prometheus'
makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755)
makedirs(os.path.join(data_dir_root, config_dir, 'alerting'), uid, gid, 0o755)
makedirs(os.path.join(data_dir_root, 'data'), uid, gid, 0o755)
elif daemon_type == 'grafana':
- data_dir_root = get_data_dir(fsid, daemon_type, daemon_id)
+ data_dir_root = get_data_dir(fsid, ctx.args.data_dir,
+ daemon_type, daemon_id)
config_dir = 'etc/grafana'
makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755)
makedirs(os.path.join(data_dir_root, config_dir, 'certs'), uid, gid, 0o755)
makedirs(os.path.join(data_dir_root, config_dir, 'provisioning/datasources'), uid, gid, 0o755)
makedirs(os.path.join(data_dir_root, 'data'), uid, gid, 0o755)
elif daemon_type == 'alertmanager':
- data_dir_root = get_data_dir(fsid, daemon_type, daemon_id)
+ data_dir_root = get_data_dir(fsid, ctx.args.data_dir,
+ daemon_type, daemon_id)
config_dir = 'etc/alertmanager'
makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755)
makedirs(os.path.join(data_dir_root, config_dir, 'data'), uid, gid, 0o755)
f.write(content)
elif daemon_type == NFSGanesha.daemon_type:
- nfs_ganesha = NFSGanesha.init(fsid, daemon_id)
+ nfs_ganesha = NFSGanesha.init(ctx, fsid, daemon_id)
nfs_ganesha.create_daemon_dirs(data_dir, uid, gid)
elif daemon_type == CephIscsi.daemon_type:
- ceph_iscsi = CephIscsi.init(fsid, daemon_id)
+ ceph_iscsi = CephIscsi.init(ctx, fsid, daemon_id)
ceph_iscsi.create_daemon_dirs(data_dir, uid, gid)
elif daemon_type == HAproxy.daemon_type:
- haproxy = HAproxy.init(fsid, daemon_id)
+ haproxy = HAproxy.init(ctx, fsid, daemon_id)
haproxy.create_daemon_dirs(data_dir, uid, gid)
elif daemon_type == Keepalived.daemon_type:
- keepalived = Keepalived.init(fsid, daemon_id)
+ keepalived = Keepalived.init(ctx, fsid, daemon_id)
keepalived.create_daemon_dirs(data_dir, uid, gid)
elif daemon_type == CustomContainer.daemon_type:
- cc = CustomContainer.init(fsid, daemon_id)
+ cc = CustomContainer.init(ctx, fsid, daemon_id)
cc.create_daemon_dirs(data_dir, uid, gid)
return js
-def get_config_and_keyring():
- # type: () -> Tuple[Optional[str], Optional[str]]
+def get_config_and_keyring(ctx):
+ # type: (CephadmContext) -> Tuple[Optional[str], Optional[str]]
config = None
keyring = None
- if 'config_json' in args and args.config_json:
- d = get_parm(args.config_json)
+ if 'config_json' in ctx.args and ctx.args.config_json:
+ d = get_parm(ctx.args.config_json)
config = d.get('config')
keyring = d.get('keyring')
- if 'config' in args and args.config:
- with open(args.config, 'r') as f:
+ if 'config' in ctx.args and ctx.args.config:
+ with open(ctx.args.config, 'r') as f:
config = f.read()
- if 'key' in args and args.key:
- keyring = '[%s]\n\tkey = %s\n' % (args.name, args.key)
- elif 'keyring' in args and args.keyring:
- with open(args.keyring, 'r') as f:
+ if 'key' in ctx.args and ctx.args.key:
+ keyring = '[%s]\n\tkey = %s\n' % (ctx.args.name, ctx.args.key)
+ elif 'keyring' in ctx.args and ctx.args.keyring:
+ with open(ctx.args.keyring, 'r') as f:
keyring = f.read()
return config, keyring
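+# Precedence note (as implemented above): ctx.args.config_json is read
+# first; an explicit ctx.args.config file then overrides the config, and a
+# bare ctx.args.key (formatted under [ctx.args.name]) wins over a
+# ctx.args.keyring file.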
-def get_container_binds(fsid, daemon_type, daemon_id):
- # type: (str, str, Union[int, str, None]) -> List[List[str]]
+def get_container_binds(ctx, fsid, daemon_type, daemon_id):
+ # type: (CephadmContext, str, str, Union[int, str, None]) -> List[List[str]]
binds = list()
if daemon_type == CephIscsi.daemon_type:
binds.extend(CephIscsi.get_container_binds())
elif daemon_type == CustomContainer.daemon_type:
assert daemon_id
- cc = CustomContainer.init(fsid, daemon_id)
- data_dir = get_data_dir(fsid, daemon_type, daemon_id)
+ cc = CustomContainer.init(ctx, fsid, daemon_id)
+ data_dir = get_data_dir(fsid, ctx.args.data_dir, daemon_type, daemon_id)
binds.extend(cc.get_container_binds(data_dir))
return binds
-def get_container_mounts(fsid, daemon_type, daemon_id,
+def get_container_mounts(ctx, fsid, daemon_type, daemon_id,
no_config=False):
- # type: (str, str, Union[int, str, None], Optional[bool]) -> Dict[str, str]
+ # type: (CephadmContext, str, str, Union[int, str, None], Optional[bool]) -> Dict[str, str]
mounts = dict()
if daemon_type in Ceph.daemons:
run_path = os.path.join('/var/run/ceph', fsid);
if os.path.exists(run_path):
mounts[run_path] = '/var/run/ceph:z'
- log_dir = get_log_dir(fsid)
+ log_dir = get_log_dir(fsid, ctx.args.log_dir)
mounts[log_dir] = '/var/log/ceph:z'
crash_dir = '/var/lib/ceph/%s/crash' % fsid
if os.path.exists(crash_dir):
mounts[crash_dir] = '/var/lib/ceph/crash:z'
if daemon_type in Ceph.daemons and daemon_id:
- data_dir = get_data_dir(fsid, daemon_type, daemon_id)
+ data_dir = get_data_dir(fsid, ctx.args.data_dir, daemon_type, daemon_id)
if daemon_type == 'rgw':
cdata_dir = '/var/lib/ceph/radosgw/ceph-rgw.%s' % (daemon_id)
else:
mounts['/run/lock/lvm'] = '/run/lock/lvm'
try:
- if args.shared_ceph_folder: # make easy manager modules/ceph-volume development
- ceph_folder = pathify(args.shared_ceph_folder)
+ if ctx.args.shared_ceph_folder: # to ease manager module/ceph-volume development
+ ceph_folder = pathify(ctx.args.shared_ceph_folder)
if os.path.exists(ceph_folder):
mounts[ceph_folder + '/src/ceph-volume/ceph_volume'] = '/usr/lib/python3.6/site-packages/ceph_volume'
mounts[ceph_folder + '/src/pybind/mgr'] = '/usr/share/ceph/mgr'
pass
if daemon_type in Monitoring.components and daemon_id:
- data_dir = get_data_dir(fsid, daemon_type, daemon_id)
+ data_dir = get_data_dir(fsid, ctx.args.data_dir, daemon_type, daemon_id)
if daemon_type == 'prometheus':
mounts[os.path.join(data_dir, 'etc/prometheus')] = '/etc/prometheus:Z'
mounts[os.path.join(data_dir, 'data')] = '/prometheus:Z'
if daemon_type == NFSGanesha.daemon_type:
assert daemon_id
- data_dir = get_data_dir(fsid, daemon_type, daemon_id)
- nfs_ganesha = NFSGanesha.init(fsid, daemon_id)
+ data_dir = get_data_dir(fsid, ctx.args.data_dir, daemon_type, daemon_id)
+ nfs_ganesha = NFSGanesha.init(ctx, fsid, daemon_id)
mounts.update(nfs_ganesha.get_container_mounts(data_dir))
if daemon_type == HAproxy.daemon_type:
assert daemon_id
- data_dir = get_data_dir(fsid, daemon_type, daemon_id)
+ data_dir = get_data_dir(fsid, ctx.args.data_dir, daemon_type, daemon_id)
mounts.update(HAproxy.get_container_mounts(data_dir))
if daemon_type == CephIscsi.daemon_type:
assert daemon_id
- data_dir = get_data_dir(fsid, daemon_type, daemon_id)
- log_dir = get_log_dir(fsid)
+ data_dir = get_data_dir(fsid, ctx.args.data_dir, daemon_type, daemon_id)
+ log_dir = get_log_dir(fsid, ctx.args.log_dir)
mounts.update(CephIscsi.get_container_mounts(data_dir, log_dir))
if daemon_type == Keepalived.daemon_type:
assert daemon_id
- data_dir = get_data_dir(fsid, daemon_type, daemon_id)
+ data_dir = get_data_dir(fsid, ctx.args.data_dir, daemon_type, daemon_id)
mounts.update(Keepalived.get_container_mounts(data_dir))
if daemon_type == CustomContainer.daemon_type:
assert daemon_id
- cc = CustomContainer.init(fsid, daemon_id)
- data_dir = get_data_dir(fsid, daemon_type, daemon_id)
+ cc = CustomContainer.init(ctx, fsid, daemon_id)
+ data_dir = get_data_dir(fsid, ctx.args.data_dir, daemon_type, daemon_id)
mounts.update(cc.get_container_mounts(data_dir))
return mounts
-def get_container(fsid: str, daemon_type: str, daemon_id: Union[int, str],
+def get_container(ctx: CephadmContext,
+ fsid: str, daemon_type: str, daemon_id: Union[int, str],
privileged: bool = False,
ptrace: bool = False,
container_args: Optional[List[str]] = None) -> 'CephContainer':
# to configfs we need to make this a privileged container.
privileged = True
elif daemon_type == CustomContainer.daemon_type:
- cc = CustomContainer.init(fsid, daemon_id)
+ cc = CustomContainer.init(ctx, fsid, daemon_id)
entrypoint = cc.entrypoint
host_network = False
envs.extend(cc.get_container_envs())
container_args.extend(cc.get_container_args())
if daemon_type in Monitoring.components:
- uid, gid = extract_uid_gid_monitoring(daemon_type)
+ uid, gid = extract_uid_gid_monitoring(ctx, daemon_type)
monitoring_args = [
'--user',
str(uid),
# if using podman, set -d, --conmon-pidfile & --cidfile flags
# so service can have Type=Forking
- if 'podman' in container_path:
+ if 'podman' in ctx.container_path:
runtime_dir = '/run'
container_args.extend(['-d',
'--conmon-pidfile',
runtime_dir + '/ceph-%s@%s.%s.service-cid' % (fsid, daemon_type, daemon_id)])
return CephContainer(
- image=args.image,
+ ctx,
+ image=ctx.args.image,
entrypoint=entrypoint,
- args=ceph_args + get_daemon_args(fsid, daemon_type, daemon_id),
+ args=ceph_args + get_daemon_args(ctx, fsid, daemon_type, daemon_id),
container_args=container_args,
- volume_mounts=get_container_mounts(fsid, daemon_type, daemon_id),
- bind_mounts=get_container_binds(fsid, daemon_type, daemon_id),
+ volume_mounts=get_container_mounts(ctx, fsid, daemon_type, daemon_id),
+ bind_mounts=get_container_binds(ctx, fsid, daemon_type, daemon_id),
cname='ceph-%s-%s.%s' % (fsid, daemon_type, daemon_id),
envs=envs,
privileged=privileged,
ptrace=ptrace,
- init=args.container_init,
+ init=ctx.args.container_init,
host_network=host_network,
)
-def extract_uid_gid(img='', file_path='/var/lib/ceph'):
- # type: (str, Union[str, List[str]]) -> Tuple[int, int]
+def extract_uid_gid(ctx, img='', file_path='/var/lib/ceph'):
+ # type: (CephadmContext, str, Union[str, List[str]]) -> Tuple[int, int]
if not img:
- img = args.image
+ img = ctx.args.image
if isinstance(file_path, str):
paths = [file_path]
for fp in paths:
try:
out = CephContainer(
+ ctx,
image=img,
entrypoint='stat',
args=['-c', '%u %g', fp]
raise RuntimeError('uid/gid not found')
-def deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid,
+def deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid,
config=None, keyring=None,
osd_fsid=None,
reconfig=False,
ports=None):
- # type: (str, str, Union[int, str], Optional[CephContainer], int, int, Optional[str], Optional[str], Optional[str], Optional[bool], Optional[List[int]]) -> None
+ # type: (CephadmContext, str, str, Union[int, str], Optional[CephContainer], int, int, Optional[str], Optional[str], Optional[str], Optional[bool], Optional[List[int]]) -> None
ports = ports or []
- if any([port_in_use(port) for port in ports]):
+ if any([port_in_use(ctx, port) for port in ports]):
raise Error("TCP Port(s) '{}' required for {} already in use".format(",".join(map(str, ports)), daemon_type))
- data_dir = get_data_dir(fsid, daemon_type, daemon_id)
+ data_dir = get_data_dir(fsid, ctx.args.data_dir, daemon_type, daemon_id)
if reconfig and not os.path.exists(data_dir):
raise Error('cannot reconfig, data path %s does not exist' % data_dir)
if daemon_type == 'mon' and not os.path.exists(data_dir):
tmp_config = write_tmp(config, uid, gid)
# --mkfs
- create_daemon_dirs(fsid, daemon_type, daemon_id, uid, gid)
- mon_dir = get_data_dir(fsid, 'mon', daemon_id)
- log_dir = get_log_dir(fsid)
+ create_daemon_dirs(ctx, fsid, daemon_type, daemon_id, uid, gid)
+ mon_dir = get_data_dir(fsid, ctx.args.data_dir, 'mon', daemon_id)
+ log_dir = get_log_dir(fsid, ctx.args.log_dir)
out = CephContainer(
- image=args.image,
+ ctx,
+ image=ctx.args.image,
entrypoint='/usr/bin/ceph-mon',
args=['--mkfs',
'-i', str(daemon_id),
'--fsid', fsid,
'-c', '/tmp/config',
'--keyring', '/tmp/keyring',
- ] + get_daemon_args(fsid, 'mon', daemon_id),
+ ] + get_daemon_args(ctx, fsid, 'mon', daemon_id),
volume_mounts={
log_dir: '/var/log/ceph:z',
mon_dir: '/var/lib/ceph/mon/ceph-%s:z' % (daemon_id),
else:
# dirs, conf, keyring
create_daemon_dirs(
+ ctx,
fsid, daemon_type, daemon_id,
uid, gid,
config, keyring)
if daemon_type == CephadmDaemon.daemon_type:
port = next(iter(ports), None) # get first tcp port provided or None
- if args.config_json == '-':
+ if ctx.args.config_json == '-':
config_js = get_parm('-')
else:
- config_js = get_parm(args.config_json)
+ config_js = get_parm(ctx.args.config_json)
assert isinstance(config_js, dict)
- cephadm_exporter = CephadmDaemon(fsid, daemon_id, port)
+ cephadm_exporter = CephadmDaemon(ctx, fsid, daemon_id, port)
cephadm_exporter.deploy_daemon_unit(config_js)
else:
if c:
- deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c,
- osd_fsid=osd_fsid)
+ deploy_daemon_units(ctx, fsid, uid, gid, daemon_type, daemon_id,
+ c, osd_fsid=osd_fsid)
else:
raise RuntimeError("attempting to deploy a daemon without a container image")
os.fchmod(f.fileno(), 0o600)
os.fchown(f.fileno(), uid, gid)
- update_firewalld(daemon_type)
+ update_firewalld(ctx, daemon_type)
# Open ports explicitly required for the daemon
if ports:
- fw = Firewalld()
+ fw = Firewalld(ctx)
fw.open_ports(ports)
fw.apply_rules()
if reconfig and daemon_type not in Ceph.daemons:
# ceph daemons do not need a restart; others (presumably) do to pick
# up the new config
- call_throws(['systemctl', 'reset-failed',
+ call_throws(ctx, ['systemctl', 'reset-failed',
get_unit_name(fsid, daemon_type, daemon_id)])
- call_throws(['systemctl', 'restart',
+ call_throws(ctx, ['systemctl', 'restart',
get_unit_name(fsid, daemon_type, daemon_id)])
-def _write_container_cmd_to_bash(file_obj, container, comment=None, background=False):
- # type: (IO[str], CephContainer, Optional[str], Optional[bool]) -> None
+def _write_container_cmd_to_bash(ctx, file_obj, container, comment=None, background=False):
+ # type: (CephadmContext, IO[str], CephContainer, Optional[str], Optional[bool]) -> None
if comment:
# Sometimes adding a comment, especially if there are multiple containers in one
# unit file, makes it easier to read and grok.
# Sometimes, adding `--rm` to a run_cmd doesn't work. Let's remove the container manually
file_obj.write('! ' + ' '.join(container.rm_cmd()) + ' 2> /dev/null\n')
# Sometimes, `podman rm` doesn't find the container. Then you'll have to add `--storage`
- if 'podman' in container_path:
+ if 'podman' in ctx.container_path:
file_obj.write('! ' + ' '.join(container.rm_cmd(storage=True)) + ' 2> /dev/null\n')
# container run command
file_obj.write(' '.join(container.run_cmd()) + (' &' if background else '') + '\n')
-def deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c,
+def deploy_daemon_units(ctx, fsid, uid, gid, daemon_type, daemon_id, c,
enable=True, start=True,
osd_fsid=None):
- # type: (str, int, int, str, Union[int, str], CephContainer, bool, bool, Optional[str]) -> None
+ # type: (CephadmContext, str, int, int, str, Union[int, str], CephContainer, bool, bool, Optional[str]) -> None
# cmd
- data_dir = get_data_dir(fsid, daemon_type, daemon_id)
+ data_dir = get_data_dir(fsid, ctx.args.data_dir, daemon_type, daemon_id)
with open(data_dir + '/unit.run.new', 'w') as f:
f.write('set -e\n')
f.write('[ ! -L {p} ] || chown {uid}:{gid} {p}\n'.format(p=p, uid=uid, gid=gid))
else:
prestart = CephContainer(
- image=args.image,
+ ctx,
+ image=ctx.args.image,
entrypoint='/usr/sbin/ceph-volume',
args=[
'lvm', 'activate',
'--no-systemd'
],
privileged=True,
- volume_mounts=get_container_mounts(fsid, daemon_type, daemon_id),
- bind_mounts=get_container_binds(fsid, daemon_type, daemon_id),
+ volume_mounts=get_container_mounts(ctx, fsid, daemon_type, daemon_id),
+ bind_mounts=get_container_binds(ctx, fsid, daemon_type, daemon_id),
cname='ceph-%s-%s.%s-activate' % (fsid, daemon_type, daemon_id),
)
- _write_container_cmd_to_bash(f, prestart, 'LVM OSDs use ceph-volume lvm activate')
+ _write_container_cmd_to_bash(ctx, f, prestart, 'LVM OSDs use ceph-volume lvm activate')
elif daemon_type == NFSGanesha.daemon_type:
# add nfs to the rados grace db
- nfs_ganesha = NFSGanesha.init(fsid, daemon_id)
+ nfs_ganesha = NFSGanesha.init(ctx, fsid, daemon_id)
prestart = nfs_ganesha.get_rados_grace_container('add')
- _write_container_cmd_to_bash(f, prestart, 'add daemon to rados grace')
+ _write_container_cmd_to_bash(ctx, f, prestart, 'add daemon to rados grace')
elif daemon_type == CephIscsi.daemon_type:
f.write(' '.join(CephIscsi.configfs_mount_umount(data_dir, mount=True)) + '\n')
- ceph_iscsi = CephIscsi.init(fsid, daemon_id)
+ ceph_iscsi = CephIscsi.init(ctx, fsid, daemon_id)
tcmu_container = ceph_iscsi.get_tcmu_runner_container()
- _write_container_cmd_to_bash(f, tcmu_container, 'iscsi tcmu-runnter container', background=True)
+ _write_container_cmd_to_bash(ctx, f, tcmu_container, 'iscsi tcmu-runner container', background=True)
- _write_container_cmd_to_bash(f, c, '%s.%s' % (daemon_type, str(daemon_id)))
+ _write_container_cmd_to_bash(ctx, f, c, '%s.%s' % (daemon_type, str(daemon_id)))
os.fchmod(f.fileno(), 0o600)
os.rename(data_dir + '/unit.run.new',
data_dir + '/unit.run')
if daemon_type == 'osd':
assert osd_fsid
poststop = CephContainer(
- image=args.image,
+ ctx,
+ image=ctx.args.image,
entrypoint='/usr/sbin/ceph-volume',
args=[
'lvm', 'deactivate',
str(daemon_id), osd_fsid,
],
privileged=True,
- volume_mounts=get_container_mounts(fsid, daemon_type, daemon_id),
- bind_mounts=get_container_binds(fsid, daemon_type, daemon_id),
+ volume_mounts=get_container_mounts(ctx, fsid, daemon_type, daemon_id),
+ bind_mounts=get_container_binds(ctx, fsid, daemon_type, daemon_id),
cname='ceph-%s-%s.%s-deactivate' % (fsid, daemon_type,
daemon_id),
)
- _write_container_cmd_to_bash(f, poststop, 'deactivate osd')
+ _write_container_cmd_to_bash(ctx, f, poststop, 'deactivate osd')
elif daemon_type == NFSGanesha.daemon_type:
# remove nfs from the rados grace db
- nfs_ganesha = NFSGanesha.init(fsid, daemon_id)
+ nfs_ganesha = NFSGanesha.init(ctx, fsid, daemon_id)
poststop = nfs_ganesha.get_rados_grace_container('remove')
- _write_container_cmd_to_bash(f, poststop, 'remove daemon from rados grace')
+ _write_container_cmd_to_bash(ctx, f, poststop, 'remove daemon from rados grace')
elif daemon_type == CephIscsi.daemon_type:
# make sure we also stop the tcmu container
- ceph_iscsi = CephIscsi.init(fsid, daemon_id)
+ ceph_iscsi = CephIscsi.init(ctx, fsid, daemon_id)
tcmu_container = ceph_iscsi.get_tcmu_runner_container()
f.write('! '+ ' '.join(tcmu_container.stop_cmd()) + '\n')
f.write(' '.join(CephIscsi.configfs_mount_umount(data_dir, mount=False)) + '\n')
data_dir + '/unit.image')
# systemd
- install_base_units(fsid)
- unit = get_unit_file(fsid)
+ install_base_units(ctx, fsid)
+ unit = get_unit_file(ctx, fsid)
unit_file = 'ceph-%s@.service' % (fsid)
- with open(args.unit_dir + '/' + unit_file + '.new', 'w') as f:
+ with open(ctx.args.unit_dir + '/' + unit_file + '.new', 'w') as f:
f.write(unit)
- os.rename(args.unit_dir + '/' + unit_file + '.new',
- args.unit_dir + '/' + unit_file)
- call_throws(['systemctl', 'daemon-reload'])
+ os.rename(ctx.args.unit_dir + '/' + unit_file + '.new',
+ ctx.args.unit_dir + '/' + unit_file)
+ call_throws(ctx, ['systemctl', 'daemon-reload'])
unit_name = get_unit_name(fsid, daemon_type, daemon_id)
- call(['systemctl', 'stop', unit_name],
+ call(ctx, ['systemctl', 'stop', unit_name],
verbosity=CallVerbosity.DEBUG)
- call(['systemctl', 'reset-failed', unit_name],
+ call(ctx, ['systemctl', 'reset-failed', unit_name],
verbosity=CallVerbosity.DEBUG)
if enable:
- call_throws(['systemctl', 'enable', unit_name])
+ call_throws(ctx, ['systemctl', 'enable', unit_name])
if start:
- call_throws(['systemctl', 'start', unit_name])
+ call_throws(ctx, ['systemctl', 'start', unit_name])
class Firewalld(object):
- def __init__(self):
- # type: () -> None
+ def __init__(self, ctx):
+ # type: (CephadmContext) -> None
+ self.ctx = ctx
self.available = self.check()
def check(self):
if not self.cmd:
logger.debug('firewalld does not appear to be present')
return False
- (enabled, state, _) = check_unit('firewalld.service')
+ (enabled, state, _) = check_unit(self.ctx, 'firewalld.service')
if not enabled:
logger.debug('firewalld.service is not enabled')
return False
if not self.cmd:
raise RuntimeError("command not defined")
- out, err, ret = call([self.cmd, '--permanent', '--query-service', svc], verbosity=CallVerbosity.DEBUG)
+ out, err, ret = call(self.ctx, [self.cmd, '--permanent', '--query-service', svc], verbosity=CallVerbosity.DEBUG)
if ret:
logger.info('Enabling firewalld service %s in current zone...' % svc)
- out, err, ret = call([self.cmd, '--permanent', '--add-service', svc])
+ out, err, ret = call(self.ctx, [self.cmd, '--permanent', '--add-service', svc])
if ret:
raise RuntimeError(
'unable to add service %s to current zone: %s' % (svc, err))
for port in fw_ports:
tcp_port = str(port) + '/tcp'
- out, err, ret = call([self.cmd, '--permanent', '--query-port', tcp_port], verbosity=CallVerbosity.DEBUG)
+ out, err, ret = call(self.ctx, [self.cmd, '--permanent', '--query-port', tcp_port], verbosity=CallVerbosity.DEBUG)
if ret:
logger.info('Enabling firewalld port %s in current zone...' % tcp_port)
- out, err, ret = call([self.cmd, '--permanent', '--add-port', tcp_port])
+ out, err, ret = call(self.ctx, [self.cmd, '--permanent', '--add-port', tcp_port])
if ret:
raise RuntimeError('unable to add port %s to current zone: %s' %
(tcp_port, err))
for port in fw_ports:
tcp_port = str(port) + '/tcp'
- out, err, ret = call([self.cmd, '--permanent', '--query-port', tcp_port], verbosity=CallVerbosity.DEBUG)
+ out, err, ret = call(self.ctx, [self.cmd, '--permanent', '--query-port', tcp_port], verbosity=CallVerbosity.DEBUG)
if not ret:
logger.info('Disabling port %s in current zone...' % tcp_port)
- out, err, ret = call([self.cmd, '--permanent', '--remove-port', tcp_port])
+ out, err, ret = call(self.ctx, [self.cmd, '--permanent', '--remove-port', tcp_port])
if ret:
raise RuntimeError('unable to remove port %s from current zone: %s' %
(tcp_port, err))
if not self.cmd:
raise RuntimeError("command not defined")
- call_throws([self.cmd, '--reload'])
+ call_throws(self.ctx, [self.cmd, '--reload'])
-def update_firewalld(daemon_type):
- # type: (str) -> None
- firewall = Firewalld()
+def update_firewalld(ctx, daemon_type):
+ # type: (CephadmContext, str) -> None
+ firewall = Firewalld(ctx)
firewall.enable_service_for(daemon_type)
firewall.open_ports(fw_ports)
firewall.apply_rules()
-def install_base_units(fsid):
- # type: (str) -> None
+def install_base_units(ctx, fsid):
+ # type: (CephadmContext, str) -> None
"""
Set up ceph.target and ceph-$fsid.target units.
"""
# global unit
- existed = os.path.exists(args.unit_dir + '/ceph.target')
- with open(args.unit_dir + '/ceph.target.new', 'w') as f:
+ existed = os.path.exists(ctx.args.unit_dir + '/ceph.target')
+ with open(ctx.args.unit_dir + '/ceph.target.new', 'w') as f:
f.write('[Unit]\n'
'Description=All Ceph clusters and services\n'
'\n'
'[Install]\n'
'WantedBy=multi-user.target\n')
- os.rename(args.unit_dir + '/ceph.target.new',
- args.unit_dir + '/ceph.target')
+ os.rename(ctx.args.unit_dir + '/ceph.target.new',
+ ctx.args.unit_dir + '/ceph.target')
if not existed:
# we disable before enable in case a different ceph.target
# (from the traditional package) is present; while newer
# systemd is smart enough to disable the old
# (/lib/systemd/...) and enable the new (/etc/systemd/...),
# some older versions of systemd error out with EEXIST.
- call_throws(['systemctl', 'disable', 'ceph.target'])
- call_throws(['systemctl', 'enable', 'ceph.target'])
- call_throws(['systemctl', 'start', 'ceph.target'])
+ call_throws(ctx, ['systemctl', 'disable', 'ceph.target'])
+ call_throws(ctx, ['systemctl', 'enable', 'ceph.target'])
+ call_throws(ctx, ['systemctl', 'start', 'ceph.target'])
# cluster unit
- existed = os.path.exists(args.unit_dir + '/ceph-%s.target' % fsid)
- with open(args.unit_dir + '/ceph-%s.target.new' % fsid, 'w') as f:
+ existed = os.path.exists(ctx.args.unit_dir + '/ceph-%s.target' % fsid)
+ with open(ctx.args.unit_dir + '/ceph-%s.target.new' % fsid, 'w') as f:
f.write('[Unit]\n'
'Description=Ceph cluster {fsid}\n'
'PartOf=ceph.target\n'
'WantedBy=multi-user.target ceph.target\n'.format(
fsid=fsid)
)
- os.rename(args.unit_dir + '/ceph-%s.target.new' % fsid,
- args.unit_dir + '/ceph-%s.target' % fsid)
+ os.rename(ctx.args.unit_dir + '/ceph-%s.target.new' % fsid,
+ ctx.args.unit_dir + '/ceph-%s.target' % fsid)
if not existed:
- call_throws(['systemctl', 'enable', 'ceph-%s.target' % fsid])
- call_throws(['systemctl', 'start', 'ceph-%s.target' % fsid])
+ call_throws(ctx, ['systemctl', 'enable', 'ceph-%s.target' % fsid])
+ call_throws(ctx, ['systemctl', 'start', 'ceph-%s.target' % fsid])
# logrotate for the cluster
- with open(args.logrotate_dir + '/ceph-%s' % fsid, 'w') as f:
+ with open(ctx.args.logrotate_dir + '/ceph-%s' % fsid, 'w') as f:
"""
This is a bit sloppy in that the killall/pkill will touch all ceph daemons
in all containers, but I don't see an elegant way to send SIGHUP *just* to
""" % fsid)
-def get_unit_file(fsid):
- # type: (str) -> str
+def get_unit_file(ctx, fsid):
+ # type: (CephadmContext, str) -> str
extra_args = ''
- if 'podman' in container_path:
+ if 'podman' in ctx.container_path:
extra_args = ('ExecStartPre=-/bin/rm -f /%t/%n-pid /%t/%n-cid\n'
'ExecStopPost=-/bin/rm -f /%t/%n-pid /%t/%n-cid\n'
'Type=forking\n'
[Install]
WantedBy=ceph-{fsid}.target
""".format(
- container_path=container_path,
+ container_path=ctx.container_path,
fsid=fsid,
- data_dir=args.data_dir,
+ data_dir=ctx.args.data_dir,
extra_args=extra_args)
return u
class CephContainer:
def __init__(self,
+ ctx: CephadmContext,
image: str,
entrypoint: str,
args: List[str] = [],
init: bool = False,
host_network: bool = True,
) -> None:
+ self.ctx = ctx
self.image = image
self.entrypoint = entrypoint
self.args = args
def run_cmd(self) -> List[str]:
cmd_args: List[str] = [
- str(container_path),
+ str(self.ctx.container_path),
'run',
'--rm',
'--ipc=host',
]
- if 'podman' in container_path and os.path.exists('/etc/ceph/podman-auth.json'):
+ if 'podman' in self.ctx.container_path and \
+ os.path.exists('/etc/ceph/podman-auth.json'):
cmd_args.append('--authfile=/etc/ceph/podman-auth.json')
envs: List[str] = [
def shell_cmd(self, cmd: List[str]) -> List[str]:
cmd_args: List[str] = [
- str(container_path),
+ str(self.ctx.container_path),
'run',
'--rm',
'--ipc=host',
def exec_cmd(self, cmd):
# type: (List[str]) -> List[str]
return [
- str(container_path),
+ str(self.ctx.container_path),
'exec',
] + self.container_args + [
self.cname,
def rm_cmd(self, storage=False):
# type: (bool) -> List[str]
ret = [
- str(container_path),
+ str(self.ctx.container_path),
'rm', '-f',
]
if storage:
def stop_cmd(self):
# type: () -> List[str]
ret = [
- str(container_path),
+ str(self.ctx.container_path),
'stop', self.cname,
]
return ret
def run(self, timeout=DEFAULT_TIMEOUT):
# type: (Optional[int]) -> str
out, _, _ = call_throws(
+ self.ctx,
self.run_cmd(), desc=self.entrypoint, timeout=timeout)
return out
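+# Usage sketch: the context is now the first constructor argument, e.g.
+#
+#   out = CephContainer(ctx, image=ctx.args.image,
+#                       entrypoint='/usr/bin/ceph',
+#                       args=['--version']).run()
+#
+# run() shells out through call_throws(ctx, ...) and returns stdout.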
@infer_image
-def command_version():
- # type: () -> int
- out = CephContainer(args.image, 'ceph', ['--version']).run()
+def command_version(ctx):
+ # type: (CephadmContext) -> int
+ out = CephContainer(ctx, ctx.args.image, 'ceph', ['--version']).run()
print(out.strip())
return 0
@infer_image
-def command_pull():
- # type: () -> int
+def command_pull(ctx):
+ # type: (CephadmContext) -> int
- _pull_image(args.image)
- return command_inspect_image()
+ _pull_image(ctx, ctx.args.image)
+ return command_inspect_image(ctx)
-def _pull_image(image):
- # type: (str) -> None
+def _pull_image(ctx, image):
+ # type: (CephadmContext, str) -> None
logger.info('Pulling container image %s...' % image)
ignorelist = [
"Digest did not match, expected",
]
- cmd = [container_path, 'pull', image]
- if 'podman' in container_path and os.path.exists('/etc/ceph/podman-auth.json'):
+ cmd = [ctx.container_path, 'pull', image]
+ if 'podman' in ctx.container_path and os.path.exists('/etc/ceph/podman-auth.json'):
cmd.append('--authfile=/etc/ceph/podman-auth.json')
cmd_str = ' '.join(cmd)
for sleep_secs in [1, 4, 25]:
- out, err, ret = call(cmd)
+ out, err, ret = call(ctx, cmd)
if not ret:
return
@infer_image
-def command_inspect_image():
- # type: () -> int
- out, err, ret = call_throws([
- container_path, 'inspect',
+def command_inspect_image(ctx):
+ # type: (CephadmContext) -> int
+ out, err, ret = call_throws(ctx, [
+ ctx.container_path, 'inspect',
'--format', '{{.ID}},{{json .RepoDigests}}',
- args.image])
+ ctx.args.image])
if ret:
return errno.ENOENT
- info_from = get_image_info_from_inspect(out.strip(), args.image)
+ info_from = get_image_info_from_inspect(out.strip(), ctx.args.image)
- ver = CephContainer(args.image, 'ceph', ['--version']).run().strip()
+ ver = CephContainer(ctx, ctx.args.image, 'ceph', ['--version']).run().strip()
info_from['ceph_version'] = ver
print(json.dumps(info_from, indent=4, sort_keys=True))
return address
-def is_ipv6(address):
- # type: (str) -> bool
+def is_ipv6(ctx, address):
+ # type: (CephadmContext, str) -> bool
address = unwrap_ipv6(address)
try:
return ipaddress.ip_address(unicode(address)).version == 6
@default_image
-def command_bootstrap():
- # type: () -> int
+def command_bootstrap(ctx):
+ # type: (CephadmContext) -> int
+
+ args = ctx.args
- if not args.output_config:
- args.output_config = os.path.join(args.output_dir, 'ceph.conf')
- if not args.output_keyring:
- args.output_keyring = os.path.join(args.output_dir,
+ if not ctx.args.output_config:
+ ctx.args.output_config = os.path.join(ctx.args.output_dir, 'ceph.conf')
+ if not ctx.args.output_keyring:
+ ctx.args.output_keyring = os.path.join(ctx.args.output_dir,
'ceph.client.admin.keyring')
- if not args.output_pub_ssh_key:
- args.output_pub_ssh_key = os.path.join(args.output_dir, 'ceph.pub')
+ if not ctx.args.output_pub_ssh_key:
+ ctx.args.output_pub_ssh_key = os.path.join(ctx.args.output_dir, 'ceph.pub')
# verify output files
- for f in [args.output_config, args.output_keyring, args.output_pub_ssh_key]:
- if not args.allow_overwrite:
+ for f in [ctx.args.output_config, ctx.args.output_keyring,
+ ctx.args.output_pub_ssh_key]:
+ if not ctx.args.allow_overwrite:
if os.path.exists(f):
raise Error('%s already exists; delete or pass '
'--allow-overwrite to overwrite' % f)
raise Error(f"Unable to create {dirname} due to permissions failure. Retry with root, or sudo or preallocate the directory.")
- if not args.skip_prepare_host:
- command_prepare_host()
+ if not ctx.args.skip_prepare_host:
+ command_prepare_host(ctx)
else:
logger.info('Skip prepare_host')
# initial vars
- fsid = args.fsid or make_fsid()
+ fsid = ctx.args.fsid or make_fsid()
hostname = get_hostname()
- if '.' in hostname and not args.allow_fqdn_hostname:
+ if '.' in hostname and not ctx.args.allow_fqdn_hostname:
raise Error('hostname is a fully qualified domain name (%s); either fix (e.g., "sudo hostname %s" or similar) or pass --allow-fqdn-hostname' % (hostname, hostname.split('.')[0]))
- mon_id = args.mon_id or hostname
- mgr_id = args.mgr_id or generate_service_id()
+ mon_id = ctx.args.mon_id or hostname
+ mgr_id = ctx.args.mgr_id or generate_service_id()
logger.info('Cluster fsid: %s' % fsid)
ipv6 = False
- l = FileLock(fsid)
+ l = FileLock(ctx, fsid)
l.acquire()
# ip
r = re.compile(r':(\d+)$')
base_ip = ''
- if args.mon_ip:
- ipv6 = is_ipv6(args.mon_ip)
+ if ctx.args.mon_ip:
+ ipv6 = is_ipv6(ctx, ctx.args.mon_ip)
if ipv6:
- args.mon_ip = wrap_ipv6(args.mon_ip)
- hasport = r.findall(args.mon_ip)
+ ctx.args.mon_ip = wrap_ipv6(ctx.args.mon_ip)
+ hasport = r.findall(ctx.args.mon_ip)
if hasport:
port = int(hasport[0])
if port == 6789:
- addr_arg = '[v1:%s]' % args.mon_ip
+ addr_arg = '[v1:%s]' % ctx.args.mon_ip
elif port == 3300:
- addr_arg = '[v2:%s]' % args.mon_ip
+ addr_arg = '[v2:%s]' % ctx.args.mon_ip
else:
logger.warning('Using msgr2 protocol for unrecognized port %d' %
port)
- addr_arg = '[v2:%s]' % args.mon_ip
- base_ip = args.mon_ip[0:-(len(str(port)))-1]
- check_ip_port(base_ip, port)
+ addr_arg = '[v2:%s]' % ctx.args.mon_ip
+ base_ip = ctx.args.mon_ip[0:-(len(str(port)))-1]
+ check_ip_port(ctx, base_ip, port)
else:
- base_ip = args.mon_ip
- addr_arg = '[v2:%s:3300,v1:%s:6789]' % (args.mon_ip, args.mon_ip)
- check_ip_port(args.mon_ip, 3300)
- check_ip_port(args.mon_ip, 6789)
- elif args.mon_addrv:
- addr_arg = args.mon_addrv
+ base_ip = ctx.args.mon_ip
+ addr_arg = '[v2:%s:3300,v1:%s:6789]' % (ctx.args.mon_ip, ctx.args.mon_ip)
+ check_ip_port(ctx, ctx.args.mon_ip, 3300)
+ check_ip_port(ctx, ctx.args.mon_ip, 6789)
+ elif ctx.args.mon_addrv:
+ addr_arg = ctx.args.mon_addrv
if addr_arg[0] != '[' or addr_arg[-1] != ']':
raise Error('--mon-addrv value %s must use square brackets' %
addr_arg)
# strip off v1: or v2: prefix
addr = re.sub(r'^\w+:', '', addr)
base_ip = addr[0:-(len(str(port)))-1]
- check_ip_port(base_ip, port)
+ check_ip_port(ctx, base_ip, port)
else:
raise Error('must specify --mon-ip or --mon-addrv')
logger.debug('Base mon IP is %s, final addrv is %s' % (base_ip, addr_arg))
mon_network = None
- if not args.skip_mon_network:
+ if not ctx.args.skip_mon_network:
# make sure IP is configured locally, and then figure out the
# CIDR network
- for net, ips in list_networks().items():
+ for net, ips in list_networks(ctx).items():
if ipaddress.ip_address(unicode(unwrap_ipv6(base_ip))) in \
[ipaddress.ip_address(unicode(ip)) for ip in ips]:
mon_network = net
'--skip-mon-network to configure it later' % base_ip)
# config
- cp = read_config(args.config)
+ cp = read_config(ctx.args.config)
if not cp.has_section('global'):
cp.add_section('global')
cp.set('global', 'fsid', fsid);
cp.set('global', 'mon host', addr_arg)
- cp.set('global', 'container_image', args.image)
+ cp.set('global', 'container_image', ctx.args.image)
cpf = StringIO()
cp.write(cpf)
config = cpf.getvalue()
- if args.registry_json or args.registry_url:
- command_registry_login()
+ if ctx.args.registry_json or ctx.args.registry_url:
+ command_registry_login(ctx)
- if not args.skip_pull:
- _pull_image(args.image)
+ if not ctx.args.skip_pull:
+ _pull_image(ctx, ctx.args.image)
logger.info('Extracting ceph user uid/gid from container image...')
- (uid, gid) = extract_uid_gid()
+ (uid, gid) = extract_uid_gid(ctx)
# create some initial keys
logger.info('Creating initial keys...')
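+    # each ceph-authtool --gen-print-key run emits a fresh random auth key on stdout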
mon_key = CephContainer(
- image=args.image,
+ ctx,
+ image=ctx.args.image,
entrypoint='/usr/bin/ceph-authtool',
args=['--gen-print-key'],
).run().strip()
admin_key = CephContainer(
- image=args.image,
+ ctx,
+ image=ctx.args.image,
entrypoint='/usr/bin/ceph-authtool',
args=['--gen-print-key'],
).run().strip()
mgr_key = CephContainer(
- image=args.image,
+ ctx,
+ image=ctx.args.image,
entrypoint='/usr/bin/ceph-authtool',
args=['--gen-print-key'],
).run().strip()
logger.info('Creating initial monmap...')
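+    # an empty temp file for monmaptool --create to populate with the initial monmap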
tmp_monmap = write_tmp('', 0, 0)
out = CephContainer(
- image=args.image,
+ ctx,
+ image=ctx.args.image,
entrypoint='/usr/bin/monmaptool',
args=['--create',
'--clobber',
# create mon
logger.info('Creating mon...')
- create_daemon_dirs(fsid, 'mon', mon_id, uid, gid)
- mon_dir = get_data_dir(fsid, 'mon', mon_id)
- log_dir = get_log_dir(fsid)
+ create_daemon_dirs(ctx, fsid, 'mon', mon_id, uid, gid)
+ mon_dir = get_data_dir(fsid, ctx.args.data_dir, 'mon', mon_id)
+ log_dir = get_log_dir(fsid, ctx.args.log_dir)
out = CephContainer(
- image=args.image,
+ ctx,
+ image=ctx.args.image,
entrypoint='/usr/bin/ceph-mon',
args=['--mkfs',
'-i', mon_id,
'-c', '/dev/null',
'--monmap', '/tmp/monmap',
'--keyring', '/tmp/keyring',
- ] + get_daemon_args(fsid, 'mon', mon_id),
+ ] + get_daemon_args(ctx, fsid, 'mon', mon_id),
volume_mounts={
log_dir: '/var/log/ceph:z',
mon_dir: '/var/lib/ceph/mon/ceph-%s:z' % (mon_id),
os.fchmod(f.fileno(), 0o600)
f.write(config)
- make_var_run(fsid, uid, gid)
- mon_c = get_container(fsid, 'mon', mon_id)
- deploy_daemon(fsid, 'mon', mon_id, mon_c, uid, gid,
+ make_var_run(ctx, fsid, uid, gid)
+ mon_c = get_container(ctx, fsid, 'mon', mon_id)
+ deploy_daemon(ctx, fsid, 'mon', mon_id, mon_c, uid, gid,
config=None, keyring=None)
# client.admin key + config to issue various CLI commands
mounts[k] = v
-        timeout = timeout or args.timeout
+        timeout = timeout or ctx.args.timeout
return CephContainer(
- image=args.image,
+ ctx,
+ image=ctx.args.image,
entrypoint='/usr/bin/ceph',
args=cmd,
volume_mounts=mounts,
logger.info('Waiting for mon to start...')
c = CephContainer(
- image=args.image,
+ ctx,
+ image=ctx.args.image,
entrypoint='/usr/bin/ceph',
args=[
'status'],
# wait for the service to become available
def is_mon_available():
# type: () -> bool
- timeout=args.timeout if args.timeout else 60 # seconds
- out, err, ret = call(c.run_cmd(),
+ timeout=ctx.args.timeout if ctx.args.timeout else 60 # seconds
+ out, err, ret = call(ctx, c.run_cmd(),
desc=c.entrypoint,
timeout=timeout)
return ret == 0
- is_available('mon', is_mon_available)
+ is_available(ctx, 'mon', is_mon_available)
# assimilate and minimize config
- if not args.no_minimize_config:
+ if not ctx.args.no_minimize_config:
logger.info('Assimilating anything we can from ceph.conf...')
cli([
'config', 'assimilate-conf',
with open(mon_dir + '/config', 'r') as f:
config = f.read()
logger.info('Restarting the monitor...')
- call_throws([
+ call_throws(ctx, [
'systemctl',
'restart',
get_unit_name(fsid, 'mon', mon_id)
# create mgr
logger.info('Creating mgr...')
mgr_keyring = '[mgr.%s]\n\tkey = %s\n' % (mgr_id, mgr_key)
- mgr_c = get_container(fsid, 'mgr', mgr_id)
+ mgr_c = get_container(ctx, fsid, 'mgr', mgr_id)
# Note:the default port used by the Prometheus node exporter is opened in fw
- deploy_daemon(fsid, 'mgr', mgr_id, mgr_c, uid, gid,
+ deploy_daemon(ctx, fsid, 'mgr', mgr_id, mgr_c, uid, gid,
config=config, keyring=mgr_keyring, ports=[9283])
# output files
- with open(args.output_keyring, 'w') as f:
+ with open(ctx.args.output_keyring, 'w') as f:
os.fchmod(f.fileno(), 0o600)
f.write('[client.admin]\n'
'\tkey = ' + admin_key + '\n')
- logger.info('Wrote keyring to %s' % args.output_keyring)
+ logger.info('Wrote keyring to %s' % ctx.args.output_keyring)
- with open(args.output_config, 'w') as f:
+ with open(ctx.args.output_config, 'w') as f:
f.write(config)
- logger.info('Wrote config to %s' % args.output_config)
+ logger.info('Wrote config to %s' % ctx.args.output_config)
# wait for the service to become available
logger.info('Waiting for mgr to start...')
def is_mgr_available():
# type: () -> bool
- timeout=args.timeout if args.timeout else 60 # seconds
+ timeout=ctx.args.timeout if ctx.args.timeout else 60 # seconds
try:
out = cli(['status', '-f', 'json-pretty'], timeout=timeout)
j = json.loads(out)
except Exception as e:
logger.debug('status failed: %s' % e)
return False
- is_available('mgr', is_mgr_available)
+ is_available(ctx, 'mgr', is_mgr_available)
# wait for mgr to restart (after enabling a module)
def wait_for_mgr_restart():
except Exception as e:
logger.debug('tell mgr mgr_status failed: %s' % e)
return False
- is_available('mgr epoch %d' % epoch, mgr_has_latest_epoch)
+ is_available(ctx, 'mgr epoch %d' % epoch, mgr_has_latest_epoch)
# ssh
host: Optional[str] = None
- if not args.skip_ssh:
- cli(['config-key', 'set', 'mgr/cephadm/ssh_user', args.ssh_user])
+ if not ctx.args.skip_ssh:
+ cli(['config-key', 'set', 'mgr/cephadm/ssh_user', ctx.args.ssh_user])
logger.info('Enabling cephadm module...')
cli(['mgr', 'module', 'enable', 'cephadm'])
logger.info('Setting orchestrator backend to cephadm...')
cli(['orch', 'set', 'backend', 'cephadm'])
- if args.ssh_config:
+ if ctx.args.ssh_config:
logger.info('Using provided ssh config...')
mounts = {
- pathify(args.ssh_config.name): '/tmp/cephadm-ssh-config:z',
+ pathify(ctx.args.ssh_config.name): '/tmp/cephadm-ssh-config:z',
}
cli(['cephadm', 'set-ssh-config', '-i', '/tmp/cephadm-ssh-config'], extra_mounts=mounts)
- if args.ssh_private_key and args.ssh_public_key:
+ if ctx.args.ssh_private_key and ctx.args.ssh_public_key:
logger.info('Using provided ssh keys...')
mounts = {
- pathify(args.ssh_private_key.name): '/tmp/cephadm-ssh-key:z',
- pathify(args.ssh_public_key.name): '/tmp/cephadm-ssh-key.pub:z'
+ pathify(ctx.args.ssh_private_key.name): '/tmp/cephadm-ssh-key:z',
+ pathify(ctx.args.ssh_public_key.name): '/tmp/cephadm-ssh-key.pub:z'
}
cli(['cephadm', 'set-priv-key', '-i', '/tmp/cephadm-ssh-key'], extra_mounts=mounts)
cli(['cephadm', 'set-pub-key', '-i', '/tmp/cephadm-ssh-key.pub'], extra_mounts=mounts)
cli(['cephadm', 'generate-key'])
ssh_pub = cli(['cephadm', 'get-pub-key'])
- with open(args.output_pub_ssh_key, 'w') as f:
+ with open(ctx.args.output_pub_ssh_key, 'w') as f:
f.write(ssh_pub)
- logger.info('Wrote public SSH key to to %s' % args.output_pub_ssh_key)
+    logger.info('Wrote public SSH key to %s' % ctx.args.output_pub_ssh_key)
- logger.info('Adding key to %s@localhost\'s authorized_keys...' % args.ssh_user)
+ logger.info('Adding key to %s@localhost\'s authorized_keys...' % ctx.args.ssh_user)
try:
- s_pwd = pwd.getpwnam(args.ssh_user)
+ s_pwd = pwd.getpwnam(ctx.args.ssh_user)
except KeyError as e:
- raise Error('Cannot find uid/gid for ssh-user: %s' % (args.ssh_user))
+ raise Error('Cannot find uid/gid for ssh-user: %s' % (ctx.args.ssh_user))
ssh_uid = s_pwd.pw_uid
ssh_gid = s_pwd.pw_gid
ssh_dir = os.path.join(s_pwd.pw_dir, '.ssh')
except RuntimeError as e:
raise Error('Failed to add host <%s>: %s' % (host, e))
- if not args.orphan_initial_daemons:
+ if not ctx.args.orphan_initial_daemons:
for t in ['mon', 'mgr', 'crash']:
logger.info('Deploying %s service with default placement...' % t)
cli(['orch', 'apply', t])
- if not args.skip_monitoring_stack:
+ if not ctx.args.skip_monitoring_stack:
logger.info('Enabling mgr prometheus module...')
cli(['mgr', 'module', 'enable', 'prometheus'])
for t in ['prometheus', 'grafana', 'node-exporter', 'alertmanager']:
logger.info('Deploying %s service with default placement...' % t)
cli(['orch', 'apply', t])
- if args.registry_url and args.registry_username and args.registry_password:
- cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_url', args.registry_url, '--force'])
- cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_username', args.registry_username, '--force'])
- cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_password', args.registry_password, '--force'])
+ if ctx.args.registry_url and ctx.args.registry_username and ctx.args.registry_password:
+ cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_url', ctx.args.registry_url, '--force'])
+ cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_username', ctx.args.registry_username, '--force'])
+ cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_password', ctx.args.registry_password, '--force'])
- if args.container_init:
- cli(['config', 'set', 'mgr', 'mgr/cephadm/container_init', str(args.container_init), '--force'])
+ if ctx.args.container_init:
+ cli(['config', 'set', 'mgr', 'mgr/cephadm/container_init', str(ctx.args.container_init), '--force'])
- if args.with_exporter:
+ if ctx.args.with_exporter:
cli(['config-key', 'set', 'mgr/cephadm/exporter_enabled', 'true'])
- if args.exporter_config:
+ if ctx.args.exporter_config:
logger.info("Applying custom cephadm exporter settings")
# validated within the parser, so we can just apply to the store
with tempfile.NamedTemporaryFile(buffering=0) as tmp:
cli(['orch', 'apply', 'cephadm-exporter'])
- if not args.skip_dashboard:
+ if not ctx.args.skip_dashboard:
# Configure SSL port (cephadm only allows to configure dashboard SSL port)
# if the user does not want to use SSL he can change this setting once the cluster is up
- cli(["config", "set", "mgr", "mgr/dashboard/ssl_server_port" , str(args.ssl_dashboard_port)])
+ cli(["config", "set", "mgr", "mgr/dashboard/ssl_server_port" , str(ctx.args.ssl_dashboard_port)])
# configuring dashboard parameters
logger.info('Enabling the dashboard module...')
wait_for_mgr_restart()
# dashboard crt and key
- if args.dashboard_key and args.dashboard_crt:
+ if ctx.args.dashboard_key and ctx.args.dashboard_crt:
logger.info('Using provided dashboard certificate...')
mounts = {
- pathify(args.dashboard_crt.name): '/tmp/dashboard.crt:z',
- pathify(args.dashboard_key.name): '/tmp/dashboard.key:z'
+ pathify(ctx.args.dashboard_crt.name): '/tmp/dashboard.crt:z',
+ pathify(ctx.args.dashboard_key.name): '/tmp/dashboard.key:z'
}
cli(['dashboard', 'set-ssl-certificate', '-i', '/tmp/dashboard.crt'], extra_mounts=mounts)
cli(['dashboard', 'set-ssl-certificate-key', '-i', '/tmp/dashboard.key'], extra_mounts=mounts)
logger.info('Creating initial admin user...')
-        password = args.initial_dashboard_password or generate_password()
+        password = ctx.args.initial_dashboard_password or generate_password()
tmp_password_file = write_tmp(password, uid, gid)
- cmd = ['dashboard', 'ac-user-create', args.initial_dashboard_user, '-i', '/tmp/dashboard.pw', 'administrator', '--force-password']
- if not args.dashboard_password_noupdate:
+ cmd = ['dashboard', 'ac-user-create', ctx.args.initial_dashboard_user, '-i', '/tmp/dashboard.pw', 'administrator', '--force-password']
+ if not ctx.args.dashboard_password_noupdate:
cmd.append('--pwd-update-required')
cli(cmd, extra_mounts={pathify(tmp_password_file.name): '/tmp/dashboard.pw:z'})
logger.info('Fetching dashboard port number...')
port = int(out)
# Open dashboard port
- fw = Firewalld()
+ fw = Firewalld(ctx)
fw.open_ports([port])
fw.apply_rules()
'\t User: %s\n'
'\tPassword: %s\n' % (
get_fqdn(), port,
- args.initial_dashboard_user,
+ ctx.args.initial_dashboard_user,
password))
- if args.apply_spec:
- logger.info('Applying %s to cluster' % args.apply_spec)
+ if ctx.args.apply_spec:
+ logger.info('Applying %s to cluster' % ctx.args.apply_spec)
- with open(args.apply_spec) as f:
+ with open(ctx.args.apply_spec) as f:
for line in f:
if 'hostname:' in line:
line = line.replace('\n', '')
logger.info('Adding ssh key to %s' % split[1])
ssh_key = '/etc/ceph/ceph.pub'
- if args.ssh_public_key:
- ssh_key = args.ssh_public_key.name
- out, err, code = call_throws(['ssh-copy-id', '-f', '-i', ssh_key, '%s@%s' % (args.ssh_user, split[1])])
+ if ctx.args.ssh_public_key:
+ ssh_key = ctx.args.ssh_public_key.name
+                        out, err, code = call_throws(ctx, ['ssh-copy-id', '-f', '-i', ssh_key, '%s@%s' % (ctx.args.ssh_user, split[1])])
mounts = {}
- mounts[pathify(args.apply_spec)] = '/tmp/spec.yml:z'
+ mounts[pathify(ctx.args.apply_spec)] = '/tmp/spec.yml:z'
out = cli(['orch', 'apply', '-i', '/tmp/spec.yml'], extra_mounts=mounts)
logger.info(out)
##################################
-def command_registry_login():
+def command_registry_login(ctx: CephadmContext):
+ args = ctx.args
if args.registry_json:
logger.info("Pulling custom registry login info from %s." % args.registry_json)
d = get_parm(args.registry_json)
args.registry_url = d.get('url')
args.registry_username = d.get('username')
args.registry_password = d.get('password')
- registry_login(args.registry_url, args.registry_username, args.registry_password)
+ registry_login(ctx, args.registry_url, args.registry_username, args.registry_password)
else:
raise Error("json provided for custom registry login did not include all necessary fields. "
"Please setup json file as\n"
" \"password\": \"REGISTRY_PASSWORD\"\n"
"}\n")
elif args.registry_url and args.registry_username and args.registry_password:
- registry_login(args.registry_url, args.registry_username, args.registry_password)
+ registry_login(ctx, args.registry_url, args.registry_username, args.registry_password)
else:
raise Error("Invalid custom registry arguments received. To login to a custom registry include "
"--registry-url, --registry-username and --registry-password "
"options or --registry-json option")
return 0
-def registry_login(url, username, password):
+def registry_login(ctx: CephadmContext, url, username, password):
logger.info("Logging into custom registry.")
try:
+ container_path = ctx.container_path
cmd = [container_path, 'login',
'-u', username, '-p', password,
url]
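+        # podman can persist the registry credentials in a dedicated authfile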
if 'podman' in container_path:
cmd.append('--authfile=/etc/ceph/podman-auth.json')
- out, _, _ = call_throws(cmd)
+ out, _, _ = call_throws(ctx, cmd)
if 'podman' in container_path:
os.chmod('/etc/ceph/podman-auth.json', 0o600)
-    except:
+    except Exception:
- raise Error("Failed to login to custom registry @ %s as %s with given password" % (args.registry_url, args.registry_username))
+ raise Error("Failed to login to custom registry @ %s as %s with given password" % (ctx.args.registry_url, ctx.args.registry_username))
##################################
-def extract_uid_gid_monitoring(daemon_type):
- # type: (str) -> Tuple[int, int]
+def extract_uid_gid_monitoring(ctx, daemon_type):
+ # type: (CephadmContext, str) -> Tuple[int, int]
if daemon_type == 'prometheus':
- uid, gid = extract_uid_gid(file_path='/etc/prometheus')
+ uid, gid = extract_uid_gid(ctx, file_path='/etc/prometheus')
elif daemon_type == 'node-exporter':
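+        # node-exporter runs as 'nobody' (uid/gid 65534)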
uid, gid = 65534, 65534
elif daemon_type == 'grafana':
- uid, gid = extract_uid_gid(file_path='/var/lib/grafana')
+ uid, gid = extract_uid_gid(ctx, file_path='/var/lib/grafana')
elif daemon_type == 'alertmanager':
- uid, gid = extract_uid_gid(file_path=['/etc/alertmanager', '/etc/prometheus'])
+ uid, gid = extract_uid_gid(ctx, file_path=['/etc/alertmanager', '/etc/prometheus'])
else:
raise Error("{} not implemented yet".format(daemon_type))
return uid, gid
@default_image
-def command_deploy():
- # type: () -> None
+def command_deploy(ctx):
+ # type: (CephadmContext) -> None
+ args = ctx.args
daemon_type, daemon_id = args.name.split('.', 1)
- l = FileLock(args.fsid)
+ l = FileLock(ctx, args.fsid)
l.acquire()
if daemon_type not in get_supported_daemons():
redeploy = False
unit_name = get_unit_name(args.fsid, daemon_type, daemon_id)
- (_, state, _) = check_unit(unit_name)
+ (_, state, _) = check_unit(ctx, unit_name)
if state == 'running':
redeploy = True
daemon_ports = list(map(int, args.tcp_ports.split()))
if daemon_type in Ceph.daemons:
- config, keyring = get_config_and_keyring()
- uid, gid = extract_uid_gid()
- make_var_run(args.fsid, uid, gid)
+ config, keyring = get_config_and_keyring(ctx)
+ uid, gid = extract_uid_gid(ctx)
+ make_var_run(ctx, args.fsid, uid, gid)
- c = get_container(args.fsid, daemon_type, daemon_id,
+ c = get_container(ctx, args.fsid, daemon_type, daemon_id,
ptrace=args.allow_ptrace)
- deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid,
+ deploy_daemon(ctx, args.fsid, daemon_type, daemon_id, c, uid, gid,
config=config, keyring=keyring,
osd_fsid=args.osd_fsid,
reconfig=args.reconfig,
raise Error("{} deployment requires config-json which must "
"contain arg for {}".format(daemon_type.capitalize(), ', '.join(required_args)))
- uid, gid = extract_uid_gid_monitoring(daemon_type)
- c = get_container(args.fsid, daemon_type, daemon_id)
- deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid,
+ uid, gid = extract_uid_gid_monitoring(ctx, daemon_type)
+ c = get_container(ctx, args.fsid, daemon_type, daemon_id)
+ deploy_daemon(ctx, args.fsid, daemon_type, daemon_id, c, uid, gid,
reconfig=args.reconfig,
ports=daemon_ports)
if not args.reconfig and not redeploy:
daemon_ports.extend(NFSGanesha.port_map.values())
- config, keyring = get_config_and_keyring()
+ config, keyring = get_config_and_keyring(ctx)
# TODO: extract ganesha uid/gid (997, 994) ?
- uid, gid = extract_uid_gid()
- c = get_container(args.fsid, daemon_type, daemon_id)
- deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid,
+ uid, gid = extract_uid_gid(ctx)
+ c = get_container(ctx, args.fsid, daemon_type, daemon_id)
+ deploy_daemon(ctx, args.fsid, daemon_type, daemon_id, c, uid, gid,
config=config, keyring=keyring,
reconfig=args.reconfig,
ports=daemon_ports)
elif daemon_type == CephIscsi.daemon_type:
- config, keyring = get_config_and_keyring()
- uid, gid = extract_uid_gid()
- c = get_container(args.fsid, daemon_type, daemon_id)
- deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid,
+ config, keyring = get_config_and_keyring(ctx)
+ uid, gid = extract_uid_gid(ctx)
+ c = get_container(ctx, args.fsid, daemon_type, daemon_id)
+ deploy_daemon(ctx, args.fsid, daemon_type, daemon_id, c, uid, gid,
config=config, keyring=keyring,
reconfig=args.reconfig,
ports=daemon_ports)
elif daemon_type == HAproxy.daemon_type:
- haproxy = HAproxy.init(args.fsid, daemon_id)
+ haproxy = HAproxy.init(ctx, args.fsid, daemon_id)
uid, gid = haproxy.extract_uid_gid_haproxy()
- c = get_container(args.fsid, daemon_type, daemon_id)
- deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid,
+ c = get_container(ctx, args.fsid, daemon_type, daemon_id)
+ deploy_daemon(ctx, args.fsid, daemon_type, daemon_id, c, uid, gid,
reconfig=args.reconfig,
ports=daemon_ports)
elif daemon_type == Keepalived.daemon_type:
- keepalived = Keepalived.init(args.fsid, daemon_id)
+ keepalived = Keepalived.init(ctx, args.fsid, daemon_id)
uid, gid = keepalived.extract_uid_gid_keepalived()
- c = get_container(args.fsid, daemon_type, daemon_id)
- deploy_daemon(args.fsid, daemon_type, daemon_id, c, uid, gid,
+ c = get_container(ctx, args.fsid, daemon_type, daemon_id)
+ deploy_daemon(ctx, args.fsid, daemon_type, daemon_id, c, uid, gid,
reconfig=args.reconfig,
ports=daemon_ports)
elif daemon_type == CustomContainer.daemon_type:
- cc = CustomContainer.init(args.fsid, daemon_id)
+ cc = CustomContainer.init(ctx, args.fsid, daemon_id)
if not args.reconfig and not redeploy:
daemon_ports.extend(cc.ports)
- c = get_container(args.fsid, daemon_type, daemon_id,
+ c = get_container(ctx, args.fsid, daemon_type, daemon_id,
privileged=cc.privileged,
ptrace=args.allow_ptrace)
- deploy_daemon(args.fsid, daemon_type, daemon_id, c,
+ deploy_daemon(ctx, args.fsid, daemon_type, daemon_id, c,
uid=cc.uid, gid=cc.gid, config=None,
keyring=None, reconfig=args.reconfig,
ports=daemon_ports)
CephadmDaemon.validate_config(config_js)
- deploy_daemon(args.fsid, daemon_type, daemon_id, None,
+ deploy_daemon(ctx, args.fsid, daemon_type, daemon_id, None,
uid, gid, ports=daemon_ports)
else:
@infer_image
-def command_run():
- # type: () -> int
+def command_run(ctx):
+ # type: (CephadmContext) -> int
+ args = ctx.args
(daemon_type, daemon_id) = args.name.split('.', 1)
- c = get_container(args.fsid, daemon_type, daemon_id)
+ c = get_container(ctx, args.fsid, daemon_type, daemon_id)
command = c.run_cmd()
- return call_timeout(command, args.timeout)
+ return call_timeout(ctx, command, args.timeout)
##################################
@infer_fsid
@infer_config
@infer_image
-def command_shell():
- # type: () -> int
+def command_shell(ctx):
+ # type: (CephadmContext) -> int
+ args = ctx.args
if args.fsid:
- make_log_dir(args.fsid)
+ make_log_dir(ctx, args.fsid)
if args.name:
if '.' in args.name:
(daemon_type, daemon_id) = args.name.split('.', 1)
args.keyring = SHELL_DEFAULT_KEYRING
container_args = [] # type: List[str]
- mounts = get_container_mounts(args.fsid, daemon_type, daemon_id,
+ mounts = get_container_mounts(ctx, args.fsid, daemon_type, daemon_id,
no_config=True if args.config else False)
- binds = get_container_binds(args.fsid, daemon_type, daemon_id)
+ binds = get_container_binds(ctx, args.fsid, daemon_type, daemon_id)
if args.config:
mounts[pathify(args.config)] = '/etc/ceph/ceph.conf:z'
if args.keyring:
mounts[home] = '/root'
c = CephContainer(
+ ctx,
image=args.image,
entrypoint='doesnotmatter',
args=[],
privileged=True)
command = c.shell_cmd(command)
- return call_timeout(command, args.timeout)
+ return call_timeout(ctx, command, args.timeout)
##################################
@infer_fsid
-def command_enter():
- # type: () -> int
+def command_enter(ctx):
+ # type: (CephadmContext) -> int
+ args = ctx.args
if not args.fsid:
raise Error('must pass --fsid to specify cluster')
(daemon_type, daemon_id) = args.name.split('.', 1)
'-e', "PS1=%s" % CUSTOM_PS1,
]
c = CephContainer(
+ ctx,
image=args.image,
entrypoint='doesnotmatter',
container_args=container_args,
cname='ceph-%s-%s.%s' % (args.fsid, daemon_type, daemon_id),
)
command = c.exec_cmd(command)
- return call_timeout(command, args.timeout)
+ return call_timeout(ctx, command, args.timeout)
##################################
@infer_fsid
@infer_image
-def command_ceph_volume():
- # type: () -> None
+def command_ceph_volume(ctx):
+ # type: (CephadmContext) -> None
+ args = ctx.args
if args.fsid:
- make_log_dir(args.fsid)
+ make_log_dir(ctx, args.fsid)
- l = FileLock(args.fsid)
+ l = FileLock(ctx, args.fsid)
l.acquire()
(uid, gid) = (0, 0) # ceph-volume runs as root
- mounts = get_container_mounts(args.fsid, 'osd', None)
+ mounts = get_container_mounts(ctx, args.fsid, 'osd', None)
tmp_config = None
tmp_keyring = None
- (config, keyring) = get_config_and_keyring()
+ (config, keyring) = get_config_and_keyring(ctx)
if config:
# tmp config file
mounts[tmp_keyring.name] = '/var/lib/ceph/bootstrap-osd/ceph.keyring:z'
c = CephContainer(
+ ctx,
image=args.image,
entrypoint='/usr/sbin/ceph-volume',
envs=args.env,
privileged=True,
volume_mounts=mounts,
)
- verbosity = CallVerbosity.VERBOSE if args.log_output else CallVerbosity.VERBOSE_ON_FAILURE
- out, err, code = call_throws(c.run_cmd(), verbosity=verbosity)
+    verbosity = CallVerbosity.VERBOSE if args.log_output else CallVerbosity.VERBOSE_ON_FAILURE
+ out, err, code = call_throws(ctx, c.run_cmd(), verbosity=verbosity)
if not code:
print(out)
@infer_fsid
-def command_unit():
- # type: () -> None
+def command_unit(ctx):
+ # type: (CephadmContext) -> None
+ args = ctx.args
if not args.fsid:
raise Error('must pass --fsid to specify cluster')
- unit_name = get_unit_name_by_daemon_name(args.fsid, args.name)
+ unit_name = get_unit_name_by_daemon_name(ctx, args.fsid, args.name)
- call_throws([
+ call_throws(ctx, [
'systemctl',
args.command,
unit_name],
@infer_fsid
-def command_logs():
- # type: () -> None
+def command_logs(ctx):
+ # type: (CephadmContext) -> None
+ args = ctx.args
if not args.fsid:
raise Error('must pass --fsid to specify cluster')
- unit_name = get_unit_name_by_daemon_name(args.fsid, args.name)
+ unit_name = get_unit_name_by_daemon_name(ctx, args.fsid, args.name)
cmd = [find_program('journalctl')]
cmd.extend(['-u', unit_name])
##################################
-def list_networks():
- # type: () -> Dict[str,List[str]]
+def list_networks(ctx):
+ # type: (CephadmContext) -> Dict[str,List[str]]
## sadly, 18.04's iproute2 4.15.0-2ubun doesn't support the -j flag,
## so we'll need to use a regex to parse 'ip' command output.
#j = json.loads(out)
#for x in j:
- res = _list_ipv4_networks()
- res.update(_list_ipv6_networks())
+ res = _list_ipv4_networks(ctx)
+ res.update(_list_ipv6_networks(ctx))
return res
-def _list_ipv4_networks():
+def _list_ipv4_networks(ctx: CephadmContext):
execstr: Optional[str] = find_executable('ip')
if not execstr:
raise FileNotFoundError("unable to find 'ip' command")
- out, _, _ = call_throws([execstr, 'route', 'ls'])
+ out, _, _ = call_throws(ctx, [execstr, 'route', 'ls'])
return _parse_ipv4_route(out)
return r
-def _list_ipv6_networks():
+def _list_ipv6_networks(ctx: CephadmContext):
execstr: Optional[str] = find_executable('ip')
if not execstr:
raise FileNotFoundError("unable to find 'ip' command")
- routes, _, _ = call_throws([execstr, '-6', 'route', 'ls'])
- ips, _, _ = call_throws([execstr, '-6', 'addr', 'ls'])
+ routes, _, _ = call_throws(ctx, [execstr, '-6', 'route', 'ls'])
+ ips, _, _ = call_throws(ctx, [execstr, '-6', 'addr', 'ls'])
return _parse_ipv6_route(routes, ips)
return r
-def command_list_networks():
- # type: () -> None
- r = list_networks()
+def command_list_networks(ctx):
+ # type: (CephadmContext) -> None
+ r = list_networks(ctx)
print(json.dumps(r, indent=4))
##################################
-def command_ls():
- # type: () -> None
-
- ls = list_daemons(detail=not args.no_detail,
+def command_ls(ctx):
+ # type: (CephadmContext) -> None
+ args = ctx.args
+ ls = list_daemons(ctx, detail=not args.no_detail,
legacy_dir=args.legacy_dir)
print(json.dumps(ls, indent=4))
-def list_daemons(detail=True, legacy_dir=None):
- # type: (bool, Optional[str]) -> List[Dict[str, str]]
+def list_daemons(ctx, detail=True, legacy_dir=None):
+ # type: (CephadmContext, bool, Optional[str]) -> List[Dict[str, str]]
host_version = None
ls = []
+ args = ctx.args
+ container_path = ctx.container_path
data_dir = args.data_dir
if legacy_dir is not None:
continue
(cluster, daemon_id) = j.split('-', 1)
fsid = get_legacy_daemon_fsid(
+ ctx,
cluster, daemon_type, daemon_id,
legacy_dir=legacy_dir)
legacy_unit_name = 'ceph-%s@%s' % (daemon_type, daemon_id)
'systemd_unit': legacy_unit_name,
}
if detail:
- (val['enabled'], val['state'], _) = check_unit(legacy_unit_name)
+ (val['enabled'], val['state'], _) = \
+ check_unit(ctx, legacy_unit_name)
if not host_version:
try:
- out, err, code = call(['ceph', '-v'])
+ out, err, code = call(ctx, ['ceph', '-v'])
if not code and out.startswith('ceph version '):
host_version = out.split(' ')[2]
except Exception:
}
if detail:
# get container id
- (val['enabled'], val['state'], _) = check_unit(unit_name)
+ (val['enabled'], val['state'], _) = \
+ check_unit(ctx, unit_name)
container_id = None
image_name = None
image_id = None
version = None
start_stamp = None
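+            # use .ImageID with podman < 1.6.2; newer releases expose .Image in inspect output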
- if 'podman' in container_path and get_podman_version() < (1, 6, 2):
+ if 'podman' in container_path and \
+ get_podman_version(ctx, container_path) < (1, 6, 2):
image_field = '.ImageID'
else:
image_field = '.Image'
- out, err, code = call(
+ out, err, code = call(ctx,
[
container_path, 'inspect',
'--format', '{{.Id}},{{.Config.Image}},{{%s}},{{.Created}},{{index .Config.Labels "io.ceph.version"}}' % image_field,
if not version or '.' not in version:
version = seen_versions.get(image_id, None)
if daemon_type == NFSGanesha.daemon_type:
- version = NFSGanesha.get_version(container_id)
+                        version = NFSGanesha.get_version(ctx, container_id)
if daemon_type == CephIscsi.daemon_type:
- version = CephIscsi.get_version(container_id)
+                        version = CephIscsi.get_version(ctx, container_id)
elif not version:
if daemon_type in Ceph.daemons:
- out, err, code = call(
+ out, err, code = call(ctx,
[container_path, 'exec', container_id,
'ceph', '-v'])
if not code and \
version = out.split(' ')[2]
seen_versions[image_id] = version
elif daemon_type == 'grafana':
- out, err, code = call(
+ out, err, code = call(ctx,
[container_path, 'exec', container_id,
'grafana-server', '-v'])
if not code and \
'alertmanager',
'node-exporter']:
cmd = daemon_type.replace('-', '_')
- out, err, code = call(
+ out, err, code = call(ctx,
[container_path, 'exec', container_id,
cmd, '--version'])
if not code and \
version = err.split(' ')[2]
seen_versions[image_id] = version
elif daemon_type == 'haproxy':
- out, err, code = call(
+ out, err, code = call(ctx,
[container_path, 'exec', container_id,
'haproxy', '-v'])
if not code and \
version = out.split(' ')[2]
seen_versions[image_id] = version
elif daemon_type == 'keepalived':
- out, err, code = call(
+ out, err, code = call(ctx,
[container_path, 'exec', container_id,
'keepalived', '--version'])
if not code and \
return ls
-def get_daemon_description(fsid, name, detail=False, legacy_dir=None):
- # type: (str, str, bool, Optional[str]) -> Dict[str, str]
+def get_daemon_description(ctx, fsid, name, detail=False, legacy_dir=None):
+ # type: (CephadmContext, str, str, bool, Optional[str]) -> Dict[str, str]
- for d in list_daemons(detail=detail, legacy_dir=legacy_dir):
+ for d in list_daemons(ctx, detail=detail, legacy_dir=legacy_dir):
if d['fsid'] != fsid:
continue
if d['name'] != name:
##################################
@default_image
-def command_adopt():
- # type: () -> None
+def command_adopt(ctx):
+ # type: (CephadmContext) -> None
+ args = ctx.args
if not args.skip_pull:
- _pull_image(args.image)
+ _pull_image(ctx, args.image)
(daemon_type, daemon_id) = args.name.split('.', 1)
raise Error('adoption of style %s not implemented' % args.style)
# lock
- fsid = get_legacy_daemon_fsid(args.cluster,
+ fsid = get_legacy_daemon_fsid(ctx,
+ args.cluster,
daemon_type,
daemon_id,
legacy_dir=args.legacy_dir)
if not fsid:
raise Error('could not detect legacy fsid; set fsid in ceph.conf')
- l = FileLock(fsid)
+ l = FileLock(ctx, fsid)
l.acquire()
# call correct adoption
if daemon_type in Ceph.daemons:
- command_adopt_ceph(daemon_type, daemon_id, fsid);
+        command_adopt_ceph(ctx, daemon_type, daemon_id, fsid)
elif daemon_type == 'prometheus':
- command_adopt_prometheus(daemon_id, fsid)
+ command_adopt_prometheus(ctx, daemon_id, fsid)
elif daemon_type == 'grafana':
- command_adopt_grafana(daemon_id, fsid)
+ command_adopt_grafana(ctx, daemon_id, fsid)
elif daemon_type == 'node-exporter':
raise Error('adoption of node-exporter not implemented')
elif daemon_type == 'alertmanager':
- command_adopt_alertmanager(daemon_id, fsid)
+ command_adopt_alertmanager(ctx, daemon_id, fsid)
else:
raise Error('daemon type %s not recognized' % daemon_type)
class AdoptOsd(object):
- def __init__(self, osd_data_dir, osd_id):
- # type: (str, str) -> None
+ def __init__(self, ctx, osd_data_dir, osd_id):
+ # type: (CephadmContext, str, str) -> None
+ self.ctx = ctx
self.osd_data_dir = osd_data_dir
self.osd_id = osd_id
def check_offline_lvm_osd(self):
# type: () -> Tuple[Optional[str], Optional[str]]
-
+ args = self.ctx.args
osd_fsid, osd_type = None, None
c = CephContainer(
+ self.ctx,
image=args.image,
entrypoint='/usr/sbin/ceph-volume',
args=['lvm', 'list', '--format=json'],
privileged=True
)
- out, err, code = call_throws(c.run_cmd())
+ out, err, code = call_throws(self.ctx, c.run_cmd())
if not code:
try:
js = json.loads(out)
def check_offline_simple_osd(self):
# type: () -> Tuple[Optional[str], Optional[str]]
-
osd_fsid, osd_type = None, None
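+        # 'ceph-volume simple scan' writes one <id>-<osd_fsid>.json descriptor per OSD here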
osd_file = glob("/etc/ceph/osd/{}-[a-f0-9-]*.json".format(self.osd_id))
if osd_type != "filestore":
# need this to be mounted for the adopt to work, as it
# needs to move files from this directory
- call_throws(['mount', js["data"]["path"], self.osd_data_dir])
+ call_throws(self.ctx, ['mount', js["data"]["path"], self.osd_data_dir])
except ValueError as e:
logger.info("Invalid JSON in {}: {}".format(osd_file, e))
return osd_fsid, osd_type
-def command_adopt_ceph(daemon_type, daemon_id, fsid):
- # type: (str, str, str) -> None
+def command_adopt_ceph(ctx, daemon_type, daemon_id, fsid):
+ # type: (CephadmContext, str, str, str) -> None
+
+ args = ctx.args
- (uid, gid) = extract_uid_gid()
+ (uid, gid) = extract_uid_gid(ctx)
data_dir_src = ('/var/lib/ceph/%s/%s-%s' %
(daemon_type, args.cluster, daemon_id))
osd_fsid = None
if daemon_type == 'osd':
- adopt_osd = AdoptOsd(data_dir_src, daemon_id)
+ adopt_osd = AdoptOsd(ctx, data_dir_src, daemon_id)
osd_fsid, osd_type = adopt_osd.check_online_osd()
if not osd_fsid:
osd_fsid, osd_type = adopt_osd.check_offline_lvm_osd()
# cluster we are adopting based on the /etc/{defaults,sysconfig}/ceph
# CLUSTER field.
unit_name = 'ceph-%s@%s' % (daemon_type, daemon_id)
- (enabled, state, _) = check_unit(unit_name)
+ (enabled, state, _) = check_unit(ctx, unit_name)
if state == 'running':
logger.info('Stopping old systemd unit %s...' % unit_name)
- call_throws(['systemctl', 'stop', unit_name])
+ call_throws(ctx, ['systemctl', 'stop', unit_name])
if enabled:
logger.info('Disabling old systemd unit %s...' % unit_name)
- call_throws(['systemctl', 'disable', unit_name])
+ call_throws(ctx, ['systemctl', 'disable', unit_name])
# data
logger.info('Moving data...')
- data_dir_dst = make_data_dir(fsid, daemon_type, daemon_id,
+ data_dir_dst = make_data_dir(ctx, fsid, daemon_type, daemon_id,
uid=uid, gid=gid)
- move_files(glob(os.path.join(data_dir_src, '*')),
+ move_files(ctx, glob(os.path.join(data_dir_src, '*')),
data_dir_dst,
uid=uid, gid=gid)
logger.debug('Remove dir \'%s\'' % (data_dir_src))
if os.path.ismount(data_dir_src):
- call_throws(['umount', data_dir_src])
+ call_throws(ctx, ['umount', data_dir_src])
os.rmdir(data_dir_src)
logger.info('Chowning content...')
- call_throws(['chown', '-c', '-R', '%d.%d' % (uid, gid), data_dir_dst])
+ call_throws(ctx, ['chown', '-c', '-R', '%d.%d' % (uid, gid), data_dir_dst])
if daemon_type == 'mon':
# rename *.ldb -> *.sst, in case they are coming from ubuntu
logger.info('Renaming %s -> %s', simple_fn, new_fn)
os.rename(simple_fn, new_fn)
logger.info('Disabling host unit ceph-volume@ simple unit...')
- call(['systemctl', 'disable',
+ call(ctx, ['systemctl', 'disable',
'ceph-volume@simple-%s-%s.service' % (daemon_id, osd_fsid)])
else:
# assume this is an 'lvm' c-v for now, but don't error
# out if it's not.
logger.info('Disabling host unit ceph-volume@ lvm unit...')
- call(['systemctl', 'disable',
+ call(ctx, ['systemctl', 'disable',
'ceph-volume@lvm-%s-%s.service' % (daemon_id, osd_fsid)])
# config
config_src = '/etc/ceph/%s.conf' % (args.cluster)
config_src = os.path.abspath(args.legacy_dir + config_src)
config_dst = os.path.join(data_dir_dst, 'config')
- copy_files([config_src], config_dst, uid=uid, gid=gid)
+ copy_files(ctx, [config_src], config_dst, uid=uid, gid=gid)
# logs
logger.info('Moving logs...')
log_dir_src = ('/var/log/ceph/%s-%s.%s.log*' %
(args.cluster, daemon_type, daemon_id))
log_dir_src = os.path.abspath(args.legacy_dir + log_dir_src)
- log_dir_dst = make_log_dir(fsid, uid=uid, gid=gid)
- move_files(glob(log_dir_src),
+ log_dir_dst = make_log_dir(ctx, fsid, uid=uid, gid=gid)
+ move_files(ctx, glob(log_dir_src),
log_dir_dst,
uid=uid, gid=gid)
logger.info('Creating new units...')
- make_var_run(fsid, uid, gid)
- c = get_container(fsid, daemon_type, daemon_id)
- deploy_daemon_units(fsid, uid, gid, daemon_type, daemon_id, c,
+ make_var_run(ctx, fsid, uid, gid)
+ c = get_container(ctx, fsid, daemon_type, daemon_id)
+ deploy_daemon_units(ctx, fsid, uid, gid, daemon_type, daemon_id, c,
enable=True, # unconditionally enable the new unit
start=(state == 'running' or args.force_start),
osd_fsid=osd_fsid)
- update_firewalld(daemon_type)
+ update_firewalld(ctx, daemon_type)
-def command_adopt_prometheus(daemon_id, fsid):
- # type: (str, str) -> None
-
+def command_adopt_prometheus(ctx, daemon_id, fsid):
+ # type: (CephadmContext, str, str) -> None
+ args = ctx.args
daemon_type = 'prometheus'
- (uid, gid) = extract_uid_gid_monitoring(daemon_type)
+ (uid, gid) = extract_uid_gid_monitoring(ctx, daemon_type)
- _stop_and_disable('prometheus')
+ _stop_and_disable(ctx, 'prometheus')
- data_dir_dst = make_data_dir(fsid, daemon_type, daemon_id,
+ data_dir_dst = make_data_dir(ctx, fsid, daemon_type, daemon_id,
uid=uid, gid=gid)
# config
config_src = os.path.abspath(args.legacy_dir + config_src)
config_dst = os.path.join(data_dir_dst, 'etc/prometheus')
makedirs(config_dst, uid, gid, 0o755)
- copy_files([config_src], config_dst, uid=uid, gid=gid)
+ copy_files(ctx, [config_src], config_dst, uid=uid, gid=gid)
# data
data_src = '/var/lib/prometheus/metrics/'
data_src = os.path.abspath(args.legacy_dir + data_src)
data_dst = os.path.join(data_dir_dst, 'data')
- copy_tree([data_src], data_dst, uid=uid, gid=gid)
+ copy_tree(ctx, [data_src], data_dst, uid=uid, gid=gid)
-    make_var_run(fsid, uid, gid)
-    c = get_container(fsid, daemon_type, daemon_id)
-    deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid)
-    update_firewalld(daemon_type)
+
+    make_var_run(ctx, fsid, uid, gid)
+    c = get_container(ctx, fsid, daemon_type, daemon_id)
+    deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid)
+    update_firewalld(ctx, daemon_type)
-def command_adopt_grafana(daemon_id, fsid):
-    # type: (str, str) -> None
+def command_adopt_grafana(ctx, daemon_id, fsid):
+    # type: (CephadmContext, str, str) -> None
+    args = ctx.args
daemon_type = 'grafana'
- (uid, gid) = extract_uid_gid_monitoring(daemon_type)
+ (uid, gid) = extract_uid_gid_monitoring(ctx, daemon_type)
- _stop_and_disable('grafana-server')
+ _stop_and_disable(ctx, 'grafana-server')
- data_dir_dst = make_data_dir(fsid, daemon_type, daemon_id,
+ data_dir_dst = make_data_dir(ctx, fsid, daemon_type, daemon_id,
uid=uid, gid=gid)
# config
config_src = os.path.abspath(args.legacy_dir + config_src)
config_dst = os.path.join(data_dir_dst, 'etc/grafana')
makedirs(config_dst, uid, gid, 0o755)
- copy_files([config_src], config_dst, uid=uid, gid=gid)
+ copy_files(ctx, [config_src], config_dst, uid=uid, gid=gid)
prov_src = '/etc/grafana/provisioning/'
prov_src = os.path.abspath(args.legacy_dir + prov_src)
prov_dst = os.path.join(data_dir_dst, 'etc/grafana')
- copy_tree([prov_src], prov_dst, uid=uid, gid=gid)
+ copy_tree(ctx, [prov_src], prov_dst, uid=uid, gid=gid)
# cert
cert = '/etc/grafana/grafana.crt'
cert_src = os.path.abspath(args.legacy_dir + cert_src)
makedirs(os.path.join(data_dir_dst, 'etc/grafana/certs'), uid, gid, 0o755)
cert_dst = os.path.join(data_dir_dst, 'etc/grafana/certs/cert_file')
- copy_files([cert_src], cert_dst, uid=uid, gid=gid)
+ copy_files(ctx, [cert_src], cert_dst, uid=uid, gid=gid)
key_src = '/etc/grafana/grafana.key'
key_src = os.path.abspath(args.legacy_dir + key_src)
key_dst = os.path.join(data_dir_dst, 'etc/grafana/certs/cert_key')
- copy_files([key_src], key_dst, uid=uid, gid=gid)
+ copy_files(ctx, [key_src], key_dst, uid=uid, gid=gid)
_adjust_grafana_ini(os.path.join(config_dst, 'grafana.ini'))
else:
data_src = '/var/lib/grafana/'
data_src = os.path.abspath(args.legacy_dir + data_src)
data_dst = os.path.join(data_dir_dst, 'data')
- copy_tree([data_src], data_dst, uid=uid, gid=gid)
+ copy_tree(ctx, [data_src], data_dst, uid=uid, gid=gid)
- make_var_run(fsid, uid, gid)
- c = get_container(fsid, daemon_type, daemon_id)
- deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid)
- update_firewalld(daemon_type)
+ make_var_run(ctx, fsid, uid, gid)
+ c = get_container(ctx, fsid, daemon_type, daemon_id)
+ deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid)
+ update_firewalld(ctx, daemon_type)
-def command_adopt_alertmanager(daemon_id, fsid):
- # type: (str, str) -> None
+def command_adopt_alertmanager(ctx, daemon_id, fsid):
+ # type: (CephadmContext, str, str) -> None
+ args = ctx.args
daemon_type = 'alertmanager'
- (uid, gid) = extract_uid_gid_monitoring(daemon_type)
+ (uid, gid) = extract_uid_gid_monitoring(ctx, daemon_type)
- _stop_and_disable('prometheus-alertmanager')
+ _stop_and_disable(ctx, 'prometheus-alertmanager')
- data_dir_dst = make_data_dir(fsid, daemon_type, daemon_id,
+ data_dir_dst = make_data_dir(ctx, fsid, daemon_type, daemon_id,
uid=uid, gid=gid)
# config
config_src = os.path.abspath(args.legacy_dir + config_src)
config_dst = os.path.join(data_dir_dst, 'etc/alertmanager')
makedirs(config_dst, uid, gid, 0o755)
- copy_files([config_src], config_dst, uid=uid, gid=gid)
+ copy_files(ctx, [config_src], config_dst, uid=uid, gid=gid)
# data
data_src = '/var/lib/prometheus/alertmanager/'
data_src = os.path.abspath(args.legacy_dir + data_src)
data_dst = os.path.join(data_dir_dst, 'etc/alertmanager/data')
- copy_tree([data_src], data_dst, uid=uid, gid=gid)
+ copy_tree(ctx, [data_src], data_dst, uid=uid, gid=gid)
- make_var_run(fsid, uid, gid)
- c = get_container(fsid, daemon_type, daemon_id)
- deploy_daemon(fsid, daemon_type, daemon_id, c, uid, gid)
- update_firewalld(daemon_type)
+ make_var_run(ctx, fsid, uid, gid)
+ c = get_container(ctx, fsid, daemon_type, daemon_id)
+ deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid)
+ update_firewalld(ctx, daemon_type)
def _adjust_grafana_ini(filename):
raise Error("Cannot update {}: {}".format(filename, err))
-def _stop_and_disable(unit_name):
- # type: (str) -> None
+def _stop_and_disable(ctx, unit_name):
+ # type: (CephadmContext, str) -> None
- (enabled, state, _) = check_unit(unit_name)
+ (enabled, state, _) = check_unit(ctx, unit_name)
if state == 'running':
logger.info('Stopping old systemd unit %s...' % unit_name)
- call_throws(['systemctl', 'stop', unit_name])
+ call_throws(ctx, ['systemctl', 'stop', unit_name])
if enabled:
logger.info('Disabling old systemd unit %s...' % unit_name)
- call_throws(['systemctl', 'disable', unit_name])
+ call_throws(ctx, ['systemctl', 'disable', unit_name])
##################################
-def command_rm_daemon():
- # type: () -> None
-
- l = FileLock(args.fsid)
+def command_rm_daemon(ctx):
+ # type: (CephadmContext) -> None
+ args = ctx.args
+ l = FileLock(ctx, args.fsid)
l.acquire()
(daemon_type, daemon_id) = args.name.split('.', 1)
- unit_name = get_unit_name_by_daemon_name(args.fsid, args.name)
+ unit_name = get_unit_name_by_daemon_name(ctx, args.fsid, args.name)
if daemon_type in ['mon', 'osd'] and not args.force:
raise Error('must pass --force to proceed: '
'this command may destroy precious data!')
- call(['systemctl', 'stop', unit_name],
+ call(ctx, ['systemctl', 'stop', unit_name],
verbosity=CallVerbosity.DEBUG)
- call(['systemctl', 'reset-failed', unit_name],
+ call(ctx, ['systemctl', 'reset-failed', unit_name],
verbosity=CallVerbosity.DEBUG)
- call(['systemctl', 'disable', unit_name],
+ call(ctx, ['systemctl', 'disable', unit_name],
verbosity=CallVerbosity.DEBUG)
- data_dir = get_data_dir(args.fsid, daemon_type, daemon_id)
+    data_dir = get_data_dir(args.fsid, args.data_dir, daemon_type, daemon_id)
if daemon_type in ['mon', 'osd', 'prometheus'] and \
not args.force_delete_data:
# rename it out of the way -- do not delete
os.path.join(backup_dir, dirname))
else:
if daemon_type == CephadmDaemon.daemon_type:
- CephadmDaemon.uninstall(args.fsid, daemon_type, daemon_id)
- call_throws(['rm', '-rf', data_dir])
+ CephadmDaemon.uninstall(ctx, args.fsid, daemon_type, daemon_id)
+ call_throws(ctx, ['rm', '-rf', data_dir])
##################################
-def command_rm_cluster():
- # type: () -> None
+def command_rm_cluster(ctx):
+ # type: (CephadmContext) -> None
+ args = ctx.args
if not args.force:
raise Error('must pass --force to proceed: '
'this command may destroy precious data!')
- l = FileLock(args.fsid)
+ l = FileLock(ctx, args.fsid)
l.acquire()
# stop + disable individual daemon units
- for d in list_daemons(detail=False):
+ for d in list_daemons(ctx, detail=False):
if d['fsid'] != args.fsid:
continue
if d['style'] != 'cephadm:v1':
continue
unit_name = get_unit_name(args.fsid, d['name'])
- call(['systemctl', 'stop', unit_name],
+ call(ctx, ['systemctl', 'stop', unit_name],
verbosity=CallVerbosity.DEBUG)
- call(['systemctl', 'reset-failed', unit_name],
+ call(ctx, ['systemctl', 'reset-failed', unit_name],
verbosity=CallVerbosity.DEBUG)
- call(['systemctl', 'disable', unit_name],
+ call(ctx, ['systemctl', 'disable', unit_name],
verbosity=CallVerbosity.DEBUG)
# cluster units
for unit_name in ['ceph-%s.target' % args.fsid]:
- call(['systemctl', 'stop', unit_name],
+ call(ctx, ['systemctl', 'stop', unit_name],
verbosity=CallVerbosity.DEBUG)
- call(['systemctl', 'reset-failed', unit_name],
+ call(ctx, ['systemctl', 'reset-failed', unit_name],
verbosity=CallVerbosity.DEBUG)
- call(['systemctl', 'disable', unit_name],
+ call(ctx, ['systemctl', 'disable', unit_name],
verbosity=CallVerbosity.DEBUG)
slice_name = 'system-%s.slice' % (('ceph-%s' % args.fsid).replace('-',
'\\x2d'))
- call(['systemctl', 'stop', slice_name],
+ call(ctx, ['systemctl', 'stop', slice_name],
verbosity=CallVerbosity.DEBUG)
# rm units
- call_throws(['rm', '-f', args.unit_dir +
+ call_throws(ctx, ['rm', '-f', args.unit_dir +
'/ceph-%s@.service' % args.fsid])
- call_throws(['rm', '-f', args.unit_dir +
+ call_throws(ctx, ['rm', '-f', args.unit_dir +
'/ceph-%s.target' % args.fsid])
- call_throws(['rm', '-rf',
+ call_throws(ctx, ['rm', '-rf',
args.unit_dir + '/ceph-%s.target.wants' % args.fsid])
# rm data
- call_throws(['rm', '-rf', args.data_dir + '/' + args.fsid])
+ call_throws(ctx, ['rm', '-rf', args.data_dir + '/' + args.fsid])
# rm logs
- call_throws(['rm', '-rf', args.log_dir + '/' + args.fsid])
- call_throws(['rm', '-rf', args.log_dir +
+ call_throws(ctx, ['rm', '-rf', args.log_dir + '/' + args.fsid])
+ call_throws(ctx, ['rm', '-rf', args.log_dir +
'/*.wants/ceph-%s@*' % args.fsid])
# rm logrotate config
- call_throws(['rm', '-f', args.logrotate_dir + '/ceph-%s' % args.fsid])
+ call_throws(ctx, ['rm', '-f', args.logrotate_dir + '/ceph-%s' % args.fsid])
# clean up config, keyring, and pub key files
files = ['/etc/ceph/ceph.conf', '/etc/ceph/ceph.pub', '/etc/ceph/ceph.client.admin.keyring']
##################################
-def check_time_sync(enabler=None):
- # type: (Optional[Packager]) -> bool
+def check_time_sync(ctx, enabler=None):
+ # type: (CephadmContext, Optional[Packager]) -> bool
units = [
'chrony.service', # 18.04 (at least)
'chronyd.service', # el / opensuse
'ntp.service', # 18.04 (at least)
'ntpsec.service', # 20.04 (at least) / buster
]
- if not check_units(units, enabler):
+ if not check_units(ctx, units, enabler):
logger.warning('No time sync service is running; checked for %s' % units)
return False
return True
-def command_check_host():
- # type: () -> None
- global container_path
+def command_check_host(ctx: CephadmContext) -> None:
+ container_path = ctx.container_path
+ args = ctx.args
errors = []
commands = ['systemctl', 'lvcreate']
errors.append('ERROR: %s binary does not appear to be installed' % command)
# check for configured+running chronyd or ntp
- if not check_time_sync():
+ if not check_time_sync(ctx):
errors.append('ERROR: No time synchronization is active')
if 'expect_hostname' in args and args.expect_hostname:
##################################
-def command_prepare_host():
- # type: () -> None
+def command_prepare_host(ctx: CephadmContext) -> None:
+ args = ctx.args
+ container_path = ctx.container_path
+
logger.info('Verifying podman|docker is present...')
pkg = None
if not container_path:
if not pkg:
- pkg = create_packager()
+ pkg = create_packager(ctx)
pkg.install_podman()
logger.info('Verifying lvm2 is present...')
if not find_executable('lvcreate'):
if not pkg:
- pkg = create_packager()
+ pkg = create_packager(ctx)
pkg.install(['lvm2'])
logger.info('Verifying time synchronization is in place...')
- if not check_time_sync():
+ if not check_time_sync(ctx):
if not pkg:
- pkg = create_packager()
+ pkg = create_packager(ctx)
pkg.install(['chrony'])
# check again, and this time try to enable
# the service
- check_time_sync(enabler=pkg)
+ check_time_sync(ctx, enabler=pkg)
if 'expect_hostname' in args and args.expect_hostname and args.expect_hostname != get_hostname():
logger.warning('Adjusting hostname from %s -> %s...' % (get_hostname(), args.expect_hostname))
- call_throws(['hostname', args.expect_hostname])
+ call_throws(ctx, ['hostname', args.expect_hostname])
with open('/etc/hostname', 'w') as f:
f.write(args.expect_hostname + '\n')
logger.info('Repeating the final host check...')
- command_check_host()
+ command_check_host(ctx)
##################################
class Packager(object):
- def __init__(self, stable=None, version=None, branch=None, commit=None):
+ def __init__(self, ctx: CephadmContext,
+ stable=None, version=None, branch=None, commit=None):
assert \
(stable and not version and not branch and not commit) or \
(not stable and version and not branch and not commit) or \
(not stable and not version and branch) or \
(not stable and not version and not branch and not commit)
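+        # exactly one of stable/version/branch selects the repo (commit is only valid with branch), or none at all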
+ self.ctx = ctx
self.stable = stable
self.version = version
self.branch = branch
return chacra_response.read().decode('utf-8')
def repo_gpgkey(self):
+ args = self.ctx.args
if args.gpg_url:
return args.gpg_url
if self.stable or self.version:
"""
Start and enable the service (typically using systemd).
"""
- call_throws(['systemctl', 'enable', '--now', service])
+ call_throws(self.ctx, ['systemctl', 'enable', '--now', service])
class Apt(Packager):
'debian': 'debian',
}
- def __init__(self, stable, version, branch, commit,
+ def __init__(self, ctx: CephadmContext,
+ stable, version, branch, commit,
distro, distro_version, distro_codename):
- super(Apt, self).__init__(stable=stable, version=version,
+ super(Apt, self).__init__(ctx, stable=stable, version=version,
branch=branch, commit=commit)
+ self.ctx = ctx
self.distro = self.DISTRO_NAMES[distro]
self.distro_codename = distro_codename
self.distro_version = distro_version
return '/etc/apt/sources.list.d/ceph.list'
def add_repo(self):
+ args = self.ctx.args
+
url, name = self.repo_gpgkey()
logger.info('Installing repo GPG key from %s...' % url)
try:
def install(self, ls):
logger.info('Installing packages %s...' % ls)
- call_throws(['apt', 'install', '-y'] + ls)
+ call_throws(self.ctx, ['apt', 'install', '-y'] + ls)
def install_podman(self):
if self.distro == 'ubuntu':
logger.info('Setting up repo for podman...')
self.add_kubic_repo()
- call_throws(['apt', 'update'])
+ call_throws(self.ctx, ['apt', 'update'])
logger.info('Attempting podman install...')
try:
key = response.read().decode('utf-8')
tmp_key = write_tmp(key, 0, 0)
keyring = self.kubric_repo_gpgkey_path()
- call_throws(['apt-key', '--keyring', keyring, 'add', tmp_key.name])
+ call_throws(self.ctx, ['apt-key', '--keyring', keyring, 'add', tmp_key.name])
logger.info('Installing repo file at %s...' % self.kubic_repo_path())
content = 'deb %s /\n' % self.kubic_repo_url()
'fedora': ('fedora', 'fc'),
}
- def __init__(self, stable, version, branch, commit,
+ def __init__(self, ctx: CephadmContext,
+ stable, version, branch, commit,
distro, distro_version):
- super(YumDnf, self).__init__(stable=stable, version=version,
+ super(YumDnf, self).__init__(ctx, stable=stable, version=version,
branch=branch, commit=commit)
+ self.ctx = ctx
self.major = int(distro_version.split('.')[0])
self.distro_normalized = self.DISTRO_NAMES[distro][0]
self.distro_code = self.DISTRO_NAMES[distro][1] + str(self.major)
def repo_baseurl(self):
assert self.stable or self.version
+ args = self.ctx.args
if self.version:
return '%s/rpm-%s/%s' % (args.repo_url, self.version,
self.distro_code)
if self.distro_code.startswith('el'):
logger.info('Enabling EPEL...')
- call_throws([self.tool, 'install', '-y', 'epel-release'])
+ call_throws(self.ctx, [self.tool, 'install', '-y', 'epel-release'])
def rm_repo(self):
if os.path.exists(self.repo_path()):
def install(self, ls):
logger.info('Installing packages %s...' % ls)
- call_throws([self.tool, 'install', '-y'] + ls)
+ call_throws(self.ctx, [self.tool, 'install', '-y'] + ls)
def install_podman(self):
self.install(['podman'])
'opensuse-leap'
]
- def __init__(self, stable, version, branch, commit,
+ def __init__(self, ctx: CephadmContext,
+ stable, version, branch, commit,
distro, distro_version):
- super(Zypper, self).__init__(stable=stable, version=version,
+ super(Zypper, self).__init__(ctx, stable=stable, version=version,
branch=branch, commit=commit)
+ self.ctx = ctx
self.tool = 'zypper'
self.distro = 'opensuse'
self.distro_version = '15.1'
def repo_baseurl(self):
assert self.stable or self.version
+ args = self.ctx.args
if self.version:
return '%s/rpm-%s/%s' % (args.repo_url, self.stable, self.distro)
else:
def install(self, ls):
logger.info('Installing packages %s...' % ls)
- call_throws([self.tool, 'in', '-y'] + ls)
+ call_throws(self.ctx, [self.tool, 'in', '-y'] + ls)
def install_podman(self):
self.install(['podman'])
-def create_packager(stable=None, version=None, branch=None, commit=None):
+def create_packager(ctx: CephadmContext,
+ stable=None, version=None, branch=None, commit=None):
distro, distro_version, distro_codename = get_distro()
if distro in YumDnf.DISTRO_NAMES:
- return YumDnf(stable=stable, version=version,
+ return YumDnf(ctx, stable=stable, version=version,
branch=branch, commit=commit,
distro=distro, distro_version=distro_version)
elif distro in Apt.DISTRO_NAMES:
- return Apt(stable=stable, version=version,
+ return Apt(ctx, stable=stable, version=version,
branch=branch, commit=commit,
distro=distro, distro_version=distro_version,
distro_codename=distro_codename)
elif distro in Zypper.DISTRO_NAMES:
- return Zypper(stable=stable, version=version,
+ return Zypper(ctx, stable=stable, version=version,
branch=branch, commit=commit,
distro=distro, distro_version=distro_version)
raise Error('Distro %s version %s not supported' % (distro, distro_version))
-def command_add_repo():
+def command_add_repo(ctx: CephadmContext):
+ args = ctx.args
if args.version and args.release:
raise Error('you can specify either --release or --version but not both')
if not args.version and not args.release and not args.dev and not args.dev_commit:
except Exception as e:
raise Error('version must be in the form x.y.z (e.g., 15.2.0)')
- pkg = create_packager(stable=args.release,
+ pkg = create_packager(ctx, stable=args.release,
version=args.version,
branch=args.dev,
commit=args.dev_commit)
pkg.add_repo()
-def command_rm_repo():
- pkg = create_packager()
+def command_rm_repo(ctx: CephadmContext):
+ pkg = create_packager(ctx)
pkg.rm_repo()
-def command_install():
- pkg = create_packager()
- pkg.install(args.packages)
+def command_install(ctx: CephadmContext):
+ pkg = create_packager(ctx)
+ pkg.install(ctx.args.packages)
##################################
"0x1af4": "Virtio Block Device"
}
- def __init__(self):
+ def __init__(self, ctx: CephadmContext):
+ self.ctx = ctx
self.cpu_model = 'Unknown'
self.cpu_count = 0
self.cpu_cores = 0
"""Get kernel parameters required/used in Ceph clusters"""
k_param = {}
- out, _, _ = call_throws(['sysctl', '-a'], verbosity=CallVerbosity.SILENT)
+ out, _, _ = call_throws(self.ctx, ['sysctl', '-a'], verbosity=CallVerbosity.SILENT)
if out:
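+            # each line of 'sysctl -a' output has the form 'key = value'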
param_list = out.split('\n')
param_dict = { param.split(" = ")[0]:param.split(" = ")[-1] for param in param_list}
##################################
-def command_gather_facts():
+def command_gather_facts(ctx: CephadmContext):
"""gather_facts is intended to provide host releated metadata to the caller"""
- host = HostFacts()
+ host = HostFacts(ctx)
print(host.dump())
##################################
-def command_verify_prereqs():
+def command_verify_prereqs(ctx: CephadmContext):
+ args = ctx.args
if args.service_type == 'haproxy' or args.service_type == 'keepalived':
- out, err, code = call(['sysctl', '-n', 'net.ipv4.ip_nonlocal_bind'])
+ out, err, code = call(
+ ctx, ['sysctl', '-n', 'net.ipv4.ip_nonlocal_bind']
+ )
if out.strip() != "1":
raise Error('net.ipv4.ip_nonlocal_bind not set to 1')
f'/{api_version}/metadata/daemons',
f'/{api_version}/metadata/host',
]
+
class Decorators:
@classmethod
def authorize(cls, f):
loop_delay = 1
thread_check_interval = 5
- def __init__(self, fsid, daemon_id=None, port=None):
+ def __init__(self, ctx: CephadmContext, fsid, daemon_id=None, port=None):
+ self.ctx = ctx
self.fsid = fsid
self.daemon_id = daemon_id
if not port:
@property
def port_active(self):
- return port_in_use(self.port)
+ return port_in_use(self.ctx, self.port)
@property
def can_run(self):
@property
def daemon_path(self):
return os.path.join(
- args.data_dir,
+ self.ctx.args.data_dir,
self.fsid,
f'{self.daemon_type}.{self.daemon_id}'
)
@property
def binary_path(self):
return os.path.join(
- args.data_dir,
+ self.ctx.args.data_dir,
self.fsid,
CephadmDaemon.bin_name
)
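+    # Both paths hang off ctx.args.data_dir (default /var/lib/ceph), keeping
+    # the exporter's state alongside the per-fsid daemon directories.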
s_time = time.time()
try:
- facts = HostFacts()
+ facts = HostFacts(self.ctx)
except Exception as e:
self._handle_thread_exception(e, 'host')
exception_encountered = True
def _scrape_ceph_volume(self, refresh_interval=15):
# we're invoking the ceph_volume command, so we need to set the args that it
# expects to use
+ args = self.ctx.args
args.command = "inventory --format=json".split()
args.fsid = self.fsid
args.log_output = False
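+        # command_ceph_volume() prints its inventory to stdout, so capture it
+        # with redirect_stdout into a StringIO buffer instead of shelling out
+        # again; note this mutates the shared ctx.args namespace.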
stream = io.StringIO()
try:
with redirect_stdout(stream):
- command_ceph_volume()
+ command_ceph_volume(self.ctx)
except Exception as e:
self._handle_thread_exception(e, 'disks')
exception_encountered = True
try:
# list daemons should ideally be invoked with a fsid
- data = list_daemons()
+ data = list_daemons(self.ctx)
except Exception as e:
self._handle_thread_exception(e, 'daemons')
exception_encountered = True
daemon since it's not a container, so we just create a
simple service definition and add it to the fsid's target
"""
+ args = self.ctx.args
if not config:
raise Error("Attempting to deploy cephadm daemon without a config")
assert isinstance(config, dict)
os.path.join(args.unit_dir, f"{self.unit_name}.new"),
os.path.join(args.unit_dir, self.unit_name))
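+        # The unit file was staged as <unit>.new and renamed into place above;
+        # reset-failed clears any earlier failure state so systemd's
+        # start-limit logic cannot block the 'enable --now'.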
- call_throws(['systemctl', 'daemon-reload'])
- call(['systemctl', 'stop', self.unit_name],
+ call_throws(self.ctx, ['systemctl', 'daemon-reload'])
+ call(self.ctx, ['systemctl', 'stop', self.unit_name],
verbosity=CallVerbosity.DEBUG)
- call(['systemctl', 'reset-failed', self.unit_name],
+ call(self.ctx, ['systemctl', 'reset-failed', self.unit_name],
verbosity=CallVerbosity.DEBUG)
- call_throws(['systemctl', 'enable', '--now', self.unit_name])
+ call_throws(self.ctx, ['systemctl', 'enable', '--now', self.unit_name])
@classmethod
- def uninstall(cls, fsid, daemon_type, daemon_id):
+ def uninstall(cls, ctx: CephadmContext, fsid, daemon_type, daemon_id):
+ args = ctx.args
unit_name = CephadmDaemon._unit_name(fsid, daemon_id)
unit_path = os.path.join(args.unit_dir, unit_name)
unit_run = os.path.join(args.data_dir, fsid, f"{daemon_type}.{daemon_id}", "unit.run")
break
if port:
- fw = Firewalld()
+ fw = Firewalld(ctx)
try:
fw.close_ports([port])
except RuntimeError:
logger.error(f"Unable to close port {port}")
- stdout, stderr, rc = call(["rm", "-f", unit_path])
+ stdout, stderr, rc = call(ctx, ["rm", "-f", unit_path])
if rc:
logger.error(f"Unable to remove the systemd file @ {unit_path}")
else:
logger.info(f"removed systemd unit file @ {unit_path}")
- stdout, stderr, rc = call(["systemctl", "daemon-reload"])
+ stdout, stderr, rc = call(ctx, ["systemctl", "daemon-reload"])
-def command_exporter():
-
- exporter = CephadmDaemon(args.fsid, daemon_id=args.id, port=args.port)
+def command_exporter(ctx: CephadmContext):
+ args = ctx.args
+ exporter = CephadmDaemon(ctx, args.fsid, daemon_id=args.id, port=args.port)
if args.fsid not in os.listdir(args.data_dir):
raise Error(f"cluster fsid '{args.fsid}' not found in '{args.data_dir}'")
@infer_fsid
-def command_maintenance():
+def command_maintenance(ctx: CephadmContext):
+ args = ctx.args
if not args.fsid:
raise Error('must pass --fsid to specify cluster')
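+    # 'enter' disables and stops the cluster's systemd target (presumably
+    # ceph-<fsid>.target), halting its daemons on this host; 'exit'
+    # re-enables and starts it.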
if args.maintenance_action.lower() == 'enter':
logger.info("Requested to place host into maintenance")
if systemd_target_state(target):
- _out, _err, code = call(
+ _out, _err, code = call(ctx,
['systemctl', 'disable', target],
verbosity=CallVerbosity.DEBUG
)
return "failed - to disable the target"
else:
# stopping a target waits by default
- _out, _err, code = call(
+ _out, _err, code = call(ctx,
['systemctl', 'stop', target],
verbosity=CallVerbosity.DEBUG
)
logger.info("Requested to exit maintenance state")
# exit maintenance request
if not systemd_target_state(target):
- _out, _err, code = call(
+ _out, _err, code = call(ctx,
['systemctl', 'enable', target],
verbosity=CallVerbosity.DEBUG
)
return "failed - unable to enable the target"
else:
# starting a target waits by default
- _out, _err, code = call(
+ _out, _err, code = call(ctx,
['systemctl', 'start', target],
verbosity=CallVerbosity.DEBUG
)
return args
-if __name__ == "__main__":
+def main():
+    global logger
# root?
if os.geteuid() != 0:
sys.stderr.write('No command specified; pass -h or --help for usage\n')
sys.exit(1)
+ container_path = ""
+
# podman or docker?
if args.func != command_check_host:
if args.docker:
sys.stderr.write('Unable to locate any of %s\n' % CONTAINER_PREFERENCE)
sys.exit(1)
+ ctx = CephadmContext()
+ ctx.args = args
+ ctx.container_path = container_path
+
try:
- r = args.func()
+ r = args.func(ctx)
except Error as e:
if args.verbose:
raise
if not r:
r = 0
sys.exit(r)
+
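+# Deferring to main() keeps import free of side effects, so the script can be
+# imported (e.g. by tests) without running the CLI.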
+if __name__ == "__main__":
+ main()
\ No newline at end of file